Coverage Report - org.apache.giraph.examples.SimplePageRankVertex
 
Classes in this File Line Coverage Branch Coverage Complexity
SimplePageRankVertex
100%
22/22
100%
6/6
1.438
SimplePageRankVertex$SimplePageRankVertexInputFormat
100%
2/2
N/A
1.438
SimplePageRankVertex$SimplePageRankVertexMasterCompute
100%
5/5
N/A
1.438
SimplePageRankVertex$SimplePageRankVertexOutputFormat
100%
3/3
N/A
1.438
SimplePageRankVertex$SimplePageRankVertexReader
100%
16/16
75%
3/4
1.438
SimplePageRankVertex$SimplePageRankVertexWorkerContext
95%
21/22
75%
3/4
1.438
SimplePageRankVertex$SimplePageRankVertexWriter
100%
4/4
N/A
1.438
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.examples;
 20  
 
 21  
 import org.apache.giraph.aggregators.DoubleMaxAggregator;
 22  
 import org.apache.giraph.aggregators.DoubleMinAggregator;
 23  
 import org.apache.giraph.aggregators.LongSumAggregator;
 24  
 import org.apache.giraph.graph.BspUtils;
 25  
 import org.apache.giraph.graph.DefaultMasterCompute;
 26  
 import org.apache.giraph.graph.LongDoubleFloatDoubleVertex;
 27  
 import org.apache.giraph.graph.Vertex;
 28  
 import org.apache.giraph.graph.VertexReader;
 29  
 import org.apache.giraph.graph.VertexWriter;
 30  
 import org.apache.giraph.graph.WorkerContext;
 31  
 import org.apache.giraph.io.GeneratedVertexInputFormat;
 32  
 import org.apache.giraph.io.TextVertexOutputFormat;
 33  
 import org.apache.giraph.io.TextVertexOutputFormat.TextVertexWriter;
 34  
 import org.apache.hadoop.io.DoubleWritable;
 35  
 import org.apache.hadoop.io.FloatWritable;
 36  
 import org.apache.hadoop.io.LongWritable;
 37  
 import org.apache.hadoop.io.Text;
 38  
 import org.apache.hadoop.mapreduce.InputSplit;
 39  
 import org.apache.hadoop.mapreduce.RecordWriter;
 40  
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 41  
 import org.apache.log4j.Logger;
 42  
 
 43  
 import com.google.common.collect.Maps;
 44  
 
 45  
 import java.io.IOException;
 46  
 import java.util.Map;
 47  
 
 48  
 /**
 49  
  * Demonstrates the basic Pregel PageRank implementation.
 50  
  */
 51  
 @Algorithm(
 52  
     name = "Page rank"
 53  
 )
 54  420
 public class SimplePageRankVertex extends LongDoubleFloatDoubleVertex {
 55  
   /** Number of supersteps for this test */
 56  
   public static final int MAX_SUPERSTEPS = 30;
 57  
   /** Logger */
 58  1
   private static final Logger LOG =
 59  
       Logger.getLogger(SimplePageRankVertex.class);
 60  
   /** Sum aggregator name */
 61  1
   private static String SUM_AGG = "sum";
 62  
   /** Min aggregator name */
 63  1
   private static String MIN_AGG = "min";
 64  
   /** Max aggregator name */
 65  1
   private static String MAX_AGG = "max";
 66  
 
 67  
   /**
    * Runs one PageRank step for this vertex.
    *
    * From superstep 1 onward the incoming messages are summed, the vertex
    * value is set to {@code 0.15f / totalNumVertices + 0.85f * sum}, and the
    * new value is fed to the max, min and sum aggregators (the sum aggregator
    * receives 1 per vertex). While the superstep is below
    * {@link #MAX_SUPERSTEPS} the vertex sends its value divided by its edge
    * count along every edge; otherwise it votes to halt.
    *
    * @param messages Messages delivered to this vertex for this superstep
    */
   @Override
 68  
   public void compute(Iterable<DoubleWritable> messages) {
 69  310
     if (getSuperstep() >= 1) { // superstep 0 only sends the initial values
 70  300
       double sum = 0;
 71  300
       for (DoubleWritable message : messages) {
 72  300
         sum += message.get();
 73  
       }
 74  300
       DoubleWritable vertexValue =
 75  
           new DoubleWritable((0.15f / getTotalNumVertices()) + 0.85f * sum);
 76  300
       setValue(vertexValue);
 77  300
       aggregate(MAX_AGG, vertexValue);
 78  300
       aggregate(MIN_AGG, vertexValue);
 79  300
       aggregate(SUM_AGG, new LongWritable(1)); // counts vertices that computed
 80  300
       LOG.info(getId() + ": PageRank=" + vertexValue +
 81  
           " max=" + getAggregatedValue(MAX_AGG) +
 82  
           " min=" + getAggregatedValue(MIN_AGG));
 83  
     }
 84  
 
 85  310
     if (getSuperstep() < MAX_SUPERSTEPS) {
 86  300
       long edges = getNumEdges();
 87  300
       sendMessageToAllEdges(
 88  
           new DoubleWritable(getValue().get() / edges)); // equal share per edge
 89  300
     } else {
 90  10
       voteToHalt();
 91  
     }
 92  310
   }
 93  
 
 94  
   /**
 95  
    * Worker context used with {@link SimplePageRankVertex}.
 96  
    */
 97  2
   public static class SimplePageRankVertexWorkerContext extends
 98  
       WorkerContext {
 99  
     /** Final max value for verification for local jobs */
 100  
     private static double FINAL_MAX;
 101  
     /** Final min value for verification for local jobs */
 102  
     private static double FINAL_MIN;
 103  
     /** Final sum value for verification for local jobs */
 104  
     private static long FINAL_SUM;
 105  
 
 106  
     /**
      * Returns the static FINAL_MAX field, which postApplication() fills from
      * the max aggregator.
      *
      * @return Last recorded maximum aggregator value
      */
     public static double getFinalMax() {
 107  2
       return FINAL_MAX;
 108  
     }
 109  
 
 110  
     /**
      * Returns the static FINAL_MIN field, which postApplication() fills from
      * the min aggregator.
      *
      * @return Last recorded minimum aggregator value
      */
     public static double getFinalMin() {
 111  2
       return FINAL_MIN;
 112  
     }
 113  
 
 114  
     /**
      * Returns the static FINAL_SUM field, which postApplication() fills from
      * the sum aggregator.
      *
      * @return Last recorded sum aggregator value
      */
     public static long getFinalSum() {
 115  2
       return FINAL_SUM;
 116  
     }
 117  
 
 118  
     /**
      * Intentionally empty: this worker context performs no setup before the
      * application starts.
      */
     @Override
 119  
     public void preApplication()
 120  
       throws InstantiationException, IllegalAccessException {
 121  2
     }
 122  
 
 123  
     /**
      * Copies the current sum, max and min aggregator values into the static
      * FINAL_* fields and logs each of them.
      */
     @Override
 124  
     public void postApplication() {
 125  2
       FINAL_SUM = this.<LongWritable>getAggregatedValue(SUM_AGG).get();
 126  2
       FINAL_MAX = this.<DoubleWritable>getAggregatedValue(MAX_AGG).get();
 127  2
       FINAL_MIN = this.<DoubleWritable>getAggregatedValue(MIN_AGG).get();
 128  
 
 129  2
       LOG.info("aggregatedNumVertices=" + FINAL_SUM);
 130  2
       LOG.info("aggregatedMaxPageRank=" + FINAL_MAX);
 131  2
       LOG.info("aggregatedMinPageRank=" + FINAL_MIN);
 132  2
     }
 133  
 
 134  
     /**
      * From superstep 3 onward, logs the aggregator values and checks that the
      * sum aggregator equals the total number of vertices.
      * NOTE(review): the reason the check starts at superstep 3 is not stated
      * in this file - confirm against the aggregator visibility rules.
      *
      * @throws RuntimeException if the sum aggregator value differs from
      *         getTotalNumVertices()
      */
     @Override
 135  
     public void preSuperstep() {
 136  62
       if (getSuperstep() >= 3) {
 137  56
         LOG.info("aggregatedNumVertices=" +
 138  
             getAggregatedValue(SUM_AGG) +
 139  
             " NumVertices=" + getTotalNumVertices());
 140  56
         if (this.<LongWritable>getAggregatedValue(SUM_AGG).get() !=
 141  
             getTotalNumVertices()) {
 142  0
           throw new RuntimeException("wrong value of SumAggreg: " +
 143  
               getAggregatedValue(SUM_AGG) + ", should be: " +
 144  
               getTotalNumVertices());
 145  
         }
 146  56
         DoubleWritable maxPagerank = getAggregatedValue(MAX_AGG);
 147  56
         LOG.info("aggregatedMaxPageRank=" + maxPagerank.get());
 148  56
         DoubleWritable minPagerank = getAggregatedValue(MIN_AGG);
 149  56
         LOG.info("aggregatedMinPageRank=" + minPagerank.get());
 150  
       }
 151  62
     }
 152  
 
 153  
     /** Intentionally empty: nothing is done after a superstep. */
     @Override
 154  62
     public void postSuperstep() { }
 155  
   }
 156  
 
 157  
   /**
 158  
    * Master compute associated with {@link SimplePageRankVertex}.
 159  
    * It registers required aggregators.
 160  
    */
 161  2
   public static class SimplePageRankVertexMasterCompute extends
 162  
       DefaultMasterCompute {
 163  
     /**
      * Registers the three aggregators used by the vertex and worker context:
      * the sum aggregator through registerAggregator, and the min and max
      * aggregators through registerPersistentAggregator.
      */
     @Override
 164  
     public void initialize() throws InstantiationException,
 165  
         IllegalAccessException {
 166  2
       registerAggregator(SUM_AGG, LongSumAggregator.class);
 167  2
       registerPersistentAggregator(MIN_AGG, DoubleMinAggregator.class);
 168  2
       registerPersistentAggregator(MAX_AGG, DoubleMaxAggregator.class);
 169  2
     }
 170  
   }
 171  
 
 172  
   /**
 173  
    * Simple VertexReader that supports {@link SimplePageRankVertex}
 174  
    */
 175  
   public static class SimplePageRankVertexReader extends
 176  
       GeneratedVertexReader<LongWritable, DoubleWritable, FloatWritable,
 177  
       DoubleWritable> {
 178  
     /** Class logger */
 179  1
     private static final Logger LOG =
 180  
         Logger.getLogger(SimplePageRankVertexReader.class);
 181  
 
 182  
     /**
 183  
      * Constructor.
 184  
      */
 185  
     // Delegates to the superclass constructor; no extra state is set up here.
     public SimplePageRankVertexReader() {
 186  6
       super();
 187  6
     }
 188  
 
 189  
     /**
      * Reports whether another vertex can still be generated.
      *
      * @return true while recordsRead is below totalRecords
      */
     @Override
 190  
     public boolean nextVertex() {
 191  36
       return totalRecords > recordsRead;
 192  
     }
 193  
 
 194  
     /**
      * Generates the next synthetic vertex and advances recordsRead.
      *
      * The vertex id is {@code splitIndex * totalRecords + recordsRead}, its
      * value is ten times the id, and it gets a single outgoing edge to
      * {@code (id + 1) % (numSplits * totalRecords)} with edge value
      * {@code id * 100f}.
      *
      * @return The newly created and initialized vertex
      * @throws IOException declared by the overridden method; not thrown
      *         directly by this body
      */
     @Override
 195  
     public Vertex<LongWritable, DoubleWritable,
 196  
         FloatWritable, DoubleWritable>
 197  
     getCurrentVertex() throws IOException {
 198  
       Vertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
 199  30
       vertex = BspUtils.createVertex(configuration);
 200  
 
 201  30
       LongWritable vertexId = new LongWritable(
 202  
           (inputSplit.getSplitIndex() * totalRecords) + recordsRead);
 203  30
       DoubleWritable vertexValue = new DoubleWritable(vertexId.get() * 10d);
 204  30
       long targetVertexId =
 205  
           (vertexId.get() + 1) %
 206  
           (inputSplit.getNumSplits() * totalRecords); // wraps to 0 after the last id
 207  30
       float edgeValue = vertexId.get() * 100f;
 208  30
       Map<LongWritable, FloatWritable> edges = Maps.newHashMap();
 209  30
       edges.put(new LongWritable(targetVertexId), new FloatWritable(edgeValue));
 210  30
       vertex.initialize(vertexId, vertexValue, edges, null);
 211  30
       ++recordsRead;
 212  30
       if (LOG.isInfoEnabled()) {
 213  30
         LOG.info("next: Return vertexId=" + vertex.getId().get() +
 214  
             ", vertexValue=" + vertex.getValue() +
 215  
             ", targetVertexId=" + targetVertexId + ", edgeValue=" + edgeValue);
 216  
       }
 217  30
       return vertex;
 218  
     }
 219  
   }
 220  
 
 221  
   /**
 222  
    * Simple VertexInputFormat that supports {@link SimplePageRankVertex}
 223  
    */
 224  12
   public static class SimplePageRankVertexInputFormat extends
 225  
     GeneratedVertexInputFormat<LongWritable,
 226  
             DoubleWritable, FloatWritable, DoubleWritable> {
 227  
     /**
      * Creates a new {@link SimplePageRankVertexReader}; neither argument is
      * used by this method.
      *
      * @param split Input split (unused here)
      * @param context Task attempt context (unused here)
      * @return A fresh reader instance
      * @throws IOException declared by the overridden method; not thrown
      *         directly by this body
      */
     @Override
 228  
     public VertexReader<LongWritable, DoubleWritable,
 229  
     FloatWritable, DoubleWritable> createVertexReader(InputSplit split,
 230  
       TaskAttemptContext context)
 231  
       throws IOException {
 232  6
       return new SimplePageRankVertexReader();
 233  
     }
 234  
   }
 235  
 
 236  
   /**
 237  
    * Simple VertexWriter that supports {@link SimplePageRankVertex}
 238  
    */
 239  
   public static class SimplePageRankVertexWriter extends
 240  
       TextVertexWriter<LongWritable, DoubleWritable, FloatWritable> {
 241  
     /**
 242  
      * Constructor with line writer.
 243  
      *
 244  
      * @param lineRecordWriter Line writer that will do the writing.
 245  
      */
 246  
     // Only forwards the line writer to the TextVertexWriter superclass.
     public SimplePageRankVertexWriter(
 247  
         RecordWriter<Text, Text> lineRecordWriter) {
 248  2
       super(lineRecordWriter);
 249  2
     }
 250  
 
 251  
     /**
      * Writes one record for the vertex: the id's string form as the key and
      * the value's string form as the value. Edges are not written.
      *
      * @param vertex Vertex to write
      * @throws IOException propagated from the record writer
      * @throws InterruptedException propagated from the record writer
      */
     @Override
 252  
     public void writeVertex(
 253  
       Vertex<LongWritable, DoubleWritable, FloatWritable, ?> vertex)
 254  
       throws IOException, InterruptedException {
 255  15
       getRecordWriter().write(
 256  
           new Text(vertex.getId().toString()),
 257  
           new Text(vertex.getValue().toString()));
 258  15
     }
 259  
   }
 260  
 
 261  
   /**
 262  
    * Simple VertexOutputFormat that supports {@link SimplePageRankVertex}
 263  
    */
 264  6
   public static class SimplePageRankVertexOutputFormat extends
 265  
       TextVertexOutputFormat<LongWritable, DoubleWritable, FloatWritable> {
 266  
     /**
      * Obtains a record writer from textOutputFormat for the given context and
      * wraps it in a {@link SimplePageRankVertexWriter}.
      *
      * @param context Task attempt context passed to the text output format
      * @return Vertex writer backed by the obtained record writer
      * @throws IOException propagated from getRecordWriter
      * @throws InterruptedException propagated from getRecordWriter
      */
     @Override
 267  
     public VertexWriter<LongWritable, DoubleWritable, FloatWritable>
 268  
     createVertexWriter(TaskAttemptContext context)
 269  
       throws IOException, InterruptedException {
 270  2
       RecordWriter<Text, Text> recordWriter =
 271  
           textOutputFormat.getRecordWriter(context);
 272  2
       return new SimplePageRankVertexWriter(recordWriter);
 273  
     }
 274  
   }
 275  
 }