Coverage Report - org.apache.giraph.lib.AdjacencyListVertexReader
 
Classes in this File Line Coverage Branch Coverage Complexity
AdjacencyListVertexReader
0%
0/31
0%
0/10
1.75
AdjacencyListVertexReader$LineSanitizer
N/A
N/A
1.75
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.lib;
 19  
 
 20  
 import com.google.common.collect.Maps;
 21  
 import org.apache.giraph.graph.BasicVertex;
 22  
 import org.apache.giraph.graph.BspUtils;
 23  
 import org.apache.giraph.graph.Edge;
 24  
 import org.apache.giraph.lib.TextVertexInputFormat.TextVertexReader;
 25  
 import org.apache.hadoop.conf.Configuration;
 26  
 import org.apache.hadoop.io.LongWritable;
 27  
 import org.apache.hadoop.io.Text;
 28  
 import org.apache.hadoop.io.Writable;
 29  
 import org.apache.hadoop.io.WritableComparable;
 30  
 import org.apache.hadoop.mapreduce.RecordReader;
 31  
 
 32  
 import java.io.IOException;
 33  
 import java.util.Map;
 34  
 
 35  
 /**
 36  
  * VertexReader that reads lines of text with vertices encoded as adjacency
 37  
  * lists and converts each token to the correct type.  For example, a graph
 38  
  * with vertices as integers and values as doubles could be encoded as:
 39  
  *   1 0.1 2 0.2 3 0.3
 40  
  * to represent a vertex named 1, with 0.1 as its value and two edges, to
 41  
  * vertices 2 and 3, with edge values of 0.2 and 0.3, respectively.
 42  
  *
 43  
  * @param <I> Vertex index value
 44  
  * @param <V> Vertex value
 45  
  * @param <E> Edge value
 46  
  * @param <M> Message data
 47  
  */
 48  
 @SuppressWarnings("rawtypes")
 49  
 public abstract class AdjacencyListVertexReader<I extends WritableComparable,
 50  
     V extends Writable, E extends Writable, M extends Writable> extends
 51  
     TextVertexInputFormat.TextVertexReader<I, V, E, M> {
 52  
   /** Delimiter for split */
 53  
   public static final String LINE_TOKENIZE_VALUE = "adj.list.input.delimiter";
 54  
   /** Default delimiter for split */
 55  
   public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
 56  
   /** Cached delimiter used for split */
 57  0
   private String splitValue = null;
 58  
 
 59  
   /**
 60  
    * Utility for doing any cleaning of each line before it is tokenized.
 61  
    */
 62  
   public interface LineSanitizer {
 63  
     /**
 64  
      * Clean string s before attempting to tokenize it.
 65  
      *
 66  
      * @param s String to be cleaned.
 67  
      * @return Sanitized string.
 68  
      */
 69  
     String sanitize(String s);
 70  
   }
 71  
 
 72  
   /**
 73  
    * Sanitizer from constructor.
 74  
    */
 75  
   private final LineSanitizer sanitizer;
 76  
 
 77  
   /**
 78  
    * Constructor with line record reader.
 79  
    *
 80  
    * @param lineRecordReader Reader from {@link TextVertexReader}.
 81  
    */
 82  
   public AdjacencyListVertexReader(
 83  
       RecordReader<LongWritable, Text> lineRecordReader) {
 84  0
     super(lineRecordReader);
 85  0
     sanitizer = null;
 86  0
   }
 87  
 
 88  
   /**
 89  
    * Constructor with line record reader.
 90  
    *
 91  
    * @param lineRecordReader Reader from {@link TextVertexReader}.
 92  
    * @param sanitizer Sanitizer to be used.
 93  
    */
 94  
   public AdjacencyListVertexReader(
 95  
       RecordReader<LongWritable, Text> lineRecordReader,
 96  
       LineSanitizer sanitizer) {
 97  0
     super(lineRecordReader);
 98  0
     this.sanitizer = sanitizer;
 99  0
   }
 100  
 
 101  
   /**
 102  
    * Store the Id for this line in an instance of its correct type.
 103  
    *
 104  
    * @param s Id of vertex from line
 105  
    * @param id Instance of Id's type, in which to store its value
 106  
    */
 107  
   public abstract void decodeId(String s, I id);
 108  
 
 109  
   /**
 110  
    * Store the value for this line in an instance of its correct type.
 111  
    * @param s Value from line
 112  
    * @param value Instance of value's type, in which to store its value
 113  
    */
 114  
   public abstract void decodeValue(String s, V value);
 115  
 
 116  
   /**
 117  
    * Store an edge from the line into an instance of a correctly typed Edge
 118  
    * @param id The edge's id from the line
 119  
    * @param value The edge's value from the line
 120  
    * @param edge Instance of edge in which to store the id and value
 121  
    */
 122  
   public abstract void decodeEdge(String id, String value, Edge<I, E> edge);
 123  
 
 124  
 
 125  
   @Override
 126  
   public boolean nextVertex() throws IOException, InterruptedException {
 127  0
     return getRecordReader().nextKeyValue();
 128  
   }
 129  
 
 130  
   @Override
 131  
   public BasicVertex<I, V, E, M> getCurrentVertex()
 132  
     throws IOException, InterruptedException {
 133  0
     Configuration conf = getContext().getConfiguration();
 134  0
     String line = getRecordReader().getCurrentValue().toString();
 135  0
     BasicVertex<I, V, E, M> vertex = BspUtils.createVertex(conf);
 136  
 
 137  0
     if (sanitizer != null) {
 138  0
       line = sanitizer.sanitize(line);
 139  
     }
 140  
 
 141  0
     if (splitValue == null) {
 142  0
       splitValue = conf.get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
 143  
     }
 144  
 
 145  0
     String [] values = line.split(splitValue);
 146  
 
 147  0
     if ((values.length < 2) || (values.length % 2 != 0)) {
 148  0
       throw new IllegalArgumentException(
 149  
         "Line did not split correctly: " + line);
 150  
     }
 151  
 
 152  0
     I vertexId = BspUtils.<I>createVertexIndex(conf);
 153  0
     decodeId(values[0], vertexId);
 154  
 
 155  0
     V value = BspUtils.<V>createVertexValue(conf);
 156  0
     decodeValue(values[1], value);
 157  
 
 158  0
     int i = 2;
 159  0
     Map<I, E> edges = Maps.newHashMap();
 160  0
     Edge<I, E> edge = new Edge<I, E>();
 161  0
     while (i < values.length) {
 162  0
       decodeEdge(values[i], values[i + 1], edge);
 163  0
       edges.put(edge.getDestVertexId(), edge.getEdgeValue());
 164  0
       i += 2;
 165  
     }
 166  0
     vertex.initialize(vertexId, value, edges, null);
 167  0
     return vertex;
 168  
   }
 169  
 }