Coverage Report - org.apache.giraph.examples.PageRankWithKryoSimpleWritable
 
Classes in this File Line Coverage Branch Coverage Complexity
PageRankWithKryoSimpleWritable
0%
0/27
0%
0/6
0
PageRankWithKryoSimpleWritable$DoubleMaxWrapperAggregator
0%
0/5
N/A
0
PageRankWithKryoSimpleWritable$DoubleMinWrapperAggregator
0%
0/5
N/A
0
PageRankWithKryoSimpleWritable$EdgeValue
0%
0/8
N/A
0
PageRankWithKryoSimpleWritable$MessageValue
0%
0/9
N/A
0
PageRankWithKryoSimpleWritable$PageRankWithKryoMasterCompute
0%
0/5
N/A
0
PageRankWithKryoSimpleWritable$PageRankWithKryoVertexInputFormat
0%
0/2
N/A
0
PageRankWithKryoSimpleWritable$PageRankWithKryoVertexReader
0%
0/22
0%
0/4
0
PageRankWithKryoSimpleWritable$PageRankWithKryoWorkerContext
0%
0/27
0%
0/4
0
PageRankWithKryoSimpleWritable$VertexValue
0%
0/9
N/A
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.examples;
 20  
 
 21  
 import com.google.common.collect.Lists;
 22  
 import org.apache.giraph.aggregators.BasicAggregator;
 23  
 import org.apache.giraph.aggregators.LongSumAggregator;
 24  
 import org.apache.giraph.edge.Edge;
 25  
 import org.apache.giraph.edge.EdgeFactory;
 26  
 import org.apache.giraph.graph.BasicComputation;
 27  
 import org.apache.giraph.graph.Vertex;
 28  
 import org.apache.giraph.io.VertexReader;
 29  
 import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
 30  
 import org.apache.giraph.master.DefaultMasterCompute;
 31  
 import org.apache.giraph.worker.WorkerContext;
 32  
 import org.apache.giraph.writable.kryo.KryoSimpleWritable;
 33  
 import org.apache.hadoop.io.LongWritable;
 34  
 import org.apache.hadoop.mapreduce.InputSplit;
 35  
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 36  
 import org.apache.log4j.Logger;
 37  
 
 38  
 import java.io.IOException;
 39  
 import java.util.ArrayList;
 40  
 import java.util.List;
 41  
 import org.apache.giraph.examples.PageRankWithKryoSimpleWritable.VertexValue;
 42  
 import org.apache.giraph.examples.PageRankWithKryoSimpleWritable.MessageValue;
 43  
 import org.apache.giraph.examples.PageRankWithKryoSimpleWritable.EdgeValue;
 44  
 
 45  
 /**
 46  
  * Copy of SimplePageRank, modified to test vertex/edge and
 47  
  * message values that derives from KryoSimpleWritable.
 48  
  */
 49  
 @Algorithm(
 50  
         name = "Page rank"
 51  
 )
 52  0
 public class PageRankWithKryoSimpleWritable extends
 53  
         BasicComputation<LongWritable, VertexValue,
 54  
         EdgeValue, MessageValue> {
 55  
   /** Number of supersteps for this test */
 56  
   public static final int MAX_SUPERSTEPS = 30;
 57  
   /** Number of supersteps for this static  3;
 58  
   /** Logger */
 59  0
   private static final Logger LOG =
 60  0
           Logger.getLogger(PageRankWithKryoSimpleWritable.class);
 61  
   /** Sum aggregator name */
 62  0
   private static String SUM_AGG = "sum";
 63  
   /** Min aggregator name */
 64  0
   private static String MIN_AGG = "min";
 65  
   /** Max aggregator name */
 66  0
   private static String MAX_AGG = "max";
 67  
 
 68  
   @Override
 69  
   public void compute(
 70  
           Vertex<LongWritable, VertexValue,
 71  
           EdgeValue> vertex,
 72  
           Iterable<MessageValue> messages) throws IOException {
 73  0
     if (getSuperstep() >= 1) {
 74  0
       double sum = 0;
 75  0
       for (MessageValue message : messages) {
 76  0
         sum += message.get();
 77  0
       }
 78  0
       Double value = (0.15f / getTotalNumVertices()) + 0.85f * sum;
 79  0
       VertexValue vertexValue = new VertexValue(value);
 80  0
       vertex.setValue(vertexValue);
 81  0
       aggregate(MAX_AGG, vertexValue);
 82  0
       aggregate(MIN_AGG, vertexValue);
 83  0
       aggregate(SUM_AGG, new LongWritable(1));
 84  0
       LOG.info(vertex.getId() + ": PageRank=" + vertexValue +
 85  0
               " max=" + getAggregatedValue(MAX_AGG) +
 86  0
               " min=" + getAggregatedValue(MIN_AGG));
 87  
     }
 88  
 
 89  0
     if (getSuperstep() < MAX_SUPERSTEPS) {
 90  0
       long edges = vertex.getNumEdges();
 91  0
       sendMessageToAllEdges(vertex,
 92  0
           new MessageValue(vertex.getValue().get() / edges));
 93  0
     } else {
 94  0
       vertex.voteToHalt();
 95  
     }
 96  0
   }
 97  
 
 98  
   /**
 99  
    * Worker context used with {@link PageRankWithKryoSimpleWritable}.
 100  
    */
 101  0
   public static class PageRankWithKryoWorkerContext extends
 102  
           WorkerContext {
 103  
     /** Final max value for verification for local jobs */
 104  
     private static double FINAL_MAX;
 105  
     /** Final min value for verification for local jobs */
 106  
     private static double FINAL_MIN;
 107  
     /** Final sum value for verification for local jobs */
 108  
     private static long FINAL_SUM;
 109  
 
 110  
     public static double getFinalMax() {
 111  0
       return FINAL_MAX;
 112  
     }
 113  
 
 114  
     public static double getFinalMin() {
 115  0
       return FINAL_MIN;
 116  
     }
 117  
 
 118  
     public static long getFinalSum() {
 119  0
       return FINAL_SUM;
 120  
     }
 121  
 
 122  
     @Override
 123  
     public void preApplication()
 124  
             throws InstantiationException, IllegalAccessException {
 125  0
     }
 126  
 
 127  
     @Override
 128  
     public void postApplication() {
 129  0
       FINAL_SUM = this.<LongWritable>getAggregatedValue(SUM_AGG).get();
 130  0
       FINAL_MAX = this.<VertexValue>getAggregatedValue(MAX_AGG).get();
 131  0
       FINAL_MIN = this.<VertexValue>getAggregatedValue(MIN_AGG).get();
 132  
 
 133  0
       LOG.info("aggregatedNumVertices=" + FINAL_SUM);
 134  0
       LOG.info("aggregatedMaxPageRank=" + FINAL_MAX);
 135  0
       LOG.info("aggregatedMinPageRank=" + FINAL_MIN);
 136  0
     }
 137  
 
 138  
     @Override
 139  
     public void preSuperstep() {
 140  0
       if (getSuperstep() >= 3) {
 141  0
         LOG.info("aggregatedNumVertices=" +
 142  0
                 getAggregatedValue(SUM_AGG) +
 143  0
                 " NumVertices=" + getTotalNumVertices());
 144  0
         if (this.<LongWritable>getAggregatedValue(SUM_AGG).get() !=
 145  0
                 getTotalNumVertices()) {
 146  0
           throw new RuntimeException("wrong value of SumAggreg: " +
 147  0
                   getAggregatedValue(SUM_AGG) + ", should be: " +
 148  0
                   getTotalNumVertices());
 149  
         }
 150  0
         VertexValue maxPagerank = getAggregatedValue(MAX_AGG);
 151  0
         LOG.info("aggregatedMaxPageRank=" + maxPagerank.get());
 152  0
         VertexValue minPagerank = getAggregatedValue(MIN_AGG);
 153  0
         LOG.info("aggregatedMinPageRank=" + minPagerank.get());
 154  
       }
 155  0
     }
 156  
 
 157  
     @Override
 158  0
     public void postSuperstep() { }
 159  
   }
 160  
 
 161  
   /**
 162  
    * Master compute associated with {@link PageRankWithKryoSimpleWritable}.
 163  
    * It registers required aggregators.
 164  
    */
 165  0
   public static class PageRankWithKryoMasterCompute extends
 166  
           DefaultMasterCompute {
 167  
     @Override
 168  
     public void initialize() throws InstantiationException,
 169  
             IllegalAccessException {
 170  0
       registerAggregator(SUM_AGG, LongSumAggregator.class);
 171  0
       registerPersistentAggregator(MIN_AGG, DoubleMinWrapperAggregator.class);
 172  0
       registerPersistentAggregator(MAX_AGG, DoubleMaxWrapperAggregator.class);
 173  0
     }
 174  
   }
 175  
 
 176  
   /**
 177  
    * Simple VertexReader that supports {@link PageRankWithKryoSimpleWritable}
 178  
    */
 179  0
   public static class PageRankWithKryoVertexReader extends
 180  
           GeneratedVertexReader<LongWritable, VertexValue, EdgeValue> {
 181  
     /** Class logger */
 182  0
     private static final Logger LOG =
 183  0
         Logger.getLogger(
 184  
           PageRankWithKryoSimpleWritable.PageRankWithKryoVertexReader.class);
 185  
 
 186  
     @Override
 187  
     public boolean nextVertex() {
 188  0
       return totalRecords > recordsRead;
 189  
     }
 190  
 
 191  
     @Override
 192  
     public Vertex<LongWritable, VertexValue, EdgeValue>
 193  
     getCurrentVertex() throws IOException {
 194  0
       Vertex<LongWritable, VertexValue, EdgeValue> vertex =
 195  0
               getConf().createVertex();
 196  0
       LongWritable vertexId = new LongWritable(
 197  0
               (inputSplit.getSplitIndex() * totalRecords) + recordsRead);
 198  0
       VertexValue vertexValue = new VertexValue(vertexId.get() * 10d);
 199  0
       long targetVertexId =
 200  0
               (vertexId.get() + 1) %
 201  0
                       (inputSplit.getNumSplits() * totalRecords);
 202  0
       float edgeValue = vertexId.get() * 100f;
 203  0
       List<Edge<LongWritable, EdgeValue>> edges = Lists.newLinkedList();
 204  0
       edges.add(EdgeFactory.create(new LongWritable(targetVertexId),
 205  0
               new EdgeValue(edgeValue)));
 206  0
       vertex.initialize(vertexId, vertexValue, edges);
 207  0
       ++recordsRead;
 208  0
       if (LOG.isInfoEnabled()) {
 209  0
         LOG.info("next: Return vertexId=" + vertex.getId().get() +
 210  0
           ", vertexValue=" + vertex.getValue() +
 211  
           ", targetVertexId=" + targetVertexId + ", edgeValue=" + edgeValue);
 212  
       }
 213  0
       return vertex;
 214  
     }
 215  
   }
 216  
 
 217  
   /**
 218  
    *  VertexInputFormat that supports {@link PageRankWithKryoSimpleWritable}
 219  
    */
 220  0
   public static class PageRankWithKryoVertexInputFormat extends
 221  
           GeneratedVertexInputFormat<LongWritable, VertexValue, EdgeValue> {
 222  
     @Override
 223  
     public VertexReader<LongWritable, VertexValue,
 224  
             EdgeValue> createVertexReader(InputSplit split,
 225  
                                               TaskAttemptContext context)
 226  
             throws IOException {
 227  0
       return new PageRankWithKryoVertexReader();
 228  
     }
 229  
   }
 230  
 
 231  
   /**
 232  
    * Creating a custom vertex value class to force kryo to
 233  
    * register with a new ID. Please note that a custom
 234  
    * class containing a double array is not
 235  
    * necessary for the page rank application. It is only
 236  
    * used for testing the scenario of kryo encountering an
 237  
    * unregistered custom class.
 238  
    */
 239  
   public static class VertexValue extends KryoSimpleWritable {
 240  
     /** Storing the value in an array.
 241  
         Double array is an unregistered type
 242  
         hence kryo will assign a unique class id */
 243  
     private double[] ranks;
 244  
 
 245  
     /** Constructor */
 246  0
     public VertexValue() {
 247  0
     }
 248  
 
 249  
     /**
 250  
      * Constructor
 251  
      * @param val Vertex value
 252  
      */
 253  0
     public VertexValue(Double val) {
 254  0
       ranks = new double[1];
 255  
 
 256  0
       ranks[0] = val;
 257  0
     }
 258  
 
 259  
     /**
 260  
      * Get vertex value
 261  
      * @return Vertex value
 262  
      */
 263  
     public Double get() {
 264  0
       return ranks[0];
 265  
     }
 266  
 
 267  
     /**
 268  
      * Set vertex value.
 269  
      * @param val Vertex value
 270  
      */
 271  
     public void set(Double val) {
 272  0
       this.ranks[0] = val;
 273  0
     }
 274  
   }
 275  
 
 276  
   /**
 277  
    * Creating a custom edge value class to force kryo to
 278  
    * register with a new ID. Please note that a custom
 279  
    * class containing a float is not
 280  
    * necessary for the page rank application. It is only
 281  
    * used for testing the scenario of kryo encountering an
 282  
    * unregistered custom class.
 283  
    */
 284  
   public static class EdgeValue extends KryoSimpleWritable {
 285  
     /** Edge value */
 286  
     private Float realValue;
 287  
 
 288  
     /** Constructor */
 289  0
     public EdgeValue() {
 290  0
     }
 291  
     /**
 292  
      * Constructor
 293  
      * @param val Edge value
 294  
      */
 295  0
     public EdgeValue(Float val) {
 296  0
       realValue = val;
 297  0
     }
 298  
 
 299  
     /**
 300  
      * Get edge value
 301  
      * @return Edge value
 302  
      */
 303  
     public Float get() {
 304  0
       return realValue;
 305  
     }
 306  
 
 307  
     /**
 308  
      * Set edge value
 309  
      * @param val Edge value
 310  
      */
 311  
     public void set(Float val) {
 312  0
       this.realValue = val;
 313  0
     }
 314  
   }
 315  
 
 316  
   /**
 317  
    * Creating a custom message value class to force kryo to
 318  
    * register with a new ID. Please note that a custom
 319  
    * class containing a double list is not
 320  
    * necessary for the page rank application. It is only
 321  
    * used for testing the scenario of kryo encountering an
 322  
    * unregistered custom class.
 323  
    */
 324  
   public static class MessageValue extends KryoSimpleWritable {
 325  
     /** Storing the message in a list to test the list type */
 326  
     private List<Double> msgValue;
 327  
 
 328  
     /** Constructor */
 329  0
     public MessageValue() {
 330  0
     }
 331  
 
 332  
     /**
 333  
      * Constructor
 334  
      * @param val Message value
 335  
      */
 336  0
     public MessageValue(Double val) {
 337  0
       msgValue = new ArrayList<>();
 338  0
       msgValue.add(val);
 339  0
     }
 340  
 
 341  
     /**
 342  
      * Get message value
 343  
      * @return Message value
 344  
      */
 345  
     public Double get() {
 346  0
       return msgValue.get(0);
 347  
     }
 348  
 
 349  
     /**
 350  
      * Set message value
 351  
      * @param val Message value
 352  
      */
 353  
     public void set(Double val) {
 354  0
       this.msgValue.set(0, val);
 355  0
     }
 356  
   }
 357  
 
 358  
 
 359  
   /**
 360  
    * Aggregator for getting max double value
 361  
    */
 362  0
   public static class DoubleMaxWrapperAggregator extends
 363  
           BasicAggregator<VertexValue> {
 364  
     @Override
 365  
     public void aggregate(VertexValue value) {
 366  0
       getAggregatedValue().set(
 367  0
                       Math.max(getAggregatedValue().get(), value.get()));
 368  0
     }
 369  
 
 370  
     @Override
 371  
     public VertexValue createInitialValue() {
 372  0
       return new VertexValue(Double.NEGATIVE_INFINITY);
 373  
     }
 374  
   }
 375  
 
 376  
   /**
 377  
    * Aggregator for getting min double value.
 378  
    */
 379  0
   public static class DoubleMinWrapperAggregator
 380  
           extends BasicAggregator<VertexValue> {
 381  
     @Override
 382  
     public void aggregate(VertexValue value) {
 383  0
       getAggregatedValue().set(
 384  0
               Math.min(getAggregatedValue().get(), value.get()));
 385  0
     }
 386  
 
 387  
     @Override
 388  
     public VertexValue createInitialValue() {
 389  0
       return  new VertexValue(Double.MAX_VALUE);
 390  
     }
 391  
   }
 392  
 
 393  
 }