Coverage Report - org.apache.giraph.graph.BspServiceWorker
 
Classes in this File Line Coverage Branch Coverage Complexity
BspServiceWorker
61%
378/610
52%
111/210
7.405
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.graph;
 20  
 
 21  
 import org.apache.giraph.bsp.ApplicationState;
 22  
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 23  
 import org.apache.giraph.comm.RPCCommunications;
 24  
 import org.apache.giraph.comm.ServerData;
 25  
 import org.apache.giraph.comm.WorkerClientServer;
 26  
 import org.apache.giraph.comm.netty.NettyWorkerClientServer;
 27  
 import org.apache.giraph.graph.partition.Partition;
 28  
 import org.apache.giraph.graph.partition.PartitionExchange;
 29  
 import org.apache.giraph.graph.partition.PartitionOwner;
 30  
 import org.apache.giraph.graph.partition.PartitionStats;
 31  
 import org.apache.giraph.graph.partition.PartitionStore;
 32  
 import org.apache.giraph.graph.partition.SimplePartitionStore;
 33  
 import org.apache.giraph.graph.partition.WorkerGraphPartitioner;
 34  
 import org.apache.giraph.utils.MemoryUtils;
 35  
 import org.apache.giraph.utils.WritableUtils;
 36  
 import org.apache.giraph.zk.BspEvent;
 37  
 import org.apache.giraph.zk.PredicateLock;
 38  
 import org.apache.hadoop.fs.FSDataOutputStream;
 39  
 import org.apache.hadoop.fs.Path;
 40  
 import org.apache.hadoop.io.Text;
 41  
 import org.apache.hadoop.io.Writable;
 42  
 import org.apache.hadoop.io.WritableComparable;
 43  
 import org.apache.hadoop.mapreduce.InputSplit;
 44  
 import org.apache.hadoop.mapreduce.Mapper;
 45  
 import org.apache.hadoop.util.ReflectionUtils;
 46  
 import org.apache.log4j.Logger;
 47  
 import org.apache.zookeeper.CreateMode;
 48  
 import org.apache.zookeeper.KeeperException;
 49  
 import org.apache.zookeeper.WatchedEvent;
 50  
 import org.apache.zookeeper.Watcher.Event.EventType;
 51  
 import org.apache.zookeeper.ZooDefs.Ids;
 52  
 import org.apache.zookeeper.data.Stat;
 53  
 import org.json.JSONArray;
 54  
 import org.json.JSONException;
 55  
 import org.json.JSONObject;
 56  
 
 57  
 import net.iharder.Base64;
 58  
 
 59  
 import java.io.ByteArrayInputStream;
 60  
 import java.io.ByteArrayOutputStream;
 61  
 import java.io.DataInput;
 62  
 import java.io.DataInputStream;
 63  
 import java.io.DataOutput;
 64  
 import java.io.DataOutputStream;
 65  
 import java.io.IOException;
 66  
 import java.util.ArrayList;
 67  
 import java.util.Collection;
 68  
 import java.util.Collections;
 69  
 import java.util.HashMap;
 70  
 import java.util.HashSet;
 71  
 import java.util.List;
 72  
 import java.util.Map;
 73  
 import java.util.Map.Entry;
 74  
 import java.util.Set;
 75  
 
 76  
 /**
 77  
  * ZooKeeper-based implementation of {@link CentralizedServiceWorker}.
 78  
  *
 79  
  * @param <I> Vertex id
 80  
  * @param <V> Vertex data
 81  
  * @param <E> Edge data
 82  
  * @param <M> Message data
 83  
  */
 84  
 @SuppressWarnings("rawtypes")
 85  
 public class BspServiceWorker<I extends WritableComparable,
 86  
     V extends Writable, E extends Writable, M extends Writable>
 87  
     extends BspService<I, V, E, M>
 88  
     implements CentralizedServiceWorker<I, V, E, M> {
 89  
   /** Class logger */
 90  1
   private static final Logger LOG = Logger.getLogger(BspServiceWorker.class);
 91  
   /** Number of input splits */
 92  24
   private int inputSplitCount = -1;
 93  
   /** My process health znode */
 94  
   private String myHealthZnode;
 95  
   /** Worker info */
 96  
   private final WorkerInfo workerInfo;
 97  
   /** Worker graph partitioner */
 98  
   private final WorkerGraphPartitioner<I, V, E, M> workerGraphPartitioner;
 99  
   /** Input split vertex cache (only used when loading from input split) */
 100  24
   private final Map<PartitionOwner, Partition<I, V, E, M>>
 101  
   inputSplitCache = new HashMap<PartitionOwner, Partition<I, V, E, M>>();
 102  
   /** Communication service */
 103  
   private final WorkerClientServer<I, V, E, M> commService;
 104  
   /** Master info */
 105  24
   private WorkerInfo masterInfo = new WorkerInfo();
 106  
   /** Have the partition exchange children (workers) changed? */
 107  
   private final BspEvent partitionExchangeChildrenChanged;
 108  
   /** Regulates the size of outgoing Collections of vertices read
 109  
    * by the local worker during INPUT_SUPERSTEP that are to be
 110  
    * transferred from <code>inputSplitCache</code> to the owner
 111  
    * of their initial, master-assigned Partition.*/
 112  
   private GiraphTransferRegulator transferRegulator;
 113  
   /** Worker Context */
 114  
   private final WorkerContext workerContext;
 115  
   /** Total vertices loaded */
 116  24
   private long totalVerticesLoaded = 0;
 117  
   /** Total edges loaded */
 118  24
   private long totalEdgesLoaded = 0;
 119  
   /** Input split max vertices (-1 denotes all) */
 120  
   private final long inputSplitMaxVertices;
 121  
   /**
 122  
    * Partition store for worker (only used by the Hadoop RPC implementation).
 123  
    */
 124  
   private final PartitionStore<I, V, E, M> workerPartitionStore;
 125  
 
 126  
   /**
 127  
    * Constructor for setting up the worker.
 128  
    *
 129  
    * @param serverPortList ZooKeeper server port list
 130  
    * @param sessionMsecTimeout Msecs to timeout connecting to ZooKeeper
 131  
    * @param context Mapper context
 132  
    * @param graphMapper Graph mapper
 133  
    * @param graphState Global graph state
 134  
    * @throws IOException
 135  
    * @throws InterruptedException
 136  
    */
 137  
   public BspServiceWorker(
 138  
     String serverPortList,
 139  
     int sessionMsecTimeout,
 140  
     Mapper<?, ?, ?, ?>.Context context,
 141  
     GraphMapper<I, V, E, M> graphMapper,
 142  
     GraphState<I, V, E, M> graphState)
 143  
     throws IOException, InterruptedException {
 144  24
     super(serverPortList, sessionMsecTimeout, context, graphMapper);
 145  24
     partitionExchangeChildrenChanged = new PredicateLock(context);
 146  24
     registerBspEvent(partitionExchangeChildrenChanged);
 147  24
     transferRegulator =
 148  
         new GiraphTransferRegulator(getConfiguration());
 149  24
     inputSplitMaxVertices =
 150  
         getConfiguration().getLong(
 151  
             GiraphJob.INPUT_SPLIT_MAX_VERTICES,
 152  
             GiraphJob.INPUT_SPLIT_MAX_VERTICES_DEFAULT);
 153  24
     workerGraphPartitioner =
 154  
         getGraphPartitionerFactory().createWorkerGraphPartitioner();
 155  24
     boolean useNetty = getConfiguration().getBoolean(GiraphJob.USE_NETTY,
 156  
         GiraphJob.USE_NETTY_DEFAULT);
 157  24
     if (useNetty) {
 158  1
       commService =  new NettyWorkerClientServer<I, V, E, M>(context, this);
 159  
     } else {
 160  23
       commService =
 161  
           new RPCCommunications<I, V, E, M>(context, this, graphState);
 162  
     }
 163  24
     if (LOG.isInfoEnabled()) {
 164  24
       LOG.info("BspServiceWorker: maxVerticesPerTransfer = " +
 165  
           transferRegulator.getMaxVerticesPerTransfer());
 166  24
       LOG.info("BspServiceWorker: maxEdgesPerTransfer = " +
 167  
           transferRegulator.getMaxEdgesPerTransfer() +
 168  
           " useNetty = " + useNetty);
 169  
     }
 170  
 
 171  24
     workerInfo = new WorkerInfo(
 172  
         getHostname(), getTaskPartition(), commService.getPort());
 173  
 
 174  24
     graphState.setWorkerCommunications(commService);
 175  24
     this.workerContext =
 176  
         BspUtils.createWorkerContext(getConfiguration(),
 177  
             graphMapper.getGraphState());
 178  
 
 179  24
     if (useNetty) {
 180  1
       workerPartitionStore = null;
 181  
     } else {
 182  23
       workerPartitionStore =
 183  
           new SimplePartitionStore<I, V, E, M>(getConfiguration());
 184  
     }
 185  24
   }
 186  
 
 187  
   public WorkerContext getWorkerContext() {
 188  1078
     return workerContext;
 189  
   }
 190  
 
 191  
   /**
 192  
    * Intended to check the health of the node.  For instance, can it ssh,
 193  
    * dmesg, etc. For now, does nothing.
 194  
    * TODO: Make this check configurable by the user (i.e. search dmesg for
 195  
    * problems).
 196  
    *
 197  
    * @return True if healthy (always in this case).
 198  
    */
 199  
   public boolean isHealthy() {
 200  218
     return true;
 201  
   }
 202  
 
 203  
   /**
 204  
    * Try to reserve an InputSplit for loading.  While InputSplits exist that
 205  
    * are not finished, wait until they are.
 206  
    *
 207  
    * NOTE: iterations on the InputSplit list only halt for each worker when it
 208  
    * has scanned the entire list once and found every split marked RESERVED.
 209  
    * When a worker fails, its Ephemeral RESERVED znodes will disappear,
 210  
    * allowing other iterating workers to claim its previously read splits.
 211  
    * Only when the last worker left iterating on the list fails can a danger
 212  
    * of data loss occur. Since worker failure in INPUT_SUPERSTEP currently
 213  
    * causes job failure, this is OK. As the failure model evolves, this
 214  
    * behavior might need to change.
 215  
    *
 216  
    * @return reserved InputSplit or null if no unfinished InputSplits exist
 217  
    * @throws KeeperException
 218  
    * @throws InterruptedException
 219  
    */
 220  
   private String reserveInputSplit()
 221  
     throws KeeperException, InterruptedException {
 222  48
     List<String> inputSplitPathList = null;
 223  48
     inputSplitPathList =
 224  
         getZkExt().getChildrenExt(inputSplitsPath, false, false, true);
 225  48
     if (inputSplitCount == -1) {
 226  24
       inputSplitCount = inputSplitPathList.size();
 227  
     }
 228  48
     LocalityInfoSorter localitySorter = new LocalityInfoSorter(
 229  
       getZkExt(), inputSplitPathList, getHostname(), getWorkerInfo().getPort());
 230  48
     String reservedInputSplitPath = null;
 231  48
     Stat reservedStat = null;
 232  48
     final Mapper<?, ?, ?, ?>.Context context = getContext();
 233  
     while (true) {
 234  48
       int reservedInputSplits = 0;
 235  48
       for (String nextSplitToClaim : localitySorter) {
 236  48
         context.progress();
 237  48
         String tmpInputSplitReservedPath =
 238  
             nextSplitToClaim + INPUT_SPLIT_RESERVED_NODE;
 239  48
         reservedStat =
 240  
             getZkExt().exists(tmpInputSplitReservedPath, true);
 241  48
         if (reservedStat == null) {
 242  
           try {
 243  
             // Attempt to reserve this InputSplit
 244  24
             getZkExt().createExt(tmpInputSplitReservedPath,
 245  
                 null,
 246  
                 Ids.OPEN_ACL_UNSAFE,
 247  
                 CreateMode.EPHEMERAL,
 248  
                 false);
 249  24
             reservedInputSplitPath = nextSplitToClaim;
 250  24
             if (LOG.isInfoEnabled()) {
 251  24
               float percentFinished =
 252  
                   reservedInputSplits * 100.0f /
 253  
                   inputSplitPathList.size();
 254  24
               LOG.info("reserveInputSplit: Reserved input " +
 255  
                   "split path " + reservedInputSplitPath +
 256  
                   ", overall roughly " +
 257  
                   + percentFinished +
 258  
                   "% input splits reserved");
 259  
             }
 260  24
             return reservedInputSplitPath;
 261  0
           } catch (KeeperException.NodeExistsException e) {
 262  0
             LOG.info("reserveInputSplit: Couldn't reserve " +
 263  
                 "(already reserved) inputSplit" +
 264  
                 " at " + tmpInputSplitReservedPath);
 265  0
           } catch (KeeperException e) {
 266  0
             throw new IllegalStateException(
 267  
                 "reserveInputSplit: KeeperException on reserve", e);
 268  0
           } catch (InterruptedException e) {
 269  0
             throw new IllegalStateException(
 270  
                 "reserveInputSplit: InterruptedException " +
 271  
                     "on reserve", e);
 272  0
           }
 273  
         } else {
 274  24
           ++reservedInputSplits;
 275  
         }
 276  24
       }
 277  24
       if (LOG.isInfoEnabled()) {
 278  24
         LOG.info("reserveInputSplit: reservedPath = " +
 279  
             reservedInputSplitPath + ", " + reservedInputSplits +
 280  
             " of " + inputSplitPathList.size() +
 281  
             " InputSplits are finished.");
 282  
       }
 283  24
       if (reservedInputSplits == inputSplitPathList.size()) {
 284  24
         transferRegulator = null; // don't need this anymore
 285  24
         return null;
 286  
       }
 287  
       // Wait for either a reservation to go away or a notification that
 288  
       // an InputSplit has finished.
 289  0
       context.progress();
 290  0
       getInputSplitsStateChangedEvent().waitMsecs(60 * 1000);
 291  0
       getInputSplitsStateChangedEvent().reset();
 292  0
     }
 293  
   }
 294  
 
 295  
   /**
 296  
    * Load the vertices from the user-defined VertexReader into our partitions
 297  
    * of vertex ranges.  Do this until all the InputSplits have been processed.
 298  
    * All workers will try to do as many InputSplits as they can.  The master
 299  
    * will monitor progress and stop this once all the InputSplits have been
 300  
    * loaded and check-pointed.  Keep track of the last input split path to
 301  
    * ensure the input split cache is flushed prior to marking the last input
 302  
    * split complete.
 303  
    *
 304  
    * @return Statistics of the vertices loaded
 305  
    * @throws IOException
 306  
    * @throws IllegalAccessException
 307  
    * @throws InstantiationException
 308  
    * @throws ClassNotFoundException
 309  
    * @throws InterruptedException
 310  
    * @throws KeeperException
 311  
    */
 312  
   private VertexEdgeCount loadVertices() throws IOException,
 313  
     ClassNotFoundException, InterruptedException, InstantiationException,
 314  
     IllegalAccessException, KeeperException {
 315  24
     String inputSplitPath = null;
 316  24
     VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
 317  48
     while ((inputSplitPath = reserveInputSplit()) != null) {
 318  24
       vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(
 319  
           loadVerticesFromInputSplit(inputSplitPath));
 320  
     }
 321  
 
 322  
     // Flush the remaining cached vertices
 323  
     for (Entry<PartitionOwner, Partition<I, V, E, M>> entry :
 324  24
       inputSplitCache.entrySet()) {
 325  23
       if (!entry.getValue().getVertices().isEmpty()) {
 326  23
         getContext().progress();
 327  23
         commService.sendPartitionRequest(entry.getKey().getWorkerInfo(),
 328  
             entry.getValue());
 329  
       }
 330  
     }
 331  24
     inputSplitCache.clear();
 332  24
     commService.flush();
 333  
 
 334  24
     return vertexEdgeCount;
 335  
   }
 336  
 
 337  
   /**
 338  
    * Mark an input split path as completed by this worker.  This notifies
 339  
    * the master and the other workers that this input split has not only
 340  
    * been reserved, but also marked processed.
 341  
    *
 342  
    * @param inputSplitPath Path to the input split.
 343  
    */
 344  
   private void markInputSplitPathFinished(String inputSplitPath) {
 345  24
     String inputSplitFinishedPath =
 346  
         inputSplitPath + INPUT_SPLIT_FINISHED_NODE;
 347  
     try {
 348  24
       getZkExt().createExt(inputSplitFinishedPath,
 349  
           null,
 350  
           Ids.OPEN_ACL_UNSAFE,
 351  
           CreateMode.PERSISTENT,
 352  
           true);
 353  0
     } catch (KeeperException.NodeExistsException e) {
 354  0
       LOG.warn("loadVertices: " + inputSplitFinishedPath +
 355  
           " already exists!");
 356  0
     } catch (KeeperException e) {
 357  0
       throw new IllegalStateException(
 358  
           "loadVertices: KeeperException on " +
 359  
               inputSplitFinishedPath, e);
 360  0
     } catch (InterruptedException e) {
 361  0
       throw new IllegalStateException(
 362  
           "loadVertices: InterruptedException on " +
 363  
               inputSplitFinishedPath, e);
 364  24
     }
 365  24
   }
 366  
 
 367  
   /**
 368  
    * Extract vertices from input split, saving them into a mini cache of
 369  
    * partitions.  Periodically flush the cache of vertices when a limit is
 370  
    * reached in readVerticesFromInputSplit.
 371  
    * Mark the input split finished when done.
 372  
    *
 373  
    * @param inputSplitPath ZK location of input split
 374  
    * @return Count of vertices and edges loaded from the input split
 375  
    * @throws IOException
 376  
    * @throws ClassNotFoundException
 377  
    * @throws InterruptedException
 378  
    * @throws InstantiationException
 379  
    * @throws IllegalAccessException
 380  
    */
 381  
   private VertexEdgeCount loadVerticesFromInputSplit(String inputSplitPath)
 382  
     throws IOException, ClassNotFoundException, InterruptedException,
 383  
     InstantiationException, IllegalAccessException {
 384  24
     InputSplit inputSplit = getInputSplitForVertices(inputSplitPath);
 385  24
     VertexEdgeCount vertexEdgeCount =
 386  
         readVerticesFromInputSplit(inputSplit);
 387  24
     if (LOG.isInfoEnabled()) {
 388  24
       LOG.info("loadVerticesFromInputSplit: Finished loading " +
 389  
           inputSplitPath + " " + vertexEdgeCount);
 390  
     }
 391  24
     markInputSplitPathFinished(inputSplitPath);
 392  24
     return vertexEdgeCount;
 393  
   }
 394  
 
 395  
   /**
 396  
    * Talk to ZooKeeper to convert the input split path to the actual
 397  
    * InputSplit containing the vertices to read.
 398  
    *
 399  
    * @param inputSplitPath Location in ZK of input split
 400  
    * @return instance of InputSplit containing vertices to read
 401  
    * @throws IOException
 402  
    * @throws ClassNotFoundException
 403  
    */
 404  
   private InputSplit getInputSplitForVertices(String inputSplitPath)
 405  
     throws IOException, ClassNotFoundException {
 406  
     byte[] splitList;
 407  
     try {
 408  24
       splitList = getZkExt().getData(inputSplitPath, false, null);
 409  0
     } catch (KeeperException e) {
 410  0
       throw new IllegalStateException(
 411  
           "loadVertices: KeeperException on " + inputSplitPath, e);
 412  0
     } catch (InterruptedException e) {
 413  0
       throw new IllegalStateException(
 414  
           "loadVertices: IllegalStateException on " + inputSplitPath, e);
 415  24
     }
 416  24
     getContext().progress();
 417  
 
 418  24
     DataInputStream inputStream =
 419  
         new DataInputStream(new ByteArrayInputStream(splitList));
 420  24
     Text.readString(inputStream); // location data unused here, skip
 421  24
     String inputSplitClass = Text.readString(inputStream);
 422  24
     InputSplit inputSplit = (InputSplit)
 423  
         ReflectionUtils.newInstance(
 424  
             getConfiguration().getClassByName(inputSplitClass),
 425  
             getConfiguration());
 426  24
     ((Writable) inputSplit).readFields(inputStream);
 427  
 
 428  24
     if (LOG.isInfoEnabled()) {
 429  24
       LOG.info("getInputSplitForVertices: Reserved " + inputSplitPath +
 430  
           " from ZooKeeper and got input split '" +
 431  
           inputSplit.toString() + "'");
 432  
     }
 433  24
     return inputSplit;
 434  
   }
 435  
 
 436  
   /**
 437  
    * Read vertices from input split.  If testing, the user may request a
 438  
    * maximum number of vertices to be read from an input split.
 439  
    *
 440  
    * @param inputSplit Input split to process with vertex reader
 441  
    * @return Count of vertices and edges read from the input split.
 442  
    * @throws IOException
 443  
    * @throws InterruptedException
 444  
    */
 445  
   private VertexEdgeCount readVerticesFromInputSplit(
 446  
       InputSplit inputSplit) throws IOException, InterruptedException {
 447  24
     VertexInputFormat<I, V, E, M> vertexInputFormat =
 448  
         BspUtils.<I, V, E, M>createVertexInputFormat(getConfiguration());
 449  24
     VertexReader<I, V, E, M> vertexReader =
 450  
         vertexInputFormat.createVertexReader(inputSplit, getContext());
 451  24
     vertexReader.initialize(inputSplit, getContext());
 452  24
     transferRegulator.clearCounters();
 453  447
     while (vertexReader.nextVertex()) {
 454  423
       Vertex<I, V, E, M> readerVertex =
 455  
           vertexReader.getCurrentVertex();
 456  423
       if (readerVertex.getId() == null) {
 457  0
         throw new IllegalArgumentException(
 458  
             "readVerticesFromInputSplit: Vertex reader returned a vertex " +
 459  
                 "without an id!  - " + readerVertex);
 460  
       }
 461  423
       if (readerVertex.getValue() == null) {
 462  0
         readerVertex.setValue(
 463  
             BspUtils.<V>createVertexValue(getConfiguration()));
 464  
       }
 465  423
       PartitionOwner partitionOwner =
 466  
           workerGraphPartitioner.getPartitionOwner(
 467  
               readerVertex.getId());
 468  423
       Partition<I, V, E, M> partition =
 469  
           inputSplitCache.get(partitionOwner);
 470  423
       if (partition == null) {
 471  23
         partition = new Partition<I, V, E, M>(
 472  
             getConfiguration(),
 473  
             partitionOwner.getPartitionId());
 474  23
         inputSplitCache.put(partitionOwner, partition);
 475  
       }
 476  423
       Vertex<I, V, E, M> oldVertex =
 477  
           partition.putVertex(readerVertex);
 478  423
       if (oldVertex != null) {
 479  0
         LOG.warn("readVertices: Replacing vertex " + oldVertex +
 480  
             " with " + readerVertex);
 481  
       }
 482  423
       getContext().progress(); // do this before potential data transfer
 483  423
       transferRegulator.incrementCounters(partitionOwner, readerVertex);
 484  423
       if (transferRegulator.transferThisPartition(partitionOwner)) {
 485  0
         commService.sendPartitionRequest(partitionOwner.getWorkerInfo(),
 486  
             partition);
 487  0
         inputSplitCache.remove(partitionOwner);
 488  
       }
 489  423
       ++totalVerticesLoaded;
 490  423
       totalEdgesLoaded += readerVertex.getNumEdges();
 491  
 
 492  
       // Update status every 250k vertices
 493  423
       if ((totalVerticesLoaded % 250000) == 0) {
 494  0
         String status = "readVerticesFromInputSplit: Loaded " +
 495  
             totalVerticesLoaded + " vertices and " +
 496  
             totalEdgesLoaded + " edges " +
 497  
             MemoryUtils.getRuntimeMemoryStats() + " " +
 498  
             getGraphMapper().getMapFunctions().toString() +
 499  
             " - Attempt=" + getApplicationAttempt() +
 500  
             ", Superstep=" + getSuperstep();
 501  0
         if (LOG.isInfoEnabled()) {
 502  0
           LOG.info(status);
 503  
         }
 504  0
         getContext().setStatus(status);
 505  
       }
 506  
 
 507  
       // For sampling, or to limit outlier input splits, the number of
 508  
       // records per input split can be limited
 509  423
       if (inputSplitMaxVertices > 0 &&
 510  
         transferRegulator.getTotalVertices() >=
 511  
         inputSplitMaxVertices) {
 512  0
         if (LOG.isInfoEnabled()) {
 513  0
           LOG.info("readVerticesFromInputSplit: Leaving the input " +
 514  
               "split early, reached maximum vertices " +
 515  
               transferRegulator.getTotalVertices());
 516  
         }
 517  
         break;
 518  
       }
 519  423
     }
 520  24
     vertexReader.close();
 521  
 
 522  24
     return new VertexEdgeCount(transferRegulator.getTotalVertices(),
 523  
       transferRegulator.getTotalEdges());
 524  
   }
 525  
 
 526  
   @Override
 527  
   public void assignMessagesToVertex(Vertex<I, V, E, M> vertex,
 528  
       Iterable<M> messages) {
 529  1536
     vertex.putMessages(messages);
 530  1536
   }
 531  
 
 532  
   @Override
 533  
   public WorkerInfo getMasterInfo() {
 534  12
     return masterInfo;
 535  
   }
 536  
 
 537  
   @Override
 538  
   public void setup() {
 539  
     // Unless doing a restart, prepare for computation:
 540  
     // 1. Start superstep INPUT_SUPERSTEP (no computation)
 541  
     // 2. Wait until the INPUT_SPLIT_ALL_READY_PATH node has been created
 542  
     // 3. Process input splits until there are no more.
 543  
     // 4. Wait until the INPUT_SPLIT_ALL_DONE_PATH node has been created
 544  
     // 5. Wait for superstep INPUT_SUPERSTEP to complete.
 545  24
     if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
 546  0
       setCachedSuperstep(getRestartedSuperstep());
 547  0
       return;
 548  
     }
 549  
 
 550  24
     JSONObject jobState = getJobState();
 551  24
     if (jobState != null) {
 552  
       try {
 553  0
         if ((ApplicationState.valueOf(jobState.getString(JSONOBJ_STATE_KEY)) ==
 554  
             ApplicationState.START_SUPERSTEP) &&
 555  
             jobState.getLong(JSONOBJ_SUPERSTEP_KEY) ==
 556  
             getSuperstep()) {
 557  0
           if (LOG.isInfoEnabled()) {
 558  0
             LOG.info("setup: Restarting from an automated " +
 559  
                 "checkpointed superstep " +
 560  
                 getSuperstep() + ", attempt " +
 561  
                 getApplicationAttempt());
 562  
           }
 563  0
           setRestartedSuperstep(getSuperstep());
 564  0
           return;
 565  
         }
 566  0
       } catch (JSONException e) {
 567  0
         throw new RuntimeException(
 568  
             "setup: Failed to get key-values from " +
 569  
                 jobState.toString(), e);
 570  0
       }
 571  
     }
 572  
 
 573  
     // Add the partitions that this worker owns
 574  24
     Collection<? extends PartitionOwner> masterSetPartitionOwners =
 575  
         startSuperstep();
 576  24
     workerGraphPartitioner.updatePartitionOwners(
 577  
         getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
 578  
 
 579  24
     commService.setup();
 580  
 
 581  
     // Ensure the InputSplits are ready for processing before processing
 582  
     while (true) {
 583  
       Stat inputSplitsReadyStat;
 584  
       try {
 585  24
         inputSplitsReadyStat =
 586  
             getZkExt().exists(inputSplitsAllReadyPath, true);
 587  0
       } catch (KeeperException e) {
 588  0
         throw new IllegalStateException(
 589  
             "setup: KeeperException waiting on input splits", e);
 590  0
       } catch (InterruptedException e) {
 591  0
         throw new IllegalStateException(
 592  
             "setup: InterruptedException waiting on input splits", e);
 593  24
       }
 594  24
       if (inputSplitsReadyStat != null) {
 595  24
         break;
 596  
       }
 597  0
       getInputSplitsAllReadyEvent().waitForever();
 598  0
       getInputSplitsAllReadyEvent().reset();
 599  0
     }
 600  
 
 601  24
     getContext().progress();
 602  
 
 603  
     try {
 604  24
       VertexEdgeCount vertexEdgeCount = loadVertices();
 605  24
       if (LOG.isInfoEnabled()) {
 606  24
         LOG.info("setup: Finally loaded a total of " +
 607  
             vertexEdgeCount);
 608  
       }
 609  0
     } catch (IOException e) {
 610  0
       throw new IllegalStateException("setup: loadVertices failed due to " +
 611  
           "IOException", e);
 612  0
     } catch (ClassNotFoundException e) {
 613  0
       throw new IllegalStateException("setup: loadVertices failed due to " +
 614  
           "ClassNotFoundException", e);
 615  0
     } catch (InterruptedException e) {
 616  0
       throw new IllegalStateException("setup: loadVertices failed due to " +
 617  
           "InterruptedException", e);
 618  0
     } catch (InstantiationException e) {
 619  0
       throw new IllegalStateException("setup: loadVertices failed due to " +
 620  
           "InstantiationException", e);
 621  0
     } catch (IllegalAccessException e) {
 622  0
       throw new IllegalStateException("setup: loadVertices failed due to " +
 623  
           "IllegalAccessException", e);
 624  0
     } catch (KeeperException e) {
 625  0
       throw new IllegalStateException("setup: loadVertices failed due to " +
 626  
           "KeeperException", e);
 627  24
     }
 628  24
     getContext().progress();
 629  
 
 630  
     // Workers wait for each other to finish, coordinated by master
 631  24
     String workerDonePath =
 632  
         inputSplitsDonePath + "/" + getWorkerInfo().getHostnameId();
 633  
     try {
 634  24
       getZkExt().createExt(workerDonePath,
 635  
           null,
 636  
           Ids.OPEN_ACL_UNSAFE,
 637  
           CreateMode.PERSISTENT,
 638  
           true);
 639  0
     } catch (KeeperException e) {
 640  0
       throw new IllegalStateException(
 641  
           "setup: KeeperException creating worker done splits", e);
 642  0
     } catch (InterruptedException e) {
 643  0
       throw new IllegalStateException(
 644  
           "setup: InterruptedException creating worker done splits", e);
 645  24
     }
 646  
     while (true) {
 647  
       Stat inputSplitsDoneStat;
 648  
       try {
 649  48
         inputSplitsDoneStat =
 650  
             getZkExt().exists(inputSplitsAllDonePath, true);
 651  0
       } catch (KeeperException e) {
 652  0
         throw new IllegalStateException(
 653  
             "setup: KeeperException waiting on worker done splits", e);
 654  0
       } catch (InterruptedException e) {
 655  0
         throw new IllegalStateException(
 656  
             "setup: InterruptedException waiting on worker " +
 657  
                 "done splits", e);
 658  48
       }
 659  48
       if (inputSplitsDoneStat != null) {
 660  24
         break;
 661  
       }
 662  24
       getInputSplitsAllDoneEvent().waitForever();
 663  24
       getInputSplitsAllDoneEvent().reset();
 664  24
     }
 665  
 
 666  
     // Create remaining partitions owned by this worker.
 667  24
     for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
 668  24
       if (partitionOwner.getWorkerInfo().equals(getWorkerInfo()) &&
 669  
           !getPartitionStore().hasPartition(
 670  
               partitionOwner.getPartitionId())) {
 671  1
         Partition<I, V, E, M> partition =
 672  
             new Partition<I, V, E, M>(getConfiguration(),
 673  
                 partitionOwner.getPartitionId());
 674  1
         getPartitionStore().addPartition(partition);
 675  24
       }
 676  
     }
 677  
 
 678  
     // Generate the partition stats for the input superstep and process
 679  
     // if necessary
 680  24
     List<PartitionStats> partitionStatsList =
 681  
         new ArrayList<PartitionStats>();
 682  
     for (Partition<I, V, E, M> partition :
 683  24
         getPartitionStore().getPartitions()) {
 684  24
       PartitionStats partitionStats =
 685  
           new PartitionStats(partition.getId(),
 686  
               partition.getVertices().size(),
 687  
               0,
 688  
               partition.getEdgeCount());
 689  24
       partitionStatsList.add(partitionStats);
 690  24
     }
 691  24
     workerGraphPartitioner.finalizePartitionStats(
 692  
         partitionStatsList, getPartitionStore());
 693  
 
 694  24
     finishSuperstep(partitionStatsList);
 695  24
   }
 696  
 
 697  
   @Override
 698  
   public <A extends Writable> void aggregate(String name, A value) {
 699  1255
     AggregatorWrapper<? extends Writable> aggregator = getAggregator(name);
 700  1255
     if (aggregator != null) {
 701  1255
       ((AggregatorWrapper<A>) aggregator).aggregateCurrent(value);
 702  
     } else {
 703  0
       throw new IllegalStateException("aggregate: Tried to aggregate value " +
 704  
           "to unregistered aggregator " + name);
 705  
     }
 706  1255
   }
 707  
 
 708  
   /**
 709  
    *  Marshal the aggregator values of the worker to a byte array that will
 710  
    *  later be aggregated by master.
 711  
    *
 712  
    * @param superstep Superstep to marshall on
 713  
    * @return Byte array of the aggreagtor values
 714  
    */
 715  
   private byte[] marshalAggregatorValues(long superstep) {
 716  218
     if (superstep == INPUT_SUPERSTEP) {
 717  24
       return new byte[0];
 718  
     }
 719  
 
 720  194
     ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
 721  194
     DataOutputStream output = new DataOutputStream(outputStream);
 722  
     for (Entry<String, AggregatorWrapper<Writable>> entry :
 723  194
         getAggregatorMap().entrySet()) {
 724  285
       if (entry.getValue().isChanged()) {
 725  
         try {
 726  251
           output.writeUTF(entry.getKey());
 727  251
           entry.getValue().getCurrentAggregatedValue().write(output);
 728  0
         } catch (IOException e) {
 729  0
           throw new IllegalStateException("Failed to marshall aggregator " +
 730  
               "with IOException " + entry.getKey(), e);
 731  251
         }
 732  
       }
 733  
     }
 734  
 
 735  194
     if (LOG.isInfoEnabled()) {
 736  194
       LOG.info(
 737  
           "marshalAggregatorValues: Finished assembling aggregator values");
 738  
     }
 739  194
     return outputStream.toByteArray();
 740  
   }
 741  
 
 742  
   /**
 743  
    * Get values of aggregators aggregated by master in previous superstep.
 744  
    *
 745  
    * @param superstep Superstep to get the aggregated values from
 746  
    */
 747  
   private void getAggregatorValues(long superstep) {
 748  
     // prepare aggregators for reading and next superstep
 749  
     for (AggregatorWrapper<Writable> aggregator :
 750  194
         getAggregatorMap().values()) {
 751  268
       aggregator.setPreviousAggregatedValue(aggregator.createInitialValue());
 752  268
       aggregator.resetCurrentAggregator();
 753  
     }
 754  194
     String mergedAggregatorPath =
 755  
         getMergedAggregatorPath(getApplicationAttempt(), superstep - 1);
 756  
 
 757  194
     byte[] aggregatorArray = null;
 758  
     try {
 759  194
       aggregatorArray = getZkExt().getData(mergedAggregatorPath, false, null);
 760  55
     } catch (KeeperException.NoNodeException e) {
 761  55
       LOG.info("getAggregatorValues: no aggregators in " +
 762  
           mergedAggregatorPath + " on superstep " + superstep);
 763  55
       return;
 764  0
     } catch (KeeperException e) {
 765  0
       throw new IllegalStateException("Failed to get data for " +
 766  
           mergedAggregatorPath + " with KeeperException", e);
 767  0
     } catch (InterruptedException e) {
 768  0
       throw new IllegalStateException("Failed to get data for " +
 769  
           mergedAggregatorPath + " with InterruptedException", e);
 770  139
     }
 771  
 
 772  139
     DataInput input =
 773  
         new DataInputStream(new ByteArrayInputStream(aggregatorArray));
 774  139
     int numAggregators = 0;
 775  
 
 776  
     try {
 777  139
       numAggregators = input.readInt();
 778  0
     } catch (IOException e) {
 779  0
       throw new IllegalStateException("getAggregatorValues: " +
 780  
           "Failed to decode data", e);
 781  139
     }
 782  
 
 783  424
     for (int i = 0; i < numAggregators; i++) {
 784  
       try {
 785  285
         String aggregatorName = input.readUTF();
 786  285
         String aggregatorClassName = input.readUTF();
 787  285
         AggregatorWrapper<Writable> aggregatorWrapper =
 788  
             getAggregatorMap().get(aggregatorName);
 789  285
         if (aggregatorWrapper == null) {
 790  
           try {
 791  17
             Class<? extends Aggregator<Writable>> aggregatorClass =
 792  
                 (Class<? extends Aggregator<Writable>>)
 793  
                     Class.forName(aggregatorClassName);
 794  17
             aggregatorWrapper =
 795  
                 registerAggregator(aggregatorName, aggregatorClass, false);
 796  0
           } catch (ClassNotFoundException e) {
 797  0
             throw new IllegalStateException("Failed to create aggregator " +
 798  
                 aggregatorName + " of class " + aggregatorClassName +
 799  
                 " with ClassNotFoundException", e);
 800  0
           } catch (InstantiationException e) {
 801  0
             throw new IllegalStateException("Failed to create aggregator " +
 802  
                 aggregatorName + " of class " + aggregatorClassName +
 803  
                 " with InstantiationException", e);
 804  0
           } catch (IllegalAccessException e) {
 805  0
             throw new IllegalStateException("Failed to create aggregator " +
 806  
                 aggregatorName + " of class " + aggregatorClassName +
 807  
                 " with IllegalAccessException", e);
 808  17
           }
 809  
         }
 810  285
         Writable aggregatorValue = aggregatorWrapper.createInitialValue();
 811  285
         aggregatorValue.readFields(input);
 812  285
         aggregatorWrapper.setPreviousAggregatedValue(aggregatorValue);
 813  0
       } catch (IOException e) {
 814  0
         throw new IllegalStateException(
 815  
             "Failed to decode data for index " + i, e);
 816  285
       }
 817  
     }
 818  
 
 819  139
     if (LOG.isInfoEnabled()) {
 820  139
       LOG.info("getAggregatorValues: Finished loading " +
 821  
           mergedAggregatorPath);
 822  
     }
 823  139
   }
 824  
 
 825  
   /**
 826  
    * Register the health of this worker for a given superstep
 827  
    *
 828  
    * @param superstep Superstep to register health on
 829  
    */
 830  
   private void registerHealth(long superstep) {
 831  218
     JSONArray hostnamePort = new JSONArray();
 832  218
     hostnamePort.put(getHostname());
 833  
 
 834  218
     hostnamePort.put(workerInfo.getPort());
 835  
 
 836  218
     String myHealthPath = null;
 837  218
     if (isHealthy()) {
 838  218
       myHealthPath = getWorkerInfoHealthyPath(getApplicationAttempt(),
 839  
           getSuperstep());
 840  
     } else {
 841  0
       myHealthPath = getWorkerInfoUnhealthyPath(getApplicationAttempt(),
 842  
           getSuperstep());
 843  
     }
 844  218
     myHealthPath = myHealthPath + "/" + workerInfo.getHostnameId();
 845  
     try {
 846  218
       myHealthZnode = getZkExt().createExt(
 847  
           myHealthPath,
 848  
           WritableUtils.writeToByteArray(workerInfo),
 849  
           Ids.OPEN_ACL_UNSAFE,
 850  
           CreateMode.EPHEMERAL,
 851  
           true);
 852  0
     } catch (KeeperException.NodeExistsException e) {
 853  0
       LOG.warn("registerHealth: myHealthPath already exists (likely " +
 854  
           "from previous failure): " + myHealthPath +
 855  
           ".  Waiting for change in attempts " +
 856  
           "to re-join the application");
 857  0
       getApplicationAttemptChangedEvent().waitForever();
 858  0
       if (LOG.isInfoEnabled()) {
 859  0
         LOG.info("registerHealth: Got application " +
 860  
             "attempt changed event, killing self");
 861  
       }
 862  0
       throw new IllegalStateException(
 863  
           "registerHealth: Trying " +
 864  
               "to get the new application attempt by killing self", e);
 865  0
     } catch (KeeperException e) {
 866  0
       throw new IllegalStateException("Creating " + myHealthPath +
 867  
           " failed with KeeperException", e);
 868  0
     } catch (InterruptedException e) {
 869  0
       throw new IllegalStateException("Creating " + myHealthPath +
 870  
           " failed with InterruptedException", e);
 871  218
     }
 872  218
     if (LOG.isInfoEnabled()) {
 873  218
       LOG.info("registerHealth: Created my health node for attempt=" +
 874  
           getApplicationAttempt() + ", superstep=" +
 875  
           getSuperstep() + " with " + myHealthZnode +
 876  
           " and workerInfo= " + workerInfo);
 877  
     }
 878  218
   }
 879  
 
 880  
   /**
 881  
    * Do this to help notify the master quicker that this worker has failed.
 882  
    */
 883  
   private void unregisterHealth() {
 884  0
     LOG.error("unregisterHealth: Got failure, unregistering health on " +
 885  
         myHealthZnode + " on superstep " + getSuperstep());
 886  
     try {
 887  0
       getZkExt().delete(myHealthZnode, -1);
 888  0
     } catch (InterruptedException e) {
 889  0
       throw new IllegalStateException(
 890  
           "unregisterHealth: InterruptedException - Couldn't delete " +
 891  
               myHealthZnode, e);
 892  0
     } catch (KeeperException e) {
 893  0
       throw new IllegalStateException(
 894  
           "unregisterHealth: KeeperException - Couldn't delete " +
 895  
               myHealthZnode, e);
 896  0
     }
 897  0
   }
 898  
 
 899  
   @Override
 900  
   public void failureCleanup() {
 901  0
     unregisterHealth();
 902  0
   }
 903  
 
 904  
   @Override
 905  
   public Collection<? extends PartitionOwner> startSuperstep() {
 906  
     // Algorithm:
 907  
     // 1. Communication service will combine message from previous
 908  
     //    superstep
 909  
     // 2. Register my health for the next superstep.
 910  
     // 3. Wait until the partition assignment is complete and get it
 911  
     // 4. Get the aggregator values from the previous superstep
 912  218
     if (getSuperstep() != INPUT_SUPERSTEP) {
 913  194
       commService.prepareSuperstep();
 914  
     }
 915  
 
 916  218
     registerHealth(getSuperstep());
 917  
 
 918  218
     String partitionAssignmentsNode =
 919  
         getPartitionAssignmentsPath(getApplicationAttempt(),
 920  
             getSuperstep());
 921  
     Collection<? extends PartitionOwner> masterSetPartitionOwners;
 922  
     try {
 923  436
       while (getZkExt().exists(partitionAssignmentsNode, true) ==
 924  
           null) {
 925  218
         getPartitionAssignmentsReadyChangedEvent().waitForever();
 926  218
         getPartitionAssignmentsReadyChangedEvent().reset();
 927  
       }
 928  218
       List<? extends Writable> writableList =
 929  
           WritableUtils.readListFieldsFromZnode(
 930  
               getZkExt(),
 931  
               partitionAssignmentsNode,
 932  
               false,
 933  
               null,
 934  
               workerGraphPartitioner.createPartitionOwner().getClass(),
 935  
               getConfiguration());
 936  
 
 937  
       @SuppressWarnings("unchecked")
 938  218
       Collection<? extends PartitionOwner> castedWritableList =
 939  
         (Collection<? extends PartitionOwner>) writableList;
 940  218
       masterSetPartitionOwners = castedWritableList;
 941  0
     } catch (KeeperException e) {
 942  0
       throw new IllegalStateException(
 943  
           "startSuperstep: KeeperException getting assignments", e);
 944  0
     } catch (InterruptedException e) {
 945  0
       throw new IllegalStateException(
 946  
           "startSuperstep: InterruptedException getting assignments", e);
 947  218
     }
 948  
 
 949  
 
 950  218
     boolean useNetty = getConfiguration().getBoolean(GiraphJob.USE_NETTY,
 951  
         GiraphJob.USE_NETTY_DEFAULT);
 952  218
     if (useNetty) {
 953  
       // get address of master
 954  12
       WritableUtils.readFieldsFromZnode(getZkExt(), currentMasterPath, false,
 955  
           null, masterInfo);
 956  
     }
 957  
 
 958  218
     if (LOG.isInfoEnabled()) {
 959  218
       LOG.info("startSuperstep: Ready for computation on superstep " +
 960  
           getSuperstep() + " since worker " +
 961  
           "selection and vertex range assignments are done in " +
 962  
           partitionAssignmentsNode);
 963  
     }
 964  
 
 965  218
     if (getSuperstep() != INPUT_SUPERSTEP) {
 966  194
       getAggregatorValues(getSuperstep());
 967  
     }
 968  218
     getContext().setStatus("startSuperstep: " +
 969  
         getGraphMapper().getMapFunctions().toString() +
 970  
         " - Attempt=" + getApplicationAttempt() +
 971  
         ", Superstep=" + getSuperstep());
 972  218
     return masterSetPartitionOwners;
 973  
   }
 974  
 
 975  
   @Override
 976  
   public boolean finishSuperstep(List<PartitionStats> partitionStatsList) {
 977  
     // This barrier blocks until success (or the master signals it to
 978  
     // restart).
 979  
     //
 980  
     // Master will coordinate the barriers and aggregate "doneness" of all
 981  
     // the vertices.  Each worker will:
 982  
     // 1. Flush the unsent messages
 983  
     // 2. Execute user postSuperstep() if necessary.
 984  
     // 3. Save aggregator values that are in use.
 985  
     // 4. Report the statistics (vertices, edges, messages, etc.)
 986  
     //    of this worker
 987  
     // 5. Let the master know it is finished.
 988  
     // 6. Wait for the master's global stats, and check if done
 989  
 
 990  218
     getContext().setStatus("Flushing started: " +
 991  
         getGraphMapper().getMapFunctions().toString() +
 992  
         " - Attempt=" + getApplicationAttempt() +
 993  
         ", Superstep=" + getSuperstep());
 994  
 
 995  218
     long workerSentMessages = 0;
 996  
     try {
 997  218
       commService.flush();
 998  218
       workerSentMessages = commService.resetMessageCount();
 999  0
     } catch (IOException e) {
 1000  0
       throw new IllegalStateException(
 1001  
           "finishSuperstep: flush failed", e);
 1002  218
     }
 1003  
 
 1004  218
     if (getSuperstep() != INPUT_SUPERSTEP) {
 1005  194
       getWorkerContext().postSuperstep();
 1006  194
       getContext().progress();
 1007  
     }
 1008  
 
 1009  218
     if (LOG.isInfoEnabled()) {
 1010  218
       LOG.info("finishSuperstep: Superstep " + getSuperstep() +
 1011  
           ", messages = " + workerSentMessages + " " +
 1012  
           MemoryUtils.getRuntimeMemoryStats());
 1013  
     }
 1014  
 
 1015  218
     byte[] aggregatorArray =
 1016  
         marshalAggregatorValues(getSuperstep());
 1017  218
     Collection<PartitionStats> finalizedPartitionStats =
 1018  
         workerGraphPartitioner.finalizePartitionStats(
 1019  
             partitionStatsList, getPartitionStore());
 1020  218
     List<PartitionStats> finalizedPartitionStatsList =
 1021  
         new ArrayList<PartitionStats>(finalizedPartitionStats);
 1022  218
     byte [] partitionStatsBytes =
 1023  
         WritableUtils.writeListToByteArray(finalizedPartitionStatsList);
 1024  218
     JSONObject workerFinishedInfoObj = new JSONObject();
 1025  
     try {
 1026  218
       workerFinishedInfoObj.put(JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY,
 1027  
           Base64.encodeBytes(aggregatorArray));
 1028  218
       workerFinishedInfoObj.put(JSONOBJ_PARTITION_STATS_KEY,
 1029  
           Base64.encodeBytes(partitionStatsBytes));
 1030  218
       workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGES_KEY,
 1031  
           workerSentMessages);
 1032  0
     } catch (JSONException e) {
 1033  0
       throw new RuntimeException(e);
 1034  218
     }
 1035  218
     String finishedWorkerPath =
 1036  
         getWorkerFinishedPath(getApplicationAttempt(), getSuperstep()) +
 1037  
         "/" + getHostnamePartitionId();
 1038  
     try {
 1039  218
       getZkExt().createExt(finishedWorkerPath,
 1040  
           workerFinishedInfoObj.toString().getBytes(),
 1041  
           Ids.OPEN_ACL_UNSAFE,
 1042  
           CreateMode.PERSISTENT,
 1043  
           true);
 1044  0
     } catch (KeeperException.NodeExistsException e) {
 1045  0
       LOG.warn("finishSuperstep: finished worker path " +
 1046  
           finishedWorkerPath + " already exists!");
 1047  0
     } catch (KeeperException e) {
 1048  0
       throw new IllegalStateException("Creating " + finishedWorkerPath +
 1049  
           " failed with KeeperException", e);
 1050  0
     } catch (InterruptedException e) {
 1051  0
       throw new IllegalStateException("Creating " + finishedWorkerPath +
 1052  
           " failed with InterruptedException", e);
 1053  218
     }
 1054  218
     getContext().setStatus("finishSuperstep: (waiting for rest " +
 1055  
         "of workers) " +
 1056  
         getGraphMapper().getMapFunctions().toString() +
 1057  
         " - Attempt=" + getApplicationAttempt() +
 1058  
         ", Superstep=" + getSuperstep());
 1059  
 
 1060  218
     String superstepFinishedNode =
 1061  
         getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());
 1062  
     try {
 1063  436
       while (getZkExt().exists(superstepFinishedNode, true) == null) {
 1064  218
         getSuperstepFinishedEvent().waitForever();
 1065  218
         getSuperstepFinishedEvent().reset();
 1066  
       }
 1067  0
     } catch (KeeperException e) {
 1068  0
       throw new IllegalStateException(
 1069  
           "finishSuperstep: Failed while waiting for master to " +
 1070  
               "signal completion of superstep " + getSuperstep(), e);
 1071  0
     } catch (InterruptedException e) {
 1072  0
       throw new IllegalStateException(
 1073  
           "finishSuperstep: Failed while waiting for master to " +
 1074  
               "signal completion of superstep " + getSuperstep(), e);
 1075  218
     }
 1076  218
     GlobalStats globalStats = new GlobalStats();
 1077  218
     WritableUtils.readFieldsFromZnode(
 1078  
         getZkExt(), superstepFinishedNode, false, null, globalStats);
 1079  218
     if (LOG.isInfoEnabled()) {
 1080  218
       LOG.info("finishSuperstep: Completed superstep " + getSuperstep() +
 1081  
           " with global stats " + globalStats);
 1082  
     }
 1083  218
     incrCachedSuperstep();
 1084  218
     getContext().setStatus("finishSuperstep: (all workers done) " +
 1085  
         getGraphMapper().getMapFunctions().toString() +
 1086  
         " - Attempt=" + getApplicationAttempt() +
 1087  
         ", Superstep=" + getSuperstep());
 1088  218
     getGraphMapper().getGraphState().
 1089  
         setTotalNumEdges(globalStats.getEdgeCount()).
 1090  
         setTotalNumVertices(globalStats.getVertexCount());
 1091  218
     return globalStats.getHaltComputation();
 1092  
   }
 1093  
 
 1094  
   /**
 1095  
    * Save the vertices using the user-defined VertexOutputFormat from our
 1096  
    * vertexArray based on the split.
 1097  
    * @throws InterruptedException
 1098  
    */
 1099  
   private void saveVertices() throws IOException, InterruptedException {
 1100  24
     if (getConfiguration().get(GiraphJob.VERTEX_OUTPUT_FORMAT_CLASS) ==
 1101  
         null) {
 1102  7
       LOG.warn("saveVertices: " + GiraphJob.VERTEX_OUTPUT_FORMAT_CLASS +
 1103  
           " not specified -- there will be no saved output");
 1104  7
       return;
 1105  
     }
 1106  
 
 1107  17
     VertexOutputFormat<I, V, E> vertexOutputFormat =
 1108  
         BspUtils.<I, V, E>createVertexOutputFormat(getConfiguration());
 1109  17
     VertexWriter<I, V, E> vertexWriter =
 1110  
         vertexOutputFormat.createVertexWriter(getContext());
 1111  17
     vertexWriter.initialize(getContext());
 1112  
     for (Partition<I, V, E, M> partition :
 1113  17
         getPartitionStore().getPartitions()) {
 1114  17
       for (Vertex<I, V, E, M> vertex : partition.getVertices()) {
 1115  398
         vertexWriter.writeVertex(vertex);
 1116  
       }
 1117  
     }
 1118  17
     vertexWriter.close(getContext());
 1119  17
   }
 1120  
 
 1121  
   @Override
 1122  
   public void cleanup() throws IOException, InterruptedException {
 1123  24
     commService.closeConnections();
 1124  24
     setCachedSuperstep(getSuperstep() - 1);
 1125  24
     saveVertices();
 1126  
     // All worker processes should denote they are done by adding special
 1127  
     // znode.  Once the number of znodes equals the number of partitions
 1128  
     // for workers and masters, the master will clean up the ZooKeeper
 1129  
     // znodes associated with this job.
 1130  24
     String workerCleanedUpPath = cleanedUpPath  + "/" +
 1131  
         getTaskPartition() + WORKER_SUFFIX;
 1132  
     try {
 1133  24
       String finalFinishedPath =
 1134  
           getZkExt().createExt(workerCleanedUpPath,
 1135  
               null,
 1136  
               Ids.OPEN_ACL_UNSAFE,
 1137  
               CreateMode.PERSISTENT,
 1138  
               true);
 1139  24
       if (LOG.isInfoEnabled()) {
 1140  24
         LOG.info("cleanup: Notifying master its okay to cleanup with " +
 1141  
             finalFinishedPath);
 1142  
       }
 1143  0
     } catch (KeeperException.NodeExistsException e) {
 1144  0
       if (LOG.isInfoEnabled()) {
 1145  0
         LOG.info("cleanup: Couldn't create finished node '" +
 1146  
             workerCleanedUpPath);
 1147  
       }
 1148  0
     } catch (KeeperException e) {
 1149  
       // Cleaning up, it's okay to fail after cleanup is successful
 1150  0
       LOG.error("cleanup: Got KeeperException on notifcation " +
 1151  
           "to master about cleanup", e);
 1152  0
     } catch (InterruptedException e) {
 1153  
       // Cleaning up, it's okay to fail after cleanup is successful
 1154  0
       LOG.error("cleanup: Got InterruptedException on notifcation " +
 1155  
           "to master about cleanup", e);
 1156  24
     }
 1157  
     try {
 1158  24
       getZkExt().close();
 1159  0
     } catch (InterruptedException e) {
 1160  
       // cleanup phase -- just log the error
 1161  0
       LOG.error("cleanup: Zookeeper failed to close with " + e);
 1162  24
     }
 1163  
 
 1164  
     // Preferably would shut down the service only after
 1165  
     // all clients have disconnected (or the exceptions on the
 1166  
     // client side ignored).
 1167  24
     commService.close();
 1168  24
   }
 1169  
 
 1170  
   @Override
 1171  
   public void storeCheckpoint() throws IOException {
 1172  4
     getContext().setStatus("storeCheckpoint: Starting checkpoint " +
 1173  
         getGraphMapper().getMapFunctions().toString() +
 1174  
         " - Attempt=" + getApplicationAttempt() +
 1175  
         ", Superstep=" + getSuperstep());
 1176  
 
 1177  
     // Algorithm:
 1178  
     // For each partition, dump vertices and messages
 1179  4
     Path metadataFilePath =
 1180  
         new Path(getCheckpointBasePath(getSuperstep()) + "." +
 1181  
             getHostnamePartitionId() +
 1182  
             CHECKPOINT_METADATA_POSTFIX);
 1183  4
     Path verticesFilePath =
 1184  
         new Path(getCheckpointBasePath(getSuperstep()) + "." +
 1185  
             getHostnamePartitionId() +
 1186  
             CHECKPOINT_VERTICES_POSTFIX);
 1187  4
     Path validFilePath =
 1188  
         new Path(getCheckpointBasePath(getSuperstep()) + "." +
 1189  
             getHostnamePartitionId() +
 1190  
             CHECKPOINT_VALID_POSTFIX);
 1191  
 
 1192  
     // Remove these files if they already exist (shouldn't though, unless
 1193  
     // of previous failure of this worker)
 1194  4
     if (getFs().delete(validFilePath, false)) {
 1195  0
       LOG.warn("storeCheckpoint: Removed valid file " +
 1196  
           validFilePath);
 1197  
     }
 1198  4
     if (getFs().delete(metadataFilePath, false)) {
 1199  0
       LOG.warn("storeCheckpoint: Removed metadata file " +
 1200  
           metadataFilePath);
 1201  
     }
 1202  4
     if (getFs().delete(verticesFilePath, false)) {
 1203  0
       LOG.warn("storeCheckpoint: Removed file " + verticesFilePath);
 1204  
     }
 1205  
 
 1206  4
     boolean useNetty = getConfiguration().getBoolean(GiraphJob.USE_NETTY,
 1207  
         GiraphJob.USE_NETTY_DEFAULT);
 1208  4
     FSDataOutputStream verticesOutputStream =
 1209  
         getFs().create(verticesFilePath);
 1210  4
     ByteArrayOutputStream metadataByteStream = new ByteArrayOutputStream();
 1211  4
     DataOutput metadataOutput = new DataOutputStream(metadataByteStream);
 1212  
     for (Partition<I, V, E, M> partition :
 1213  4
         getPartitionStore().getPartitions()) {
 1214  4
       long startPos = verticesOutputStream.getPos();
 1215  4
       partition.write(verticesOutputStream);
 1216  
       // write messages
 1217  4
       verticesOutputStream.writeBoolean(useNetty);
 1218  4
       if (useNetty) {
 1219  0
         getServerData().getCurrentMessageStore().writePartition(
 1220  
             verticesOutputStream, partition.getId());
 1221  
       }
 1222  
       // Write the metadata for this partition
 1223  
       // Format:
 1224  
       // <index count>
 1225  
       //   <index 0 start pos><partition id>
 1226  
       //   <index 1 start pos><partition id>
 1227  4
       metadataOutput.writeLong(startPos);
 1228  4
       metadataOutput.writeInt(partition.getId());
 1229  4
       if (LOG.isDebugEnabled()) {
 1230  0
         LOG.debug("storeCheckpoint: Vertex file starting " +
 1231  
             "offset = " + startPos + ", length = " +
 1232  
             (verticesOutputStream.getPos() - startPos) +
 1233  
             ", partition = " + partition.toString());
 1234  
       }
 1235  4
     }
 1236  
     // Metadata is buffered and written at the end since it's small and
 1237  
     // needs to know how many partitions this worker owns
 1238  4
     FSDataOutputStream metadataOutputStream =
 1239  
         getFs().create(metadataFilePath);
 1240  4
     metadataOutputStream.writeInt(getPartitionStore().getNumPartitions());
 1241  4
     metadataOutputStream.write(metadataByteStream.toByteArray());
 1242  4
     metadataOutputStream.close();
 1243  4
     verticesOutputStream.close();
 1244  4
     if (LOG.isInfoEnabled()) {
 1245  4
       LOG.info("storeCheckpoint: Finished metadata (" +
 1246  
           metadataFilePath + ") and vertices (" + verticesFilePath + ").");
 1247  
     }
 1248  
 
 1249  4
     getFs().createNewFile(validFilePath);
 1250  
 
 1251  
     // Notify master that checkpoint is stored
 1252  4
     String workerWroteCheckpoint =
 1253  
         getWorkerWroteCheckpointPath(getApplicationAttempt(),
 1254  
             getSuperstep()) + "/" + getHostnamePartitionId();
 1255  
     try {
 1256  4
       getZkExt().createExt(workerWroteCheckpoint,
 1257  
           new byte[0],
 1258  
           Ids.OPEN_ACL_UNSAFE,
 1259  
           CreateMode.PERSISTENT,
 1260  
           true);
 1261  0
     } catch (KeeperException.NodeExistsException e) {
 1262  0
       LOG.warn("finishSuperstep: wrote checkpoint worker path " +
 1263  
           workerWroteCheckpoint + " already exists!");
 1264  0
     } catch (KeeperException e) {
 1265  0
       throw new IllegalStateException("Creating " + workerWroteCheckpoint +
 1266  
           " failed with KeeperException", e);
 1267  0
     } catch (InterruptedException e) {
 1268  0
       throw new IllegalStateException("Creating " + workerWroteCheckpoint +
 1269  
           " failed with InterruptedException", e);
 1270  4
     }
 1271  4
   }
 1272  
 
 1273  
   @Override
 1274  
   public void loadCheckpoint(long superstep) {
 1275  0
     if (getConfiguration().getBoolean(GiraphJob.USE_NETTY,
 1276  
         GiraphJob.USE_NETTY_DEFAULT)) {
 1277  
       try {
 1278  
         // clear old message stores
 1279  0
         getServerData().getIncomingMessageStore().clearAll();
 1280  0
         getServerData().getCurrentMessageStore().clearAll();
 1281  0
       } catch (IOException e) {
 1282  0
         throw new RuntimeException(
 1283  
             "loadCheckpoint: Failed to clear message stores ", e);
 1284  0
       }
 1285  
     }
 1286  
 
 1287  
     // Algorithm:
 1288  
     // Examine all the partition owners and load the ones
 1289  
     // that match my hostname and id from the master designated checkpoint
 1290  
     // prefixes.
 1291  0
     long startPos = 0;
 1292  0
     int loadedPartitions = 0;
 1293  
     for (PartitionOwner partitionOwner :
 1294  0
       workerGraphPartitioner.getPartitionOwners()) {
 1295  0
       if (partitionOwner.getWorkerInfo().equals(getWorkerInfo())) {
 1296  0
         String metadataFile =
 1297  
             partitionOwner.getCheckpointFilesPrefix() +
 1298  
             CHECKPOINT_METADATA_POSTFIX;
 1299  0
         String partitionsFile =
 1300  
             partitionOwner.getCheckpointFilesPrefix() +
 1301  
             CHECKPOINT_VERTICES_POSTFIX;
 1302  
         try {
 1303  0
           int partitionId = -1;
 1304  0
           DataInputStream metadataStream =
 1305  
               getFs().open(new Path(metadataFile));
 1306  0
           int partitions = metadataStream.readInt();
 1307  0
           for (int i = 0; i < partitions; ++i) {
 1308  0
             startPos = metadataStream.readLong();
 1309  0
             partitionId = metadataStream.readInt();
 1310  0
             if (partitionId == partitionOwner.getPartitionId()) {
 1311  0
               break;
 1312  
             }
 1313  
           }
 1314  0
           if (partitionId != partitionOwner.getPartitionId()) {
 1315  0
             throw new IllegalStateException(
 1316  
                 "loadCheckpoint: " + partitionOwner +
 1317  
                 " not found!");
 1318  
           }
 1319  0
           metadataStream.close();
 1320  0
           Partition<I, V, E, M> partition =
 1321  
               new Partition<I, V, E, M>(
 1322  
                   getConfiguration(),
 1323  
                   partitionId);
 1324  0
           DataInputStream partitionsStream =
 1325  
               getFs().open(new Path(partitionsFile));
 1326  0
           if (partitionsStream.skip(startPos) != startPos) {
 1327  0
             throw new IllegalStateException(
 1328  
                 "loadCheckpoint: Failed to skip " + startPos +
 1329  
                 " on " + partitionsFile);
 1330  
           }
 1331  0
           partition.readFields(partitionsStream);
 1332  0
           if (partitionsStream.readBoolean()) {
 1333  0
             getServerData().getCurrentMessageStore().readFieldsForPartition(
 1334  
                 partitionsStream, partitionId);
 1335  
           }
 1336  0
           partitionsStream.close();
 1337  0
           if (LOG.isInfoEnabled()) {
 1338  0
             LOG.info("loadCheckpoint: Loaded partition " +
 1339  
                 partition);
 1340  
           }
 1341  0
           if (getPartitionStore().hasPartition(partitionId)) {
 1342  0
             throw new IllegalStateException(
 1343  
                 "loadCheckpoint: Already has partition owner " +
 1344  
                     partitionOwner);
 1345  
           }
 1346  0
           getPartitionStore().addPartition(partition);
 1347  0
           ++loadedPartitions;
 1348  0
         } catch (IOException e) {
 1349  0
           throw new RuntimeException(
 1350  
               "loadCheckpoing: Failed to get partition owner " +
 1351  
                   partitionOwner, e);
 1352  0
         }
 1353  0
       }
 1354  
     }
 1355  0
     if (LOG.isInfoEnabled()) {
 1356  0
       LOG.info("loadCheckpoint: Loaded " + loadedPartitions +
 1357  
           " partitions of out " +
 1358  
           workerGraphPartitioner.getPartitionOwners().size() +
 1359  
           " total.");
 1360  
     }
 1361  
 
 1362  
     // Load global statistics
 1363  0
     String finalizedCheckpointPath =
 1364  
         getCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
 1365  
     try {
 1366  0
       DataInputStream finalizedStream =
 1367  
           getFs().open(new Path(finalizedCheckpointPath));
 1368  0
       GlobalStats globalStats = new GlobalStats();
 1369  0
       globalStats.readFields(finalizedStream);
 1370  0
       getGraphMapper().getGraphState().
 1371  
           setTotalNumEdges(globalStats.getEdgeCount()).
 1372  
           setTotalNumVertices(globalStats.getVertexCount());
 1373  0
     } catch (IOException e) {
 1374  0
       throw new IllegalStateException(
 1375  
           "loadCheckpoint: Failed to load global statistics", e);
 1376  0
     }
 1377  
 
 1378  
     // Communication service needs to setup the connections prior to
 1379  
     // processing vertices
 1380  0
     commService.setup();
 1381  0
   }
 1382  
 
 1383  
   /**
 1384  
    * Send the worker partitions to their destination workers
 1385  
    *
 1386  
    * @param workerPartitionMap Map of worker info to the partitions stored
 1387  
    *        on this worker to be sent
 1388  
    */
 1389  
   private void sendWorkerPartitions(
 1390  
       Map<WorkerInfo, List<Integer>> workerPartitionMap) {
 1391  194
     List<Entry<WorkerInfo, List<Integer>>> randomEntryList =
 1392  
         new ArrayList<Entry<WorkerInfo, List<Integer>>>(
 1393  
             workerPartitionMap.entrySet());
 1394  194
     Collections.shuffle(randomEntryList);
 1395  
     for (Entry<WorkerInfo, List<Integer>> workerPartitionList :
 1396  194
       randomEntryList) {
 1397  0
       for (Integer partitionId : workerPartitionList.getValue()) {
 1398  0
         Partition<I, V, E, M> partition =
 1399  
             getPartitionStore().removePartition(partitionId);
 1400  0
         if (partition == null) {
 1401  0
           throw new IllegalStateException(
 1402  
               "sendWorkerPartitions: Couldn't find partition " +
 1403  
                   partitionId + " to send to " +
 1404  
                   workerPartitionList.getKey());
 1405  
         }
 1406  0
         if (LOG.isInfoEnabled()) {
 1407  0
           LOG.info("sendWorkerPartitions: Sending worker " +
 1408  
               workerPartitionList.getKey() + " partition " +
 1409  
               partitionId);
 1410  
         }
 1411  0
         getGraphMapper().getGraphState().getWorkerCommunications().
 1412  
             sendPartitionRequest(workerPartitionList.getKey(),
 1413  
                 partition);
 1414  0
       }
 1415  
     }
 1416  
 
 1417  
     try {
 1418  194
       getGraphMapper().getGraphState().getWorkerCommunications().flush();
 1419  0
     } catch (IOException e) {
 1420  0
       throw new IllegalStateException("sendWorkerPartitions: Flush failed", e);
 1421  194
     }
 1422  194
     String myPartitionExchangeDonePath =
 1423  
         getPartitionExchangeWorkerPath(
 1424  
             getApplicationAttempt(), getSuperstep(), getWorkerInfo());
 1425  
     try {
 1426  194
       getZkExt().createExt(myPartitionExchangeDonePath,
 1427  
           null,
 1428  
           Ids.OPEN_ACL_UNSAFE,
 1429  
           CreateMode.PERSISTENT,
 1430  
           true);
 1431  0
     } catch (KeeperException e) {
 1432  0
       throw new IllegalStateException(
 1433  
           "sendWorkerPartitions: KeeperException to create " +
 1434  
               myPartitionExchangeDonePath, e);
 1435  0
     } catch (InterruptedException e) {
 1436  0
       throw new IllegalStateException(
 1437  
           "sendWorkerPartitions: InterruptedException to create " +
 1438  
               myPartitionExchangeDonePath, e);
 1439  194
     }
 1440  194
     if (LOG.isInfoEnabled()) {
 1441  194
       LOG.info("sendWorkerPartitions: Done sending all my partitions.");
 1442  
     }
 1443  194
   }
 1444  
 
 1445  
   @Override
 1446  
   public final void exchangeVertexPartitions(
 1447  
       Collection<? extends PartitionOwner> masterSetPartitionOwners) {
 1448  
     // 1. Fix the addresses of the partition ids if they have changed.
 1449  
     // 2. Send all the partitions to their destination workers in a random
 1450  
     //    fashion.
 1451  
     // 3. Notify completion with a ZooKeeper stamp
 1452  
     // 4. Wait for all my dependencies to be done (if any)
 1453  
     // 5. Add the partitions to myself.
 1454  194
     PartitionExchange partitionExchange =
 1455  
         workerGraphPartitioner.updatePartitionOwners(
 1456  
             getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
 1457  194
     commService.fixPartitionIdToSocketAddrMap();
 1458  
 
 1459  194
     Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap =
 1460  
         partitionExchange.getSendWorkerPartitionMap();
 1461  194
     if (!getPartitionStore().isEmpty()) {
 1462  194
       sendWorkerPartitions(sendWorkerPartitionMap);
 1463  
     }
 1464  
 
 1465  194
     Set<WorkerInfo> myDependencyWorkerSet =
 1466  
         partitionExchange.getMyDependencyWorkerSet();
 1467  194
     Set<String> workerIdSet = new HashSet<String>();
 1468  194
     for (WorkerInfo tmpWorkerInfo : myDependencyWorkerSet) {
 1469  0
       if (!workerIdSet.add(tmpWorkerInfo.getHostnameId())) {
 1470  0
         throw new IllegalStateException(
 1471  
             "exchangeVertexPartitions: Duplicate entry " + tmpWorkerInfo);
 1472  
       }
 1473  
     }
 1474  194
     if (myDependencyWorkerSet.isEmpty() && getPartitionStore().isEmpty()) {
 1475  0
       if (LOG.isInfoEnabled()) {
 1476  0
         LOG.info("exchangeVertexPartitions: Nothing to exchange, " +
 1477  
             "exiting early");
 1478  
       }
 1479  0
       return;
 1480  
     }
 1481  
 
 1482  194
     String vertexExchangePath =
 1483  
         getPartitionExchangePath(getApplicationAttempt(), getSuperstep());
 1484  
     List<String> workerDoneList;
 1485  
     try {
 1486  
       while (true) {
 1487  194
         workerDoneList = getZkExt().getChildrenExt(
 1488  
             vertexExchangePath, true, false, false);
 1489  194
         workerIdSet.removeAll(workerDoneList);
 1490  194
         if (workerIdSet.isEmpty()) {
 1491  194
           break;
 1492  
         }
 1493  0
         if (LOG.isInfoEnabled()) {
 1494  0
           LOG.info("exchangeVertexPartitions: Waiting for workers " +
 1495  
               workerIdSet);
 1496  
         }
 1497  0
         getPartitionExchangeChildrenChangedEvent().waitForever();
 1498  0
         getPartitionExchangeChildrenChangedEvent().reset();
 1499  
       }
 1500  0
     } catch (KeeperException e) {
 1501  0
       throw new RuntimeException(e);
 1502  0
     } catch (InterruptedException e) {
 1503  0
       throw new RuntimeException(e);
 1504  194
     }
 1505  
 
 1506  194
     if (LOG.isInfoEnabled()) {
 1507  194
       LOG.info("exchangeVertexPartitions: Done with exchange.");
 1508  
     }
 1509  194
   }
 1510  
 
 1511  
   /**
 1512  
    * Get event when the state of a partition exchange has changed.
 1513  
    *
 1514  
    * @return Event to check.
 1515  
    */
 1516  
   public final BspEvent getPartitionExchangeChildrenChangedEvent() {
 1517  0
     return partitionExchangeChildrenChanged;
 1518  
   }
 1519  
 
 1520  
   @Override
 1521  
   protected boolean processEvent(WatchedEvent event) {
 1522  1038
     boolean foundEvent = false;
 1523  1038
     if (event.getPath().startsWith(masterJobStatePath) &&
 1524  
         (event.getType() == EventType.NodeChildrenChanged)) {
 1525  4
       if (LOG.isInfoEnabled()) {
 1526  4
         LOG.info("processEvent: Job state changed, checking " +
 1527  
             "to see if it needs to restart");
 1528  
       }
 1529  4
       JSONObject jsonObj = getJobState();
 1530  
       try {
 1531  3
         if ((ApplicationState.valueOf(jsonObj.getString(JSONOBJ_STATE_KEY)) ==
 1532  
             ApplicationState.START_SUPERSTEP) &&
 1533  
             jsonObj.getLong(JSONOBJ_APPLICATION_ATTEMPT_KEY) !=
 1534  
             getApplicationAttempt()) {
 1535  0
           LOG.fatal("processEvent: Worker will restart " +
 1536  
               "from command - " + jsonObj.toString());
 1537  0
           System.exit(-1);
 1538  
         }
 1539  0
       } catch (JSONException e) {
 1540  0
         throw new RuntimeException(
 1541  
             "processEvent: Couldn't properly get job state from " +
 1542  
                 jsonObj.toString());
 1543  3
       }
 1544  3
       foundEvent = true;
 1545  3
     } else if (event.getPath().contains(PARTITION_EXCHANGE_DIR) &&
 1546  
         event.getType() == EventType.NodeChildrenChanged) {
 1547  167
       if (LOG.isInfoEnabled()) {
 1548  167
         LOG.info("processEvent : partitionExchangeChildrenChanged " +
 1549  
             "(at least one worker is done sending partitions)");
 1550  
       }
 1551  167
       partitionExchangeChildrenChanged.signal();
 1552  167
       foundEvent = true;
 1553  
     }
 1554  
 
 1555  1037
     return foundEvent;
 1556  
   }
 1557  
 
 1558  
   @Override
 1559  
   public WorkerInfo getWorkerInfo() {
 1560  521
     return workerInfo;
 1561  
   }
 1562  
 
 1563  
   @Override
 1564  
   public PartitionStore<I, V, E, M> getPartitionStore() {
 1565  1619
     if (workerPartitionStore != null) {
 1566  1537
       return workerPartitionStore;
 1567  
     } else {
 1568  82
       return getServerData().getPartitionStore();
 1569  
     }
 1570  
   }
 1571  
 
 1572  
   @Override
 1573  
   public Collection<? extends PartitionOwner> getPartitionOwners() {
 1574  413
     return workerGraphPartitioner.getPartitionOwners();
 1575  
   }
 1576  
 
 1577  
   @Override
 1578  
   public PartitionOwner getVertexPartitionOwner(I vertexId) {
 1579  2924
     return workerGraphPartitioner.getPartitionOwner(vertexId);
 1580  
   }
 1581  
 
 1582  
   @Override
 1583  
   public Partition<I, V, E, M> getPartition(I vertexId) {
 1584  50
     return getPartitionStore().getPartition(getPartitionId(vertexId));
 1585  
   }
 1586  
 
 1587  
   @Override
 1588  
   public Integer getPartitionId(I vertexId) {
 1589  50
     PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId);
 1590  50
     return partitionOwner.getPartitionId();
 1591  
   }
 1592  
 
 1593  
   @Override
 1594  
   public boolean hasPartition(Integer partitionId) {
 1595  0
     return getPartitionStore().hasPartition(partitionId);
 1596  
   }
 1597  
 
 1598  
   @Override
 1599  
   public Vertex<I, V, E, M> getVertex(I vertexId) {
 1600  45
     PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId);
 1601  45
     if (getPartitionStore().hasPartition(partitionOwner.getPartitionId())) {
 1602  45
       return getPartitionStore().getPartition(
 1603  
           partitionOwner.getPartitionId()).getVertex(vertexId);
 1604  
     } else {
 1605  0
       return null;
 1606  
     }
 1607  
   }
 1608  
 
 1609  
   @Override
 1610  
   public ServerData<I, V, E, M> getServerData() {
 1611  94
     return commService.getServerData();
 1612  
   }
 1613  
 }