1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.io.formats;
19
20 import org.apache.giraph.edge.Edge;
21 import org.apache.giraph.edge.EdgeFactory;
22 import org.apache.giraph.graph.Vertex;
23 import org.apache.hadoop.io.DoubleWritable;
24 import org.apache.hadoop.io.FloatWritable;
25 import org.apache.hadoop.io.LongWritable;
26 import org.apache.hadoop.io.Text;
27 import org.apache.hadoop.mapreduce.InputSplit;
28 import org.apache.hadoop.mapreduce.TaskAttemptContext;
29 import org.json.JSONArray;
30 import org.json.JSONException;
31
32 import com.google.common.collect.Lists;
33
34 import java.io.IOException;
35 import java.util.List;
36
37
38
39
40
41
42
43 public class JsonLongDoubleFloatDoubleVertexInputFormat extends
44 TextVertexInputFormat<LongWritable, DoubleWritable, FloatWritable> {
45
46 @Override
47 public TextVertexReader createVertexReader(InputSplit split,
48 TaskAttemptContext context) {
49 return new JsonLongDoubleFloatDoubleVertexReader();
50 }
51
52
53
54
55
56
57
58
59
60
61
62
63 class JsonLongDoubleFloatDoubleVertexReader extends
64 TextVertexReaderFromEachLineProcessedHandlingExceptions<JSONArray,
65 JSONException> {
66
67 @Override
68 protected JSONArray preprocessLine(Text line) throws JSONException {
69 return new JSONArray(line.toString());
70 }
71
72 @Override
73 protected LongWritable getId(JSONArray jsonVertex) throws JSONException,
74 IOException {
75 return new LongWritable(jsonVertex.getLong(0));
76 }
77
78 @Override
79 protected DoubleWritable getValue(JSONArray jsonVertex) throws
80 JSONException, IOException {
81 return new DoubleWritable(jsonVertex.getDouble(1));
82 }
83
84 @Override
85 protected Iterable<Edge<LongWritable, FloatWritable>> getEdges(
86 JSONArray jsonVertex) throws JSONException, IOException {
87 JSONArray jsonEdgeArray = jsonVertex.getJSONArray(2);
88 List<Edge<LongWritable, FloatWritable>> edges =
89 Lists.newArrayListWithCapacity(jsonEdgeArray.length());
90 for (int i = 0; i < jsonEdgeArray.length(); ++i) {
91 JSONArray jsonEdge = jsonEdgeArray.getJSONArray(i);
92 edges.add(EdgeFactory.create(new LongWritable(jsonEdge.getLong(0)),
93 new FloatWritable((float) jsonEdge.getDouble(1))));
94 }
95 return edges;
96 }
97
98 @Override
99 protected Vertex<LongWritable, DoubleWritable, FloatWritable>
100 handleException(Text line, JSONArray jsonVertex, JSONException e) {
101 throw new IllegalArgumentException(
102 "Couldn't get vertex from line " + line, e);
103 }
104
105 }
106 }