Coverage Report - org.apache.giraph.examples.NormalizingLongDoubleDoubleTextInputFormat
 
Classes in this File Line Coverage Branch Coverage Complexity
NormalizingLongDoubleDoubleTextInputFormat
0%
0/5
N/A
1.857
NormalizingLongDoubleDoubleTextInputFormat$NormalizingLongDoubleDoubleDoubleVertexReader
0%
0/30
0%
0/10
1.857
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.examples;
 20  
 
 21  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
 22  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 23  
 import org.apache.giraph.edge.Edge;
 24  
 import org.apache.giraph.edge.EdgeFactory;
 25  
 import org.apache.giraph.graph.Vertex;
 26  
 import org.apache.giraph.io.formats.TextVertexInputFormat;
 27  
 import org.apache.hadoop.io.DoubleWritable;
 28  
 import org.apache.hadoop.io.LongWritable;
 29  
 import org.apache.hadoop.mapreduce.InputSplit;
 30  
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 31  
 
 32  
 import com.google.common.collect.Lists;
 33  
 
 34  
 import java.io.IOException;
 35  
 import java.util.Collection;
 36  
 import java.util.List;
 37  
 import java.util.regex.Pattern;
 38  
 
 39  
 /**
 40  
  * Simple text-based {@link org.apache.giraph.io.VertexInputFormat} for
 41  
  * unweighted graphs with long ids. Each line consists of: vertex
 42  
  * neighbor1:weight1 neighbor2:weight2 ...
 43  
  */
 44  0
 public class NormalizingLongDoubleDoubleTextInputFormat
 45  
     extends
 46  
     TextVertexInputFormat<LongWritable, DoubleWritable, DoubleWritable>
 47  
     implements ImmutableClassesGiraphConfigurable<LongWritable, DoubleWritable,
 48  
     DoubleWritable> {
 49  
   /** Configuration. */
 50  
   private ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
 51  
       DoubleWritable> conf;
 52  
 
 53  
   @Override
 54  
   public TextVertexReader createVertexReader(
 55  
       InputSplit split, TaskAttemptContext context) throws IOException {
 56  0
     return new NormalizingLongDoubleDoubleDoubleVertexReader();
 57  
   }
 58  
 
 59  
   @Override
 60  
   public void setConf(ImmutableClassesGiraphConfiguration<LongWritable,
 61  
       DoubleWritable, DoubleWritable> configuration) {
 62  0
     conf = configuration;
 63  0
   }
 64  
 
 65  
   @Override
 66  
   public ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
 67  
       DoubleWritable> getConf() {
 68  0
     return conf;
 69  
   }
 70  
 
 71  
   /**
 72  
    * Vertex reader associated with
 73  
    * {@link LongDoubleDoubleTextInputFormat}.
 74  
    */
 75  0
   public class NormalizingLongDoubleDoubleDoubleVertexReader
 76  
       extends NormalizingLongDoubleDoubleTextInputFormat.TextVertexReader {
 77  
     /** Separator of the vertex and neighbors */
 78  0
     private final Pattern edgeSeparator = Pattern.compile("\\s+");
 79  
     /** Separator of the edge id and edge weight */
 80  0
     private final Pattern weightSeparator = Pattern.compile(":");
 81  
 
 82  
     @Override
 83  
     public Vertex<LongWritable, DoubleWritable, DoubleWritable>
 84  
     getCurrentVertex() throws IOException, InterruptedException {
 85  0
       Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex =
 86  0
           conf.createVertex();
 87  
 
 88  0
       String[] tokens = edgeSeparator.split(getRecordReader()
 89  0
           .getCurrentValue().toString());
 90  0
       List<Edge<LongWritable, DoubleWritable>> edges = Lists
 91  0
           .newArrayListWithCapacity(tokens.length - 1);
 92  0
       parse(tokens, edges);
 93  0
       normalize(edges);
 94  
 
 95  0
       LongWritable vertexId = new LongWritable(Long.parseLong(tokens[0]));
 96  0
       vertex.initialize(vertexId, new DoubleWritable(), edges);
 97  
 
 98  0
       return vertex;
 99  
     }
 100  
 
 101  
     /**
 102  
      * Parse a set of tokens into a map ID -> weight.
 103  
      * @param tokens The tokens to be parsed.
 104  
      * @param edges The map that will contain the result of the parsing.
 105  
      */
 106  
     void parse(String[] tokens,
 107  
                Collection<Edge<LongWritable, DoubleWritable>> edges) {
 108  0
       for (int n = 1; n < tokens.length; n++) {
 109  0
         String[] parts = weightSeparator.split(tokens[n]);
 110  0
         edges.add(EdgeFactory.create(new LongWritable(Long.parseLong(parts[0])),
 111  0
             new DoubleWritable(Double.parseDouble(parts[1]))));
 112  
       }
 113  0
     }
 114  
 
 115  
     /**
 116  
      * Normalize the edges with L1 normalization.
 117  
      * @param edges The edges to be normalized.
 118  
      */
 119  
     void normalize(Collection<Edge<LongWritable, DoubleWritable>> edges) {
 120  0
       if (edges == null || edges.size() == 0) {
 121  0
         throw new IllegalArgumentException(
 122  
             "Cannot normalize an empy set of edges");
 123  
       }
 124  0
       float normalizer = 0.0f;
 125  0
       for (Edge<LongWritable, DoubleWritable> edge : edges) {
 126  0
         normalizer += edge.getValue().get();
 127  0
       }
 128  0
       for (Edge<LongWritable, DoubleWritable> edge : edges) {
 129  0
         edge.getValue().set(edge.getValue().get() / normalizer);
 130  0
       }
 131  0
     }
 132  
 
 133  
     @Override
 134  
     public boolean nextVertex() throws IOException, InterruptedException {
 135  0
       return getRecordReader().nextKeyValue();
 136  
     }
 137  
   }
 138  
 }