1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
|
19 | |
package org.apache.giraph.examples; |
20 | |
|
21 | |
import org.apache.giraph.aggregators.DoubleMaxAggregator; |
22 | |
import org.apache.giraph.aggregators.DoubleMinAggregator; |
23 | |
import org.apache.giraph.aggregators.LongSumAggregator; |
24 | |
import org.apache.giraph.edge.Edge; |
25 | |
import org.apache.giraph.edge.EdgeFactory; |
26 | |
import org.apache.giraph.graph.BasicComputation; |
27 | |
import org.apache.giraph.graph.Vertex; |
28 | |
import org.apache.giraph.io.VertexReader; |
29 | |
import org.apache.giraph.io.formats.GeneratedVertexInputFormat; |
30 | |
import org.apache.giraph.io.formats.TextVertexOutputFormat; |
31 | |
import org.apache.giraph.master.DefaultMasterCompute; |
32 | |
import org.apache.giraph.worker.WorkerContext; |
33 | |
import org.apache.hadoop.io.DoubleWritable; |
34 | |
import org.apache.hadoop.io.FloatWritable; |
35 | |
import org.apache.hadoop.io.LongWritable; |
36 | |
import org.apache.hadoop.io.Text; |
37 | |
import org.apache.hadoop.mapreduce.InputSplit; |
38 | |
import org.apache.hadoop.mapreduce.TaskAttemptContext; |
39 | |
import org.apache.log4j.Logger; |
40 | |
|
41 | |
import com.google.common.collect.Lists; |
42 | |
|
43 | |
import java.io.IOException; |
44 | |
import java.util.List; |
45 | |
|
46 | |
|
47 | |
|
48 | |
|
49 | |
@Algorithm( |
50 | |
name = "Page rank" |
51 | |
) |
52 | 0 | public class SimplePageRankComputation extends BasicComputation<LongWritable, |
53 | |
DoubleWritable, FloatWritable, DoubleWritable> { |
54 | |
|
55 | |
public static final int MAX_SUPERSTEPS = 30; |
56 | |
|
57 | 0 | private static final Logger LOG = |
58 | 0 | Logger.getLogger(SimplePageRankComputation.class); |
59 | |
|
60 | 0 | private static String SUM_AGG = "sum"; |
61 | |
|
62 | 0 | private static String MIN_AGG = "min"; |
63 | |
|
64 | 0 | private static String MAX_AGG = "max"; |
65 | |
|
66 | |
@Override |
67 | |
public void compute( |
68 | |
Vertex<LongWritable, DoubleWritable, FloatWritable> vertex, |
69 | |
Iterable<DoubleWritable> messages) throws IOException { |
70 | 0 | if (getSuperstep() >= 1) { |
71 | 0 | double sum = 0; |
72 | 0 | for (DoubleWritable message : messages) { |
73 | 0 | sum += message.get(); |
74 | 0 | } |
75 | 0 | DoubleWritable vertexValue = |
76 | 0 | new DoubleWritable((0.15f / getTotalNumVertices()) + 0.85f * sum); |
77 | 0 | vertex.setValue(vertexValue); |
78 | 0 | aggregate(MAX_AGG, vertexValue); |
79 | 0 | aggregate(MIN_AGG, vertexValue); |
80 | 0 | aggregate(SUM_AGG, new LongWritable(1)); |
81 | 0 | LOG.info(vertex.getId() + ": PageRank=" + vertexValue + |
82 | 0 | " max=" + getAggregatedValue(MAX_AGG) + |
83 | 0 | " min=" + getAggregatedValue(MIN_AGG)); |
84 | |
} |
85 | |
|
86 | 0 | if (getSuperstep() < MAX_SUPERSTEPS) { |
87 | 0 | long edges = vertex.getNumEdges(); |
88 | 0 | sendMessageToAllEdges(vertex, |
89 | 0 | new DoubleWritable(vertex.getValue().get() / edges)); |
90 | 0 | } else { |
91 | 0 | vertex.voteToHalt(); |
92 | |
} |
93 | 0 | } |
94 | |
|
95 | |
|
96 | |
|
97 | |
|
98 | 0 | public static class SimplePageRankWorkerContext extends |
99 | |
WorkerContext { |
100 | |
|
101 | |
private static double FINAL_MAX; |
102 | |
|
103 | |
private static double FINAL_MIN; |
104 | |
|
105 | |
private static long FINAL_SUM; |
106 | |
|
107 | |
public static double getFinalMax() { |
108 | 0 | return FINAL_MAX; |
109 | |
} |
110 | |
|
111 | |
public static double getFinalMin() { |
112 | 0 | return FINAL_MIN; |
113 | |
} |
114 | |
|
115 | |
public static long getFinalSum() { |
116 | 0 | return FINAL_SUM; |
117 | |
} |
118 | |
|
119 | |
@Override |
120 | |
public void preApplication() |
121 | |
throws InstantiationException, IllegalAccessException { |
122 | 0 | } |
123 | |
|
124 | |
@Override |
125 | |
public void postApplication() { |
126 | 0 | FINAL_SUM = this.<LongWritable>getAggregatedValue(SUM_AGG).get(); |
127 | 0 | FINAL_MAX = this.<DoubleWritable>getAggregatedValue(MAX_AGG).get(); |
128 | 0 | FINAL_MIN = this.<DoubleWritable>getAggregatedValue(MIN_AGG).get(); |
129 | |
|
130 | 0 | LOG.info("aggregatedNumVertices=" + FINAL_SUM); |
131 | 0 | LOG.info("aggregatedMaxPageRank=" + FINAL_MAX); |
132 | 0 | LOG.info("aggregatedMinPageRank=" + FINAL_MIN); |
133 | 0 | } |
134 | |
|
135 | |
@Override |
136 | |
public void preSuperstep() { |
137 | 0 | if (getSuperstep() >= 3) { |
138 | 0 | LOG.info("aggregatedNumVertices=" + |
139 | 0 | getAggregatedValue(SUM_AGG) + |
140 | 0 | " NumVertices=" + getTotalNumVertices()); |
141 | 0 | if (this.<LongWritable>getAggregatedValue(SUM_AGG).get() != |
142 | 0 | getTotalNumVertices()) { |
143 | 0 | throw new RuntimeException("wrong value of SumAggreg: " + |
144 | 0 | getAggregatedValue(SUM_AGG) + ", should be: " + |
145 | 0 | getTotalNumVertices()); |
146 | |
} |
147 | 0 | DoubleWritable maxPagerank = getAggregatedValue(MAX_AGG); |
148 | 0 | LOG.info("aggregatedMaxPageRank=" + maxPagerank.get()); |
149 | 0 | DoubleWritable minPagerank = getAggregatedValue(MIN_AGG); |
150 | 0 | LOG.info("aggregatedMinPageRank=" + minPagerank.get()); |
151 | |
} |
152 | 0 | } |
153 | |
|
154 | |
@Override |
155 | 0 | public void postSuperstep() { } |
156 | |
} |
157 | |
|
158 | |
|
159 | |
|
160 | |
|
161 | |
|
162 | 0 | public static class SimplePageRankMasterCompute extends |
163 | |
DefaultMasterCompute { |
164 | |
@Override |
165 | |
public void initialize() throws InstantiationException, |
166 | |
IllegalAccessException { |
167 | 0 | registerAggregator(SUM_AGG, LongSumAggregator.class); |
168 | 0 | registerPersistentAggregator(MIN_AGG, DoubleMinAggregator.class); |
169 | 0 | registerPersistentAggregator(MAX_AGG, DoubleMaxAggregator.class); |
170 | 0 | } |
171 | |
} |
172 | |
|
173 | |
|
174 | |
|
175 | |
|
176 | 0 | public static class SimplePageRankVertexReader extends |
177 | |
GeneratedVertexReader<LongWritable, DoubleWritable, FloatWritable> { |
178 | |
|
179 | 0 | private static final Logger LOG = |
180 | 0 | Logger.getLogger(SimplePageRankVertexReader.class); |
181 | |
|
182 | |
@Override |
183 | |
public boolean nextVertex() { |
184 | 0 | return totalRecords > recordsRead; |
185 | |
} |
186 | |
|
187 | |
@Override |
188 | |
public Vertex<LongWritable, DoubleWritable, FloatWritable> |
189 | |
getCurrentVertex() throws IOException { |
190 | 0 | Vertex<LongWritable, DoubleWritable, FloatWritable> vertex = |
191 | 0 | getConf().createVertex(); |
192 | 0 | LongWritable vertexId = new LongWritable( |
193 | 0 | (inputSplit.getSplitIndex() * totalRecords) + recordsRead); |
194 | 0 | DoubleWritable vertexValue = new DoubleWritable(vertexId.get() * 10d); |
195 | 0 | long targetVertexId = |
196 | 0 | (vertexId.get() + 1) % |
197 | 0 | (inputSplit.getNumSplits() * totalRecords); |
198 | 0 | float edgeValue = vertexId.get() * 100f; |
199 | 0 | List<Edge<LongWritable, FloatWritable>> edges = Lists.newLinkedList(); |
200 | 0 | edges.add(EdgeFactory.create(new LongWritable(targetVertexId), |
201 | |
new FloatWritable(edgeValue))); |
202 | 0 | vertex.initialize(vertexId, vertexValue, edges); |
203 | 0 | ++recordsRead; |
204 | 0 | if (LOG.isInfoEnabled()) { |
205 | 0 | LOG.info("next: Return vertexId=" + vertex.getId().get() + |
206 | 0 | ", vertexValue=" + vertex.getValue() + |
207 | |
", targetVertexId=" + targetVertexId + ", edgeValue=" + edgeValue); |
208 | |
} |
209 | 0 | return vertex; |
210 | |
} |
211 | |
} |
212 | |
|
213 | |
|
214 | |
|
215 | |
|
216 | 0 | public static class SimplePageRankVertexInputFormat extends |
217 | |
GeneratedVertexInputFormat<LongWritable, DoubleWritable, FloatWritable> { |
218 | |
@Override |
219 | |
public VertexReader<LongWritable, DoubleWritable, |
220 | |
FloatWritable> createVertexReader(InputSplit split, |
221 | |
TaskAttemptContext context) |
222 | |
throws IOException { |
223 | 0 | return new SimplePageRankVertexReader(); |
224 | |
} |
225 | |
} |
226 | |
|
227 | |
|
228 | |
|
229 | |
|
230 | 0 | public static class SimplePageRankVertexOutputFormat extends |
231 | |
TextVertexOutputFormat<LongWritable, DoubleWritable, FloatWritable> { |
232 | |
@Override |
233 | |
public TextVertexWriter createVertexWriter(TaskAttemptContext context) |
234 | |
throws IOException, InterruptedException { |
235 | 0 | return new SimplePageRankVertexWriter(); |
236 | |
} |
237 | |
|
238 | |
|
239 | |
|
240 | |
|
241 | 0 | public class SimplePageRankVertexWriter extends TextVertexWriter { |
242 | |
@Override |
243 | |
public void writeVertex( |
244 | |
Vertex<LongWritable, DoubleWritable, FloatWritable> vertex) |
245 | |
throws IOException, InterruptedException { |
246 | 0 | getRecordWriter().write( |
247 | 0 | new Text(vertex.getId().toString()), |
248 | 0 | new Text(vertex.getValue().toString())); |
249 | 0 | } |
250 | |
} |
251 | |
} |
252 | |
} |