Coverage Report - org.apache.giraph.examples.scc.SccComputation
 
Classes in this File Line Coverage Branch Coverage Complexity
SccComputation
0%
0/67
0%
0/28
3.125
SccComputation$1
0%
0/1
N/A
3.125
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.examples.scc;
 20  
 
 21  
 import static org.apache.giraph.examples.scc.SccPhaseMasterCompute.PHASE;
 22  
 import static org.apache.giraph.examples.scc.SccPhaseMasterCompute.NEW_MAXIMUM;
 23  
 import static org.apache.giraph.examples.scc.SccPhaseMasterCompute.CONVERGED;
 24  
 
 25  
 import java.io.IOException;
 26  
 
 27  
 import org.apache.giraph.Algorithm;
 28  
 import org.apache.giraph.examples.scc.SccPhaseMasterCompute.Phases;
 29  
 import org.apache.giraph.graph.BasicComputation;
 30  
 import org.apache.giraph.graph.Vertex;
 31  
 import org.apache.hadoop.io.BooleanWritable;
 32  
 import org.apache.hadoop.io.IntWritable;
 33  
 import org.apache.hadoop.io.LongWritable;
 34  
 import org.apache.hadoop.io.NullWritable;
 35  
 
 36  
 /**
 37  
  * Finds strongly connected components of the graph.
 38  
  */
 39  
 @Algorithm(name = "Strongly Connected Components",
 40  
            description = "Finds strongly connected components of the graph")
 41  0
 public class SccComputation extends
 42  
     BasicComputation<LongWritable, SccVertexValue, NullWritable, LongWritable> {
 43  
 
 44  
   /**
 45  
    * Current phase of the computation as defined in SccPhaseMasterCompute
 46  
    */
 47  
   private Phases currPhase;
 48  
 
 49  
   /**
 50  
    * Reusable object to encapsulate message value, in order to avoid
 51  
    * creating a new instance every time a message is sent.
 52  
    */
 53  0
   private LongWritable messageValue = new LongWritable();
 54  
 
 55  
   /**
 56  
    * Reusable object to encapsulate a parent vertex id.
 57  
    */
 58  0
   private LongWritable parentId = new LongWritable();
 59  
 
 60  
   @Override
 61  
   public void preSuperstep() {
 62  0
     IntWritable phaseInt = getAggregatedValue(PHASE);
 63  0
     currPhase = SccPhaseMasterCompute.getPhase(phaseInt);
 64  0
   }
 65  
 
 66  
   @Override
 67  
   public void compute(
 68  
       Vertex<LongWritable, SccVertexValue, NullWritable> vertex,
 69  
       Iterable<LongWritable> messages) throws IOException {
 70  
 
 71  0
     SccVertexValue vertexValue = vertex.getValue();
 72  
 
 73  0
     if (!vertexValue.isActive()) {
 74  0
       vertex.voteToHalt();
 75  0
       return;
 76  
     }
 77  
 
 78  0
     switch (currPhase) {
 79  
     case TRANSPOSE :
 80  0
       vertexValue.clearParents();
 81  0
       sendMessageToAllEdges(vertex, vertex.getId());
 82  0
       break;
 83  
     case TRIMMING :
 84  0
       trim(vertex, messages);
 85  0
       break;
 86  
     case FORWARD_TRAVERSAL :
 87  0
       forwardTraversal(vertex, messages);
 88  0
       break;
 89  
     case BACKWARD_TRAVERSAL_START :
 90  0
       backwardTraversalStart(vertex);
 91  0
       break;
 92  
     case BACKWARD_TRAVERSAL_REST :
 93  0
       backwardTraversalRest(vertex, messages);
 94  0
       break;
 95  
     default :
 96  
       break;
 97  
     }
 98  
 
 99  0
   }
 100  
 
 101  
   /**
 102  
    * Creates list of parents based on the received ids and halts the vertices
 103  
    * that don't have any parent or outgoing edge, hence, they can't be
 104  
    * part of an SCC.
 105  
    * @param vertex Current vertex.
 106  
    * @param messages Received ids from the Transpose phase.
 107  
    */
 108  
   private void trim(Vertex<LongWritable, SccVertexValue, NullWritable> vertex,
 109  
                     Iterable<LongWritable> messages) {
 110  0
     SccVertexValue vertexValue = vertex.getValue();
 111  
     // Keep the ids of the parent nodes to allow for backwards traversal
 112  0
     for (LongWritable parent : messages) {
 113  0
       vertexValue.addParent(parent.get());
 114  0
     }
 115  
     // If this node doesn't have any parents or outgoing edges,
 116  
     // it can't be part of an SCC
 117  0
     vertexValue.set(vertex.getId().get());
 118  0
     if (vertex.getNumEdges() == 0 || vertexValue.getParents() == null) {
 119  0
       vertexValue.deactivate();
 120  
     } else {
 121  0
       messageValue.set(vertexValue.get());
 122  0
       sendMessageToAllEdges(vertex, messageValue);
 123  
     }
 124  0
   }
 125  
 
 126  
   /**
 127  
    * Traverse the graph through outgoing edges and keep the maximum vertex
 128  
    * value.
 129  
    * If a new maximum value is found, propagate it until convergence.
 130  
    * @param vertex Current vertex.
 131  
    * @param messages Received values from neighbor vertices.
 132  
    */
 133  
   private void forwardTraversal(
 134  
       Vertex<LongWritable, SccVertexValue, NullWritable> vertex,
 135  
       Iterable<LongWritable> messages) {
 136  0
     SccVertexValue vertexValue = vertex.getValue();
 137  0
     boolean changed = setMaxValue(vertexValue, messages);
 138  0
     if (changed) {
 139  0
       messageValue.set(vertexValue.get());
 140  0
       sendMessageToAllEdges(vertex, messageValue);
 141  0
       aggregate(NEW_MAXIMUM, new BooleanWritable(true));
 142  
     }
 143  0
   }
 144  
 
 145  
   /**
 146  
    * Traverse the transposed graph and keep the maximum vertex value.
 147  
    * @param vertex Current vertex.
 148  
    */
 149  
   private void backwardTraversalStart(
 150  
       Vertex<LongWritable, SccVertexValue, NullWritable> vertex) {
 151  0
     SccVertexValue vertexValue = vertex.getValue();
 152  0
     if (vertexValue.get() == vertex.getId().get()) {
 153  0
       messageValue.set(vertexValue.get());
 154  0
       sendMessageToAllParents(vertex, messageValue);
 155  
     }
 156  0
   }
 157  
 
 158  
   /**
 159  
    * Traverse the transposed graph and keep the maximum vertex value.
 160  
    * @param vertex Current vertex.
 161  
    * @param messages Received values from children vertices.
 162  
    */
 163  
   private void backwardTraversalRest(
 164  
       Vertex<LongWritable, SccVertexValue, NullWritable> vertex,
 165  
       Iterable<LongWritable> messages) {
 166  0
     SccVertexValue vertexValue = vertex.getValue();
 167  0
     for (LongWritable m : messages) {
 168  0
       if (vertexValue.get() == m.get()) {
 169  0
         sendMessageToAllParents(vertex, m);
 170  0
         aggregate(CONVERGED, new BooleanWritable(true));
 171  0
         vertexValue.deactivate();
 172  0
         vertex.voteToHalt();
 173  0
         break;
 174  
       }
 175  0
     }
 176  0
   }
 177  
 
 178  
   /**
 179  
    * Compares the messages values with the current vertex value and finds
 180  
    * the maximum.
 181  
    * If the maximum value is different from the vertex value, makes it the
 182  
    * new vertex value and returns true, otherwise, returns false.
 183  
    * @param vertexValue Current vertex value.
 184  
    * @param messages Messages containing neighbors' vertex values.
 185  
    * @return True if a new maximum was found, otherwise, returns false.
 186  
    */
 187  
   private boolean setMaxValue(SccVertexValue vertexValue,
 188  
                               Iterable<LongWritable> messages) {
 189  0
     boolean changed = false;
 190  0
     for (LongWritable m : messages) {
 191  0
       if (vertexValue.get() < m.get()) {
 192  0
         vertexValue.set(m.get());
 193  0
         changed = true;
 194  
       }
 195  0
     }
 196  0
     return changed;
 197  
   }
 198  
 
 199  
 
 200  
   /**
 201  
    * Send message to all parents.
 202  
    * @param vertex Current vertex.
 203  
    * @param message Message to be sent.
 204  
    */
 205  
   private void sendMessageToAllParents(
 206  
       Vertex<LongWritable, SccVertexValue, NullWritable> vertex,
 207  
       LongWritable message) {
 208  0
     for (Long id : vertex.getValue().getParents()) {
 209  0
       parentId.set(id);
 210  0
       sendMessage(parentId, message);
 211  0
     }
 212  0
   }
 213  
 }