Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
CentralizedServiceWorker |
|
| 1.0;1 |
1 | /* | |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, software | |
13 | * distributed under the License is distributed on an "AS IS" BASIS, | |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
15 | * See the License for the specific language governing permissions and | |
16 | * limitations under the License. | |
17 | */ | |
18 | ||
19 | package org.apache.giraph.bsp; | |
20 | ||
21 | import java.io.IOException; | |
22 | import java.util.Collection; | |
23 | import java.util.List; | |
24 | ||
25 | import org.apache.giraph.comm.ServerData; | |
26 | import org.apache.giraph.comm.WorkerClient; | |
27 | import org.apache.giraph.comm.messages.PartitionSplitInfo; | |
28 | import org.apache.giraph.graph.AddressesAndPartitionsWritable; | |
29 | import org.apache.giraph.graph.FinishedSuperstepStats; | |
30 | import org.apache.giraph.graph.GlobalStats; | |
31 | import org.apache.giraph.graph.GraphTaskManager; | |
32 | import org.apache.giraph.graph.VertexEdgeCount; | |
33 | import org.apache.giraph.io.superstep_output.SuperstepOutput; | |
34 | import org.apache.giraph.metrics.GiraphTimerContext; | |
35 | import org.apache.giraph.partition.PartitionOwner; | |
36 | import org.apache.giraph.partition.PartitionStats; | |
37 | import org.apache.giraph.partition.PartitionStore; | |
38 | import org.apache.giraph.worker.WorkerAggregatorHandler; | |
39 | import org.apache.giraph.worker.WorkerContext; | |
40 | import org.apache.giraph.worker.WorkerInfo; | |
41 | import org.apache.giraph.worker.WorkerInputSplitsHandler; | |
42 | import org.apache.giraph.worker.WorkerObserver; | |
43 | import org.apache.hadoop.io.Writable; | |
44 | import org.apache.hadoop.io.WritableComparable; | |
45 | ||
46 | /** | |
47 | * Centralized service that all workers should have access to in order to | |
48 | * execute the following methods. | |
49 | * | |
50 | * @param <I> Vertex id | |
51 | * @param <V> Vertex value | |
52 | * @param <E> Edge value | |
53 | */ | |
54 | @SuppressWarnings("rawtypes") | |
55 | public interface CentralizedServiceWorker<I extends WritableComparable, | |
56 | V extends Writable, E extends Writable> | |
57 | extends CentralizedService<I, V, E>, PartitionSplitInfo<I> { | |
58 | /** | |
59 | * Set up this worker (must be called prior to any other method). | |
60 | * | |
61 | * @return Finished superstep stats for the input superstep | |
62 | */ | |
63 | FinishedSuperstepStats setup(); | |
64 | ||
65 | /** | |
66 | * Get the worker information. | |
67 | * | |
68 | * @return Worker information | |
69 | */ | |
70 | WorkerInfo getWorkerInfo(); | |
71 | ||
72 | /** | |
73 | * Get the worker client (for instantiating WorkerClientRequestProcessor | |
74 | * instances). | |
75 | * | |
76 | * @return Worker client | |
77 | */ | |
78 | WorkerClient<I, V, E> getWorkerClient(); | |
79 | ||
80 | /** | |
81 | * Get the worker context. | |
82 | * | |
83 | * @return This worker's WorkerContext | |
84 | */ | |
85 | WorkerContext getWorkerContext(); | |
86 | ||
87 | /** | |
88 | * Get the observers for this worker. | |
89 | * | |
90 | * @return Array of WorkerObservers | |
91 | */ | |
92 | WorkerObserver[] getWorkerObservers(); | |
93 | ||
94 | /** | |
95 | * Get the partition store for this worker. | |
96 | * The partitions contain the vertices for | |
97 | * this worker and can be used to run compute() for the vertices or do | |
98 | * checkpointing. | |
99 | * | |
100 | * @return The partition store for this worker. | |
101 | */ | |
102 | PartitionStore<I, V, E> getPartitionStore(); | |
103 | ||
104 | /** | |
105 | * Store a checkpoint. Both the vertices and the messages need to be | |
106 | * checkpointed in order for them to be used. This is done after all | |
107 | * messages have been delivered, but prior to a superstep starting. | |
108 | */ | |
109 | void storeCheckpoint() throws IOException; | |
110 | ||
111 | /** | |
112 | * Load the vertices, edges and messages from the beginning of a superstep. | |
113 | * Will load the vertex partitions as designated by the master and set the | |
114 | * appropriate superstep. | |
115 | * | |
116 | * @param superstep Which checkpoint to use | |
117 | * @return Graph-wide vertex and edge counts | |
118 | * @throws IOException If an I/O error occurs while loading the checkpoint | |
119 | */ | |
120 | VertexEdgeCount loadCheckpoint(long superstep) throws IOException; | |
121 | ||
122 | /** | |
123 | * Take all steps prior to actually beginning the computation of a | |
124 | * superstep. | |
125 | * | |
126 | * @return Collection of all the partition owners from the master for this | |
127 | * superstep. | |
128 | */ | |
129 | Collection<? extends PartitionOwner> startSuperstep(); | |
130 | ||
131 | /** | |
132 | * Worker is done with its portion of the superstep. Report the | |
133 | * worker-level statistics after the computation. | |
134 | * | |
135 | * @param partitionStatsList All the partition stats for this worker | |
136 | * @param superstepTimerContext Superstep timer context; only given when the | |
137 | * method needs to stop the timer, otherwise null | |
138 | * @return Stats of the superstep completion | |
139 | */ | |
140 | FinishedSuperstepStats finishSuperstep( | |
141 | List<PartitionStats> partitionStatsList, | |
142 | GiraphTimerContext superstepTimerContext); | |
143 | ||
144 | /** | |
145 | * Get the partition id that a vertex id would belong to. | |
146 | * | |
147 | * @param vertexId Vertex id | |
148 | * @return Partition id | |
149 | */ | |
150 | @Override | |
151 | int getPartitionId(I vertexId); | |
152 | ||
153 | /** | |
154 | * Check whether a partition with the given id exists on this worker. | |
155 | * | |
156 | * @param partitionId Partition id | |
157 | * @return True iff this worker has the specified partition | |
158 | */ | |
159 | boolean hasPartition(Integer partitionId); | |
160 | ||
161 | /** | |
162 | * Every client will need to get a partition owner from a vertex id so that | |
163 | * they know which worker to send the request to. | |
164 | * | |
165 | * @param vertexId Vertex id to look for | |
166 | * @return PartitionOwner that should contain this vertex if it exists | |
167 | */ | |
168 | PartitionOwner getVertexPartitionOwner(I vertexId); | |
169 | ||
170 | /** | |
171 | * Get all partition owners. | |
172 | * | |
173 | * @return Iterable over all partition owners | |
174 | */ | |
175 | Iterable<? extends PartitionOwner> getPartitionOwners(); | |
176 | ||
177 | /** | |
178 | * If desired by the user, vertex partitions are redistributed among | |
179 | * workers according to the chosen WorkerGraphPartitioner. | |
180 | * | |
181 | * @param masterSetPartitionOwners Partition owner info passed from the | |
182 | * master. | |
183 | */ | |
184 | void exchangeVertexPartitions( | |
185 | Collection<? extends PartitionOwner> masterSetPartitionOwners); | |
186 | ||
187 | /** | |
188 | * Get the GraphTaskManager that this service is using. Vertices need to know | |
189 | * this. | |
190 | * | |
191 | * @return The GraphTaskManager instance for this compute node | |
192 | */ | |
193 | GraphTaskManager<I, V, E> getGraphTaskManager(); | |
194 | ||
195 | /** | |
196 | * Perform the operations needed if there is a failure by a worker. | |
197 | */ | |
198 | void failureCleanup(); | |
199 | ||
200 | /** | |
201 | * Get the server data. | |
202 | * | |
203 | * @return Server data | |
204 | */ | |
205 | ServerData<I, V, E> getServerData(); | |
206 | ||
207 | /** | |
208 | * Get the worker aggregator handler. | |
209 | * | |
210 | * @return Worker aggregator handler | |
211 | */ | |
212 | WorkerAggregatorHandler getAggregatorHandler(); | |
213 | ||
214 | /** | |
215 | * Final preparation for superstep, called after startSuperstep and | |
216 | * potential loading from checkpoint, right before the computation starts. | |
217 | * TODO: how to avoid this additional method | |
218 | */ | |
219 | void prepareSuperstep(); | |
220 | ||
221 | /** | |
222 | * Get the superstep output. | |
223 | * | |
224 | * @return SuperstepOutput | |
225 | */ | |
226 | SuperstepOutput<I, V, E> getSuperstepOutput(); | |
227 | ||
228 | /** | |
229 | * Clean up the service (no calls may be issued after this). | |
230 | * | |
231 | * @param finishedSuperstepStats Finished superstep stats | |
232 | * @throws IOException If an I/O error occurs during cleanup | |
233 | * @throws InterruptedException If interrupted while cleaning up | |
234 | */ | |
235 | void cleanup(FinishedSuperstepStats finishedSuperstepStats) | |
236 | throws IOException, InterruptedException; | |
237 | ||
238 | /** | |
239 | * Load the global stats from ZooKeeper. | |
240 | * @return Global stats stored in ZooKeeper for the | |
241 | * previous superstep | |
242 | */ | |
243 | GlobalStats getGlobalStats(); | |
244 | ||
245 | /** | |
246 | * Get the input splits handler used during input. | |
247 | * | |
248 | * @return Input splits handler | |
249 | */ | |
250 | WorkerInputSplitsHandler getInputSplitsHandler(); | |
251 | ||
252 | /** | |
253 | * Receive the addresses and partition assignments from the master. | |
254 | * | |
255 | * @param addressesAndPartitions Addresses and partition assignments | |
256 | */ | |
257 | void addressesAndPartitionsReceived( | |
258 | AddressesAndPartitionsWritable addressesAndPartitions); | |
259 | ||
260 | /** | |
261 | * Store the counter values in ZooKeeper after every superstep | |
262 | * and also after all supersteps are done. This is called before closing | |
263 | * ZooKeeper. We need to call this method after calling cleanup on the | |
264 | * worker, since some counters are updated during cleanup. | |
265 | * @param allSuperstepsDone Whether all the supersteps | |
266 | * have completed | |
267 | */ | |
268 | void storeCountersInZooKeeper(boolean allSuperstepsDone); | |
269 | ||
270 | /** | |
271 | * Close ZooKeeper. | |
272 | */ | |
273 | void closeZooKeeper(); | |
274 | } |