Coverage Report - org.apache.giraph.examples.SimplePageRankComputation
 
Classes in this File Line Coverage Branch Coverage Complexity
SimplePageRankComputation
0%
0/27
0%
0/6
1.5
SimplePageRankComputation$SimplePageRankMasterCompute
0%
0/5
N/A
1.5
SimplePageRankComputation$SimplePageRankVertexInputFormat
0%
0/2
N/A
1.5
SimplePageRankComputation$SimplePageRankVertexOutputFormat
0%
0/2
N/A
1.5
SimplePageRankComputation$SimplePageRankVertexOutputFormat$SimplePageRankVertexWriter
0%
0/5
N/A
1.5
SimplePageRankComputation$SimplePageRankVertexReader
0%
0/21
0%
0/4
1.5
SimplePageRankComputation$SimplePageRankWorkerContext
0%
0/27
0%
0/4
1.5
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.examples;
 20  
 
 21  
 import org.apache.giraph.aggregators.DoubleMaxAggregator;
 22  
 import org.apache.giraph.aggregators.DoubleMinAggregator;
 23  
 import org.apache.giraph.aggregators.LongSumAggregator;
 24  
 import org.apache.giraph.edge.Edge;
 25  
 import org.apache.giraph.edge.EdgeFactory;
 26  
 import org.apache.giraph.graph.BasicComputation;
 27  
 import org.apache.giraph.graph.Vertex;
 28  
 import org.apache.giraph.io.VertexReader;
 29  
 import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
 30  
 import org.apache.giraph.io.formats.TextVertexOutputFormat;
 31  
 import org.apache.giraph.master.DefaultMasterCompute;
 32  
 import org.apache.giraph.worker.WorkerContext;
 33  
 import org.apache.hadoop.io.DoubleWritable;
 34  
 import org.apache.hadoop.io.FloatWritable;
 35  
 import org.apache.hadoop.io.LongWritable;
 36  
 import org.apache.hadoop.io.Text;
 37  
 import org.apache.hadoop.mapreduce.InputSplit;
 38  
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 39  
 import org.apache.log4j.Logger;
 40  
 
 41  
 import com.google.common.collect.Lists;
 42  
 
 43  
 import java.io.IOException;
 44  
 import java.util.List;
 45  
 
 46  
 /**
 47  
  * Demonstrates the basic Pregel PageRank implementation.
 48  
  */
 49  
 @Algorithm(
 50  
     name = "Page rank"
 51  
 )
 52  0
 public class SimplePageRankComputation extends BasicComputation<LongWritable,
 53  
     DoubleWritable, FloatWritable, DoubleWritable> {
 54  
   /** Number of supersteps for this test */
 55  
   public static final int MAX_SUPERSTEPS = 30;
 56  
   /** Logger */
 57  0
   private static final Logger LOG =
 58  0
       Logger.getLogger(SimplePageRankComputation.class);
 59  
   /** Sum aggregator name */
 60  0
   private static String SUM_AGG = "sum";
 61  
   /** Min aggregator name */
 62  0
   private static String MIN_AGG = "min";
 63  
   /** Max aggregator name */
 64  0
   private static String MAX_AGG = "max";
 65  
 
 66  
   @Override
 67  
   public void compute(
 68  
       Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
 69  
       Iterable<DoubleWritable> messages) throws IOException {
 70  0
     if (getSuperstep() >= 1) {
 71  0
       double sum = 0;
 72  0
       for (DoubleWritable message : messages) {
 73  0
         sum += message.get();
 74  0
       }
 75  0
       DoubleWritable vertexValue =
 76  0
           new DoubleWritable((0.15f / getTotalNumVertices()) + 0.85f * sum);
 77  0
       vertex.setValue(vertexValue);
 78  0
       aggregate(MAX_AGG, vertexValue);
 79  0
       aggregate(MIN_AGG, vertexValue);
 80  0
       aggregate(SUM_AGG, new LongWritable(1));
 81  0
       LOG.info(vertex.getId() + ": PageRank=" + vertexValue +
 82  0
           " max=" + getAggregatedValue(MAX_AGG) +
 83  0
           " min=" + getAggregatedValue(MIN_AGG));
 84  
     }
 85  
 
 86  0
     if (getSuperstep() < MAX_SUPERSTEPS) {
 87  0
       long edges = vertex.getNumEdges();
 88  0
       sendMessageToAllEdges(vertex,
 89  0
           new DoubleWritable(vertex.getValue().get() / edges));
 90  0
     } else {
 91  0
       vertex.voteToHalt();
 92  
     }
 93  0
   }
 94  
 
 95  
   /**
 96  
    * Worker context used with {@link SimplePageRankComputation}.
 97  
    */
 98  0
   public static class SimplePageRankWorkerContext extends
 99  
       WorkerContext {
 100  
     /** Final max value for verification for local jobs */
 101  
     private static double FINAL_MAX;
 102  
     /** Final min value for verification for local jobs */
 103  
     private static double FINAL_MIN;
 104  
     /** Final sum value for verification for local jobs */
 105  
     private static long FINAL_SUM;
 106  
 
 107  
     public static double getFinalMax() {
 108  0
       return FINAL_MAX;
 109  
     }
 110  
 
 111  
     public static double getFinalMin() {
 112  0
       return FINAL_MIN;
 113  
     }
 114  
 
 115  
     public static long getFinalSum() {
 116  0
       return FINAL_SUM;
 117  
     }
 118  
 
 119  
     @Override
 120  
     public void preApplication()
 121  
       throws InstantiationException, IllegalAccessException {
 122  0
     }
 123  
 
 124  
     @Override
 125  
     public void postApplication() {
 126  0
       FINAL_SUM = this.<LongWritable>getAggregatedValue(SUM_AGG).get();
 127  0
       FINAL_MAX = this.<DoubleWritable>getAggregatedValue(MAX_AGG).get();
 128  0
       FINAL_MIN = this.<DoubleWritable>getAggregatedValue(MIN_AGG).get();
 129  
 
 130  0
       LOG.info("aggregatedNumVertices=" + FINAL_SUM);
 131  0
       LOG.info("aggregatedMaxPageRank=" + FINAL_MAX);
 132  0
       LOG.info("aggregatedMinPageRank=" + FINAL_MIN);
 133  0
     }
 134  
 
 135  
     @Override
 136  
     public void preSuperstep() {
 137  0
       if (getSuperstep() >= 3) {
 138  0
         LOG.info("aggregatedNumVertices=" +
 139  0
             getAggregatedValue(SUM_AGG) +
 140  0
             " NumVertices=" + getTotalNumVertices());
 141  0
         if (this.<LongWritable>getAggregatedValue(SUM_AGG).get() !=
 142  0
             getTotalNumVertices()) {
 143  0
           throw new RuntimeException("wrong value of SumAggreg: " +
 144  0
               getAggregatedValue(SUM_AGG) + ", should be: " +
 145  0
               getTotalNumVertices());
 146  
         }
 147  0
         DoubleWritable maxPagerank = getAggregatedValue(MAX_AGG);
 148  0
         LOG.info("aggregatedMaxPageRank=" + maxPagerank.get());
 149  0
         DoubleWritable minPagerank = getAggregatedValue(MIN_AGG);
 150  0
         LOG.info("aggregatedMinPageRank=" + minPagerank.get());
 151  
       }
 152  0
     }
 153  
 
 154  
     @Override
 155  0
     public void postSuperstep() { }
 156  
   }
 157  
 
 158  
   /**
 159  
    * Master compute associated with {@link SimplePageRankComputation}.
 160  
    * It registers required aggregators.
 161  
    */
 162  0
   public static class SimplePageRankMasterCompute extends
 163  
       DefaultMasterCompute {
 164  
     @Override
 165  
     public void initialize() throws InstantiationException,
 166  
         IllegalAccessException {
 167  0
       registerAggregator(SUM_AGG, LongSumAggregator.class);
 168  0
       registerPersistentAggregator(MIN_AGG, DoubleMinAggregator.class);
 169  0
       registerPersistentAggregator(MAX_AGG, DoubleMaxAggregator.class);
 170  0
     }
 171  
   }
 172  
 
 173  
   /**
 174  
    * Simple VertexReader that supports {@link SimplePageRankComputation}
 175  
    */
 176  0
   public static class SimplePageRankVertexReader extends
 177  
       GeneratedVertexReader<LongWritable, DoubleWritable, FloatWritable> {
 178  
     /** Class logger */
 179  0
     private static final Logger LOG =
 180  0
         Logger.getLogger(SimplePageRankVertexReader.class);
 181  
 
 182  
     @Override
 183  
     public boolean nextVertex() {
 184  0
       return totalRecords > recordsRead;
 185  
     }
 186  
 
 187  
     @Override
 188  
     public Vertex<LongWritable, DoubleWritable, FloatWritable>
 189  
     getCurrentVertex() throws IOException {
 190  0
       Vertex<LongWritable, DoubleWritable, FloatWritable> vertex =
 191  0
           getConf().createVertex();
 192  0
       LongWritable vertexId = new LongWritable(
 193  0
           (inputSplit.getSplitIndex() * totalRecords) + recordsRead);
 194  0
       DoubleWritable vertexValue = new DoubleWritable(vertexId.get() * 10d);
 195  0
       long targetVertexId =
 196  0
           (vertexId.get() + 1) %
 197  0
           (inputSplit.getNumSplits() * totalRecords);
 198  0
       float edgeValue = vertexId.get() * 100f;
 199  0
       List<Edge<LongWritable, FloatWritable>> edges = Lists.newLinkedList();
 200  0
       edges.add(EdgeFactory.create(new LongWritable(targetVertexId),
 201  
           new FloatWritable(edgeValue)));
 202  0
       vertex.initialize(vertexId, vertexValue, edges);
 203  0
       ++recordsRead;
 204  0
       if (LOG.isInfoEnabled()) {
 205  0
         LOG.info("next: Return vertexId=" + vertex.getId().get() +
 206  0
             ", vertexValue=" + vertex.getValue() +
 207  
             ", targetVertexId=" + targetVertexId + ", edgeValue=" + edgeValue);
 208  
       }
 209  0
       return vertex;
 210  
     }
 211  
   }
 212  
 
 213  
   /**
 214  
    * Simple VertexInputFormat that supports {@link SimplePageRankComputation}
 215  
    */
 216  0
   public static class SimplePageRankVertexInputFormat extends
 217  
     GeneratedVertexInputFormat<LongWritable, DoubleWritable, FloatWritable> {
 218  
     @Override
 219  
     public VertexReader<LongWritable, DoubleWritable,
 220  
     FloatWritable> createVertexReader(InputSplit split,
 221  
       TaskAttemptContext context)
 222  
       throws IOException {
 223  0
       return new SimplePageRankVertexReader();
 224  
     }
 225  
   }
 226  
 
 227  
   /**
 228  
    * Simple VertexOutputFormat that supports {@link SimplePageRankComputation}
 229  
    */
 230  0
   public static class SimplePageRankVertexOutputFormat extends
 231  
       TextVertexOutputFormat<LongWritable, DoubleWritable, FloatWritable> {
 232  
     @Override
 233  
     public TextVertexWriter createVertexWriter(TaskAttemptContext context)
 234  
       throws IOException, InterruptedException {
 235  0
       return new SimplePageRankVertexWriter();
 236  
     }
 237  
 
 238  
     /**
 239  
      * Simple VertexWriter that supports {@link SimplePageRankComputation}
 240  
      */
 241  0
     public class SimplePageRankVertexWriter extends TextVertexWriter {
 242  
       @Override
 243  
       public void writeVertex(
 244  
           Vertex<LongWritable, DoubleWritable, FloatWritable> vertex)
 245  
         throws IOException, InterruptedException {
 246  0
         getRecordWriter().write(
 247  0
             new Text(vertex.getId().toString()),
 248  0
             new Text(vertex.getValue().toString()));
 249  0
       }
 250  
     }
 251  
   }
 252  
 }