1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
|
19 | |
package org.apache.giraph.comm; |
20 | |
|
21 | |
import java.net.InetAddress; |
22 | |
import java.net.InetSocketAddress; |
23 | |
import java.net.UnknownHostException; |
24 | |
import java.util.concurrent.Executors; |
25 | |
import java.util.concurrent.ThreadFactory; |
26 | |
import java.util.concurrent.ThreadPoolExecutor; |
27 | |
import java.util.concurrent.TimeUnit; |
28 | |
|
29 | |
import org.apache.giraph.comm.messages.SendPartitionCurrentMessagesRequest; |
30 | |
import org.apache.giraph.graph.GiraphJob; |
31 | |
import org.apache.hadoop.conf.Configuration; |
32 | |
import org.apache.hadoop.io.Writable; |
33 | |
import org.apache.hadoop.io.WritableComparable; |
34 | |
import org.apache.log4j.Logger; |
35 | |
import org.jboss.netty.bootstrap.ServerBootstrap; |
36 | |
import org.jboss.netty.channel.Channel; |
37 | |
import org.jboss.netty.channel.ChannelException; |
38 | |
import org.jboss.netty.channel.ChannelFactory; |
39 | |
import org.jboss.netty.channel.ChannelPipeline; |
40 | |
import org.jboss.netty.channel.ChannelPipelineFactory; |
41 | |
import org.jboss.netty.channel.Channels; |
42 | |
import org.jboss.netty.channel.group.ChannelGroup; |
43 | |
import org.jboss.netty.channel.group.DefaultChannelGroup; |
44 | |
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; |
45 | |
import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder; |
46 | |
|
47 | |
import com.google.common.util.concurrent.ThreadFactoryBuilder; |
48 | |
|
49 | |
|
50 | |
|
51 | |
|
52 | |
|
53 | |
|
54 | |
|
55 | |
|
56 | |
|
57 | |
@SuppressWarnings("rawtypes") |
58 | 55 | public class NettyServer<I extends WritableComparable, |
59 | |
V extends Writable, E extends Writable, |
60 | |
M extends Writable> { |
61 | |
|
62 | |
public static final int MAXIMUM_THREAD_POOL_SIZE_DEFAULT = 32; |
63 | |
|
64 | |
public static final int TCP_BACKLOG_DEFAULT = 100; |
65 | |
|
66 | 1 | private static final Logger LOG = Logger.getLogger(NettyServer.class); |
67 | |
|
68 | |
private final Configuration conf; |
69 | |
|
70 | |
private ChannelFactory channelFactory; |
71 | |
|
72 | 9 | private final ChannelGroup accepted = new DefaultChannelGroup(); |
73 | |
|
74 | 9 | private ThreadPoolExecutor workerThreadPool = null; |
75 | |
|
76 | |
private final String localHostname; |
77 | |
|
78 | |
private InetSocketAddress myAddress; |
79 | |
|
80 | |
private final int maximumPoolSize; |
81 | |
|
82 | 9 | private final RequestRegistry requestRegistry = new RequestRegistry(); |
83 | |
|
84 | |
private final ServerData<I, V, E, M> serverData; |
85 | |
|
86 | |
private ServerBootstrap bootstrap; |
87 | |
|
88 | 9 | private final ByteCounter byteCounter = new ByteCounter(); |
89 | |
|
90 | |
private final int sendBufferSize; |
91 | |
|
92 | |
private final int receiveBufferSize; |
93 | |
|
94 | |
|
95 | |
|
96 | |
|
97 | |
|
98 | |
|
99 | |
|
100 | 9 | public NettyServer(Configuration conf, ServerData<I, V, E, M> serverData) { |
101 | 9 | this.conf = conf; |
102 | 9 | this.serverData = serverData; |
103 | 9 | requestRegistry.registerClass( |
104 | |
new SendVertexRequest<I, V, E, M>()); |
105 | 9 | requestRegistry.registerClass( |
106 | |
new SendPartitionMessagesRequest<I, V, E, M>()); |
107 | 9 | requestRegistry.registerClass( |
108 | |
new SendPartitionMutationsRequest<I, V, E, M>()); |
109 | 9 | requestRegistry.registerClass( |
110 | |
new SendPartitionCurrentMessagesRequest<I, V, E, M>()); |
111 | 9 | requestRegistry.shutdown(); |
112 | |
|
113 | 9 | sendBufferSize = conf.getInt(GiraphJob.SERVER_SEND_BUFFER_SIZE, |
114 | |
GiraphJob.DEFAULT_SERVER_SEND_BUFFER_SIZE); |
115 | 9 | receiveBufferSize = conf.getInt(GiraphJob.SERVER_RECEIVE_BUFFER_SIZE, |
116 | |
GiraphJob.DEFAULT_SERVER_RECEIVE_BUFFER_SIZE); |
117 | |
|
118 | 9 | ThreadFactory bossFactory = new ThreadFactoryBuilder() |
119 | |
.setNameFormat("Giraph Netty Boss #%d") |
120 | |
.build(); |
121 | 9 | ThreadFactory workerFactory = new ThreadFactoryBuilder() |
122 | |
.setNameFormat("Giraph Netty Worker #%d") |
123 | |
.build(); |
124 | |
try { |
125 | 9 | this.localHostname = InetAddress.getLocalHost().getHostName(); |
126 | 0 | } catch (UnknownHostException e) { |
127 | 0 | throw new IllegalStateException("NettyServer: unable to get hostname"); |
128 | 9 | } |
129 | 9 | maximumPoolSize = conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS, |
130 | |
MAXIMUM_THREAD_POOL_SIZE_DEFAULT); |
131 | |
|
132 | 9 | channelFactory = new NioServerSocketChannelFactory( |
133 | |
Executors.newCachedThreadPool(bossFactory), |
134 | |
Executors.newCachedThreadPool(workerFactory), |
135 | |
maximumPoolSize); |
136 | 9 | } |
137 | |
|
138 | |
|
139 | |
|
140 | |
|
141 | |
public void start() { |
142 | 9 | bootstrap = new ServerBootstrap(channelFactory); |
143 | |
|
144 | 9 | bootstrap.setOption("child.keepAlive", true); |
145 | 9 | bootstrap.setOption("child.tcpNoDelay", true); |
146 | 9 | bootstrap.setOption("child.sendBufferSize", sendBufferSize); |
147 | 9 | bootstrap.setOption("child.receiveBufferSize", receiveBufferSize); |
148 | 9 | bootstrap.setOption("backlog", TCP_BACKLOG_DEFAULT); |
149 | 9 | bootstrap.setPipelineFactory(new ChannelPipelineFactory() { |
150 | |
@Override |
151 | |
public ChannelPipeline getPipeline() throws Exception { |
152 | 11 | return Channels.pipeline( |
153 | |
byteCounter, |
154 | |
new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 0, 4, 0, 4), |
155 | |
new RequestDecoder<I, V, E, M>(conf, requestRegistry, byteCounter), |
156 | |
new RequestServerHandler<I, V, E, M>(serverData)); |
157 | |
} |
158 | |
}); |
159 | |
|
160 | 9 | int taskId = conf.getInt("mapred.task.partition", -1); |
161 | 9 | int numTasks = conf.getInt("mapred.map.tasks", 1); |
162 | 9 | int numWorkers = conf.getInt(GiraphJob.MAX_WORKERS, numTasks); |
163 | 9 | int portIncrementConstant = |
164 | |
(int) Math.pow(10, Math.ceil(Math.log10(numWorkers))); |
165 | 9 | int bindPort = conf.getInt(GiraphJob.RPC_INITIAL_PORT, |
166 | |
GiraphJob.RPC_INITIAL_PORT_DEFAULT) + |
167 | |
taskId; |
168 | 9 | int bindAttempts = 0; |
169 | 9 | final int maxRpcPortBindAttempts = |
170 | |
conf.getInt(GiraphJob.MAX_RPC_PORT_BIND_ATTEMPTS, |
171 | |
GiraphJob.MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT); |
172 | 9 | final boolean failFirstPortBindingAttempt = |
173 | |
conf.getBoolean(GiraphJob.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT, |
174 | |
GiraphJob.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT_DEFAULT); |
175 | |
|
176 | |
|
177 | |
|
178 | |
|
179 | |
|
180 | 12 | while (bindAttempts < maxRpcPortBindAttempts) { |
181 | 12 | this.myAddress = new InetSocketAddress(localHostname, bindPort); |
182 | 12 | if (failFirstPortBindingAttempt && bindAttempts == 0) { |
183 | 0 | if (LOG.isInfoEnabled()) { |
184 | 0 | LOG.info("NettyServer: Intentionally fail first " + |
185 | |
"binding attempt as giraph.failFirstRpcPortBindAttempt " + |
186 | |
"is true, port " + bindPort); |
187 | |
} |
188 | 0 | ++bindAttempts; |
189 | 0 | bindPort += portIncrementConstant; |
190 | 0 | continue; |
191 | |
} |
192 | |
|
193 | |
try { |
194 | 12 | Channel ch = bootstrap.bind(myAddress); |
195 | 9 | accepted.add(ch); |
196 | |
|
197 | 9 | break; |
198 | 3 | } catch (ChannelException e) { |
199 | 3 | LOG.warn("start: Likely failed to bind on attempt " + |
200 | |
bindAttempts + " to port " + bindPort, e); |
201 | 3 | ++bindAttempts; |
202 | 3 | bindPort += portIncrementConstant; |
203 | 3 | } |
204 | |
} |
205 | 9 | if (bindAttempts == maxRpcPortBindAttempts || myAddress == null) { |
206 | 0 | throw new IllegalStateException( |
207 | |
"start: Failed to start NettyServer with " + |
208 | |
bindAttempts + " attempts"); |
209 | |
} |
210 | |
|
211 | 9 | if (LOG.isInfoEnabled()) { |
212 | 0 | LOG.info("start: Started server " + |
213 | |
"communication server: " + myAddress + " with up to " + |
214 | |
maximumPoolSize + " threads on bind attempt " + bindAttempts + |
215 | |
" with sendBufferSize = " + sendBufferSize + |
216 | |
" receiveBufferSize = " + receiveBufferSize + " backlog = " + |
217 | |
bootstrap.getOption("backlog")); |
218 | |
} |
219 | 9 | } |
220 | |
|
221 | |
|
222 | |
|
223 | |
|
224 | |
public void stop() { |
225 | 9 | if (LOG.isInfoEnabled()) { |
226 | 0 | LOG.info("stop: Halting netty server"); |
227 | |
} |
228 | 9 | accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS); |
229 | 9 | bootstrap.releaseExternalResources(); |
230 | 9 | } |
231 | |
|
232 | |
public InetSocketAddress getMyAddress() { |
233 | 14 | return myAddress; |
234 | |
} |
235 | |
} |
236 | |
|