View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.library;
19  
20  import java.util.ArrayList;
21  import java.util.Iterator;
22  import java.util.List;
23  
24  import org.apache.giraph.block_app.framework.api.BlockMasterApi;
25  import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
26  import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
27  import org.apache.giraph.block_app.framework.api.CreateReducersApi;
28  import org.apache.giraph.block_app.framework.piece.Piece;
29  import org.apache.giraph.block_app.framework.piece.global_comm.ReducerAndBroadcastWrapperHandle;
30  import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
31  import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
32  import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
33  import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
34  import org.apache.giraph.block_app.library.internal.SendMessagePiece;
35  import org.apache.giraph.block_app.library.internal.SendMessageWithCombinerPiece;
36  import org.apache.giraph.block_app.reducers.array.ArrayOfHandles;
37  import org.apache.giraph.combiner.MessageCombiner;
38  import org.apache.giraph.function.Consumer;
39  import org.apache.giraph.function.PairConsumer;
40  import org.apache.giraph.function.Supplier;
41  import org.apache.giraph.function.vertex.ConsumerWithVertex;
42  import org.apache.giraph.function.vertex.SupplierFromVertex;
43  import org.apache.giraph.graph.Vertex;
44  import org.apache.giraph.reducers.ReduceOperation;
45  import org.apache.giraph.reducers.impl.SumReduce;
46  import org.apache.giraph.types.NoMessage;
47  import org.apache.hadoop.io.LongWritable;
48  import org.apache.hadoop.io.Writable;
49  import org.apache.hadoop.io.WritableComparable;
50  import org.apache.log4j.Logger;
51  
52  /**
53   * Utility class for creating common Pieces and computations for processing
54   * graphs.
55   */
56  public class Pieces {
57    private static final Logger LOG = Logger.getLogger(Pieces.class);
58  
59    private Pieces() { }
60  
61    /**
62     * For each vertex execute given process function.
63     * Computation is happening in send phase of the returned Piece.
64     */
65    public static
66    <I extends WritableComparable, V extends Writable, E extends Writable>
67    Piece<I, V, E, NoMessage, Object> forAllVertices(
68        final String pieceName, final Consumer<Vertex<I, V, E>> process) {
69      return new Piece<I, V, E, NoMessage, Object>() {
70        @Override
71        public VertexSender<I, V, E> getVertexSender(
72            BlockWorkerSendApi<I, V, E, NoMessage> workerApi,
73            Object executionStage) {
74          return new InnerVertexSender() {
75            @Override
76            public void vertexSend(Vertex<I, V, E> vertex) {
77              process.apply(vertex);
78            }
79          };
80        }
81  
82        @Override
83        public String toString() {
84          return pieceName;
85        }
86      };
87    }
88  
89    /**
90     * Execute given function on master.
91     */
92    public static
93    Piece<WritableComparable, Writable,  Writable, NoMessage,
94      Object> masterCompute(
95        final String pieceName, final Consumer<BlockMasterApi> process) {
96      return new Piece<WritableComparable, Writable,  Writable, NoMessage,
97          Object>() {
98        @Override
99        public void masterCompute(
100           BlockMasterApi masterApi, Object executionStage) {
101         process.apply(masterApi);
102       }
103     };
104   }
105 
106   /**
107    * For each vertex execute given process function.
108    * Computation is happening in the receive phase of the returned Piece.
109    * This function should be used if you need returned Piece to interact with
110    * subsequent Piece, as that requires passed function to be executed
111    * during receive phase,
112    */
113   public static
114   <I extends WritableComparable, V extends Writable, E extends Writable>
115   Piece<I, V, E, NoMessage, Object> forAllVerticesOnReceive(
116       final String pieceName, final Consumer<Vertex<I, V, E>> process) {
117     return new Piece<I, V, E, NoMessage, Object>() {
118       @Override
119       public VertexReceiver<I, V, E, NoMessage> getVertexReceiver(
120           BlockWorkerReceiveApi<I> workerApi, Object executionStage) {
121         return new InnerVertexReceiver() {
122           @Override
123           public void vertexReceive(
124               Vertex<I, V, E> vertex, Iterable<NoMessage> messages) {
125             process.apply(vertex);
126           }
127         };
128       }
129 
130       @Override
131       public String toString() {
132         return pieceName;
133       }
134     };
135   }
136 
137   /**
138    * Creates Piece which removes vertices for which supplier returns true.
139    */
140   public static
141   <I extends WritableComparable, V extends Writable, E extends Writable>
142   Piece<I, V, E, NoMessage, Object> removeVertices(
143       final String pieceName,
144       final SupplierFromVertex<I, V, E, Boolean> shouldRemoveVertex) {
145     return new Piece<I, V, E, NoMessage, Object>() {
146       private ReducerHandle<LongWritable, LongWritable> countRemovedAgg;
147 
148       @Override
149       public void registerReducers(
150           CreateReducersApi reduceApi, Object executionStage) {
151         countRemovedAgg = reduceApi.createLocalReducer(SumReduce.LONG);
152       }
153 
154       @Override
155       public VertexSender<I, V, E> getVertexSender(
156           final BlockWorkerSendApi<I, V, E, NoMessage> workerApi,
157           Object executionStage) {
158         return new InnerVertexSender() {
159           @Override
160           public void vertexSend(Vertex<I, V, E> vertex) {
161             if (shouldRemoveVertex.get(vertex)) {
162               workerApi.removeVertexRequest(vertex.getId());
163               reduceLong(countRemovedAgg, 1);
164             }
165           }
166         };
167       }
168 
169       @Override
170       public void masterCompute(BlockMasterApi master, Object executionStage) {
171         LOG.info("Removed " + countRemovedAgg.getReducedValue(master) +
172             " vertices from the graph, during stage " + executionStage);
173       }
174 
175       @Override
176       public String toString() {
177         return pieceName;
178       }
179     };
180   }
181 
182   /**
183    * Creates single reducer piece - given reduce class, supplier of values on
184    * worker, reduces and passes the result to given consumer on master.
185    *
186    * @param <S> Single value type, objects passed on workers
187    * @param <R> Reduced value type
188    * @param <I> Vertex id type
189    * @param <V> Vertex value type
190    * @param <E> Edge value type
191    */
192   public static
193   <S, R extends Writable, I extends WritableComparable, V extends Writable,
194   E extends Writable>
195   Piece<I, V, E, NoMessage, Object> reduce(
196       String name,
197       ReduceOperation<S, R> reduceOp,
198       SupplierFromVertex<I, V, E, S> valueSupplier,
199       final Consumer<R> reducedValueConsumer) {
200     return reduceWithMaster(
201         name, reduceOp, valueSupplier,
202         new PairConsumer<R, BlockMasterApi>() {
203           @Override
204           public void apply(R input, BlockMasterApi master) {
205             reducedValueConsumer.apply(input);
206           }
207         });
208   }
209 
210   /**
211    * Creates single reducer piece - given reduce class, supplier of values on
212    * worker, reduces and passes the result to given consumer on master.
213    *
214    * @param <S> Single value type, objects passed on workers
215    * @param <R> Reduced value type
216    * @param <I> Vertex id type
217    * @param <V> Vertex value type
218    * @param <E> Edge value type
219    */
220   public static
221   <S, R extends Writable, I extends WritableComparable, V extends Writable,
222   E extends Writable>
223   Piece<I, V, E, NoMessage, Object> reduceWithMaster(
224       final String name,
225       final ReduceOperation<S, R> reduceOp,
226       final SupplierFromVertex<I, V, E, S> valueSupplier,
227       final PairConsumer<R, BlockMasterApi> reducedValueConsumer) {
228     return new Piece<I, V, E, NoMessage, Object>() {
229       private ReducerHandle<S, R> handle;
230 
231       @Override
232       public void registerReducers(
233           CreateReducersApi reduceApi, Object executionStage) {
234         handle = reduceApi.createLocalReducer(reduceOp);
235       }
236 
237       @Override
238       public VertexSender<I, V, E> getVertexSender(
239           BlockWorkerSendApi<I, V, E, NoMessage> workerApi,
240           Object executionStage) {
241         return new InnerVertexSender() {
242           @Override
243           public void vertexSend(Vertex<I, V, E> vertex) {
244             handle.reduce(valueSupplier.get(vertex));
245           }
246         };
247       }
248 
249       @Override
250       public void masterCompute(BlockMasterApi master, Object executionStage) {
251         reducedValueConsumer.apply(handle.getReducedValue(master), master);
252       }
253 
254       @Override
255       public String toString() {
256         return name;
257       }
258     };
259   }
260 
261   /**
262    * Creates single reducer and broadcast piece - given reduce class, supplier
263    * of values on worker, reduces and broadcasts the value, passing it to the
264    * consumer on worker for each vertex.
265    *
266    * @param <S> Single value type, objects passed on workers
267    * @param <R> Reduced value type
268    * @param <I> Vertex id type
269    * @param <V> Vertex value type
270    * @param <E> Edge value type
271    */
272   public static
273   <S, R extends Writable, I extends WritableComparable, V extends Writable,
274   E extends Writable>
275   Piece<I, V, E, NoMessage, Object> reduceAndBroadcast(
276       final String name,
277       final ReduceOperation<S, R> reduceOp,
278       final SupplierFromVertex<I, V, E, S> valueSupplier,
279       final ConsumerWithVertex<I, V, E, R> reducedValueConsumer) {
280     return new Piece<I, V, E, NoMessage, Object>() {
281       private final ReducerAndBroadcastWrapperHandle<S, R> handle =
282           new ReducerAndBroadcastWrapperHandle<>();
283 
284       @Override
285       public void registerReducers(
286           CreateReducersApi reduceApi, Object executionStage) {
287         handle.registeredReducer(reduceApi.createLocalReducer(reduceOp));
288       }
289 
290       @Override
291       public VertexSender<I, V, E> getVertexSender(
292           BlockWorkerSendApi<I, V, E, NoMessage> workerApi,
293           Object executionStage) {
294         return new InnerVertexSender() {
295           @Override
296           public void vertexSend(Vertex<I, V, E> vertex) {
297             handle.reduce(valueSupplier.get(vertex));
298           }
299         };
300       }
301 
302       @Override
303       public void masterCompute(BlockMasterApi master, Object executionStage) {
304         handle.broadcastValue(master);
305       }
306 
307       @Override
308       public VertexReceiver<I, V, E, NoMessage> getVertexReceiver(
309           BlockWorkerReceiveApi<I> workerApi, Object executionStage) {
310         final R value = handle.getBroadcast(workerApi);
311         return new InnerVertexReceiver() {
312           @Override
313           public void vertexReceive(
314               Vertex<I, V, E> vertex, Iterable<NoMessage> messages) {
315             reducedValueConsumer.apply(vertex, value);
316           }
317         };
318       }
319 
320       @Override
321       public String toString() {
322         return name;
323       }
324     };
325   }
326 
327   /**
328    * Like reduceAndBroadcast, but uses array of handles for reducers and
329    * broadcasts, to make it feasible and performant when values are large.
330    * Each supplied value to reduce will be reduced in the handle defined by
331    * handleHashSupplier%numHandles
332    *
333    * @param <S> Single value type, objects passed on workers
334    * @param <R> Reduced value type
335    * @param <I> Vertex id type
336    * @param <V> Vertex value type
337    * @param <E> Edge value type
338    */
339   public static
340   <S, R extends Writable, I extends WritableComparable, V extends Writable,
341       E extends Writable>
342   Piece<I, V, E, NoMessage, Object> reduceAndBroadcastWithArrayOfHandles(
343       final String name,
344       final int numHandles,
345       final Supplier<ReduceOperation<S, R>> reduceOp,
346       final SupplierFromVertex<I, V, E, Long> handleHashSupplier,
347       final SupplierFromVertex<I, V, E, S> valueSupplier,
348       final ConsumerWithVertex<I, V, E, R> reducedValueConsumer) {
349     return new Piece<I, V, E, NoMessage, Object>() {
350       protected ArrayOfHandles.ArrayOfReducers<S, R> reducers;
351       protected BroadcastArrayHandle<R> broadcasts;
352 
353       private int getHandleIndex(Vertex<I, V, E> vertex) {
354         return (int) Math.abs(handleHashSupplier.get(vertex) % numHandles);
355       }
356 
357       @Override
358       public void registerReducers(
359           final CreateReducersApi reduceApi, Object executionStage) {
360         reducers = new ArrayOfHandles.ArrayOfReducers<>(
361             numHandles,
362             new Supplier<ReducerHandle<S, R>>() {
363               @Override
364               public ReducerHandle<S, R> get() {
365                 return reduceApi.createLocalReducer(reduceOp.get());
366               }
367             });
368       }
369 
370       @Override
371       public VertexSender<I, V, E> getVertexSender(
372           BlockWorkerSendApi<I, V, E, NoMessage> workerApi,
373           Object executionStage) {
374         return new InnerVertexSender() {
375           @Override
376           public void vertexSend(Vertex<I, V, E> vertex) {
377             reducers.get(getHandleIndex(vertex)).reduce(
378                 valueSupplier.get(vertex));
379           }
380         };
381       }
382 
383       @Override
384       public void masterCompute(BlockMasterApi master, Object executionStage) {
385         broadcasts = reducers.broadcastValue(master);
386       }
387 
388       @Override
389       public VertexReceiver<I, V, E, NoMessage> getVertexReceiver(
390           BlockWorkerReceiveApi<I> workerApi, Object executionStage) {
391         final List<R> values = new ArrayList<>();
392         for (int i = 0; i < numHandles; i++) {
393           values.add(broadcasts.get(i).getBroadcast(workerApi));
394         }
395         return new InnerVertexReceiver() {
396           @Override
397           public void vertexReceive(
398               Vertex<I, V, E> vertex, Iterable<NoMessage> messages) {
399             reducedValueConsumer.apply(
400                 vertex, values.get(getHandleIndex(vertex)));
401           }
402         };
403       }
404 
405       @Override
406       public String toString() {
407         return name;
408       }
409     };
410   }
411 
412   /**
413    * Creates Piece that for each vertex, sends message provided by
414    * messageSupplier to all targets provided by targetsSupplier.
415    * Received messages are then passed to and processed by provided
416    * messagesConsumer.
417    *
418    * If messageSupplier or targetsSupplier returns null, current vertex
419    * is not going to send any messages.
420    */
421   public static
422   <I extends WritableComparable, V extends Writable, E extends Writable,
423   M extends Writable>
424   SendMessagePiece<I, V, E, M> sendMessage(
425       String name,
426       Class<M> messageClass,
427       SupplierFromVertex<I, V, E, M> messageSupplier,
428       SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier,
429       ConsumerWithVertex<I, V, E, Iterable<M>> messagesConsumer) {
430     return new SendMessagePiece<>(
431         name, messageClass, messageSupplier, targetsSupplier, messagesConsumer);
432   }
433 
434   /**
435    * Creates Piece that for each vertex, sends message provided by
436    * messageSupplier to all neighbors of current vertex.
437    * Received messages are then passed to and processed by provided
438    * messagesConsumer.
439    *
440    * If messageSupplier returns null, current vertex
441    * is not going to send any messages.
442    */
443   public static
444   <I extends WritableComparable, V extends Writable, E extends Writable,
445   M extends Writable>
446   SendMessagePiece<I, V, E, M> sendMessageToNeighbors(
447       String name,
448       Class<M> messageClass,
449       SupplierFromVertex<I, V, E, M> messageSupplier,
450       ConsumerWithVertex<I, V, E, Iterable<M>> messagesConsumer) {
451     return sendMessage(
452         name, messageClass, messageSupplier,
453         VertexSuppliers.<I, V, E>vertexNeighborsSupplier(),
454         messagesConsumer);
455   }
456 
457   /**
458    * Creates Piece that for each vertex, sends message provided by
459    * messageSupplier to all targets provided by targetsSupplier,
460    * and uses given messageCombiner to combine messages together.
461    * Received combined message is then passed to and processed by provided
462    * messageConsumer. (null is passed to it, if vertex received no messages)
463    *
464    * If messageSupplier or targetsSupplier returns null, current vertex
465    * is not going to send any messages.
466    */
467   public static
468   <I extends WritableComparable, V extends Writable, E extends Writable,
469   M extends Writable>
470   SendMessageWithCombinerPiece<I, V, E, M> sendMessage(
471       String name,
472       MessageCombiner<? super I, M> messageCombiner,
473       SupplierFromVertex<I, V, E, M> messageSupplier,
474       SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier,
475       ConsumerWithVertex<I, V, E, M> messagesConsumer) {
476     return new SendMessageWithCombinerPiece<>(
477         name, messageCombiner,
478         messageSupplier, targetsSupplier, messagesConsumer);
479   }
480 
481   /**
482    * Creates Piece that for each vertex, sends message provided by
483    * messageSupplier to all neighbors of current vertex,
484    * and uses given messageCombiner to combine messages together.
485    * Received combined message is then passed to and processed by provided
486    * messageConsumer. (null is passed to it, if vertex received no messages)
487    *
488    * If messageSupplier returns null, current vertex
489    * is not going to send any messages.
490    */
491   public static
492   <I extends WritableComparable, V extends Writable, E extends Writable,
493   M extends Writable>
494   SendMessageWithCombinerPiece<I, V, E, M> sendMessageToNeighbors(
495       String name,
496       MessageCombiner<? super I, M> messageCombiner,
497       SupplierFromVertex<I, V, E, M> messageSupplier,
498       ConsumerWithVertex<I, V, E, M> messagesConsumer) {
499     return sendMessage(
500         name, messageCombiner, messageSupplier,
501         VertexSuppliers.<I, V, E>vertexNeighborsSupplier(),
502         messagesConsumer);
503   }
504 }