# Copyright 2008 The Apache Software Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

This package implements dynamic priority scheduling for MapReduce jobs.

Overview
--------
The purpose of this scheduler is to allow users to increase and decrease
their queue priorities continuously to meet the requirements of their
current workloads. The scheduler is aware of the current demand and makes
it more expensive to boost the priority under peak usage times. Thus users
who move their workload to low-usage times are rewarded with discounts.

Priorities can only be boosted within a limited quota. All users are given
a quota, or budget, which is deducted periodically in configurable
accounting intervals. How much of the budget is deducted is determined by
a per-user spending rate, which may be modified at any time directly by
the user. The cluster slot share allocated to a particular user is
computed as that user's spending rate divided by the sum of all spending
rates in the same accounting period.

Configuration
-------------
This scheduler comprises two components: an accounting or resource
allocation part that manages and bills for queue shares, and a scheduler
that enforces the queue shares in the form of map and reduce slots of
running jobs.

Hadoop Configuration (e.g. hadoop-site.xml):

mapred.jobtracker.taskScheduler
  This needs to be set to
  org.apache.hadoop.mapred.DynamicPriorityScheduler
  to use the dynamic scheduler.

Scheduler Configuration:

mapred.dynamic-scheduler.scheduler
  The Java path of the MapReduce scheduler that should enforce the
  allocated shares. Has been tested with (which is the default):
  org.apache.hadoop.mapred.PriorityScheduler

mapred.priority-scheduler.acl-file
  Full path of the ACL file, with entries of the form
  [user] [role] [key]
  separated by line feeds.

mapred.dynamic-scheduler.budget-file
  The full OS path of the file from which the budgets are read and
  stored, with entries of the form
  [queue] [budget] [spending rate]
  separated by newlines, where the budget can be specified as a Java
  float. The file should not be edited directly while the server is
  running, but through the servlet API, to ensure proper
  synchronization.

mapred.dynamic-scheduler.alloc-interval
  Allocation interval, at which the scheduler rereads the spending rates
  and recalculates the cluster shares. Specified as seconds between
  allocations. Default is 20 seconds.
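As a minimal hadoop-site.xml sketch of the settings above (the ACL and
budget file paths are illustrative assumptions; the class names and the
interval default are the ones documented here):

<property>
  <name>mapred.jobtracker.taskScheduler</name>
  <value>org.apache.hadoop.mapred.DynamicPriorityScheduler</value>
</property>
<property>
  <name>mapred.dynamic-scheduler.scheduler</name>
  <value>org.apache.hadoop.mapred.PriorityScheduler</value>
</property>
<property>
  <name>mapred.priority-scheduler.acl-file</name>
  <!-- illustrative path -->
  <value>/etc/hadoop/scheduler.acl</value>
</property>
<property>
  <name>mapred.dynamic-scheduler.budget-file</name>
  <!-- illustrative path -->
  <value>/etc/hadoop/scheduler.budget</value>
</property>
<property>
  <name>mapred.dynamic-scheduler.alloc-interval</name>
  <!-- seconds between allocations; 20 is the default -->
  <value>20</value>
</property>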
Servlet API
-----------
When the jobtracker server is running, the queues should be managed
through the servlet REST API. It is installed at
[job tracker URL]/scheduler.

Operations supported:

price
  get the current price of the cluster (aggregate spending rates of
  queues with running or pending jobs)
time
  get start time of server and current time in epoch units
info=queue_to_query
  get info about queue (requires user or admin privilege)
infos
  get info about all queues (requires admin privilege)
addBudget=budget_to_add,queue=queue_to_change
  add budget to queue (requires admin privilege)
setSpending=spending_to_set,queue=queue_to_change
  set spending rate of queue (requires user or admin privilege)
addQueue=queue_to_add
  add new queue (requires admin privilege)
removeQueue=queue_to_remove
  remove queue (requires admin privilege)

Example:
http://myhost:50030/scheduler?setSpending=0.01&queue=myqueue

The Authorization header is used for signing. The signature is created
akin to the AWS Query Authentication scheme:

HMAC_SHA1("[query path]&user=[user]&timestamp=[epoch ms timestamp]", key)

For the servlet operations the query path is everything that comes after
/scheduler? in the URL. For job submission the query path is just the
empty string "".

Job submissions also need to set the following job properties:

-Dmapred.job.timestamp=[epoch ms timestamp]
-Dmapred.job.signature=[signature as above]
-Dmapred.job.queue.name=[queue]

Note that the queue must match the user submitting the job.

Example python query
--------------------
import base64
import hmac
import sha
import httplib, urllib
import sys
import time
import os
from popen2 import popen3

# Sign with HMAC-SHA1, then base64-encode and URL-quote the digest for
# use in the Authorization header.
def hmac_sha1(data, key):
    return urllib.quote(base64.encodestring(hmac.new(key, data, sha).digest()).strip())

# Query as the current unix user.
stdout, stdin, stderr = popen3("id -un")
USER = stdout.read().strip()

# The user's secret key, as registered in the ACL file.
f = open(os.path.expanduser("~/.ssh/hadoop_key"))
KEY = f.read().strip()
f.close()

# host:port of the jobtracker.
f = open(os.path.expanduser("/etc/hadoop_server"))
SERVER = f.read().strip()
f.close()

URL = "/scheduler"
conn = httplib.HTTPConnection(SERVER)
# The operation (e.g. "setSpending=0.01&queue=myqueue") is taken from the
# command line; user and timestamp are appended before signing.
params = sys.argv[1]
params = params + "&user=%s&timestamp=%d" % (USER, long(time.time() * 1000))
print params
headers = {"Authorization": hmac_sha1(params, KEY)}
print headers
conn.request("GET", URL + "?" + params, None, headers)
response = conn.getresponse()
print response.status, response.reason
data = response.read()
conn.close()
print data

Example python job submission parameter generation
--------------------------------------------------
import base64
import hmac
import sha
import urllib
import sys
import time
import os
from popen2 import popen3

def hmac_sha1(data, key):
    return urllib.quote(base64.encodestring(hmac.new(key, data, sha).digest()).strip())

stdout, stdin, stderr = popen3("id -un")
USER = stdout.read().strip()
f = open(os.path.expanduser("~/.ssh/hadoop_key"))
KEY = f.read().strip()
f.close()

# For job submission the query path is the empty string unless extra
# parameters are given on the command line.
if len(sys.argv) > 1:
    params = sys.argv[1]
else:
    params = ""
# The same timestamp that is signed must be passed to the job.
timestamp = long(time.time() * 1000)
params = params + "&user=%s&timestamp=%d" % (USER, timestamp)
print "-Dmapred.job.timestamp=%d -Dmapred.job.signature=%s -Dmapred.job.queue.name=%s" % (timestamp, hmac_sha1(params, KEY), USER)
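As a usage sketch, the printed parameters can be passed straight to a job
submission; the jar, job name, and input/output paths here are
illustrative, and the timestamp and signature are placeholders of the
kind the script above prints:

hadoop jar hadoop-examples.jar wordcount \
  -Dmapred.job.timestamp=1230768000000 \
  -Dmapred.job.signature=uXJ9xqzFcWTSF7mNtnIXYdRz4y0%3D \
  -Dmapred.job.queue.name=myuser \
  input output

The queue name is the submitting user's name, as required above.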