1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph;
20
21 import org.apache.giraph.conf.GiraphConfiguration;
22 import org.apache.giraph.conf.GiraphConstants;
23 import org.apache.giraph.examples.GeneratedVertexReader;
24 import org.apache.giraph.examples.SimpleMutateGraphComputation;
25 import org.apache.giraph.examples.SimplePageRankComputation.SimplePageRankVertexInputFormat;
26 import org.apache.giraph.examples.SimplePageRankComputation.SimplePageRankVertexOutputFormat;
27 import org.apache.giraph.graph.DefaultVertexResolver;
28 import org.apache.giraph.graph.Vertex;
29 import org.apache.giraph.graph.VertexChanges;
30 import org.apache.giraph.job.GiraphJob;
31 import org.apache.hadoop.io.Writable;
32 import org.apache.hadoop.io.WritableComparable;
33 import org.junit.Test;
34
35 import java.io.IOException;
36
37 import static org.junit.Assert.assertTrue;
38
39
40
41
42 public class TestMutateGraph extends BspCase {
43 public TestMutateGraph() {
44 super(TestMutateGraph.class.getName());
45 }
46
47
48
49 public static class TestVertexResolver<I extends WritableComparable, V
50 extends Writable, E extends Writable>
51 extends DefaultVertexResolver {
52 @Override
53 public Vertex resolve(WritableComparable vertexId, Vertex vertex,
54 VertexChanges vertexChanges, boolean hasMessages) {
55 Vertex originalVertex = vertex;
56
57 removeEdges(vertex, vertexChanges);
58
59
60 vertex = removeVertexIfDesired(vertex, vertexChanges);
61
62
63 if (originalVertex != null && vertex == null) {
64 hasMessages = false;
65 }
66
67
68
69 vertex = addVertexIfDesired(vertexId, vertex, vertexChanges, hasMessages);
70
71
72 addEdges(vertex, vertexChanges);
73
74 return vertex;
75 }
76 }
77
78
79
80
81
82
83
84
85 @Test
86 public void testMutateGraph()
87 throws IOException, InterruptedException, ClassNotFoundException {
88 GiraphConfiguration conf = new GiraphConfiguration();
89 conf.setComputationClass(SimpleMutateGraphComputation.class);
90 conf.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
91 conf.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
92 conf.setWorkerContextClass(
93 SimpleMutateGraphComputation.SimpleMutateGraphVertexWorkerContext.class);
94 GiraphConstants.USER_PARTITION_COUNT.set(conf, 32);
95 conf.setNumComputeThreads(8);
96 GiraphConstants.VERTEX_RESOLVER_CLASS.set(conf, TestVertexResolver.class);
97 GiraphJob job = prepareJob(getCallingMethodName(), conf,
98 getTempPath(getCallingMethodName()));
99
100 GeneratedVertexReader.READER_VERTICES.set(conf, 400);
101 assertTrue(job.run(true));
102 }
103 }