1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
|
19 | |
package org.apache.giraph.examples; |
20 | |
|
21 | |
import org.apache.commons.cli.CommandLine; |
22 | |
import org.apache.commons.cli.CommandLineParser; |
23 | |
import org.apache.commons.cli.HelpFormatter; |
24 | |
import org.apache.commons.cli.Options; |
25 | |
import org.apache.commons.cli.PosixParser; |
26 | |
import org.apache.giraph.aggregators.LongSumAggregator; |
27 | |
import org.apache.giraph.graph.DefaultMasterCompute; |
28 | |
import org.apache.giraph.graph.Edge; |
29 | |
import org.apache.giraph.graph.EdgeListVertex; |
30 | |
import org.apache.giraph.graph.GiraphJob; |
31 | |
import org.apache.giraph.graph.WorkerContext; |
32 | |
import org.apache.giraph.io.GeneratedVertexInputFormat; |
33 | |
import org.apache.giraph.io.IdWithValueTextOutputFormat; |
34 | |
import org.apache.hadoop.conf.Configuration; |
35 | |
import org.apache.hadoop.fs.Path; |
36 | |
import org.apache.hadoop.io.FloatWritable; |
37 | |
import org.apache.hadoop.io.IntWritable; |
38 | |
import org.apache.hadoop.io.LongWritable; |
39 | |
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; |
40 | |
import org.apache.hadoop.util.Tool; |
41 | |
import org.apache.hadoop.util.ToolRunner; |
42 | |
import org.apache.log4j.Logger; |
43 | |
|
44 | |
|
45 | |
|
46 | |
|
47 | |
|
48 | |
|
49 | 42 | public class SimpleCheckpointVertex extends |
50 | |
EdgeListVertex<LongWritable, IntWritable, FloatWritable, FloatWritable> |
51 | |
implements Tool { |
52 | |
|
53 | |
public static final int FAULTING_SUPERSTEP = 4; |
54 | |
|
55 | |
public static final long FAULTING_VERTEX_ID = 1; |
56 | |
|
57 | |
public static final String SUPERSTEP_COUNT = |
58 | |
"simpleCheckpointVertex.superstepCount"; |
59 | |
|
60 | |
public static final String ENABLE_FAULT = |
61 | |
"simpleCheckpointVertex.enableFault"; |
62 | |
|
63 | 1 | private static final Logger LOG = |
64 | |
Logger.getLogger(SimpleCheckpointVertex.class); |
65 | |
|
66 | |
private Configuration conf; |
67 | |
|
68 | |
@Override |
69 | |
public void compute(Iterable<FloatWritable> messages) { |
70 | 280 | SimpleCheckpointVertexWorkerContext workerContext = |
71 | |
(SimpleCheckpointVertexWorkerContext) getWorkerContext(); |
72 | |
|
73 | 280 | boolean enableFault = workerContext.getEnableFault(); |
74 | 280 | int supersteps = workerContext.getSupersteps(); |
75 | |
|
76 | 280 | if (enableFault && (getSuperstep() == FAULTING_SUPERSTEP) && |
77 | |
(getContext().getTaskAttemptID().getId() == 0) && |
78 | |
(getId().get() == FAULTING_VERTEX_ID)) { |
79 | 0 | LOG.info("compute: Forced a fault on the first " + |
80 | |
"attempt of superstep " + |
81 | |
FAULTING_SUPERSTEP + " and vertex id " + |
82 | |
FAULTING_VERTEX_ID); |
83 | 0 | System.exit(-1); |
84 | |
} |
85 | 280 | if (getSuperstep() > supersteps) { |
86 | 35 | voteToHalt(); |
87 | 35 | return; |
88 | |
} |
89 | 245 | long sumAgg = this.<LongWritable>getAggregatedValue( |
90 | |
LongSumAggregator.class.getName()).get(); |
91 | 245 | LOG.info("compute: " + sumAgg); |
92 | 245 | aggregate(LongSumAggregator.class.getName(), |
93 | |
new LongWritable(getId().get())); |
94 | 245 | LOG.info("compute: sum = " + sumAgg + |
95 | |
" for vertex " + getId()); |
96 | 245 | float msgValue = 0.0f; |
97 | 245 | for (FloatWritable message : messages) { |
98 | 210 | float curMsgValue = message.get(); |
99 | 210 | msgValue += curMsgValue; |
100 | 210 | LOG.info("compute: got msgValue = " + curMsgValue + |
101 | |
" for vertex " + getId() + |
102 | |
" on superstep " + getSuperstep()); |
103 | 210 | } |
104 | 245 | int vertexValue = getValue().get(); |
105 | 245 | setValue(new IntWritable(vertexValue + (int) msgValue)); |
106 | 245 | LOG.info("compute: vertex " + getId() + |
107 | |
" has value " + getValue() + |
108 | |
" on superstep " + getSuperstep()); |
109 | 245 | for (Edge<LongWritable, FloatWritable> edge : getEdges()) { |
110 | 245 | FloatWritable newEdgeValue = new FloatWritable(edge.getValue().get() + |
111 | |
(float) vertexValue); |
112 | 245 | LOG.info("compute: vertex " + getId() + |
113 | |
" sending edgeValue " + edge.getValue() + |
114 | |
" vertexValue " + vertexValue + |
115 | |
" total " + newEdgeValue + |
116 | |
" to vertex " + edge.getTargetVertexId() + |
117 | |
" on superstep " + getSuperstep()); |
118 | 245 | addEdge(edge.getTargetVertexId(), newEdgeValue); |
119 | 245 | sendMessage(edge.getTargetVertexId(), newEdgeValue); |
120 | 245 | } |
121 | 245 | } |
122 | |
|
123 | |
|
124 | |
|
125 | |
|
126 | 7 | public static class SimpleCheckpointVertexWorkerContext |
127 | |
extends WorkerContext { |
128 | |
|
129 | |
public static final String FAULT_FILE = "/tmp/faultFile"; |
130 | |
|
131 | |
private static long FINAL_SUM; |
132 | |
|
133 | 7 | private int supersteps = 6; |
134 | |
|
135 | 7 | private boolean enableFault = false; |
136 | |
|
137 | |
public static long getFinalSum() { |
138 | 2 | return FINAL_SUM; |
139 | |
} |
140 | |
|
141 | |
@Override |
142 | |
public void preApplication() |
143 | |
throws InstantiationException, IllegalAccessException { |
144 | 7 | supersteps = getContext().getConfiguration() |
145 | |
.getInt(SUPERSTEP_COUNT, supersteps); |
146 | 7 | enableFault = getContext().getConfiguration() |
147 | |
.getBoolean(ENABLE_FAULT, false); |
148 | 7 | } |
149 | |
|
150 | |
@Override |
151 | |
public void postApplication() { |
152 | 7 | FINAL_SUM = this.<LongWritable>getAggregatedValue( |
153 | |
LongSumAggregator.class.getName()).get(); |
154 | 7 | LOG.info("FINAL_SUM=" + FINAL_SUM); |
155 | 7 | } |
156 | |
|
157 | |
@Override |
158 | |
public void preSuperstep() { |
159 | 56 | } |
160 | |
|
161 | |
@Override |
162 | 56 | public void postSuperstep() { } |
163 | |
|
164 | |
public int getSupersteps() { |
165 | 280 | return this.supersteps; |
166 | |
} |
167 | |
|
168 | |
public boolean getEnableFault() { |
169 | 280 | return this.enableFault; |
170 | |
} |
171 | |
} |
172 | |
|
173 | |
@Override |
174 | |
public int run(String[] args) throws Exception { |
175 | 0 | Options options = new Options(); |
176 | 0 | options.addOption("h", "help", false, "Help"); |
177 | 0 | options.addOption("v", "verbose", false, "Verbose"); |
178 | 0 | options.addOption("w", |
179 | |
"workers", |
180 | |
true, |
181 | |
"Number of workers"); |
182 | 0 | options.addOption("s", |
183 | |
"supersteps", |
184 | |
true, |
185 | |
"Supersteps to execute before finishing"); |
186 | 0 | options.addOption("w", |
187 | |
"workers", |
188 | |
true, |
189 | |
"Minimum number of workers"); |
190 | 0 | options.addOption("o", |
191 | |
"outputDirectory", |
192 | |
true, |
193 | |
"Output directory"); |
194 | 0 | HelpFormatter formatter = new HelpFormatter(); |
195 | 0 | if (args.length == 0) { |
196 | 0 | formatter.printHelp(getClass().getName(), options, true); |
197 | 0 | return 0; |
198 | |
} |
199 | 0 | CommandLineParser parser = new PosixParser(); |
200 | 0 | CommandLine cmd = parser.parse(options, args); |
201 | 0 | if (cmd.hasOption('h')) { |
202 | 0 | formatter.printHelp(getClass().getName(), options, true); |
203 | 0 | return 0; |
204 | |
} |
205 | 0 | if (!cmd.hasOption('w')) { |
206 | 0 | LOG.info("Need to choose the number of workers (-w)"); |
207 | 0 | return -1; |
208 | |
} |
209 | 0 | if (!cmd.hasOption('o')) { |
210 | 0 | LOG.info("Need to set the output directory (-o)"); |
211 | 0 | return -1; |
212 | |
} |
213 | |
|
214 | 0 | GiraphJob bspJob = new GiraphJob(getConf(), getClass().getName()); |
215 | 0 | bspJob.setVertexClass(getClass()); |
216 | 0 | bspJob.setVertexInputFormatClass(GeneratedVertexInputFormat.class); |
217 | 0 | bspJob.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); |
218 | 0 | bspJob.setWorkerContextClass(SimpleCheckpointVertexWorkerContext.class); |
219 | 0 | bspJob.setMasterComputeClass(SimpleCheckpointVertexMasterCompute.class); |
220 | 0 | int minWorkers = Integer.parseInt(cmd.getOptionValue('w')); |
221 | 0 | int maxWorkers = Integer.parseInt(cmd.getOptionValue('w')); |
222 | 0 | bspJob.setWorkerConfiguration(minWorkers, maxWorkers, 100.0f); |
223 | |
|
224 | 0 | FileOutputFormat.setOutputPath(bspJob.getInternalJob(), |
225 | |
new Path(cmd.getOptionValue('o'))); |
226 | 0 | boolean verbose = false; |
227 | 0 | if (cmd.hasOption('v')) { |
228 | 0 | verbose = true; |
229 | |
} |
230 | 0 | if (cmd.hasOption('s')) { |
231 | 0 | getConf().setInt(SUPERSTEP_COUNT, |
232 | |
Integer.parseInt(cmd.getOptionValue('s'))); |
233 | |
} |
234 | 0 | if (bspJob.run(verbose)) { |
235 | 0 | return 0; |
236 | |
} else { |
237 | 0 | return -1; |
238 | |
} |
239 | |
} |
240 | |
|
241 | |
|
242 | |
|
243 | |
|
244 | |
|
245 | 35 | public static class SimpleCheckpointVertexMasterCompute extends |
246 | |
DefaultMasterCompute { |
247 | |
@Override |
248 | |
public void initialize() throws InstantiationException, |
249 | |
IllegalAccessException { |
250 | 7 | registerAggregator(LongSumAggregator.class.getName(), |
251 | |
LongSumAggregator.class); |
252 | 7 | } |
253 | |
} |
254 | |
|
255 | |
|
256 | |
|
257 | |
|
258 | |
|
259 | |
|
260 | |
|
261 | |
public static void main(String[] args) throws Exception { |
262 | 0 | System.exit(ToolRunner.run(new SimpleCheckpointVertex(), args)); |
263 | 0 | } |
264 | |
|
265 | |
@Override |
266 | |
public Configuration getConf() { |
267 | 0 | return conf; |
268 | |
} |
269 | |
|
270 | |
@Override |
271 | |
public void setConf(Configuration conf) { |
272 | 35 | this.conf = conf; |
273 | 35 | } |
274 | |
} |