Coverage Report - org.apache.giraph.comm.NettyWorkerClient
 
Classes in this File Line Coverage Branch Coverage Complexity
NettyWorkerClient
39%
51/128
32%
13/40
2.375
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.comm;
 20  
 
 21  
 import com.google.common.collect.Sets;
 22  
 import java.util.Set;
 23  
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 24  
 import org.apache.giraph.comm.messages.MessageStoreByPartition;
 25  
 import org.apache.giraph.comm.messages.SendPartitionCurrentMessagesRequest;
 26  
 import org.apache.giraph.graph.Edge;
 27  
 import org.apache.giraph.graph.GiraphJob;
 28  
 import org.apache.giraph.graph.Vertex;
 29  
 import org.apache.giraph.graph.VertexMutations;
 30  
 import org.apache.giraph.graph.WorkerInfo;
 31  
 import org.apache.giraph.graph.partition.Partition;
 32  
 import org.apache.giraph.graph.partition.PartitionOwner;
 33  
 import org.apache.hadoop.conf.Configuration;
 34  
 import org.apache.hadoop.io.Writable;
 35  
 import org.apache.hadoop.io.WritableComparable;
 36  
 import org.apache.hadoop.mapreduce.Mapper;
 37  
 import org.apache.log4j.Logger;
 38  
 
 39  
 import com.google.common.collect.Maps;
 40  
 
 41  
 import java.io.IOException;
 42  
 import java.net.InetSocketAddress;
 43  
 import java.util.Collection;
 44  
 import java.util.Map;
 45  
 import java.util.Map.Entry;
 46  
 import java.util.concurrent.ConcurrentHashMap;
 47  
 
 48  
 /**
 49  
  * Takes the user-facing APIs in {@link WorkerClient} and implements them
 50  
  * using the available {@link WritableRequest} objects.
 51  
  *
 52  
  * @param <I> Vertex id
 53  
  * @param <V> Vertex data
 54  
  * @param <E> Edge data
 55  
  * @param <M> Message data
 56  
  */
 57  
 @SuppressWarnings("rawtypes")
 58  
 public class NettyWorkerClient<I extends WritableComparable,
 59  
     V extends Writable, E extends Writable, M extends Writable> implements
 60  
     WorkerClient<I, V, E, M> {
 61  
   /** Class logger */
 62  1
   private static final Logger LOG =
 63  
     Logger.getLogger(NettyWorkerClient.class);
 64  
   /** Hadoop configuration */
 65  
   private final Configuration conf;
 66  
   /** Netty client that does the actual I/O */
 67  
   private final NettyClient<I, V, E, M> nettyClient;
 68  
   /** Centralized service, needed to get vertex ranges */
 69  
   private final CentralizedServiceWorker<I, V, E, M> service;
 70  
   /**
 71  
    * Cached map of partition ids to remote socket address.
 72  
    */
 73  1
   private final Map<Integer, InetSocketAddress> partitionIndexAddressMap =
 74  
       new ConcurrentHashMap<Integer, InetSocketAddress>();
 75  
   /**
 76  
    * Cached map of partitions to vertex indices to messages
 77  
    */
 78  
   private final SendMessageCache<I, M> sendMessageCache;
 79  
   /**
 80  
    * Cached map of partitions to vertex indices to mutations
 81  
    */
 82  
   private final SendMutationsCache<I, V, E, M> sendMutationsCache;
 83  
   /** Maximum number of messages per partition before sending */
 84  
   private final int maxMessagesPerPartition;
 85  
   /** Maximum number of mutations per partition before sending */
 86  
   private final int maxMutationsPerPartition;
 87  
   /** Messages sent during the last superstep */
 88  1
   private long totalMsgsSentInSuperstep = 0;
 89  
   /** Server data from the server */
 90  
   private final ServerData<I, V, E, M> serverData;
 91  
 
 92  
   /**
 93  
    * Only constructor.
 94  
    *
 95  
    * @param context Context from mapper
 96  
    * @param service Used to get partition mapping
 97  
    * @param serverData Server data (used for local requests)
 98  
    */
 99  
   public NettyWorkerClient(Mapper<?, ?, ?, ?>.Context context,
 100  
                            CentralizedServiceWorker<I, V, E, M> service,
 101  1
                            ServerData<I, V, E, M> serverData) {
 102  1
     this.nettyClient = new NettyClient<I, V, E, M>(context);
 103  1
     this.conf = context.getConfiguration();
 104  1
     this.service = service;
 105  1
     maxMessagesPerPartition = conf.getInt(GiraphJob.MSG_SIZE,
 106  
         GiraphJob.MSG_SIZE_DEFAULT);
 107  1
     maxMutationsPerPartition = conf.getInt(GiraphJob.MAX_MUTATIONS_PER_REQUEST,
 108  
         GiraphJob.MAX_MUTATIONS_PER_REQUEST_DEFAULT);
 109  1
     sendMessageCache = new SendMessageCache<I, M>(conf);
 110  1
     sendMutationsCache = new SendMutationsCache<I, V, E, M>();
 111  1
     this.serverData = serverData;
 112  1
   }
 113  
 
 114  
   @Override
 115  
   public void fixPartitionIdToSocketAddrMap() {
 116  
     // 1. Fix all the cached inet addresses (remove all changed entries)
 117  
     // 2. Connect to any new RPC servers
 118  12
     Set<InetSocketAddress> addresses =
 119  
         Sets.newHashSetWithExpectedSize(service.getPartitionOwners().size());
 120  12
     for (PartitionOwner partitionOwner : service.getPartitionOwners()) {
 121  12
       InetSocketAddress address =
 122  
           partitionIndexAddressMap.get(
 123  
               partitionOwner.getPartitionId());
 124  12
       if (address != null &&
 125  
           (!address.getHostName().equals(
 126  
               partitionOwner.getWorkerInfo().getHostname()) ||
 127  
               address.getPort() !=
 128  
               partitionOwner.getWorkerInfo().getPort())) {
 129  0
         if (LOG.isInfoEnabled()) {
 130  0
           LOG.info("fixPartitionIdToSocketAddrMap: " +
 131  
               "Partition owner " +
 132  
               partitionOwner + " changed from " +
 133  
               address);
 134  
         }
 135  0
         partitionIndexAddressMap.remove(
 136  
             partitionOwner.getPartitionId());
 137  
       }
 138  12
       addresses.add(partitionOwner.getWorkerInfo().getHostnamePort());
 139  12
     }
 140  12
     nettyClient.connectAllAddresses(addresses);
 141  12
   }
 142  
 
 143  
   /**
 144  
    * Fill the socket address cache for the worker info and its partition.
 145  
    *
 146  
    * @param workerInfo Worker information to get the socket address
 147  
    * @param partitionId Partition id to look up.
 148  
    * @return address of the vertex range server containing this vertex
 149  
    */
 150  
   private InetSocketAddress getInetSocketAddress(WorkerInfo workerInfo,
 151  
       int partitionId) {
 152  1
     InetSocketAddress address =
 153  
         partitionIndexAddressMap.get(partitionId);
 154  1
     if (address == null) {
 155  1
       address = workerInfo.getHostnamePort();
 156  1
       partitionIndexAddressMap.put(partitionId, address);
 157  
     }
 158  
 
 159  1
     return address;
 160  
   }
 161  
 
 162  
   /**
 163  
    * Fill the socket address cache for the partition owner.
 164  
    *
 165  
    * @param destVertex vertex to be sent
 166  
    * @return address of the vertex range server containing this vertex
 167  
    */
 168  
   private InetSocketAddress getInetSocketAddress(I destVertex) {
 169  0
     PartitionOwner partitionOwner =
 170  
         service.getVertexPartitionOwner(destVertex);
 171  0
     return getInetSocketAddress(partitionOwner.getWorkerInfo(),
 172  
         partitionOwner.getPartitionId());
 173  
   }
 174  
 
 175  
   /**
 176  
    * When doing the request, short circuit if it is local
 177  
    *
 178  
    * @param remoteServerAddress Remote server address (checked against local)
 179  
    * @param writableRequest Request to either submit or run locally
 180  
    */
 181  
   private void doRequest(InetSocketAddress remoteServerAddress,
 182  
                          WritableRequest<I, V, E, M> writableRequest) {
 183  
     // If this is local, execute locally
 184  1
     if (service.getWorkerInfo().getHostnamePort().equals(
 185  
         remoteServerAddress)) {
 186  1
       writableRequest.doRequest(serverData);
 187  
     } else {
 188  0
       nettyClient.sendWritableRequest(remoteServerAddress, writableRequest);
 189  
     }
 190  1
   }
 191  
 
 192  
   @Override
 193  
   public void sendMessageRequest(I destVertexId, M message) {
 194  0
     PartitionOwner partitionOwner =
 195  
         service.getVertexPartitionOwner(destVertexId);
 196  0
     int partitionId = partitionOwner.getPartitionId();
 197  0
     if (LOG.isDebugEnabled()) {
 198  0
       LOG.debug("sendMessageRequest: Send bytes (" + message.toString() +
 199  
           ") to " + destVertexId + " with partition " + partitionId);
 200  
     }
 201  0
     ++totalMsgsSentInSuperstep;
 202  
 
 203  
     // Add the message to the cache
 204  0
     int partitionMessageCount =
 205  
         sendMessageCache.addMessage(partitionId, destVertexId, message);
 206  
 
 207  
     // Send a request if enough messages are there for a partition
 208  0
     if (partitionMessageCount >= maxMessagesPerPartition) {
 209  0
       InetSocketAddress remoteServerAddress =
 210  
           getInetSocketAddress(partitionOwner.getWorkerInfo(), partitionId);
 211  0
       Map<I, Collection<M>> partitionMessages =
 212  
           sendMessageCache.removePartitionMessages(partitionId);
 213  0
       WritableRequest<I, V, E, M> writableRequest =
 214  
           new SendPartitionMessagesRequest<I, V, E, M>(
 215  
               partitionId, partitionMessages);
 216  0
       doRequest(remoteServerAddress, writableRequest);
 217  
     }
 218  0
   }
 219  
 
 220  
   @Override
 221  
   public void sendPartitionRequest(WorkerInfo workerInfo,
 222  
                                    Partition<I, V, E, M> partition) {
 223  1
     InetSocketAddress remoteServerAddress =
 224  
         getInetSocketAddress(workerInfo, partition.getId());
 225  1
     if (LOG.isDebugEnabled()) {
 226  0
       LOG.debug("sendPartitionRequest: Sending to " + remoteServerAddress +
 227  
           " from " + workerInfo + ", with partition " + partition);
 228  
     }
 229  
 
 230  1
     int partitionId = partition.getId();
 231  1
     WritableRequest<I, V, E, M> vertexRequest =
 232  
         new SendVertexRequest<I, V, E, M>(partitionId,
 233  
             partition.getVertices());
 234  1
     doRequest(remoteServerAddress, vertexRequest);
 235  
 
 236  
     // messages are stored separately
 237  1
     MessageStoreByPartition<I, M> messageStore =
 238  
         service.getServerData().getCurrentMessageStore();
 239  1
     Map<I, Collection<M>> map = Maps.newHashMap();
 240  1
     int messagesInMap = 0;
 241  
     for (I vertexId :
 242  1
         messageStore.getPartitionDestinationVertices(partitionId)) {
 243  
       try {
 244  0
         Collection<M> messages = messageStore.getVertexMessages(vertexId);
 245  0
         map.put(vertexId, messages);
 246  0
         messagesInMap += messages.size();
 247  0
       } catch (IOException e) {
 248  0
         throw new IllegalStateException(
 249  
             "sendPartitionReq: Got IOException ", e);
 250  0
       }
 251  0
       if (messagesInMap > maxMessagesPerPartition) {
 252  0
         WritableRequest<I, V, E, M> messagesRequest = new
 253  
             SendPartitionCurrentMessagesRequest<I, V, E, M>(partitionId, map);
 254  0
         doRequest(remoteServerAddress, messagesRequest);
 255  0
         map.clear();
 256  0
         messagesInMap = 0;
 257  0
       }
 258  
     }
 259  1
     if (!map.isEmpty()) {
 260  0
       WritableRequest<I, V, E, M> messagesRequest = new
 261  
           SendPartitionCurrentMessagesRequest<I, V, E, M>(partitionId, map);
 262  0
       doRequest(remoteServerAddress, messagesRequest);
 263  
     }
 264  1
   }
 265  
 
 266  
     /**
 267  
     * Send a mutations request if the maximum number of mutations per partition
 268  
     * was met.
 269  
     *
 270  
     * @param partitionId Partition id
 271  
     * @param partitionOwner Owner of the partition
 272  
     * @param partitionMutationCount Number of mutations for this partition
 273  
     */
 274  
   private void sendMutationsRequestIfFull(
 275  
       int partitionId, PartitionOwner partitionOwner,
 276  
       int partitionMutationCount) {
 277  
     // Send a request if enough mutations are there for a partition
 278  0
     if (partitionMutationCount >= maxMutationsPerPartition) {
 279  0
       InetSocketAddress remoteServerAddress =
 280  
           getInetSocketAddress(partitionOwner.getWorkerInfo(), partitionId);
 281  0
       Map<I, VertexMutations<I, V, E, M>> partitionMutations =
 282  
           sendMutationsCache.removePartitionMutations(partitionId);
 283  0
       WritableRequest<I, V, E, M> writableRequest =
 284  
           new SendPartitionMutationsRequest<I, V, E, M>(
 285  
               partitionId, partitionMutations);
 286  0
       doRequest(remoteServerAddress, writableRequest);
 287  
     }
 288  0
   }
 289  
 
 290  
   @Override
 291  
   public void addEdgeRequest(I vertexIndex, Edge<I, E> edge) throws
 292  
       IOException {
 293  0
     PartitionOwner partitionOwner =
 294  
         service.getVertexPartitionOwner(vertexIndex);
 295  0
     int partitionId = partitionOwner.getPartitionId();
 296  0
     if (LOG.isDebugEnabled()) {
 297  0
       LOG.debug("addEdgeRequest: Sending edge " + edge + " for index " +
 298  
           vertexIndex + " with partition " + partitionId);
 299  
     }
 300  
 
 301  
     // Add the message to the cache
 302  0
     int partitionMutationCount =
 303  
         sendMutationsCache.addEdgeMutation(partitionId, vertexIndex, edge);
 304  
 
 305  0
     sendMutationsRequestIfFull(
 306  
         partitionId, partitionOwner, partitionMutationCount);
 307  0
   }
 308  
 
 309  
   @Override
 310  
   public void removeEdgeRequest(I vertexIndex,
 311  
                                 I destinationVertexIndex) throws IOException {
 312  0
     PartitionOwner partitionOwner =
 313  
         service.getVertexPartitionOwner(vertexIndex);
 314  0
     int partitionId = partitionOwner.getPartitionId();
 315  0
     if (LOG.isDebugEnabled()) {
 316  0
       LOG.debug("removeEdgeRequest: Removing edge " + destinationVertexIndex +
 317  
           " for index " + vertexIndex + " with partition " + partitionId);
 318  
     }
 319  
 
 320  
     // Add the message to the cache
 321  0
     int partitionMutationCount =
 322  
         sendMutationsCache.removeEdgeMutation(
 323  
             partitionId, vertexIndex, destinationVertexIndex);
 324  
 
 325  0
     sendMutationsRequestIfFull(
 326  
         partitionId, partitionOwner, partitionMutationCount);
 327  0
   }
 328  
 
 329  
   @Override
 330  
   public void addVertexRequest(Vertex<I, V, E, M> vertex) throws IOException {
 331  0
     PartitionOwner partitionOwner =
 332  
         service.getVertexPartitionOwner(vertex.getId());
 333  0
     int partitionId = partitionOwner.getPartitionId();
 334  0
     if (LOG.isDebugEnabled()) {
 335  0
       LOG.debug("addVertexRequest: Sending vertex " + vertex +
 336  
           " to partition " + partitionId);
 337  
     }
 338  
 
 339  
     // Add the message to the cache
 340  0
     int partitionMutationCount =
 341  
         sendMutationsCache.addVertexMutation(partitionId, vertex);
 342  
 
 343  0
     sendMutationsRequestIfFull(
 344  
         partitionId, partitionOwner, partitionMutationCount);
 345  0
   }
 346  
 
 347  
   @Override
 348  
   public void removeVertexRequest(I vertexIndex) throws IOException {
 349  0
     PartitionOwner partitionOwner =
 350  
         service.getVertexPartitionOwner(vertexIndex);
 351  0
     int partitionId = partitionOwner.getPartitionId();
 352  0
     if (LOG.isDebugEnabled()) {
 353  0
       LOG.debug("removeVertexRequest: Removing vertex index " + vertexIndex +
 354  
           " from partition " + partitionId);
 355  
     }
 356  
 
 357  
     // Add the message to the cache
 358  0
     int partitionMutationCount =
 359  
         sendMutationsCache.removeVertexMutation(partitionId, vertexIndex);
 360  
 
 361  0
     sendMutationsRequestIfFull(
 362  
         partitionId, partitionOwner, partitionMutationCount);
 363  0
   }
 364  
 
 365  
   @Override
 366  
   public void flush() throws IOException {
 367  
     // Execute the remaining sends messages (if any)
 368  24
     Map<Integer, Map<I, Collection<M>>> remainingMessageCache =
 369  
         sendMessageCache.removeAllPartitionMessages();
 370  
     for (Entry<Integer, Map<I, Collection<M>>> entry :
 371  24
         remainingMessageCache.entrySet()) {
 372  0
       WritableRequest<I, V, E, M> writableRequest =
 373  
           new SendPartitionMessagesRequest<I, V, E, M>(
 374  
               entry.getKey(), entry.getValue());
 375  0
       InetSocketAddress remoteServerAddress =
 376  
           getInetSocketAddress(entry.getValue().keySet().iterator().next());
 377  0
       doRequest(remoteServerAddress, writableRequest);
 378  0
     }
 379  
 
 380  
     // Execute the remaining sends mutations (if any)
 381  24
     Map<Integer, Map<I, VertexMutations<I, V, E, M>>> remainingMutationsCache =
 382  
         sendMutationsCache.removeAllPartitionMutations();
 383  
     for (Entry<Integer, Map<I, VertexMutations<I, V, E, M>>> entry :
 384  24
         remainingMutationsCache.entrySet()) {
 385  0
       WritableRequest<I, V, E, M> writableRequest =
 386  
           new SendPartitionMutationsRequest<I, V, E, M>(
 387  
               entry.getKey(), entry.getValue());
 388  0
       InetSocketAddress remoteServerAddress =
 389  
           getInetSocketAddress(entry.getValue().keySet().iterator().next());
 390  0
       doRequest(remoteServerAddress, writableRequest);
 391  0
     }
 392  
 
 393  24
     nettyClient.waitAllRequests();
 394  24
   }
 395  
 
 396  
   @Override
 397  
   public long resetMessageCount() {
 398  12
     long messagesSentInSuperstep = totalMsgsSentInSuperstep;
 399  12
     totalMsgsSentInSuperstep = 0;
 400  12
     return messagesSentInSuperstep;
 401  
   }
 402  
 
 403  
   @Override
 404  
   public void closeConnections() throws IOException {
 405  1
     nettyClient.stop();
 406  1
   }
 407  
 
 408  
   @Override
 409  
   public void setup() {
 410  0
     fixPartitionIdToSocketAddrMap();
 411  0
   }
 412  
 }