Coverage Report - org.apache.giraph.examples.VerifyMessage
 
Classes in this File Line Coverage Branch Coverage Complexity
VerifyMessage
0%
0/2
N/A
2.167
VerifyMessage$VerifiableMessage
0%
0/16
N/A
2.167
VerifyMessage$VerifyMessageMasterCompute
0%
0/3
N/A
2.167
VerifyMessage$VerifyMessageVertex
0%
0/35
0%
0/22
2.167
VerifyMessage$VerifyMessageVertex$VerifyMessageVertexWorkerContext
0%
0/8
N/A
2.167
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.examples;
 20  
 
 21  
 import org.apache.giraph.aggregators.LongSumAggregator;
 22  
 import org.apache.giraph.graph.DefaultMasterCompute;
 23  
 import org.apache.giraph.graph.Edge;
 24  
 import org.apache.giraph.graph.EdgeListVertex;
 25  
 import org.apache.giraph.graph.WorkerContext;
 26  
 import org.apache.hadoop.io.FloatWritable;
 27  
 import org.apache.hadoop.io.IntWritable;
 28  
 import org.apache.hadoop.io.LongWritable;
 29  
 import org.apache.hadoop.io.Writable;
 30  
 import org.apache.log4j.Logger;
 31  
 
 32  
 import java.io.DataInput;
 33  
 import java.io.DataOutput;
 34  
 import java.io.IOException;
 35  
 
 36  
 /**
 37  
  * An example that simply uses its id, value, and edges to compute new data
 38  
  * every iteration to verify that messages are sent and received at the
 39  
  * appropriate location and superstep.
 40  
  */
 41  0
 public class VerifyMessage {
 42  
   /**
 43  
    * Message that will be sent in {@link VerifyMessageVertex}.
 44  
    */
 45  0
   public static class VerifiableMessage implements Writable {
 46  
     /** Superstep sent on */
 47  
     private long superstep;
 48  
     /** Source vertex id */
 49  
     private long sourceVertexId;
 50  
     /** Value */
 51  
     private float value;
 52  
 
 53  
     /**
 54  
      * Default constructor used with reflection.
 55  
      */
 56  0
     public VerifiableMessage() { }
 57  
 
 58  
     /**
 59  
      * Constructor with verifiable arguments.
 60  
      * @param superstep Superstep this message was created on.
 61  
      * @param sourceVertexId Who send this message.
 62  
      * @param value A value associated with this message.
 63  
      */
 64  
     public VerifiableMessage(
 65  0
         long superstep, long sourceVertexId, float value) {
 66  0
       this.superstep = superstep;
 67  0
       this.sourceVertexId = sourceVertexId;
 68  0
       this.value = value;
 69  0
     }
 70  
 
 71  
     @Override
 72  
     public void readFields(DataInput input) throws IOException {
 73  0
       superstep = input.readLong();
 74  0
       sourceVertexId = input.readLong();
 75  0
       value = input.readFloat();
 76  0
     }
 77  
 
 78  
     @Override
 79  
     public void write(DataOutput output) throws IOException {
 80  0
       output.writeLong(superstep);
 81  0
       output.writeLong(sourceVertexId);
 82  0
       output.writeFloat(value);
 83  0
     }
 84  
 
 85  
     @Override
 86  
     public String toString() {
 87  0
       return "(superstep=" + superstep + ",sourceVertexId=" +
 88  
           sourceVertexId + ",value=" + value + ")";
 89  
     }
 90  
   }
 91  
 
 92  
   /**
 93  
    * Send and verify messages.
 94  
    */
 95  0
   public static class VerifyMessageVertex extends
 96  
       EdgeListVertex<LongWritable, IntWritable, FloatWritable,
 97  
       VerifiableMessage> {
 98  
     /** Dynamically set number of SUPERSTEPS */
 99  
     public static final String SUPERSTEP_COUNT =
 100  
         "verifyMessageVertex.superstepCount";
 101  
     /** User can access this after the application finishes if local */
 102  
     private static long FINAL_SUM;
 103  
     /** Number of SUPERSTEPS to run (6 by default) */
 104  0
     private static int SUPERSTEPS = 6;
 105  
     /** Class logger */
 106  0
     private static Logger LOG = Logger.getLogger(VerifyMessageVertex.class);
 107  
 
 108  
     public static long getFinalSum() {
 109  0
       return FINAL_SUM;
 110  
     }
 111  
 
 112  
     /**
 113  
      * Worker context used with {@link VerifyMessageVertex}.
 114  
      */
 115  0
     public static class VerifyMessageVertexWorkerContext extends
 116  
         WorkerContext {
 117  
       @Override
 118  
       public void preApplication() throws InstantiationException,
 119  
       IllegalAccessException {
 120  0
         SUPERSTEPS = getContext().getConfiguration().getInt(
 121  
             SUPERSTEP_COUNT, SUPERSTEPS);
 122  0
       }
 123  
 
 124  
       @Override
 125  
       public void postApplication() {
 126  0
         LongWritable sumAggregatorValue =
 127  
             getAggregatedValue(LongSumAggregator.class.getName());
 128  0
         FINAL_SUM = sumAggregatorValue.get();
 129  0
       }
 130  
 
 131  
       @Override
 132  
       public void preSuperstep() {
 133  0
       }
 134  
 
 135  
       @Override
 136  0
       public void postSuperstep() { }
 137  
     }
 138  
 
 139  
     @Override
 140  
     public void compute(Iterable<VerifiableMessage> messages) {
 141  0
       String sumAggregatorName = LongSumAggregator.class.getName();
 142  0
       if (getSuperstep() > SUPERSTEPS) {
 143  0
         voteToHalt();
 144  0
         return;
 145  
       }
 146  0
       if (LOG.isDebugEnabled()) {
 147  0
         LOG.debug("compute: " + getAggregatedValue(sumAggregatorName));
 148  
       }
 149  0
       aggregate(sumAggregatorName, new LongWritable(getId().get()));
 150  0
       if (LOG.isDebugEnabled()) {
 151  0
         LOG.debug("compute: sum = " +
 152  
             this.<LongWritable>getAggregatedValue(sumAggregatorName).get() +
 153  
             " for vertex " + getId());
 154  
       }
 155  0
       float msgValue = 0.0f;
 156  0
       for (VerifiableMessage message : messages) {
 157  0
         msgValue += message.value;
 158  0
         if (LOG.isDebugEnabled()) {
 159  0
           LOG.debug("compute: got msg = " + message +
 160  
               " for vertex id " + getId() +
 161  
               ", vertex value " + getValue() +
 162  
               " on superstep " + getSuperstep());
 163  
         }
 164  0
         if (message.superstep != getSuperstep() - 1) {
 165  0
           throw new IllegalStateException(
 166  
               "compute: Impossible to not get a messsage from " +
 167  
                   "the previous superstep, current superstep = " +
 168  
                   getSuperstep());
 169  
         }
 170  0
         if ((message.sourceVertexId != getId().get() - 1) &&
 171  
             (getId().get() != 0)) {
 172  0
           throw new IllegalStateException(
 173  
               "compute: Impossible that this message didn't come " +
 174  
                   "from the previous vertex and came from " +
 175  
                   message.sourceVertexId);
 176  
         }
 177  
       }
 178  0
       int vertexValue = getValue().get();
 179  0
       setValue(new IntWritable(vertexValue + (int) msgValue));
 180  0
       if (LOG.isDebugEnabled()) {
 181  0
         LOG.debug("compute: vertex " + getId() +
 182  
             " has value " + getValue() +
 183  
             " on superstep " + getSuperstep());
 184  
       }
 185  0
       for (Edge<LongWritable, FloatWritable> edge : getEdges()) {
 186  0
         FloatWritable newEdgeValue = new FloatWritable(
 187  
             edge.getValue().get() + (float) vertexValue);
 188  0
         if (LOG.isDebugEnabled()) {
 189  0
           LOG.debug("compute: vertex " + getId() +
 190  
               " sending edgeValue " + edge.getValue() +
 191  
               " vertexValue " + vertexValue +
 192  
               " total " + newEdgeValue +
 193  
               " to vertex " + edge.getTargetVertexId() +
 194  
               " on superstep " + getSuperstep());
 195  
         }
 196  0
         addEdge(edge.getTargetVertexId(), newEdgeValue);
 197  0
         sendMessage(edge.getTargetVertexId(),
 198  
             new VerifiableMessage(
 199  
                 getSuperstep(), getId().get(), newEdgeValue.get()));
 200  0
       }
 201  0
     }
 202  
   }
 203  
 
 204  
   /**
 205  
    * Master compute associated with {@link VerifyMessageVertex}.
 206  
    * It registers required aggregators.
 207  
    */
 208  0
   public static class VerifyMessageMasterCompute extends
 209  
       DefaultMasterCompute {
 210  
     @Override
 211  
     public void initialize() throws InstantiationException,
 212  
         IllegalAccessException {
 213  0
       registerAggregator(LongSumAggregator.class.getName(),
 214  
           LongSumAggregator.class);
 215  0
     }
 216  
   }
 217  
 }