1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.worker;
20
21 import java.io.IOException;
22
23 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24 import org.apache.giraph.edge.Edge;
25 import org.apache.giraph.edge.OutEdges;
26 import org.apache.giraph.graph.Vertex;
27 import org.apache.giraph.graph.VertexEdgeCount;
28 import org.apache.giraph.io.GiraphInputFormat;
29 import org.apache.giraph.io.VertexInputFormat;
30 import org.apache.giraph.io.VertexReader;
31 import org.apache.giraph.io.filters.VertexInputFilter;
32 import org.apache.giraph.mapping.translate.TranslateEdge;
33 import org.apache.giraph.io.InputType;
34 import org.apache.giraph.ooc.OutOfCoreEngine;
35 import org.apache.giraph.partition.PartitionOwner;
36 import org.apache.giraph.utils.LoggerUtils;
37 import org.apache.giraph.utils.MemoryUtils;
38 import org.apache.hadoop.io.Writable;
39 import org.apache.hadoop.io.WritableComparable;
40 import org.apache.hadoop.mapreduce.InputSplit;
41 import org.apache.hadoop.mapreduce.Mapper;
42 import org.apache.log4j.Level;
43 import org.apache.log4j.Logger;
44
45 import com.yammer.metrics.core.Counter;
46 import com.yammer.metrics.core.Meter;
47
48
49
50
51
52
53
54
55
56
57 @SuppressWarnings("unchecked")
58 public class VertexInputSplitsCallable<I extends WritableComparable,
59 V extends Writable, E extends Writable>
60 extends InputSplitsCallable<I, V, E> {
61
62 public static final int VERTICES_UPDATE_PERIOD = 250000;
63
64 public static final int VERTICES_FILTERED_UPDATE_PERIOD = 2500;
65
66
67 private static final Logger LOG =
68 Logger.getLogger(VertexInputSplitsCallable.class);
69
70 private final VertexInputFormat<I, V, E> vertexInputFormat;
71
72 private final long inputSplitMaxVertices;
73
74 private final BspServiceWorker<I, V, E> bspServiceWorker;
75
76 private final VertexInputFilter<I, V, E> vertexInputFilter;
77
78 private final boolean canEmbedInIds;
79
80
81
82
83 private final boolean reuseEdgeObjects;
84
85 private final TranslateEdge<I, E> translateEdge;
86
87
88
89 private final Meter totalVerticesMeter;
90
91 private final Counter totalVerticesFilteredCounter;
92
93 private final Meter totalEdgesMeter;
94
95
96
97
98
99
100
101
102
103
104 public VertexInputSplitsCallable(
105 VertexInputFormat<I, V, E> vertexInputFormat,
106 Mapper<?, ?, ?, ?>.Context context,
107 ImmutableClassesGiraphConfiguration<I, V, E> configuration,
108 BspServiceWorker<I, V, E> bspServiceWorker,
109 WorkerInputSplitsHandler splitsHandler) {
110 super(context, configuration, bspServiceWorker, splitsHandler);
111 this.vertexInputFormat = vertexInputFormat;
112
113 inputSplitMaxVertices = configuration.getInputSplitMaxVertices();
114 this.bspServiceWorker = bspServiceWorker;
115 vertexInputFilter = configuration.getVertexInputFilter();
116 reuseEdgeObjects = configuration.reuseEdgeObjects();
117 canEmbedInIds = bspServiceWorker
118 .getLocalData()
119 .getMappingStoreOps() != null &&
120 bspServiceWorker
121 .getLocalData()
122 .getMappingStoreOps()
123 .hasEmbedding();
124 translateEdge = bspServiceWorker.getTranslateEdge();
125
126
127 totalVerticesMeter = getTotalVerticesLoadedMeter();
128 totalVerticesFilteredCounter = getTotalVerticesFilteredCounter();
129 totalEdgesMeter = getTotalEdgesLoadedMeter();
130 }
131
132 @Override
133 public GiraphInputFormat getInputFormat() {
134 return vertexInputFormat;
135 }
136
137 @Override
138 public InputType getInputType() {
139 return InputType.VERTEX;
140 }
141
142
143
144
145
146
147
148
149
150
151 @Override
152 protected VertexEdgeCount readInputSplit(
153 InputSplit inputSplit) throws IOException, InterruptedException {
154 VertexReader<I, V, E> vertexReader =
155 vertexInputFormat.createVertexReader(inputSplit, context);
156 vertexReader.setConf(configuration);
157
158 WorkerThreadGlobalCommUsage globalCommUsage =
159 this.bspServiceWorker
160 .getAggregatorHandler().newThreadAggregatorUsage();
161
162 vertexReader.initialize(inputSplit, context);
163
164 vertexReader.setWorkerGlobalCommUsage(globalCommUsage);
165
166 long inputSplitVerticesLoaded = 0;
167 long inputSplitVerticesFiltered = 0;
168
169 long edgesSinceLastUpdate = 0;
170 long inputSplitEdgesLoaded = 0;
171
172 int count = 0;
173 OutOfCoreEngine oocEngine = bspServiceWorker.getServerData().getOocEngine();
174 while (vertexReader.nextVertex()) {
175
176
177
178 if (oocEngine != null &&
179 (++count & OutOfCoreEngine.CHECK_IN_INTERVAL) == 0) {
180 oocEngine.activeThreadCheckIn();
181 }
182 Vertex<I, V, E> readerVertex = vertexReader.getCurrentVertex();
183 if (readerVertex.getId() == null) {
184 throw new IllegalArgumentException(
185 "readInputSplit: Vertex reader returned a vertex " +
186 "without an id! - " + readerVertex);
187 }
188 if (canEmbedInIds) {
189 bspServiceWorker
190 .getLocalData()
191 .getMappingStoreOps()
192 .embedTargetInfo(readerVertex.getId());
193 }
194 if (readerVertex.getValue() == null) {
195 readerVertex.setValue(configuration.createVertexValue());
196 }
197 readerVertex.setConf(configuration);
198
199 ++inputSplitVerticesLoaded;
200
201 if (vertexInputFilter.dropVertex(readerVertex)) {
202 ++inputSplitVerticesFiltered;
203 if (inputSplitVerticesFiltered % VERTICES_FILTERED_UPDATE_PERIOD == 0) {
204 totalVerticesFilteredCounter.inc(inputSplitVerticesFiltered);
205 inputSplitVerticesFiltered = 0;
206 }
207 continue;
208 }
209
210
211 if (translateEdge != null) {
212
213 if (readerVertex.getEdges() != null && readerVertex.getNumEdges() > 0) {
214 OutEdges<I, E> vertexOutEdges = configuration
215 .createAndInitializeOutEdges(readerVertex.getNumEdges());
216
217
218
219
220
221
222
223
224
225 for (Edge<I, E> edge : readerVertex.getEdges()) {
226 if (reuseEdgeObjects) {
227 bspServiceWorker
228 .getLocalData()
229 .getMappingStoreOps()
230 .embedTargetInfo(edge.getTargetVertexId());
231 vertexOutEdges.add(edge);
232 } else {
233 vertexOutEdges.add(configuration.createEdge(translateEdge, edge));
234 }
235 }
236
237 readerVertex.setEdges(vertexOutEdges);
238 }
239 }
240
241 PartitionOwner partitionOwner =
242 bspServiceWorker.getVertexPartitionOwner(readerVertex.getId());
243 workerClientRequestProcessor.sendVertexRequest(
244 partitionOwner, readerVertex);
245 edgesSinceLastUpdate += readerVertex.getNumEdges();
246
247
248 if (inputSplitVerticesLoaded % VERTICES_UPDATE_PERIOD == 0) {
249 totalVerticesMeter.mark(VERTICES_UPDATE_PERIOD);
250 WorkerProgress.get().addVerticesLoaded(VERTICES_UPDATE_PERIOD);
251 totalEdgesMeter.mark(edgesSinceLastUpdate);
252 inputSplitEdgesLoaded += edgesSinceLastUpdate;
253 edgesSinceLastUpdate = 0;
254
255 LoggerUtils.setStatusAndLog(
256 context, LOG, Level.INFO,
257 "readVertexInputSplit: Loaded " +
258 totalVerticesMeter.count() + " vertices at " +
259 totalVerticesMeter.meanRate() + " vertices/sec " +
260 totalEdgesMeter.count() + " edges at " +
261 totalEdgesMeter.meanRate() + " edges/sec " +
262 MemoryUtils.getRuntimeMemoryStats());
263 }
264
265
266
267 if (inputSplitMaxVertices > 0 &&
268 inputSplitVerticesLoaded >= inputSplitMaxVertices) {
269 if (LOG.isInfoEnabled()) {
270 LOG.info("readInputSplit: Leaving the input " +
271 "split early, reached maximum vertices " +
272 inputSplitVerticesLoaded);
273 }
274 break;
275 }
276 }
277
278 totalVerticesMeter.mark(inputSplitVerticesLoaded % VERTICES_UPDATE_PERIOD);
279 totalEdgesMeter.mark(edgesSinceLastUpdate);
280 totalVerticesFilteredCounter.inc(inputSplitVerticesFiltered);
281
282 vertexReader.close();
283
284 WorkerProgress.get().addVerticesLoaded(
285 inputSplitVerticesLoaded % VERTICES_UPDATE_PERIOD);
286 WorkerProgress.get().incrementVertexInputSplitsLoaded();
287
288 return new VertexEdgeCount(inputSplitVerticesLoaded,
289 inputSplitEdgesLoaded + edgesSinceLastUpdate, 0);
290 }
291 }
292