Coverage Report - org.apache.giraph.comm.BasicRPCCommunications
 
Classes in this File                              Line Coverage   Branch Coverage   Complexity
BasicRPCCommunications                            77% (307/397)   66% (117/176)     4.659
BasicRPCCommunications$LargeMessageFlushExecutor   0% (0/27)       0% (0/10)        4.659
BasicRPCCommunications$PeerConnection             81% (9/11)      50% (1/2)         4.659
BasicRPCCommunications$PeerFlushExecutor          72% (43/59)     69% (25/36)       4.659
 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.comm;

import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexCombiner;
import org.apache.giraph.graph.VertexMutations;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.graph.WorkerInfo;
import org.apache.giraph.graph.partition.Partition;
import org.apache.giraph.graph.partition.PartitionOwner;
import org.apache.giraph.utils.MemoryUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;

import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Basic RPC communications object that implements the lower-level
 * operations for RPC communication.
 *
 * @param <I> Vertex id
 * @param <V> Vertex data
 * @param <E> Edge data
 * @param <M> Message data
 * @param <J> Job token
 */
@SuppressWarnings("rawtypes")
public abstract class BasicRPCCommunications<I extends WritableComparable,
    V extends Writable, E extends Writable, M extends Writable, J>
    implements CommunicationsInterface<I, V, E, M>,
    WorkerClientServer<I, V, E, M> {
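  // Illustrative type bindings (a sketch, not taken from this file): a
  // shortest-paths style job might use I = LongWritable,
  // V = DoubleWritable, E = FloatWritable and M = DoubleWritable, with J
  // being whatever job-token type the concrete subclass supplies.
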
  /** Class logger */
  private static final Logger LOG =
    Logger.getLogger(BasicRPCCommunications.class);
  /** Maximum number of vertices sent in a single RPC */
  private static final int MAX_VERTICES_PER_RPC = 1024;
  /** Hadoop configuration */
  protected final Configuration conf;
  /** Saved context for progress */
  private final Mapper<?, ?, ?, ?>.Context context;
  /** Indicates whether in superstep preparation */
  private boolean inPrepareSuperstep = false;
  /** Local hostname */
  private final String localHostname;
  /** Name of RPC server, == myAddress.toString() */
  private final String myName;
  /** RPC server */
  private Server server;
  /** Centralized service, needed to get vertex ranges */
  private final CentralizedServiceWorker<I, V, E, M> service;
  /** Combiner instance, can be null */
  private final VertexCombiner<I, M> combiner;
  /** Address of RPC server */
  private InetSocketAddress myAddress;
  /** Messages sent during the last superstep */
  private long totalMsgsSentInSuperstep = 0;
  /** Maximum messages sent per putVertexIdMessagesList RPC */
  private final int maxMessagesPerFlushPut;
  /**
   * Map of the peer connections, mapping from remote socket address to
   * client metadata
   */
  private final Map<InetSocketAddress, PeerConnection> peerConnections =
      new HashMap<InetSocketAddress, PeerConnection>();
  /**
   * Cached map of partition ids to remote socket address.  Needs to be
   * synchronized.
   */
  private final Map<Integer, InetSocketAddress> partitionIndexAddressMap =
      new HashMap<Integer, InetSocketAddress>();
  /** Thread pool for message flush threads */
  private final ExecutorService executor;
  /**
   * Map of outbound messages, mapping from remote server to destination
   * vertex index to list of messages.  (Synchronized between peer threads
   * and main thread for each internal map)
   */
  private final Map<InetSocketAddress, Map<I, MsgList<M>>> outMessages =
      new HashMap<InetSocketAddress, Map<I, MsgList<M>>>();
  /**
   * Map of incoming messages, mapping from vertex index to list of messages.
   * Only accessed by the main thread (no need to synchronize).
   */
  private final Map<I, List<M>> inMessages = new HashMap<I, List<M>>();
  /**
   * Map of inbound messages, mapping from vertex index to list of messages.
   * Transferred to inMessages at the beginning of a superstep.  This
   * intermediary step exists so that the combiner will run not only at the
   * client, but also at the server.  It also allows large message lists to
   * be sent during the superstep computation.  (Synchronized)
   */
  private final Map<I, List<M>> transientInMessages =
      new HashMap<I, List<M>>();
  /**
   * Map of partition ids to incoming vertices from other workers.
   * (Synchronized)
   */
  private final Map<Integer, Collection<Vertex<I, V, E, M>>>
  inPartitionVertexMap = Maps.newHashMap();

  /** Map from vertex index to all vertex mutations */
  private final Map<I, VertexMutations<I, V, E, M>> inVertexMutationsMap =
      new HashMap<I, VertexMutations<I, V, E, M>>();

  /** Maximum size of cached message list, before sending it out */
  private final int maxSize;
  /** Cached job id */
  private final String jobId;
  /** Cached job token */
  private final J jobToken;

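  // How messages travel through the maps above: sendMessageRequest()
  // buffers into outMessages, keyed by destination server and vertex;
  // flush()/PeerFlushExecutor ships each server's map via
  // putVertexIdMessagesList(); the receiving worker accumulates into
  // transientInMessages; prepareSuperstep() combines those into
  // inMessages and assigns them to the owned vertices.
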
  /**
   * PeerConnection contains RPC client and accumulated messages
   * for a specific peer.
   */
  private class PeerConnection {
    /**
     * Map of outbound messages going to a particular remote server,
     * mapping from the destination vertex to a list of messages.
     * (Synchronized with itself).
     */
    private final Map<I, MsgList<M>> outMessagesPerPeer;
    /**
     * Client interface: RPC proxy for remote server, this class for local
     */
    private final CommunicationsInterface<I, V, E, M> peer;
    /** Boolean, set to false when local client (self), true otherwise */
    private final boolean isProxy;

    /**
     * Constructor
     *
     * @param idMessageMap Map of vertex id to message list
     * @param peerConnection Peer connection
     * @param isProxy Is this a proxy (true) or local (false)?
     */
    public PeerConnection(Map<I, MsgList<M>> idMessageMap,
        CommunicationsInterface<I, V, E, M> peerConnection,
        boolean isProxy) {
      this.outMessagesPerPeer = idMessageMap;
      this.peer = peerConnection;
      this.isProxy = isProxy;
    }

    /**
     * Nothing to do here to cleanup, just notify.
     */
    public void close() {
      if (LOG.isDebugEnabled()) {
        LOG.debug("close: Done");
      }
    }

    /**
     * Get the RPC proxy of this connection.
     *
     * @return RPC proxy of this connection.
     */
    public CommunicationsInterface<I, V, E, M> getRPCProxy() {
      return peer;
    }

    @Override
    public String toString() {
      return peer.getName() + ", proxy=" + isProxy;
    }
  }
 
  /**
   * Runnable to flush messages to a given connection.
   */
  private class PeerFlushExecutor implements Runnable {
    /** Report on the status of this flusher if this interval was exceeded */
    private static final int REPORTING_INTERVAL_MIN_MILLIS = 60000;
    /** Connection to send the messages to. */
    private final PeerConnection peerConnection;
    /** Saved context. */
    private final Mapper<?, ?, ?, ?>.Context context;

    /**
     * Constructor.
     *
     * @param peerConnection Connection to send the messages to.
     * @param context Context of the mapper.
     */
    PeerFlushExecutor(PeerConnection peerConnection,
        Mapper<?, ?, ?, ?>.Context context) {
      this.peerConnection = peerConnection;
      this.context = context;
    }

    @Override
    public void run() {
      CommunicationsInterface<I, V, E, M> proxy = peerConnection.getRPCProxy();
      long startMillis = System.currentTimeMillis();
      long lastReportedMillis = startMillis;
      try {
        int verticesDone = 0;
        synchronized (peerConnection.outMessagesPerPeer) {
          final int vertices =
              peerConnection.outMessagesPerPeer.size();
          // 1. Check for null messages and combine if possible
          // 2. Send vertex ids and messages in bulk to the
          //    destination servers.
          for (Entry<I, MsgList<M>> entry :
            peerConnection.outMessagesPerPeer.entrySet()) {
            for (M msg : entry.getValue()) {
              if (msg == null) {
                throw new IllegalArgumentException(
                    "run: Cannot put null message on " +
                        "vertex id " + entry.getKey());
              }
            }
            if (combiner != null && entry.getValue().size() > 1) {
              Iterable<M> messages = combiner.combine(
                  entry.getKey(), entry.getValue());
              if (messages == null) {
                throw new IllegalStateException(
                    "run: Combiner cannot return null");
              }
              if (Iterables.size(entry.getValue()) <
                  Iterables.size(messages)) {
                throw new IllegalStateException(
                    "run: The number of combined " +
                        "messages is required to be <= to " +
                    "number of messages to be combined");
              }
              entry.getValue().clear();
              for (M msg: messages) {
                entry.getValue().add(msg);
              }
            }
            if (entry.getValue().isEmpty()) {
              throw new IllegalStateException(
                  "run: Impossible for no messages in " +
                      entry.getKey());
            }
          }
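          // Each pass of the loop below packs as many remaining
          // per-vertex lists as fit under maxMessagesPerFlushPut into a
          // single RPC.  The first entry taken in a pass always fits, so
          // progress is guaranteed even for one oversized list; entries
          // that do not fit stay in the map for a later pass.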
          while (!peerConnection.outMessagesPerPeer.isEmpty()) {
            int bulkedMessages = 0;
            Iterator<Entry<I, MsgList<M>>> vertexIdMessagesListIt =
                peerConnection.outMessagesPerPeer.entrySet().
                iterator();
            VertexIdMessagesList<I, M> vertexIdMessagesList =
                new VertexIdMessagesList<I, M>();
            while (vertexIdMessagesListIt.hasNext()) {
              Entry<I, MsgList<M>> entry =
                  vertexIdMessagesListIt.next();
              // Add this entry if the list is empty or we
              // haven't reached the maximum number of messages
              if (vertexIdMessagesList.isEmpty() ||
                  ((bulkedMessages + entry.getValue().size()) <
                     maxMessagesPerFlushPut)) {
                vertexIdMessagesList.add(
                    new VertexIdMessages<I, M>(
                        entry.getKey(), entry.getValue()));
                bulkedMessages += entry.getValue().size();
              }
            }

            // Clean up references to the vertex id and messages
            for (VertexIdMessages<I, M> vertexIdMessages :
                vertexIdMessagesList) {
              peerConnection.outMessagesPerPeer.remove(
                  vertexIdMessages.getVertexId());
            }

            proxy.putVertexIdMessagesList(vertexIdMessagesList);
            context.progress();

            verticesDone += vertexIdMessagesList.size();
            long curMillis = System.currentTimeMillis();
            if ((lastReportedMillis +
                REPORTING_INTERVAL_MIN_MILLIS) < curMillis) {
              lastReportedMillis = curMillis;
              if (LOG.isInfoEnabled()) {
                float percentDone =
                    (100f * verticesDone) /
                    vertices;
                float minutesUsed =
                    (curMillis - startMillis) / 1000f / 60f;
                float minutesRemaining =
                    (minutesUsed * 100f / percentDone) -
                    minutesUsed;
                LOG.info("run: " + peerConnection + ", " +
                    verticesDone + " out of " +
                    vertices +
                    " done in " + minutesUsed +
                    " minutes, " +
                    percentDone + "% done, ETA " +
                    minutesRemaining +
                    " minutes remaining, " +
                    MemoryUtils.getRuntimeMemoryStats());
              }
            }
          }
        }

        if (LOG.isDebugEnabled()) {
          LOG.debug("run: " + proxy.getName() +
              ": all messages flushed");
        }
      } catch (IOException e) {
        LOG.error(e);
        if (peerConnection.isProxy) {
          RPC.stopProxy(peerConnection.peer);
        }
        throw new RuntimeException(e);
      }
    }
  }
 
  /**
   * LargeMessageFlushExecutor flushes all outgoing messages destined to
   * some vertices.  This is executed when the number of messages destined
   * to a certain vertex exceeds <i>maxSize</i>.
   */
  private class LargeMessageFlushExecutor implements Runnable {
    /** Destination vertex of message. */
    private final I destVertex;
    /** List of messages to the destination vertex */
    private final MsgList<M> outMessageList;
    /** Connection to send the message to. */
    private PeerConnection peerConnection;

    /**
     * Constructor of the executor for flushing large messages.
     *
     * @param peerConnection Connection to send the message to.
     * @param destVertex Destination vertex of message.
     */
    LargeMessageFlushExecutor(PeerConnection peerConnection, I destVertex) {
      this.peerConnection = peerConnection;
      synchronized (peerConnection.outMessagesPerPeer) {
        this.destVertex = destVertex;
        outMessageList =
            peerConnection.outMessagesPerPeer.get(destVertex);
        peerConnection.outMessagesPerPeer.remove(destVertex);
      }
    }

    @Override
    public void run() {
      try {
        CommunicationsInterface<I, V, E, M> proxy =
            peerConnection.getRPCProxy();

        if (combiner != null) {
          Iterable<M> messages = combiner.combine(destVertex,
              outMessageList);
          if (messages == null) {
            throw new IllegalStateException(
                "run: Combiner cannot return null");
          }
          if (Iterables.size(outMessageList) <
              Iterables.size(messages)) {
            throw new IllegalStateException(
                "run: The number of combined messages is " +
                    "required to be <= to the number of " +
                "messages to be combined");
          }
          for (M msg: messages) {
            proxy.putMsg(destVertex, msg);
          }
        } else {
          proxy.putMsgList(destVertex, outMessageList);
        }
      } catch (IOException e) {
        LOG.error(e);
        if (peerConnection.isProxy) {
          RPC.stopProxy(peerConnection.peer);
        }
        throw new RuntimeException("run: Got IOException", e);
      } finally {
        outMessageList.clear();
      }
    }
  }
 
  /**
   * Only constructor.
   *
   * @param context Context for getting configuration
   * @param service Service worker to get the vertex ranges
   * @throws IOException
   * @throws UnknownHostException
   * @throws InterruptedException
   */
  public BasicRPCCommunications(Mapper<?, ?, ?, ?>.Context context,
    CentralizedServiceWorker<I, V, E, M> service)
    throws IOException, InterruptedException {
    this.service = service;
    this.context = context;
    this.conf = context.getConfiguration();
    this.maxSize = conf.getInt(GiraphJob.MSG_SIZE,
        GiraphJob.MSG_SIZE_DEFAULT);
    this.maxMessagesPerFlushPut =
        conf.getInt(GiraphJob.MAX_MESSAGES_PER_FLUSH_PUT,
            GiraphJob.DEFAULT_MAX_MESSAGES_PER_FLUSH_PUT);
    if (BspUtils.getVertexCombinerClass(conf) == null) {
      this.combiner = null;
    } else {
      this.combiner = BspUtils.createVertexCombiner(conf);
    }

    this.localHostname = InetAddress.getLocalHost().getHostName();
    int taskId = conf.getInt("mapred.task.partition", -1);
    int numTasks = conf.getInt("mapred.map.tasks", 1);

    int numHandlers = conf.getInt(GiraphJob.RPC_NUM_HANDLERS,
        GiraphJob.RPC_NUM_HANDLERS_DEFAULT);
    if (numTasks < numHandlers) {
      numHandlers = numTasks;
    }
    this.jobToken = createJobToken();
    this.jobId = context.getJobID().toString();

    int numWorkers = conf.getInt(GiraphJob.MAX_WORKERS, numTasks);
    // If the number of flush threads is unset, it is set to
    // the number of max workers - 1 or a minimum of 1.
    int numFlushThreads =
        Math.max(conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
            numWorkers - 1),
            1);
    this.executor = Executors.newFixedThreadPool(numFlushThreads);

    // Simple handling of port collisions on the same machine while
    // preserving debuggability from the port number alone.
    // Round up the max number of workers to the next power of 10 and use
    // it as a constant to increase the port number with.
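    // For example, with numWorkers = 30 the increment is
    // 10^ceil(log10(30)) = 100, so successive bind attempts probe
    // (initial port + taskId), then +100, then +200, and so on.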
    int portIncrementConstant =
        (int) Math.pow(10, Math.ceil(Math.log10(numWorkers)));
    String bindAddress = localHostname;
    int bindPort = conf.getInt(GiraphJob.RPC_INITIAL_PORT,
        GiraphJob.RPC_INITIAL_PORT_DEFAULT) +
        taskId;
    int bindAttempts = 0;
    final int maxRpcPortBindAttempts =
        conf.getInt(GiraphJob.MAX_RPC_PORT_BIND_ATTEMPTS,
            GiraphJob.MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT);
    final boolean failFirstPortBindingAttempt =
        conf.getBoolean(GiraphJob.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT,
            GiraphJob.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT_DEFAULT);
    while (bindAttempts < maxRpcPortBindAttempts) {
      this.myAddress = new InetSocketAddress(bindAddress, bindPort);
      if (failFirstPortBindingAttempt && bindAttempts == 0) {
        LOG.info("BasicRPCCommunications: Intentionally fail first " +
            "binding attempt as giraph.failFirstRpcPortBindAttempt " +
            "is true, port " + bindPort);
        ++bindAttempts;
        bindPort += portIncrementConstant;
        continue;
      }

      try {
        this.server =
            getRPCServer(
                myAddress, numHandlers, this.jobId, this.jobToken);
        break;
      } catch (BindException e) {
        LOG.info("BasicRPCCommunications: Failed to bind with port " +
            bindPort + " on bind attempt " + bindAttempts);
        ++bindAttempts;
        bindPort += portIncrementConstant;
      }
    }
    if (bindAttempts == maxRpcPortBindAttempts || this.server == null) {
      throw new IllegalStateException(
          "BasicRPCCommunications: Failed to start RPCServer with " +
              maxRpcPortBindAttempts + " attempts");
    }

    this.server.start();
    this.myName = myAddress.toString();

    if (LOG.isInfoEnabled()) {
      LOG.info("BasicRPCCommunications: Started RPC " +
          "communication server: " + myName + " with " +
          numHandlers + " handlers and " + numFlushThreads +
          " flush threads on bind attempt " + bindAttempts);
    }
  }
 
  /**
   * Submit a large message to be sent.
   *
   * @param addr Message destination.
   * @param destVertex Index of the destination vertex.
   */
  private void submitLargeMessageSend(InetSocketAddress addr, I destVertex) {
    PeerConnection pc = peerConnections.get(addr);
    executor.execute(new LargeMessageFlushExecutor(pc, destVertex));
  }

  /**
   * Create the job token.
   *
   * @return Job token.
   * @throws IOException
   */
  protected abstract J createJobToken() throws IOException;

  /**
   * Get the RPC server.
   *
   * @param addr Server address.
   * @param numHandlers Number of handlers.
   * @param jobId Job id.
   * @param jobToken Job token.
   * @return RPC server.
   * @throws IOException
   */
  protected abstract Server getRPCServer(InetSocketAddress addr,
    int numHandlers, String jobId, J jobToken) throws IOException;
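  // A minimal sketch of what a concrete subclass might supply (an
  // illustration only, assuming Hadoop's pre-security RPC API; the real
  // implementation may differ):
  //
  //   protected Server getRPCServer(InetSocketAddress addr, int numHandlers,
  //       String jobId, J jobToken) throws IOException {
  //     return RPC.getServer(this, addr.getHostName(), addr.getPort(),
  //         numHandlers, false, conf);
  //   }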
 
  /**
   * Get the final port of the RPC server that it bound to.
   *
   * @return Port that RPC server was bound to.
   */
  public int getPort() {
    return myAddress.getPort();
  }
 
  @Override
  public void setup() {
    try {
      connectAllRPCProxys(this.jobId, this.jobToken);
    } catch (IOException e) {
      throw new IllegalStateException("setup: Got IOException", e);
    } catch (InterruptedException e) {
      throw new IllegalStateException("setup: Got InterruptedException",
          e);
    }
  }

  /**
   * Get the RPC proxy (handled by subclasses)
   *
   * @param addr Socket address.
   * @param jobId Job id.
   * @param jobToken Job token (if any)
   * @return The RPC proxy.
   * @throws IOException
   * @throws InterruptedException
   */
  protected abstract CommunicationsInterface<I, V, E, M> getRPCProxy(
    final InetSocketAddress addr, String jobId, J jobToken)
    throws IOException, InterruptedException;
 
  /**
   * Establish connections to every RPC proxy server that will be used in
   * the upcoming messaging.  This method is idempotent.
   *
   * @param jobId Stringified job id
   * @param jobToken Job token (required for secure Hadoop)
   * @throws InterruptedException
   * @throws IOException
   */
  private void connectAllRPCProxys(String jobId, J jobToken)
    throws IOException, InterruptedException {
    final int maxTries = 5;
    for (PartitionOwner partitionOwner : service.getPartitionOwners()) {
      int tries = 0;
      while (tries < maxTries) {
        try {
          startPeerConnectionThread(
              partitionOwner.getWorkerInfo(), jobId, jobToken);
          break;
        } catch (IOException e) {
          LOG.warn("connectAllRPCProxys: Failed on attempt " +
              tries + " of " + maxTries +
              " to connect to " + partitionOwner.toString(), e);
          ++tries;
        }
      }
    }
  }
 
  /**
   * Creates the connection to a remote RPC server, but only if no
   * connection for that inet socket address already exists.
   *
   * @param workerInfo Worker to connect to
   * @param jobId Id of the job
   * @param jobToken Required for secure Hadoop
   * @throws IOException
   * @throws InterruptedException
   */
  private void startPeerConnectionThread(WorkerInfo workerInfo,
      String jobId,
      J jobToken) throws IOException, InterruptedException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("startPeerConnectionThread: hostname " +
          workerInfo.getHostname() + ", port " +
          workerInfo.getPort());
    }
    final InetSocketAddress addr =
        new InetSocketAddress(workerInfo.getHostname(),
            workerInfo.getPort());
    // Cheap way to hold both the hostname and port (rather than
    // make a class)
    InetSocketAddress addrUnresolved =
        InetSocketAddress.createUnresolved(addr.getHostName(),
            addr.getPort());
    Map<I, MsgList<M>> outMsgMap = null;
    boolean isProxy = true;
    CommunicationsInterface<I, V, E, M> peer = this;
    synchronized (outMessages) {
      outMsgMap = outMessages.get(addrUnresolved);
      if (LOG.isDebugEnabled()) {
        LOG.debug("startPeerConnectionThread: Connecting to " +
            workerInfo.toString() + ", addr = " + addr +
            " if outMsgMap (" + outMsgMap + ") == null ");
      }
      if (outMsgMap != null) { // this host has already been added
        return;
      }

      if (myName.equals(addr.toString())) {
        isProxy = false;
      } else {
        peer = getRPCProxy(addr, jobId, jobToken);
      }

      outMsgMap = new HashMap<I, MsgList<M>>();
      outMessages.put(addrUnresolved, outMsgMap);
    }

    PeerConnection peerConnection =
        new PeerConnection(outMsgMap, peer, isProxy);
    peerConnections.put(addrUnresolved, peerConnection);
  }
 
  @Override
  public final long getProtocolVersion(String protocol, long clientVersion)
    throws IOException {
    return VERSION_ID;
  }

  @Override
  public void closeConnections() throws IOException {
    for (PeerConnection pc : peerConnections.values()) {
      pc.close();
    }
  }

  @Override
  public final void close() {
    LOG.info("close: shutting down RPC server");
    server.stop();
  }
 
  @Override
  public final void putMsg(I vertex, M msg) throws IOException {
    List<M> msgs = null;
    if (LOG.isDebugEnabled()) {
      LOG.debug("putMsg: Adding msg " + msg + " on vertex " + vertex);
    }
    if (inPrepareSuperstep) {
      // Called by combiner (main thread) during superstep preparation
      msgs = inMessages.get(vertex);
      if (msgs == null) {
        msgs = new ArrayList<M>();
        inMessages.put(vertex, msgs);
      }
      msgs.add(msg);
    } else {
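      // Otherwise this is an RPC handler thread delivering a message
      // mid-superstep, so both the map lookup and the list append must
      // be guarded against concurrent handlers.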
      synchronized (transientInMessages) {
        msgs = transientInMessages.get(vertex);
        if (msgs == null) {
          msgs = new ArrayList<M>();
          transientInMessages.put(vertex, msgs);
        }
      }
      synchronized (msgs) {
        msgs.add(msg);
      }
    }
  }
 
  @Override
  public final void putMsgList(I vertex,
      MsgList<M> msgList) throws IOException {
    List<M> msgs = null;
    if (LOG.isDebugEnabled()) {
      LOG.debug("putMsgList: Adding msgList " + msgList +
          " on vertex " + vertex);
    }
    synchronized (transientInMessages) {
      msgs = transientInMessages.get(vertex);
      if (msgs == null) {
        msgs = new ArrayList<M>(msgList.size());
        transientInMessages.put(vertex, msgs);
      }
    }
    synchronized (msgs) {
      msgs.addAll(msgList);
    }
  }
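  // In putVertexIdMessagesList below, the transientInMessages lock is
  // held only long enough to find or create the per-vertex list; the
  // per-list lock then covers the append, so deliveries to different
  // vertices mostly proceed in parallel.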
 
  @Override
  public final void putVertexIdMessagesList(
    VertexIdMessagesList<I, M> vertexIdMessagesList)
    throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("putVertexIdMessagesList: Adding msgList " +
          vertexIdMessagesList);
    }

    List<M> messageList = null;
    for (VertexIdMessages<I, M> vertexIdMessages : vertexIdMessagesList) {
      synchronized (transientInMessages) {
        messageList =
            transientInMessages.get(vertexIdMessages.getVertexId());
        if (messageList == null) {
          messageList = new ArrayList<M>(
              vertexIdMessages.getMessageList().size());
          transientInMessages.put(
              vertexIdMessages.getVertexId(), messageList);
        }
      }
      synchronized (messageList) {
        messageList.addAll(vertexIdMessages.getMessageList());
      }
    }
  }
 
  @Override
  public final void putVertexList(int partitionId,
      VertexList<I, V, E, M> vertexList) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("putVertexList: On partition id " + partitionId +
          " adding vertex list of size " + vertexList.size());
    }
    service.getPartitionStore().addPartitionVertices(partitionId, vertexList);
  }
 
  @Override
  public final void addEdge(I sourceVertexId, I targetVertexId, E edgeValue) {
    Edge<I, E> edge = new Edge<I, E>(targetVertexId, edgeValue);
    if (LOG.isDebugEnabled()) {
      LOG.debug("addEdge: Adding edge " + edge);
    }
    synchronized (inVertexMutationsMap) {
      VertexMutations<I, V, E, M> vertexMutations = null;
      if (!inVertexMutationsMap.containsKey(sourceVertexId)) {
        vertexMutations = new VertexMutations<I, V, E, M>();
        inVertexMutationsMap.put(sourceVertexId, vertexMutations);
      } else {
        vertexMutations = inVertexMutationsMap.get(sourceVertexId);
      }
      vertexMutations.addEdge(edge);
    }
  }

  @Override
  public void removeEdge(I vertexIndex, I destinationVertexIndex) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("removeEdge: Removing edge on destination " +
          destinationVertexIndex);
    }
    synchronized (inVertexMutationsMap) {
      VertexMutations<I, V, E, M> vertexMutations = null;
      if (!inVertexMutationsMap.containsKey(vertexIndex)) {
        vertexMutations = new VertexMutations<I, V, E, M>();
        inVertexMutationsMap.put(vertexIndex, vertexMutations);
      } else {
        vertexMutations = inVertexMutationsMap.get(vertexIndex);
      }
      vertexMutations.removeEdge(destinationVertexIndex);
    }
  }

  @Override
  public final void addVertex(Vertex<I, V, E, M> vertex) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("addVertex: Adding vertex " + vertex);
    }
    synchronized (inVertexMutationsMap) {
      VertexMutations<I, V, E, M> vertexMutations = null;
      if (!inVertexMutationsMap.containsKey(vertex.getId())) {
        vertexMutations = new VertexMutations<I, V, E, M>();
        inVertexMutationsMap.put(vertex.getId(), vertexMutations);
      } else {
        vertexMutations = inVertexMutationsMap.get(vertex.getId());
      }
      vertexMutations.addVertex(vertex);
    }
  }

  @Override
  public void removeVertex(I vertexIndex) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("removeVertex: Removing vertex " + vertexIndex);
    }
    synchronized (inVertexMutationsMap) {
      VertexMutations<I, V, E, M> vertexMutations = null;
      if (!inVertexMutationsMap.containsKey(vertexIndex)) {
        vertexMutations = new VertexMutations<I, V, E, M>();
        inVertexMutationsMap.put(vertexIndex, vertexMutations);
      } else {
        vertexMutations = inVertexMutationsMap.get(vertexIndex);
      }
      vertexMutations.removeVertex();
    }
  }
 
  @Override
  public final void sendPartitionRequest(WorkerInfo workerInfo,
                                         Partition<I, V, E, M> partition) {
    // Internally, break up the sending so that the list doesn't get too
    // big.
    VertexList<I, V, E, M> hadoopVertexList =
        new VertexList<I, V, E, M>();
    InetSocketAddress addr =
        getInetSocketAddress(workerInfo, partition.getId());
    CommunicationsInterface<I, V, E, M> rpcProxy =
        peerConnections.get(addr).getRPCProxy();

    if (LOG.isDebugEnabled()) {
      LOG.debug("sendPartitionRequest: Sending to " + rpcProxy.getName() +
          " " + addr + " from " + workerInfo +
          ", with partition " + partition);
    }
    for (Vertex<I, V, E, M> vertex : partition.getVertices()) {
      hadoopVertexList.add(vertex);
      if (hadoopVertexList.size() >= MAX_VERTICES_PER_RPC) {
        try {
          rpcProxy.putVertexList(partition.getId(),
              hadoopVertexList);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
        hadoopVertexList.clear();
      }
    }
    if (hadoopVertexList.size() > 0) {
      try {
        rpcProxy.putVertexList(partition.getId(),
            hadoopVertexList);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }
 
  /**
   * Fill the socket address cache for the worker info and its partition.
   *
   * @param workerInfo Worker information to get the socket address
   * @param partitionId Partition id to look up.
   * @return address of the vertex range server containing this vertex
   */
  private InetSocketAddress getInetSocketAddress(WorkerInfo workerInfo,
                                                 int partitionId) {
    synchronized (partitionIndexAddressMap) {
      InetSocketAddress address =
          partitionIndexAddressMap.get(partitionId);
      if (address == null) {
        address = InetSocketAddress.createUnresolved(
            workerInfo.getHostname(),
            workerInfo.getPort());
        partitionIndexAddressMap.put(partitionId, address);
      }

      if (address.getPort() != workerInfo.getPort() ||
          !address.getHostName().equals(workerInfo.getHostname())) {
        throw new IllegalStateException(
            "getInetSocketAddress: Impossible that address " +
                address + " does not match " + workerInfo);
      }

      return address;
    }
  }
 
  /**
   * Fill the socket address cache for the partition owner.
   *
   * @param destVertex vertex to be sent
   * @return address of the vertex range server containing this vertex
   */
  private InetSocketAddress getInetSocketAddress(I destVertex) {
    PartitionOwner partitionOwner =
        service.getVertexPartitionOwner(destVertex);
    return getInetSocketAddress(partitionOwner.getWorkerInfo(),
        partitionOwner.getPartitionId());
  }
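  // sendMessageRequest() below only buffers: messages accumulate per
  // destination server until flush(), except that a single vertex whose
  // pending list grows past maxSize is shipped early through
  // submitLargeMessageSend() on the flush executor.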
 
  @Override
  public final void sendMessageRequest(I destVertex, M msg) {
    InetSocketAddress addr = getInetSocketAddress(destVertex);
    if (LOG.isDebugEnabled()) {
      LOG.debug("sendMessage: Send bytes (" + msg.toString() +
          ") to " + destVertex + " with address " + addr);
    }
    ++totalMsgsSentInSuperstep;
    Map<I, MsgList<M>> msgMap = null;
    synchronized (outMessages) {
      msgMap = outMessages.get(addr);
    }
    if (msgMap == null) { // should never happen after constructor
      throw new RuntimeException(
          "sendMessage: msgMap did not exist for " + addr +
          " for vertex " + destVertex);
    }

    synchronized (msgMap) {
      MsgList<M> msgList = msgMap.get(destVertex);
      if (msgList == null) { // should only happen once
        msgList = new MsgList<M>();
        msgMap.put(destVertex, msgList);
      }
      msgList.add(msg);
      if (LOG.isDebugEnabled()) {
        LOG.debug("sendMessage: added msg=" + msg + ", size=" +
            msgList.size());
      }
      if (msgList.size() > maxSize) {
        submitLargeMessageSend(addr, destVertex);
      }
    }
  }
 
  @Override
  public final void addEdgeRequest(I destVertex, Edge<I, E> edge)
    throws IOException {
    InetSocketAddress addr = getInetSocketAddress(destVertex);
    if (LOG.isDebugEnabled()) {
      LOG.debug("addEdgeRequest: Add edge (" + edge.toString() + ") to " +
          destVertex + " with address " + addr);
    }
    CommunicationsInterface<I, V, E, M> rpcProxy =
        peerConnections.get(addr).getRPCProxy();
    rpcProxy.addEdge(destVertex, edge.getTargetVertexId(), edge.getValue());
  }

  @Override
  public final void removeEdgeRequest(I vertexIndex, I destVertexIndex)
    throws IOException {
    InetSocketAddress addr = getInetSocketAddress(vertexIndex);
    if (LOG.isDebugEnabled()) {
      LOG.debug("removeEdgeRequest: remove edge (" + destVertexIndex +
                ") from " + vertexIndex + " with address " + addr);
    }
    CommunicationsInterface<I, V, E, M> rpcProxy =
        peerConnections.get(addr).getRPCProxy();
    rpcProxy.removeEdge(vertexIndex, destVertexIndex);
  }

  @Override
  public final void addVertexRequest(Vertex<I, V, E, M> vertex)
    throws IOException {
    InetSocketAddress addr = getInetSocketAddress(vertex.getId());
    if (LOG.isDebugEnabled()) {
      LOG.debug("addVertexRequest: Add vertex (" + vertex + ")" +
                " with address " + addr);
    }
    CommunicationsInterface<I, V, E, M> rpcProxy =
        peerConnections.get(addr).getRPCProxy();
    rpcProxy.addVertex(vertex);
  }

  @Override
  public void removeVertexRequest(I vertexIndex) throws IOException {
    InetSocketAddress addr =
        getInetSocketAddress(vertexIndex);
    if (LOG.isDebugEnabled()) {
      LOG.debug("removeVertexRequest: Remove vertex index (" +
                vertexIndex + ") with address " + addr);
    }
    CommunicationsInterface<I, V, E, M> rpcProxy =
        peerConnections.get(addr).getRPCProxy();
    rpcProxy.removeVertex(vertexIndex);
  }
 
  @Override
  public void flush() throws IOException {
    if (LOG.isInfoEnabled()) {
      LOG.info("flush: starting for superstep " +
          service.getSuperstep() + " " +
          MemoryUtils.getRuntimeMemoryStats());
    }
    for (List<M> msgList : inMessages.values()) {
      msgList.clear();
    }
    inMessages.clear();

    Collection<Future<?>> futures = new ArrayList<Future<?>>();

    // randomize peers in order to avoid hotspot on racks
    List<PeerConnection> peerList =
        new ArrayList<PeerConnection>(peerConnections.values());
    Collections.shuffle(peerList);

    for (PeerConnection pc : peerList) {
      futures.add(executor.submit(new PeerFlushExecutor(pc, context)));
    }

    // wait for all flushes
    for (Future<?> future : futures) {
      try {
        future.get();
        context.progress();
      } catch (InterruptedException e) {
        throw new IllegalStateException(
            "flush: Got InterruptedException", e);
      } catch (ExecutionException e) {
        throw new IllegalStateException(
            "flush: Got ExecutionException", e);
      }
    }

    if (LOG.isInfoEnabled()) {
      LOG.info("flush: ended for superstep " +
          service.getSuperstep() + " " +
          MemoryUtils.getRuntimeMemoryStats());
    }
  }
 
  @Override
  public long resetMessageCount() {
    long msgs = totalMsgsSentInSuperstep;
    totalMsgsSentInSuperstep = 0;
    return msgs;
  }
 
  @Override
  public void prepareSuperstep() {
    if (LOG.isInfoEnabled()) {
      LOG.info("prepareSuperstep: Superstep " +
          service.getSuperstep() + " " +
          MemoryUtils.getRuntimeMemoryStats());
    }
    inPrepareSuperstep = true;

    // Combine and put the transient messages into the inMessages.
    synchronized (transientInMessages) {
      for (Entry<I, List<M>> entry : transientInMessages.entrySet()) {
        if (combiner != null) {
          try {
            Iterable<M> messages =
                combiner.combine(entry.getKey(),
                    entry.getValue());
            if (messages == null) {
              throw new IllegalStateException(
                  "prepareSuperstep: Combiner cannot " +
                      "return null");
            }
            if (Iterables.size(entry.getValue()) <
                Iterables.size(messages)) {
              throw new IllegalStateException(
                  "prepareSuperstep: The number of " +
                      "combined messages is " +
                      "required to be <= to the number of " +
                  "messages to be combined");
            }
            for (M msg: messages) {
              putMsg(entry.getKey(), msg);
            }
          } catch (IOException e) {
            // no actual IO -- should never happen
            throw new RuntimeException(e);
          }
        } else {
          List<M> msgs = inMessages.get(entry.getKey());
          if (msgs == null) {
            msgs = new ArrayList<M>();
            inMessages.put(entry.getKey(), msgs);
          }
          msgs.addAll(entry.getValue());
        }
        entry.getValue().clear();
      }
      transientInMessages.clear();
    }
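    // At this point every pending message is in inMessages, combined
    // where a combiner is configured, and transientInMessages is empty
    // for the next superstep.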
 
    if (inMessages.size() > 0) {
      // Assign the messages to each destination vertex (getting rid of
      // the old ones)
      for (Partition<I, V, E, M> partition :
          service.getPartitionStore().getPartitions()) {
        for (Vertex<I, V, E, M> vertex : partition.getVertices()) {
          List<M> msgList = inMessages.get(vertex.getId());
          if (msgList != null) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("prepareSuperstep: Assigning " +
                  msgList.size() +
                  " msgs to vertex index " + vertex);
            }
            for (M msg : msgList) {
              if (msg == null) {
                LOG.warn("prepareSuperstep: Null message " +
                    "in inMessages");
              }
            }
            service.assignMessagesToVertex(vertex, msgList);
            msgList.clear();
            if (inMessages.remove(vertex.getId()) == null) {
              throw new IllegalStateException(
                  "prepareSuperstep: Impossible to not remove " +
                      vertex);
            }
          }
        }
      }
    }

    inPrepareSuperstep = false;

    // Resolve what happens when messages are sent to non-existent vertices
    // and vertices that have mutations.  Also make sure that the messages
    // are being sent to the correct destination
    Set<I> resolveVertexIndexSet = new TreeSet<I>();
    if (inMessages.size() > 0) {
      for (Entry<I, List<M>> entry : inMessages.entrySet()) {
        if (service.getPartition(entry.getKey()) == null) {
          throw new IllegalStateException(
              "prepareSuperstep: Impossible that this worker " +
                  service.getWorkerInfo() + " was sent " +
                  entry.getValue().size() + " message(s) with " +
                  "vertex id " + entry.getKey() +
                  " when it does not own this partition.  It should " +
                  "have gone to partition owner " +
                  service.getVertexPartitionOwner(entry.getKey()) +
                  ".  The partition owners are " +
                  service.getPartitionOwners());
        }
        resolveVertexIndexSet.add(entry.getKey());
      }
    }
    synchronized (inVertexMutationsMap) {
      for (I vertexIndex : inVertexMutationsMap.keySet()) {
        resolveVertexIndexSet.add(vertexIndex);
      }
    }

    // Resolve all graph mutations
    for (I vertexIndex : resolveVertexIndexSet) {
      VertexResolver<I, V, E, M> vertexResolver =
          BspUtils.createVertexResolver(
              conf, service.getGraphMapper().getGraphState());
      Vertex<I, V, E, M> originalVertex =
          service.getVertex(vertexIndex);
      Iterable<M> messages = inMessages.get(vertexIndex);
      if (originalVertex != null) {
        messages = originalVertex.getMessages();
      }
      VertexMutations<I, V, E, M> vertexMutations =
          inVertexMutationsMap.get(vertexIndex);
      boolean receivedMessages =
          messages != null && !Iterables.isEmpty(messages);
      Vertex<I, V, E, M> vertex =
          vertexResolver.resolve(vertexIndex,
              originalVertex,
              vertexMutations,
              receivedMessages);
      if (vertex != null && receivedMessages) {
        service.assignMessagesToVertex(vertex, messages);
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("prepareSuperstep: Resolved vertex index " +
            vertexIndex + " with original vertex " +
            originalVertex + ", returned vertex " + vertex +
            " on superstep " + service.getSuperstep() +
            " with mutations " +
            vertexMutations);
      }

      Partition<I, V, E, M> partition =
          service.getPartition(vertexIndex);
      if (partition == null) {
        throw new IllegalStateException(
            "prepareSuperstep: No partition for index " + vertexIndex +
            " in " + service.getPartitionStore() + " should have been " +
            service.getVertexPartitionOwner(vertexIndex));
      }
      if (vertex != null) {
        partition.putVertex(vertex);
      } else if (originalVertex != null) {
        partition.removeVertex(originalVertex.getId());
      }
    }
    synchronized (inVertexMutationsMap) {
      inVertexMutationsMap.clear();
    }
  }
 
  @Override
  public void fixPartitionIdToSocketAddrMap() {
    // 1. Fix all the cached inet addresses (remove all changed entries)
    // 2. Connect to any new RPC servers
    synchronized (partitionIndexAddressMap) {
      for (PartitionOwner partitionOwner : service.getPartitionOwners()) {
        InetSocketAddress address =
            partitionIndexAddressMap.get(
                partitionOwner.getPartitionId());
        if (address != null &&
            (!address.getHostName().equals(
                partitionOwner.getWorkerInfo().getHostname()) ||
                address.getPort() !=
                partitionOwner.getWorkerInfo().getPort())) {
          if (LOG.isInfoEnabled()) {
            LOG.info("fixPartitionIdToSocketAddrMap: " +
                "Partition owner " +
                partitionOwner + " changed from " +
                address);
          }
          partitionIndexAddressMap.remove(
              partitionOwner.getPartitionId());
        }
      }
    }
    try {
      connectAllRPCProxys(this.jobId, this.jobToken);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
 
  @Override
  public String getName() {
    return myName;
  }
}