001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.javabridge.generic;
020
021import org.apache.reef.driver.client.JobMessageObserver;
022import org.apache.reef.driver.context.ActiveContext;
023import org.apache.reef.driver.context.ClosedContext;
024import org.apache.reef.driver.context.ContextMessage;
025import org.apache.reef.driver.context.FailedContext;
026import org.apache.reef.driver.evaluator.*;
027import org.apache.reef.driver.restart.DriverRestarted;
028import org.apache.reef.driver.task.*;
029import org.apache.reef.io.network.naming.NameServer;
030import org.apache.reef.javabridge.*;
031import org.apache.reef.driver.restart.DriverRestartCompleted;
032import org.apache.reef.runtime.common.driver.DriverStatusManager;
033import org.apache.reef.driver.evaluator.EvaluatorProcess;
034import org.apache.reef.runtime.common.files.REEFFileNames;
035import org.apache.reef.tang.annotations.Unit;
036import org.apache.reef.util.Optional;
037import org.apache.reef.util.logging.CLRBufferedLogHandler;
038import org.apache.reef.util.logging.LoggingScope;
039import org.apache.reef.util.logging.LoggingScopeFactory;
040import org.apache.reef.wake.EventHandler;
041import org.apache.reef.wake.remote.address.LocalAddressProvider;
042import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
043import org.apache.reef.wake.time.Clock;
044import org.apache.reef.wake.time.event.Alarm;
045import org.apache.reef.wake.time.event.StartTime;
046import org.apache.reef.wake.time.event.StopTime;
047import org.apache.reef.webserver.*;
048
049import javax.inject.Inject;
050import javax.servlet.ServletException;
051import javax.servlet.http.HttpServletResponse;
052import java.io.*;
053import java.nio.charset.Charset;
054import java.nio.charset.StandardCharsets;
055import java.util.ArrayList;
056import java.util.HashMap;
057import java.util.List;
058import java.util.Map;
059import java.util.logging.Handler;
060import java.util.logging.Level;
061import java.util.logging.Logger;
062
063/**
064 * Generic job driver for CLRBridge.
065 */
066@Unit
067public final class JobDriver {
068
069  private static final Logger LOG = Logger.getLogger(JobDriver.class.getName());
070  /**
071   * String codec is used to encode the results
072   * before passing them back to the client.
073   */
074  private static final ObjectSerializableCodec<String> JVM_CODEC = new ObjectSerializableCodec<>();
075  private final InteropLogger interopLogger = new InteropLogger();
076  private final NameServer nameServer;
077  private final String nameServerInfo;
078  private final HttpServer httpServer;
079  private final ActiveContextBridgeFactory activeContextBridgeFactory;
080  private final AllocatedEvaluatorBridgeFactory allocatedEvaluatorBridgeFactory;
081
082  /**
083   * Wake clock is used to schedule periodical job check-ups.
084   */
085  private final Clock clock;
086  /**
087   * Job observer on the client.
088   * We use it to send results from the driver back to the client.
089   */
090  private final JobMessageObserver jobMessageObserver;
091  /**
092   * Job driver uses EvaluatorRequestor
093   * to request Evaluators that will run the Tasks.
094   */
095  private final EvaluatorRequestor evaluatorRequestor;
096
097  /**
098   * Driver status manager to monitor driver status.
099   */
100  private final DriverStatusManager driverStatusManager;
101
102  /**
103   * Factory to setup new CLR process configurations.
104   */
105  private final CLRProcessFactory clrProcessFactory;
106
107  /**
108   * Shell execution results from each Evaluator.
109   */
110  private final List<String> results = new ArrayList<>();
111  /**
112   * Map from context ID to running evaluator context.
113   */
114  private final Map<String, ActiveContext> contexts = new HashMap<>();
115
116  private final REEFFileNames reefFileNames;
117  private final LocalAddressProvider localAddressProvider;
118  /**
119   * Logging scope factory that provides LoggingScope.
120   */
121  private final LoggingScopeFactory loggingScopeFactory;
122
123  private long allocatedEvaluatorHandler = 0;
124  private long activeContextHandler = 0;
125  private long taskMessageHandler = 0;
126  private long failedTaskHandler = 0;
127  private long failedEvaluatorHandler = 0;
128  private long httpServerEventHandler = 0;
129  private long completedTaskHandler = 0;
130  private long runningTaskHandler = 0;
131  private long suspendedTaskHandler = 0;
132  private long completedEvaluatorHandler = 0;
133  private long closedContextHandler = 0;
134  private long failedContextHandler = 0;
135  private long contextMessageHandler = 0;
136  private long driverRestartActiveContextHandler = 0;
137  private long driverRestartRunningTaskHandler = 0;
138  private long driverRestartCompletedHandler = 0;
139  private long driverRestartFailedEvaluatorHandler = 0;
140  private boolean clrBridgeSetup = false;
141  private boolean isRestarted = false;
142  // We are holding on to following on bridge side.
143  // Need to add references here so that GC does not collect them.
144  private final HashMap<String, AllocatedEvaluatorBridge> allocatedEvaluatorBridges =
145      new HashMap<String, AllocatedEvaluatorBridge>();
146  private EvaluatorRequestorBridge evaluatorRequestorBridge;
147
148
149  /**
150   * Job driver constructor.
151   * All parameters are injected from TANG automatically.
152   *
153   * @param clock                      Wake clock to schedule and check up running jobs.
154   * @param jobMessageObserver         is used to send messages back to the client.
155   * @param evaluatorRequestor         is used to request Evaluators.
156   * @param activeContextBridgeFactory
157   */
158  @Inject
159  JobDriver(final Clock clock,
160            final HttpServer httpServer,
161            final NameServer nameServer,
162            final JobMessageObserver jobMessageObserver,
163            final EvaluatorRequestor evaluatorRequestor,
164            final DriverStatusManager driverStatusManager,
165            final LoggingScopeFactory loggingScopeFactory,
166            final LocalAddressProvider localAddressProvider,
167            final ActiveContextBridgeFactory activeContextBridgeFactory,
168            final REEFFileNames reefFileNames,
169            final AllocatedEvaluatorBridgeFactory allocatedEvaluatorBridgeFactory,
170            final CLRProcessFactory clrProcessFactory) {
171    this.clock = clock;
172    this.httpServer = httpServer;
173    this.jobMessageObserver = jobMessageObserver;
174    this.evaluatorRequestor = evaluatorRequestor;
175    this.nameServer = nameServer;
176    this.driverStatusManager = driverStatusManager;
177    this.activeContextBridgeFactory = activeContextBridgeFactory;
178    this.allocatedEvaluatorBridgeFactory = allocatedEvaluatorBridgeFactory;
179    this.nameServerInfo = localAddressProvider.getLocalAddress() + ":" + this.nameServer.getPort();
180    this.loggingScopeFactory = loggingScopeFactory;
181    this.reefFileNames = reefFileNames;
182    this.localAddressProvider = localAddressProvider;
183    this.clrProcessFactory = clrProcessFactory;
184  }
185
186  private void setupBridge(final ClrHandlersInitializer initializer) {
187    // Signal to the clr buffered log handler that the driver has started and that
188    // we can begin logging
189    LOG.log(Level.INFO, "Initializing CLRBufferedLogHandler...");
190    try (final LoggingScope lb = this.loggingScopeFactory.setupBridge()) {
191      final CLRBufferedLogHandler handler = getCLRBufferedLogHandler();
192      if (handler == null) {
193        LOG.log(Level.WARNING, "CLRBufferedLogHandler could not be initialized");
194      } else {
195        handler.setDriverInitialized();
196        LOG.log(Level.INFO, "CLRBufferedLogHandler init complete.");
197      }
198
199      final String portNumber = httpServer == null ? null : Integer.toString((httpServer.getPort()));
200      if (portNumber != null){
201        try {
202          final File outputFileName = new File(reefFileNames.getDriverHttpEndpoint());
203          BufferedWriter out = new BufferedWriter(
204              new OutputStreamWriter(new FileOutputStream(outputFileName), StandardCharsets.UTF_8));
205          out.write(localAddressProvider.getLocalAddress() + ":" + portNumber + "\n");
206          out.close();
207        } catch (IOException ex) {
208          throw new RuntimeException(ex);
209        }
210      }
211
212      this.evaluatorRequestorBridge =
213          new EvaluatorRequestorBridge(JobDriver.this.evaluatorRequestor, false, loggingScopeFactory);
214      final long[] handlers = initializer.getClrHandlers(portNumber, evaluatorRequestorBridge);
215      if (handlers != null) {
216        if (handlers.length != NativeInterop.N_HANDLERS) {
217          throw new RuntimeException(
218              String.format("%s handlers initialized in CLR while native bridge is expecting %s handlers",
219                  String.valueOf(handlers.length),
220                  String.valueOf(NativeInterop.N_HANDLERS)));
221        }
222        this.allocatedEvaluatorHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.ALLOCATED_EVALUATOR_KEY)];
223        this.activeContextHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.ACTIVE_CONTEXT_KEY)];
224        this.taskMessageHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.TASK_MESSAGE_KEY)];
225        this.failedTaskHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.FAILED_TASK_KEY)];
226        this.failedEvaluatorHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.FAILED_EVALUATOR_KEY)];
227        this.httpServerEventHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.HTTP_SERVER_KEY)];
228        this.completedTaskHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.COMPLETED_TASK_KEY)];
229        this.runningTaskHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.RUNNING_TASK_KEY)];
230        this.suspendedTaskHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.SUSPENDED_TASK_KEY)];
231        this.completedEvaluatorHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.COMPLETED_EVALUATOR_KEY)];
232        this.closedContextHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.CLOSED_CONTEXT_KEY)];
233        this.failedContextHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.FAILED_CONTEXT_KEY)];
234        this.contextMessageHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.CONTEXT_MESSAGE_KEY)];
235        this.driverRestartActiveContextHandler =
236            handlers[NativeInterop.HANDLERS.get(NativeInterop.DRIVER_RESTART_ACTIVE_CONTEXT_KEY)];
237        this.driverRestartRunningTaskHandler =
238            handlers[NativeInterop.HANDLERS.get(NativeInterop.DRIVER_RESTART_RUNNING_TASK_KEY)];
239        this.driverRestartCompletedHandler =
240            handlers[NativeInterop.HANDLERS.get(NativeInterop.DRIVER_RESTART_COMPLETED_KEY)];
241        this.driverRestartFailedEvaluatorHandler =
242            handlers[NativeInterop.HANDLERS.get(NativeInterop.DRIVER_RESTART_FAILED_EVALUATOR_KEY)];
243      }
244
245      try (final LoggingScope lp =
246               this.loggingScopeFactory.getNewLoggingScope("setupBridge::clrSystemHttpServerHandlerOnNext")) {
247        final HttpServerEventBridge httpServerEventBridge = new HttpServerEventBridge("SPEC");
248        NativeInterop.clrSystemHttpServerHandlerOnNext(this.httpServerEventHandler, httpServerEventBridge,
249            this.interopLogger);
250        final String specList = httpServerEventBridge.getUriSpecification();
251        LOG.log(Level.INFO, "Starting http server, getUriSpecification: {0}", specList);
252        if (specList != null) {
253          final String[] specs = specList.split(":");
254          for (final String s : specs) {
255            final HttpHandler h = new HttpServerBridgeEventHandler();
256            h.setUriSpecification(s);
257            this.httpServer.addHttpHandler(h);
258          }
259        }
260      }
261      this.clrBridgeSetup = true;
262    }
263    LOG.log(Level.INFO, "CLR Bridge setup.");
264  }
265
266  private CLRBufferedLogHandler getCLRBufferedLogHandler() {
267    for (final Handler handler : Logger.getLogger("").getHandlers()) {
268      if (handler instanceof CLRBufferedLogHandler) {
269        return (CLRBufferedLogHandler) handler;
270      }
271    }
272    return null;
273  }
274
275  private void submitEvaluator(final AllocatedEvaluator eval, final EvaluatorProcess process) {
276    synchronized (JobDriver.this) {
277      eval.setProcess(process);
278      LOG.log(Level.INFO, "Allocated Evaluator: {0}, total running running {1}",
279          new Object[]{eval.getId(), JobDriver.this.contexts.size()});
280      if (JobDriver.this.allocatedEvaluatorHandler == 0) {
281        throw new RuntimeException("Allocated Evaluator Handler not initialized by CLR.");
282      }
283      final AllocatedEvaluatorBridge allocatedEvaluatorBridge =
284          this.allocatedEvaluatorBridgeFactory.getAllocatedEvaluatorBridge(eval, this.nameServerInfo);
285      allocatedEvaluatorBridges.put(allocatedEvaluatorBridge.getId(), allocatedEvaluatorBridge);
286      NativeInterop.clrSystemAllocatedEvaluatorHandlerOnNext(JobDriver.this.allocatedEvaluatorHandler,
287          allocatedEvaluatorBridge, this.interopLogger);
288    }
289  }
290
291  private void handleFailedEvaluator(final FailedEvaluator eval, final boolean isRestartFailed) {
292    try (final LoggingScope ls = loggingScopeFactory.evaluatorFailed(eval.getId())) {
293      synchronized (JobDriver.this) {
294        LOG.log(Level.SEVERE, "FailedEvaluator", eval);
295        for (final FailedContext failedContext : eval.getFailedContextList()) {
296          final String failedContextId = failedContext.getId();
297          LOG.log(Level.INFO, "removing context " + failedContextId + " from job driver contexts.");
298          JobDriver.this.contexts.remove(failedContextId);
299        }
300        String message = "Evaluator " + eval.getId() + " failed with message: "
301            + eval.getEvaluatorException().getMessage();
302        JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8));
303
304        if (isRestartFailed) {
305          evaluatorFailedHandlerWaitForCLRBridgeSetup(driverRestartFailedEvaluatorHandler, eval, isRestartFailed);
306        } else {
307          evaluatorFailedHandlerWaitForCLRBridgeSetup(failedEvaluatorHandler, eval, isRestartFailed);
308        }
309      }
310    }
311  }
312
313  private void evaluatorFailedHandlerWaitForCLRBridgeSetup(final long handle,
314                                                           final FailedEvaluator eval,
315                                                           final boolean isRestartFailed) {
316    if (handle == 0) {
317      if (JobDriver.this.clrBridgeSetup) {
318        final String message = "No CLR FailedEvaluator handler was set, exiting now";
319        LOG.log(Level.WARNING, message);
320        JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8));
321      } else {
322        clock.scheduleAlarm(0, new EventHandler<Alarm>() {
323          @Override
324          public void onNext(final Alarm time) {
325            if (JobDriver.this.clrBridgeSetup) {
326              handleFailedEvaluatorInCLR(eval, isRestartFailed);
327            } else {
328              LOG.log(Level.INFO, "Waiting for CLR bridge to be set up");
329              clock.scheduleAlarm(5000, this);
330            }
331          }
332        });
333      }
334    } else{
335      handleFailedEvaluatorInCLR(eval, isRestartFailed);
336    }
337  }
338
339  private void handleFailedEvaluatorInCLR(final FailedEvaluator eval, final boolean isRestartFailed) {
340    final String message = "CLR FailedEvaluator handler set, handling things with CLR handler.";
341    LOG.log(Level.INFO, message);
342    final FailedEvaluatorBridge failedEvaluatorBridge =
343        new FailedEvaluatorBridge(eval, JobDriver.this.evaluatorRequestor,
344        JobDriver.this.isRestarted, loggingScopeFactory);
345    if (isRestartFailed) {
346      NativeInterop.clrSystemDriverRestartFailedEvaluatorHandlerOnNext(
347          JobDriver.this.driverRestartFailedEvaluatorHandler, failedEvaluatorBridge, JobDriver.this.interopLogger);
348    } else {
349      NativeInterop.clrSystemFailedEvaluatorHandlerOnNext(JobDriver.this.failedEvaluatorHandler, failedEvaluatorBridge,
350          JobDriver.this.interopLogger);
351    }
352
353    final int additionalRequestedEvaluatorNumber = failedEvaluatorBridge.getNewlyRequestedEvaluatorNumber();
354    if (additionalRequestedEvaluatorNumber > 0) {
355      LOG.log(Level.INFO, "number of additional evaluators requested after evaluator failure: " +
356          additionalRequestedEvaluatorNumber);
357    }
358
359    JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8));
360  }
361
362  /**
363   * Submit a Task to a single Evaluator.
364   */
365  private void submit(final ActiveContext context) {
366    try {
367      LOG.log(Level.INFO, "Send task to context: {0}", new Object[]{context});
368      if (JobDriver.this.activeContextHandler == 0) {
369        throw new RuntimeException("Active Context Handler not initialized by CLR.");
370      }
371      final ActiveContextBridge activeContextBridge = activeContextBridgeFactory.getActiveContextBridge(context);
372      NativeInterop.clrSystemActiveContextHandlerOnNext(JobDriver.this.activeContextHandler, activeContextBridge,
373          JobDriver.this.interopLogger);
374    } catch (final Exception ex) {
375      LOG.log(Level.SEVERE, "Fail to submit task to active context");
376      context.close();
377      throw new RuntimeException(ex);
378    }
379  }
380
381  /**
382   * Handles AllocatedEvaluator: Submit an empty context.
383   */
384  public final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
385    @Override
386    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
387      try (final LoggingScope ls = loggingScopeFactory.evaluatorAllocated(allocatedEvaluator.getId())) {
388        synchronized (JobDriver.this) {
389          LOG.log(Level.INFO, "AllocatedEvaluatorHandler.OnNext");
390          JobDriver.this.submitEvaluator(allocatedEvaluator, clrProcessFactory.newEvaluatorProcess());
391        }
392      }
393    }
394  }
395
396  /**
397   * Receive notification that a new Context is available.
398   */
399  public final class ActiveContextHandler implements EventHandler<ActiveContext> {
400    @Override
401    public void onNext(final ActiveContext context) {
402      try (final LoggingScope ls = loggingScopeFactory.activeContextReceived(context.getId())) {
403        synchronized (JobDriver.this) {
404          LOG.log(Level.INFO, "ActiveContextHandler: Context available: {0}",
405              new Object[]{context.getId()});
406          JobDriver.this.contexts.put(context.getId(), context);
407          JobDriver.this.submit(context);
408        }
409      }
410    }
411  }
412
413  /**
414   * Receive notification that the Task has completed successfully.
415   */
416  public final class CompletedTaskHandler implements EventHandler<CompletedTask> {
417    @Override
418    public void onNext(final CompletedTask task) {
419      LOG.log(Level.INFO, "Completed task: {0}", task.getId());
420      try (final LoggingScope ls = loggingScopeFactory.taskCompleted(task.getId())) {
421        // Take the message returned by the task and add it to the running result.
422        String result = "default result";
423        try {
424          result = new String(task.get(), StandardCharsets.UTF_8);
425        } catch (final Exception e) {
426          LOG.log(Level.WARNING, "failed to decode task outcome");
427        }
428        LOG.log(Level.INFO, "Return results to the client:\n{0}", result);
429        JobDriver.this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(result));
430        if (JobDriver.this.completedTaskHandler == 0) {
431          LOG.log(Level.INFO, "No CLR handler bound to handle completed task.");
432        } else {
433          LOG.log(Level.INFO, "CLR CompletedTaskHandler handler set, handling things with CLR handler.");
434          final CompletedTaskBridge completedTaskBridge = new CompletedTaskBridge(task, activeContextBridgeFactory);
435          NativeInterop.clrSystemCompletedTaskHandlerOnNext(JobDriver.this.completedTaskHandler, completedTaskBridge,
436              JobDriver.this.interopLogger);
437        }
438      }
439    }
440  }
441
442  /**
443   * Receive notification that the entire Evaluator had failed.
444   */
445  public final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
446    @Override
447    public void onNext(final FailedEvaluator eval) {
448      JobDriver.this.handleFailedEvaluator(eval, false);
449      allocatedEvaluatorBridges.remove(eval.getId());
450    }
451  }
452
453  /**
454   * Receive notification that the entire Evaluator had failed on Driver Restart.
455   */
456  public final class DriverRestartFailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
457    @Override
458    public void onNext(final FailedEvaluator eval) {
459      JobDriver.this.handleFailedEvaluator(eval, true);
460    }
461  }
462
463  final class HttpServerBridgeEventHandler implements HttpHandler {
464    private String uriSpecification;
465
466    /**
467     * returns URI specification for the handler.
468     */
469    @Override
470    public String getUriSpecification() {
471      return uriSpecification;
472    }
473
474    public void setUriSpecification(final String s) {
475      uriSpecification = s;
476    }
477
478    /**
479     * process http request.
480     */
481    @Override
482    public void onHttpRequest(final ParsedHttpRequest parsedHttpRequest, final HttpServletResponse response)
483        throws IOException, ServletException {
484      LOG.log(Level.INFO, "HttpServerBridgeEventHandler onHttpRequest: {0}", parsedHttpRequest.getRequestUri());
485      try (final LoggingScope ls = loggingScopeFactory.httpRequest(parsedHttpRequest.getRequestUri())) {
486        final AvroHttpSerializer httpSerializer = new AvroHttpSerializer();
487        final AvroHttpRequest avroHttpRequest = httpSerializer.toAvro(parsedHttpRequest);
488
489        final String requestString = httpSerializer.toString(avroHttpRequest);
490        final byte[] requestBytes = requestString.getBytes(Charset.forName(AvroHttpSerializer.JSON_CHARSET));
491        //final byte[] requestBytes = httpSerializer.toBytes(avroHttpRequest);
492
493        try {
494          final HttpServerEventBridge httpServerEventBridge = new HttpServerEventBridge(requestBytes);
495          NativeInterop.clrSystemHttpServerHandlerOnNext(JobDriver.this.httpServerEventHandler, httpServerEventBridge,
496              JobDriver.this.interopLogger);
497          final String responseBody = new String(httpServerEventBridge.getQueryResponseData(), "UTF-8");
498          response.getWriter().println(responseBody);
499          LOG.log(Level.INFO, "HttpServerBridgeEventHandler onHttpRequest received response: {0}", responseBody);
500        } catch (final Exception ex) {
501          LOG.log(Level.SEVERE, "Fail to invoke CLR Http Server handler", ex);
502          throw new RuntimeException(ex);
503        }
504      }
505    }
506  }
507
508  /**
509   * Handle failed task.
510   */
511  public final class FailedTaskHandler implements EventHandler<FailedTask> {
512    @Override
513    public void onNext(final FailedTask task) throws RuntimeException {
514      LOG.log(Level.SEVERE, "FailedTask received, will be handle in CLR handler, if set.");
515      if (JobDriver.this.failedTaskHandler == 0) {
516        LOG.log(Level.SEVERE, "Failed Task Handler not initialized by CLR, fail for real.");
517        throw new RuntimeException("Failed Task Handler not initialized by CLR.");
518      }
519      try {
520        final FailedTaskBridge failedTaskBridge = new FailedTaskBridge(task, activeContextBridgeFactory);
521        NativeInterop.clrSystemFailedTaskHandlerOnNext(JobDriver.this.failedTaskHandler, failedTaskBridge,
522            JobDriver.this.interopLogger);
523      } catch (final Exception ex) {
524        LOG.log(Level.SEVERE, "Fail to invoke CLR failed task handler");
525        throw new RuntimeException(ex);
526      }
527    }
528  }
529
530  /**
531   * Receive notification that the Task is running.
532   */
533  public final class RunningTaskHandler implements EventHandler<RunningTask> {
534    @Override
535    public void onNext(final RunningTask task) {
536      try (final LoggingScope ls = loggingScopeFactory.taskRunning(task.getId())) {
537        if (JobDriver.this.runningTaskHandler == 0) {
538          LOG.log(Level.INFO, "RunningTask event received but no CLR handler was bound. Exiting handler.");
539        } else {
540          LOG.log(Level.INFO, "RunningTask will be handled by CLR handler. Task Id: {0}", task.getId());
541          try {
542            final RunningTaskBridge runningTaskBridge = new RunningTaskBridge(task, activeContextBridgeFactory);
543            NativeInterop.clrSystemRunningTaskHandlerOnNext(JobDriver.this.runningTaskHandler, runningTaskBridge,
544                JobDriver.this.interopLogger);
545          } catch (final Exception ex) {
546            LOG.log(Level.WARNING, "Fail to invoke CLR running task handler");
547            throw new RuntimeException(ex);
548          }
549        }
550      }
551    }
552  }
553
554  /**
555   * Receive notification that the Task is running when driver restarted.
556   */
557  public final class DriverRestartRunningTaskHandler implements EventHandler<RunningTask> {
558    @Override
559    public void onNext(final RunningTask task) {
560      try (final LoggingScope ls = loggingScopeFactory.driverRestartRunningTask(task.getId())) {
561        clock.scheduleAlarm(0, new EventHandler<Alarm>() {
562          @Override
563          public void onNext(final Alarm time) {
564            if (JobDriver.this.clrBridgeSetup) {
565              if (JobDriver.this.driverRestartRunningTaskHandler != 0) {
566                LOG.log(Level.INFO, "CLR driver restart RunningTask handler implemented, now handle it in CLR.");
567                NativeInterop.clrSystemDriverRestartRunningTaskHandlerOnNext(
568                    JobDriver.this.driverRestartRunningTaskHandler,
569                    new RunningTaskBridge(task, activeContextBridgeFactory));
570              } else {
571                LOG.log(Level.WARNING, "No CLR driver restart RunningTask handler implemented, " +
572                    "done with DriverRestartRunningTaskHandler.");
573              }
574            } else {
575              LOG.log(Level.INFO, "Waiting for driver to complete restart process " +
576                  "before checking out CLR driver restart RunningTaskHandler...");
577              clock.scheduleAlarm(2000, this);
578            }
579          }
580        });
581      }
582    }
583  }
584
585  /**
586   * Receive notification that an context is active on Evaluator when the driver restarted.
587   */
588  public final class DriverRestartActiveContextHandler implements EventHandler<ActiveContext> {
589    @Override
590    public void onNext(final ActiveContext context) {
591      try (final LoggingScope ls = loggingScopeFactory.driverRestartActiveContextReceived(context.getId())) {
592        JobDriver.this.contexts.put(context.getId(), context);
593        LOG.log(Level.INFO, "DriverRestartActiveContextHandler event received: " + context.getId());
594        clock.scheduleAlarm(0, new EventHandler<Alarm>() {
595          @Override
596          public void onNext(final Alarm time) {
597            if (JobDriver.this.clrBridgeSetup) {
598              if (JobDriver.this.driverRestartActiveContextHandler != 0) {
599                LOG.log(Level.INFO, "CLR driver restart ActiveContext handler implemented, now handle it in CLR.");
600                NativeInterop.clrSystemDriverRestartActiveContextHandlerOnNext(
601                    JobDriver.this.driverRestartActiveContextHandler,
602                    activeContextBridgeFactory.getActiveContextBridge(context));
603              } else {
604                LOG.log(Level.WARNING, "No CLR driver restart ActiveContext handler implemented, " +
605                    "done with DriverRestartActiveContextHandler.");
606              }
607            } else {
608              LOG.log(Level.INFO, "Waiting for driver to complete restart process " +
609                  "before checking out CLR driver restart DriverRestartActiveContextHandler...");
610              clock.scheduleAlarm(2000, this);
611            }
612          }
613        });
614      }
615    }
616  }
617
618  /**
619   * Job Driver is ready and the clock is set up: request the evaluators.
620   */
621  public final class StartHandler implements EventHandler<StartTime> {
622    @Override
623    public void onNext(final StartTime startTime) {
624      try (final LoggingScope ls = loggingScopeFactory.driverStart(startTime)) {
625        synchronized (JobDriver.this) {
626
627          setupBridge(new DriverStartClrHandlersInitializer(startTime));
628          LOG.log(Level.INFO, "Driver Started");
629        }
630      }
631    }
632  }
633
634
635  /**
636   * Job driver is restarted after previous crash.
637   */
638  public final class RestartHandler implements EventHandler<DriverRestarted> {
639    @Override
640    public void onNext(final DriverRestarted driverRestarted) {
641      try (final LoggingScope ls = loggingScopeFactory.driverRestart(driverRestarted.getStartTime())) {
642        synchronized (JobDriver.this) {
643
644          JobDriver.this.isRestarted = true;
645          setupBridge(new DriverRestartClrHandlersInitializer(driverRestarted));
646
647          LOG.log(Level.INFO, "Driver Restarted and CLR bridge set up.");
648        }
649      }
650    }
651  }
652
653  /**
654   * Receive notification that driver restart has completed.
655   */
656  public final class DriverRestartCompletedHandler implements EventHandler<DriverRestartCompleted> {
657    @Override
658    public void onNext(final DriverRestartCompleted driverRestartCompleted) {
659      LOG.log(Level.INFO, "Java DriverRestartCompleted event received at time [{0}]. ",
660          driverRestartCompleted.getCompletedTime());
661      try (final LoggingScope ls = loggingScopeFactory.driverRestartCompleted(
662          driverRestartCompleted.getCompletedTime().getTimeStamp())) {
663        if (JobDriver.this.driverRestartCompletedHandler != 0) {
664          LOG.log(Level.INFO, "CLR driver restart handler implemented, now handle it in CLR.");
665
666          NativeInterop.clrSystemDriverRestartCompletedHandlerOnNext(
667              JobDriver.this.driverRestartCompletedHandler, new DriverRestartCompletedBridge(driverRestartCompleted));
668        } else {
669          LOG.log(Level.WARNING, "No CLR driver restart handler implemented, done with DriverRestartCompletedHandler.");
670        }
671      }
672    }
673  }
674
675  /**
676   * Shutting down the job driver: close the evaluators.
677   */
678  final class StopHandler implements EventHandler<StopTime> {
679    @Override
680    public void onNext(final StopTime time) {
681      LOG.log(Level.INFO, " StopTime: {0}", new Object[]{time});
682      try (final LoggingScope ls = loggingScopeFactory.driverStop(time.getTimeStamp())) {
683        for (final ActiveContext context : contexts.values()) {
684          context.close();
685        }
686      }
687    }
688  }
689
690  /**
691   * Handler for message received from the Task.
692   */
693  public final class TaskMessageHandler implements EventHandler<TaskMessage> {
694    @Override
695    public void onNext(final TaskMessage taskMessage) {
696      final String msg = new String(taskMessage.get(), StandardCharsets.UTF_8);
697      LOG.log(Level.INFO, "Received TaskMessage: {0} from CLR", msg);
698      //try (LoggingScope ls = loggingScopeFactory.taskMessageReceived(new String(msg))) {
699      if (JobDriver.this.taskMessageHandler != 0) {
700        final TaskMessageBridge taskMessageBridge = new TaskMessageBridge(taskMessage);
701        // if CLR implements the task message handler, handle the bytes in CLR handler
702        NativeInterop.clrSystemTaskMessageHandlerOnNext(JobDriver.this.taskMessageHandler, taskMessage.get(),
703            taskMessageBridge, JobDriver.this.interopLogger);
704      }
705      //}
706    }
707  }
708
709  /**
710   * Receive notification that the Task has been suspended.
711   */
712  public final class SuspendedTaskHandler implements EventHandler<SuspendedTask> {
713    @Override
714    public void onNext(final SuspendedTask task) {
715      final String message = "Received notification that task [" + task.getId() + "] has been suspended.";
716      LOG.log(Level.INFO, message);
717      try (final LoggingScope ls = loggingScopeFactory.taskSuspended(task.getId())) {
718        if (JobDriver.this.suspendedTaskHandler != 0) {
719          final SuspendedTaskBridge suspendedTaskBridge = new SuspendedTaskBridge(task, activeContextBridgeFactory);
720          // if CLR implements the suspended task handler, handle it in CLR
721          LOG.log(Level.INFO, "Handling the event of suspended task in CLR bridge.");
722          NativeInterop.clrSystemSuspendedTaskHandlerOnNext(JobDriver.this.suspendedTaskHandler, suspendedTaskBridge);
723        }
724        JobDriver.this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(message));
725      }
726    }
727  }
728
729  /**
730   * Receive notification that the Evaluator has been shut down.
731   */
732  public final class CompletedEvaluatorHandler implements EventHandler<CompletedEvaluator> {
733    @Override
734    public void onNext(final CompletedEvaluator evaluator) {
735      LOG.log(Level.INFO, " Completed Evaluator {0}", evaluator.getId());
736      try (final LoggingScope ls = loggingScopeFactory.evaluatorCompleted(evaluator.getId())) {
737        if (JobDriver.this.completedEvaluatorHandler != 0) {
738          final CompletedEvaluatorBridge completedEvaluatorBridge = new CompletedEvaluatorBridge(evaluator);
739          // if CLR implements the completed evaluator handler, handle it in CLR
740          LOG.log(Level.INFO, "Handling the event of completed evaluator in CLR bridge.");
741          NativeInterop.clrSystemCompletedEvaluatorHandlerOnNext(completedEvaluatorHandler, completedEvaluatorBridge);
742          allocatedEvaluatorBridges.remove(completedEvaluatorBridge.getId());
743        }
744      }
745    }
746  }
747
748
749  /**
750   * Receive notification that the Context had completed.
751   * Remove context from the list of active context.
752   */
753  public final class ClosedContextHandler implements EventHandler<ClosedContext> {
754    @Override
755    public void onNext(final ClosedContext context) {
756      LOG.log(Level.INFO, "Completed Context: {0}", context.getId());
757      try (final LoggingScope ls = loggingScopeFactory.closedContext(context.getId())) {
758        if (JobDriver.this.closedContextHandler != 0) {
759          final ClosedContextBridge closedContextBridge = new ClosedContextBridge(context, activeContextBridgeFactory);
760          // if CLR implements the closed context handler, handle it in CLR
761          LOG.log(Level.INFO, "Handling the event of closed context in CLR bridge.");
762          NativeInterop.clrSystemClosedContextHandlerOnNext(JobDriver.this.closedContextHandler, closedContextBridge);
763        }
764        synchronized (JobDriver.this) {
765          JobDriver.this.contexts.remove(context.getId());
766        }
767      }
768    }
769  }
770
771
772  /**
773   * Receive notification that the Context had failed.
774   * Remove context from the list of active context and notify the client.
775   */
776  public final class FailedContextHandler implements EventHandler<FailedContext> {
777    @Override
778    public void onNext(final FailedContext context) {
779      LOG.log(Level.SEVERE, "FailedContext", context);
780      try (final LoggingScope ls = loggingScopeFactory.evaluatorFailed(context.getId())) {
781        if (JobDriver.this.failedContextHandler != 0) {
782          final FailedContextBridge failedContextBridge = new FailedContextBridge(context, activeContextBridgeFactory);
783          // if CLR implements the failed context handler, handle it in CLR
784          LOG.log(Level.INFO, "Handling the event of failed context in CLR bridge.");
785          NativeInterop.clrSystemFailedContextHandlerOnNext(JobDriver.this.failedContextHandler, failedContextBridge);
786        }
787        synchronized (JobDriver.this) {
788          JobDriver.this.contexts.remove(context.getId());
789        }
790        final Optional<byte[]> err = context.getData();
791        if (err.isPresent()) {
792          JobDriver.this.jobMessageObserver.sendMessageToClient(err.get());
793        }
794      }
795    }
796  }
797
798  /**
799   * Receive notification that a ContextMessage has been received.
800   */
801  public final class ContextMessageHandler implements EventHandler<ContextMessage> {
802    @Override
803    public void onNext(final ContextMessage message) {
804      LOG.log(Level.SEVERE, "Received ContextMessage:", message.get());
805      try (final LoggingScope ls =
806               loggingScopeFactory.contextMessageReceived(new String(message.get(), StandardCharsets.UTF_8))) {
807        if (JobDriver.this.contextMessageHandler != 0) {
808          final ContextMessageBridge contextMessageBridge = new ContextMessageBridge(message);
809          // if CLR implements the context message handler, handle it in CLR
810          LOG.log(Level.INFO, "Handling the event of context message in CLR bridge.");
811          NativeInterop.clrSystemContextMessageHandlerOnNext(JobDriver.this.contextMessageHandler,
812              contextMessageBridge);
813        }
814      }
815    }
816  }
817}