Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
AbstractPiece |
|
| 1.0588235294117647;1.059 | ||||
AbstractPiece$InnerVertexReceiver |
|
| 1.0588235294117647;1.059 | ||||
AbstractPiece$InnerVertexSender |
|
| 1.0588235294117647;1.059 |
1 | /* | |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, software | |
13 | * distributed under the License is distributed on an "AS IS" BASIS, | |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
15 | * See the License for the specific language governing permissions and | |
16 | * limitations under the License. | |
17 | */ | |
18 | package org.apache.giraph.block_app.framework.piece; | |
19 | ||
20 | import java.util.Iterator; | |
21 | import java.util.List; | |
22 | ||
23 | import org.apache.giraph.block_app.framework.api.BlockMasterApi; | |
24 | import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi; | |
25 | import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi; | |
26 | import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi; | |
27 | import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; | |
28 | import org.apache.giraph.block_app.framework.block.Block; | |
29 | import org.apache.giraph.block_app.framework.block.PieceCount; | |
30 | import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor; | |
31 | import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver; | |
32 | import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender; | |
33 | import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; | |
34 | import org.apache.giraph.conf.MessageClasses; | |
35 | import org.apache.giraph.function.Consumer; | |
36 | import org.apache.hadoop.io.Writable; | |
37 | import org.apache.hadoop.io.WritableComparable; | |
38 | ||
39 | import com.google.common.collect.Iterators; | |
40 | ||
41 | /** | |
42 | * Parent of all Pieces, contains comprehensive list of methods Piece | |
43 | * can support. Specific subclasses should be extended directly, | |
44 | * to simplify usage - most frequently for example Piece class. | |
45 | * | |
46 | * Single unit of execution, capturing: | |
47 | * - sending and then receiving messages from vertices | |
48 | * - sending data to be aggregated from workers to master | |
49 | * - sending values from master, via aggregators, to workers | |
50 | * - sending and receiving worker messages | |
51 | * | |
52 | * | |
53 | * Order of execution is: | |
54 | * - On master, once at the start of the application | |
55 | * -- registerAggregators (deprecated, use registerReducers instead) | |
56 | * | |
57 | * - After masterCompute of previous piece, on master: | |
58 | * -- registerReducers | |
59 | * | |
60 | * - Send logic on workers: | |
61 | * -- getVertexSender per each worker thread, and on object returned by it: | |
62 | * --- vertexSend on each vertex | |
63 | * --- postprocess on each worker thread | |
64 | * -- workerContextSend per worker | |
65 | * | |
66 | * - Logic on master: | |
67 | * -- masterCompute | |
68 | * | |
69 | * - Receive logic on workers: | |
70 | * -- workerContextReceive per worker | |
71 | * -- getVertexReceiver per each worker thread, and on object returned by it: | |
72 | * --- vertexReceive on each vertex | |
73 | * --- postprocess on each worker thread | |
74 | * | |
75 | * And before everything, during initialization, registerAggregators. | |
76 | * | |
77 | * Only masterCompute and registerReducers/registerAggregators should modify | |
78 | * the Piece, all of the worker methods should treat Piece as read-only. | |
79 | * | |
80 | * Each piece should be encapsulated unit of execution. Vertex value should be | |
81 | * used as a single implicit "communication" channel between different pieces, | |
82 | * all other dependencies should be explicitly defined and passed through | |
83 | * constructor, via interfaces (as explained below). | |
84 | * I.e. state of the vertex value is invariant that Pieces act upon. | |
85 | * Best is not to depend on explicit vertex value class, but on interface that | |
86 | * provides all needed functions, so that pieces can be freely combined, | |
87 | * as long as vertex value implements appropriate ones. | |
88 | * Similarly, use most abstract class you need - if Piece doesn't depend | |
89 | * on edge value, don't use NullWritable, but Writable. Or if it doesn't | |
90 | * depend on ExecutionStage, use Object for it. | |
91 | * | |
92 | * All other external dependencies should be explicitly passed through | |
93 | * constructor, through interfaces. | |
94 | * | |
95 | * All Pieces will be created within one context - on the master. | |
96 | * They are then going to be replicated across all workers, and across all | |
97 | * threads within each worker, and will see everything that happens in global | |
98 | * context (masterCompute) before them, including any state master has. | |
99 | * Through ObjectHolder/ObjectTransfer, you can pass data between Pieces in | |
100 | * global context, and from global context to worker functions of a Piece | |
101 | * that happens in the future. | |
102 | * | |
103 | * VertexReceiver of previous Piece and VertexSender of next Piece live in | |
104 | * the same context, and vertexReceive of the next Piece is executed | |
105 | * immediately after vertexSend of the previous piece, before vertexSend is | |
106 | * called on the next vertex. | |
107 | * This detail allows you to have external dependency on each other through | |
108 | * memory only mediator objects - like ObjectTransfer. | |
109 | * | |
110 | * All other logic going to live in different contexts, | |
111 | * specifically VertexSender and VertexReceiver of the same Piece, | |
112 | * or workerContextSend and VertexSender of the same Piece, and cannot interact | |
113 | * with each other outside of changing the state of the graph or using | |
114 | * global communication api. | |
115 | * | |
116 | * All methods on this class (or objects it returns) will be called serially, | |
117 | * so there is no need for any Thread synchronization. | |
118 | * Each Thread will have a complete deep copy of the Piece, to achieve that, | |
119 | * so all static fields must be written to be Thread safe! | |
120 | * (i.e. either immutable, or have synchronized/locked access to them) | |
121 | * | |
122 | * @param <I> Vertex id type | |
123 | * @param <V> Vertex value type | |
124 | * @param <E> Edge value type | |
125 | * @param <M> Message type | |
126 | * @param <WV> Worker value type | |
127 | * @param <WM> Worker message type | |
128 | * @param <S> Execution stage type | |
129 | */ | |
130 | @SuppressWarnings({ "rawtypes" }) | |
131 | 0 | public abstract class AbstractPiece<I extends WritableComparable, |
132 | V extends Writable, E extends Writable, M extends Writable, WV, | |
133 | WM extends Writable, S> implements Block { | |
134 | ||
135 | // Overridable functions | |
136 | ||
137 | // registerReducers(CreateReducersApi reduceApi, S executionStage) | |
138 | ||
139 | /** | |
140 | * Add automatic handling of reducers to registerReducers. | |
141 | * Only for internal use. | |
142 | */ | |
143 | public abstract void wrappedRegisterReducers( | |
144 | BlockMasterApi masterApi, S executionStage); | |
145 | ||
146 | // getVertexSender(BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) | |
147 | ||
148 | /** | |
149 | * Add automatic handling of reducers to getVertexSender. | |
150 | * | |
151 | * Only for Framework internal use. | |
152 | */ | |
153 | public abstract InnerVertexSender getWrappedVertexSender( | |
154 | final BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage); | |
155 | ||
156 | /** | |
157 | * Override to have worker context send computation. | |
158 | * | |
159 | * Called once per worker, after all vertices have been processed with | |
160 | * getVertexSender. | |
161 | */ | |
162 | public void workerContextSend( | |
163 | BlockWorkerContextSendApi<I, WM> workerContextApi, S executionStage, | |
164 | WV workerValue) { | |
165 | 0 | } |
166 | ||
167 | /** | |
168 | * Function that is called on master, after send phase, before receive phase. | |
169 | * | |
170 | * It can: | |
171 | * - read aggregators sent from worker | |
172 | * - do global processing | |
173 | * - send data to workers through aggregators | |
174 | */ | |
175 | public void masterCompute(BlockMasterApi masterApi, S executionStage) { | |
176 | 0 | } |
177 | ||
178 | /** | |
179 | * Override to have worker context receive computation. | |
180 | * | |
181 | * Called once per worker, before all vertices are going to be processed | |
182 | * with getVertexReceiver. | |
183 | */ | |
184 | public void workerContextReceive( | |
185 | BlockWorkerContextReceiveApi workerContextApi, S executionStage, | |
186 | WV workerValue, List<WM> workerMessages) { | |
187 | 0 | } |
188 | ||
189 | /** | |
190 | * Override to do vertex receive processing. | |
191 | * | |
192 | * Creates handler that defines what should be executed on worker | |
193 | * for each vertex during receive phase. | |
194 | * | |
195 | * This logic executed last. | |
196 | * This function is called once on each worker on each thread, in parallel, | |
197 | * on their copy of Piece object to create functions handler. | |
198 | * | |
199 | * If returned object implements Postprocessor interface, then corresponding | |
200 | * postprocess() function is going to be called once, after all vertices | |
201 | * corresponding thread needed to process are done. | |
202 | */ | |
203 | public VertexReceiver<I, V, E, M> getVertexReceiver( | |
204 | BlockWorkerReceiveApi<I> workerApi, S executionStage) { | |
205 | 0 | return null; |
206 | } | |
207 | ||
208 | /** | |
209 | * Returns MessageClasses definition for messages being sent by this Piece. | |
210 | */ | |
211 | public abstract MessageClasses<I, M> getMessageClasses( | |
212 | ImmutableClassesGiraphConfiguration conf); | |
213 | ||
214 | /** | |
215 | * Override to provide different next execution stage for | |
216 | * Pieces that come after it. | |
217 | * | |
218 | * Execution stage should be immutable, and this function should be | |
219 | * returning a new object, if it needs to return different value. | |
220 | * | |
221 | * It affects pieces that come after this piece, | |
222 | * and isn't applied to execution stage this piece sees. | |
223 | */ | |
224 | public S nextExecutionStage(S executionStage) { | |
225 | 0 | return executionStage; |
226 | } | |
227 | ||
228 | /** | |
229 | * Override to register any potential aggregators used by this piece. | |
230 | * | |
231 | * @deprecated Use registerReducers instead. | |
232 | */ | |
233 | @Deprecated | |
234 | public void registerAggregators(BlockMasterApi masterApi) | |
235 | throws InstantiationException, IllegalAccessException { | |
236 | 0 | } |
237 | ||
238 | // Inner classes | |
239 | ||
240 | /** Inner class to provide clean use without specifying types */ | |
241 | 0 | public abstract class InnerVertexSender |
242 | implements VertexSender<I, V, E>, VertexPostprocessor { | |
243 | @Override | |
244 | 0 | public void postprocess() { } |
245 | } | |
246 | ||
247 | /** Inner class to provide clean use without specifying types */ | |
248 | 0 | public abstract class InnerVertexReceiver |
249 | implements VertexReceiver<I, V, E, M>, VertexPostprocessor { | |
250 | @Override | |
251 | 0 | public void postprocess() { } |
252 | } | |
253 | ||
254 | // Internal implementation | |
255 | ||
256 | @Override | |
257 | public final Iterator<AbstractPiece> iterator() { | |
258 | 0 | return Iterators.<AbstractPiece>singletonIterator(this); |
259 | } | |
260 | ||
261 | @Override | |
262 | public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) { | |
263 | 0 | consumer.apply(this); |
264 | 0 | } |
265 | ||
266 | @Override | |
267 | public PieceCount getPieceCount() { | |
268 | 0 | return new PieceCount(1); |
269 | } | |
270 | ||
271 | @Override | |
272 | public String toString() { | |
273 | 0 | String name = getClass().getSimpleName(); |
274 | 0 | if (name.isEmpty()) { |
275 | 0 | name = getClass().getName(); |
276 | } | |
277 | 0 | return name; |
278 | } | |
279 | ||
280 | ||
281 | // make hashCode and equals final, forcing them to be based on | |
282 | // reference identity. | |
283 | @Override | |
284 | public final int hashCode() { | |
285 | 0 | return super.hashCode(); |
286 | } | |
287 | ||
288 | @Override | |
289 | public final boolean equals(Object obj) { | |
290 | 0 | return super.equals(obj); |
291 | } | |
292 | ||
293 | } |