Coverage Report - org.apache.giraph.examples.VerifyMessage
 
Classes in this File Line Coverage Branch Coverage Complexity
VerifyMessage
0%
0/1
N/A
2.167
VerifyMessage$VerifiableMessage
0%
0/16
N/A
2.167
VerifyMessage$VerifyMessageComputation
0%
0/53
0%
0/22
2.167
VerifyMessage$VerifyMessageComputation$VerifyMessageVertexWorkerContext
0%
0/10
N/A
2.167
VerifyMessage$VerifyMessageMasterCompute
0%
0/3
N/A
2.167
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.examples;
 20  
 
 21  
 import org.apache.giraph.aggregators.LongSumAggregator;
 22  
 import org.apache.giraph.graph.BasicComputation;
 23  
 import org.apache.giraph.edge.Edge;
 24  
 import org.apache.giraph.edge.EdgeFactory;
 25  
 import org.apache.giraph.master.DefaultMasterCompute;
 26  
 import org.apache.giraph.graph.Vertex;
 27  
 import org.apache.giraph.worker.WorkerContext;
 28  
 import org.apache.hadoop.io.FloatWritable;
 29  
 import org.apache.hadoop.io.IntWritable;
 30  
 import org.apache.hadoop.io.LongWritable;
 31  
 import org.apache.hadoop.io.Writable;
 32  
 import org.apache.log4j.Logger;
 33  
 
 34  
 import java.io.DataInput;
 35  
 import java.io.DataOutput;
 36  
 import java.io.IOException;
 37  
 
 38  
 /**
 39  
  * An example that simply uses its id, value, and edges to compute new data
 40  
  * every iteration to verify that messages are sent and received at the
 41  
  * appropriate location and superstep.
 42  
  */
 43  0
 public class VerifyMessage {
 44  
   /**
 45  
    * Message that will be sent in {@link VerifyMessageComputation}.
 46  
    */
 47  0
   public static class VerifiableMessage implements Writable {
 48  
     /** Superstep sent on */
 49  
     private long superstep;
 50  
     /** Source vertex id */
 51  
     private long sourceVertexId;
 52  
     /** Value */
 53  
     private float value;
 54  
 
 55  
     /**
 56  
      * Default constructor used with reflection.
 57  
      */
 58  0
     public VerifiableMessage() { }
 59  
 
 60  
     /**
 61  
      * Constructor with verifiable arguments.
 62  
      * @param superstep Superstep this message was created on.
 63  
      * @param sourceVertexId Who send this message.
 64  
      * @param value A value associated with this message.
 65  
      */
 66  
     public VerifiableMessage(
 67  0
         long superstep, long sourceVertexId, float value) {
 68  0
       this.superstep = superstep;
 69  0
       this.sourceVertexId = sourceVertexId;
 70  0
       this.value = value;
 71  0
     }
 72  
 
 73  
     @Override
 74  
     public void readFields(DataInput input) throws IOException {
 75  0
       superstep = input.readLong();
 76  0
       sourceVertexId = input.readLong();
 77  0
       value = input.readFloat();
 78  0
     }
 79  
 
 80  
     @Override
 81  
     public void write(DataOutput output) throws IOException {
 82  0
       output.writeLong(superstep);
 83  0
       output.writeLong(sourceVertexId);
 84  0
       output.writeFloat(value);
 85  0
     }
 86  
 
 87  
     @Override
 88  
     public String toString() {
 89  0
       return "(superstep=" + superstep + ",sourceVertexId=" +
 90  
           sourceVertexId + ",value=" + value + ")";
 91  
     }
 92  
   }
 93  
 
 94  
   /**
 95  
    * Send and verify messages.
 96  
    */
 97  0
   public static class VerifyMessageComputation extends
 98  
       BasicComputation<LongWritable, IntWritable, FloatWritable,
 99  
           VerifiableMessage> {
 100  
     /** Dynamically set number of SUPERSTEPS */
 101  
     public static final String SUPERSTEP_COUNT =
 102  
         "verifyMessageVertex.superstepCount";
 103  
     /** User can access this after the application finishes if local */
 104  
     private static long FINAL_SUM;
 105  
     /** Number of SUPERSTEPS to run (6 by default) */
 106  0
     private static int SUPERSTEPS = 6;
 107  
     /** Class logger */
 108  0
     private static Logger LOG =
 109  0
         Logger.getLogger(VerifyMessageComputation.class);
 110  
 
 111  
     public static long getFinalSum() {
 112  0
       return FINAL_SUM;
 113  
     }
 114  
 
 115  
     /**
 116  
      * Worker context used with {@link VerifyMessageComputation}.
 117  
      */
 118  0
     public static class VerifyMessageVertexWorkerContext extends
 119  
         WorkerContext {
 120  
       @Override
 121  
       public void preApplication() throws InstantiationException,
 122  
       IllegalAccessException {
 123  0
         SUPERSTEPS = getContext().getConfiguration().getInt(
 124  0
             SUPERSTEP_COUNT, SUPERSTEPS);
 125  0
       }
 126  
 
 127  
       @Override
 128  
       public void postApplication() {
 129  0
         LongWritable sumAggregatorValue =
 130  0
             getAggregatedValue(LongSumAggregator.class.getName());
 131  0
         FINAL_SUM = sumAggregatorValue.get();
 132  0
       }
 133  
 
 134  
       @Override
 135  
       public void preSuperstep() {
 136  0
       }
 137  
 
 138  
       @Override
 139  0
       public void postSuperstep() { }
 140  
     }
 141  
 
 142  
     @Override
 143  
     public void compute(
 144  
         Vertex<LongWritable, IntWritable, FloatWritable> vertex,
 145  
         Iterable<VerifiableMessage> messages) throws IOException {
 146  0
       String sumAggregatorName = LongSumAggregator.class.getName();
 147  0
       if (getSuperstep() > SUPERSTEPS) {
 148  0
         vertex.voteToHalt();
 149  0
         return;
 150  
       }
 151  0
       if (LOG.isDebugEnabled()) {
 152  0
         LOG.debug("compute: " + getAggregatedValue(sumAggregatorName));
 153  
       }
 154  0
       aggregate(sumAggregatorName, new LongWritable(vertex.getId().get()));
 155  0
       if (LOG.isDebugEnabled()) {
 156  0
         LOG.debug("compute: sum = " +
 157  0
             this.<LongWritable>getAggregatedValue(sumAggregatorName).get() +
 158  0
             " for vertex " + vertex.getId());
 159  
       }
 160  0
       float msgValue = 0.0f;
 161  0
       for (VerifiableMessage message : messages) {
 162  0
         msgValue += message.value;
 163  0
         if (LOG.isDebugEnabled()) {
 164  0
           LOG.debug("compute: got msg = " + message +
 165  0
               " for vertex id " + vertex.getId() +
 166  0
               ", vertex value " + vertex.getValue() +
 167  0
               " on superstep " + getSuperstep());
 168  
         }
 169  0
         if (message.superstep != getSuperstep() - 1) {
 170  0
           throw new IllegalStateException(
 171  
               "compute: Impossible to not get a messsage from " +
 172  
                   "the previous superstep, current superstep = " +
 173  0
                   getSuperstep());
 174  
         }
 175  0
         if ((message.sourceVertexId != vertex.getId().get() - 1) &&
 176  0
             (vertex.getId().get() != 0)) {
 177  0
           throw new IllegalStateException(
 178  
               "compute: Impossible that this message didn't come " +
 179  
                   "from the previous vertex and came from " +
 180  0
                   message.sourceVertexId);
 181  
         }
 182  0
       }
 183  0
       int vertexValue = vertex.getValue().get();
 184  0
       vertex.setValue(new IntWritable(vertexValue + (int) msgValue));
 185  0
       if (LOG.isDebugEnabled()) {
 186  0
         LOG.debug("compute: vertex " + vertex.getId() +
 187  0
             " has value " + vertex.getValue() +
 188  0
             " on superstep " + getSuperstep());
 189  
       }
 190  0
       for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
 191  0
         FloatWritable newEdgeValue = new FloatWritable(
 192  0
             edge.getValue().get() + (float) vertexValue);
 193  0
         Edge<LongWritable, FloatWritable> newEdge =
 194  0
             EdgeFactory.create(edge.getTargetVertexId(), newEdgeValue);
 195  0
         if (LOG.isDebugEnabled()) {
 196  0
           LOG.debug("compute: vertex " + vertex.getId() +
 197  0
               " sending edgeValue " + edge.getValue() +
 198  
               " vertexValue " + vertexValue +
 199  
               " total " + newEdgeValue +
 200  0
               " to vertex " + edge.getTargetVertexId() +
 201  0
               " on superstep " + getSuperstep());
 202  
         }
 203  0
         vertex.addEdge(newEdge);
 204  0
         sendMessage(edge.getTargetVertexId(),
 205  
             new VerifiableMessage(
 206  0
                 getSuperstep(), vertex.getId().get(), newEdgeValue.get()));
 207  0
       }
 208  0
     }
 209  
   }
 210  
 
 211  
   /**
 212  
    * Master compute associated with {@link VerifyMessageComputation}.
 213  
    * It registers required aggregators.
 214  
    */
 215  0
   public static class VerifyMessageMasterCompute extends
 216  
       DefaultMasterCompute {
 217  
     @Override
 218  
     public void initialize() throws InstantiationException,
 219  
         IllegalAccessException {
 220  0
       registerAggregator(LongSumAggregator.class.getName(),
 221  
           LongSumAggregator.class);
 222  0
     }
 223  
   }
 224  
 }