Coverage Report - org.apache.giraph.block_app.framework.piece.AbstractPiece
 
Classes in this File Line Coverage Branch Coverage Complexity
AbstractPiece
0%
0/17
0%
0/2
1.059
AbstractPiece$InnerVertexReceiver
0%
0/2
N/A
1.059
AbstractPiece$InnerVertexSender
0%
0/2
N/A
1.059
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.block_app.framework.piece;
 19  
 
 20  
 import java.util.Iterator;
 21  
 import java.util.List;
 22  
 
 23  
 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
 24  
 import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
 25  
 import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
 26  
 import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
 27  
 import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
 28  
 import org.apache.giraph.block_app.framework.block.Block;
 29  
 import org.apache.giraph.block_app.framework.block.PieceCount;
 30  
 import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor;
 31  
 import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
 32  
 import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
 33  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 34  
 import org.apache.giraph.conf.MessageClasses;
 35  
 import org.apache.giraph.function.Consumer;
 36  
 import org.apache.hadoop.io.Writable;
 37  
 import org.apache.hadoop.io.WritableComparable;
 38  
 
 39  
 import com.google.common.collect.Iterators;
 40  
 
 41  
 /**
 42  
  * Parent of all Pieces, contains comprehensive list of methods Piece
 43  
  * can support. Specific subclasses should be extended directly,
 44  
  * to simplify usage - most frequently the Piece class.
 45  
  *
 46  
  * Single unit of execution, capturing:
 47  
  * - sending and then receiving messages from vertices
 48  
  * - sending data to be aggregated from workers to master
 49  
  * - sending values from master, via aggregators, to workers
 50  
  * - sending and receiving worker messages
 51  
  *
 52  
  *
 53  
  * Order of execution is:
 54  
  * - On master, once at the start of the application
 55  
  * -- registerAggregators (deprecated, use registerReducers instead)
 56  
  *
 57  
  * - After masterCompute of previous piece, on master:
 58  
  * -- registerReducers
 59  
  *
 60  
  * - Send logic on workers:
 61  
  * -- getVertexSender per each worker thread, and on object returned by it:
 62  
  * --- vertexSend on each vertex
 63  
  * --- postprocess on each worker thread
 64  
  * -- workerContextSend per worker
 65  
  *
 66  
  * - Logic on master:
 67  
  * -- masterCompute
 68  
  *
 69  
  * - Receive logic on workers:
 70  
  * -- workerContextReceive per worker
 71  
  * -- getVertexReceiver per each worker thread, and on object returned by it:
 72  
  * --- vertexReceive on each vertex
 73  
  * --- postprocess on each worker thread
 74  
  *
 75  
  * And before everything, during initialization, registerAggregators.
 76  
  *
 77  
  * Only masterCompute and registerReducers/registerAggregators should modify
 78  
  * the Piece, all of the worker methods should treat Piece as read-only.
 79  
  *
 80  
  * Each piece should be an encapsulated unit of execution. Vertex value should be
 81  
  * used as a single implicit "communication" channel between different pieces,
 82  
  * all other dependencies should be explicitly defined and passed through
 83  
  * constructor, via interfaces (as explained below).
 84  
  * I.e. state of the vertex value is invariant that Pieces act upon.
 85  
  * Best is not to depend on explicit vertex value class, but on interface that
 86  
  * provides all needed functions, so that pieces can be freely combined,
 87  
  * as long as vertex value implements appropriate ones.
 88  
  * Similarly, use most abstract class you need - if Piece doesn't depend
 89  
  * on edge value, don't use NullWritable, but Writable. Or if it doesn't
 90  
  * depend on ExecutionStage, use Object for it.
 91  
  *
 92  
  * All other external dependencies should be explicitly passed through
 93  
  * constructor, through interfaces.
 94  
  *
 95  
  * All Pieces will be created within one context - on the master.
 96  
  * They are then going to be replicated across all workers, and across all
 97  
  * threads within each worker, and will see everything that happens in global
 98  
  * context (masterCompute) before them, including any state master has.
 99  
  * Through ObjectHolder/ObjectTransfer, you can pass data between Pieces in
 100  
  * global context, and from global context to worker functions of a Piece
 101  
  * that happens in the future.
 102  
  *
 103  
  * VertexReceiver of previous Piece and VertexSender of next Piece live in
 104  
  * the same context, and vertexSend of the next Piece is executed
 105  
  * immediately after vertexReceive of the previous Piece, before vertexReceive
 106  
  * is called on the next vertex.
 107  
  * This detail allows you to have external dependency on each other through
 108  
  * memory only mediator objects - like ObjectTransfer.
 109  
  *
 110  
  * All other logic is going to live in different contexts,
 111  
  * specifically VertexSender and VertexReceiver of the same Piece,
 112  
  * or workerContextSend and VertexSender of the same Piece, and cannot interact
 113  
  * with each other outside of changing the state of the graph or using
 114  
  * global communication api.
 115  
  *
 116  
  * All methods on this class (or objects it returns) will be called serially,
 117  
  * so there is no need for any Thread synchronization.
 118  
  * To achieve that, each Thread will have a complete deep copy of the Piece,
 119  
  * so all static fields must be written to be Thread safe!
 120  
  * (i.e. either immutable, or have synchronized/locked access to them)
 121  
  *
 122  
  * @param <I> Vertex id type
 123  
  * @param <V> Vertex value type
 124  
  * @param <E> Edge value type
 125  
  * @param <M> Message type
 126  
  * @param <WV> Worker value type
 127  
  * @param <WM> Worker message type
 128  
  * @param <S> Execution stage type
 129  
  */
 130  
 @SuppressWarnings({ "rawtypes" })
 131  0
 public abstract class AbstractPiece<I extends WritableComparable,
 132  
     V extends Writable, E extends Writable, M extends Writable, WV,
 133  
     WM extends Writable, S> implements Block {
 134  
 
 135  
   // Overridable functions
 136  
 
 137  
   // registerReducers(CreateReducersApi reduceApi, S executionStage)
 138  
 
 139  
   /**
 140  
    * Add automatic handling of reducers to registerReducers.
 141  
    * Only for internal use.
 142  
    */
 143  
   public abstract void wrappedRegisterReducers(
 144  
       BlockMasterApi masterApi, S executionStage);
 145  
 
 146  
   // getVertexSender(BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage)
 147  
 
 148  
   /**
 149  
    * Add automatic handling of reducers to getVertexSender.
 150  
    *
 151  
    * Only for Framework internal use.
 152  
    */
 153  
   public abstract InnerVertexSender getWrappedVertexSender(
 154  
       final BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage);
 155  
 
 156  
   /**
 157  
    * Override to have worker context send computation.
 158  
    *
 159  
    * Called once per worker, after all vertices have been processed with
 160  
    * getVertexSender.
 161  
    */
 162  
   public void workerContextSend(
 163  
       BlockWorkerContextSendApi<I, WM> workerContextApi, S executionStage,
 164  
       WV workerValue) {
 165  0
   }
 166  
 
 167  
   /**
 168  
    * Function that is called on master, after send phase, before receive phase.
 169  
    *
 170  
    * It can:
 171  
    * - read aggregators sent from worker
 172  
    * - do global processing
 173  
    * - send data to workers through aggregators
 174  
    */
 175  
   public void masterCompute(BlockMasterApi masterApi, S executionStage) {
 176  0
   }
 177  
 
 178  
   /**
 179  
    * Override to have worker context receive computation.
 180  
    *
 181  
    * Called once per worker, before all vertices are going to be processed
 182  
    * with getVertexReceiver.
 183  
    */
 184  
   public void workerContextReceive(
 185  
       BlockWorkerContextReceiveApi workerContextApi, S executionStage,
 186  
       WV workerValue, List<WM> workerMessages) {
 187  0
   }
 188  
 
 189  
   /**
 190  
    * Override to do vertex receive processing.
 191  
    *
 192  
    * Creates handler that defines what should be executed on worker
 193  
    * for each vertex during receive phase.
 194  
    *
 195  
    * This logic is executed last.
 196  
    * This function is called once on each worker on each thread, in parallel,
 197  
    * on their copy of the Piece object, to create the handler.
 198  
    *
 199  
    * If returned object implements VertexPostprocessor interface, then corresponding
 200  
    * postprocess() function is going to be called once, after all vertices
 201  
    * corresponding thread needed to process are done.
 202  
    */
 203  
   public VertexReceiver<I, V, E, M> getVertexReceiver(
 204  
       BlockWorkerReceiveApi<I> workerApi, S executionStage) {
 205  0
     return null;
 206  
   }
 207  
 
 208  
   /**
 209  
    * Returns MessageClasses definition for messages being sent by this Piece.
 210  
    */
 211  
   public abstract MessageClasses<I, M> getMessageClasses(
 212  
       ImmutableClassesGiraphConfiguration conf);
 213  
 
 214  
   /**
 215  
    * Override to provide different next execution stage for
 216  
    * Pieces that come after it.
 217  
    *
 218  
    * Execution stage should be immutable, and this function should be
 219  
    * returning a new object, if it needs to return different value.
 220  
    *
 221  
    * It affects pieces that come after this piece,
 222  
    * and isn't applied to execution stage this piece sees.
 223  
    */
 224  
   public S nextExecutionStage(S executionStage) {
 225  0
     return executionStage;
 226  
   }
 227  
 
 228  
   /**
 229  
    * Override to register any potential aggregators used by this piece.
 230  
    *
 231  
    * @deprecated Use registerReducers instead.
 232  
    */
 233  
   @Deprecated
 234  
   public void registerAggregators(BlockMasterApi masterApi)
 235  
       throws InstantiationException, IllegalAccessException {
 236  0
   }
 237  
 
 238  
   // Inner classes
 239  
 
 240  
   /** Inner class to provide clean use without specifying types */
 241  0
   public abstract class InnerVertexSender
 242  
       implements VertexSender<I, V, E>, VertexPostprocessor {
 243  
     @Override
 244  0
     public void postprocess() { }
 245  
   }
 246  
 
 247  
   /** Inner class to provide clean use without specifying types */
 248  0
   public abstract class InnerVertexReceiver
 249  
       implements VertexReceiver<I, V, E, M>, VertexPostprocessor {
 250  
     @Override
 251  0
     public void postprocess() { }
 252  
   }
 253  
 
 254  
   // Internal implementation
 255  
 
 256  
   @Override
 257  
   public final Iterator<AbstractPiece> iterator() {
 258  0
     return Iterators.<AbstractPiece>singletonIterator(this);
 259  
   }
 260  
 
 261  
   @Override
 262  
   public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
 263  0
     consumer.apply(this);
 264  0
   }
 265  
 
 266  
   @Override
 267  
   public PieceCount getPieceCount() {
 268  0
     return new PieceCount(1);
 269  
   }
 270  
 
 271  
   @Override
 272  
   public String toString() {
 273  0
     String name = getClass().getSimpleName();
 274  0
     if (name.isEmpty()) {
 275  0
       name = getClass().getName();
 276  
     }
 277  0
     return name;
 278  
   }
 279  
 
 280  
 
 281  
   // make hashCode and equals final, forcing them to be based on
 282  
   // reference identity.
 283  
   @Override
 284  
   public final int hashCode() {
 285  0
     return super.hashCode();
 286  
   }
 287  
 
 288  
   @Override
 289  
   public final boolean equals(Object obj) {
 290  0
     return super.equals(obj);
 291  
   }
 292  
 
 293  
 }