/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.comm;

import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexCombiner;
import org.apache.giraph.graph.VertexMutations;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.graph.WorkerInfo;
import org.apache.giraph.graph.partition.Partition;
import org.apache.giraph.graph.partition.PartitionOwner;
import org.apache.giraph.utils.MemoryUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;

import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
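
/**
 * Basic RPC communications object that implements the lower level
 * operations of worker-to-worker communication: caching and flushing
 * outbound messages, receiving inbound messages and graph mutations,
 * and exchanging partitions of vertices.
 *
 * @param <I> Vertex id
 * @param <V> Vertex data
 * @param <E> Edge data
 * @param <M> Message data
 * @param <J> Job token
 */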
@SuppressWarnings("rawtypes")
public abstract class BasicRPCCommunications<I extends WritableComparable,
        V extends Writable, E extends Writable, M extends Writable, J>
        implements CommunicationsInterface<I, V, E, M>,
        WorkerClientServer<I, V, E, M> {
    /** Class logger */
    private static final Logger LOG =
        Logger.getLogger(BasicRPCCommunications.class);
    /** Maximum number of vertices sent in a single RPC */
    private static final int MAX_VERTICES_PER_RPC = 1024;
    /** Hadoop configuration */
    protected final Configuration conf;
    /** Mapper context, used to report progress */
    private final Mapper<?, ?, ?, ?>.Context context;
    /** True while prepareSuperstep() is moving messages around in memory */
    private boolean inPrepareSuperstep = false;
    /** Local hostname */
    private final String localHostname;
    /** Name of RPC server (the string form of myAddress) */
    private final String myName;
    /** RPC server of this worker */
    private Server server;
    /** Centralized service, needed to get partition and vertex owners */
    private final CentralizedServiceWorker<I, V, E, M> service;
    /** Combiner instance, can be null */
    private final VertexCombiner<I, M> combiner;
    /** Address of the RPC server of this worker */
    private InetSocketAddress myAddress;
    /** Number of messages sent during the current superstep */
    private long totalMsgsSentInSuperstep = 0;
    /** Maximum number of messages sent per putVertexIdMessagesList RPC */
    private final int maxMessagesPerFlushPut;
    /**
     * Map of the peer connections, keyed by unresolved socket address.
     */
    private final Map<InetSocketAddress, PeerConnection> peerConnections =
        new HashMap<InetSocketAddress, PeerConnection>();
    /**
     * Cached map of partition ids to the socket address of their owner.
     */
    private final Map<Integer, InetSocketAddress> partitionIndexAddressMap =
        new HashMap<Integer, InetSocketAddress>();
    /**
     * Thread pool for message flush threads.
     */
    private final ExecutorService executor;
    /**
     * Map of outbound messages, mapping from remote server to
     * destination vertex index to list of messages.
     */
    private final Map<InetSocketAddress, Map<I, MsgList<M>>> outMessages =
        new HashMap<InetSocketAddress, Map<I, MsgList<M>>>();
    /**
     * Map of incoming messages, mapping from vertex index to list of
     * messages, assigned to the vertices in prepareSuperstep().
     */
    private final Map<I, List<M>> inMessages = new HashMap<I, List<M>>();
    /**
     * Map of inbound messages, mapping from vertex index to list of
     * messages.  Transferred to inMessages at the beginning of a
     * superstep.  Synchronized on for thread safety.
     */
    private final Map<I, List<M>> transientInMessages =
        new HashMap<I, List<M>>();
    /**
     * Map of partition ids to incoming vertices from other workers.
     */
    private final Map<Integer, Collection<Vertex<I, V, E, M>>>
        inPartitionVertexMap = Maps.newHashMap();
    /**
     * Map from vertex index to all of its requested vertex mutations.
     */
    private final Map<I, VertexMutations<I, V, E, M>> inVertexMutationsMap =
        new HashMap<I, VertexMutations<I, V, E, M>>();
    /** Maximum size of a cached message list before it is flushed early */
    private final int maxSize;
    /** Cached job id */
    private final String jobId;
    /** Cached job token */
    private final J jobToken;
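
    /**
     * Combination of a peer's RPC proxy (or the local instance) and the
     * map of outbound messages destined for that peer.
     */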
    private class PeerConnection {
        /**
         * Map of outbound messages going to this particular remote server,
         * mapping from vertex index to list of messages.
         */
        private final Map<I, MsgList<M>> outMessagesPerPeer;
        /**
         * Client interface: RPC proxy for a remote server, this class
         * itself for the local worker.
         */
        private final CommunicationsInterface<I, V, E, M> peer;
        /** False when connected to self (no proxy), true otherwise */
        private final boolean isProxy;
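
        /**
         * Constructor.
         *
         * @param idMessageMap Map of vertex id to outbound message list
         * @param peerConnection RPC proxy or local instance for this peer
         * @param isProxy True if peerConnection is an RPC proxy
         */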
        public PeerConnection(Map<I, MsgList<M>> idMessageMap,
                CommunicationsInterface<I, V, E, M> peerConnection,
                boolean isProxy) {
            this.outMessagesPerPeer = idMessageMap;
            this.peer = peerConnection;
            this.isProxy = isProxy;
        }

        /** Close this connection (only logs; proxies are stopped on error) */
        public void close() {
            if (LOG.isDebugEnabled()) {
                LOG.debug("close: Done");
            }
        }

        /** @return RPC proxy for a remote peer, or this local instance */
        public CommunicationsInterface<I, V, E, M> getRPCProxy() {
            return peer;
        }

        @Override
        public String toString() {
            return peer.getName() + ", proxy=" + isProxy;
        }
    }
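
    /**
     * Runnable that flushes all cached messages of one peer connection:
     * combines messages per vertex when a combiner is configured, then
     * bulks them into putVertexIdMessagesList() RPCs.
     */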
    private class PeerFlushExecutor implements Runnable {
        /** Report the flusher status if this interval is exceeded */
        private static final int REPORTING_INTERVAL_MIN_MILLIS = 60000;
        /** Connection to flush */
        private final PeerConnection peerConnection;
        /** Saved context for reporting progress */
        private final Mapper<?, ?, ?, ?>.Context context;
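
        /**
         * Constructor.
         *
         * @param peerConnection Connection to flush
         * @param context Context for reporting progress
         */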
        PeerFlushExecutor(PeerConnection peerConnection,
                Mapper<?, ?, ?, ?>.Context context) {
            this.peerConnection = peerConnection;
            this.context = context;
        }

        @Override
        public void run() {
            CommunicationsInterface<I, V, E, M> proxy =
                peerConnection.getRPCProxy();
            long startMillis = System.currentTimeMillis();
            long lastReportedMillis = startMillis;
            try {
                int verticesDone = 0;
                synchronized (peerConnection.outMessagesPerPeer) {
                    final int vertices =
                        peerConnection.outMessagesPerPeer.size();
                    // 1. Check that no null messages were cached, and
                    //    combine the messages per vertex if a combiner
                    //    is configured.
                    for (Entry<I, MsgList<M>> entry :
                            peerConnection.outMessagesPerPeer.entrySet()) {
                        for (M msg : entry.getValue()) {
                            if (msg == null) {
                                throw new IllegalArgumentException(
                                    "run: Cannot put null message on " +
                                    "vertex id " + entry.getKey());
                            }
                        }
                        if (combiner != null && entry.getValue().size() > 1) {
                            Iterable<M> messages = combiner.combine(
                                entry.getKey(), entry.getValue());
                            if (messages == null) {
                                throw new IllegalStateException(
                                    "run: Combiner cannot return null");
                            }
                            if (Iterables.size(entry.getValue()) <
                                    Iterables.size(messages)) {
                                throw new IllegalStateException(
                                    "run: The number of combined " +
                                    "messages is required to be <= to " +
                                    "number of messages to be combined");
                            }
                            entry.getValue().clear();
                            for (M msg : messages) {
                                entry.getValue().add(msg);
                            }
                        }
                        if (entry.getValue().isEmpty()) {
                            throw new IllegalStateException(
                                "run: Impossible for no messages in " +
                                entry.getKey());
                        }
                    }
                    // 2. Send the vertex ids and messages in bulk,
                    //    removing each bulked entry from the cache.
                    while (!peerConnection.outMessagesPerPeer.isEmpty()) {
                        int bulkedMessages = 0;
                        Iterator<Entry<I, MsgList<M>>> vertexIdMessagesListIt =
                            peerConnection.outMessagesPerPeer.entrySet().
                            iterator();
                        VertexIdMessagesList<I, M> vertexIdMessagesList =
                            new VertexIdMessagesList<I, M>();
                        while (vertexIdMessagesListIt.hasNext()) {
                            Entry<I, MsgList<M>> entry =
                                vertexIdMessagesListIt.next();
                            // Add this entry if the list is empty or the
                            // maximum number of messages per put has not
                            // been exceeded yet
                            if (vertexIdMessagesList.isEmpty() ||
                                    ((bulkedMessages +
                                        entry.getValue().size()) <
                                        maxMessagesPerFlushPut)) {
                                vertexIdMessagesList.add(
                                    new VertexIdMessages<I, M>(
                                        entry.getKey(), entry.getValue()));
                                bulkedMessages += entry.getValue().size();
                            }
                        }

                        // Clean up references to the vertex ids and
                        // messages that are about to be sent
                        for (VertexIdMessages<I, M> vertexIdMessages :
                                vertexIdMessagesList) {
                            peerConnection.outMessagesPerPeer.remove(
                                vertexIdMessages.getVertexId());
                        }

                        proxy.putVertexIdMessagesList(vertexIdMessagesList);
                        context.progress();

                        verticesDone += vertexIdMessagesList.size();
                        long curMillis = System.currentTimeMillis();
                        if ((lastReportedMillis +
                                REPORTING_INTERVAL_MIN_MILLIS) < curMillis) {
                            lastReportedMillis = curMillis;
                            if (LOG.isInfoEnabled()) {
                                float percentDone =
                                    (100f * verticesDone) / vertices;
                                float minutesUsed =
                                    (curMillis - startMillis) / 1000f / 60f;
                                float minutesRemaining =
                                    (minutesUsed * 100f / percentDone) -
                                    minutesUsed;
                                LOG.info("run: " + peerConnection + ", " +
                                    verticesDone + " out of " + vertices +
                                    " done in " + minutesUsed +
                                    " minutes, " +
                                    percentDone + "% done, ETA " +
                                    minutesRemaining +
                                    " minutes remaining, " +
                                    MemoryUtils.getRuntimeMemoryStats());
                            }
                        }
                    }
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("run: " + proxy.getName() +
                        ": all messages flushed");
                }
            } catch (IOException e) {
                LOG.error(e);
                if (peerConnection.isProxy) {
                    RPC.stopProxy(peerConnection.peer);
                }
                throw new RuntimeException(e);
            }
        }
    }
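
    /**
     * Runnable that immediately flushes the cached message list of a
     * single vertex whose list grew past the maximum cached size.
     */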
    private class LargeMessageFlushExecutor implements Runnable {
        /** Destination vertex of the messages */
        private final I destVertex;
        /** List of messages to the destination vertex */
        private final MsgList<M> outMessageList;
        /** Connection to send the messages on */
        private PeerConnection peerConnection;

        /**
         * Constructor.  Detaches the message list for destVertex from the
         * connection's outbound map while holding its lock.
         *
         * @param peerConnection Connection to send the messages on
         * @param destVertex Destination vertex of the messages
         */
        LargeMessageFlushExecutor(PeerConnection peerConnection,
                I destVertex) {
            this.peerConnection = peerConnection;
            synchronized (peerConnection.outMessagesPerPeer) {
                this.destVertex = destVertex;
                outMessageList =
                    peerConnection.outMessagesPerPeer.get(destVertex);
                peerConnection.outMessagesPerPeer.remove(destVertex);
            }
        }

        @Override
        public void run() {
            try {
                CommunicationsInterface<I, V, E, M> proxy =
                    peerConnection.getRPCProxy();

                if (combiner != null) {
                    Iterable<M> messages = combiner.combine(destVertex,
                        outMessageList);
                    if (messages == null) {
                        throw new IllegalStateException(
                            "run: Combiner cannot return null");
                    }
                    if (Iterables.size(outMessageList) <
                            Iterables.size(messages)) {
                        throw new IllegalStateException(
                            "run: The number of combined messages is " +
                            "required to be <= to the number of " +
                            "messages to be combined");
                    }
                    for (M msg : messages) {
                        proxy.putMsg(destVertex, msg);
                    }
                } else {
                    proxy.putMsgList(destVertex, outMessageList);
                }
            } catch (IOException e) {
                LOG.error(e);
                if (peerConnection.isProxy) {
                    RPC.stopProxy(peerConnection.peer);
                }
                throw new RuntimeException("run: Got IOException", e);
            } finally {
                outMessageList.clear();
            }
        }
    }
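
    /**
     * Only constructor.  Starts the RPC server of this worker, retrying
     * with increasing port numbers on bind failures.
     *
     * @param context Context for getting configuration and reporting progress
     * @param service Centralized service worker for this communication object
     * @throws IOException
     * @throws InterruptedException
     */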
    public BasicRPCCommunications(Mapper<?, ?, ?, ?>.Context context,
            CentralizedServiceWorker<I, V, E, M> service)
            throws IOException, InterruptedException {
        this.service = service;
        this.context = context;
        this.conf = context.getConfiguration();
        this.maxSize = conf.getInt(GiraphJob.MSG_SIZE,
            GiraphJob.MSG_SIZE_DEFAULT);
        this.maxMessagesPerFlushPut =
            conf.getInt(GiraphJob.MAX_MESSAGES_PER_FLUSH_PUT,
                GiraphJob.DEFAULT_MAX_MESSAGES_PER_FLUSH_PUT);
        if (BspUtils.getVertexCombinerClass(conf) == null) {
            this.combiner = null;
        } else {
            this.combiner = BspUtils.createVertexCombiner(conf);
        }

        this.localHostname = InetAddress.getLocalHost().getHostName();
        int taskId = conf.getInt("mapred.task.partition", -1);
        int numTasks = conf.getInt("mapred.map.tasks", 1);

        // If there are fewer tasks than handlers, only start that many
        // handler threads
        int numHandlers = conf.getInt(GiraphJob.RPC_NUM_HANDLERS,
            GiraphJob.RPC_NUM_HANDLERS_DEFAULT);
        if (numTasks < numHandlers) {
            numHandlers = numTasks;
        }
        this.jobToken = createJobToken();
        this.jobId = context.getJobID().toString();

        int numWorkers = conf.getInt(GiraphJob.MAX_WORKERS, numTasks);

        // If the number of flush threads is unset, it defaults to the
        // number of workers - 1, with a minimum of 1
        int numFlushThreads =
            Math.max(conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
                numWorkers - 1),
                1);
        this.executor = Executors.newFixedThreadPool(numFlushThreads);

        // Simple handling of port collisions on the same machine while
        // preserving debugability from the port number alone: on a failed
        // bind, jump by a power-of-ten increment at least as large as the
        // number of workers
        int portIncrementConstant =
            (int) Math.pow(10, Math.ceil(Math.log10(numWorkers)));
        String bindAddress = localHostname;
        int bindPort = conf.getInt(GiraphJob.RPC_INITIAL_PORT,
            GiraphJob.RPC_INITIAL_PORT_DEFAULT) +
            taskId;
        int bindAttempts = 0;
        final int maxRpcPortBindAttempts =
            conf.getInt(GiraphJob.MAX_RPC_PORT_BIND_ATTEMPTS,
                GiraphJob.MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT);
        final boolean failFirstPortBindingAttempt =
            conf.getBoolean(GiraphJob.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT,
                GiraphJob.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT_DEFAULT);
        while (bindAttempts < maxRpcPortBindAttempts) {
            this.myAddress = new InetSocketAddress(bindAddress, bindPort);
            if (failFirstPortBindingAttempt && bindAttempts == 0) {
                LOG.info("BasicRPCCommunications: Intentionally fail first " +
                    "binding attempt as giraph.failFirstRpcPortBindAttempt " +
                    "is true, port " + bindPort);
                ++bindAttempts;
                bindPort += portIncrementConstant;
                continue;
            }

            try {
                this.server =
                    getRPCServer(
                        myAddress, numHandlers, this.jobId, this.jobToken);
                break;
            } catch (BindException e) {
                LOG.info("BasicRPCCommunications: Failed to bind with port " +
                    bindPort + " on bind attempt " + bindAttempts);
                ++bindAttempts;
                bindPort += portIncrementConstant;
            }
        }
        if (bindAttempts == maxRpcPortBindAttempts || this.server == null) {
            throw new IllegalStateException(
                "BasicRPCCommunications: Failed to start RPCServer with " +
                maxRpcPortBindAttempts + " attempts");
        }

        this.server.start();
        this.myName = myAddress.toString();

        if (LOG.isInfoEnabled()) {
            LOG.info("BasicRPCCommunications: Started RPC " +
                "communication server: " + myName + " with " +
                numHandlers + " handlers and " + numFlushThreads +
                " flush threads on bind attempt " + bindAttempts);
        }
    }
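
    /**
     * Submit a large message list to be sent asynchronously.
     *
     * @param addr Address of the recipient worker
     * @param destVertex Index of the destination vertex
     */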
    private void submitLargeMessageSend(InetSocketAddress addr, I destVertex) {
        PeerConnection pc = peerConnections.get(addr);
        executor.execute(new LargeMessageFlushExecutor(pc, destVertex));
    }
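
    /**
     * Create the job token.
     *
     * @return Job token
     * @throws IOException
     */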
    protected abstract J createJobToken() throws IOException;
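
    /**
     * Get the RPC server for this worker.
     *
     * @param addr Address to bind the server to
     * @param numHandlers Number of handler threads
     * @param jobId Job id
     * @param jobToken Job token
     * @return RPC server
     * @throws IOException
     */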
    protected abstract Server getRPCServer(InetSocketAddress addr,
        int numHandlers, String jobId, J jobToken) throws IOException;
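
    /**
     * Get the port of the RPC server of this worker.
     *
     * @return RPC server port
     */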
    public int getPort() {
        return myAddress.getPort();
    }

    @Override
    public void setup() {
        try {
            connectAllRPCProxys(this.jobId, this.jobToken);
        } catch (IOException e) {
            throw new IllegalStateException("setup: Got IOException", e);
        } catch (InterruptedException e) {
            throw new IllegalStateException("setup: Got InterruptedException",
                e);
        }
    }
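
    /**
     * Get an RPC proxy to a remote worker.
     *
     * @param addr Address of the remote worker
     * @param jobId Job id
     * @param jobToken Job token
     * @return RPC proxy to the remote worker
     * @throws IOException
     * @throws InterruptedException
     */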
    protected abstract CommunicationsInterface<I, V, E, M> getRPCProxy(
        final InetSocketAddress addr, String jobId, J jobToken)
        throws IOException, InterruptedException;
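
    /**
     * Establish connections to all workers that own partitions, retrying
     * each failed connection a bounded number of times.
     *
     * @param jobId Stringified job id
     * @param jobToken Job token
     * @throws IOException
     * @throws InterruptedException
     */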
    private void connectAllRPCProxys(String jobId, J jobToken)
            throws IOException, InterruptedException {
        final int maxTries = 5;
        for (PartitionOwner partitionOwner : service.getPartitionOwners()) {
            int tries = 0;
            while (tries < maxTries) {
                try {
                    startPeerConnectionThread(
                        partitionOwner.getWorkerInfo(), jobId, jobToken);
                    break;
                } catch (IOException e) {
                    LOG.warn("connectAllRPCProxys: Failed on attempt " +
                        tries + " of " + maxTries +
                        " to connect to " + partitionOwner.toString(), e);
                    ++tries;
                }
            }
        }
    }
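
    /**
     * Create a connection to a remote (or the local) worker and register
     * its outbound message map, if one does not already exist.
     *
     * @param workerInfo Worker information of the peer
     * @param jobId Job id
     * @param jobToken Job token
     * @throws IOException
     * @throws InterruptedException
     */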
    private void startPeerConnectionThread(WorkerInfo workerInfo,
            String jobId,
            J jobToken) throws IOException, InterruptedException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("startPeerConnectionThread: hostname " +
                workerInfo.getHostname() + ", port " +
                workerInfo.getPort());
        }
        final InetSocketAddress addr =
            new InetSocketAddress(workerInfo.getHostname(),
                workerInfo.getPort());
        // Cheap way to hold both the hostname and port (rather than
        // make a class)
        InetSocketAddress addrUnresolved =
            InetSocketAddress.createUnresolved(addr.getHostName(),
                addr.getPort());
        Map<I, MsgList<M>> outMsgMap = null;
        boolean isProxy = true;
        CommunicationsInterface<I, V, E, M> peer = this;
        synchronized (outMessages) {
            outMsgMap = outMessages.get(addrUnresolved);
            if (LOG.isDebugEnabled()) {
                LOG.debug("startPeerConnectionThread: Connecting to " +
                    workerInfo.toString() + ", addr = " + addr +
                    " if outMsgMap (" + outMsgMap + ") == null ");
            }
            if (outMsgMap != null) { // this worker has already been added
                return;
            }

            if (myName.equals(addr.toString())) {
                isProxy = false;
            } else {
                peer = getRPCProxy(addr, jobId, jobToken);
            }

            outMsgMap = new HashMap<I, MsgList<M>>();
            outMessages.put(addrUnresolved, outMsgMap);
        }

        PeerConnection peerConnection =
            new PeerConnection(outMsgMap, peer, isProxy);
        peerConnections.put(addrUnresolved, peerConnection);
    }

    @Override
    public final long getProtocolVersion(String protocol, long clientVersion)
            throws IOException {
        return VERSION_ID;
    }

    @Override
    public void closeConnections() throws IOException {
        for (PeerConnection pc : peerConnections.values()) {
            pc.close();
        }
    }
|
727 | |
|
728 | |
@Override |
729 | |
public final void close() { |
730 | 23 | LOG.info("close: shutting down RPC server"); |
731 | 23 | server.stop(); |
732 | 23 | } |

    @Override
    public final void putMsg(I vertex, M msg) throws IOException {
        List<M> msgs = null;
        if (LOG.isDebugEnabled()) {
            LOG.debug("putMsg: Adding msg " + msg + " on vertex " + vertex);
        }
        if (inPrepareSuperstep) {
            // Called by combiner (main thread) during superstep preparation
            msgs = inMessages.get(vertex);
            if (msgs == null) {
                msgs = new ArrayList<M>();
                inMessages.put(vertex, msgs);
            }
            msgs.add(msg);
        } else {
            synchronized (transientInMessages) {
                msgs = transientInMessages.get(vertex);
                if (msgs == null) {
                    msgs = new ArrayList<M>();
                    transientInMessages.put(vertex, msgs);
                }
            }
            synchronized (msgs) {
                msgs.add(msg);
            }
        }
    }

    @Override
    public final void putMsgList(I vertex,
            MsgList<M> msgList) throws IOException {
        List<M> msgs = null;
        if (LOG.isDebugEnabled()) {
            LOG.debug("putMsgList: Adding msgList " + msgList +
                " on vertex " + vertex);
        }
        synchronized (transientInMessages) {
            msgs = transientInMessages.get(vertex);
            if (msgs == null) {
                msgs = new ArrayList<M>(msgList.size());
                transientInMessages.put(vertex, msgs);
            }
        }
        synchronized (msgs) {
            msgs.addAll(msgList);
        }
    }

    @Override
    public final void putVertexIdMessagesList(
            VertexIdMessagesList<I, M> vertexIdMessagesList)
            throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("putVertexIdMessagesList: Adding msgList " +
                vertexIdMessagesList);
        }

        List<M> messageList = null;
        for (VertexIdMessages<I, M> vertexIdMessages : vertexIdMessagesList) {
            synchronized (transientInMessages) {
                messageList =
                    transientInMessages.get(vertexIdMessages.getVertexId());
                if (messageList == null) {
                    messageList = new ArrayList<M>(
                        vertexIdMessages.getMessageList().size());
                    transientInMessages.put(
                        vertexIdMessages.getVertexId(), messageList);
                }
            }
            synchronized (messageList) {
                messageList.addAll(vertexIdMessages.getMessageList());
            }
        }
    }

    @Override
    public final void putVertexList(int partitionId,
            VertexList<I, V, E, M> vertexList) throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("putVertexList: On partition id " + partitionId +
                " adding vertex list of size " + vertexList.size());
        }
        service.getPartitionStore().addPartitionVertices(partitionId,
            vertexList);
    }

    @Override
    public final void addEdge(I sourceVertexId, I targetVertexId,
            E edgeValue) {
        Edge<I, E> edge = new Edge<I, E>(targetVertexId, edgeValue);
        if (LOG.isDebugEnabled()) {
            LOG.debug("addEdge: Adding edge " + edge);
        }
        synchronized (inVertexMutationsMap) {
            VertexMutations<I, V, E, M> vertexMutations = null;
            if (!inVertexMutationsMap.containsKey(sourceVertexId)) {
                vertexMutations = new VertexMutations<I, V, E, M>();
                inVertexMutationsMap.put(sourceVertexId, vertexMutations);
            } else {
                vertexMutations = inVertexMutationsMap.get(sourceVertexId);
            }
            vertexMutations.addEdge(edge);
        }
    }

    @Override
    public void removeEdge(I vertexIndex, I destinationVertexIndex) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("removeEdge: Removing edge on destination " +
                destinationVertexIndex);
        }
        synchronized (inVertexMutationsMap) {
            VertexMutations<I, V, E, M> vertexMutations = null;
            if (!inVertexMutationsMap.containsKey(vertexIndex)) {
                vertexMutations = new VertexMutations<I, V, E, M>();
                inVertexMutationsMap.put(vertexIndex, vertexMutations);
            } else {
                vertexMutations = inVertexMutationsMap.get(vertexIndex);
            }
            vertexMutations.removeEdge(destinationVertexIndex);
        }
    }

    @Override
    public final void addVertex(Vertex<I, V, E, M> vertex) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("addVertex: Adding vertex " + vertex);
        }
        synchronized (inVertexMutationsMap) {
            VertexMutations<I, V, E, M> vertexMutations = null;
            if (!inVertexMutationsMap.containsKey(vertex.getId())) {
                vertexMutations = new VertexMutations<I, V, E, M>();
                inVertexMutationsMap.put(vertex.getId(), vertexMutations);
            } else {
                vertexMutations = inVertexMutationsMap.get(vertex.getId());
            }
            vertexMutations.addVertex(vertex);
        }
    }

    @Override
    public void removeVertex(I vertexIndex) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("removeVertex: Removing vertex " + vertexIndex);
        }
        synchronized (inVertexMutationsMap) {
            VertexMutations<I, V, E, M> vertexMutations = null;
            if (!inVertexMutationsMap.containsKey(vertexIndex)) {
                vertexMutations = new VertexMutations<I, V, E, M>();
                inVertexMutationsMap.put(vertexIndex, vertexMutations);
            } else {
                vertexMutations = inVertexMutationsMap.get(vertexIndex);
            }
            vertexMutations.removeVertex();
        }
    }

    @Override
    public final void sendPartitionRequest(WorkerInfo workerInfo,
            Partition<I, V, E, M> partition) {
        // Internally, break up the sending so that the list doesn't
        // get too big
        VertexList<I, V, E, M> hadoopVertexList =
            new VertexList<I, V, E, M>();
        InetSocketAddress addr =
            getInetSocketAddress(workerInfo, partition.getId());
        CommunicationsInterface<I, V, E, M> rpcProxy =
            peerConnections.get(addr).getRPCProxy();

        if (LOG.isDebugEnabled()) {
            LOG.debug("sendPartitionRequest: Sending to " +
                rpcProxy.getName() + " " + addr + " from " + workerInfo +
                ", with partition " + partition);
        }
        for (Vertex<I, V, E, M> vertex : partition.getVertices()) {
            hadoopVertexList.add(vertex);
            if (hadoopVertexList.size() >= MAX_VERTICES_PER_RPC) {
                try {
                    rpcProxy.putVertexList(partition.getId(),
                        hadoopVertexList);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
                hadoopVertexList.clear();
            }
        }
        if (hadoopVertexList.size() > 0) {
            try {
                rpcProxy.putVertexList(partition.getId(),
                    hadoopVertexList);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
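
    /**
     * Get the socket address of a worker, caching the answer per
     * partition id.
     *
     * @param workerInfo Worker to get the socket address of
     * @param partitionId Partition id owned by the worker
     * @return Unresolved socket address of the worker
     */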
    private InetSocketAddress getInetSocketAddress(WorkerInfo workerInfo,
            int partitionId) {
        synchronized (partitionIndexAddressMap) {
            InetSocketAddress address =
                partitionIndexAddressMap.get(partitionId);
            if (address == null) {
                address = InetSocketAddress.createUnresolved(
                    workerInfo.getHostname(),
                    workerInfo.getPort());
                partitionIndexAddressMap.put(partitionId, address);
            }

            if (address.getPort() != workerInfo.getPort() ||
                    !address.getHostName().equals(workerInfo.getHostname())) {
                throw new IllegalStateException(
                    "getInetSocketAddress: Impossible that address " +
                    address + " does not match " + workerInfo);
            }

            return address;
        }
    }
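
    /**
     * Get the socket address of the worker that owns a vertex.
     *
     * @param destVertex Destination vertex index
     * @return Unresolved socket address of the owning worker
     */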
    private InetSocketAddress getInetSocketAddress(I destVertex) {
        PartitionOwner partitionOwner =
            service.getVertexPartitionOwner(destVertex);
        return getInetSocketAddress(partitionOwner.getWorkerInfo(),
            partitionOwner.getPartitionId());
    }

    @Override
    public final void sendMessageRequest(I destVertex, M msg) {
        InetSocketAddress addr = getInetSocketAddress(destVertex);
        if (LOG.isDebugEnabled()) {
            LOG.debug("sendMessage: Send bytes (" + msg.toString() +
                ") to " + destVertex + " with address " + addr);
        }
        ++totalMsgsSentInSuperstep;
        Map<I, MsgList<M>> msgMap = null;
        synchronized (outMessages) {
            msgMap = outMessages.get(addr);
        }
        if (msgMap == null) { // should never happen after constructor
            throw new RuntimeException(
                "sendMessage: msgMap did not exist for " + addr +
                " for vertex " + destVertex);
        }

        synchronized (msgMap) {
            MsgList<M> msgList = msgMap.get(destVertex);
            if (msgList == null) { // should only happen once
                msgList = new MsgList<M>();
                msgMap.put(destVertex, msgList);
            }
            msgList.add(msg);
            if (LOG.isDebugEnabled()) {
                LOG.debug("sendMessage: added msg=" + msg + ", size=" +
                    msgList.size());
            }
            if (msgList.size() > maxSize) {
                submitLargeMessageSend(addr, destVertex);
            }
        }
    }

    @Override
    public final void addEdgeRequest(I destVertex, Edge<I, E> edge)
            throws IOException {
        InetSocketAddress addr = getInetSocketAddress(destVertex);
        if (LOG.isDebugEnabled()) {
            LOG.debug("addEdgeRequest: Add edge (" + edge.toString() +
                ") to " + destVertex + " with address " + addr);
        }
        CommunicationsInterface<I, V, E, M> rpcProxy =
            peerConnections.get(addr).getRPCProxy();
        rpcProxy.addEdge(destVertex, edge.getTargetVertexId(),
            edge.getValue());
    }

    @Override
    public final void removeEdgeRequest(I vertexIndex, I destVertexIndex)
            throws IOException {
        InetSocketAddress addr = getInetSocketAddress(vertexIndex);
        if (LOG.isDebugEnabled()) {
            LOG.debug("removeEdgeRequest: remove edge (" + destVertexIndex +
                ") from " + vertexIndex + " with address " + addr);
        }
        CommunicationsInterface<I, V, E, M> rpcProxy =
            peerConnections.get(addr).getRPCProxy();
        rpcProxy.removeEdge(vertexIndex, destVertexIndex);
    }

    @Override
    public final void addVertexRequest(Vertex<I, V, E, M> vertex)
            throws IOException {
        InetSocketAddress addr = getInetSocketAddress(vertex.getId());
        if (LOG.isDebugEnabled()) {
            LOG.debug("addVertexRequest: Add vertex (" + vertex + ")" +
                " with address " + addr);
        }
        CommunicationsInterface<I, V, E, M> rpcProxy =
            peerConnections.get(addr).getRPCProxy();
        rpcProxy.addVertex(vertex);
    }

    @Override
    public void removeVertexRequest(I vertexIndex) throws IOException {
        InetSocketAddress addr =
            getInetSocketAddress(vertexIndex);
        if (LOG.isDebugEnabled()) {
            LOG.debug("removeVertexRequest: Remove vertex index (" +
                vertexIndex + ") with address " + addr);
        }
        CommunicationsInterface<I, V, E, M> rpcProxy =
            peerConnections.get(addr).getRPCProxy();
        rpcProxy.removeVertex(vertexIndex);
    }

    @Override
    public void flush() throws IOException {
        if (LOG.isInfoEnabled()) {
            LOG.info("flush: starting for superstep " +
                service.getSuperstep() + " " +
                MemoryUtils.getRuntimeMemoryStats());
        }
        for (List<M> msgList : inMessages.values()) {
            msgList.clear();
        }
        inMessages.clear();

        Collection<Future<?>> futures = new ArrayList<Future<?>>();

        // randomize the order of flushing to the peers to avoid
        // hotspotting a single worker
        List<PeerConnection> peerList =
            new ArrayList<PeerConnection>(peerConnections.values());
        Collections.shuffle(peerList);

        for (PeerConnection pc : peerList) {
            futures.add(executor.submit(new PeerFlushExecutor(pc, context)));
        }

        // wait for all flushes to complete
        for (Future<?> future : futures) {
            try {
                future.get();
                context.progress();
            } catch (InterruptedException e) {
                throw new IllegalStateException(
                    "flush: Got InterruptedException", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException(
                    "flush: Got ExecutionException", e);
            }
        }

        if (LOG.isInfoEnabled()) {
            LOG.info("flush: ended for superstep " +
                service.getSuperstep() + " " +
                MemoryUtils.getRuntimeMemoryStats());
        }
    }

    @Override
    public long resetMessageCount() {
        long msgs = totalMsgsSentInSuperstep;
        totalMsgsSentInSuperstep = 0;
        return msgs;
    }

    @Override
    public void prepareSuperstep() {
        if (LOG.isInfoEnabled()) {
            LOG.info("prepareSuperstep: Superstep " +
                service.getSuperstep() + " " +
                MemoryUtils.getRuntimeMemoryStats());
        }
        inPrepareSuperstep = true;

        // Combine and move the transient messages into inMessages
        synchronized (transientInMessages) {
            for (Entry<I, List<M>> entry : transientInMessages.entrySet()) {
                if (combiner != null) {
                    try {
                        Iterable<M> messages =
                            combiner.combine(entry.getKey(),
                                entry.getValue());
                        if (messages == null) {
                            throw new IllegalStateException(
                                "prepareSuperstep: Combiner cannot " +
                                "return null");
                        }
                        if (Iterables.size(entry.getValue()) <
                                Iterables.size(messages)) {
                            throw new IllegalStateException(
                                "prepareSuperstep: The number of " +
                                "combined messages is " +
                                "required to be <= to the number of " +
                                "messages to be combined");
                        }
                        for (M msg : messages) {
                            putMsg(entry.getKey(), msg);
                        }
                    } catch (IOException e) {
                        // no actual I/O is performed here, so this is fatal
                        throw new RuntimeException(e);
                    }
                } else {
                    List<M> msgs = inMessages.get(entry.getKey());
                    if (msgs == null) {
                        msgs = new ArrayList<M>();
                        inMessages.put(entry.getKey(), msgs);
                    }
                    msgs.addAll(entry.getValue());
                }
                entry.getValue().clear();
            }
            transientInMessages.clear();
        }

        if (inMessages.size() > 0) {
            // Assign the messages to each destination vertex (getting
            // rid of the old messages)
            for (Partition<I, V, E, M> partition :
                    service.getPartitionStore().getPartitions()) {
                for (Vertex<I, V, E, M> vertex : partition.getVertices()) {
                    List<M> msgList = inMessages.get(vertex.getId());
                    if (msgList != null) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("prepareSuperstep: Assigning " +
                                msgList.size() +
                                " msgs to vertex index " + vertex);
                        }
                        for (M msg : msgList) {
                            if (msg == null) {
                                LOG.warn("prepareSuperstep: Null message " +
                                    "in inMessages");
                            }
                        }
                        service.assignMessagesToVertex(vertex, msgList);
                        msgList.clear();
                        if (inMessages.remove(vertex.getId()) == null) {
                            throw new IllegalStateException(
                                "prepareSuperstep: Impossible to not " +
                                "remove " + vertex);
                        }
                    }
                }
            }
        }

        inPrepareSuperstep = false;

        // Add the vertices that were sent messages but do not exist
        // locally, and the vertices that have pending mutations, to the
        // resolve set
        Set<I> resolveVertexIndexSet = new TreeSet<I>();
        if (inMessages.size() > 0) {
            for (Entry<I, List<M>> entry : inMessages.entrySet()) {
                if (service.getPartition(entry.getKey()) == null) {
                    throw new IllegalStateException(
                        "prepareSuperstep: Impossible that this worker " +
                        service.getWorkerInfo() + " was sent " +
                        entry.getValue().size() + " message(s) with " +
                        "vertex id " + entry.getKey() +
                        " when it does not own this partition. It should " +
                        "have gone to partition owner " +
                        service.getVertexPartitionOwner(entry.getKey()) +
                        ". The partition owners are " +
                        service.getPartitionOwners());
                }
                resolveVertexIndexSet.add(entry.getKey());
            }
        }
        synchronized (inVertexMutationsMap) {
            for (I vertexIndex : inVertexMutationsMap.keySet()) {
                resolveVertexIndexSet.add(vertexIndex);
            }
        }

        // Resolve all graph mutations
        for (I vertexIndex : resolveVertexIndexSet) {
            VertexResolver<I, V, E, M> vertexResolver =
                BspUtils.createVertexResolver(
                    conf, service.getGraphMapper().getGraphState());
            Vertex<I, V, E, M> originalVertex =
                service.getVertex(vertexIndex);
            Iterable<M> messages = inMessages.get(vertexIndex);
            if (originalVertex != null) {
                messages = originalVertex.getMessages();
            }
            VertexMutations<I, V, E, M> vertexMutations =
                inVertexMutationsMap.get(vertexIndex);
            boolean receivedMessages =
                messages != null && !Iterables.isEmpty(messages);
            Vertex<I, V, E, M> vertex =
                vertexResolver.resolve(vertexIndex,
                    originalVertex,
                    vertexMutations,
                    receivedMessages);
            if (vertex != null && receivedMessages) {
                service.assignMessagesToVertex(vertex, messages);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("prepareSuperstep: Resolved vertex index " +
                    vertexIndex + " with original vertex " +
                    originalVertex + ", returned vertex " + vertex +
                    " on superstep " + service.getSuperstep() +
                    " with mutations " +
                    vertexMutations);
            }

            Partition<I, V, E, M> partition =
                service.getPartition(vertexIndex);
            if (partition == null) {
                throw new IllegalStateException(
                    "prepareSuperstep: No partition for index " +
                    vertexIndex + " in " + service.getPartitionStore() +
                    " should have been " +
                    service.getVertexPartitionOwner(vertexIndex));
            }
            if (vertex != null) {
                partition.putVertex(vertex);
            } else if (originalVertex != null) {
                partition.removeVertex(originalVertex.getId());
            }
        }
        synchronized (inVertexMutationsMap) {
            inVertexMutationsMap.clear();
        }
    }

    @Override
    public void fixPartitionIdToSocketAddrMap() {
        // 1. Fix all the cached socket addresses (remove changed entries)
        // 2. Connect to any new RPC servers
        synchronized (partitionIndexAddressMap) {
            for (PartitionOwner partitionOwner :
                    service.getPartitionOwners()) {
                InetSocketAddress address =
                    partitionIndexAddressMap.get(
                        partitionOwner.getPartitionId());
                if (address != null &&
                        (!address.getHostName().equals(
                            partitionOwner.getWorkerInfo().getHostname()) ||
                        address.getPort() !=
                            partitionOwner.getWorkerInfo().getPort())) {
                    if (LOG.isInfoEnabled()) {
                        LOG.info("fixPartitionIdToSocketAddrMap: " +
                            "Partition owner " +
                            partitionOwner + " changed from " +
                            address);
                    }
                    partitionIndexAddressMap.remove(
                        partitionOwner.getPartitionId());
                }
            }
        }
        try {
            connectAllRPCProxys(this.jobId, this.jobToken);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public String getName() {
        return myName;
    }
}