1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.jython;
19
20 import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory;
21 import org.apache.giraph.comm.messages.MessageStoreFactory;
22 import org.apache.giraph.conf.GiraphConfiguration;
23 import org.apache.giraph.conf.GiraphConstants;
24 import org.apache.giraph.conf.GiraphTypes;
25 import org.apache.giraph.edge.ByteArrayEdges;
26 import org.apache.giraph.graph.Language;
27 import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
28 import org.apache.giraph.io.formats.IntNullTextEdgeInputFormat;
29 import org.apache.giraph.scripting.DeployType;
30 import org.apache.giraph.scripting.ScriptLoader;
31 import org.apache.giraph.utils.InternalVertexRunner;
32 import org.apache.hadoop.io.IntWritable;
33 import org.apache.hadoop.io.NullWritable;
34 import org.junit.Test;
35
36 import com.google.common.collect.Maps;
37
38 import java.util.Map;
39
40 import static org.junit.Assert.assertEquals;
41
42 public class TestJythonComputation {
43
44 @Test
45 public void testCountEdgesInMemoryMessageStoreFactory() throws Exception {
46 testCountEdges(InMemoryMessageStoreFactory.class);
47 }
48
49 public void testCountEdges(Class<? extends MessageStoreFactory>
50 messageStoreFactoryClass) throws Exception {
51 String[] edges = new String[] {
52 "1 2",
53 "2 3",
54 "2 4",
55 "4 1"
56 };
57
58 GiraphConfiguration conf = new GiraphConfiguration();
59 GiraphTypes types = new GiraphTypes(IntWritable.class, IntWritable.class,
60 NullWritable.class, NullWritable.class, NullWritable.class);
61 types.writeIfUnset(conf);
62 ScriptLoader.setScriptsToLoad(conf,
63 "org/apache/giraph/jython/count-edges.py",
64 DeployType.RESOURCE, Language.JYTHON);
65 JythonUtils.init(conf, "CountEdges");
66 conf.setOutEdgesClass(ByteArrayEdges.class);
67 conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
68 conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
69 GiraphConstants.MESSAGE_STORE_FACTORY_CLASS.set(conf,
70 messageStoreFactoryClass);
71 Iterable<String> results = InternalVertexRunner.run(conf, null, edges);
72
73 Map<Integer, Integer> values = parseResults(results);
74
75
76 assertEquals(3, values.size());
77
78 assertEquals(1, (int) values.get(1));
79 assertEquals(2, (int) values.get(2));
80 assertEquals(1, (int) values.get(4));
81 }
82
83 private static Map<Integer, Integer> parseResults(Iterable<String> results) {
84 Map<Integer, Integer> values = Maps.newHashMap();
85 for (String line : results) {
86 String[] tokens = line.split("\\s+");
87 int id = Integer.valueOf(tokens[0]);
88 int value = Integer.valueOf(tokens[1]);
89 values.put(id, value);
90 }
91 return values;
92 }
93 }