Coverage Report - org.apache.giraph.comm.NettyServer
 
Classes in this File Line Coverage Branch Coverage Complexity
NettyServer
85%
57/67
37%
6/16
3.4
NettyServer$1
100%
2/2
N/A
3.4
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.comm;
 20  
 
 21  
 import java.net.InetAddress;
 22  
 import java.net.InetSocketAddress;
 23  
 import java.net.UnknownHostException;
 24  
 import java.util.concurrent.Executors;
 25  
 import java.util.concurrent.ThreadFactory;
 26  
 import java.util.concurrent.ThreadPoolExecutor;
 27  
 import java.util.concurrent.TimeUnit;
 28  
 
 29  
 import org.apache.giraph.comm.messages.SendPartitionCurrentMessagesRequest;
 30  
 import org.apache.giraph.graph.GiraphJob;
 31  
 import org.apache.hadoop.conf.Configuration;
 32  
 import org.apache.hadoop.io.Writable;
 33  
 import org.apache.hadoop.io.WritableComparable;
 34  
 import org.apache.log4j.Logger;
 35  
 import org.jboss.netty.bootstrap.ServerBootstrap;
 36  
 import org.jboss.netty.channel.Channel;
 37  
 import org.jboss.netty.channel.ChannelException;
 38  
 import org.jboss.netty.channel.ChannelFactory;
 39  
 import org.jboss.netty.channel.ChannelPipeline;
 40  
 import org.jboss.netty.channel.ChannelPipelineFactory;
 41  
 import org.jboss.netty.channel.Channels;
 42  
 import org.jboss.netty.channel.group.ChannelGroup;
 43  
 import org.jboss.netty.channel.group.DefaultChannelGroup;
 44  
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 45  
 import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
 46  
 
 47  
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 48  
 
 49  
 /**
 50  
  * This server uses Netty and will implement all Giraph communication
 51  
  *
 52  
  * @param <I> Vertex id
 53  
  * @param <V> Vertex data
 54  
  * @param <E> Edge data
 55  
  * @param <M> Message data
 56  
  */
 57  
 @SuppressWarnings("rawtypes")
 58  55
 public class NettyServer<I extends WritableComparable,
 59  
      V extends Writable, E extends Writable,
 60  
      M extends Writable> {
 61  
   /** Default maximum thread pool size */
 62  
   public static final int MAXIMUM_THREAD_POOL_SIZE_DEFAULT = 32;
 63  
   /** Default TCP backlog */
 64  
   public static final int TCP_BACKLOG_DEFAULT = 100;
 65  
   /** Class logger */
 66  1
   private static final Logger LOG = Logger.getLogger(NettyServer.class);
 67  
   /** Configuration */
 68  
   private final Configuration conf;
 69  
   /** Factory of channels */
 70  
   private ChannelFactory channelFactory;
 71  
   /** Accepted channels */
 72  9
   private final ChannelGroup accepted = new DefaultChannelGroup();
 73  
   /** Worker thread pool (if implemented as a ThreadPoolExecutor) */
 74  9
   private ThreadPoolExecutor workerThreadPool = null;
 75  
   /** Local hostname */
 76  
   private final String localHostname;
 77  
   /** Address of the server */
 78  
   private InetSocketAddress myAddress;
 79  
   /** Maximum number of threads */
 80  
   private final int maximumPoolSize;
 81  
   /** Request registry */
 82  9
   private final RequestRegistry requestRegistry = new RequestRegistry();
 83  
   /** Server data */
 84  
   private final ServerData<I, V, E, M> serverData;
 85  
   /** Server bootstrap */
 86  
   private ServerBootstrap bootstrap;
 87  
   /** Byte counter for this server */
 88  9
   private final ByteCounter byteCounter = new ByteCounter();
 89  
   /** Send buffer size */
 90  
   private final int sendBufferSize;
 91  
   /** Receive buffer size */
 92  
   private final int receiveBufferSize;
 93  
 
 94  
   /**
 95  
    * Constructor for creating the server
 96  
    *
 97  
    * @param conf Configuration to use
 98  
    * @param serverData Server data to operate on
 99  
    */
 100  9
   public NettyServer(Configuration conf, ServerData<I, V, E, M> serverData) {
 101  9
     this.conf = conf;
 102  9
     this.serverData = serverData;
 103  9
     requestRegistry.registerClass(
 104  
         new SendVertexRequest<I, V, E, M>());
 105  9
     requestRegistry.registerClass(
 106  
         new SendPartitionMessagesRequest<I, V, E, M>());
 107  9
     requestRegistry.registerClass(
 108  
         new SendPartitionMutationsRequest<I, V, E, M>());
 109  9
     requestRegistry.registerClass(
 110  
         new SendPartitionCurrentMessagesRequest<I, V, E, M>());
 111  9
     requestRegistry.shutdown();
 112  
 
 113  9
     sendBufferSize = conf.getInt(GiraphJob.SERVER_SEND_BUFFER_SIZE,
 114  
         GiraphJob.DEFAULT_SERVER_SEND_BUFFER_SIZE);
 115  9
     receiveBufferSize = conf.getInt(GiraphJob.SERVER_RECEIVE_BUFFER_SIZE,
 116  
         GiraphJob.DEFAULT_SERVER_RECEIVE_BUFFER_SIZE);
 117  
 
 118  9
     ThreadFactory bossFactory = new ThreadFactoryBuilder()
 119  
       .setNameFormat("Giraph Netty Boss #%d")
 120  
       .build();
 121  9
     ThreadFactory workerFactory = new ThreadFactoryBuilder()
 122  
       .setNameFormat("Giraph Netty Worker #%d")
 123  
       .build();
 124  
     try {
 125  9
       this.localHostname = InetAddress.getLocalHost().getHostName();
 126  0
     } catch (UnknownHostException e) {
 127  0
       throw new IllegalStateException("NettyServer: unable to get hostname");
 128  9
     }
 129  9
     maximumPoolSize = conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
 130  
                                   MAXIMUM_THREAD_POOL_SIZE_DEFAULT);
 131  
 
 132  9
     channelFactory = new NioServerSocketChannelFactory(
 133  
         Executors.newCachedThreadPool(bossFactory),
 134  
         Executors.newCachedThreadPool(workerFactory),
 135  
         maximumPoolSize);
 136  9
   }
 137  
 
 138  
   /**
 139  
    * Start the server with the appropriate port
 140  
    */
 141  
   public void start() {
 142  9
     bootstrap = new ServerBootstrap(channelFactory);
 143  
     // Set up the pipeline factory.
 144  9
     bootstrap.setOption("child.keepAlive", true);
 145  9
     bootstrap.setOption("child.tcpNoDelay", true);
 146  9
     bootstrap.setOption("child.sendBufferSize", sendBufferSize);
 147  9
     bootstrap.setOption("child.receiveBufferSize", receiveBufferSize);
 148  9
     bootstrap.setOption("backlog", TCP_BACKLOG_DEFAULT);
 149  9
     bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
 150  
       @Override
 151  
       public ChannelPipeline getPipeline() throws Exception {
 152  11
         return Channels.pipeline(
 153  
             byteCounter,
 154  
             new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 0, 4, 0, 4),
 155  
             new RequestDecoder<I, V, E, M>(conf, requestRegistry, byteCounter),
 156  
             new RequestServerHandler<I, V, E, M>(serverData));
 157  
       }
 158  
     });
 159  
 
 160  9
     int taskId = conf.getInt("mapred.task.partition", -1);
 161  9
     int numTasks = conf.getInt("mapred.map.tasks", 1);
 162  9
     int numWorkers = conf.getInt(GiraphJob.MAX_WORKERS, numTasks);
 163  9
     int portIncrementConstant =
 164  
         (int) Math.pow(10, Math.ceil(Math.log10(numWorkers)));
 165  9
     int bindPort = conf.getInt(GiraphJob.RPC_INITIAL_PORT,
 166  
         GiraphJob.RPC_INITIAL_PORT_DEFAULT) +
 167  
         taskId;
 168  9
     int bindAttempts = 0;
 169  9
     final int maxRpcPortBindAttempts =
 170  
         conf.getInt(GiraphJob.MAX_RPC_PORT_BIND_ATTEMPTS,
 171  
             GiraphJob.MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT);
 172  9
     final boolean failFirstPortBindingAttempt =
 173  
         conf.getBoolean(GiraphJob.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT,
 174  
             GiraphJob.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT_DEFAULT);
 175  
 
 176  
     // Simple handling of port collisions on the same machine while
 177  
     // preserving debuggability from the port number alone.
 178  
     // Round up the max number of workers to the next power of 10 and use
 179  
     // it as a constant to increase the port number with.
 180  12
     while (bindAttempts < maxRpcPortBindAttempts) {
 181  12
       this.myAddress = new InetSocketAddress(localHostname, bindPort);
 182  12
       if (failFirstPortBindingAttempt && bindAttempts == 0) {
 183  0
         if (LOG.isInfoEnabled()) {
 184  0
           LOG.info("NettyServer: Intentionally fail first " +
 185  
               "binding attempt as giraph.failFirstRpcPortBindAttempt " +
 186  
               "is true, port " + bindPort);
 187  
         }
 188  0
         ++bindAttempts;
 189  0
         bindPort += portIncrementConstant;
 190  0
         continue;
 191  
       }
 192  
 
 193  
       try {
 194  12
         Channel ch = bootstrap.bind(myAddress);
 195  9
         accepted.add(ch);
 196  
 
 197  9
         break;
 198  3
       } catch (ChannelException e) {
 199  3
         LOG.warn("start: Likely failed to bind on attempt " +
 200  
             bindAttempts + " to port " + bindPort, e);
 201  3
         ++bindAttempts;
 202  3
         bindPort += portIncrementConstant;
 203  3
       }
 204  
     }
 205  9
     if (bindAttempts == maxRpcPortBindAttempts || myAddress == null) {
 206  0
       throw new IllegalStateException(
 207  
           "start: Failed to start NettyServer with " +
 208  
               bindAttempts + " attempts");
 209  
     }
 210  
 
 211  9
     if (LOG.isInfoEnabled()) {
 212  0
       LOG.info("start: Started server " +
 213  
           "communication server: " + myAddress + " with up to " +
 214  
           maximumPoolSize + " threads on bind attempt " + bindAttempts +
 215  
           " with sendBufferSize = " + sendBufferSize +
 216  
           " receiveBufferSize = " + receiveBufferSize + " backlog = " +
 217  
           bootstrap.getOption("backlog"));
 218  
     }
 219  9
   }
 220  
 
 221  
   /**
 222  
    * Stop the server.
 223  
    */
 224  
   public void stop() {
 225  9
     if (LOG.isInfoEnabled()) {
 226  0
       LOG.info("stop: Halting netty server");
 227  
     }
 228  9
     accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
 229  9
     bootstrap.releaseExternalResources();
 230  9
   }
 231  
 
 232  
   public InetSocketAddress getMyAddress() {
 233  14
     return myAddress;
 234  
   }
 235  
 }
 236