1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
|
19 | |
package org.apache.giraph.examples; |
20 | |
|
21 | |
import org.apache.giraph.graph.BspUtils; |
22 | |
import org.apache.giraph.graph.EdgeListVertex; |
23 | |
import org.apache.giraph.graph.Vertex; |
24 | |
import org.apache.giraph.graph.VertexReader; |
25 | |
import org.apache.giraph.graph.VertexWriter; |
26 | |
import org.apache.giraph.io.GeneratedVertexInputFormat; |
27 | |
import org.apache.giraph.io.TextVertexOutputFormat; |
28 | |
import org.apache.giraph.io.TextVertexOutputFormat.TextVertexWriter; |
29 | |
import org.apache.hadoop.io.FloatWritable; |
30 | |
import org.apache.hadoop.io.IntWritable; |
31 | |
import org.apache.hadoop.io.LongWritable; |
32 | |
import org.apache.hadoop.io.Text; |
33 | |
import org.apache.hadoop.mapreduce.InputSplit; |
34 | |
import org.apache.hadoop.mapreduce.RecordWriter; |
35 | |
import org.apache.hadoop.mapreduce.TaskAttemptContext; |
36 | |
import org.apache.log4j.Logger; |
37 | |
|
38 | |
import com.google.common.collect.Maps; |
39 | |
|
40 | |
import java.io.IOException; |
41 | |
import java.util.Map; |
42 | |
|
43 | |
|
44 | |
|
45 | |
|
46 | |
|
47 | 16 | public class SimpleSuperstepVertex extends |
48 | |
EdgeListVertex<LongWritable, IntWritable, FloatWritable, IntWritable> { |
49 | |
@Override |
50 | |
public void compute(Iterable<IntWritable> messages) { |
51 | 75 | if (getSuperstep() > 3) { |
52 | 15 | voteToHalt(); |
53 | |
} |
54 | 75 | } |
55 | |
|
56 | |
|
57 | |
|
58 | |
|
59 | |
public static class SimpleSuperstepVertexReader extends |
60 | |
GeneratedVertexReader<LongWritable, IntWritable, |
61 | |
FloatWritable, IntWritable> { |
62 | |
|
63 | 1 | private static final Logger LOG = |
64 | |
Logger.getLogger(SimpleSuperstepVertexReader.class); |
65 | |
|
66 | |
|
67 | |
|
68 | |
public SimpleSuperstepVertexReader() { |
69 | 12 | super(); |
70 | 12 | } |
71 | |
|
72 | |
@Override |
73 | |
public boolean nextVertex() throws IOException, InterruptedException { |
74 | 72 | return totalRecords > recordsRead; |
75 | |
} |
76 | |
|
77 | |
@Override |
78 | |
public Vertex<LongWritable, IntWritable, FloatWritable, |
79 | |
IntWritable> getCurrentVertex() |
80 | |
throws IOException, InterruptedException { |
81 | 60 | Vertex<LongWritable, IntWritable, FloatWritable, IntWritable> vertex = |
82 | |
BspUtils.<LongWritable, IntWritable, FloatWritable, |
83 | |
IntWritable>createVertex(configuration); |
84 | 60 | long tmpId = reverseIdOrder ? |
85 | |
((inputSplit.getSplitIndex() + 1) * totalRecords) - |
86 | |
recordsRead - 1 : |
87 | |
(inputSplit.getSplitIndex() * totalRecords) + recordsRead; |
88 | 60 | LongWritable vertexId = new LongWritable(tmpId); |
89 | 60 | IntWritable vertexValue = |
90 | |
new IntWritable((int) (vertexId.get() * 10)); |
91 | 60 | Map<LongWritable, FloatWritable> edgeMap = Maps.newHashMap(); |
92 | 60 | long targetVertexId = |
93 | |
(vertexId.get() + 1) % |
94 | |
(inputSplit.getNumSplits() * totalRecords); |
95 | 60 | float edgeValue = vertexId.get() * 100f; |
96 | 60 | edgeMap.put(new LongWritable(targetVertexId), |
97 | |
new FloatWritable(edgeValue)); |
98 | 60 | vertex.initialize(vertexId, vertexValue, edgeMap, null); |
99 | 60 | ++recordsRead; |
100 | 60 | if (LOG.isInfoEnabled()) { |
101 | 60 | LOG.info("next: Return vertexId=" + vertex.getId().get() + |
102 | |
", vertexValue=" + vertex.getValue() + |
103 | |
", targetVertexId=" + targetVertexId + |
104 | |
", edgeValue=" + edgeValue); |
105 | |
} |
106 | 60 | return vertex; |
107 | |
} |
108 | |
} |
109 | |
|
110 | |
|
111 | |
|
112 | |
|
113 | 25 | public static class SimpleSuperstepVertexInputFormat extends |
114 | |
GeneratedVertexInputFormat<LongWritable, |
115 | |
IntWritable, FloatWritable, IntWritable> { |
116 | |
@Override |
117 | |
public VertexReader<LongWritable, IntWritable, FloatWritable, IntWritable> |
118 | |
createVertexReader(InputSplit split, TaskAttemptContext context) |
119 | |
throws IOException { |
120 | 12 | return new SimpleSuperstepVertexReader(); |
121 | |
} |
122 | |
} |
123 | |
|
124 | |
|
125 | |
|
126 | |
|
127 | |
public static class SimpleSuperstepVertexWriter extends |
128 | |
TextVertexWriter<LongWritable, IntWritable, FloatWritable> { |
129 | |
|
130 | |
|
131 | |
|
132 | |
|
133 | |
|
134 | |
public SimpleSuperstepVertexWriter( |
135 | |
RecordWriter<Text, Text> lineRecordWriter) { |
136 | 8 | super(lineRecordWriter); |
137 | 8 | } |
138 | |
|
139 | |
@Override |
140 | |
public void writeVertex(Vertex<LongWritable, IntWritable, |
141 | |
FloatWritable, ?> vertex) throws IOException, InterruptedException { |
142 | 45 | getRecordWriter().write( |
143 | |
new Text(vertex.getId().toString()), |
144 | |
new Text(vertex.getValue().toString())); |
145 | 45 | } |
146 | |
} |
147 | |
|
148 | |
|
149 | |
|
150 | |
|
151 | 16 | public static class SimpleSuperstepVertexOutputFormat extends |
152 | |
TextVertexOutputFormat<LongWritable, IntWritable, FloatWritable> { |
153 | |
@Override |
154 | |
public VertexWriter<LongWritable, IntWritable, FloatWritable> |
155 | |
createVertexWriter(TaskAttemptContext context) |
156 | |
throws IOException, InterruptedException { |
157 | 8 | RecordWriter<Text, Text> recordWriter = |
158 | |
textOutputFormat.getRecordWriter(context); |
159 | 8 | return new SimpleSuperstepVertexWriter(recordWriter); |
160 | |
} |
161 | |
} |
162 | |
} |