#!/usr/bin/env python # ----------------------------------------------------------------------- # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # ----------------------------------------------------------------------- # # This script starts a simulated DUCC cluster. All processes are real, so all # codepaths are executed. The simulation comes from running multiple DUCC agents # on a single node, allowing large-scale testing on small-scale clusters. All # the system-test jobs sleep rather than compute and as such are very small and # inexpensive, which allows large-scale testing to be performed on minimal # hardware. # import os import sys import time import getopt from threading import * import Queue #designed to run only here, relative to ducc_runtime DUCC_HOME = os.path.abspath(__file__ + '/../../..') sys.path.append(DUCC_HOME + '/admin') from ducc_util import DuccUtil from properties import Properties from ducc import Ducc from ducc_util import ThreadPool # multi-threaded start can overwhelm ssh if going to the same target host. We inject # a short sleep between starts to make it better. This is how long to sleep in secs. # Note: when 0.1 the ducc.properties merge step run by each agent caused problems/hangs global SLEEP_TIME SLEEP_TIME = 0.5 class StartSim(DuccUtil): def __init__(self): DuccUtil.__init__(self, True) def start_broker(self): if ( not self.automanage ): print "Broker is not automanaged, returning." broker_host = self.ducc_properties.get('ducc.broker.hostname') print 'broker host', broker_host lines = self.ssh(broker_host, True, "'", self.DUCC_HOME + '/admin/ducc.py', '-c', 'broker', '--simtest', "'") for i in range(0, 9): if ( self.is_amq_active() ): return print 'Waiting for broker', str(i) time.sleep(1) def run_local_agent(self, pnode, ip, memory ): if ( not self.verify_jvm() ): return self.verify_duccling() self.verify_limits() memory = int(memory) * 1024 * 1024 # to GB from KB CMDPARMS = [] CMDPARMS.append(self.java()) CMDPARMS.append('-Dducc.deploy.components=agent') CMDPARMS.append('-Dos.page.size=' + self.os_pagesize) CMDPARMS.append('-Dducc.deploy.configuration=' + self.DUCC_HOME + "/resources/ducc.properties") CMDPARMS.append('-Djava.library.path=' + self.DUCC_HOME) CMDPARMS.append('-Xmx100M') CMDPARMS.append('-XX:+HeapDumpOnOutOfMemoryError') ducc_heap_dump_path = self.ducc_properties.get('ducc.heap.dump.path') if ducc_heap_dump_path != None: CMDPARMS.append('-XX:HeapDumpPath='+ducc_heap_dump_path) CMDPARMS.append('-Dducc.agent.node.metrics.fake.memory.size=' + str(memory)) # Put virtual IP on command line for duplicate daemon detector CMDPARMS.append('-Dducc.agent.virtual=' + ip) CMDPARMS.append('org.apache.uima.ducc.common.main.DuccService') print "Start agent with pnode", pnode, "IP", ip, "memory", memory os.environ['DUCC_NODENAME'] = pnode os.environ['DUCC_IP'] = ip self.nohup(CMDPARMS) #self.spawn(' '.join(CMDPARMS)) # # Start admin components rm pm sm ws db or, on local node using Ducc.py # def startComponent(self, args): msgs = [] com, or_parms = args if ( com in ('ws', 'viz') ): node = self.webserver_node else: node = self.localhost if ( com == 'or' ): lines = self.ssh(node, True, "'", self.DUCC_HOME + '/admin/ducc.py', '-c', 'or', '-b', '--or_parms', or_parms, "'") else: lines = self.ssh(node, True, "'", self.DUCC_HOME + '/admin/ducc.py', '-c', com, '-b', "'") msgs.append(('Start', com, 'on', node)) while 1: line = lines.readline().strip() if ( not line ): break #print '[] ' + line if ( line.startswith('PID') ): toks = line.split(' ') # get the PID self.pidlock.acquire(); self.pids.put(com, self.localhost + ' ' + toks[1] + ' ' + self.localhost) self.pidlock.release(); lines.close() msgs.append((' PID', toks[1])) break return msgs # # Read the special nodelist and start "special" agents # # Nodelist has records compatible with java properties like this: # # index nodename memory-in-gb # # We generate fake IP addresses from 192.168.4.X where x is the index. # We generate fake node names from nodename-X where x is the index. # We pass the memory-in-gb to the agent as the fake memory it reports instead of real memory. # # We use Jerry's memory override for each agent to get it to falsely report the memory # instead of reading from the real machine. # def startOneAgent(self, args): response = [] node, cmd, mem, ip, pnode, index = args response.append(('Starting agent on', node, 'instance', index, 'as pseudo-node', pnode, 'IP', ip, 'memory', mem)) lines = self.ssh(node, True, "'", cmd, '--agent', '--memory', mem, '--addr', ip, '--pseudoname', pnode, "'") while 1: line = lines.readline().strip() if ( not line ): break #print '[]', line if ( line.startswith('PID')): toks = line.split(' ') # get the PID lines.close() response.append((' ... Started, PID', toks[1])) # Gottoa run on old python that doesn't know 'with' self.pidlock.acquire() self.pids.put(index, node + ' ' + toks[1] + ' ' + pnode) self.pidlock.release() break return response def startAgents(self, nodelist, instances): do_all = True if ( len(instances) > 0 ): do_all = False print 'Starting instances:', instances print "Starting from nodes in", nodelist props = Properties() props.load(nodelist) # first parse the node config into a list of tuples describing the simulated nodes # each tuple is like this: (index, nodename, mem) # where 'index' is a unique number (must be a number) # 'nodename' is the name of a real node, which will contain one or more agents # to simulate multiple nodes # 'mem' is the simulated memory the agent will report nodes = props.get('nodes').split() mems = props.get('memory').split() allnodes = [] ndx = 1 for node in nodes: # allow the index to be reset by-node so you can vary the number of # pseudo nodes on a real node without changing the indexes of others # if desired, e.g. to manage a custom ducc.nodes key = 'index.' + node if ( props.has_key(key) ): ndx = int(props.get(key)) # generate node names and associate simulated memory for mem in mems: count = props.get(node + '.' + mem) if ( count != None ): for i in range(0, int(count)): allnodes.append( (str(ndx), node, mem) ) ndx = ndx + 1 here = os.getcwd() cmd = os.path.abspath(sys.argv[0]) for (index, node, mem) in allnodes: if ( not do_all ): if ( not instances.has_key(index) ): continue ip = '192.168.4.' + index pnode = node + '-' + index self.threadpool.invoke(self.startOneAgent, node, cmd, mem, ip, pnode, index) time.sleep(SLEEP_TIME) def usage(self, msg): if (msg != None): if ( msg[0] != None ): msg = ' '.join(msg) print msg print "Usage:" print " start_sim [options]" print " If no options are given this help screen is shown." print "" print "Options:" print " -n --nodelist nodelist" print " The nodelist describing agents that is to be used. Not valid with --agent, --memory, --agent, or --pseudoname." print " A nodelist provides the parameters for starting agents. Lines are of the form:" print " index nodename mem-in-GB" print " 'index' is any unique number." print " 'nodename' is the physical node you want the agent placed on" print " 'mem-in-GB' is the amount of simulated memory the agent will report" print "" print " Example:" print " 1 node290 31" print " 2 node290 31" print " 9 node291 31" print " 10 node291 31" print " 17 node292 47" print " 18 node292 47" print " 25 node293 47" print " 26 node293 47" print "" print " -c --components component" print " Start the indicated component, must be one of", ' '.join(self.default_components), "or 'all'" print "" print " -i --instance instanceid" print " Start only this instance of an agent from the nodelist." print "" print " --agent" print " Start an agent only on localhost. All of --memory, --addr, and --pseudoname are required, -n is disallowed." print "" print " --memory mem" print " Use this memory override. Valid only with --agent." print "" print " --addr" print " Use this IP override. Valid only with --agent." print "" print " --pseudoname pseudoname" print " Use this as the hostname for the agent. Valid only with -a." print "" print " --nothreading" print " Disable multithreaded operation if it would otherwise be used" print "" print " -v, --version" print " Print the current DUCC version" sys.exit(1) def invalid(self, *msg): if ( msg != None ): if ( msg[0] != None ): msg = ' '.join(msg) print msg print "For usage run" print " start_sim -h" print 'or' print ' start_sim --help' sys.exit(1) def main(self, argv): print "Running as", os.getpid() if ( len(argv) == 0 ): self.usage(None) node_config = None components = {} instances = {} run_agent = False memory = None pseudoname = None IP = None or_parms = self.ducc_properties.get('ducc.orchestrator.start.type') try: opts, args = getopt.getopt(argv, 'c:i:n:vh?', ['component=', 'help', 'agent', 'memory=', 'instance=', 'addr=', 'pseudoname=', 'node-config=', 'version', 'warm', 'cold', 'nothreading']) except: self.invalid('Invalid arguments', ' '.join(argv)) for ( o, a ) in opts: if o in ( '-n', '--node-config' ): node_config = a elif o in ( '--agent' ): run_agent = True elif o in ( '-c', '--component' ): if ( a == 'all' ): for cmp in self.default_components: components[cmp] = cmp else: components[a] = a elif o in ( '--memory' ): memory = a elif o in ( '--addr' ): IP = a elif o in ( '-i', '--instance' ): instances[a] = a elif o in ( '--pseudoname' ): pseudoname = a elif o in ( '--nothreading' ): self.disable_threading() elif o in ( '-v', '--version' ): print self.version() os.exit(0) elif o in ( '--warm', '--cold' ): or_parms = o[2:] elif o in ( '-h', '--help' ): self.usage(None) elif ( o == '-?'): self.usage(None) else: self.invalid('bad args: ', ' '.join(argv)) self.pids = Properties() self.ducc = Ducc() if ( not self.verify_jvm() ): return self.set_duccling_version() self.verify_duccling() if ( os.path.exists('sim.pids') ): self.pids.load('sim.pids') if ( run_agent ): # # checks that aren't valid if we want run_agent # if ( node_config != None ): self.invalid("Nodelist is not compatible with agent") if ( (IP == None ) or ( memory == None) or ( pseudoname == None )): self.invalid("Missing IP, memory, or pseudoname") self. run_local_agent(pseudoname, IP, memory) sys.exit(0) else: self.pidlock = Lock() self.threadpool = ThreadPool(50) if ( (IP != None) or (memory != None) or ( pseudoname != None )) : self.invalid("Running with a nodelist is not compatible with running a single agent."); try: specials = ['broker', 'db'] sc = set(components) sb = set(specials) specials_only = False if ( sc.issubset(sb) ): read_pids = True print '-------- start broker' if ( components.get('broker') != None ): self.start_broker() print '-------- start database' if ( components.get('db') != None ): try: self.db_start() except Exception (e): # print e print sys.exc_info()[0], "Can't start the database." sys.exit(1) print '-------- specials_only', specials_only if ( specials_only ): return print '-------- start agents' if ( node_config != None ): self.startAgents(node_config, instances) for (com, com) in components.items(): if ( not com in specials ): # specials start with different rules self.threadpool.invoke(self.startComponent, com, or_parms) time.sleep(SLEEP_TIME) except: pass self.threadpool.quit() self.pids.write('sim.pids') if __name__ == "__main__": starter = StartSim() starter.main(sys.argv[1:])