1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertFalse;
23 import static org.junit.Assert.assertTrue;
24
25 import java.io.IOException;
26 import java.util.Map;
27
28 import org.apache.giraph.BspCase;
29 import org.apache.giraph.conf.GiraphConfiguration;
30 import org.apache.giraph.edge.ByteArrayEdges;
31 import org.apache.giraph.edge.Edge;
32 import org.apache.giraph.factories.VertexValueFactory;
33 import org.apache.giraph.graph.Vertex;
34 import org.apache.giraph.graph.VertexValueCombiner;
35 import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
36 import org.apache.giraph.io.formats.IntIntNullTextVertexInputFormat;
37 import org.apache.giraph.io.formats.IntIntTextVertexValueInputFormat;
38 import org.apache.giraph.io.formats.IntNullReverseTextEdgeInputFormat;
39 import org.apache.giraph.io.formats.IntNullTextEdgeInputFormat;
40 import org.apache.giraph.utils.ComputationCountEdges;
41 import org.apache.giraph.utils.IntIntNullNoOpComputation;
42 import org.apache.giraph.utils.InternalVertexRunner;
43 import org.apache.hadoop.io.IntWritable;
44 import org.apache.hadoop.io.NullWritable;
45 import org.junit.Test;
46
47 import com.google.common.collect.Maps;
48
49
50
51
52
53 public class TestVertexEdgeInput extends BspCase {
54 public TestVertexEdgeInput() {
55 super(TestVertexEdgeInput.class.getName());
56 }
57
58
59
60 @Test
61 public void testEdgesOnly() throws Exception {
62 String[] edges = new String[] {
63 "1 2",
64 "2 3",
65 "2 4",
66 "4 1"
67 };
68
69 GiraphConfiguration conf = new GiraphConfiguration();
70 conf.setComputationClass(ComputationCountEdges.class);
71 conf.setOutEdgesClass(ByteArrayEdges.class);
72 conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
73 conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
74 Iterable<String> results = InternalVertexRunner.run(conf, null, edges);
75
76 Map<Integer, Integer> values = parseResults(results);
77
78
79 assertEquals(3, values.size());
80
81 assertEquals(1, (int) values.get(1));
82 assertEquals(2, (int) values.get(2));
83 assertEquals(1, (int) values.get(4));
84 }
85
86
87
88
89 @Test
90 public void testEdgesOnlyWithReverse() throws Exception {
91 String[] edges = new String[] {
92 "1 2",
93 "2 3",
94 "2 4",
95 "4 1"
96 };
97
98 GiraphConfiguration conf = new GiraphConfiguration();
99 conf.setComputationClass(ComputationCountEdges.class);
100 conf.setOutEdgesClass(ByteArrayEdges.class);
101 conf.setEdgeInputFormatClass(IntNullReverseTextEdgeInputFormat.class);
102 conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
103 Iterable<String> results = InternalVertexRunner.run(conf, null, edges);
104
105 Map<Integer, Integer> values = parseResults(results);
106
107
108 assertEquals(4, values.size());
109
110 assertEquals(2, (int) values.get(1));
111 assertEquals(3, (int) values.get(2));
112 assertEquals(1, (int) values.get(3));
113 assertEquals(2, (int) values.get(4));
114 }
115
116
117
118
119 public static class IntSumVertexValueCombiner implements VertexValueCombiner<IntWritable> {
120 @Override
121 public void combine(IntWritable originalVertexValue, IntWritable vertexValue) {
122 originalVertexValue.set(originalVertexValue.get() + vertexValue.get());
123 }
124 }
125
126
127
128 @Test
129 public void testVertexValueCombiner() throws Exception {
130 String[] vertices = new String[] {
131 "1 75 2",
132 "2 34 3",
133 "3 13",
134 "4 32",
135 "1 11",
136 "2 23 1",
137 "2 3"
138 };
139
140 GiraphConfiguration conf = new GiraphConfiguration();
141 conf.setComputationClass(IntIntNullNoOpComputation.class);
142 conf.setOutEdgesClass(ByteArrayEdges.class);
143 conf.setVertexInputFormatClass(IntIntNullTextVertexInputFormat.class);
144 conf.setVertexValueCombinerClass(IntSumVertexValueCombiner.class);
145 conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
146
147
148 Iterable<String> results = InternalVertexRunner.run(conf, vertices);
149
150 Map<Integer, Integer> values = parseResults(results);
151
152
153 assertEquals(4, values.size());
154
155 assertEquals(86, (int) values.get(1));
156 assertEquals(60, (int) values.get(2));
157 assertEquals(13, (int) values.get(3));
158 assertEquals(32, (int) values.get(4));
159
160
161 conf.setComputationClass(ComputationCountEdges.class);
162 results = InternalVertexRunner.run(conf, vertices);
163
164
165 values = parseResults(results);
166 assertEquals(1, (int) values.get(1));
167 assertEquals(2, (int) values.get(2));
168 assertEquals(0, (int) values.get(3));
169 assertEquals(0, (int) values.get(4));
170 }
171
172
173
174 @Test
175 public void testMixedVertexValueEdgeFormat() throws Exception {
176 String[] vertices = new String[] {
177 "1 75",
178 "2 34",
179 "3 13",
180 "4 32"
181 };
182 String[] edges = new String[] {
183 "1 2",
184 "2 3",
185 "2 4",
186 "4 1",
187 "5 3"
188 };
189
190 GiraphConfiguration conf = new GiraphConfiguration();
191 conf.setComputationClass(IntIntNullNoOpComputation.class);
192 conf.setOutEdgesClass(ByteArrayEdges.class);
193 conf.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class);
194 conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
195 conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
196
197
198 Iterable<String> results = InternalVertexRunner.run(conf, vertices, edges);
199
200 Map<Integer, Integer> values = parseResults(results);
201
202
203
204 assertEquals(5, values.size());
205
206 assertEquals(75, (int) values.get(1));
207 assertEquals(34, (int) values.get(2));
208 assertEquals(13, (int) values.get(3));
209 assertEquals(32, (int) values.get(4));
210
211 assertEquals(0, (int) values.get(5));
212
213
214 conf.setVertexValueFactoryClass(TestVertexValueFactory.class);
215 results = InternalVertexRunner.run(conf, vertices, edges);
216 values = parseResults(results);
217
218
219 assertEquals(3, (int) values.get(5));
220
221 conf = new GiraphConfiguration();
222 conf.setComputationClass(ComputationCountEdges.class);
223 conf.setOutEdgesClass(ByteArrayEdges.class);
224 conf.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class);
225 conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
226 conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
227
228
229 results = InternalVertexRunner.run(conf, vertices, edges);
230
231 values = parseResults(results);
232
233
234 assertEquals(1, (int) values.get(1));
235 assertEquals(2, (int) values.get(2));
236 assertEquals(0, (int) values.get(3));
237 assertEquals(1, (int) values.get(4));
238 assertEquals(1, (int) values.get(5));
239 }
240
241
242
243 @Test
244 public void testMixedVertexEdgeFormat() throws Exception {
245 String[] vertices = new String[] {
246 "1 75 2 3",
247 "2 34 1 5",
248 "3 13",
249 "4 32"
250 };
251 String[] edges = new String[] {
252 "1 2",
253 "2 3",
254 "2 4",
255 "4 1",
256 "5 3"
257 };
258
259 GiraphConfiguration conf = new GiraphConfiguration();
260 conf.setComputationClass(IntIntNullNoOpComputation.class);
261 conf.setOutEdgesClass(ByteArrayEdges.class);
262 conf.setVertexInputFormatClass(IntIntNullTextVertexInputFormat.class);
263 conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
264 conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
265
266
267 Iterable<String> results = InternalVertexRunner.run(conf, vertices, edges);
268
269 Map<Integer, Integer> values = parseResults(results);
270
271
272
273 assertEquals(5, values.size());
274
275 assertEquals(75, (int) values.get(1));
276 assertEquals(34, (int) values.get(2));
277 assertEquals(13, (int) values.get(3));
278 assertEquals(32, (int) values.get(4));
279
280 assertEquals(0, (int) values.get(5));
281
282
283 conf.setVertexValueFactoryClass(TestVertexValueFactory.class);
284 results = InternalVertexRunner.run(conf, vertices, edges);
285 values = parseResults(results);
286
287
288 assertEquals(3, (int) values.get(5));
289
290 conf = new GiraphConfiguration();
291 conf.setComputationClass(ComputationCountEdges.class);
292 conf.setOutEdgesClass(ByteArrayEdges.class);
293 conf.setVertexInputFormatClass(IntIntNullTextVertexInputFormat.class);
294 conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
295 conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
296
297
298 results = InternalVertexRunner.run(conf, vertices, edges);
299
300 values = parseResults(results);
301
302
303 assertEquals(3, (int) values.get(1));
304 assertEquals(4, (int) values.get(2));
305 assertEquals(0, (int) values.get(3));
306 assertEquals(1, (int) values.get(4));
307 assertEquals(1, (int) values.get(5));
308 }
309
310
311 @Test
312 public void testDifferentInputEdgesClass() throws Exception {
313 String[] edges = new String[] {
314 "1 2",
315 "2 3",
316 "2 4",
317 "4 1"
318 };
319
320 GiraphConfiguration conf = new GiraphConfiguration();
321 conf.setComputationClass(TestComputationCheckEdgesType.class);
322 conf.setOutEdgesClass(ByteArrayEdges.class);
323 conf.setInputOutEdgesClass(TestOutEdgesFilterEven.class);
324 conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
325 conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
326 Iterable<String> results = InternalVertexRunner.run(conf, null, edges);
327
328 Map<Integer, Integer> values = parseResults(results);
329
330
331
332 assertEquals(3, values.size());
333
334
335 assertEquals(1, (int) values.get(1));
336 assertEquals(1, (int) values.get(2));
337 assertEquals(0, (int) values.get(4));
338 }
339
340 public static class TestComputationCheckEdgesType extends
341 ComputationCountEdges {
342 @Override
343 public void compute(Vertex<IntWritable, IntWritable, NullWritable> vertex,
344 Iterable<NullWritable> messages) throws IOException {
345 assertFalse(vertex.getEdges() instanceof TestOutEdgesFilterEven);
346 assertTrue(vertex.getEdges() instanceof ByteArrayEdges);
347 super.compute(vertex, messages);
348 }
349 }
350
351 public static class TestVertexValueFactory
352 implements VertexValueFactory<IntWritable> {
353 @Override
354 public IntWritable newInstance() {
355 return new IntWritable(3);
356 }
357 }
358
359 public static class TestOutEdgesFilterEven
360 extends ByteArrayEdges<IntWritable, NullWritable> {
361 @Override
362 public void add(Edge<IntWritable, NullWritable> edge) {
363 if (edge.getTargetVertexId().get() % 2 == 0) {
364 super.add(edge);
365 }
366 }
367 }
368
369 private static Map<Integer, Integer> parseResults(Iterable<String> results) {
370 Map<Integer, Integer> values = Maps.newHashMap();
371 for (String line : results) {
372 String[] tokens = line.split("\\s+");
373 int id = Integer.valueOf(tokens[0]);
374 int value = Integer.valueOf(tokens[1]);
375 values.put(id, value);
376 }
377 return values;
378 }
379 }