1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.library;
19
20 import java.util.Iterator;
21
22 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
23 import org.apache.giraph.block_app.framework.block.Block;
24 import org.apache.giraph.block_app.framework.block.SequenceBlock;
25 import org.apache.giraph.combiner.MessageCombiner;
26 import org.apache.giraph.function.Consumer;
27 import org.apache.giraph.function.Function;
28 import org.apache.giraph.function.ObjectTransfer;
29 import org.apache.giraph.function.PairConsumer;
30 import org.apache.giraph.function.vertex.ConsumerWithVertex;
31 import org.apache.giraph.function.vertex.FunctionWithVertex;
32 import org.apache.giraph.function.vertex.SupplierFromVertex;
33 import org.apache.giraph.graph.Vertex;
34 import org.apache.giraph.reducers.ReduceOperation;
35 import org.apache.hadoop.io.Writable;
36 import org.apache.hadoop.io.WritableComparable;
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 public class SendMessageChain<I extends WritableComparable, V extends Writable,
61 E extends Writable, P> {
62
63
64
65
66
67 private final Function<ConsumerWithVertex<I, V, E, P>, Block> blockCreator;
68
69 private SendMessageChain(
70 Function<ConsumerWithVertex<I, V, E, P>, Block> blockCreator) {
71 this.blockCreator = blockCreator;
72 }
73
74
75
76
77
78 public static <I extends WritableComparable, V extends Writable,
79 E extends Writable, M extends Writable>
80 SendMessageChain<I, V, E, Iterable<M>> startSend(
81 final String name,
82 final Class<M> messageClass,
83 final SupplierFromVertex<I, V, E, M> messageSupplier,
84 final SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier) {
85 return new SendMessageChain<>(
86 new Function<ConsumerWithVertex<I, V, E, Iterable<M>>, Block>() {
87 @Override
88 public Block apply(
89 ConsumerWithVertex<I, V, E, Iterable<M>> messagesConsumer) {
90 return Pieces.sendMessage(
91 name, messageClass, messageSupplier,
92 targetsSupplier, messagesConsumer);
93 }
94 });
95 }
96
97
98
99
100
101
102 public static <I extends WritableComparable, V extends Writable,
103 E extends Writable, M extends Writable>
104 SendMessageChain<I, V, E, M> startSend(
105 final String name,
106 final MessageCombiner<? super I, M> messageCombiner,
107 final SupplierFromVertex<I, V, E, M> messageSupplier,
108 final SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier) {
109 return new SendMessageChain<>(
110 new Function<ConsumerWithVertex<I, V, E, M>, Block>() {
111 @Override
112 public Block apply(ConsumerWithVertex<I, V, E, M> messagesConsumer) {
113 return Pieces.sendMessage(
114 name, messageCombiner, messageSupplier,
115 targetsSupplier, messagesConsumer);
116 }
117 });
118 }
119
120
121
122
123
124 public static <I extends WritableComparable, V extends Writable,
125 E extends Writable, M extends Writable>
126 SendMessageChain<I, V, E, Iterable<M>> startSendToNeighbors(
127 final String name,
128 final Class<M> messageClass,
129 final SupplierFromVertex<I, V, E, M> messageSupplier) {
130 return startSend(name, messageClass, messageSupplier,
131 VertexSuppliers.<I, V, E>vertexNeighborsSupplier());
132 }
133
134
135
136
137
138
139 public static <I extends WritableComparable, V extends Writable,
140 E extends Writable, M extends Writable>
141 SendMessageChain<I, V, E, M> startSendToNeighbors(
142 final String name,
143 final MessageCombiner<? super I, M> messageCombiner,
144 final SupplierFromVertex<I, V, E, M> messageSupplier) {
145 return startSend(name, messageCombiner, messageSupplier,
146 VertexSuppliers.<I, V, E>vertexNeighborsSupplier());
147 }
148
149
150
151
152
153
154 public static <I extends WritableComparable, V extends Writable,
155 E extends Writable, P extends Writable>
156 SendMessageChain<I, V, E, P> startCustom(
157 Function<ConsumerWithVertex<I, V, E, P>, Block> createStartingBlock) {
158 return new SendMessageChain<>(createStartingBlock);
159 }
160
161
162
163
164
165 public <M extends Writable>
166 SendMessageChain<I, V, E, Iterable<M>> thenSend(
167 final String name,
168 final Class<M> messageClass,
169 final FunctionWithVertex<I, V, E, P, M> messageSupplier,
170 final SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier) {
171 final ObjectTransfer<P> prevMessagesTransfer = new ObjectTransfer<>();
172
173 return new SendMessageChain<>(
174 new Function<ConsumerWithVertex<I, V, E, Iterable<M>>, Block>() {
175 @Override
176 public Block apply(
177 ConsumerWithVertex<I, V, E, Iterable<M>> messagesConsumer) {
178 return new SequenceBlock(
179 blockCreator.apply(
180 prevMessagesTransfer.<I, V, E>castToConsumer()),
181 Pieces.sendMessage(
182 name, messageClass,
183 new SupplierFromVertex<I, V, E, M>() {
184 @Override
185 public M get(Vertex<I, V, E> vertex) {
186 return messageSupplier.apply(
187 vertex, prevMessagesTransfer.get());
188 }
189 },
190 targetsSupplier, messagesConsumer));
191 }
192 });
193 }
194
195
196
197
198
199 public <M extends Writable>
200 SendMessageChain<I, V, E, Iterable<M>> thenSendToNeighbors(
201 final String name,
202 final Class<M> messageClass,
203 final FunctionWithVertex<I, V, E, P, M> messageSupplier) {
204 return thenSend(name, messageClass, messageSupplier,
205 VertexSuppliers.<I, V, E>vertexNeighborsSupplier());
206 }
207
208
209
210
211
212
213 public <M extends Writable>
214 SendMessageChain<I, V, E, M> thenSend(
215 final String name,
216 final MessageCombiner<? super I, M> messageCombiner,
217 final FunctionWithVertex<I, V, E, P, M> messageSupplier,
218 final SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier) {
219 final ObjectTransfer<P> prevMessagesTransfer = new ObjectTransfer<>();
220
221 return new SendMessageChain<>(
222 new Function<ConsumerWithVertex<I, V, E, M>, Block>() {
223 @Override
224 public Block apply(ConsumerWithVertex<I, V, E, M> messagesConsumer) {
225 return new SequenceBlock(
226 blockCreator.apply(
227 prevMessagesTransfer.<I, V, E>castToConsumer()),
228 Pieces.sendMessage(
229 name, messageCombiner,
230 new SupplierFromVertex<I, V, E, M>() {
231 @Override
232 public M get(Vertex<I, V, E> vertex) {
233 return messageSupplier.apply(
234 vertex, prevMessagesTransfer.get());
235 }
236 },
237 targetsSupplier, messagesConsumer));
238 }
239 });
240 }
241
242
243
244
245
246
247 public <M extends Writable>
248 SendMessageChain<I, V, E, M> thenSendToNeighbors(
249 final String name,
250 final MessageCombiner<? super I, M> messageCombiner,
251 final FunctionWithVertex<I, V, E, P, M> messageSupplier) {
252 return thenSend(name, messageCombiner, messageSupplier,
253 VertexSuppliers.<I, V, E>vertexNeighborsSupplier());
254 }
255
256
257
258
259
260
261 public <S, R extends Writable>
262 Block endReduce(final String name, final ReduceOperation<S, R> reduceOp,
263 final FunctionWithVertex<I, V, E, P, S> valueSupplier,
264 final Consumer<R> reducedValueConsumer) {
265 return endCustom(new Function<SupplierFromVertex<I, V, E, P>, Block>() {
266 @Override
267 public Block apply(final SupplierFromVertex<I, V, E, P> prevMessages) {
268 return Pieces.reduce(
269 name,
270 reduceOp,
271 new SupplierFromVertex<I, V, E, S>() {
272 @Override
273 public S get(Vertex<I, V, E> vertex) {
274 return valueSupplier.apply(vertex, prevMessages.get(vertex));
275 }
276 },
277 reducedValueConsumer);
278 }
279 });
280 }
281
282
283
284
285
286
287 public <S, R extends Writable>
288 Block endReduceWithMaster(
289 final String name, final ReduceOperation<S, R> reduceOp,
290 final FunctionWithVertex<I, V, E, P, S> valueSupplier,
291 final PairConsumer<R, BlockMasterApi> reducedValueConsumer) {
292 return endCustom(new Function<SupplierFromVertex<I, V, E, P>, Block>() {
293 @Override
294 public Block apply(final SupplierFromVertex<I, V, E, P> prevMessages) {
295 return Pieces.reduceWithMaster(
296 name,
297 reduceOp,
298 new SupplierFromVertex<I, V, E, S>() {
299 @Override
300 public S get(Vertex<I, V, E> vertex) {
301 return valueSupplier.apply(vertex, prevMessages.get(vertex));
302 }
303 },
304 reducedValueConsumer);
305 }
306 });
307 }
308
309
310
311
312
313 public Block endConsume(ConsumerWithVertex<I, V, E, P> messagesConsumer) {
314 return blockCreator.apply(messagesConsumer);
315 }
316
317
318
319
320
321
322 public Block endCustom(
323 Function<SupplierFromVertex<I, V, E, P>, Block> createBlockToAttach) {
324 final ObjectTransfer<P> prevMessagesTransfer = new ObjectTransfer<>();
325 return new SequenceBlock(
326 blockCreator.apply(prevMessagesTransfer.<I, V, E>castToConsumer()),
327 createBlockToAttach.apply(
328 prevMessagesTransfer.<I, V, E>castToSupplier()));
329 }
330 }