# Copyright 2008 The Apache Software Foundation Licensed under the
# Apache License, Version 2.0 (the "License"); you may not use this
# file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
# required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
This package implements fair scheduling for MapReduce jobs.
Fair scheduling is a method of assigning resources to jobs such that all jobs
get, on average, an equal share of resources over time. When there is a single
job running, that job uses the entire cluster. When other jobs are submitted,
task slots that free up are assigned to the new jobs, so that each job gets
roughly the same amount of CPU time. Unlike the default Hadoop scheduler, which
forms a queue of jobs, this lets short jobs finish in reasonable time while not
starving long jobs. It is also a reasonable way to share a cluster between a
number of users. Finally, fair sharing can also work with job priorities - the
priorities are used as weights to determine the fraction of total compute time
that each job should get.
In addition to providing fair sharing, the Fair Scheduler allows assigning
jobs to "pools" with guaranteed minimum shares. When a pool contains jobs,
it gets at least its minimum share, but when a pool does not need its full
capacity, the excess is shared between other running jobs. Thus pools are
a way to guarantee capacity for particular user groups while utilizing the
cluster efficiently when these users are not submitting any jobs. Within each
pool, fair sharing is used to share capacity between the running jobs. By
default, the pool is determined by the queue.name property in the jobconf, which
will be introduced with the Hadoop Resource Manager (JIRA 3445), but it is
also possible to have a pool per user or per Unix user group.
The fair scheduler lets all jobs run by default, but it is also possible to
limit the number of running jobs per user and per pool through the config
file. This can be useful when a user must submit hundreds of jobs at once,
or in general to improve performance if running too many jobs at once would
cause too much intermediate data to be created or too much context-switching.
Limiting the jobs does not cause any subsequently submitted jobs to fail, only
to wait in the scheduler's queue until some of the user's earlier jobs finish.
Jobs to run from each user/pool are chosen in order of priority and then submit
time, as in the default FIFO scheduler in Hadoop.
Finally, the fair scheduler provides several extension points where the basic
functionality can be extended. For example, the weight calculation can be
modified to give a priority boost to new jobs, implementing a "shortest job
first"-like policy that reduces response times for interactive jobs even
further.
--------------------------------------------------------------------------------
BUILDING:
In HADOOP_HOME, run "ant package" to build Hadoop and its contrib packages.
--------------------------------------------------------------------------------
INSTALLING:
To run the fair scheduler in your Hadoop installation, you need to put it on
the CLASSPATH. The easiest way is to copy the hadoop-*-fairscheduler.jar
from HADOOP_HOME/build/contrib/fairscheduler to HADOOP_HOME/lib. Alternatively
you can modify HADOOP_CLASSPATH to include this jar, in conf/hadoop-env.sh.
You will also need to set the following property in the Hadoop config file
(conf/hadoop-site.xml) to have Hadoop use the fair scheduler:
<property>
  <name>mapred.jobtracker.taskScheduler</name>
  <value>org.apache.hadoop.mapred.FairScheduler</value>
</property>
Once you restart the cluster, you can check that the fair scheduler is running
by going to http://<jobtracker URL>/scheduler on the JobTracker's web UI. A
"job scheduler administration" page should be visible there. This page is
described in the Administration section.
--------------------------------------------------------------------------------
CONFIGURING:
The following properties can be set in hadoop-site.xml to configure the
scheduler:
mapred.fairscheduler.allocation.file:
Specifies an absolute path to an XML file which contains the allocations
for each pool, as well as the per-pool and per-user limits on number of
running jobs. If this property is not provided, allocations are not used.
This file must be in XML format, and can contain three types of elements:
- pool elements, which may contain elements for minMaps, minReduces and
maxRunningJobs (limit the number of jobs from the pool to run at once).
- user elements, which may contain a maxRunningJobs to limit jobs.
- A userMaxJobsDefault element, which sets the running job limit for any
users that do not have their own elements.
The following example file shows how to create each type of element:
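<?xml version="1.0"?>
<allocations>
  <pool name="sample_pool">
    <minMaps>5</minMaps>
    <minReduces>5</minReduces>
  </pool>
  <user name="sample_user">
    <maxRunningJobs>6</maxRunningJobs>
  </user>
  <userMaxJobsDefault>3</userMaxJobsDefault>
</allocations>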
This example creates a pool sample_pool with a guarantee of 5 map slots
and 5 reduce slots. It also limits the number of running jobs per user
to 3, except for sample_user, who can run 6 jobs concurrently.
Any pool not defined in the allocations file will have no guaranteed
capacity. Also, any pool or user with no max running jobs set in the file
will be allowed to run an unlimited number of jobs.
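To make the semantics of the allocation file concrete, the following Python
sketch parses a file of the shape described above and applies the "unlimited
unless set" rule. It is an illustration only, not the scheduler's own loader:
the allocations root element, the name attributes, and the helper names
(load_allocations, max_jobs_for_user) are assumptions made for this example.

```python
import xml.etree.ElementTree as ET

# Hypothetical allocation file mirroring the example described above.
# The <allocations> root and the name attributes are assumptions.
ALLOCATIONS_XML = """<?xml version="1.0"?>
<allocations>
  <pool name="sample_pool">
    <minMaps>5</minMaps>
    <minReduces>5</minReduces>
  </pool>
  <user name="sample_user">
    <maxRunningJobs>6</maxRunningJobs>
  </user>
  <userMaxJobsDefault>3</userMaxJobsDefault>
</allocations>
"""

UNLIMITED = float("inf")


def _limit(text):
    """A missing maxRunningJobs element means no limit."""
    return int(text) if text is not None else UNLIMITED


def load_allocations(xml_text):
    """Return (pools, user_limits, default_user_limit) parsed from xml_text."""
    root = ET.fromstring(xml_text)
    pools = {}
    for pool in root.findall("pool"):
        pools[pool.get("name")] = {
            # Pools without a minimum get no guaranteed capacity.
            "minMaps": int(pool.findtext("minMaps", default="0")),
            "minReduces": int(pool.findtext("minReduces", default="0")),
            "maxRunningJobs": _limit(pool.findtext("maxRunningJobs")),
        }
    user_limits = {
        user.get("name"): _limit(user.findtext("maxRunningJobs"))
        for user in root.findall("user")
    }
    default_user_limit = _limit(root.findtext("userMaxJobsDefault"))
    return pools, user_limits, default_user_limit


def max_jobs_for_user(user, user_limits, default_user_limit):
    """Users without their own element fall back to userMaxJobsDefault."""
    return user_limits.get(user, default_user_limit)
```

With the sample file, sample_user resolves to a limit of 6, any other user to
3, and sample_pool has no limit on running jobs.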
mapred.fairscheduler.assignmultiple:
Allows the scheduler to assign both a map task and a reduce task on each
heartbeat, which improves cluster throughput when there are many small
tasks to run. Boolean value, default: false.
mapred.fairscheduler.sizebasedweight:
Take into account job sizes in calculating their weights for fair sharing.
By default, weights are only based on job priorities. Setting this flag to
true will make them based on the size of the job (number of tasks needed)
as well, though not linearly (the weight will be proportional to the log
of the number of tasks needed). This lets larger jobs get larger fair
shares while still providing enough of a share to small jobs to let them
finish fast. Boolean value, default: false.
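As a rough illustration of logarithmic size-based weighting, the Python sketch
below scales a priority factor by the log of the task count. The base-2
logarithm, the +1 offset (so a one-task job has weight 1), and the function
name are assumptions for this example; the text above only states that the
weight is proportional to the log of the number of tasks needed.

```python
import math


def size_based_weight(num_tasks, priority_factor=1.0):
    """Weight grows with the log of the job size, not linearly.

    Assumes log base 2 and a +1 offset; both are illustrative choices.
    """
    return priority_factor * math.log2(1 + num_tasks)
```

Under these assumptions a job with 1023 tasks gets about ten times the weight
of a one-task job, rather than a thousand times.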
mapred.fairscheduler.poolnameproperty:
Specify which jobconf property is used to determine the pool that a job
belongs in. String, default: queue.name (the same property as the queue
name in the Hadoop Resource Manager, JIRA 3445). You can use user.name
or group.name to base it on the Unix user or Unix group for example.
mapred.fairscheduler.weightadjuster:
An extensibility point that lets you specify a class to adjust the weights
of running jobs. This class should implement the WeightAdjuster interface.
There is currently one example implementation - NewJobWeightBooster, which
increases the weight of jobs for the first 5 minutes of their lifetime
to let short jobs finish faster. To use it, set the weightadjuster property
to the full class name, org.apache.hadoop.mapred.NewJobWeightBooster.
NewJobWeightBooster itself provides two parameters for setting the duration
and boost factor - mapred.newjobweightbooster.factor (default 3) and
mapred.newjobweightbooster.duration (in milliseconds, default 300000 for 5
minutes).
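The effect of the boost can be sketched in a few lines of Python. This is not
the WeightAdjuster interface itself, only a model of the behavior described
above using the documented defaults; the function name and the treatment of
the exact boundary (a job aged exactly the duration is no longer boosted) are
assumptions.

```python
def boosted_weight(cur_weight, job_start_ms, now_ms,
                   factor=3.0, duration_ms=300000):
    """Multiply the weight by factor while the job is younger than duration_ms."""
    age_ms = now_ms - job_start_ms
    if age_ms < duration_ms:
        return cur_weight * factor
    return cur_weight
```

For example, a job with weight 1.0 is treated as weight 3.0 one minute after
it starts, and returns to 1.0 once five minutes have passed.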
mapred.fairscheduler.loadmanager:
An extensibility point that lets you specify a class that determines
how many maps and reduces can run on a given TaskTracker. This class should
implement the LoadManager interface. By default the task caps in the Hadoop
config file are used, but this option could be used to make the load based
on available memory and CPU utilization for example.
mapred.fairscheduler.taskselector:
An extensibility point that lets you specify a class that determines
which task from within a job to launch on a given tracker. This can be
used to change either the locality policy (e.g. keep some jobs within
a particular rack) or the speculative execution algorithm (select when to
launch speculative tasks). The default implementation uses Hadoop's
default algorithms from JobInProgress.
--------------------------------------------------------------------------------
ADMINISTRATION:
The fair scheduler provides support for administration at runtime through
two mechanisms. First, it is possible to modify pools' allocations and user
and pool running job limits at runtime by editing the allocation config file.
The scheduler will reload this file 10-15 seconds after it sees that it was
modified. Second, current jobs, pools, and fair shares can be examined through
the JobTracker's web interface, at http://<jobtracker URL>/scheduler. On this
interface, it is also possible to modify jobs' priorities or move jobs from
one pool to another and see the effects on the fair shares (this requires
JavaScript). The following fields can be seen for each job on the web interface:
Submitted - Date and time job was submitted.
JobID, User, Name - Job identifiers as on the standard web UI.
Pool - Current pool of job. Select another value to move job to another pool.
Priority - Current priority. Select another value to change the job's priority.
Maps/Reduces Finished: Number of tasks finished / total tasks.
Maps/Reduces Running: Tasks currently running.
Map/Reduce Fair Share: The average number of task slots that this job should
have at any given time according to fair sharing. The actual number of
tasks will go up and down depending on how much compute time the job has
had, but on average it will get its fair share amount.
In addition, it is possible to turn on an "advanced" view for the web UI, by
going to http://<jobtracker URL>/scheduler?advanced. This view shows four more
columns used for calculations internally:
Map/Reduce Weight: Weight of the job in the fair sharing calculations. This
depends on priority and potentially also on job size and job age if the
sizebasedweight and NewJobWeightBooster are enabled.
Map/Reduce Deficit: The job's scheduling deficit in machine-seconds - the amount
of resources it should have gotten according to its fair share, minus how
much it actually got. Positive deficit means the job will be scheduled
again in the near future because it needs to catch up to its fair share.
The scheduler schedules jobs with higher deficit ahead of others. Please
see the Implementation section of this document for details.
Finally, the web interface provides a button for switching to FIFO scheduling,
at runtime, at the bottom of the page, in case this becomes necessary and it
is inconvenient to restart the MapReduce cluster.
--------------------------------------------------------------------------------
IMPLEMENTATION:
There are two aspects to implementing fair scheduling: calculating each job's
fair share, and choosing which job to run when a task slot becomes available.
To select jobs to run, the scheduler keeps track of a "deficit" for
each job - the difference between the amount of compute time it should have
gotten on an ideal scheduler, and the amount of compute time it actually got.
This is a measure of how "unfair" we've been to the job. Every few hundred
milliseconds, the scheduler updates the deficit of each job by looking at
how many tasks each job had running during this interval vs. its fair share.
Whenever a task slot becomes available, it is assigned to the job with the
highest deficit. There is one exception: if one or more jobs are not meeting
their pool capacity guarantees, the scheduler chooses only among these
"needy" jobs (again based on their deficit), to ensure that it meets pool
guarantees as soon as possible.
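The deficit bookkeeping and slot assignment described above can be modeled
with a short Python sketch. It is a simplified illustration, not the
scheduler's code: the Job fields, the per-job min_share (standing in for a
job's portion of its pool guarantee), and the function names are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class Job:
    name: str
    fair_share: float       # slots the job should hold on average
    running: int            # tasks currently running
    min_share: float = 0.0  # hypothetical: job's portion of its pool guarantee
    deficit: float = 0.0    # accumulated machine-seconds owed to the job


def update_deficits(jobs, interval_seconds):
    """Add (fair share - running tasks) * interval to each job's deficit."""
    for job in jobs:
        job.deficit += (job.fair_share - job.running) * interval_seconds


def pick_job(jobs):
    """Choose the job for a free slot: needy jobs first, then highest deficit."""
    if not jobs:
        return None
    needy = [job for job in jobs if job.running < job.min_share]
    candidates = needy or jobs
    return max(candidates, key=lambda job: job.deficit)
```

A job running above its fair share accumulates negative deficit and is passed
over until the others catch up; a job below its guarantee is preferred even if
another job has a larger deficit.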
The fair shares are calculated by dividing the capacity of the cluster among
runnable jobs according to a "weight" for each job. By default the weight is
based on priority, with each level of priority having 2x higher weight than the
next (for example, VERY_HIGH has 4x the weight of NORMAL). However, weights can
also be based on job sizes and ages, as described in the Configuring section.
For jobs that are in a pool, fair shares also take into account the minimum
guarantee for that pool. This capacity is divided among the jobs in that pool
according again to their weights.
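As a worked example of the weight-based division, the Python sketch below
splits a number of slots among jobs using priority weights only. The factors
for LOW and VERY_LOW are inferred from the 2x-per-level rule above; pool
minimum guarantees and size or age adjustments are deliberately left out, and
the function name is hypothetical.

```python
# Each priority level has twice the weight of the next lower one.
PRIORITY_FACTOR = {
    "VERY_HIGH": 4.0,
    "HIGH": 2.0,
    "NORMAL": 1.0,
    "LOW": 0.5,
    "VERY_LOW": 0.25,
}


def fair_shares(capacity, priorities):
    """Divide capacity among jobs in proportion to their priority weights.

    priorities maps job name -> priority name; pool minimums are ignored.
    """
    weights = {job: PRIORITY_FACTOR[p] for job, p in priorities.items()}
    total = sum(weights.values())
    if total == 0:
        return {}
    return {job: capacity * w / total for job, w in weights.items()}
```

On a 100-slot cluster, one VERY_HIGH job and one NORMAL job would get fair
shares of 80 and 20 slots respectively under this simplified model.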
Finally, when limits on a user's running jobs or a pool's running jobs are in
place, we choose which jobs get to run by sorting all jobs in order of priority
and then submit time, as in the standard Hadoop scheduler. Any jobs that fall
after the user/pool's limit in this ordering are queued up and wait idle until
they can be run. During this time, they are excluded from the fair sharing
calculations and do not gain or lose deficit (their fair share is set to zero).
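The selection of runnable jobs under a per-user limit can be sketched as
follows. This Python example handles only user limits (pool limits would work
the same way) and uses a hypothetical tuple layout and function name; it is
meant to illustrate the ordering rule, not to reproduce the scheduler's code.

```python
PRIORITY_ORDER = ["VERY_HIGH", "HIGH", "NORMAL", "LOW", "VERY_LOW"]


def runnable_jobs(jobs, user_limits, default_limit):
    """Return names of jobs allowed to run, in scheduling order.

    jobs is a list of (name, user, priority, submit_time) tuples. Jobs are
    sorted by priority and then submit time; each user may run at most
    user_limits[user] jobs (default_limit if the user has no entry).
    """
    ordered = sorted(jobs, key=lambda j: (PRIORITY_ORDER.index(j[2]), j[3]))
    running_per_user = {}
    runnable = []
    for name, user, _priority, _submit_time in ordered:
        limit = user_limits.get(user, default_limit)
        count = running_per_user.get(user, 0)
        if count < limit:
            running_per_user[user] = count + 1
            runnable.append(name)
        # Jobs past the limit are not failed; they simply wait.
    return runnable
```

Jobs left out of the returned list stay queued with a fair share of zero until
one of the user's earlier jobs finishes.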