View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.worker;
20  
21  import java.io.IOException;
22  
23  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24  import org.apache.giraph.edge.Edge;
25  import org.apache.giraph.edge.OutEdges;
26  import org.apache.giraph.graph.Vertex;
27  import org.apache.giraph.graph.VertexEdgeCount;
28  import org.apache.giraph.io.GiraphInputFormat;
29  import org.apache.giraph.io.VertexInputFormat;
30  import org.apache.giraph.io.VertexReader;
31  import org.apache.giraph.io.filters.VertexInputFilter;
32  import org.apache.giraph.mapping.translate.TranslateEdge;
33  import org.apache.giraph.io.InputType;
34  import org.apache.giraph.ooc.OutOfCoreEngine;
35  import org.apache.giraph.partition.PartitionOwner;
36  import org.apache.giraph.utils.LoggerUtils;
37  import org.apache.giraph.utils.MemoryUtils;
38  import org.apache.hadoop.io.Writable;
39  import org.apache.hadoop.io.WritableComparable;
40  import org.apache.hadoop.mapreduce.InputSplit;
41  import org.apache.hadoop.mapreduce.Mapper;
42  import org.apache.log4j.Level;
43  import org.apache.log4j.Logger;
44  
45  import com.yammer.metrics.core.Counter;
46  import com.yammer.metrics.core.Meter;
47  
48  /**
49   * Load as many vertex input splits as possible.
50   * Every thread will has its own instance of WorkerClientRequestProcessor
51   * to send requests.
52   *
53   * @param <I> Vertex index value
54   * @param <V> Vertex value
55   * @param <E> Edge value
56   */
57  @SuppressWarnings("unchecked")
58  public class VertexInputSplitsCallable<I extends WritableComparable,
59      V extends Writable, E extends Writable>
60      extends InputSplitsCallable<I, V, E> {
61    /** How often to update metrics and print info */
62    public static final int VERTICES_UPDATE_PERIOD = 250000;
63    /** How often to update filtered out metrics */
64    public static final int VERTICES_FILTERED_UPDATE_PERIOD = 2500;
65  
66    /** Class logger */
67    private static final Logger LOG =
68        Logger.getLogger(VertexInputSplitsCallable.class);
69    /** Vertex input format */
70    private final VertexInputFormat<I, V, E> vertexInputFormat;
71    /** Input split max vertices (-1 denotes all) */
72    private final long inputSplitMaxVertices;
73    /** Bsp service worker (only use thread-safe methods) */
74    private final BspServiceWorker<I, V, E> bspServiceWorker;
75    /** Filter to select which vertices to keep */
76    private final VertexInputFilter<I, V, E> vertexInputFilter;
77    /** Can embedInfo in vertexIds */
78    private final boolean canEmbedInIds;
79    /**
80     * Whether the chosen {@link OutEdges} implementation allows for Edge
81     * reuse.
82     */
83    private final boolean reuseEdgeObjects;
84    /** Used to translate Edges during vertex input phase based on localData */
85    private final TranslateEdge<I, E> translateEdge;
86  
87    // Metrics
88    /** number of vertices loaded meter across all readers */
89    private final Meter totalVerticesMeter;
90    /** number of vertices filtered out */
91    private final Counter totalVerticesFilteredCounter;
92    /** number of edges loaded meter across all readers */
93    private final Meter totalEdgesMeter;
94  
95    /**
96     * Constructor.
97     *
98     * @param vertexInputFormat Vertex input format
99     * @param context Context
100    * @param configuration Configuration
101    * @param bspServiceWorker service worker
102    * @param splitsHandler Handler for input splits
103    */
104   public VertexInputSplitsCallable(
105       VertexInputFormat<I, V, E> vertexInputFormat,
106       Mapper<?, ?, ?, ?>.Context context,
107       ImmutableClassesGiraphConfiguration<I, V, E> configuration,
108       BspServiceWorker<I, V, E> bspServiceWorker,
109       WorkerInputSplitsHandler splitsHandler)  {
110     super(context, configuration, bspServiceWorker, splitsHandler);
111     this.vertexInputFormat = vertexInputFormat;
112 
113     inputSplitMaxVertices = configuration.getInputSplitMaxVertices();
114     this.bspServiceWorker = bspServiceWorker;
115     vertexInputFilter = configuration.getVertexInputFilter();
116     reuseEdgeObjects = configuration.reuseEdgeObjects();
117     canEmbedInIds = bspServiceWorker
118         .getLocalData()
119         .getMappingStoreOps() != null &&
120         bspServiceWorker
121             .getLocalData()
122             .getMappingStoreOps()
123             .hasEmbedding();
124     translateEdge = bspServiceWorker.getTranslateEdge();
125 
126     // Initialize Metrics
127     totalVerticesMeter = getTotalVerticesLoadedMeter();
128     totalVerticesFilteredCounter = getTotalVerticesFilteredCounter();
129     totalEdgesMeter = getTotalEdgesLoadedMeter();
130   }
131 
132   @Override
133   public GiraphInputFormat getInputFormat() {
134     return vertexInputFormat;
135   }
136 
137   @Override
138   public InputType getInputType() {
139     return InputType.VERTEX;
140   }
141 
142   /**
143    * Read vertices from input split.  If testing, the user may request a
144    * maximum number of vertices to be read from an input split.
145    *
146    * @param inputSplit Input split to process with vertex reader
147    * @return Vertices and edges loaded from this input split
148    * @throws IOException
149    * @throws InterruptedException
150    */
151   @Override
152   protected VertexEdgeCount readInputSplit(
153       InputSplit inputSplit) throws IOException, InterruptedException {
154     VertexReader<I, V, E> vertexReader =
155         vertexInputFormat.createVertexReader(inputSplit, context);
156     vertexReader.setConf(configuration);
157 
158     WorkerThreadGlobalCommUsage globalCommUsage =
159       this.bspServiceWorker
160         .getAggregatorHandler().newThreadAggregatorUsage();
161 
162     vertexReader.initialize(inputSplit, context);
163     // Set aggregator usage to vertex reader
164     vertexReader.setWorkerGlobalCommUsage(globalCommUsage);
165 
166     long inputSplitVerticesLoaded = 0;
167     long inputSplitVerticesFiltered = 0;
168 
169     long edgesSinceLastUpdate = 0;
170     long inputSplitEdgesLoaded = 0;
171 
172     int count = 0;
173     OutOfCoreEngine oocEngine = bspServiceWorker.getServerData().getOocEngine();
174     while (vertexReader.nextVertex()) {
175       // If out-of-core mechanism is used, check whether this thread
176       // can stay active or it should temporarily suspend and stop
177       // processing and generating more data for the moment.
178       if (oocEngine != null &&
179           (++count & OutOfCoreEngine.CHECK_IN_INTERVAL) == 0) {
180         oocEngine.activeThreadCheckIn();
181       }
182       Vertex<I, V, E> readerVertex = vertexReader.getCurrentVertex();
183       if (readerVertex.getId() == null) {
184         throw new IllegalArgumentException(
185             "readInputSplit: Vertex reader returned a vertex " +
186                 "without an id!  - " + readerVertex);
187       }
188       if (canEmbedInIds) {
189         bspServiceWorker
190             .getLocalData()
191             .getMappingStoreOps()
192             .embedTargetInfo(readerVertex.getId());
193       }
194       if (readerVertex.getValue() == null) {
195         readerVertex.setValue(configuration.createVertexValue());
196       }
197       readerVertex.setConf(configuration);
198 
199       ++inputSplitVerticesLoaded;
200 
201       if (vertexInputFilter.dropVertex(readerVertex)) {
202         ++inputSplitVerticesFiltered;
203         if (inputSplitVerticesFiltered % VERTICES_FILTERED_UPDATE_PERIOD == 0) {
204           totalVerticesFilteredCounter.inc(inputSplitVerticesFiltered);
205           inputSplitVerticesFiltered = 0;
206         }
207         continue;
208       }
209 
210       // Before saving to partition-store translate all edges (if present)
211       if (translateEdge != null) {
212         // only iff vertexInput reads edges also
213         if (readerVertex.getEdges() != null && readerVertex.getNumEdges() > 0) {
214           OutEdges<I, E> vertexOutEdges = configuration
215               .createAndInitializeOutEdges(readerVertex.getNumEdges());
216           // TODO : this works for generic OutEdges, can create a better api
217           // to support more efficient translation for specific types
218 
219           // NOTE : for implementations where edge is reusable, space is
220           // consumed by the OutEdges data structure itself, but if not reusable
221           // space is consumed by the newly created edge -> and the new OutEdges
222           // data structure just holds a reference to the newly created edge
223           // so in any way we virtually hold edges twice - similar to
224           // OutEdges.trim() -> this has the same complexity as OutEdges.trim()
225           for (Edge<I, E> edge : readerVertex.getEdges()) {
226             if (reuseEdgeObjects) {
227               bspServiceWorker
228                   .getLocalData()
229                   .getMappingStoreOps()
230                   .embedTargetInfo(edge.getTargetVertexId());
231               vertexOutEdges.add(edge); // edge can be re-used
232             } else { // edge objects cannot be reused - so create new edges
233               vertexOutEdges.add(configuration.createEdge(translateEdge, edge));
234             }
235           }
236           // set out edges to translated instance -> old instance is released
237           readerVertex.setEdges(vertexOutEdges);
238         }
239       }
240 
241       PartitionOwner partitionOwner =
242           bspServiceWorker.getVertexPartitionOwner(readerVertex.getId());
243       workerClientRequestProcessor.sendVertexRequest(
244           partitionOwner, readerVertex);
245       edgesSinceLastUpdate += readerVertex.getNumEdges();
246 
247       // Update status every VERTICES_UPDATE_PERIOD vertices
248       if (inputSplitVerticesLoaded % VERTICES_UPDATE_PERIOD == 0) {
249         totalVerticesMeter.mark(VERTICES_UPDATE_PERIOD);
250         WorkerProgress.get().addVerticesLoaded(VERTICES_UPDATE_PERIOD);
251         totalEdgesMeter.mark(edgesSinceLastUpdate);
252         inputSplitEdgesLoaded += edgesSinceLastUpdate;
253         edgesSinceLastUpdate = 0;
254 
255         LoggerUtils.setStatusAndLog(
256             context, LOG, Level.INFO,
257             "readVertexInputSplit: Loaded " +
258                 totalVerticesMeter.count() + " vertices at " +
259                 totalVerticesMeter.meanRate() + " vertices/sec " +
260                 totalEdgesMeter.count() + " edges at " +
261                 totalEdgesMeter.meanRate() + " edges/sec " +
262                 MemoryUtils.getRuntimeMemoryStats());
263       }
264 
265       // For sampling, or to limit outlier input splits, the number of
266       // records per input split can be limited
267       if (inputSplitMaxVertices > 0 &&
268           inputSplitVerticesLoaded >= inputSplitMaxVertices) {
269         if (LOG.isInfoEnabled()) {
270           LOG.info("readInputSplit: Leaving the input " +
271               "split early, reached maximum vertices " +
272               inputSplitVerticesLoaded);
273         }
274         break;
275       }
276     }
277 
278     totalVerticesMeter.mark(inputSplitVerticesLoaded % VERTICES_UPDATE_PERIOD);
279     totalEdgesMeter.mark(edgesSinceLastUpdate);
280     totalVerticesFilteredCounter.inc(inputSplitVerticesFiltered);
281 
282     vertexReader.close();
283 
284     WorkerProgress.get().addVerticesLoaded(
285         inputSplitVerticesLoaded % VERTICES_UPDATE_PERIOD);
286     WorkerProgress.get().incrementVertexInputSplitsLoaded();
287 
288     return new VertexEdgeCount(inputSplitVerticesLoaded,
289         inputSplitEdgesLoaded + edgesSinceLastUpdate, 0);
290   }
291 }
292