#!/usr/bin/env python # ----------------------------------------------------------------------- # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # ----------------------------------------------------------------------- # # This is the main DUCC system test driver. Parameters allow various # rates of error injection (both during analytic initialization and # execution) , selection of execution styles (AE, DD, SE- service # based), initialization times. See the usage() method below for details. # # This script assumes a job directory has been prepared by the "prepare" # script in this directory. # import os import sys import getopt import string import time import subprocess import shutil import signal from threading import * import Queue import random DUCC_HOME = os.path.abspath(__file__ + '/../../..') sys.path.append(DUCC_HOME + '/admin') from ducc_util import DuccUtil from properties import Properties from ducc import Ducc class DuccProcess(Thread): def __init__(self, runner, jobfile): Thread.__init__(self) self.runner = runner self.jobfile = jobfile self.DUCC_HOME = DUCC_HOME # # read file and get nthreads, memory, class # def read_jobfile(self): print 'Reading jobfile', self.jobfile f = open(self.jobfile); threads = '1' memory = '15' clz = 'normal' machines = None services = None jobtype = None for line in f: toks = line.strip().split('='); if ( toks[0].strip() == 'threads' ): if ( self.runner.thread_override == None ): threads = toks[1] else: threads = self.runner.thread_override elif (toks[0].strip() == 'class'): clz = toks[1] elif (toks[0].strip() == 'memory'): if ( self.runner.memory_override == None ): memory = toks[1] else: memory = self.runner.memory_override elif (toks[0].strip() == 'user'): user = toks[1] elif (toks[0].strip() == 'machines'): machines = toks[1] elif (toks[0].strip() == 'services'): services = toks[1].strip() elif (toks[0].strip() == 'type'): jobtype = toks[1].strip() answer = {} answer['threads'] = threads answer['memory'] = memory answer['class'] = clz answer['user'] = user answer['machines'] = machines answer['services'] = services answer['type'] = jobtype return answer def execute(self): os.environ['USER'] = self.user print 'CLASSPATH:', os.environ['CLASSPATH'] print 'Running', self.jobfile, 'as', os.environ['USER'], 'compression', self.runner.compression print self.cmd os.system(self.cmd) def run(self): os.environ['USER'] = self.user print 'CLASSPATH:', os.environ['CLASSPATH'] print "Running as", os.environ['USER'] print self.cmd ducc = subprocess.Popen(self.cmd, bufsize=0, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True) p = ducc.stdout realid = None while 1: line = p.readline().strip() print line if ( line.endswith('submitted') ) : toks = line.split() realid = toks[1] if ( not line ): ducc.wait() break if ( realid == None ): print 'Cannot verify job, no id' return #CMD = "./verify.py -j " + realid #ducc = subprocess.Popen(CMD, bufsize=0, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True) #p = ducc.stdout #while 1: # line = p.readline().strip() # if ( not line ): # ducc.wait() # return # print 'VERIFY ' + realid + ':', line if ( self.runner.observe ): self.runner.queue.get() # remove marker so main() can eventually exit self.runner.queue.task_done() def jdbloat(self): if ( self.runner.cr_getnext_bloat == 0 ): return '0' toss = random.randint(0, 100) if ( toss < self.runner.cr_getnext_bloat ): return str(self.runner.cr_getnext_bloat) return '0' def calculate_bloat(self, memory): # if we're testing our ability to contan bloat, we'll set xmx to double the # requested memory. the JP will start to allocate stuff in an infinite loop. # the agents are expected to catch this and kill the process before the # machine dies. # # sure hope that works! if ( (self.runner.init_bloat != None) or (self.runner.process_bloat != None) ): xmx = '-Xmx' + str(int(memory)*2) + 'G' if ( self.runner.init_bloat != None ): envparms = ' INIT_BLOAT=' + self.runner.init_bloat else: envparms = ' PROCESS_BLOAT=' + self.runner.process_bloat else: xmx = '-Xmx100M' envparms = '' return (xmx, envparms) def mkargs(self, arglist): return '"' + ' '.join(arglist) + '"' def submit(self): print 'SUBMIT', self.jobfile args = None HERE = os.getcwd(); cr = 'org.apache.uima.ducc.test.randomsleep.FixedSleepCR' parms = self.read_jobfile() nthreads = parms['threads'] memory = parms['memory'] pclass = parms['class'] user = parms['user'] machines = parms['machines'] services = parms['services'] jobtype = parms['type'] (process_xmx, bloat_parms) = self.calculate_bloat(memory) driver_args = [] process_args = [] if ( self.runner.style == 'AE' ): ae = 'org.apache.uima.ducc.test.randomsleep.FixedSleepAE' elif ( self.runner.style == 'DD' ): if ( self.runner.descriptor_as_file ): dd = self.DUCC_HOME + '/examples/simple/resources/randomsleep/DDSleepDescriptor.xml' else: dd = 'org.apache.uima.ducc.test.randomsleep.DDSleepDescriptor' else: ae = 'UimaAsFailAgg_' + services if ( self.runner.use_http ): plain_broker_url = 'http://' + self.runner.broker_host + ':8081' else: plain_broker_url = self.runner.broker_protocol + '://' + self.runner.broker_host + ':' + self.runner.broker_port cr_parms = '"jobfile=' + self.jobfile + ' compression=' + self.runner.compression + '"' process_args.append(process_xmx) process_args.append('-DdefaultBrokerURL=' + plain_broker_url ) if ( self.runner.system == 'Darwin' ): # Keep JP / JD processes from stealing focus on Mac process_args.append('-Djava.awt.headless=true') driver_args.append('-Djava.awt.headless=true') driver_args.append('-Xmx500M') if ( self.runner.jd_uima_log != None ): driver_args.append(' -Djava.util.logging.config.file=' + self.runner.jd_uima_log) if ( self.runner.jp_uima_log != None ): driver_args.append(' -Djava.util.logging.config.file=' + self.runner.jp_uima_log) jvm_driver_args = self.mkargs(driver_args) jvm_process_args = self.mkargs(process_args) print 'jvm_driver_args', jvm_driver_args print 'jvm_process_args', jvm_process_args CMD = os.environ['JAVA_HOME'] + '/bin/java' CMD = CMD + ' ' + self.runner.submit_package + '.cli.DuccJobSubmit' CMD = CMD + ' --description ' + '"' + self.jobfile + '[' + self.runner.style + ']"' CMD = CMD + ' --driver_descriptor_CR ' + cr CMD = CMD + ' --driver_descriptor_CR_overrides ' + cr_parms CMD = CMD + ' --driver_jvm_args ' + jvm_driver_args if ( self.runner.style == 'DD' ): CMD = CMD + ' --process_DD ' + dd else: # ae and se CMD = CMD + ' --process_descriptor_AE ' + ae if ( self.runner.style == 'SE' ): CMD = CMD + ' --service_dependency UIMA-AS:FixedSleepAE_'+ services + ':' + plain_broker_url #CMD = CMD + ' --working_directory ' + working_dir CMD = CMD + ' --process_memory_size ' + memory CMD = CMD + ' --classpath ' + self.runner.examples_classpath CMD = CMD + ' --process_jvm_args ' + jvm_process_args CMD = CMD + ' --process_pipeline_count ' + nthreads CMD = CMD + ' --scheduling_class ' + pclass CMD = CMD + ' --process_per_item_time_max ' + self.runner.process_timeout # in minutes CMD = CMD + ' --process_initialization_failures_cap ' + self.runner.init_fail_cap if ( self.runner.init_timeout > 0 ): CMD = CMD + ' --process_initialization_time_max ' + str(self.runner.init_timeout) CMD = CMD + ' --environment ' \ + '"' \ + ' AE_INIT_TIME=' + str(self.runner.init_time) \ + ' AE_INIT_RANGE=' + str(self.runner.init_range) \ + ' AE_INIT_EXIT=' + str(self.runner.ae_init_exit) \ + ' AE_INIT_ERROR=' + str(self.runner.ae_init_error) \ + ' AE_RUNTIME_EXIT=' + str(self.runner.ae_runtime_exit) \ + ' AE_RUNTIME_ERROR=' + str(self.runner.ae_runtime_error) \ + ' CR_INIT_EXIT=' + str(self.runner.cr_init_exit) \ + ' CR_INIT_ERROR=' + str(self.runner.cr_init_error) \ + ' CR_RUNTIME_EXIT=' + str(self.runner.cr_runtime_exit) \ + ' CR_RUNTIME_ERROR=' + str(self.runner.cr_runtime_error) \ + bloat_parms \ + ' CR_GETNEXT_BLOAT=' + self.jdbloat() \ + ' LD_LIBRARY_PATH=/a/bogus/path' \ + '"' if ( self.runner.max_machines == 0 or jobtype == 'reserve' ): if ( machines != None ): CMD = CMD + ' --process_deployments_max ' + machines elif (self.runner.max_machines != -1 ): CMD = CMD + ' --process_deployments_max ' + self.runner.max_machines if ( self.runner.observe ): CMD = CMD + ' --wait_for_completion' self.user = user self.cmd = CMD class ServiceThread(Thread): def __init__(self, cmd): Thread.__init__(self) self.cmd = cmd self.terminated = False def stop_service(self): self.terminated = True #os.system('kill -2 ' + str(self.svc.pid)) self.svc.send_signal(2) def run(self): print 'Starting service:', self.cmd self.svc = subprocess.Popen(self.cmd, bufsize=0, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True) p = self.svc.stdout while 1: line = p.readline().strip() if ( (not line) or ( line == '') ): if ( self.terminated ): return print 'VERIFY :' + line + ':' class ServiceStarter(DuccUtil): def __init__(self, runner): DuccUtil.__init__(self) self.runner = runner def gen_service(self, svcid, autostart): os.environ['USER'] = os.environ['LOGNAME'] # make sure I'm me - after submit may not be if ( self.runner.use_http ): plain_broker_url = 'http://' + self.broker_host + ':8081' else: plain_broker_url = self.broker_protocol + '://' + self.broker_host + ':' + self.broker_port props = Properties() props.put('description', 'Test Service ' + svcid) props.put('process_descriptor_DD', self.DUCC_HOME + '/examples/simple/resources/service/Service_FixedSleep_' + svcid + '.xml') props.put('process_memory_size', '15') props.put('service_linger', '60000') props.put('classpath', self.runner.examples_classpath); props.put('process_jvm_args', '-Xmx100M -DdefaultBrokerURL=' + plain_broker_url) props.put('environment', 'AE_INIT_TIME=5000 AE_INIT_RANGE=1000 INIT_ERROR=0 LD_LIBRARY_PATH=/yet/a/nother/dumb/path') props.put('scheduling_class', 'fixed') props.put('working_directory', os.getcwd()) props.put('service_ping_arguments', 'broker-jmx-port=' + self.broker_jmx_port) if ( autostart ): props.put('autostart', 'true') svcfile = svcid + '.gen.svc' props.write(svcfile) return svcfile def register_service(self, svcid, instances, start, autostart): os.environ['USER'] = os.environ['LOGNAME'] # make sure I'm me - after submit may not be svcfile = self.gen_service(svcid, autostart) CMD = self.DUCC_HOME + '/bin/ducc_services --register ' + svcfile + ' --instances ' + instances lines = self.popen(CMD) for line in lines: line = line.strip() print 'REGISTER', line toks = line.split() if ( (toks[0] == 'Service') and (toks[2] == 'succeeded') ): print 'Service registered as service', toks[7] if ( start ): print 'Starting registered service instance', svcid, 'service id', toks[7] os.system(self.DUCC_HOME + '/bin/ducc_services --start ' + toks[6]) return toks[6] print 'Cannot register service', svcid, ':', line sys.exit(1) def start_services(self, svcfile): os.environ['USER'] = os.environ['LOGNAME'] # make sure I'm me - after submit may not be svcprops = Properties() svcprops.load(svcfile) all_services = {} register = svcprops.get('register') self.registered = {} if ( register != None ): register = register.strip() # make a map with the service id as key and the number of instances as val toks = register.split() for t in toks: t = t.strip() if ( self.registered.has_key(t) or all_services.has_key(t) ): print "Duplicate registered service", t sys.exit(1) self.registered[t] = None all_services[t] = None start = svcprops.get('start') self.started = {} if ( start != None ): start = start.strip() # make a map with the service id as key and the number of instances as val toks = start.split() for t in toks: t = t.strip() if ( self.started.has_key(t) ): print "Duplicate started service", t sys.exit(1) if ( not self.registered.has_key(t) ): print "Trying to start service", t, "but it is not registered." self.started[t] = None auto = svcprops.get('autostart') self.autostarted = {} if ( auto != None ): auto = auto.strip() # make a map with the service id as key and the number of instances as val toks = auto.split() for t in toks: t = t.strip() if ( self.autostarted.has_key(t) ): print "Duplicate auto-started service", t sys.exit(1) if ( not self.registered.has_key(t) ): print "Trying to start service", t, "but it is not registered." self.autostarted[t] = None for (k, v) in self.registered.items(): instances = svcprops.get("instances_" + k) if ( instances == None ): print "Missing instances for registered job", k print "Registering service", k, "with", instances, "instances" start = self.started.has_key(k) autostart = self.autostarted.has_key(k) service = self.register_service(k, instances, start, autostart) svcs = self.registered[k] if ( svcs == None ): svcs = [] self.registered[k] = svcs svcs.append(service) os.system(self.DUCC_HOME + '/bin/ducc_services --query') def stop_services(self): os.environ['USER'] = os.environ['LOGNAME'] # make sure I'm me - after submit may not be for (k, v) in self.registered.items(): for id in v: print 'Unregistering', id os.system(self.DUCC_HOME + "/bin/ducc_services --unregister " + id) class RunDucc(DuccUtil): def run_batch(self): counter = 0 running = False bfile = self.test_dir + '/' + self.batchfile if ( not os.path.exists(bfile) ): print 'File', bfile, 'does not exist.' sys.exit(1) f = open(bfile) for line in f: print '----', line.strip() if ( line[0] == '#' ): continue if ( line[0:2] == 's ' ): running = True toks = line.split() if ( toks[1] == '-c' ): jobfile = toks[3] self.compression = toks[2] else: jobfile = toks[1] self.compression = '1' ducc_process = DuccProcess(self, jobfile) ducc_process.submit() if ( self.observe ) : self.queue.put(jobfile) # any old marker will do ducc_process.start() else: ducc_process.execute() continue if ( not running ): continue if ( line[0:2] == 'x ' ): sys.exit(0) continue if ( line[0:6] == '[sleep' ): #toks = string.translate(line, None, '[]').split() toks = line.strip().strip('[]').split() print toks t = toks[1].strip() # we're not going to try millisecond sleep - it's probably overdesigned # to do that anyway. if ( t[-1:] == 'S' ): delay = int(t.strip('S')) elif ( t[-1:] == 'M' ): delay = int(t.strip('M')) * 60 else: delay = int(t); print 'SLEEP', str(delay) time.sleep(delay) continue # these next aren't supported in "ducc" mode if ( line[0:3] == 'qm '): continue if ( line[0:3] == 'qj '): continue if ( line[0:3] == 'qc '): continue def usage(self, msg): if ( msg != None ): print msg print 'Usage:' print ' runducc.py [optons]' print 'Options:' print ' --AE' print ' Specifies to run this as a single CR and AE.' print '' print ' --DD' print ' Specifies to run this as CR + CM / AE / CC pipeline.' print '' print ' --SE service-startup-config' print ' Specifies to run this with the AE as a delegate service. The required parameter specifies' print ' a service startup script.' print '' print ' --FILE' print ' Use DD descriptor in filesystem, not as resource in jar file.' print '' print ' --http' print ' Use HTTP instead of tcp for services' print '' print ' -d, --directory dir' print ' This is the directory with the test files and configuration. Required' print '' print ' -b, --batchfile file' print ' This is the batch file describing the submissions. Required.' print '' print ' -i, --init_time milliseconds' print ' This is the AE initialization minimum time in seconds. Default:', self.init_time print '' print ' --init_timeout minutes' print ' Max time in minutes NOTE MINUTES a process is allowed to initialize. Best used n conjunction with careful choice of' print ' -i and -r' print '' print ' --init_fail_cap number-of-failures.' print ' This is the max init failures tolerated before the system starts to cap processes. Default:', self.init_fail_cap print '' print ' --IB' print ' The JP will leak in init() until DUCC (hopefully) kills us' print '' print ' --PB' print ' The JP will leak in process() until DUCC (hopefully) kills us' print '' print ' --TO number-of-threads' print ' Thread-override: force this number of threads regardless of what is in job spec.' print '' print ' -r, --range seconds' print ' This is the AE initializion time range over base in milliseconds. Default:', self.init_range print ' Init time is -i value + random[0, -rvalue]' print '' print ' -m, --memory_override mem-in-GB' print ' Use this instead of what is in the props file. Default: None' print '' print ' -n, --nmachines_override process_deployments_max' print '' print ' -o, --observe' print ' Specifies that we submit in keepalive mode and observe(watch) the jobs, creating a dir with outputs. Default:', self.observe print ' If specified, we run verification against the results.' print '' print ' -p, --process_timeout sec' print ' Process timeout, in minutes. Default:', self.process_timeout print '' print ' --jd_uima_log log-properties' print ' If specified, use the indicated properties file for JD UIMA/UIMA-AS logging. Default:', self.jd_uima_log print '' print ' --jp_uima_log log-properties' print ' If specified, use the indicated properties file for JP UIMA/UIMA-AS logging. Default:', self.jp_uima_log print '' print ' -q CR Probality it will leak on each getNext. Default', self.cr_getnext_bloat print '' print ' -s' print ' AE Probability that a JP will spontaneously exit during initialization. Default:', self.ae_init_exit print '' print ' -t' print ' AE Probability that a JP will throw an exception during initialization. Default:', self.ae_init_error print '' print ' -u' print ' AE Probability that a JP will spontaneously exit in the process method. Default:', self.ae_runtime_exit print '' print ' -v' print ' AE Probability that a JP will throw an exception in the process method. Default:', self.ae_runtime_error print '' print ' -w' print ' CR Probability that a JD will spontaneously exit during initialization. Default:', self.cr_init_exit print '' print ' -x' print ' CR Probability that a JD will throw an exception during initialization. Default:', self.cr_init_error print '' print ' -y' print ' CR Probability that a JD will spontaneously exit in the process method. Default:', self.cr_runtime_exit print '' print ' -z' print ' CR Probability that a JD will throw an exception in the process method. Default:', self.cr_runtime_error print '' print 'We run with DUCC_HOME set to', self.DUCC_HOME sys.exit(1) def main(self, argv): self.test_dir = None self.batchfile = None self.observe = False self.ae_init_exit = 0 # -s int 0-100 self.ae_init_error = 0 # -t int 0-100 self.ae_runtime_exit = 0.0 # -u float self.ae_runtime_error = 0.0 # -v float self.cr_init_exit = 0 # -w int 0-100 self.cr_init_error = 0 # -x int 0-100 self.cr_runtime_exit = 0.0 # -y float self.cr_runtime_error = 0.0 # -z float self.cr_getnext_bloat = 0 # -q jd leakage self.init_fail_cap = '99' self.memory_override = None self.init_time = 10000 self.init_range = 1000 self.init_timeout = 0 self.process_timeout = str(60*24) # 24 hour default - nothing in current megas will fail on this self.style = 'AE' self.service_pid = None self.init_bloat = None self.process_bloat = None self.service_startup = None self.jd_uima_log = None self.jp_uima_log = None self.submit_package = 'org.apache.uima.ducc' self.max_machines = 0 self.use_http = False self.descriptor_as_file = False self.thread_override = None try: opts, args = getopt.getopt(argv, 'b:d:fi:m:n:op:q:r:s:t:u:v:w:x:y:z:?h', ['AE', 'DD', 'file', 'SE=', 'IB=', 'PB=', 'directory=', 'batchfile=', 'init_time=', 'init_fail_cap=', 'range=', 'memory_override=', 'nmachines=', 'process_timeout=', 'init_timeout=', 'observe' 'jd_uima_log=', 'jp_uima_log=', 'http', 'threads=' ]) except: print "Unknown option" self.usage(None) for ( o, a ) in opts: print o, a if o in ('-d', '--directory'): self.test_dir = a elif o in ('-b', '--batchfile'): self.batchfile = a elif o in ('-i', '--init_time'): self.init_time = int(a) * 1000 elif o in ('-i', '--init_fail_cap'): self.init_fail_cap = a elif o in ('-r', '--range'): self.init_range = int(a) * 1000 elif o in ('-m', '--memory_override'): self.memory_override = a elif o in ('-n', '--nmachines'): self.max_machines = int(a) # force ugly failure if not a number self.max_machines = a elif o in ('-p', '--process_timeout'): self.process_timeout = a elif o in ('-o', '--observe' ): self.observe = True elif o in ('--init_timeout' ): self.init_timeout = int(a) elif o in ('--jd_uima_log' ): self.jd_uima_log = a elif o in ('--jp_uima_log' ): self.jp_uima_log = a elif o in ('--AE'): self.style = 'AE' elif o in ('--DD'): self.style = 'DD' elif o in ('--SE'): self.style = 'SE' self.service_startup = a self.observe = True elif o in ( '-f', '--file'): self.descriptor_as_file = True elif o in ('--http'): self.use_http = True elif o in ('--IB'): self.init_bloat = a elif o in ('--PB'): self.process_bloat = a elif o in ('--threads'): self.thread_override = a elif ( o == '-q'): self.cr_getnext_bloat = int(a) elif ( o == '-s'): self.ae_init_exit = int(a) elif ( o == '-t'): self.ae_init_error = int(a) elif ( o == '-u'): self.ae_runtime_exit = float(a) elif ( o == '-v'): self.ae_runtime_error = float(a) elif ( o == '-w'): self.cr_init_exit = int(a) elif ( o == '-x'): self.cr_init_error = int(a) elif ( o == '-y'): self.cr_runtime_exit = float(a) elif ( o == '-z'): self.cr_runtime_error = float(a) elif ( o == '-?'): self.usage(None) else: print 'Invalud argument:', o self.usage(None) if ( self.test_dir == None ): self.usage("Missing test_dir") print 'Running with' print ' test_dir :', self.test_dir print ' batchfile :', self.batchfile print ' style :', self.style print ' descriptor as file :', self.descriptor_as_file print ' http :', self.use_http print ' init-time :', self.init_time / 1000 print ' init-range :', self.init_range / 1000 print ' init-timeout :', self.init_timeout print ' init-bloat :', self.init_bloat print ' process-bloat :', self.process_bloat print ' observe :', self.observe print ' ae_init_exit :', self.ae_init_exit print ' ae_init_error :', self.ae_init_error print ' ae_runtime_exit :', self.ae_runtime_exit print ' ae_runtime_error :', self.ae_runtime_error print ' cr_init_exit :', self.cr_init_exit print ' cr_init_error :', self.cr_init_error print ' cr_runtime_exit :', self.cr_runtime_exit print ' cr_runtime_error :', self.cr_runtime_error print ' cr_getnext_bloat :', self.cr_getnext_bloat print ' process_timeout :', self.process_timeout print ' memory_override :', self.memory_override print ' max_machines :', self.max_machines print ' jd_uima_log :', self.jd_uima_log print ' jp_uima_log :', self.jp_uima_log print ' DUCC_HOME :', self.DUCC_HOME print ' Thread override :', self.thread_override self.submit_package = 'org.apache.uima.ducc' cp = [] cp.append(self.DUCC_HOME + '/lib/uima-ducc/examples/*') cp.append(self.DUCC_HOME + '/apache-uima/lib/*') cp.append(self.DUCC_HOME + '/apache-uima/apache-activemq/lib/*') cp.append(self.DUCC_HOME + '/apache-uima/apache-activemq/lib/optional/*') cp.append(self.DUCC_HOME + '/examples/simple/resources/service') self.examples_classpath = ':'.join(cp) if ( self.style == 'SE' ): if ( self.service_startup == None ): usage("Missing service startup file") svcfile = self.test_dir + '/' + self.service_startup service_starter = ServiceStarter(self); service_starter.start_services(svcfile) print 'Pausing a bit' time.sleep(15) os.system(self.DUCC_HOME + '/bin/ducc_services --query') #service_starter.stop_services() #return; os.environ['CLASSPATH'] = self.DUCC_HOME + "/lib/uima-ducc-cli.jar" if ( self.observe ): self.queue = Queue.Queue() self.run_batch() if ( self.observe ): self.queue.join() print 'All threads returned' if ( self.style == 'SE' ): service_starter.stop_services() # -------------------------------------------------------------------------------- # -------------------------------------------------------------------------------- if __name__ == "__main__": runducc = RunDucc() runducc.main(sys.argv[1:])