Coverage Report - org.apache.giraph.examples.SimpleSuperstepComputation
 
Classes in this File Line Coverage Branch Coverage Complexity
SimpleSuperstepComputation
0%
0/13
0%
0/8
2.5
SimpleSuperstepComputation$SimpleSuperstepVertexInputFormat
0%
0/2
N/A
2.5
SimpleSuperstepComputation$SimpleSuperstepVertexOutputFormat
0%
0/2
N/A
2.5
SimpleSuperstepComputation$SimpleSuperstepVertexOutputFormat$SimpleSuperstepVertexWriter
0%
0/5
N/A
2.5
SimpleSuperstepComputation$SimpleSuperstepVertexReader
0%
0/24
0%
0/6
2.5
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.examples;
 20  
 
 21  
 import org.apache.giraph.edge.Edge;
 22  
 import org.apache.giraph.edge.EdgeFactory;
 23  
 import org.apache.giraph.graph.BasicComputation;
 24  
 import org.apache.giraph.graph.Vertex;
 25  
 import org.apache.giraph.io.VertexReader;
 26  
 import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
 27  
 import org.apache.giraph.io.formats.TextVertexOutputFormat;
 28  
 import org.apache.hadoop.io.FloatWritable;
 29  
 import org.apache.hadoop.io.IntWritable;
 30  
 import org.apache.hadoop.io.LongWritable;
 31  
 import org.apache.hadoop.io.Text;
 32  
 import org.apache.hadoop.mapreduce.InputSplit;
 33  
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 34  
 import org.apache.log4j.Logger;
 35  
 
 36  
 import com.google.common.collect.Lists;
 37  
 
 38  
 import java.io.IOException;
 39  
 import java.util.List;
 40  
 
 41  
 /**
 42  
  * Just a simple Vertex compute implementation that keeps every vertex active
 43  
  * while the superstep is 3 or lower and votes to halt on any later superstep.
 44  
  */
 45  0
 public class SimpleSuperstepComputation extends BasicComputation<LongWritable,
 46  
     IntWritable, FloatWritable, IntWritable> {
 47  
   /**
    * Sanity-checks the global graph statistics and this vertex's state, then
    * votes to halt once the superstep counter is greater than 3.
    *
    * @param vertex Vertex being computed; only its halted flag is read here
    * @param messages Incoming messages; not read by this method
    * @throws IOException Declared by the signature; nothing in this body
    *         throws it directly
    * @throws IllegalStateException If getTotalNumVertices() is below 1,
    *         getTotalNumEdges() is negative, or the vertex reports halted
    */
   @Override
 48  
   public void compute(
 49  
       Vertex<LongWritable, IntWritable, FloatWritable> vertex,
 50  
       Iterable<IntWritable> messages) throws IOException {
 51  
     // Some checks for additional testing: each one throws
     // IllegalStateException instead of letting the job continue.
 52  0
     if (getTotalNumVertices() < 1) {
 53  0
       throw new IllegalStateException("compute: Illegal total vertices " +
 54  0
           getTotalNumVertices());
 55  
     }
 56  0
     if (getTotalNumEdges() < 0) {
 57  0
       throw new IllegalStateException("compute: Illegal total edges " +
 58  0
           getTotalNumEdges());
 59  
     }
 60  0
     if (vertex.isHalted()) {
 61  0
       throw new IllegalStateException("compute: Impossible to be halted - " +
 62  0
           vertex.isHalted());
 63  
     }
 64  
 
 65  0
     // Stay active while the superstep is 3 or lower; vote to halt afterwards.
     if (getSuperstep() > 3) {
 66  0
       vertex.voteToHalt();
 67  
     }
 68  0
   }
 69  
 
 70  
   /**
 71  
    * Simple VertexReader that supports {@link SimpleSuperstepComputation}.
    * <p>
    * Vertices are generated rather than read from input: each vertex gets an
    * ID computed from the split index and the running record count, a value
    * of ten times the ID, and a single outgoing edge to the next ID.
    * <p>
    * NOTE(review): totalRecords, recordsRead, reverseIdOrder and inputSplit
    * are not declared in this class; presumably they are inherited from
    * GeneratedVertexReader - confirm against that class.
 72  
    */
 73  0
   public static class SimpleSuperstepVertexReader extends
 74  
       GeneratedVertexReader<LongWritable, IntWritable, FloatWritable> {
 75  
     /** Class logger */
 76  0
     private static final Logger LOG =
 77  0
         Logger.getLogger(SimpleSuperstepVertexReader.class);
 78  
 
 79  
     /**
      * Reports whether another vertex remains to be generated.
      *
      * @return true while recordsRead is still below totalRecords
      */
     @Override
 80  
     public boolean nextVertex() throws IOException, InterruptedException {
 81  0
       return totalRecords > recordsRead;
 82  
     }
 83  
 
 84  
     /**
      * Builds the next generated vertex and increments recordsRead.
      *
      * @return A freshly created vertex initialized with its ID, value and
      *         one outgoing edge
      */
     @Override
 85  
     public Vertex<LongWritable, IntWritable, FloatWritable> getCurrentVertex()
 86  
       throws IOException, InterruptedException {
 87  0
       Vertex<LongWritable, IntWritable, FloatWritable> vertex =
 88  0
           getConf().createVertex();
 89  0
       // Reverse order counts down from (splitIndex + 1) * totalRecords - 1;
       // otherwise count up from splitIndex * totalRecords.
       long tmpId = reverseIdOrder ?
 90  0
           ((inputSplit.getSplitIndex() + 1) * totalRecords) -
 91  
           recordsRead - 1 :
 92  0
             (inputSplit.getSplitIndex() * totalRecords) + recordsRead;
 93  0
       LongWritable vertexId = new LongWritable(tmpId);
 94  0
       // Vertex value is ID * 10; the (int) cast narrows the long product.
       IntWritable vertexValue =
 95  0
           new IntWritable((int) (vertexId.get() * 10));
 96  0
       List<Edge<LongWritable, FloatWritable>> edges = Lists.newLinkedList();
 97  0
       // Single edge to ID + 1, wrapped modulo numSplits * totalRecords.
       long targetVertexId =
 98  0
           (vertexId.get() + 1) %
 99  0
           (inputSplit.getNumSplits() * totalRecords);
 100  0
       float edgeValue = vertexId.get() * 100f;
 101  0
       edges.add(EdgeFactory.create(new LongWritable(targetVertexId),
 102  
           new FloatWritable(edgeValue)));
 103  0
       vertex.initialize(vertexId, vertexValue, edges);
 104  0
       // Advance the counter that nextVertex() compares with totalRecords.
       ++recordsRead;
 105  0
       if (LOG.isInfoEnabled()) {
 106  0
         // The message prefix is "next:" although this is getCurrentVertex().
         LOG.info("next: Return vertexId=" + vertex.getId().get() +
 107  0
             ", vertexValue=" + vertex.getValue() +
 108  
             ", targetVertexId=" + targetVertexId +
 109  
             ", edgeValue=" + edgeValue);
 110  
       }
 111  0
       return vertex;
 112  
     }
 113  
   }
 114  
 
 115  
   /**
 116  
    * Simple VertexInputFormat that supports {@link SimpleSuperstepComputation}.
    * Its only override hands out {@link SimpleSuperstepVertexReader} instances.
 117  
    */
 118  0
   public static class SimpleSuperstepVertexInputFormat extends
 119  
     GeneratedVertexInputFormat<LongWritable, IntWritable, FloatWritable> {
 120  
     /**
      * Creates a new reader for each call.
      *
      * @param split Input split; not used by this method
      * @param context Task attempt context; not used by this method
      * @return A new SimpleSuperstepVertexReader
      */
     @Override
 121  
     public VertexReader<LongWritable, IntWritable, FloatWritable>
 122  
     createVertexReader(InputSplit split, TaskAttemptContext context)
 123  
       throws IOException {
 124  0
       return new SimpleSuperstepVertexReader();
 125  
     }
 126  
   }
 127  
 
 128  
 
 129  
   /**
 130  
    * Simple VertexOutputFormat that supports {@link SimpleSuperstepComputation}.
    * Each vertex is written as a pair of Text values: its ID and its value.
 131  
    */
 132  0
   public static class SimpleSuperstepVertexOutputFormat extends
 133  
       TextVertexOutputFormat<LongWritable, IntWritable, FloatWritable> {
 134  
     /**
      * Creates a new writer for each call.
      *
      * @param context Task attempt context; not used by this method
      * @return A new SimpleSuperstepVertexWriter
      */
     @Override
 135  
     public TextVertexWriter createVertexWriter(TaskAttemptContext context)
 136  
       throws IOException, InterruptedException {
 137  0
       return new SimpleSuperstepVertexWriter();
 138  
     }
 139  
 
 140  
     /**
 141  
      * Simple VertexWriter that supports {@link SimpleSuperstepComputation}.
      * Declared as a non-static inner class of the output format.
 142  
      */
 143  0
     public class SimpleSuperstepVertexWriter extends TextVertexWriter {
 144  
       /**
        * Writes one record for the vertex: the key is the string form of the
        * vertex ID and the value is the string form of the vertex value.
        *
        * @param vertex Vertex to write
        */
       @Override
 145  
       public void writeVertex(Vertex<LongWritable, IntWritable,
 146  
           FloatWritable> vertex) throws IOException, InterruptedException {
 147  0
         getRecordWriter().write(
 148  0
             new Text(vertex.getId().toString()),
 149  0
             new Text(vertex.getValue().toString()));
 150  0
       }
 151  
     }
 152  
   }
 153  
 }