1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.framework.piece.global_comm.internal;
19
20 import java.io.DataInput;
21 import java.io.DataOutput;
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.concurrent.atomic.AtomicInteger;
25
26 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
27 import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
28 import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
29 import org.apache.giraph.master.MasterGlobalCommUsage;
30 import org.apache.giraph.reducers.ReduceOperation;
31 import org.apache.giraph.reducers.Reducer;
32 import org.apache.giraph.utils.WritableUtils;
33 import org.apache.giraph.worker.WorkerBroadcastUsage;
34 import org.apache.giraph.worker.WorkerReduceUsage;
35 import org.apache.hadoop.io.Writable;
36
37
38
39
40
41 public class ReducersForPieceHandler implements VertexSenderObserver {
42 private static final AtomicInteger HANDLER_COUNTER = new AtomicInteger();
43 private static final AtomicInteger BROADCAST_COUNTER = new AtomicInteger();
44
45 private final int handleIndex = HANDLER_COUNTER.incrementAndGet();
46 private final AtomicInteger reduceCounter = new AtomicInteger();
47
48 private final ArrayList<VertexSenderObserver> observers = new ArrayList<>();
49
50 @Override
51 public void vertexSenderWorkerPreprocess(WorkerReduceUsage usage) {
52 for (VertexSenderObserver observer : observers) {
53 observer.vertexSenderWorkerPreprocess(usage);
54 }
55 }
56
57 @Override
58 public void vertexSenderWorkerPostprocess(WorkerReduceUsage usage) {
59 for (VertexSenderObserver observer : observers) {
60 observer.vertexSenderWorkerPostprocess(usage);
61 }
62 }
63
64 public <S, R extends Writable> ReducerHandle<S, R> createLocalReducer(
65 MasterGlobalCommUsage master, ReduceOperation<S, R> reduceOp,
66 R globalInitialValue) {
67 LocalReduceHandle<S, R> handle = new LocalReduceHandle<>(reduceOp);
68 master.registerReducer(handle.getName(), reduceOp, globalInitialValue);
69 observers.add(handle);
70 return handle;
71 }
72
73 public <S, R extends Writable> ReducerHandle<S, R> createGlobalReducer(
74 MasterGlobalCommUsage master, ReduceOperation<S, R> reduceOp,
75 R globalInitialValue) {
76 ReduceHandleImpl<S, R> handle = new GlobalReduceHandle<>(reduceOp);
77 master.registerReducer(handle.getName(), reduceOp, globalInitialValue);
78 observers.add(handle);
79 return handle;
80 }
81
82
83
84
85
86
87 public static class BroadcastHandleImpl<T> implements BroadcastHandle<T> {
88 private final String name;
89
90 public BroadcastHandleImpl() {
91 this.name = "_utils.broadcast." + BROADCAST_COUNTER.incrementAndGet();
92 }
93
94 public String getName() {
95 return name;
96 }
97
98 @Override
99 public T getBroadcast(WorkerBroadcastUsage worker) {
100 return worker.getBroadcast(name);
101 }
102 }
103
104
105
106
107
108
109
110 public abstract class ReduceHandleImpl<S, R extends Writable>
111 implements ReducerHandle<S, R>, VertexSenderObserver {
112 protected final ReduceOperation<S, R> reduceOp;
113 private final String name;
114
115 private ReduceHandleImpl(ReduceOperation<S, R> reduceOp) {
116 this.reduceOp = reduceOp;
117 name = "_utils." + handleIndex +
118 ".reduce." + reduceCounter.incrementAndGet();
119 }
120
121 public String getName() {
122 return name;
123 }
124
125 @Override
126 public R getReducedValue(MasterGlobalCommUsage master) {
127 return master.getReduced(name);
128 }
129
130 @Override
131 public BroadcastHandle<R> broadcastValue(BlockMasterApi master) {
132 return unwrapHandle(master.broadcast(
133 new WrappedReducedValue<>(reduceOp, getReducedValue(master))));
134 }
135 }
136
137 private static <R extends Writable> BroadcastHandle<R> unwrapHandle(
138 final BroadcastHandle<WrappedReducedValue<R>> handle) {
139 return new BroadcastHandle<R>() {
140 @Override
141 public R getBroadcast(WorkerBroadcastUsage worker) {
142 return handle.getBroadcast(worker).getValue();
143 }
144 };
145 }
146
147
148
149
150
151
152
153 public static class WrappedReducedValue<R extends Writable>
154 implements Writable {
155 private ReduceOperation<?, R> reduceOp;
156 private R value;
157
158 public WrappedReducedValue() {
159 }
160
161 public WrappedReducedValue(ReduceOperation<?, R> reduceOp, R value) {
162 this.reduceOp = reduceOp;
163 this.value = value;
164 }
165
166 @Override
167 public void write(DataOutput out) throws IOException {
168 WritableUtils.writeWritableObject(reduceOp, out);
169 value.write(out);
170 }
171
172 @Override
173 public void readFields(DataInput in) throws IOException {
174 reduceOp = WritableUtils.readWritableObject(in, null);
175 value = reduceOp.createInitialValue();
176 value.readFields(in);
177 }
178
179 public R getValue() {
180 return value;
181 }
182 }
183
184
185
186
187
188
189
190
191
192
193
194 public class GlobalReduceHandle<S, R extends Writable>
195 extends ReduceHandleImpl<S, R> {
196 private transient WorkerReduceUsage usage;
197
198 public GlobalReduceHandle(ReduceOperation<S, R> reduceOp) {
199 super(reduceOp);
200 }
201
202 @Override
203 public void vertexSenderWorkerPreprocess(WorkerReduceUsage usage) {
204 this.usage = usage;
205 }
206
207 @Override
208 public void reduce(S valueToReduce) {
209 usage.reduce(getName(), valueToReduce);
210 }
211
212 @Override
213 public void vertexSenderWorkerPostprocess(WorkerReduceUsage usage) {
214 }
215 }
216
217
218
219
220
221
222
223
224
225
226
227 public class LocalReduceHandle<S, R extends Writable>
228 extends ReduceHandleImpl<S, R> {
229 private transient Reducer<S, R> reducer;
230
231 public LocalReduceHandle(ReduceOperation<S, R> reduceOp) {
232 super(reduceOp);
233 }
234
235 @Override
236 public void vertexSenderWorkerPreprocess(WorkerReduceUsage usage) {
237 this.reducer = new Reducer<>(reduceOp);
238 }
239
240 @Override
241 public void reduce(S valueToReduce) {
242 reducer.reduce(valueToReduce);
243 }
244
245 @Override
246 public void vertexSenderWorkerPostprocess(WorkerReduceUsage usage) {
247 usage.reduceMerge(getName(), reducer.getCurrentValue());
248 }
249 }
250 }