001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.javabridge.generic; 020 021import org.apache.reef.driver.client.JobMessageObserver; 022import org.apache.reef.driver.context.ActiveContext; 023import org.apache.reef.driver.context.ClosedContext; 024import org.apache.reef.driver.context.ContextMessage; 025import org.apache.reef.driver.context.FailedContext; 026import org.apache.reef.driver.evaluator.*; 027import org.apache.reef.driver.restart.DriverRestarted; 028import org.apache.reef.driver.task.*; 029import org.apache.reef.io.network.naming.NameServer; 030import org.apache.reef.javabridge.*; 031import org.apache.reef.driver.restart.DriverRestartCompleted; 032import org.apache.reef.runtime.common.driver.DriverStatusManager; 033import org.apache.reef.driver.evaluator.EvaluatorProcess; 034import org.apache.reef.runtime.common.files.REEFFileNames; 035import org.apache.reef.tang.annotations.Unit; 036import org.apache.reef.util.Optional; 037import org.apache.reef.util.logging.CLRBufferedLogHandler; 038import org.apache.reef.util.logging.LoggingScope; 039import org.apache.reef.util.logging.LoggingScopeFactory; 040import org.apache.reef.wake.EventHandler; 041import org.apache.reef.wake.remote.address.LocalAddressProvider; 042import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; 043import org.apache.reef.wake.time.Clock; 044import org.apache.reef.wake.time.event.Alarm; 045import org.apache.reef.wake.time.event.StartTime; 046import org.apache.reef.wake.time.event.StopTime; 047import org.apache.reef.webserver.*; 048 049import javax.inject.Inject; 050import javax.servlet.ServletException; 051import javax.servlet.http.HttpServletResponse; 052import java.io.*; 053import java.nio.charset.Charset; 054import java.nio.charset.StandardCharsets; 055import java.util.ArrayList; 056import java.util.HashMap; 057import java.util.List; 058import java.util.Map; 059import java.util.logging.Handler; 060import java.util.logging.Level; 061import java.util.logging.Logger; 062 063/** 064 * Generic job driver for CLRBridge. 065 */ 066@Unit 067public final class JobDriver { 068 069 private static final Logger LOG = Logger.getLogger(JobDriver.class.getName()); 070 /** 071 * String codec is used to encode the results 072 * before passing them back to the client. 073 */ 074 private static final ObjectSerializableCodec<String> JVM_CODEC = new ObjectSerializableCodec<>(); 075 private final InteropLogger interopLogger = new InteropLogger(); 076 private final NameServer nameServer; 077 private final String nameServerInfo; 078 private final HttpServer httpServer; 079 private final ActiveContextBridgeFactory activeContextBridgeFactory; 080 private final AllocatedEvaluatorBridgeFactory allocatedEvaluatorBridgeFactory; 081 082 /** 083 * Wake clock is used to schedule periodical job check-ups. 084 */ 085 private final Clock clock; 086 /** 087 * Job observer on the client. 088 * We use it to send results from the driver back to the client. 089 */ 090 private final JobMessageObserver jobMessageObserver; 091 /** 092 * Job driver uses EvaluatorRequestor 093 * to request Evaluators that will run the Tasks. 094 */ 095 private final EvaluatorRequestor evaluatorRequestor; 096 097 /** 098 * Driver status manager to monitor driver status. 099 */ 100 private final DriverStatusManager driverStatusManager; 101 102 /** 103 * Factory to setup new CLR process configurations. 104 */ 105 private final CLRProcessFactory clrProcessFactory; 106 107 /** 108 * Shell execution results from each Evaluator. 109 */ 110 private final List<String> results = new ArrayList<>(); 111 /** 112 * Map from context ID to running evaluator context. 113 */ 114 private final Map<String, ActiveContext> contexts = new HashMap<>(); 115 116 private final REEFFileNames reefFileNames; 117 private final LocalAddressProvider localAddressProvider; 118 /** 119 * Logging scope factory that provides LoggingScope. 120 */ 121 private final LoggingScopeFactory loggingScopeFactory; 122 123 private long allocatedEvaluatorHandler = 0; 124 private long activeContextHandler = 0; 125 private long taskMessageHandler = 0; 126 private long failedTaskHandler = 0; 127 private long failedEvaluatorHandler = 0; 128 private long httpServerEventHandler = 0; 129 private long completedTaskHandler = 0; 130 private long runningTaskHandler = 0; 131 private long suspendedTaskHandler = 0; 132 private long completedEvaluatorHandler = 0; 133 private long closedContextHandler = 0; 134 private long failedContextHandler = 0; 135 private long contextMessageHandler = 0; 136 private long driverRestartActiveContextHandler = 0; 137 private long driverRestartRunningTaskHandler = 0; 138 private long driverRestartCompletedHandler = 0; 139 private long driverRestartFailedEvaluatorHandler = 0; 140 private boolean clrBridgeSetup = false; 141 private boolean isRestarted = false; 142 // We are holding on to following on bridge side. 143 // Need to add references here so that GC does not collect them. 144 private final HashMap<String, AllocatedEvaluatorBridge> allocatedEvaluatorBridges = 145 new HashMap<String, AllocatedEvaluatorBridge>(); 146 private EvaluatorRequestorBridge evaluatorRequestorBridge; 147 148 149 /** 150 * Job driver constructor. 151 * All parameters are injected from TANG automatically. 152 * 153 * @param clock Wake clock to schedule and check up running jobs. 154 * @param jobMessageObserver is used to send messages back to the client. 155 * @param evaluatorRequestor is used to request Evaluators. 156 * @param activeContextBridgeFactory 157 */ 158 @Inject 159 JobDriver(final Clock clock, 160 final HttpServer httpServer, 161 final NameServer nameServer, 162 final JobMessageObserver jobMessageObserver, 163 final EvaluatorRequestor evaluatorRequestor, 164 final DriverStatusManager driverStatusManager, 165 final LoggingScopeFactory loggingScopeFactory, 166 final LocalAddressProvider localAddressProvider, 167 final ActiveContextBridgeFactory activeContextBridgeFactory, 168 final REEFFileNames reefFileNames, 169 final AllocatedEvaluatorBridgeFactory allocatedEvaluatorBridgeFactory, 170 final CLRProcessFactory clrProcessFactory) { 171 this.clock = clock; 172 this.httpServer = httpServer; 173 this.jobMessageObserver = jobMessageObserver; 174 this.evaluatorRequestor = evaluatorRequestor; 175 this.nameServer = nameServer; 176 this.driverStatusManager = driverStatusManager; 177 this.activeContextBridgeFactory = activeContextBridgeFactory; 178 this.allocatedEvaluatorBridgeFactory = allocatedEvaluatorBridgeFactory; 179 this.nameServerInfo = localAddressProvider.getLocalAddress() + ":" + this.nameServer.getPort(); 180 this.loggingScopeFactory = loggingScopeFactory; 181 this.reefFileNames = reefFileNames; 182 this.localAddressProvider = localAddressProvider; 183 this.clrProcessFactory = clrProcessFactory; 184 } 185 186 private void setupBridge(final ClrHandlersInitializer initializer) { 187 // Signal to the clr buffered log handler that the driver has started and that 188 // we can begin logging 189 LOG.log(Level.INFO, "Initializing CLRBufferedLogHandler..."); 190 try (final LoggingScope lb = this.loggingScopeFactory.setupBridge()) { 191 final CLRBufferedLogHandler handler = getCLRBufferedLogHandler(); 192 if (handler == null) { 193 LOG.log(Level.WARNING, "CLRBufferedLogHandler could not be initialized"); 194 } else { 195 handler.setDriverInitialized(); 196 LOG.log(Level.INFO, "CLRBufferedLogHandler init complete."); 197 } 198 199 final String portNumber = httpServer == null ? null : Integer.toString((httpServer.getPort())); 200 if (portNumber != null){ 201 try { 202 final File outputFileName = new File(reefFileNames.getDriverHttpEndpoint()); 203 BufferedWriter out = new BufferedWriter( 204 new OutputStreamWriter(new FileOutputStream(outputFileName), StandardCharsets.UTF_8)); 205 out.write(localAddressProvider.getLocalAddress() + ":" + portNumber + "\n"); 206 out.close(); 207 } catch (IOException ex) { 208 throw new RuntimeException(ex); 209 } 210 } 211 212 this.evaluatorRequestorBridge = 213 new EvaluatorRequestorBridge(JobDriver.this.evaluatorRequestor, false, loggingScopeFactory); 214 final long[] handlers = initializer.getClrHandlers(portNumber, evaluatorRequestorBridge); 215 if (handlers != null) { 216 if (handlers.length != NativeInterop.N_HANDLERS) { 217 throw new RuntimeException( 218 String.format("%s handlers initialized in CLR while native bridge is expecting %s handlers", 219 String.valueOf(handlers.length), 220 String.valueOf(NativeInterop.N_HANDLERS))); 221 } 222 this.allocatedEvaluatorHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.ALLOCATED_EVALUATOR_KEY)]; 223 this.activeContextHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.ACTIVE_CONTEXT_KEY)]; 224 this.taskMessageHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.TASK_MESSAGE_KEY)]; 225 this.failedTaskHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.FAILED_TASK_KEY)]; 226 this.failedEvaluatorHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.FAILED_EVALUATOR_KEY)]; 227 this.httpServerEventHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.HTTP_SERVER_KEY)]; 228 this.completedTaskHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.COMPLETED_TASK_KEY)]; 229 this.runningTaskHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.RUNNING_TASK_KEY)]; 230 this.suspendedTaskHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.SUSPENDED_TASK_KEY)]; 231 this.completedEvaluatorHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.COMPLETED_EVALUATOR_KEY)]; 232 this.closedContextHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.CLOSED_CONTEXT_KEY)]; 233 this.failedContextHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.FAILED_CONTEXT_KEY)]; 234 this.contextMessageHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.CONTEXT_MESSAGE_KEY)]; 235 this.driverRestartActiveContextHandler = 236 handlers[NativeInterop.HANDLERS.get(NativeInterop.DRIVER_RESTART_ACTIVE_CONTEXT_KEY)]; 237 this.driverRestartRunningTaskHandler = 238 handlers[NativeInterop.HANDLERS.get(NativeInterop.DRIVER_RESTART_RUNNING_TASK_KEY)]; 239 this.driverRestartCompletedHandler = 240 handlers[NativeInterop.HANDLERS.get(NativeInterop.DRIVER_RESTART_COMPLETED_KEY)]; 241 this.driverRestartFailedEvaluatorHandler = 242 handlers[NativeInterop.HANDLERS.get(NativeInterop.DRIVER_RESTART_FAILED_EVALUATOR_KEY)]; 243 } 244 245 try (final LoggingScope lp = 246 this.loggingScopeFactory.getNewLoggingScope("setupBridge::clrSystemHttpServerHandlerOnNext")) { 247 final HttpServerEventBridge httpServerEventBridge = new HttpServerEventBridge("SPEC"); 248 NativeInterop.clrSystemHttpServerHandlerOnNext(this.httpServerEventHandler, httpServerEventBridge, 249 this.interopLogger); 250 final String specList = httpServerEventBridge.getUriSpecification(); 251 LOG.log(Level.INFO, "Starting http server, getUriSpecification: {0}", specList); 252 if (specList != null) { 253 final String[] specs = specList.split(":"); 254 for (final String s : specs) { 255 final HttpHandler h = new HttpServerBridgeEventHandler(); 256 h.setUriSpecification(s); 257 this.httpServer.addHttpHandler(h); 258 } 259 } 260 } 261 this.clrBridgeSetup = true; 262 } 263 LOG.log(Level.INFO, "CLR Bridge setup."); 264 } 265 266 private CLRBufferedLogHandler getCLRBufferedLogHandler() { 267 for (final Handler handler : Logger.getLogger("").getHandlers()) { 268 if (handler instanceof CLRBufferedLogHandler) { 269 return (CLRBufferedLogHandler) handler; 270 } 271 } 272 return null; 273 } 274 275 private void submitEvaluator(final AllocatedEvaluator eval, final EvaluatorProcess process) { 276 synchronized (JobDriver.this) { 277 eval.setProcess(process); 278 LOG.log(Level.INFO, "Allocated Evaluator: {0}, total running running {1}", 279 new Object[]{eval.getId(), JobDriver.this.contexts.size()}); 280 if (JobDriver.this.allocatedEvaluatorHandler == 0) { 281 throw new RuntimeException("Allocated Evaluator Handler not initialized by CLR."); 282 } 283 final AllocatedEvaluatorBridge allocatedEvaluatorBridge = 284 this.allocatedEvaluatorBridgeFactory.getAllocatedEvaluatorBridge(eval, this.nameServerInfo); 285 allocatedEvaluatorBridges.put(allocatedEvaluatorBridge.getId(), allocatedEvaluatorBridge); 286 NativeInterop.clrSystemAllocatedEvaluatorHandlerOnNext(JobDriver.this.allocatedEvaluatorHandler, 287 allocatedEvaluatorBridge, this.interopLogger); 288 } 289 } 290 291 private void handleFailedEvaluator(final FailedEvaluator eval, final boolean isRestartFailed) { 292 try (final LoggingScope ls = loggingScopeFactory.evaluatorFailed(eval.getId())) { 293 synchronized (JobDriver.this) { 294 LOG.log(Level.SEVERE, "FailedEvaluator", eval); 295 for (final FailedContext failedContext : eval.getFailedContextList()) { 296 final String failedContextId = failedContext.getId(); 297 LOG.log(Level.INFO, "removing context " + failedContextId + " from job driver contexts."); 298 JobDriver.this.contexts.remove(failedContextId); 299 } 300 String message = "Evaluator " + eval.getId() + " failed with message: " 301 + eval.getEvaluatorException().getMessage(); 302 JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8)); 303 304 if (isRestartFailed) { 305 evaluatorFailedHandlerWaitForCLRBridgeSetup(driverRestartFailedEvaluatorHandler, eval, isRestartFailed); 306 } else { 307 evaluatorFailedHandlerWaitForCLRBridgeSetup(failedEvaluatorHandler, eval, isRestartFailed); 308 } 309 } 310 } 311 } 312 313 private void evaluatorFailedHandlerWaitForCLRBridgeSetup(final long handle, 314 final FailedEvaluator eval, 315 final boolean isRestartFailed) { 316 if (handle == 0) { 317 if (JobDriver.this.clrBridgeSetup) { 318 final String message = "No CLR FailedEvaluator handler was set, exiting now"; 319 LOG.log(Level.WARNING, message); 320 JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8)); 321 } else { 322 clock.scheduleAlarm(0, new EventHandler<Alarm>() { 323 @Override 324 public void onNext(final Alarm time) { 325 if (JobDriver.this.clrBridgeSetup) { 326 handleFailedEvaluatorInCLR(eval, isRestartFailed); 327 } else { 328 LOG.log(Level.INFO, "Waiting for CLR bridge to be set up"); 329 clock.scheduleAlarm(5000, this); 330 } 331 } 332 }); 333 } 334 } else{ 335 handleFailedEvaluatorInCLR(eval, isRestartFailed); 336 } 337 } 338 339 private void handleFailedEvaluatorInCLR(final FailedEvaluator eval, final boolean isRestartFailed) { 340 final String message = "CLR FailedEvaluator handler set, handling things with CLR handler."; 341 LOG.log(Level.INFO, message); 342 final FailedEvaluatorBridge failedEvaluatorBridge = 343 new FailedEvaluatorBridge(eval, JobDriver.this.evaluatorRequestor, 344 JobDriver.this.isRestarted, loggingScopeFactory); 345 if (isRestartFailed) { 346 NativeInterop.clrSystemDriverRestartFailedEvaluatorHandlerOnNext( 347 JobDriver.this.driverRestartFailedEvaluatorHandler, failedEvaluatorBridge, JobDriver.this.interopLogger); 348 } else { 349 NativeInterop.clrSystemFailedEvaluatorHandlerOnNext(JobDriver.this.failedEvaluatorHandler, failedEvaluatorBridge, 350 JobDriver.this.interopLogger); 351 } 352 353 final int additionalRequestedEvaluatorNumber = failedEvaluatorBridge.getNewlyRequestedEvaluatorNumber(); 354 if (additionalRequestedEvaluatorNumber > 0) { 355 LOG.log(Level.INFO, "number of additional evaluators requested after evaluator failure: " + 356 additionalRequestedEvaluatorNumber); 357 } 358 359 JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8)); 360 } 361 362 /** 363 * Submit a Task to a single Evaluator. 364 */ 365 private void submit(final ActiveContext context) { 366 try { 367 LOG.log(Level.INFO, "Send task to context: {0}", new Object[]{context}); 368 if (JobDriver.this.activeContextHandler == 0) { 369 throw new RuntimeException("Active Context Handler not initialized by CLR."); 370 } 371 final ActiveContextBridge activeContextBridge = activeContextBridgeFactory.getActiveContextBridge(context); 372 NativeInterop.clrSystemActiveContextHandlerOnNext(JobDriver.this.activeContextHandler, activeContextBridge, 373 JobDriver.this.interopLogger); 374 } catch (final Exception ex) { 375 LOG.log(Level.SEVERE, "Fail to submit task to active context"); 376 context.close(); 377 throw new RuntimeException(ex); 378 } 379 } 380 381 /** 382 * Handles AllocatedEvaluator: Submit an empty context. 383 */ 384 public final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> { 385 @Override 386 public void onNext(final AllocatedEvaluator allocatedEvaluator) { 387 try (final LoggingScope ls = loggingScopeFactory.evaluatorAllocated(allocatedEvaluator.getId())) { 388 synchronized (JobDriver.this) { 389 LOG.log(Level.INFO, "AllocatedEvaluatorHandler.OnNext"); 390 JobDriver.this.submitEvaluator(allocatedEvaluator, clrProcessFactory.newEvaluatorProcess()); 391 } 392 } 393 } 394 } 395 396 /** 397 * Receive notification that a new Context is available. 398 */ 399 public final class ActiveContextHandler implements EventHandler<ActiveContext> { 400 @Override 401 public void onNext(final ActiveContext context) { 402 try (final LoggingScope ls = loggingScopeFactory.activeContextReceived(context.getId())) { 403 synchronized (JobDriver.this) { 404 LOG.log(Level.INFO, "ActiveContextHandler: Context available: {0}", 405 new Object[]{context.getId()}); 406 JobDriver.this.contexts.put(context.getId(), context); 407 JobDriver.this.submit(context); 408 } 409 } 410 } 411 } 412 413 /** 414 * Receive notification that the Task has completed successfully. 415 */ 416 public final class CompletedTaskHandler implements EventHandler<CompletedTask> { 417 @Override 418 public void onNext(final CompletedTask task) { 419 LOG.log(Level.INFO, "Completed task: {0}", task.getId()); 420 try (final LoggingScope ls = loggingScopeFactory.taskCompleted(task.getId())) { 421 // Take the message returned by the task and add it to the running result. 422 String result = "default result"; 423 try { 424 result = new String(task.get(), StandardCharsets.UTF_8); 425 } catch (final Exception e) { 426 LOG.log(Level.WARNING, "failed to decode task outcome"); 427 } 428 LOG.log(Level.INFO, "Return results to the client:\n{0}", result); 429 JobDriver.this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(result)); 430 if (JobDriver.this.completedTaskHandler == 0) { 431 LOG.log(Level.INFO, "No CLR handler bound to handle completed task."); 432 } else { 433 LOG.log(Level.INFO, "CLR CompletedTaskHandler handler set, handling things with CLR handler."); 434 final CompletedTaskBridge completedTaskBridge = new CompletedTaskBridge(task, activeContextBridgeFactory); 435 NativeInterop.clrSystemCompletedTaskHandlerOnNext(JobDriver.this.completedTaskHandler, completedTaskBridge, 436 JobDriver.this.interopLogger); 437 } 438 } 439 } 440 } 441 442 /** 443 * Receive notification that the entire Evaluator had failed. 444 */ 445 public final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> { 446 @Override 447 public void onNext(final FailedEvaluator eval) { 448 JobDriver.this.handleFailedEvaluator(eval, false); 449 allocatedEvaluatorBridges.remove(eval.getId()); 450 } 451 } 452 453 /** 454 * Receive notification that the entire Evaluator had failed on Driver Restart. 455 */ 456 public final class DriverRestartFailedEvaluatorHandler implements EventHandler<FailedEvaluator> { 457 @Override 458 public void onNext(final FailedEvaluator eval) { 459 JobDriver.this.handleFailedEvaluator(eval, true); 460 } 461 } 462 463 final class HttpServerBridgeEventHandler implements HttpHandler { 464 private String uriSpecification; 465 466 /** 467 * returns URI specification for the handler. 468 */ 469 @Override 470 public String getUriSpecification() { 471 return uriSpecification; 472 } 473 474 public void setUriSpecification(final String s) { 475 uriSpecification = s; 476 } 477 478 /** 479 * process http request. 480 */ 481 @Override 482 public void onHttpRequest(final ParsedHttpRequest parsedHttpRequest, final HttpServletResponse response) 483 throws IOException, ServletException { 484 LOG.log(Level.INFO, "HttpServerBridgeEventHandler onHttpRequest: {0}", parsedHttpRequest.getRequestUri()); 485 try (final LoggingScope ls = loggingScopeFactory.httpRequest(parsedHttpRequest.getRequestUri())) { 486 final AvroHttpSerializer httpSerializer = new AvroHttpSerializer(); 487 final AvroHttpRequest avroHttpRequest = httpSerializer.toAvro(parsedHttpRequest); 488 489 final String requestString = httpSerializer.toString(avroHttpRequest); 490 final byte[] requestBytes = requestString.getBytes(Charset.forName(AvroHttpSerializer.JSON_CHARSET)); 491 //final byte[] requestBytes = httpSerializer.toBytes(avroHttpRequest); 492 493 try { 494 final HttpServerEventBridge httpServerEventBridge = new HttpServerEventBridge(requestBytes); 495 NativeInterop.clrSystemHttpServerHandlerOnNext(JobDriver.this.httpServerEventHandler, httpServerEventBridge, 496 JobDriver.this.interopLogger); 497 final String responseBody = new String(httpServerEventBridge.getQueryResponseData(), "UTF-8"); 498 response.getWriter().println(responseBody); 499 LOG.log(Level.INFO, "HttpServerBridgeEventHandler onHttpRequest received response: {0}", responseBody); 500 } catch (final Exception ex) { 501 LOG.log(Level.SEVERE, "Fail to invoke CLR Http Server handler", ex); 502 throw new RuntimeException(ex); 503 } 504 } 505 } 506 } 507 508 /** 509 * Handle failed task. 510 */ 511 public final class FailedTaskHandler implements EventHandler<FailedTask> { 512 @Override 513 public void onNext(final FailedTask task) throws RuntimeException { 514 LOG.log(Level.SEVERE, "FailedTask received, will be handle in CLR handler, if set."); 515 if (JobDriver.this.failedTaskHandler == 0) { 516 LOG.log(Level.SEVERE, "Failed Task Handler not initialized by CLR, fail for real."); 517 throw new RuntimeException("Failed Task Handler not initialized by CLR."); 518 } 519 try { 520 final FailedTaskBridge failedTaskBridge = new FailedTaskBridge(task, activeContextBridgeFactory); 521 NativeInterop.clrSystemFailedTaskHandlerOnNext(JobDriver.this.failedTaskHandler, failedTaskBridge, 522 JobDriver.this.interopLogger); 523 } catch (final Exception ex) { 524 LOG.log(Level.SEVERE, "Fail to invoke CLR failed task handler"); 525 throw new RuntimeException(ex); 526 } 527 } 528 } 529 530 /** 531 * Receive notification that the Task is running. 532 */ 533 public final class RunningTaskHandler implements EventHandler<RunningTask> { 534 @Override 535 public void onNext(final RunningTask task) { 536 try (final LoggingScope ls = loggingScopeFactory.taskRunning(task.getId())) { 537 if (JobDriver.this.runningTaskHandler == 0) { 538 LOG.log(Level.INFO, "RunningTask event received but no CLR handler was bound. Exiting handler."); 539 } else { 540 LOG.log(Level.INFO, "RunningTask will be handled by CLR handler. Task Id: {0}", task.getId()); 541 try { 542 final RunningTaskBridge runningTaskBridge = new RunningTaskBridge(task, activeContextBridgeFactory); 543 NativeInterop.clrSystemRunningTaskHandlerOnNext(JobDriver.this.runningTaskHandler, runningTaskBridge, 544 JobDriver.this.interopLogger); 545 } catch (final Exception ex) { 546 LOG.log(Level.WARNING, "Fail to invoke CLR running task handler"); 547 throw new RuntimeException(ex); 548 } 549 } 550 } 551 } 552 } 553 554 /** 555 * Receive notification that the Task is running when driver restarted. 556 */ 557 public final class DriverRestartRunningTaskHandler implements EventHandler<RunningTask> { 558 @Override 559 public void onNext(final RunningTask task) { 560 try (final LoggingScope ls = loggingScopeFactory.driverRestartRunningTask(task.getId())) { 561 clock.scheduleAlarm(0, new EventHandler<Alarm>() { 562 @Override 563 public void onNext(final Alarm time) { 564 if (JobDriver.this.clrBridgeSetup) { 565 if (JobDriver.this.driverRestartRunningTaskHandler != 0) { 566 LOG.log(Level.INFO, "CLR driver restart RunningTask handler implemented, now handle it in CLR."); 567 NativeInterop.clrSystemDriverRestartRunningTaskHandlerOnNext( 568 JobDriver.this.driverRestartRunningTaskHandler, 569 new RunningTaskBridge(task, activeContextBridgeFactory)); 570 } else { 571 LOG.log(Level.WARNING, "No CLR driver restart RunningTask handler implemented, " + 572 "done with DriverRestartRunningTaskHandler."); 573 } 574 } else { 575 LOG.log(Level.INFO, "Waiting for driver to complete restart process " + 576 "before checking out CLR driver restart RunningTaskHandler..."); 577 clock.scheduleAlarm(2000, this); 578 } 579 } 580 }); 581 } 582 } 583 } 584 585 /** 586 * Receive notification that an context is active on Evaluator when the driver restarted. 587 */ 588 public final class DriverRestartActiveContextHandler implements EventHandler<ActiveContext> { 589 @Override 590 public void onNext(final ActiveContext context) { 591 try (final LoggingScope ls = loggingScopeFactory.driverRestartActiveContextReceived(context.getId())) { 592 JobDriver.this.contexts.put(context.getId(), context); 593 LOG.log(Level.INFO, "DriverRestartActiveContextHandler event received: " + context.getId()); 594 clock.scheduleAlarm(0, new EventHandler<Alarm>() { 595 @Override 596 public void onNext(final Alarm time) { 597 if (JobDriver.this.clrBridgeSetup) { 598 if (JobDriver.this.driverRestartActiveContextHandler != 0) { 599 LOG.log(Level.INFO, "CLR driver restart ActiveContext handler implemented, now handle it in CLR."); 600 NativeInterop.clrSystemDriverRestartActiveContextHandlerOnNext( 601 JobDriver.this.driverRestartActiveContextHandler, 602 activeContextBridgeFactory.getActiveContextBridge(context)); 603 } else { 604 LOG.log(Level.WARNING, "No CLR driver restart ActiveContext handler implemented, " + 605 "done with DriverRestartActiveContextHandler."); 606 } 607 } else { 608 LOG.log(Level.INFO, "Waiting for driver to complete restart process " + 609 "before checking out CLR driver restart DriverRestartActiveContextHandler..."); 610 clock.scheduleAlarm(2000, this); 611 } 612 } 613 }); 614 } 615 } 616 } 617 618 /** 619 * Job Driver is ready and the clock is set up: request the evaluators. 620 */ 621 public final class StartHandler implements EventHandler<StartTime> { 622 @Override 623 public void onNext(final StartTime startTime) { 624 try (final LoggingScope ls = loggingScopeFactory.driverStart(startTime)) { 625 synchronized (JobDriver.this) { 626 627 setupBridge(new DriverStartClrHandlersInitializer(startTime)); 628 LOG.log(Level.INFO, "Driver Started"); 629 } 630 } 631 } 632 } 633 634 635 /** 636 * Job driver is restarted after previous crash. 637 */ 638 public final class RestartHandler implements EventHandler<DriverRestarted> { 639 @Override 640 public void onNext(final DriverRestarted driverRestarted) { 641 try (final LoggingScope ls = loggingScopeFactory.driverRestart(driverRestarted.getStartTime())) { 642 synchronized (JobDriver.this) { 643 644 JobDriver.this.isRestarted = true; 645 setupBridge(new DriverRestartClrHandlersInitializer(driverRestarted)); 646 647 LOG.log(Level.INFO, "Driver Restarted and CLR bridge set up."); 648 } 649 } 650 } 651 } 652 653 /** 654 * Receive notification that driver restart has completed. 655 */ 656 public final class DriverRestartCompletedHandler implements EventHandler<DriverRestartCompleted> { 657 @Override 658 public void onNext(final DriverRestartCompleted driverRestartCompleted) { 659 LOG.log(Level.INFO, "Java DriverRestartCompleted event received at time [{0}]. ", 660 driverRestartCompleted.getCompletedTime()); 661 try (final LoggingScope ls = loggingScopeFactory.driverRestartCompleted( 662 driverRestartCompleted.getCompletedTime().getTimeStamp())) { 663 if (JobDriver.this.driverRestartCompletedHandler != 0) { 664 LOG.log(Level.INFO, "CLR driver restart handler implemented, now handle it in CLR."); 665 666 NativeInterop.clrSystemDriverRestartCompletedHandlerOnNext( 667 JobDriver.this.driverRestartCompletedHandler, new DriverRestartCompletedBridge(driverRestartCompleted)); 668 } else { 669 LOG.log(Level.WARNING, "No CLR driver restart handler implemented, done with DriverRestartCompletedHandler."); 670 } 671 } 672 } 673 } 674 675 /** 676 * Shutting down the job driver: close the evaluators. 677 */ 678 final class StopHandler implements EventHandler<StopTime> { 679 @Override 680 public void onNext(final StopTime time) { 681 LOG.log(Level.INFO, " StopTime: {0}", new Object[]{time}); 682 try (final LoggingScope ls = loggingScopeFactory.driverStop(time.getTimeStamp())) { 683 for (final ActiveContext context : contexts.values()) { 684 context.close(); 685 } 686 } 687 } 688 } 689 690 /** 691 * Handler for message received from the Task. 692 */ 693 public final class TaskMessageHandler implements EventHandler<TaskMessage> { 694 @Override 695 public void onNext(final TaskMessage taskMessage) { 696 final String msg = new String(taskMessage.get(), StandardCharsets.UTF_8); 697 LOG.log(Level.INFO, "Received TaskMessage: {0} from CLR", msg); 698 //try (LoggingScope ls = loggingScopeFactory.taskMessageReceived(new String(msg))) { 699 if (JobDriver.this.taskMessageHandler != 0) { 700 final TaskMessageBridge taskMessageBridge = new TaskMessageBridge(taskMessage); 701 // if CLR implements the task message handler, handle the bytes in CLR handler 702 NativeInterop.clrSystemTaskMessageHandlerOnNext(JobDriver.this.taskMessageHandler, taskMessage.get(), 703 taskMessageBridge, JobDriver.this.interopLogger); 704 } 705 //} 706 } 707 } 708 709 /** 710 * Receive notification that the Task has been suspended. 711 */ 712 public final class SuspendedTaskHandler implements EventHandler<SuspendedTask> { 713 @Override 714 public void onNext(final SuspendedTask task) { 715 final String message = "Received notification that task [" + task.getId() + "] has been suspended."; 716 LOG.log(Level.INFO, message); 717 try (final LoggingScope ls = loggingScopeFactory.taskSuspended(task.getId())) { 718 if (JobDriver.this.suspendedTaskHandler != 0) { 719 final SuspendedTaskBridge suspendedTaskBridge = new SuspendedTaskBridge(task, activeContextBridgeFactory); 720 // if CLR implements the suspended task handler, handle it in CLR 721 LOG.log(Level.INFO, "Handling the event of suspended task in CLR bridge."); 722 NativeInterop.clrSystemSuspendedTaskHandlerOnNext(JobDriver.this.suspendedTaskHandler, suspendedTaskBridge); 723 } 724 JobDriver.this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(message)); 725 } 726 } 727 } 728 729 /** 730 * Receive notification that the Evaluator has been shut down. 731 */ 732 public final class CompletedEvaluatorHandler implements EventHandler<CompletedEvaluator> { 733 @Override 734 public void onNext(final CompletedEvaluator evaluator) { 735 LOG.log(Level.INFO, " Completed Evaluator {0}", evaluator.getId()); 736 try (final LoggingScope ls = loggingScopeFactory.evaluatorCompleted(evaluator.getId())) { 737 if (JobDriver.this.completedEvaluatorHandler != 0) { 738 final CompletedEvaluatorBridge completedEvaluatorBridge = new CompletedEvaluatorBridge(evaluator); 739 // if CLR implements the completed evaluator handler, handle it in CLR 740 LOG.log(Level.INFO, "Handling the event of completed evaluator in CLR bridge."); 741 NativeInterop.clrSystemCompletedEvaluatorHandlerOnNext(completedEvaluatorHandler, completedEvaluatorBridge); 742 allocatedEvaluatorBridges.remove(completedEvaluatorBridge.getId()); 743 } 744 } 745 } 746 } 747 748 749 /** 750 * Receive notification that the Context had completed. 751 * Remove context from the list of active context. 752 */ 753 public final class ClosedContextHandler implements EventHandler<ClosedContext> { 754 @Override 755 public void onNext(final ClosedContext context) { 756 LOG.log(Level.INFO, "Completed Context: {0}", context.getId()); 757 try (final LoggingScope ls = loggingScopeFactory.closedContext(context.getId())) { 758 if (JobDriver.this.closedContextHandler != 0) { 759 final ClosedContextBridge closedContextBridge = new ClosedContextBridge(context, activeContextBridgeFactory); 760 // if CLR implements the closed context handler, handle it in CLR 761 LOG.log(Level.INFO, "Handling the event of closed context in CLR bridge."); 762 NativeInterop.clrSystemClosedContextHandlerOnNext(JobDriver.this.closedContextHandler, closedContextBridge); 763 } 764 synchronized (JobDriver.this) { 765 JobDriver.this.contexts.remove(context.getId()); 766 } 767 } 768 } 769 } 770 771 772 /** 773 * Receive notification that the Context had failed. 774 * Remove context from the list of active context and notify the client. 775 */ 776 public final class FailedContextHandler implements EventHandler<FailedContext> { 777 @Override 778 public void onNext(final FailedContext context) { 779 LOG.log(Level.SEVERE, "FailedContext", context); 780 try (final LoggingScope ls = loggingScopeFactory.evaluatorFailed(context.getId())) { 781 if (JobDriver.this.failedContextHandler != 0) { 782 final FailedContextBridge failedContextBridge = new FailedContextBridge(context, activeContextBridgeFactory); 783 // if CLR implements the failed context handler, handle it in CLR 784 LOG.log(Level.INFO, "Handling the event of failed context in CLR bridge."); 785 NativeInterop.clrSystemFailedContextHandlerOnNext(JobDriver.this.failedContextHandler, failedContextBridge); 786 } 787 synchronized (JobDriver.this) { 788 JobDriver.this.contexts.remove(context.getId()); 789 } 790 final Optional<byte[]> err = context.getData(); 791 if (err.isPresent()) { 792 JobDriver.this.jobMessageObserver.sendMessageToClient(err.get()); 793 } 794 } 795 } 796 } 797 798 /** 799 * Receive notification that a ContextMessage has been received. 800 */ 801 public final class ContextMessageHandler implements EventHandler<ContextMessage> { 802 @Override 803 public void onNext(final ContextMessage message) { 804 LOG.log(Level.SEVERE, "Received ContextMessage:", message.get()); 805 try (final LoggingScope ls = 806 loggingScopeFactory.contextMessageReceived(new String(message.get(), StandardCharsets.UTF_8))) { 807 if (JobDriver.this.contextMessageHandler != 0) { 808 final ContextMessageBridge contextMessageBridge = new ContextMessageBridge(message); 809 // if CLR implements the context message handler, handle it in CLR 810 LOG.log(Level.INFO, "Handling the event of context message in CLR bridge."); 811 NativeInterop.clrSystemContextMessageHandlerOnNext(JobDriver.this.contextMessageHandler, 812 contextMessageBridge); 813 } 814 } 815 } 816 } 817}