It is instructive to first recall how Map-Reduce works. Hadoop Map-Reduce schedules compute tasks on containers to execute map and reduce functions over record data. The basic structure of a Map-Reduce job is as follows. For each input block, schedule a map task that passes each record in the block to a user-defined map function and materializes the output in key-sorted order. Next, assign a user-defined number of reduce tasks to non-overlapping portions of the key space of the map output, and shuffle each portion across the network to the machine where the respective reduce task is scheduled. For each reduce task, perform a global key-based sort on the shuffled data, group it by key, and call the reduce function on each record group, storing the output durably (e.g., in HDFS).
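The job structure above can be illustrated with a minimal single-process sketch. It is not Hadoop code: the function name run_map_reduce, the use of hash partitioning to split the key space, and the word-count example are all assumptions made for illustration, and the "shuffle" is an in-memory move rather than a network transfer.

```python
from collections import defaultdict
from itertools import groupby
from operator import itemgetter


def run_map_reduce(blocks, map_fn, reduce_fn, num_reducers):
    """Single-process imitation of the map, shuffle, and reduce phases."""
    # Map phase: one "task" per input block; output is materialized key-sorted.
    map_outputs = []
    for block in blocks:
        pairs = [pair for record in block for pair in map_fn(record)]
        map_outputs.append(sorted(pairs, key=itemgetter(0)))

    # Shuffle phase: route every pair to the reduce task that owns its key.
    # Hash partitioning is an assumption; any non-overlapping split would do.
    shuffled = defaultdict(list)
    for output in map_outputs:
        for key, value in output:
            shuffled[hash(key) % num_reducers].append((key, value))

    # Reduce phase: sort, group by key, and call reduce_fn once per group.
    results = {}
    for partition in range(num_reducers):
        pairs = sorted(shuffled[partition], key=itemgetter(0))
        for key, group in groupby(pairs, key=itemgetter(0)):
            results[key] = reduce_fn(key, [value for _, value in group])
    return results


def word_count_map(line):
    # Emit (word, 1) for every word in the record.
    return [(word, 1) for word in line.split()]


def word_count_reduce(word, counts):
    return sum(counts)


# Two input blocks, hence two map tasks.
blocks = [["a b a", "c"], ["b a"]]
counts = run_map_reduce(blocks, word_count_map, word_count_reduce, num_reducers=2)
```

Because each key is owned by exactly one reduce partition, the final result does not depend on how the hash function distributes keys among the reduce tasks.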
From the perspective of the scheduler, a number of issues arise that must be appropriately handled in order to scale out to massive datasets. First, each map task should be scheduled close to where its input block resides, ideally on the same machine or rack. Second, failures can occur at the task level at any step, requiring either that backup tasks be scheduled or that the job be aborted. Third, performance bottlenecks can cause an imbalance in task-level progress. The scheduler must react to these stragglers by scheduling clones and accepting the result of whichever copy of the logical task finishes first.
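Two of these scheduler concerns can be sketched in a few lines. The helper names pick_node and accept_first_finishers, the three-level distance (same machine, same rack, elsewhere), and the node and rack identifiers are hypothetical; a production scheduler would also weigh load, fairness, and failure history.

```python
def pick_node(block_replicas, free_nodes, rack_of):
    """Return the free node closest to a replica of the input block, or None."""
    replica_racks = {rack_of[node] for node in block_replicas}

    def distance(node):
        if node in block_replicas:
            return 0  # same machine as a replica
        if rack_of[node] in replica_racks:
            return 1  # same rack as a replica
        return 2      # anywhere else in the cluster

    return min(free_nodes, key=distance) if free_nodes else None


def accept_first_finishers(completions):
    """Keep the first finished attempt per logical task; ignore later clones.

    completions is an iterable of (task_id, attempt_id) in completion order.
    """
    winners = {}
    for task_id, attempt_id in completions:
        winners.setdefault(task_id, attempt_id)
    return winners


rack_of = {"n1": "r1", "n2": "r1", "n3": "r2"}
chosen = pick_node({"n1"}, ["n3", "n2"], rack_of)
winners = accept_first_finishers([("m1", "a0"), ("m2", "a1"), ("m2", "a0")])
```

Here the block lives on n1, which is busy, so the rack-local node n2 is preferred over n3; for map task m2, the clone a1 finished before the original attempt a0 and its output is the one kept.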
Any one of these issues can limit how far a Map-Reduce job scales out. In what follows, we prescribe a scheduler framework that provides task life-cycle management mechanisms. Using this framework, we developed a complete version of the Map-Reduce runtime that addresses the above issues. Our framework is designed around three components.
Below, we describe the client-facing interfaces to these components. The core REEF control flow design is based on Reactive Extensions (Rx), which enforces asynchronous message-passing method signatures. In Rx terms, the interfaces follow the observer pattern: each exposes methods that accept messages from a (possibly remote) asynchronous caller.
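The observer-style signature can be sketched as follows. This is a generic illustration of the pattern, not the REEF or Rx API: the class names Observer, Collector, and AsyncChannel are hypothetical, and a local thread with a queue stands in for a remote caller.

```python
import queue
import threading


class Observer:
    """Rx-style handler: accepts a message and returns nothing to the sender."""

    def on_next(self, message):
        raise NotImplementedError


class Collector(Observer):
    """Example observer that records every message it receives."""

    def __init__(self):
        self.seen = []

    def on_next(self, message):
        self.seen.append(message)


class AsyncChannel:
    """Delivers messages to an observer on a separate thread."""

    _STOP = object()

    def __init__(self, observer):
        self._observer = observer
        self._queue = queue.Queue()
        self._worker = threading.Thread(target=self._drain)
        self._worker.start()

    def send(self, message):
        # Returns immediately; the sender never waits for the handler to run.
        self._queue.put(message)

    def close(self):
        # Deliver everything queued so far, then stop the worker thread.
        self._queue.put(self._STOP)
        self._worker.join()

    def _drain(self):
        while True:
            message = self._queue.get()
            if message is self._STOP:
                return
            self._observer.on_next(message)


collector = Collector()
channel = AsyncChannel(collector)
channel.send("evaluator allocated")
channel.send("task running")
channel.close()
```

The sender and the handler share no return value or call stack, which is what allows the same interface to be driven by a remote process.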
A running REEF job with two RunningEvaluators and one RunningTask
The above figure presents the REEF components in terms of a running application, which is written as a Driver and a set of task-specific Task modules. The application code is packaged and submitted to a REEF client API, which in turn submits a REEF-AM configuration to YARN. The REEF-AM contains a runtime for launching the Driver and client libraries for requesting Evaluators and launching Tasks. When a request for Evaluators is made, the REEF-AM negotiates containers with the YARN-RM and launches an Evaluator runtime on the YARN-NM that hosts the allocated container. In turn, the Driver is given an Evaluator object reference, which it uses to submit a Task. The Driver is also given a Task object reference, which it may use to send messages to the Task running in the Evaluator. The REEF layer implements these communication channels and encodes the computational life-cycle as state transitions, which are surfaced to the Driver in the form of Rx messages.
States of Evaluator, Contexts, and Tasks
The figure above describes the state transitions for (a) Evaluator and Context and (b) Task components. Each state transition is associated with an object reference that is surfaced to the Driver through an Rx-style interface. For instance, when the YARN-RM notifies the REEF-AM of an allocated container, the Driver is given an AllocatedEvaluator object, which contains methods for adding configurations (e.g., for data services, see below) and file resources, and a submit method that bootstraps the Evaluator runtime on the YARN-NM. When an Evaluator bootstrap completes successfully, the Driver is given an ActiveContext object, which can be used to launch Tasks or to initiate a close; the latter triggers a shutdown event at the Evaluator runtime and a subsequent container release at the YARN-RM. If a failure occurs at any point, the Driver is passed a FailedEvaluator object, which contains an exception trace when possible.
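The hand-off from AllocatedEvaluator to ActiveContext can be sketched with stand-in classes. Only the class names AllocatedEvaluator and ActiveContext come from the text; the method names add_configuration and add_file, the handler functions, the event tuples, and the instantly successful bootstrap are assumptions made for illustration and should not be read as the REEF API.

```python
class ActiveContext:
    """Stand-in for the reference the Driver gets after a successful bootstrap."""

    def __init__(self, evaluator_id, events):
        self.evaluator_id = evaluator_id
        self._events = events

    def close(self):
        # Shut down the Evaluator runtime, then release the container.
        self._events.append(("evaluator_shutdown", self.evaluator_id))
        self._events.append(("container_released", self.evaluator_id))


class AllocatedEvaluator:
    """Stand-in for the reference the Driver gets when a container is allocated."""

    def __init__(self, evaluator_id, events):
        self.evaluator_id = evaluator_id
        self.configurations = []
        self.files = []
        self._events = events

    def add_configuration(self, configuration):
        self.configurations.append(configuration)

    def add_file(self, path):
        self.files.append(path)

    def submit(self, on_active_context):
        # A real bootstrap is remote and asynchronous; here it succeeds at once.
        self._events.append(("evaluator_bootstrapped", self.evaluator_id))
        on_active_context(ActiveContext(self.evaluator_id, self._events))


events = []


def on_active_context(context):
    # A Driver would normally launch Tasks here; this one closes right away.
    context.close()


def on_allocated_evaluator(evaluator):
    # Hypothetical configuration payload, then bootstrap the Evaluator.
    evaluator.add_configuration({"service": "example"})
    evaluator.submit(on_active_context)


on_allocated_evaluator(AllocatedEvaluator("evaluator-1", events))
```

The Driver code consists only of handlers; it never polls for state, and each handler receives the object that represents the state just entered.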
Recall that the Driver launches a Task by calling a submit method on the ActiveContext reference. This state transition is denoted in the figure above by the edge labeled submit, which spans the two state machines. The REEF-AM passes a RunningTask object to the Driver after receiving confirmation of a successful Task start or resume. The Driver may use the RunningTask reference to close or suspend the execution, which results in a CompletedTask or SuspendedTask object reference, respectively, being passed to the Driver. The SuspendedTask object contains a memento used to resume the execution on some (possibly alternative) ActiveContext. Exceptions during the Task execution are surfaced to the Driver in the form of a FailedTask, which contains the actual exception object.
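The Task transitions described in this paragraph can be written down as a small transition table. The table is reconstructed from the prose only, not from the figure, so it may omit edges; the state names and event names are hypothetical labels.

```python
from enum import Enum


class TaskState(Enum):
    SUBMITTED = "submitted"
    RUNNING = "running"      # Driver holds a RunningTask
    SUSPENDED = "suspended"  # Driver holds a SuspendedTask with a memento
    COMPLETED = "completed"  # Driver holds a CompletedTask
    FAILED = "failed"        # Driver holds a FailedTask


# (current state, event) -> next state; event names are hypothetical.
TRANSITIONS = {
    (TaskState.SUBMITTED, "start_confirmed"): TaskState.RUNNING,
    (TaskState.RUNNING, "close"): TaskState.COMPLETED,
    (TaskState.RUNNING, "suspend"): TaskState.SUSPENDED,
    (TaskState.RUNNING, "exception"): TaskState.FAILED,
    (TaskState.SUSPENDED, "resume_confirmed"): TaskState.RUNNING,
}


def step(state, event):
    """Apply one event, rejecting transitions the table does not list."""
    try:
        return TRANSITIONS[(state, event)]
    except KeyError:
        raise ValueError(f"{event!r} is not allowed in state {state.name}") from None


def replay(events, state=TaskState.SUBMITTED):
    """Fold a sequence of events into the final state."""
    for event in events:
        state = step(state, event)
    return state


final = replay(["start_confirmed", "suspend", "resume_confirmed", "close"])
```

Modeling the life-cycle as an explicit table makes illegal sequences, such as suspending a Task that has already completed, fail loudly instead of being silently ignored.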
A Task encapsulates a unit of work of a job. The client interface contains a single synchronous call method that takes an optional memento argument and returns a byte array, which is packaged with the CompletedTask object surfaced to the Driver. An exception may be thrown at any point during the call method, returning control to the Evaluator, which packages the exception and sends it to the Driver, where it is surfaced as a FailedTask. The Evaluator periodically performs a heartbeat with the REEF-AM to convey its status information. A Task can optionally implement a method interface that, when called, returns a (bounded) byte array; the Evaluator includes this byte array in its heartbeat to the REEF-AM, and it is then surfaced to the Driver.
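The Task contract described above can be sketched as follows. The names CompletedTask and FailedTask come from the text; EchoTask, BrokenTask, run_task, heartbeat, get_status_message, and the 64-byte bound are hypothetical and serve only to illustrate the control flow around the call method.

```python
from dataclasses import dataclass

MAX_STATUS_BYTES = 64  # hypothetical bound on the heartbeat payload


@dataclass
class CompletedTask:
    result: bytes


@dataclass
class FailedTask:
    error: Exception


class EchoTask:
    """Task with the single synchronous call method and an optional status hook."""

    def __init__(self):
        self.status = b"idle"

    def call(self, memento=None):
        # The memento is optional; it is present only when resuming.
        payload = memento if memento is not None else b"fresh start"
        self.status = b"done"
        return b"echo:" + payload

    def get_status_message(self):
        return self.status


class BrokenTask:
    """Task that always fails and does not implement the status hook."""

    def call(self, memento=None):
        raise RuntimeError("boom")


def run_task(task, memento=None):
    """What the Evaluator does around call: package the result or the exception."""
    try:
        return CompletedTask(task.call(memento))
    except Exception as error:
        return FailedTask(error)


def heartbeat(evaluator_id, task):
    """Build a heartbeat, including the Task's bounded message if it offers one."""
    message = b""
    if hasattr(task, "get_status_message"):
        message = task.get_status_message()[:MAX_STATUS_BYTES]
    return {"evaluator": evaluator_id, "task_message": message}


completed = run_task(EchoTask(), memento=b"checkpoint-7")
failed = run_task(BrokenTask())
beat = heartbeat("evaluator-1", EchoTask())
```

Truncating the status message in the Evaluator, rather than trusting the Task to respect the bound, is one possible design choice; the text states only that the byte array is bounded.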