1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.library;
19
20 import java.util.ArrayList;
21 import java.util.Iterator;
22 import java.util.List;
23
24 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
25 import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
26 import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
27 import org.apache.giraph.block_app.framework.api.CreateReducersApi;
28 import org.apache.giraph.block_app.framework.piece.Piece;
29 import org.apache.giraph.block_app.framework.piece.global_comm.ReducerAndBroadcastWrapperHandle;
30 import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
31 import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
32 import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
33 import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
34 import org.apache.giraph.block_app.library.internal.SendMessagePiece;
35 import org.apache.giraph.block_app.library.internal.SendMessageWithCombinerPiece;
36 import org.apache.giraph.block_app.reducers.array.ArrayOfHandles;
37 import org.apache.giraph.combiner.MessageCombiner;
38 import org.apache.giraph.function.Consumer;
39 import org.apache.giraph.function.PairConsumer;
40 import org.apache.giraph.function.Supplier;
41 import org.apache.giraph.function.vertex.ConsumerWithVertex;
42 import org.apache.giraph.function.vertex.SupplierFromVertex;
43 import org.apache.giraph.graph.Vertex;
44 import org.apache.giraph.reducers.ReduceOperation;
45 import org.apache.giraph.reducers.impl.SumReduce;
46 import org.apache.giraph.types.NoMessage;
47 import org.apache.hadoop.io.LongWritable;
48 import org.apache.hadoop.io.Writable;
49 import org.apache.hadoop.io.WritableComparable;
50 import org.apache.log4j.Logger;
51
52
53
54
55
56 public class Pieces {
/** Class-wide logger. */
private static final Logger LOG = Logger.getLogger(Pieces.class);

/** Hidden constructor: this class only exposes static factory methods. */
private Pieces() {
}
60
61
62
63
64
65 public static
66 <I extends WritableComparable, V extends Writable, E extends Writable>
67 Piece<I, V, E, NoMessage, Object> forAllVertices(
68 final String pieceName, final Consumer<Vertex<I, V, E>> process) {
69 return new Piece<I, V, E, NoMessage, Object>() {
70 @Override
71 public VertexSender<I, V, E> getVertexSender(
72 BlockWorkerSendApi<I, V, E, NoMessage> workerApi,
73 Object executionStage) {
74 return new InnerVertexSender() {
75 @Override
76 public void vertexSend(Vertex<I, V, E> vertex) {
77 process.apply(vertex);
78 }
79 };
80 }
81
82 @Override
83 public String toString() {
84 return pieceName;
85 }
86 };
87 }
88
89
90
91
92 public static
93 Piece<WritableComparable, Writable, Writable, NoMessage,
94 Object> masterCompute(
95 final String pieceName, final Consumer<BlockMasterApi> process) {
96 return new Piece<WritableComparable, Writable, Writable, NoMessage,
97 Object>() {
98 @Override
99 public void masterCompute(
100 BlockMasterApi masterApi, Object executionStage) {
101 process.apply(masterApi);
102 }
103 };
104 }
105
106
107
108
109
110
111
112
113 public static
114 <I extends WritableComparable, V extends Writable, E extends Writable>
115 Piece<I, V, E, NoMessage, Object> forAllVerticesOnReceive(
116 final String pieceName, final Consumer<Vertex<I, V, E>> process) {
117 return new Piece<I, V, E, NoMessage, Object>() {
118 @Override
119 public VertexReceiver<I, V, E, NoMessage> getVertexReceiver(
120 BlockWorkerReceiveApi<I> workerApi, Object executionStage) {
121 return new InnerVertexReceiver() {
122 @Override
123 public void vertexReceive(
124 Vertex<I, V, E> vertex, Iterable<NoMessage> messages) {
125 process.apply(vertex);
126 }
127 };
128 }
129
130 @Override
131 public String toString() {
132 return pieceName;
133 }
134 };
135 }
136
137
138
139
140 public static
141 <I extends WritableComparable, V extends Writable, E extends Writable>
142 Piece<I, V, E, NoMessage, Object> removeVertices(
143 final String pieceName,
144 final SupplierFromVertex<I, V, E, Boolean> shouldRemoveVertex) {
145 return new Piece<I, V, E, NoMessage, Object>() {
146 private ReducerHandle<LongWritable, LongWritable> countRemovedAgg;
147
148 @Override
149 public void registerReducers(
150 CreateReducersApi reduceApi, Object executionStage) {
151 countRemovedAgg = reduceApi.createLocalReducer(SumReduce.LONG);
152 }
153
154 @Override
155 public VertexSender<I, V, E> getVertexSender(
156 final BlockWorkerSendApi<I, V, E, NoMessage> workerApi,
157 Object executionStage) {
158 return new InnerVertexSender() {
159 @Override
160 public void vertexSend(Vertex<I, V, E> vertex) {
161 if (shouldRemoveVertex.get(vertex)) {
162 workerApi.removeVertexRequest(vertex.getId());
163 reduceLong(countRemovedAgg, 1);
164 }
165 }
166 };
167 }
168
169 @Override
170 public void masterCompute(BlockMasterApi master, Object executionStage) {
171 LOG.info("Removed " + countRemovedAgg.getReducedValue(master) +
172 " vertices from the graph, during stage " + executionStage);
173 }
174
175 @Override
176 public String toString() {
177 return pieceName;
178 }
179 };
180 }
181
182
183
184
185
186
187
188
189
190
191
192 public static
193 <S, R extends Writable, I extends WritableComparable, V extends Writable,
194 E extends Writable>
195 Piece<I, V, E, NoMessage, Object> reduce(
196 String name,
197 ReduceOperation<S, R> reduceOp,
198 SupplierFromVertex<I, V, E, S> valueSupplier,
199 final Consumer<R> reducedValueConsumer) {
200 return reduceWithMaster(
201 name, reduceOp, valueSupplier,
202 new PairConsumer<R, BlockMasterApi>() {
203 @Override
204 public void apply(R input, BlockMasterApi master) {
205 reducedValueConsumer.apply(input);
206 }
207 });
208 }
209
210
211
212
213
214
215
216
217
218
219
220 public static
221 <S, R extends Writable, I extends WritableComparable, V extends Writable,
222 E extends Writable>
223 Piece<I, V, E, NoMessage, Object> reduceWithMaster(
224 final String name,
225 final ReduceOperation<S, R> reduceOp,
226 final SupplierFromVertex<I, V, E, S> valueSupplier,
227 final PairConsumer<R, BlockMasterApi> reducedValueConsumer) {
228 return new Piece<I, V, E, NoMessage, Object>() {
229 private ReducerHandle<S, R> handle;
230
231 @Override
232 public void registerReducers(
233 CreateReducersApi reduceApi, Object executionStage) {
234 handle = reduceApi.createLocalReducer(reduceOp);
235 }
236
237 @Override
238 public VertexSender<I, V, E> getVertexSender(
239 BlockWorkerSendApi<I, V, E, NoMessage> workerApi,
240 Object executionStage) {
241 return new InnerVertexSender() {
242 @Override
243 public void vertexSend(Vertex<I, V, E> vertex) {
244 handle.reduce(valueSupplier.get(vertex));
245 }
246 };
247 }
248
249 @Override
250 public void masterCompute(BlockMasterApi master, Object executionStage) {
251 reducedValueConsumer.apply(handle.getReducedValue(master), master);
252 }
253
254 @Override
255 public String toString() {
256 return name;
257 }
258 };
259 }
260
261
262
263
264
265
266
267
268
269
270
271
272 public static
273 <S, R extends Writable, I extends WritableComparable, V extends Writable,
274 E extends Writable>
275 Piece<I, V, E, NoMessage, Object> reduceAndBroadcast(
276 final String name,
277 final ReduceOperation<S, R> reduceOp,
278 final SupplierFromVertex<I, V, E, S> valueSupplier,
279 final ConsumerWithVertex<I, V, E, R> reducedValueConsumer) {
280 return new Piece<I, V, E, NoMessage, Object>() {
281 private final ReducerAndBroadcastWrapperHandle<S, R> handle =
282 new ReducerAndBroadcastWrapperHandle<>();
283
284 @Override
285 public void registerReducers(
286 CreateReducersApi reduceApi, Object executionStage) {
287 handle.registeredReducer(reduceApi.createLocalReducer(reduceOp));
288 }
289
290 @Override
291 public VertexSender<I, V, E> getVertexSender(
292 BlockWorkerSendApi<I, V, E, NoMessage> workerApi,
293 Object executionStage) {
294 return new InnerVertexSender() {
295 @Override
296 public void vertexSend(Vertex<I, V, E> vertex) {
297 handle.reduce(valueSupplier.get(vertex));
298 }
299 };
300 }
301
302 @Override
303 public void masterCompute(BlockMasterApi master, Object executionStage) {
304 handle.broadcastValue(master);
305 }
306
307 @Override
308 public VertexReceiver<I, V, E, NoMessage> getVertexReceiver(
309 BlockWorkerReceiveApi<I> workerApi, Object executionStage) {
310 final R value = handle.getBroadcast(workerApi);
311 return new InnerVertexReceiver() {
312 @Override
313 public void vertexReceive(
314 Vertex<I, V, E> vertex, Iterable<NoMessage> messages) {
315 reducedValueConsumer.apply(vertex, value);
316 }
317 };
318 }
319
320 @Override
321 public String toString() {
322 return name;
323 }
324 };
325 }
326
327
328
329
330
331
332
333
334
335
336
337
338
339 public static
340 <S, R extends Writable, I extends WritableComparable, V extends Writable,
341 E extends Writable>
342 Piece<I, V, E, NoMessage, Object> reduceAndBroadcastWithArrayOfHandles(
343 final String name,
344 final int numHandles,
345 final Supplier<ReduceOperation<S, R>> reduceOp,
346 final SupplierFromVertex<I, V, E, Long> handleHashSupplier,
347 final SupplierFromVertex<I, V, E, S> valueSupplier,
348 final ConsumerWithVertex<I, V, E, R> reducedValueConsumer) {
349 return new Piece<I, V, E, NoMessage, Object>() {
350 protected ArrayOfHandles.ArrayOfReducers<S, R> reducers;
351 protected BroadcastArrayHandle<R> broadcasts;
352
353 private int getHandleIndex(Vertex<I, V, E> vertex) {
354 return (int) Math.abs(handleHashSupplier.get(vertex) % numHandles);
355 }
356
357 @Override
358 public void registerReducers(
359 final CreateReducersApi reduceApi, Object executionStage) {
360 reducers = new ArrayOfHandles.ArrayOfReducers<>(
361 numHandles,
362 new Supplier<ReducerHandle<S, R>>() {
363 @Override
364 public ReducerHandle<S, R> get() {
365 return reduceApi.createLocalReducer(reduceOp.get());
366 }
367 });
368 }
369
370 @Override
371 public VertexSender<I, V, E> getVertexSender(
372 BlockWorkerSendApi<I, V, E, NoMessage> workerApi,
373 Object executionStage) {
374 return new InnerVertexSender() {
375 @Override
376 public void vertexSend(Vertex<I, V, E> vertex) {
377 reducers.get(getHandleIndex(vertex)).reduce(
378 valueSupplier.get(vertex));
379 }
380 };
381 }
382
383 @Override
384 public void masterCompute(BlockMasterApi master, Object executionStage) {
385 broadcasts = reducers.broadcastValue(master);
386 }
387
388 @Override
389 public VertexReceiver<I, V, E, NoMessage> getVertexReceiver(
390 BlockWorkerReceiveApi<I> workerApi, Object executionStage) {
391 final List<R> values = new ArrayList<>();
392 for (int i = 0; i < numHandles; i++) {
393 values.add(broadcasts.get(i).getBroadcast(workerApi));
394 }
395 return new InnerVertexReceiver() {
396 @Override
397 public void vertexReceive(
398 Vertex<I, V, E> vertex, Iterable<NoMessage> messages) {
399 reducedValueConsumer.apply(
400 vertex, values.get(getHandleIndex(vertex)));
401 }
402 };
403 }
404
405 @Override
406 public String toString() {
407 return name;
408 }
409 };
410 }
411
412
413
414
415
416
417
418
419
420
421 public static
422 <I extends WritableComparable, V extends Writable, E extends Writable,
423 M extends Writable>
424 SendMessagePiece<I, V, E, M> sendMessage(
425 String name,
426 Class<M> messageClass,
427 SupplierFromVertex<I, V, E, M> messageSupplier,
428 SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier,
429 ConsumerWithVertex<I, V, E, Iterable<M>> messagesConsumer) {
430 return new SendMessagePiece<>(
431 name, messageClass, messageSupplier, targetsSupplier, messagesConsumer);
432 }
433
434
435
436
437
438
439
440
441
442
443 public static
444 <I extends WritableComparable, V extends Writable, E extends Writable,
445 M extends Writable>
446 SendMessagePiece<I, V, E, M> sendMessageToNeighbors(
447 String name,
448 Class<M> messageClass,
449 SupplierFromVertex<I, V, E, M> messageSupplier,
450 ConsumerWithVertex<I, V, E, Iterable<M>> messagesConsumer) {
451 return sendMessage(
452 name, messageClass, messageSupplier,
453 VertexSuppliers.<I, V, E>vertexNeighborsSupplier(),
454 messagesConsumer);
455 }
456
457
458
459
460
461
462
463
464
465
466
467 public static
468 <I extends WritableComparable, V extends Writable, E extends Writable,
469 M extends Writable>
470 SendMessageWithCombinerPiece<I, V, E, M> sendMessage(
471 String name,
472 MessageCombiner<? super I, M> messageCombiner,
473 SupplierFromVertex<I, V, E, M> messageSupplier,
474 SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier,
475 ConsumerWithVertex<I, V, E, M> messagesConsumer) {
476 return new SendMessageWithCombinerPiece<>(
477 name, messageCombiner,
478 messageSupplier, targetsSupplier, messagesConsumer);
479 }
480
481
482
483
484
485
486
487
488
489
490
491 public static
492 <I extends WritableComparable, V extends Writable, E extends Writable,
493 M extends Writable>
494 SendMessageWithCombinerPiece<I, V, E, M> sendMessageToNeighbors(
495 String name,
496 MessageCombiner<? super I, M> messageCombiner,
497 SupplierFromVertex<I, V, E, M> messageSupplier,
498 ConsumerWithVertex<I, V, E, M> messagesConsumer) {
499 return sendMessage(
500 name, messageCombiner, messageSupplier,
501 VertexSuppliers.<I, V, E>vertexNeighborsSupplier(),
502 messagesConsumer);
503 }
504 }