1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
|
19 | |
package org.apache.giraph.graph; |
20 | |
|
21 | |
import java.io.IOException; |
22 | |
import java.util.Map; |
23 | |
import java.util.Map.Entry; |
24 | |
import java.util.TreeMap; |
25 | |
|
26 | |
import org.apache.giraph.bsp.ApplicationState; |
27 | |
import org.apache.giraph.bsp.CentralizedServiceMaster; |
28 | |
import org.apache.giraph.bsp.SuperstepState; |
29 | |
import org.apache.hadoop.io.Writable; |
30 | |
import org.apache.hadoop.io.WritableComparable; |
31 | |
import org.apache.hadoop.mapreduce.Mapper.Context; |
32 | |
import org.apache.log4j.Logger; |
33 | |
import org.apache.zookeeper.KeeperException; |
34 | |
|
35 | |
|
36 | |
|
37 | |
|
38 | |
|
39 | |
|
40 | |
|
41 | |
|
42 | |
|
43 | |
|
44 | |
|
45 | |
@SuppressWarnings("rawtypes") |
46 | |
public class MasterThread<I extends WritableComparable, V extends Writable, |
47 | |
E extends Writable, M extends Writable> extends Thread { |
48 | |
|
49 | |
public static final String GIRAPH_TIMERS_COUNTER_GROUP_NAME = "Giraph Timers"; |
50 | |
|
51 | 1 | private static final Logger LOG = Logger.getLogger(MasterThread.class); |
52 | |
|
53 | 24 | private CentralizedServiceMaster<I, V, E, M> bspServiceMaster = null; |
54 | |
|
55 | |
private final Context context; |
56 | |
|
57 | |
private final boolean superstepCounterOn; |
58 | |
|
59 | 24 | private double setupSecs = 0d; |
60 | |
|
61 | 24 | private final Map<Long, Double> superstepSecsMap = |
62 | |
new TreeMap<Long, Double>(); |
63 | |
|
64 | |
|
65 | |
|
66 | |
|
67 | |
|
68 | |
|
69 | |
|
70 | |
|
71 | |
MasterThread(BspServiceMaster<I, V, E, M> bspServiceMaster, |
72 | |
Context context) { |
73 | 24 | super(MasterThread.class.getName()); |
74 | 24 | this.bspServiceMaster = bspServiceMaster; |
75 | 24 | this.context = context; |
76 | 24 | superstepCounterOn = context.getConfiguration().getBoolean( |
77 | |
GiraphJob.USE_SUPERSTEP_COUNTERS, |
78 | |
GiraphJob.USE_SUPERSTEP_COUNTERS_DEFAULT); |
79 | 24 | } |
80 | |
|
81 | |
|
82 | |
|
83 | |
|
84 | |
|
85 | |
|
86 | |
@Override |
87 | |
public void run() { |
88 | |
|
89 | |
|
90 | |
|
91 | |
|
92 | |
try { |
93 | 24 | long startMillis = System.currentTimeMillis(); |
94 | 24 | long endMillis = 0; |
95 | 24 | bspServiceMaster.setup(); |
96 | 24 | if (bspServiceMaster.becomeMaster()) { |
97 | |
|
98 | 24 | if (bspServiceMaster.getRestartedSuperstep() != |
99 | |
BspService.UNSET_SUPERSTEP || |
100 | |
bspServiceMaster.createInputSplits() != -1) { |
101 | 24 | long setupMillis = System.currentTimeMillis() - startMillis; |
102 | 24 | context.getCounter(GIRAPH_TIMERS_COUNTER_GROUP_NAME, |
103 | |
"Setup (milliseconds)"). |
104 | |
increment(setupMillis); |
105 | 24 | setupSecs = setupMillis / 1000.0d; |
106 | 24 | SuperstepState superstepState = SuperstepState.INITIAL; |
107 | 24 | long cachedSuperstep = BspService.UNSET_SUPERSTEP; |
108 | 242 | while (superstepState != SuperstepState.ALL_SUPERSTEPS_DONE) { |
109 | 218 | long startSuperstepMillis = System.currentTimeMillis(); |
110 | 218 | cachedSuperstep = bspServiceMaster.getSuperstep(); |
111 | 218 | superstepState = bspServiceMaster.coordinateSuperstep(); |
112 | 218 | long superstepMillis = System.currentTimeMillis() - |
113 | |
startSuperstepMillis; |
114 | 218 | superstepSecsMap.put(Long.valueOf(cachedSuperstep), |
115 | |
superstepMillis / 1000.0d); |
116 | 218 | if (LOG.isInfoEnabled()) { |
117 | 218 | LOG.info("masterThread: Coordination of superstep " + |
118 | |
cachedSuperstep + " took " + |
119 | |
superstepMillis / 1000.0d + |
120 | |
" seconds ended with state " + superstepState + |
121 | |
" and is now on superstep " + |
122 | |
bspServiceMaster.getSuperstep()); |
123 | |
} |
124 | 218 | if (superstepCounterOn) { |
125 | |
String counterPrefix; |
126 | 218 | if (cachedSuperstep == -1) { |
127 | 24 | counterPrefix = "Vertex input superstep"; |
128 | |
} else { |
129 | 194 | counterPrefix = "Superstep " + cachedSuperstep; |
130 | |
} |
131 | 218 | context.getCounter(GIRAPH_TIMERS_COUNTER_GROUP_NAME, |
132 | |
counterPrefix + |
133 | |
" (milliseconds)"). |
134 | |
increment(superstepMillis); |
135 | |
} |
136 | |
|
137 | |
|
138 | 218 | if (superstepState == SuperstepState.WORKER_FAILURE) { |
139 | 0 | bspServiceMaster.restartFromCheckpoint( |
140 | |
bspServiceMaster.getLastGoodCheckpoint()); |
141 | |
} |
142 | 218 | endMillis = System.currentTimeMillis(); |
143 | 218 | } |
144 | 24 | bspServiceMaster.setJobState(ApplicationState.FINISHED, -1, -1); |
145 | |
} |
146 | |
} |
147 | 24 | bspServiceMaster.cleanup(); |
148 | 24 | if (!superstepSecsMap.isEmpty()) { |
149 | 24 | context.getCounter( |
150 | |
GIRAPH_TIMERS_COUNTER_GROUP_NAME, |
151 | |
"Shutdown (milliseconds)"). |
152 | |
increment(System.currentTimeMillis() - endMillis); |
153 | 24 | if (LOG.isInfoEnabled()) { |
154 | 24 | LOG.info("setup: Took " + setupSecs + " seconds."); |
155 | |
} |
156 | 24 | for (Entry<Long, Double> entry : superstepSecsMap.entrySet()) { |
157 | 218 | if (LOG.isInfoEnabled()) { |
158 | 218 | if (entry.getKey().longValue() == |
159 | |
BspService.INPUT_SUPERSTEP) { |
160 | 24 | LOG.info("vertex input superstep: Took " + |
161 | |
entry.getValue() + " seconds."); |
162 | |
} else { |
163 | 194 | LOG.info("superstep " + entry.getKey() + ": Took " + |
164 | |
entry.getValue() + " seconds."); |
165 | |
} |
166 | |
} |
167 | |
} |
168 | 24 | if (LOG.isInfoEnabled()) { |
169 | 24 | LOG.info("shutdown: Took " + |
170 | |
(System.currentTimeMillis() - endMillis) / |
171 | |
1000.0d + " seconds."); |
172 | 24 | LOG.info("total: Took " + |
173 | |
((System.currentTimeMillis() - startMillis) / |
174 | |
1000.0d) + " seconds."); |
175 | |
} |
176 | 24 | context.getCounter( |
177 | |
GIRAPH_TIMERS_COUNTER_GROUP_NAME, |
178 | |
"Total (milliseconds)"). |
179 | |
increment(System.currentTimeMillis() - startMillis); |
180 | |
} |
181 | 0 | } catch (IOException e) { |
182 | 0 | LOG.error("masterThread: Master algorithm failed with " + |
183 | |
"IOException ", e); |
184 | 0 | throw new IllegalStateException(e); |
185 | 0 | } catch (InterruptedException e) { |
186 | 0 | LOG.error("masterThread: Master algorithm failed with " + |
187 | |
"InterruptedException", e); |
188 | 0 | throw new IllegalStateException(e); |
189 | 0 | } catch (KeeperException e) { |
190 | 0 | LOG.error("masterThread: Master algorithm failed with " + |
191 | |
"KeeperException", e); |
192 | 0 | throw new IllegalStateException(e); |
193 | 24 | } |
194 | 24 | } |
195 | |
} |