1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.master;
20
21 import org.apache.giraph.master.input.MasterInputSplitsHandler;
22 import org.apache.giraph.partition.PartitionStats;
23 import org.apache.giraph.reducers.ReduceOperation;
24 import org.apache.giraph.utils.BlockingElementsSet;
25 import org.apache.hadoop.io.Writable;
26 import org.apache.hadoop.util.Progressable;
27
28 import com.google.common.collect.Iterables;
29
30 import java.util.List;
31
32
33
34
35 public class MasterGlobalCommHandler implements MasterGlobalCommUsage {
36
37 private final MasterAggregatorHandler aggregatorHandler;
38
39 private final MasterInputSplitsHandler inputSplitsHandler;
40
41 private final BlockingElementsSet<List<PartitionStats>> partitionStats =
42 new BlockingElementsSet<>();
43
44
45
46
47
48
49
50 public MasterGlobalCommHandler(
51 MasterAggregatorHandler aggregatorHandler,
52 MasterInputSplitsHandler inputSplitsHandler) {
53 this.aggregatorHandler = aggregatorHandler;
54 this.inputSplitsHandler = inputSplitsHandler;
55 }
56
57 public MasterAggregatorHandler getAggregatorHandler() {
58 return aggregatorHandler;
59 }
60
61 public MasterInputSplitsHandler getInputSplitsHandler() {
62 return inputSplitsHandler;
63 }
64
65 @Override
66 public <S, R extends Writable> void registerReducer(String name,
67 ReduceOperation<S, R> reduceOp) {
68 aggregatorHandler.registerReducer(name, reduceOp);
69 }
70
71 @Override
72 public <S, R extends Writable> void registerReducer(String name,
73 ReduceOperation<S, R> reduceOp, R globalInitialValue) {
74 aggregatorHandler.registerReducer(name, reduceOp, globalInitialValue);
75 }
76
77 @Override
78 public <R extends Writable> R getReduced(String name) {
79 return aggregatorHandler.getReduced(name);
80 }
81
82 @Override
83 public void broadcast(String name, Writable value) {
84 aggregatorHandler.broadcast(name, value);
85 }
86
87
88
89
90
91
92 public void receivedPartitionStats(List<PartitionStats> partitionStats) {
93 this.partitionStats.offer(partitionStats);
94 }
95
96
97
98
99
100
101
102
103 public Iterable<PartitionStats> getAllPartitionStats(int numWorkers,
104 Progressable progressable) {
105 return Iterables.concat(
106 partitionStats.getElements(numWorkers, progressable));
107 }
108 }