1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.reducers.array;
19
20 import java.util.ArrayList;
21
22 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
23 import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
24 import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
25 import org.apache.giraph.block_app.framework.piece.global_comm.array.ArrayHandle;
26 import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
27 import org.apache.giraph.block_app.framework.piece.global_comm.array.ReducerArrayHandle;
28 import org.apache.giraph.function.Supplier;
29 import org.apache.giraph.function.primitive.Int2ObjFunction;
30 import org.apache.giraph.worker.WorkerBroadcastUsage;
31
32
33
34
35
36
37 public class ArrayOfHandles<H> implements ArrayHandle<H> {
38 protected final ArrayList<H> handles;
39
40 public ArrayOfHandles(int count, Supplier<H> reduceHandleFactory) {
41 handles = new ArrayList<>();
42 for (int i = 0; i < count; i++) {
43 handles.add(reduceHandleFactory.get());
44 }
45 }
46
47 public ArrayOfHandles(int count, Int2ObjFunction<H> reduceHandleFactory) {
48 handles = new ArrayList<>();
49 for (int i = 0; i < count; i++) {
50 handles.add(reduceHandleFactory.apply(i));
51 }
52 }
53
54 @Override
55 public H get(int index) {
56 return handles.get(index);
57 }
58
59 @Override
60 public int getStaticSize() {
61 return handles.size();
62 }
63
64
65
66
67
68
69
70 public static class ArrayOfReducers<S, R>
71 extends ArrayOfHandles<ReducerHandle<S, R>>
72 implements ReducerArrayHandle<S, R> {
73
74 public ArrayOfReducers(
75 int count, Supplier<ReducerHandle<S, R>> reduceHandleFactory) {
76 super(count, reduceHandleFactory);
77 }
78
79 public ArrayOfReducers(
80 int count, Int2ObjFunction<ReducerHandle<S, R>> reduceHandleFactory) {
81 super(count, reduceHandleFactory);
82 }
83
84 @Override
85 public int getReducedSize(BlockMasterApi master) {
86 return getStaticSize();
87 }
88
89 @Override
90 public BroadcastArrayHandle<R> broadcastValue(final BlockMasterApi master) {
91 return new ArrayOfBroadcasts<>(
92 getStaticSize(),
93 new Int2ObjFunction<BroadcastHandle<R>>() {
94 @Override
95 public BroadcastHandle<R> apply(int index) {
96 return get(index).broadcastValue(master);
97 }
98 });
99 }
100 }
101
102
103
104
105
106
107 public static class ArrayOfBroadcasts<T>
108 extends ArrayOfHandles<BroadcastHandle<T>>
109 implements BroadcastArrayHandle<T> {
110
111 public ArrayOfBroadcasts(
112 int count,
113 Int2ObjFunction<BroadcastHandle<T>> broadcastHandleFactory) {
114 super(count, broadcastHandleFactory);
115 }
116
117 public ArrayOfBroadcasts(
118 int count,
119 Supplier<BroadcastHandle<T>> broadcastHandleFactory) {
120 super(count, broadcastHandleFactory);
121 }
122
123 @Override
124 public int getBroadcastedSize(WorkerBroadcastUsage worker) {
125 return getStaticSize();
126 }
127 }
128 }