1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
|
19 | |
package org.apache.giraph.examples; |
20 | |
|
21 | |
import org.apache.giraph.aggregators.DoubleMaxAggregator; |
22 | |
import org.apache.giraph.aggregators.DoubleMinAggregator; |
23 | |
import org.apache.giraph.aggregators.LongSumAggregator; |
24 | |
import org.apache.giraph.graph.BspUtils; |
25 | |
import org.apache.giraph.graph.DefaultMasterCompute; |
26 | |
import org.apache.giraph.graph.LongDoubleFloatDoubleVertex; |
27 | |
import org.apache.giraph.graph.Vertex; |
28 | |
import org.apache.giraph.graph.VertexReader; |
29 | |
import org.apache.giraph.graph.VertexWriter; |
30 | |
import org.apache.giraph.graph.WorkerContext; |
31 | |
import org.apache.giraph.io.GeneratedVertexInputFormat; |
32 | |
import org.apache.giraph.io.TextVertexOutputFormat; |
33 | |
import org.apache.giraph.io.TextVertexOutputFormat.TextVertexWriter; |
34 | |
import org.apache.hadoop.io.DoubleWritable; |
35 | |
import org.apache.hadoop.io.FloatWritable; |
36 | |
import org.apache.hadoop.io.LongWritable; |
37 | |
import org.apache.hadoop.io.Text; |
38 | |
import org.apache.hadoop.mapreduce.InputSplit; |
39 | |
import org.apache.hadoop.mapreduce.RecordWriter; |
40 | |
import org.apache.hadoop.mapreduce.TaskAttemptContext; |
41 | |
import org.apache.log4j.Logger; |
42 | |
|
43 | |
import com.google.common.collect.Maps; |
44 | |
|
45 | |
import java.io.IOException; |
46 | |
import java.util.Map; |
47 | |
|
48 | |
|
49 | |
|
50 | |
|
51 | |
@Algorithm( |
52 | |
name = "Page rank" |
53 | |
) |
54 | 420 | public class SimplePageRankVertex extends LongDoubleFloatDoubleVertex { |
55 | |
|
56 | |
public static final int MAX_SUPERSTEPS = 30; |
57 | |
|
58 | 1 | private static final Logger LOG = |
59 | |
Logger.getLogger(SimplePageRankVertex.class); |
60 | |
|
61 | 1 | private static String SUM_AGG = "sum"; |
62 | |
|
63 | 1 | private static String MIN_AGG = "min"; |
64 | |
|
65 | 1 | private static String MAX_AGG = "max"; |
66 | |
|
67 | |
@Override |
68 | |
public void compute(Iterable<DoubleWritable> messages) { |
69 | 310 | if (getSuperstep() >= 1) { |
70 | 300 | double sum = 0; |
71 | 300 | for (DoubleWritable message : messages) { |
72 | 300 | sum += message.get(); |
73 | |
} |
74 | 300 | DoubleWritable vertexValue = |
75 | |
new DoubleWritable((0.15f / getTotalNumVertices()) + 0.85f * sum); |
76 | 300 | setValue(vertexValue); |
77 | 300 | aggregate(MAX_AGG, vertexValue); |
78 | 300 | aggregate(MIN_AGG, vertexValue); |
79 | 300 | aggregate(SUM_AGG, new LongWritable(1)); |
80 | 300 | LOG.info(getId() + ": PageRank=" + vertexValue + |
81 | |
" max=" + getAggregatedValue(MAX_AGG) + |
82 | |
" min=" + getAggregatedValue(MIN_AGG)); |
83 | |
} |
84 | |
|
85 | 310 | if (getSuperstep() < MAX_SUPERSTEPS) { |
86 | 300 | long edges = getNumEdges(); |
87 | 300 | sendMessageToAllEdges( |
88 | |
new DoubleWritable(getValue().get() / edges)); |
89 | 300 | } else { |
90 | 10 | voteToHalt(); |
91 | |
} |
92 | 310 | } |
93 | |
|
94 | |
|
95 | |
|
96 | |
|
97 | 2 | public static class SimplePageRankVertexWorkerContext extends |
98 | |
WorkerContext { |
99 | |
|
100 | |
private static double FINAL_MAX; |
101 | |
|
102 | |
private static double FINAL_MIN; |
103 | |
|
104 | |
private static long FINAL_SUM; |
105 | |
|
106 | |
public static double getFinalMax() { |
107 | 2 | return FINAL_MAX; |
108 | |
} |
109 | |
|
110 | |
public static double getFinalMin() { |
111 | 2 | return FINAL_MIN; |
112 | |
} |
113 | |
|
114 | |
public static long getFinalSum() { |
115 | 2 | return FINAL_SUM; |
116 | |
} |
117 | |
|
118 | |
@Override |
119 | |
public void preApplication() |
120 | |
throws InstantiationException, IllegalAccessException { |
121 | 2 | } |
122 | |
|
123 | |
@Override |
124 | |
public void postApplication() { |
125 | 2 | FINAL_SUM = this.<LongWritable>getAggregatedValue(SUM_AGG).get(); |
126 | 2 | FINAL_MAX = this.<DoubleWritable>getAggregatedValue(MAX_AGG).get(); |
127 | 2 | FINAL_MIN = this.<DoubleWritable>getAggregatedValue(MIN_AGG).get(); |
128 | |
|
129 | 2 | LOG.info("aggregatedNumVertices=" + FINAL_SUM); |
130 | 2 | LOG.info("aggregatedMaxPageRank=" + FINAL_MAX); |
131 | 2 | LOG.info("aggregatedMinPageRank=" + FINAL_MIN); |
132 | 2 | } |
133 | |
|
134 | |
@Override |
135 | |
public void preSuperstep() { |
136 | 62 | if (getSuperstep() >= 3) { |
137 | 56 | LOG.info("aggregatedNumVertices=" + |
138 | |
getAggregatedValue(SUM_AGG) + |
139 | |
" NumVertices=" + getTotalNumVertices()); |
140 | 56 | if (this.<LongWritable>getAggregatedValue(SUM_AGG).get() != |
141 | |
getTotalNumVertices()) { |
142 | 0 | throw new RuntimeException("wrong value of SumAggreg: " + |
143 | |
getAggregatedValue(SUM_AGG) + ", should be: " + |
144 | |
getTotalNumVertices()); |
145 | |
} |
146 | 56 | DoubleWritable maxPagerank = getAggregatedValue(MAX_AGG); |
147 | 56 | LOG.info("aggregatedMaxPageRank=" + maxPagerank.get()); |
148 | 56 | DoubleWritable minPagerank = getAggregatedValue(MIN_AGG); |
149 | 56 | LOG.info("aggregatedMinPageRank=" + minPagerank.get()); |
150 | |
} |
151 | 62 | } |
152 | |
|
153 | |
@Override |
154 | 62 | public void postSuperstep() { } |
155 | |
} |
156 | |
|
157 | |
|
158 | |
|
159 | |
|
160 | |
|
161 | 2 | public static class SimplePageRankVertexMasterCompute extends |
162 | |
DefaultMasterCompute { |
163 | |
@Override |
164 | |
public void initialize() throws InstantiationException, |
165 | |
IllegalAccessException { |
166 | 2 | registerAggregator(SUM_AGG, LongSumAggregator.class); |
167 | 2 | registerPersistentAggregator(MIN_AGG, DoubleMinAggregator.class); |
168 | 2 | registerPersistentAggregator(MAX_AGG, DoubleMaxAggregator.class); |
169 | 2 | } |
170 | |
} |
171 | |
|
172 | |
|
173 | |
|
174 | |
|
175 | |
public static class SimplePageRankVertexReader extends |
176 | |
GeneratedVertexReader<LongWritable, DoubleWritable, FloatWritable, |
177 | |
DoubleWritable> { |
178 | |
|
179 | 1 | private static final Logger LOG = |
180 | |
Logger.getLogger(SimplePageRankVertexReader.class); |
181 | |
|
182 | |
|
183 | |
|
184 | |
|
185 | |
public SimplePageRankVertexReader() { |
186 | 6 | super(); |
187 | 6 | } |
188 | |
|
189 | |
@Override |
190 | |
public boolean nextVertex() { |
191 | 36 | return totalRecords > recordsRead; |
192 | |
} |
193 | |
|
194 | |
@Override |
195 | |
public Vertex<LongWritable, DoubleWritable, |
196 | |
FloatWritable, DoubleWritable> |
197 | |
getCurrentVertex() throws IOException { |
198 | |
Vertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable> |
199 | 30 | vertex = BspUtils.createVertex(configuration); |
200 | |
|
201 | 30 | LongWritable vertexId = new LongWritable( |
202 | |
(inputSplit.getSplitIndex() * totalRecords) + recordsRead); |
203 | 30 | DoubleWritable vertexValue = new DoubleWritable(vertexId.get() * 10d); |
204 | 30 | long targetVertexId = |
205 | |
(vertexId.get() + 1) % |
206 | |
(inputSplit.getNumSplits() * totalRecords); |
207 | 30 | float edgeValue = vertexId.get() * 100f; |
208 | 30 | Map<LongWritable, FloatWritable> edges = Maps.newHashMap(); |
209 | 30 | edges.put(new LongWritable(targetVertexId), new FloatWritable(edgeValue)); |
210 | 30 | vertex.initialize(vertexId, vertexValue, edges, null); |
211 | 30 | ++recordsRead; |
212 | 30 | if (LOG.isInfoEnabled()) { |
213 | 30 | LOG.info("next: Return vertexId=" + vertex.getId().get() + |
214 | |
", vertexValue=" + vertex.getValue() + |
215 | |
", targetVertexId=" + targetVertexId + ", edgeValue=" + edgeValue); |
216 | |
} |
217 | 30 | return vertex; |
218 | |
} |
219 | |
} |
220 | |
|
221 | |
|
222 | |
|
223 | |
|
224 | 12 | public static class SimplePageRankVertexInputFormat extends |
225 | |
GeneratedVertexInputFormat<LongWritable, |
226 | |
DoubleWritable, FloatWritable, DoubleWritable> { |
227 | |
@Override |
228 | |
public VertexReader<LongWritable, DoubleWritable, |
229 | |
FloatWritable, DoubleWritable> createVertexReader(InputSplit split, |
230 | |
TaskAttemptContext context) |
231 | |
throws IOException { |
232 | 6 | return new SimplePageRankVertexReader(); |
233 | |
} |
234 | |
} |
235 | |
|
236 | |
|
237 | |
|
238 | |
|
239 | |
public static class SimplePageRankVertexWriter extends |
240 | |
TextVertexWriter<LongWritable, DoubleWritable, FloatWritable> { |
241 | |
|
242 | |
|
243 | |
|
244 | |
|
245 | |
|
246 | |
public SimplePageRankVertexWriter( |
247 | |
RecordWriter<Text, Text> lineRecordWriter) { |
248 | 2 | super(lineRecordWriter); |
249 | 2 | } |
250 | |
|
251 | |
@Override |
252 | |
public void writeVertex( |
253 | |
Vertex<LongWritable, DoubleWritable, FloatWritable, ?> vertex) |
254 | |
throws IOException, InterruptedException { |
255 | 15 | getRecordWriter().write( |
256 | |
new Text(vertex.getId().toString()), |
257 | |
new Text(vertex.getValue().toString())); |
258 | 15 | } |
259 | |
} |
260 | |
|
261 | |
|
262 | |
|
263 | |
|
264 | 6 | public static class SimplePageRankVertexOutputFormat extends |
265 | |
TextVertexOutputFormat<LongWritable, DoubleWritable, FloatWritable> { |
266 | |
@Override |
267 | |
public VertexWriter<LongWritable, DoubleWritable, FloatWritable> |
268 | |
createVertexWriter(TaskAttemptContext context) |
269 | |
throws IOException, InterruptedException { |
270 | 2 | RecordWriter<Text, Text> recordWriter = |
271 | |
textOutputFormat.getRecordWriter(context); |
272 | 2 | return new SimplePageRankVertexWriter(recordWriter); |
273 | |
} |
274 | |
} |
275 | |
} |