001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.examples.data.output;
020
021import org.apache.reef.driver.context.ActiveContext;
022import org.apache.reef.driver.context.ContextConfiguration;
023import org.apache.reef.driver.evaluator.AllocatedEvaluator;
024import org.apache.reef.driver.evaluator.EvaluatorRequest;
025import org.apache.reef.driver.evaluator.EvaluatorRequestor;
026import org.apache.reef.driver.task.TaskConfiguration;
027import org.apache.reef.io.data.output.OutputService;
028import org.apache.reef.tang.Configuration;
029import org.apache.reef.tang.annotations.Unit;
030import org.apache.reef.wake.EventHandler;
031import org.apache.reef.wake.time.event.StartTime;
032
033import javax.inject.Inject;
034import java.util.concurrent.atomic.AtomicInteger;
035import java.util.logging.Level;
036import java.util.logging.Logger;
037
038/**
039 * The Driver code for the output service demo app.
040 */
041@Unit
042public final class OutputServiceDriver {
043  private static final Logger LOG = Logger.getLogger(OutputServiceDriver.class.getName());
044
045  /**
046   * Evaluator requestor object used to create new evaluator containers.
047   */
048  private final EvaluatorRequestor requestor;
049
050  /**
051   * Output service object.
052   */
053  private final OutputService outputService;
054
055  /**
056   * Sub-id for Tasks.
057   * This object grants different IDs to each task
058   * e.g. Task-0, Task-1, and so on.
059   */
060  private final AtomicInteger taskId = new AtomicInteger(0);
061
062  /**
063   * Job driver constructor - instantiated via TANG.
064   *
065   * @param requestor evaluator requestor object used to create new evaluator containers.
066   * @param outputService output service object.
067   */
068  @Inject
069  public OutputServiceDriver(final EvaluatorRequestor requestor,
070                             final OutputService outputService) {
071    LOG.log(Level.FINE, "Instantiated 'OutputServiceDriver'");
072    this.requestor = requestor;
073    this.outputService = outputService;
074  }
075
076  /**
077   * Handles the StartTime event: Request three Evaluators.
078   */
079  public final class StartHandler implements EventHandler<StartTime> {
080    @Override
081    public void onNext(final StartTime startTime) {
082      OutputServiceDriver.this.requestor.submit(EvaluatorRequest.newBuilder()
083          .setNumber(3)
084          .setMemory(64)
085          .setNumberOfCores(1)
086          .build());
087      LOG.log(Level.INFO, "Requested Evaluator.");
088    }
089  }
090
091  /**
092   * Handles AllocatedEvaluator: Submit the output service and a context for it.
093   */
094  public final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
095    @Override
096    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
097      LOG.log(Level.INFO, "Submitting Output Service to AllocatedEvaluator: {0}", allocatedEvaluator);
098      final Configuration contextConfiguration = ContextConfiguration.CONF
099          .set(ContextConfiguration.IDENTIFIER, "OutputServiceContext")
100          .build();
101      allocatedEvaluator.submitContextAndService(
102          contextConfiguration, outputService.getServiceConfiguration());
103    }
104  }
105
106  /**
107   * Handles ActiveContext: Submit the output service demo task.
108   */
109  public final class ActiveContextHandler implements EventHandler<ActiveContext> {
110    @Override
111    public void onNext(final ActiveContext activeContext) {
112      LOG.log(Level.INFO,
113          "Submitting OutputServiceREEF task to AllocatedEvaluator: {0}",
114          activeContext.getEvaluatorDescriptor());
115      final Configuration taskConfiguration = TaskConfiguration.CONF
116          .set(TaskConfiguration.IDENTIFIER, "Task-" + taskId.getAndIncrement())
117          .set(TaskConfiguration.TASK, OutputServiceTask.class)
118          .build();
119      activeContext.submitTask(taskConfiguration);
120    }
121  }
122}