Coverage Report - org.apache.giraph.examples.BrachaTouegDeadlockComputation
 
Classes in this File Line Coverage Branch Coverage Complexity
BrachaTouegDeadlockComputation
0%
0/134
0%
0/76
4.9
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.examples;
 20  
 
 21  
 import java.io.IOException;
 22  
 import java.util.ArrayList;
 23  
 import java.util.HashMap;
 24  
 
 25  
 import org.apache.giraph.examples.utils.BrachaTouegDeadlockVertexValue;
 26  
 import org.apache.giraph.examples.utils.BrachaTouegDeadlockMessage;
 27  
 import org.apache.giraph.conf.LongConfOption;
 28  
 import org.apache.giraph.edge.Edge;
 29  
 import org.apache.giraph.graph.BasicComputation;
 30  
 import org.apache.giraph.graph.Vertex;
 31  
 import org.apache.hadoop.io.LongWritable;
 32  
 import org.apache.log4j.Logger;
 33  
 
 34  
 /**
 35  
  * This code demonstrates the Bracha Toueg deadlock detection algorithm.
 36  
  * The Bracha Toueg algorithm is a distributed, asynchronous, centralized
 37  
  * algorithm for deadlock detection. The algorithm is executed on a snapshot of
 38  
  * an undirected graph which depicts the corresponding wait-for-graph.
 39  
  * Consequently the algorithm works on <b>directed graphs</b> but assumes
 40  
  * the possibility to communicate in both ways on all the edges.
 41  
  * This is an adaptation of the standard algorithm for Giraph/Pregel system.
 42  
  * Since the value of the vertex is dumped during checkpointing, the algorithm
 43  
  * keeps all the state of the vertex in the value.
 44  
  */
 45  
 @Algorithm(
 46  
     name = "Bracha Toueg deadlock detection"
 47  
 )
 48  0
 public class BrachaTouegDeadlockComputation
 49  
   extends BasicComputation<LongWritable, BrachaTouegDeadlockVertexValue,
 50  
     LongWritable, BrachaTouegDeadlockMessage> {
 51  
 
 52  
   /** The deadlock detection initiator id */
 53  0
   public static final LongConfOption BRACHA_TOUEG_DL_INITIATOR_ID =
 54  
     new LongConfOption("BrachaTouegDeadlockVertex.initiatorId", 1,
 55  
         "The deadlock detection initiator id");
 56  
 
 57  
   /** Class logger */
 58  0
   private static final Logger LOG =
 59  0
     Logger.getLogger(BrachaTouegDeadlockComputation.class);
 60  
 
 61  
   @Override
 62  
   public void compute(
 63  
       Vertex<LongWritable, BrachaTouegDeadlockVertexValue, LongWritable> vertex,
 64  
       Iterable<BrachaTouegDeadlockMessage> messages)
 65  
     throws IOException {
 66  
 
 67  
     BrachaTouegDeadlockVertexValue value;
 68  0
     long superstep = getSuperstep();
 69  
 
 70  0
     if (superstep == 0) {
 71  
       /* Phase to exchange the sender vertex IDs on the incoming edges.
 72  
          It also prepares the internal state of the vertex */
 73  0
       initAlgorithm(vertex);
 74  
 
 75  
     /* After each vertex collects the messages sent by the parents, the
 76  
        initiator node starts the algorithm by means of a NOTIFY message */
 77  0
     } else if (superstep == 1) {
 78  
       /* get the value/state of the vertex */
 79  0
       value = vertex.getValue();
 80  
 
 81  0
       if (LOG.isDebugEnabled()) {
 82  0
         LOG.debug("Vertex ID " + vertex.getId() + " status is:");
 83  0
         LOG.debug("\tpending requests? " + value.hasPendingRequests());
 84  0
         LOG.debug("\tis free? " + value.isFree());
 85  0
         LOG.debug("\tis notified? " + value.isNotified());
 86  
       }
 87  
 
 88  
       /* collect all the incoming senders IDs */
 89  0
       for (BrachaTouegDeadlockMessage message : messages) {
 90  0
         value.addParent(Long.valueOf(message.getSenderId()));
 91  0
       }
 92  
 
 93  
       /* debugging purpose: print all the parents of the vertex */
 94  0
       if (LOG.isDebugEnabled()) {
 95  0
         logParents(vertex);
 96  0
         if (isInitiator(vertex)) {
 97  0
           LOG.debug("Vertex ID " + vertex.getId() + " start the algorithm.");
 98  
         }
 99  
       }
 100  
 
 101  0
       if (isInitiator(vertex)) {
 102  
         /* the initiator starts the algorithm */
 103  0
         notifyVertices(vertex);
 104  
       } else {
 105  
         /* The Pregel model prescribes that each node starts in the "active"
 106  
            state. In some cases the Bracha-Toueg Algorithm leaves some nodes
 107  
            untouched causing the algorithm never to end. To avoid this
 108  
            situation at algorithm initialization all the nodes except the
 109  
            initiator (which by default is active) will vote to halt so that
 110  
            the unused vertices will not produce an infinite computation. Later,
 111  
            only when required the vote will be triggered. */
 112  0
         vertex.voteToHalt();
 113  0
         return;
 114  
       }
 115  
 
 116  
     /* At this point the actual deadlock detection algorithm is started. */
 117  
     } else {
 118  
       Long ackSenderId;
 119  
 
 120  0
       value = vertex.getValue();
 121  
 
 122  
       /* process all the incoming messages and act based on the type of
 123  
          message received */
 124  0
       for (BrachaTouegDeadlockMessage message : messages) {
 125  0
         long type = message.getType();
 126  
 
 127  0
         if (LOG.isDebugEnabled()) {
 128  0
           LOG.debug("Vertex ID " + vertex.getId() + " received: " + message);
 129  
         }
 130  
 
 131  0
         if (type == BrachaTouegDeadlockMessage.NOTIFY) {
 132  0
           handleNotifyMessage(vertex, message);
 133  0
         } else if (type == BrachaTouegDeadlockMessage.GRANT) {
 134  0
           handleGrantMessage(vertex, message);
 135  0
         } else if (type == BrachaTouegDeadlockMessage.DONE ||
 136  
                    type == BrachaTouegDeadlockMessage.ACK) {
 137  
           /* Both ACK and DONE Messages are handled in the same way. The
 138  
              action taken afterwards is independent of these types of
 139  
              messages.  */
 140  0
           value.receivedMessage(message.getSenderId(), message.getType());
 141  
         }
 142  0
       }
 143  
 
 144  0
       ackSenderId = value.getIdWithInHoldAck();
 145  0
       if (value.isFree() &&
 146  0
           !value.isWaitingForMessage(BrachaTouegDeadlockMessage.ACK) &&
 147  0
           !ackSenderId.equals(BrachaTouegDeadlockVertexValue.INVALID_ID)) {
 148  
 
 149  0
         sendAckMessage(ackSenderId, vertex);
 150  0
         value.setIdWithInHoldAck(BrachaTouegDeadlockVertexValue.INVALID_ID);
 151  
       }
 152  
 
 153  
       /* if all the ACK and DONE messages have been received, the vertex can
 154  
          send the pending DONE message to the parent and vote to halt */
 155  0
       if (value.isNotified() &&
 156  0
           !value.isWaitingForMessage(BrachaTouegDeadlockMessage.ACK) &&
 157  0
           !value.isWaitingForMessage(BrachaTouegDeadlockMessage.DONE)) {
 158  
 
 159  0
         Long senderId = value.getIdWithInHoldDone();
 160  
 
 161  0
         if (LOG.isDebugEnabled()) {
 162  0
           LOG.debug("Vertex ID " + vertex.getId() +
 163  
                     " sent the last DONE message.");
 164  0
           LOG.debug("Vertex ID " + vertex.getId() + " voted to halt.");
 165  
         }
 166  
 
 167  
         /* the initiator vertex does not need to send the DONE message since
 168  
            it is the starting point of the algorithm */
 169  0
         if (!isInitiator(vertex) &&
 170  0
             !senderId.equals(BrachaTouegDeadlockVertexValue.INVALID_ID)) {
 171  0
           sendMessage(vertex.getId().get(), senderId,
 172  
                       BrachaTouegDeadlockMessage.DONE);
 173  0
           value.setIdWithInHoldDone(BrachaTouegDeadlockVertexValue.INVALID_ID);
 174  
         }
 175  
 
 176  0
         vertex.voteToHalt();
 177  
       }
 178  
     }
 179  0
   }
 180  
 
 181  
   /**
 182  
    * check whether the vertex is the initiator of the algorithm
 183  
    *
 184  
    * @param vertex Vertex
 185  
    * @return True if the vertex is the initiator
 186  
    */
 187  
   private boolean isInitiator(Vertex<LongWritable, ?, ?> vertex) {
 188  0
     return vertex.getId().get() == BRACHA_TOUEG_DL_INITIATOR_ID.get(getConf());
 189  
   }
 190  
 
 191  
   /**
 192  
    * Initializes the algorithm by sending the control message for ID exchange
 193  
    * and preparing the value of the vertex.
 194  
    *
 195  
    * @param  vertex  vertex from which the control message is sent
 196  
    */
 197  
   private void initAlgorithm(Vertex<LongWritable,
 198  
     BrachaTouegDeadlockVertexValue, LongWritable> vertex) {
 199  
 
 200  
     BrachaTouegDeadlockVertexValue value;
 201  0
     HashMap<Long, ArrayList<Long>> requests =
 202  
       new HashMap<Long, ArrayList<Long>>();
 203  0
     long vertexId = vertex.getId().get();
 204  
 
 205  
     /* prepare the pending requests tracking data structure */
 206  0
     for (Edge<LongWritable, LongWritable> edge : vertex.getEdges()) {
 207  
       ArrayList<Long> targets;
 208  0
       Long tag = Long.valueOf(edge.getValue().get());
 209  0
       Long target = Long.valueOf(edge.getTargetVertexId().get());
 210  
 
 211  0
       if (requests.containsKey(tag)) {
 212  0
         targets = requests.get(tag);
 213  
       } else {
 214  0
         targets = new ArrayList<Long>();
 215  
       }
 216  
 
 217  0
       targets.add(target);
 218  0
       requests.put(tag, targets);
 219  0
     }
 220  
 
 221  
     /* save in the value the number of requests that the node needs to get
 222  
        satisfied to consider itself free */
 223  0
     value = new BrachaTouegDeadlockVertexValue(requests);
 224  0
     vertex.setValue(value);
 225  
 
 226  
     /* send to all the outgoing edges the id of the current vertex */
 227  0
     for (Edge<LongWritable, LongWritable> edge : vertex.getEdges()) {
 228  0
       sendMessage(vertexId, edge.getTargetVertexId().get(),
 229  
                   BrachaTouegDeadlockMessage.CTRL_IN_EDGE);
 230  0
     }
 231  0
   }
 232  
 
 233  
   /**
 234  
    * Send message wrapper for the Bracha Toueg algorithm specific for ACK
 235  
    * messages.
 236  
    *
 237  
    * @param receiver      recipient of the message
 238  
    * @param vertex        vertex sending the message
 239  
    */
 240  
   private void sendAckMessage(long receiver, Vertex<LongWritable,
 241  
       BrachaTouegDeadlockVertexValue, LongWritable> vertex) {
 242  
 
 243  0
     this.sendMessage(Long.valueOf(vertex.getId().get()),
 244  
                      receiver, BrachaTouegDeadlockMessage.ACK);
 245  
 
 246  0
     if (!vertex.getValue().isNotified()) {
 247  0
       vertex.voteToHalt();
 248  
     }
 249  0
   }
 250  
 
 251  
   /**
 252  
    * Send message wrapper for the Bracha Toueg algorithm
 253  
    *
 254  
    * @param sender        sender of the message
 255  
    * @param receiver      recipient of the message
 256  
    * @param messageType   type of message to be sent
 257  
    */
 258  
   private void sendMessage(long sender, long receiver, long messageType) {
 259  
     BrachaTouegDeadlockMessage  message;
 260  
 
 261  0
     message = new BrachaTouegDeadlockMessage(sender, messageType);
 262  0
     sendMessage(new LongWritable(receiver), message);
 263  0
     if (LOG.isDebugEnabled()) {
 264  0
       LOG.debug("sent message " + message + " from " + sender +
 265  
                 " to " + receiver);
 266  
     }
 267  0
   }
 268  
 
 269  
   /**
 270  
    * this is a debugging function to verify that all parents have been
 271  
    * detected.
 272  
    *
 273  
    * @param vertex    vertex which collected its parents
 274  
    */
 275  
   private void logParents(Vertex<LongWritable,
 276  
                                  BrachaTouegDeadlockVertexValue,
 277  
                                  LongWritable> vertex) {
 278  0
     ArrayList<Long> parents = vertex.getValue().getParents();
 279  0
     int sz = parents.size();
 280  0
     StringBuffer buffer = new StringBuffer();
 281  
 
 282  0
     buffer.append("Vertex " + vertex.getId() + " parents:");
 283  0
     for (int i = 0; i < sz; ++i) {
 284  0
       buffer.append(" - " + parents.get(i));
 285  
     }
 286  0
     LOG.debug(buffer.toString());
 287  0
   }
 288  
 
 289  
   /**
 290  
    * This method resembles the notify_u procedure of the Bracha-Toueg algorithm.
 291  
    * It proceeds by sending a NOTIFY message via its outgoing edges and waits
 292  
    * for a DONE message from each destination node. If no pending requests need
 293  
    * to be awaited, the grant_u procedure is called. The latter case is
 294  
    * encountered when the "wave" of NOTIFY messages reaches the edge of the
 295  
    * graph.
 296  
    *
 297  
    * @param vertex  the vertex on which the notify method is called
 298  
    */
 299  
   private void notifyVertices(
 300  
     Vertex<LongWritable, BrachaTouegDeadlockVertexValue, LongWritable> vertex) {
 301  
 
 302  0
     BrachaTouegDeadlockVertexValue value = vertex.getValue();
 303  0
     long vertexId = vertex.getId().get();
 304  0
     boolean hasOutEdges = false;
 305  
 
 306  0
     value.setNotified();
 307  
 
 308  0
     for (Edge<LongWritable, LongWritable> edge : vertex.getEdges()) {
 309  0
       hasOutEdges = true;
 310  0
       sendMessage(vertexId,
 311  0
                   edge.getTargetVertexId().get(),
 312  
                   BrachaTouegDeadlockMessage.NOTIFY);
 313  
 
 314  
       /* the node will wait for a DONE message from each notified vertex */
 315  0
       value.waitForMessage(Long.valueOf(edge.getTargetVertexId().get()),
 316  0
                            Long.valueOf(BrachaTouegDeadlockMessage.DONE));
 317  0
     }
 318  
 
 319  
     /* if no requests are pending, the node has to start GRANTing to all
 320  
        incoming edges */
 321  0
     if (!hasOutEdges && isInitiator(vertex)) {
 322  0
       value.setFree();
 323  0
     } else if (!value.hasPendingRequests() && !value.isFree()) {
 324  0
       grantVertices(vertex);
 325  
     }
 326  0
   }
 327  
 
 328  
   /**
 329  
    * @param vertex      vertex on which the grant method is called
 330  
    */
 331  
   private void grantVertices(
 332  
     Vertex<LongWritable, BrachaTouegDeadlockVertexValue, LongWritable> vertex) {
 333  
 
 334  0
     BrachaTouegDeadlockVertexValue value = vertex.getValue();
 335  0
     ArrayList<Long> parents = value.getParents();
 336  0
     long vertexId = vertex.getId().get();
 337  
 
 338  0
     value.setFree();
 339  
 
 340  
     /* grant all the parents with resource access */
 341  0
     for (Long parent : parents) {
 342  0
       sendMessage(vertexId, parent,
 343  
                   BrachaTouegDeadlockMessage.GRANT);
 344  
 
 345  
       /* the node will wait for an ACK message for each GRANTed vertex */
 346  0
       value.waitForMessage(parent,
 347  0
                            Long.valueOf(BrachaTouegDeadlockMessage.ACK));
 348  0
     }
 349  0
   }
 350  
 
 351  
   /**
 352  
    * Function to handle the cases when a NOTIFY message is received.
 353  
    * If the message received is of type NOTIFY we distinguish two cases:
 354  
    * 1. The node was not yet notified: in this case  it forwards the
 355  
    *    NOTIFY message to its outgoing edges. In this phase the
 356  
    *    {@link BrachaTouegDeadlockComputation#notifyVertices} function is
 357  
    *    called.
 358  
    *    NB: in this case there is the need to keep track of the sender
 359  
    *        of the message since later a DONE must be sent back.
 360  
    * 2. The node was notified: in this case the node will immediately
 361  
    *    reply with a DONE message.
 362  
    *
 363  
    * @param vertex    vertex that received the NOTIFY message
 364  
    * @param message   message received by the vertex
 365  
    */
 366  
   private void handleNotifyMessage(
 367  
     Vertex<LongWritable, BrachaTouegDeadlockVertexValue, LongWritable> vertex,
 368  
     BrachaTouegDeadlockMessage message) {
 369  
 
 370  0
     BrachaTouegDeadlockVertexValue value = vertex.getValue();
 371  
 
 372  0
     if (!value.isNotified()) {
 373  0
       notifyVertices(vertex);
 374  0
       value.setIdWithInHoldDone(message.getSenderId());
 375  
     } else {
 376  0
       sendMessage(vertex.getId().get(), message.getSenderId(),
 377  
                   BrachaTouegDeadlockMessage.DONE);
 378  
     }
 379  0
   }
 380  
 
 381  
   /**
 382  
    * Function to handle the cases when a GRANT message is received.
 383  
    * When a GRANT message is received the number of requests is
 384  
    * decremented. In this case we must distinguish three cases:
 385  
    * 1. The number of requests needed reaches zero: at this stage a
 386  
    *    round of {@link BrachaTouegDeadlockComputation#grantVertices} is
 387  
    *    started to forward the resource granting mechanism.
 388  
    *    NB: the sender id of the node must be kept to handle the delivery of
 389  
    *    the ACK to the sender at the end of the granting procedure.
 390  
    * 2. The node already started to grant since it is free: in this case an ACK
 391  
    *    message is immediately sent back to the sender.
 392  
    * 3. The number of requests is bigger than zero: in this case an ACK
 393  
    *    is sent back to the sender.
 394  
    *
 395  
    * @param vertex    vertex that received the GRANT message
 396  
    * @param message   message received by the vertex
 397  
    */
 398  
   private void handleGrantMessage(
 399  
     Vertex<LongWritable, BrachaTouegDeadlockVertexValue, LongWritable> vertex,
 400  
     BrachaTouegDeadlockMessage message) {
 401  
 
 402  0
     BrachaTouegDeadlockVertexValue value = vertex.getValue();
 403  0
     Long senderId = Long.valueOf(message.getSenderId());
 404  0
     LongWritable wId = new LongWritable(senderId);
 405  0
     LongWritable tag = vertex.getEdgeValue(wId);
 406  
 
 407  0
     value.removeRequest(tag, wId);
 408  
 
 409  0
     if (value.isFree() || value.getNumOfRequests(tag) > 0) {
 410  0
       sendAckMessage(senderId, vertex);
 411  0
       return;
 412  
     } else {
 413  0
       grantVertices(vertex);
 414  0
       value.setIdWithInHoldAck(senderId);
 415  
     }
 416  0
   }
 417  
 }