1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
|
19 | |
package org.apache.giraph.examples; |
20 | |
|
21 | |
import org.apache.giraph.graph.Edge; |
22 | |
import org.apache.giraph.graph.EdgeListVertex; |
23 | |
import org.apache.giraph.graph.Vertex; |
24 | |
import org.apache.giraph.graph.WorkerContext; |
25 | |
import org.apache.hadoop.io.DoubleWritable; |
26 | |
import org.apache.hadoop.io.FloatWritable; |
27 | |
import org.apache.hadoop.io.LongWritable; |
28 | |
import org.apache.log4j.Logger; |
29 | |
|
30 | |
import java.io.IOException; |
31 | |
|
32 | |
|
33 | |
|
34 | |
|
35 | 40 | public class SimpleMutateGraphVertex extends EdgeListVertex< |
36 | |
LongWritable, DoubleWritable, FloatWritable, DoubleWritable> { |
37 | |
|
38 | 1 | private static Logger LOG = |
39 | |
Logger.getLogger(SimpleMutateGraphVertex.class); |
40 | |
|
41 | 20 | private long maxRanges = 100; |
42 | |
|
43 | |
|
44 | |
|
45 | |
|
46 | |
|
47 | |
|
48 | |
|
49 | |
|
50 | |
|
51 | |
private long rangeVertexIdStart(int range) { |
52 | 55 | return (Long.MAX_VALUE / maxRanges) * range; |
53 | |
} |
54 | |
|
55 | |
@Override |
56 | |
public void compute(Iterable<DoubleWritable> messages) |
57 | |
throws IOException { |
58 | 120 | SimpleMutateGraphVertexWorkerContext workerContext = |
59 | |
(SimpleMutateGraphVertexWorkerContext) getWorkerContext(); |
60 | 120 | if (getSuperstep() == 0) { |
61 | 5 | LOG.debug("Reached superstep " + getSuperstep()); |
62 | 115 | } else if (getSuperstep() == 1) { |
63 | |
|
64 | |
|
65 | 5 | LongWritable destVertexId = |
66 | |
new LongWritable(rangeVertexIdStart(1) + getId().get()); |
67 | 5 | sendMessage(destVertexId, new DoubleWritable(0.0)); |
68 | 5 | } else if (getSuperstep() == 2) { |
69 | 10 | LOG.debug("Reached superstep " + getSuperstep()); |
70 | 100 | } else if (getSuperstep() == 3) { |
71 | 10 | long vertexCount = workerContext.getVertexCount(); |
72 | 10 | if (vertexCount * 2 != getTotalNumVertices()) { |
73 | 0 | throw new IllegalStateException( |
74 | |
"Impossible to have " + getTotalNumVertices() + |
75 | |
" vertices when should have " + vertexCount * 2 + |
76 | |
" on superstep " + getSuperstep()); |
77 | |
} |
78 | 10 | long edgeCount = workerContext.getEdgeCount(); |
79 | 10 | if (edgeCount != getTotalNumEdges()) { |
80 | 0 | throw new IllegalStateException( |
81 | |
"Impossible to have " + getTotalNumEdges() + |
82 | |
" edges when should have " + edgeCount + |
83 | |
" on superstep " + getSuperstep()); |
84 | |
} |
85 | |
|
86 | 10 | LongWritable vertexIndex = |
87 | |
new LongWritable(rangeVertexIdStart(3) + getId().get()); |
88 | |
Vertex<LongWritable, DoubleWritable, |
89 | 10 | FloatWritable, DoubleWritable> vertex = |
90 | |
instantiateVertex(vertexIndex, new DoubleWritable(0.0), null, null); |
91 | 10 | addVertexRequest(vertex); |
92 | |
|
93 | 10 | addEdgeRequest(vertexIndex, |
94 | |
new Edge<LongWritable, FloatWritable>( |
95 | |
getId(), new FloatWritable(0.0f))); |
96 | 10 | } else if (getSuperstep() == 4) { |
97 | 20 | LOG.debug("Reached superstep " + getSuperstep()); |
98 | 70 | } else if (getSuperstep() == 5) { |
99 | 20 | long vertexCount = workerContext.getVertexCount(); |
100 | 20 | if (vertexCount * 2 != getTotalNumVertices()) { |
101 | 0 | throw new IllegalStateException( |
102 | |
"Impossible to have " + getTotalNumVertices() + |
103 | |
" when should have " + vertexCount * 2 + |
104 | |
" on superstep " + getSuperstep()); |
105 | |
} |
106 | 20 | long edgeCount = workerContext.getEdgeCount(); |
107 | 20 | if (edgeCount + vertexCount != getTotalNumEdges()) { |
108 | 0 | throw new IllegalStateException( |
109 | |
"Impossible to have " + getTotalNumEdges() + |
110 | |
" edges when should have " + edgeCount + vertexCount + |
111 | |
" on superstep " + getSuperstep()); |
112 | |
} |
113 | |
|
114 | 20 | LongWritable vertexIndex = |
115 | |
new LongWritable(rangeVertexIdStart(3) + getId().get()); |
116 | 20 | workerContext.increaseEdgesRemoved(); |
117 | 20 | removeEdgeRequest(vertexIndex, getId()); |
118 | 20 | } else if (getSuperstep() == 6) { |
119 | |
|
120 | 20 | if (getId().compareTo( |
121 | |
new LongWritable(rangeVertexIdStart(3))) >= 0) { |
122 | 10 | removeVertexRequest(getId()); |
123 | |
} |
124 | 30 | } else if (getSuperstep() == 7) { |
125 | 10 | long origEdgeCount = workerContext.getOrigEdgeCount(); |
126 | 10 | if (origEdgeCount != getTotalNumEdges()) { |
127 | 0 | throw new IllegalStateException( |
128 | |
"Impossible to have " + getTotalNumEdges() + |
129 | |
" edges when should have " + origEdgeCount + |
130 | |
" on superstep " + getSuperstep()); |
131 | |
} |
132 | 10 | } else if (getSuperstep() == 8) { |
133 | 10 | long vertexCount = workerContext.getVertexCount(); |
134 | 10 | if (vertexCount / 2 != getTotalNumVertices()) { |
135 | 0 | throw new IllegalStateException( |
136 | |
"Impossible to have " + getTotalNumVertices() + |
137 | |
" vertices when should have " + vertexCount / 2 + |
138 | |
" on superstep " + getSuperstep()); |
139 | |
} |
140 | 10 | } else { |
141 | 10 | voteToHalt(); |
142 | |
} |
143 | 120 | } |
144 | |
|
145 | |
|
146 | |
|
147 | |
|
148 | 1 | public static class SimpleMutateGraphVertexWorkerContext |
149 | |
extends WorkerContext { |
150 | |
|
151 | |
private long vertexCount; |
152 | |
|
153 | |
private long edgeCount; |
154 | |
|
155 | |
private long origEdgeCount; |
156 | |
|
157 | 1 | private int edgesRemoved = 0; |
158 | |
|
159 | |
@Override |
160 | |
public void preApplication() |
161 | 1 | throws InstantiationException, IllegalAccessException { } |
162 | |
|
163 | |
@Override |
164 | 1 | public void postApplication() { } |
165 | |
|
166 | |
@Override |
167 | 10 | public void preSuperstep() { } |
168 | |
|
169 | |
@Override |
170 | |
public void postSuperstep() { |
171 | 10 | vertexCount = getTotalNumVertices(); |
172 | 10 | edgeCount = getTotalNumEdges(); |
173 | 10 | if (getSuperstep() == 1) { |
174 | 1 | origEdgeCount = edgeCount; |
175 | |
} |
176 | 10 | LOG.info("Got " + vertexCount + " vertices, " + |
177 | |
edgeCount + " edges on superstep " + |
178 | |
getSuperstep()); |
179 | 10 | LOG.info("Removed " + edgesRemoved); |
180 | 10 | edgesRemoved = 0; |
181 | 10 | } |
182 | |
|
183 | |
public long getVertexCount() { |
184 | 40 | return vertexCount; |
185 | |
} |
186 | |
|
187 | |
public long getEdgeCount() { |
188 | 30 | return edgeCount; |
189 | |
} |
190 | |
|
191 | |
public long getOrigEdgeCount() { |
192 | 10 | return origEdgeCount; |
193 | |
} |
194 | |
|
195 | |
|
196 | |
|
197 | |
|
198 | |
public void increaseEdgesRemoved() { |
199 | 20 | this.edgesRemoved++; |
200 | 20 | } |
201 | |
} |
202 | |
} |