Coverage Report - org.apache.giraph.examples.SimpleMutateGraphVertex
 
Classes in this File Line Coverage Branch Coverage Complexity
SimpleMutateGraphVertex
88%
44/50
81%
26/32
3.3
SimpleMutateGraphVertex$SimpleMutateGraphVertexWorkerContext
100%
18/18
100%
2/2
3.3
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.examples;
 20  
 
 21  
 import org.apache.giraph.graph.Edge;
 22  
 import org.apache.giraph.graph.EdgeListVertex;
 23  
 import org.apache.giraph.graph.Vertex;
 24  
 import org.apache.giraph.graph.WorkerContext;
 25  
 import org.apache.hadoop.io.DoubleWritable;
 26  
 import org.apache.hadoop.io.FloatWritable;
 27  
 import org.apache.hadoop.io.LongWritable;
 28  
 import org.apache.log4j.Logger;
 29  
 
 30  
 import java.io.IOException;
 31  
 
 32  
 /**
 33  
  * Vertex to allow unit testing of graph mutations.
 34  
  */
 35  40
 public class SimpleMutateGraphVertex extends EdgeListVertex<
 36  
     LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
 37  
   /** Class logger */
 38  1
   private static Logger LOG =
 39  
       Logger.getLogger(SimpleMutateGraphVertex.class);
 40  
   /** Maximum number of ranges for vertex ids */
 41  20
   private long maxRanges = 100;
 42  
 
 43  
 
 44  
   /**
 45  
    * Unless we create a ridiculous number of vertices , we should not
 46  
    * collide within a vertex range defined by this method.
 47  
    *
 48  
    * @param range Range index
 49  
    * @return Starting vertex id of the range
 50  
    */
 51  
   private long rangeVertexIdStart(int range) {
 52  55
     return (Long.MAX_VALUE / maxRanges) * range;
 53  
   }
 54  
 
 55  
   @Override
 56  
   public void compute(Iterable<DoubleWritable> messages)
 57  
     throws IOException {
 58  120
     SimpleMutateGraphVertexWorkerContext workerContext =
 59  
         (SimpleMutateGraphVertexWorkerContext) getWorkerContext();
 60  120
     if (getSuperstep() == 0) {
 61  5
       LOG.debug("Reached superstep " + getSuperstep());
 62  115
     } else if (getSuperstep() == 1) {
 63  
       // Send messages to vertices that are sure not to exist
 64  
       // (creating them)
 65  5
       LongWritable destVertexId =
 66  
           new LongWritable(rangeVertexIdStart(1) + getId().get());
 67  5
       sendMessage(destVertexId, new DoubleWritable(0.0));
 68  5
     } else if (getSuperstep() == 2) {
 69  10
       LOG.debug("Reached superstep " + getSuperstep());
 70  100
     } else if (getSuperstep() == 3) {
 71  10
       long vertexCount = workerContext.getVertexCount();
 72  10
       if (vertexCount * 2 != getTotalNumVertices()) {
 73  0
         throw new IllegalStateException(
 74  
             "Impossible to have " + getTotalNumVertices() +
 75  
             " vertices when should have " + vertexCount * 2 +
 76  
             " on superstep " + getSuperstep());
 77  
       }
 78  10
       long edgeCount = workerContext.getEdgeCount();
 79  10
       if (edgeCount != getTotalNumEdges()) {
 80  0
         throw new IllegalStateException(
 81  
             "Impossible to have " + getTotalNumEdges() +
 82  
             " edges when should have " + edgeCount +
 83  
             " on superstep " + getSuperstep());
 84  
       }
 85  
       // Create vertices that are sure not to exist (doubling vertices)
 86  10
       LongWritable vertexIndex =
 87  
           new LongWritable(rangeVertexIdStart(3) + getId().get());
 88  
       Vertex<LongWritable, DoubleWritable,
 89  10
             FloatWritable, DoubleWritable> vertex =
 90  
         instantiateVertex(vertexIndex, new DoubleWritable(0.0), null, null);
 91  10
       addVertexRequest(vertex);
 92  
       // Add edges to those remote vertices as well
 93  10
       addEdgeRequest(vertexIndex,
 94  
           new Edge<LongWritable, FloatWritable>(
 95  
               getId(), new FloatWritable(0.0f)));
 96  10
     } else if (getSuperstep() == 4) {
 97  20
       LOG.debug("Reached superstep " + getSuperstep());
 98  70
     } else if (getSuperstep() == 5) {
 99  20
       long vertexCount = workerContext.getVertexCount();
 100  20
       if (vertexCount * 2 != getTotalNumVertices()) {
 101  0
         throw new IllegalStateException(
 102  
             "Impossible to have " + getTotalNumVertices() +
 103  
             " when should have " + vertexCount * 2 +
 104  
             " on superstep " + getSuperstep());
 105  
       }
 106  20
       long edgeCount = workerContext.getEdgeCount();
 107  20
       if (edgeCount + vertexCount != getTotalNumEdges()) {
 108  0
         throw new IllegalStateException(
 109  
             "Impossible to have " + getTotalNumEdges() +
 110  
             " edges when should have " + edgeCount + vertexCount +
 111  
             " on superstep " + getSuperstep());
 112  
       }
 113  
       // Remove the edges created in superstep 3
 114  20
       LongWritable vertexIndex =
 115  
           new LongWritable(rangeVertexIdStart(3) + getId().get());
 116  20
       workerContext.increaseEdgesRemoved();
 117  20
       removeEdgeRequest(vertexIndex, getId());
 118  20
     } else if (getSuperstep() == 6) {
 119  
       // Remove all the vertices created in superstep 3
 120  20
       if (getId().compareTo(
 121  
           new LongWritable(rangeVertexIdStart(3))) >= 0) {
 122  10
         removeVertexRequest(getId());
 123  
       }
 124  30
     } else if (getSuperstep() == 7) {
 125  10
       long origEdgeCount = workerContext.getOrigEdgeCount();
 126  10
       if (origEdgeCount != getTotalNumEdges()) {
 127  0
         throw new IllegalStateException(
 128  
             "Impossible to have " + getTotalNumEdges() +
 129  
             " edges when should have " + origEdgeCount +
 130  
             " on superstep " + getSuperstep());
 131  
       }
 132  10
     } else if (getSuperstep() == 8) {
 133  10
       long vertexCount = workerContext.getVertexCount();
 134  10
       if (vertexCount / 2 != getTotalNumVertices()) {
 135  0
         throw new IllegalStateException(
 136  
             "Impossible to have " + getTotalNumVertices() +
 137  
             " vertices when should have " + vertexCount / 2 +
 138  
             " on superstep " + getSuperstep());
 139  
       }
 140  10
     } else {
 141  10
       voteToHalt();
 142  
     }
 143  120
   }
 144  
 
 145  
   /**
 146  
    * Worker context used with {@link SimpleMutateGraphVertex}.
 147  
    */
 148  1
   public static class SimpleMutateGraphVertexWorkerContext
 149  
       extends WorkerContext {
 150  
     /** Cached vertex count */
 151  
     private long vertexCount;
 152  
     /** Cached edge count */
 153  
     private long edgeCount;
 154  
     /** Original number of edges */
 155  
     private long origEdgeCount;
 156  
     /** Number of edges removed during superstep */
 157  1
     private int edgesRemoved = 0;
 158  
 
 159  
     @Override
 160  
     public void preApplication()
 161  1
       throws InstantiationException, IllegalAccessException { }
 162  
 
 163  
     @Override
 164  1
     public void postApplication() { }
 165  
 
 166  
     @Override
 167  10
     public void preSuperstep() { }
 168  
 
 169  
     @Override
 170  
     public void postSuperstep() {
 171  10
       vertexCount = getTotalNumVertices();
 172  10
       edgeCount = getTotalNumEdges();
 173  10
       if (getSuperstep() == 1) {
 174  1
         origEdgeCount = edgeCount;
 175  
       }
 176  10
       LOG.info("Got " + vertexCount + " vertices, " +
 177  
           edgeCount + " edges on superstep " +
 178  
           getSuperstep());
 179  10
       LOG.info("Removed " + edgesRemoved);
 180  10
       edgesRemoved = 0;
 181  10
     }
 182  
 
 183  
     public long getVertexCount() {
 184  40
       return vertexCount;
 185  
     }
 186  
 
 187  
     public long getEdgeCount() {
 188  30
       return edgeCount;
 189  
     }
 190  
 
 191  
     public long getOrigEdgeCount() {
 192  10
       return origEdgeCount;
 193  
     }
 194  
 
 195  
     /**
 196  
      * Increase the number of edges removed by one.
 197  
      */
 198  
     public void increaseEdgesRemoved() {
 199  20
       this.edgesRemoved++;
 200  20
     }
 201  
   }
 202  
 }