1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.yarn;
19
20 import com.google.common.collect.ImmutableList;
21
22 import com.google.common.collect.Maps;
23 import org.apache.giraph.conf.GiraphConfiguration;
24 import org.apache.giraph.conf.GiraphConstants;
25 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.mapreduce.MRJobConfig;
28 import org.apache.hadoop.io.DataOutputBuffer;
29 import org.apache.hadoop.security.Credentials;
30 import org.apache.hadoop.security.UserGroupInformation;
31 import org.apache.hadoop.security.token.Token;
32 import org.apache.hadoop.yarn.api.ApplicationConstants;
33 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
34 import org.apache.hadoop.yarn.api.protocolrecords
35 .RegisterApplicationMasterResponse;
36 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
37 import org.apache.hadoop.yarn.api.records.Container;
38 import org.apache.hadoop.yarn.api.records.ContainerId;
39 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
40 import org.apache.hadoop.yarn.api.records.ContainerStatus;
41 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
42 import org.apache.hadoop.yarn.api.records.LocalResource;
43 import org.apache.hadoop.yarn.api.records.NodeReport;
44 import org.apache.hadoop.yarn.api.records.Priority;
45 import org.apache.hadoop.yarn.api.records.Resource;
46 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
47 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
48 import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
49 import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
50 import org.apache.hadoop.yarn.conf.YarnConfiguration;
51 import org.apache.hadoop.yarn.exceptions.YarnException;
52 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
53 import org.apache.hadoop.yarn.util.ConverterUtils;
54 import org.apache.hadoop.yarn.util.Records;
55
56 import org.apache.log4j.Logger;
57
58 import java.io.IOException;
59 import java.nio.ByteBuffer;
60 import java.util.Iterator;
61 import java.util.List;
62 import java.util.Map;
63 import java.util.concurrent.ConcurrentHashMap;
64 import java.util.concurrent.ConcurrentMap;
65 import java.util.concurrent.ExecutorService;
66 import java.util.concurrent.Executors;
67 import java.util.concurrent.atomic.AtomicInteger;
68
69
70
71
72
73
74
75
76
77
78 public class GiraphApplicationMaster {
79
/** Class logger. */
private static final Logger LOG =
  Logger.getLogger(GiraphApplicationMaster.class);

/**
 * Container exit status that is counted as neither a success nor a failure
 * when completed containers are tallied.
 */
private static final int YARN_ABORT_EXIT_STATUS = -100;

/** Container exit status that is counted as a success. */
private static final int YARN_SUCCESS_EXIT_STATUS = 0;

/**
 * Heartbeat sleep interval in milliseconds. Not referenced by the code
 * in this class; kept for compatibility.
 */
private static final int SLEEP_BETWEEN_HEARTBEATS_MSECS = 900;

/**
 * Local resources attached to every task container launch context. Built
 * lazily by getTaskResourceMap() and shared by all launcher threads.
 */
private static Map<String, LocalResource> LOCAL_RESOURCES;

/** Hostname reported to the RM when this AM registers. */
private String appMasterHostname = "";

/** RPC port reported to the RM when this AM registers. */
private int appMasterRpcPort = 0;

/** Tracking URL reported to the RM when this AM registers. */
private String appMasterTrackingUrl = "";

static {
  // Make the Giraph YARN conf file a default resource of every
  // Configuration created after this class is loaded.
  Configuration.addDefaultResource(GiraphConstants.GIRAPH_YARN_CONF_FILE);
}

/** Application attempt served by this AM instance. */
private final ApplicationAttemptId appAttemptId;

/** Container id supplied at construction time. */
private final ContainerId containerId;

/** Number of task containers to request: max workers plus one. */
private final int containersToLaunch;

/** Memory requested per container; also used for the task JVM heap flags. */
private final int heapPerContainer;

/** Giraph configuration exported to the task containers. */
private final ImmutableClassesGiraphConfiguration giraphConf;

/** YARN view of the Giraph configuration, used to init the YARN clients. */
private final YarnConfiguration yarnConf;

/** Number of containers reported as completed by the RM. */
private final AtomicInteger completedCount;

/** Number of completed containers counted as failed. */
private final AtomicInteger failedCount;

/** Number of containers allocated by the RM so far. */
private final AtomicInteger allocatedCount;

/** Number of completed containers counted as successful. */
private final AtomicInteger successfulCount;

/**
 * Response id counter, initialised to zero by the constructor and never
 * reassigned, hence final like the other counters.
 */
private final AtomicInteger lastResponseId;

/** Serialized credentials handed to every launched container. */
private ByteBuffer allTokens;

/**
 * Pool that runs the container launch tasks. Assigned once by the
 * constructor, hence final.
 */
private final ExecutorService executor;

/** Asynchronous client used to talk to the ResourceManager. */
@SuppressWarnings("rawtypes")
private AMRMClientAsync amRMClient;

/** Asynchronous client used to talk to the NodeManagers. */
private NMClientAsync nmClientAsync;

/** Callback handler registered with the NodeManager client. */
private NMCallbackHandler containerListener;

/**
 * Set to true by the RM callbacks when the wait loop in run() should end.
 * Volatile because it is written by callback code and read by run().
 */
private volatile boolean done;
146
147
148
149
150
151
152
153 protected GiraphApplicationMaster(ContainerId cId, ApplicationAttemptId aId)
154 throws IOException {
155 containerId = cId;
156 appAttemptId = aId;
157 lastResponseId = new AtomicInteger(0);
158 giraphConf =
159 new ImmutableClassesGiraphConfiguration(new GiraphConfiguration());
160 yarnConf = new YarnConfiguration(giraphConf);
161 completedCount = new AtomicInteger(0);
162 failedCount = new AtomicInteger(0);
163 allocatedCount = new AtomicInteger(0);
164 successfulCount = new AtomicInteger(0);
165 containersToLaunch = giraphConf.getMaxWorkers() + 1;
166 executor = Executors.newFixedThreadPool(containersToLaunch);
167 heapPerContainer = giraphConf.getYarnTaskHeapMb();
168 LOG.info("GiraphAM for ContainerId " + cId + " ApplicationAttemptId " +
169 aId);
170 }
171
172
173
174
175
176
177 private boolean run() throws YarnException, IOException {
178 boolean success = false;
179 try {
180 getAllTokens();
181 registerRMCallBackHandler();
182 registerNMCallbackHandler();
183 registerAMToRM();
184 madeAllContainerRequestToRM();
185 LOG.info("Wait to finish ..");
186 while (!done) {
187 try {
188 Thread.sleep(200);
189 } catch (InterruptedException ex) {
190 LOG.error(ex);
191
192 }
193 }
194 LOG.info("Done " + done);
195 } finally {
196
197 if (null != executor && !executor.isTerminated()) {
198 LOG.info("Forcefully terminating executors with done =:" + done);
199 executor.shutdownNow();
200 }
201 success = finish();
202 }
203 return success;
204 }
205
206
207
208
209
210 private boolean finish() {
211
212 LOG.info("Application completed. Stopping running containers");
213 nmClientAsync.stop();
214
215
216
217 LOG.info("Application completed. Signalling finish to RM");
218 FinalApplicationStatus appStatus;
219 String appMessage = null;
220 boolean success = true;
221 if (failedCount.get() == 0 &&
222 completedCount.get() == containersToLaunch) {
223 appStatus = FinalApplicationStatus.SUCCEEDED;
224 } else {
225 appStatus = FinalApplicationStatus.FAILED;
226 appMessage = "Diagnostics." + ", total=" + containersToLaunch +
227 ", completed=" + completedCount.get() + ", failed=" +
228 failedCount.get();
229 success = false;
230 }
231 try {
232 amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
233 } catch (YarnException ex) {
234 LOG.error("Failed to unregister application", ex);
235 } catch (IOException e) {
236 LOG.error("Failed to unregister application", e);
237 }
238
239 amRMClient.stop();
240 return success;
241 }
242
243
244
245
246 private void madeAllContainerRequestToRM() {
247
248
249
250
251
252
253 for (int i = 0; i < containersToLaunch; ++i) {
254 ContainerRequest containerAsk = setupContainerAskForRM();
255 amRMClient.addContainerRequest(containerAsk);
256 }
257 }
258
259
260
261
262
263
264 private ContainerRequest setupContainerAskForRM() {
265
266
267
268 Priority pri = Records.newRecord(Priority.class);
269
270 pri.setPriority(GiraphConstants.GIRAPH_YARN_PRIORITY);
271
272
273
274 Resource capability = Records.newRecord(Resource.class);
275 capability.setMemory(heapPerContainer);
276
277 ContainerRequest request = new ContainerRequest(capability, null, null,
278 pri);
279 LOG.info("Requested container ask: " + request.toString());
280 return request;
281 }
282
283
284
285
286
287 private void getAllTokens() throws IOException {
288 Credentials credentials = UserGroupInformation.getCurrentUser()
289 .getCredentials();
290 DataOutputBuffer dob = new DataOutputBuffer();
291 credentials.writeTokenStorageToStream(dob);
292
293 Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
294 while (iter.hasNext()) {
295 Token<?> token = iter.next();
296 if (LOG.isDebugEnabled()) {
297 LOG.debug("Token type :" + token.getKind());
298 }
299 if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
300 iter.remove();
301 }
302 }
303 allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
304 }
305
306
307
308
309
310 private void registerRMCallBackHandler() {
311 AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
312 amRMClient = AMRMClientAsync.createAMRMClientAsync(1000,
313 allocListener);
314 amRMClient.init(yarnConf);
315 amRMClient.start();
316 }
317
318
319
320
321
322 private void registerNMCallbackHandler() {
323 containerListener = new NMCallbackHandler();
324 nmClientAsync = new NMClientAsyncImpl(containerListener);
325 nmClientAsync.init(yarnConf);
326 nmClientAsync.start();
327 }
328
329
330
331
332 private RegisterApplicationMasterResponse registerAMToRM()
333 throws YarnException {
334
335
336 try {
337 if (UserGroupInformation.isSecurityEnabled()) {
338 LOG.info("SECURITY ENABLED ");
339 }
340
341 RegisterApplicationMasterResponse response = amRMClient
342 .registerApplicationMaster(appMasterHostname
343 , appMasterRpcPort, appMasterTrackingUrl);
344 return response;
345 } catch (IOException ioe) {
346 throw new IllegalStateException(
347 "GiraphApplicationMaster failed to register with RM.", ioe);
348 }
349 }
350
351
352
353
354
355
356 private void startContainerLaunchingThreads(final List<Container>
357 allocatedContainers) {
358 for (Container allocatedContainer : allocatedContainers) {
359 LOG.info("Launching command on a new container." +
360 ", containerId=" + allocatedContainer.getId() +
361 ", containerNode=" + allocatedContainer.getNodeId().getHost() +
362 ":" + allocatedContainer.getNodeId().getPort() +
363 ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() +
364 ", containerResourceMemory=" +
365 allocatedContainer.getResource().getMemory());
366
367
368 LaunchContainerRunnable runnableLaunchContainer =
369 new LaunchContainerRunnable(allocatedContainer, containerListener);
370 executor.execute(runnableLaunchContainer);
371 }
372 }
373
374
375
376
377
378
379
380 private synchronized Map<String, LocalResource> getTaskResourceMap() {
381
382 if (null == LOCAL_RESOURCES) {
383 LOCAL_RESOURCES = Maps.newHashMap();
384 try {
385
386 updateGiraphConfForExport();
387 YarnUtils.addFsResourcesToMap(LOCAL_RESOURCES, giraphConf,
388 appAttemptId.getApplicationId());
389 } catch (IOException ioe) {
390
391 throw new IllegalStateException("Could not configure the container" +
392 "launch context for GiraphYarnTasks.", ioe);
393 }
394 }
395
396 return LOCAL_RESOURCES;
397 }
398
399
400
401
402
403
404
405 private void updateGiraphConfForExport()
406 throws IOException {
407
408 giraphConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
409 appAttemptId.getAttemptId());
410
411 YarnUtils.exportGiraphConfiguration(giraphConf,
412 appAttemptId.getApplicationId());
413 }
414
415
416
417
418
419 public static void main(final String[] args) {
420 boolean result = false;
421 LOG.info("Starting GiraphAM ");
422 String containerIdString = System.getenv().get(
423 Environment.CONTAINER_ID.name());
424 if (containerIdString == null) {
425
426 throw new IllegalArgumentException("ContainerId not found in env vars.");
427 }
428 ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
429 ApplicationAttemptId appAttemptId = containerId.getApplicationAttemptId();
430 try {
431 GiraphApplicationMaster giraphAppMaster =
432 new GiraphApplicationMaster(containerId, appAttemptId);
433 result = giraphAppMaster.run();
434
435 } catch (Throwable t) {
436
437 LOG.error("GiraphApplicationMaster caught a " +
438 "top-level exception in main.", t);
439 System.exit(1);
440 }
441 if (result) {
442 LOG.info("Giraph Application Master completed successfully. exiting");
443 System.exit(0);
444 } else {
445 LOG.info("Giraph Application Master failed. exiting");
446 System.exit(2);
447 }
448 }
449
450
451
452
453
454 private class LaunchContainerRunnable implements Runnable {
455
456 private Container container;
457
458 private NMCallbackHandler containerListener;
459
460
461
462
463
464
465 public LaunchContainerRunnable(final Container newGiraphTaskContainer,
466 NMCallbackHandler containerListener) {
467 this.container = newGiraphTaskContainer;
468 this.containerListener = containerListener;
469 }
470
471
472
473
474
475
476 public void run() {
477
478
479 ContainerLaunchContext ctx = buildContainerLaunchContext();
480
481 containerListener.addContainer(container.getId(), container);
482 nmClientAsync.startContainerAsync(container, ctx);
483 }
484
485
486
487
488
489
490
491 private ContainerLaunchContext buildContainerLaunchContext() {
492 LOG.info("Setting up container launch container for containerid=" +
493 container.getId());
494 ContainerLaunchContext launchContext = Records
495 .newRecord(ContainerLaunchContext.class);
496
497 final List<String> commands = generateShellExecCommand();
498 LOG.info("Conatain launch Commands :" + commands.get(0));
499 launchContext.setCommands(commands);
500
501
502
503
504
505 launchContext.setTokens(allTokens.slice());
506
507
508 String jobUserName = "ERROR_UNKNOWN_USER";
509 UserGroupInformation ugi = null;
510 try {
511 ugi = UserGroupInformation.getCurrentUser();
512 jobUserName = ugi.getUserName();
513 } catch (IOException ioe) {
514 jobUserName =
515 System.getenv(ApplicationConstants.Environment.USER.name());
516 }
517
518 LOG.info("Setting username in ContainerLaunchContext to: " + jobUserName);
519
520 buildEnvironment(launchContext);
521
522 launchContext.setLocalResources(getTaskResourceMap());
523 return launchContext;
524 }
525
526
527
528
529
530 private List<String> generateShellExecCommand() {
531 return ImmutableList.of("java " +
532 "-Xmx" + heapPerContainer + "M " +
533 "-Xms" + heapPerContainer + "M " +
534 "-cp .:${CLASSPATH} " +
535 "org.apache.giraph.yarn.GiraphYarnTask " +
536 appAttemptId.getApplicationId().getClusterTimestamp() + " " +
537 appAttemptId.getApplicationId().getId() + " " +
538 container.getId().getId() + " " +
539 appAttemptId.getAttemptId() + " " +
540 "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
541 "/task-" + container.getId().getId() + "-stdout.log " +
542 "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
543 "/task-" + container.getId().getId() + "-stderr.log "
544 );
545 }
546
547
548
549
550
551
552
553 private void buildEnvironment(final ContainerLaunchContext launchContext) {
554 Map<String, String> classPathForEnv = Maps.<String, String>newHashMap();
555
556
557 YarnUtils.addLocalClasspathToEnv(classPathForEnv, giraphConf);
558
559 launchContext.setEnvironment(classPathForEnv);
560 }
561 }
562
563
564
565
566 private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
567 @SuppressWarnings("unchecked")
568 @Override
569 public void onContainersCompleted(List<ContainerStatus>
570 completedContainers) {
571 LOG.info("Got response from RM for container ask, completedCnt=" +
572 completedContainers.size());
573 for (ContainerStatus containerStatus : completedContainers) {
574 LOG.info("Got container status for containerID=" +
575 containerStatus.getContainerId() + ", state=" +
576 containerStatus.getState() + ", exitStatus=" +
577 containerStatus.getExitStatus() + ", diagnostics=" +
578 containerStatus.getDiagnostics());
579 switch (containerStatus.getExitStatus()) {
580 case YARN_SUCCESS_EXIT_STATUS:
581 successfulCount.incrementAndGet();
582 break;
583 case YARN_ABORT_EXIT_STATUS:
584 break;
585 default:
586 failedCount.incrementAndGet();
587 break;
588 }
589 completedCount.incrementAndGet();
590 }
591
592 if (completedCount.get() == containersToLaunch) {
593 done = true;
594 LOG.info("All container compeleted. done = " + done);
595 } else {
596 LOG.info("After completion of one conatiner. current status is:" +
597 " completedCount :" + completedCount.get() +
598 " containersToLaunch :" + containersToLaunch +
599 " successfulCount :" + successfulCount.get() +
600 " failedCount :" + failedCount.get());
601 }
602 }
603 @Override
604 public void onContainersAllocated(List<Container> allocatedContainers) {
605 LOG.info("Got response from RM for container ask, allocatedCnt=" +
606 allocatedContainers.size());
607 allocatedCount.addAndGet(allocatedContainers.size());
608 LOG.info("Total allocated # of container so far : " +
609 allocatedCount.get() +
610 " allocated out of " + containersToLaunch + " required.");
611 startContainerLaunchingThreads(allocatedContainers);
612 }
613
614 @Override
615 public void onShutdownRequest() {
616 done = true;
617 }
618
619 @Override
620 public void onNodesUpdated(List<NodeReport> updatedNodes) {
621 }
622
623 @Override
624 public float getProgress() {
625
626 float progress = (float) completedCount.get() /
627 containersToLaunch;
628 return progress;
629 }
630
631 @Override
632 public void onError(Throwable e) {
633 done = true;
634 amRMClient.stop();
635 }
636 }
637
638
639
640
641 private class NMCallbackHandler implements NMClientAsync.CallbackHandler {
642
643 private ConcurrentMap<ContainerId, Container> containers =
644 new ConcurrentHashMap<ContainerId, Container>();
645
646
647
648
649
650
651
652 public void addContainer(ContainerId containerId, Container container) {
653 containers.putIfAbsent(containerId, container);
654 }
655
656 @Override
657 public void onContainerStopped(ContainerId containerId) {
658 if (LOG.isDebugEnabled()) {
659 LOG.debug("Succeeded to stop Container " + containerId);
660 }
661 containers.remove(containerId);
662 }
663
664 @Override
665 public void onContainerStatusReceived(ContainerId containerId,
666 ContainerStatus containerStatus) {
667 if (LOG.isDebugEnabled()) {
668 LOG.debug("Container Status: id=" + containerId + ", status=" +
669 containerStatus);
670 }
671 }
672
673 @Override
674 public void onContainerStarted(ContainerId containerId,
675 Map<String, ByteBuffer> allServiceResponse) {
676 if (LOG.isDebugEnabled()) {
677 LOG.debug("Succeeded to start Container " + containerId);
678 }
679 Container container = containers.get(containerId);
680 if (container != null) {
681 nmClientAsync.getContainerStatusAsync(containerId,
682 container.getNodeId());
683 }
684 }
685
686 @Override
687 public void onStartContainerError(ContainerId containerId, Throwable t) {
688 LOG.error("Failed to start Container " + containerId, t);
689 containers.remove(containerId);
690 }
691
692 @Override
693 public void onGetContainerStatusError(
694 ContainerId containerId, Throwable t) {
695 LOG.error("Failed to query the status of Container " + containerId, t);
696 }
697
698 @Override
699 public void onStopContainerError(ContainerId containerId, Throwable t) {
700 LOG.error("Failed to stop Container " + containerId);
701 containers.remove(containerId);
702 }
703 }
704 }