1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.examples;
20
21 import org.apache.giraph.graph.BasicComputation;
22 import org.apache.giraph.graph.Vertex;
23 import org.apache.hadoop.io.DoubleWritable;
24 import org.apache.hadoop.io.FloatWritable;
25 import org.apache.hadoop.io.LongWritable;
26 import org.apache.log4j.Logger;
27
28 import java.io.IOException;
29
30
31
32
33 public class SimpleFailComputation extends BasicComputation<
34 LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
35
36 private static Logger LOG = Logger.getLogger(SimpleFailComputation.class);
37
38 private static long SUPERSTEP = 0;
39
40 @Override
41 public void compute(
42 Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
43 Iterable<DoubleWritable> messages) throws IOException {
44 if (getSuperstep() >= 1) {
45 double sum = 0;
46 for (DoubleWritable message : messages) {
47 sum += message.get();
48 }
49 DoubleWritable vertexValue =
50 new DoubleWritable((0.15f / getTotalNumVertices()) + 0.85f * sum);
51 vertex.setValue(vertexValue);
52 if (getSuperstep() < 30) {
53 if (getSuperstep() == 20) {
54 if (vertex.getId().get() == 10L) {
55 try {
56 Thread.sleep(2000);
57 } catch (InterruptedException e) {
58 LOG.info("Sleep interrupted ", e);
59 }
60 System.exit(1);
61 } else if (getSuperstep() - SUPERSTEP > 10) {
62 return;
63 }
64 }
65 long edges = vertex.getNumEdges();
66 sendMessageToAllEdges(vertex,
67 new DoubleWritable(vertex.getValue().get() / edges));
68 } else {
69 vertex.voteToHalt();
70 }
71 setSuperstep(getSuperstep());
72 }
73 }
74
75
76
77
78
79
80 private static void setSuperstep(long superstep) {
81 SUPERSTEP = superstep;
82 }
83 }