#!/usr/bin/env python
# -----------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# -----------------------------------------------------------------------

# NOTE: every print below is written as a call with exactly one argument.
# That form produces the same output under the Python 2 print statement and
# the Python 3 print function, without needing a __future__ import.

import os
import sys
import time
import getopt
import threading
import traceback

from ducc_util import DuccUtil
from properties import Properties
from local_hooks import verify_slave_node
from local_hooks import verify_master_node
from ducc import Ducc
from ducc_util import ThreadPool
from ducc_base import find_ducc_home


class StartDucc(DuccUtil):
    """Command-line driver that starts the broker, database, head components and agents."""

    def __init__(self):
        DuccUtil.__init__(self, True)

    def start_broker(self):
        """
        Launch the broker on the host named by 'ducc.broker.hostname'.

        Runs admin/ducc.py over ssh, records the PID reported on the first
        'PID ...' output line under 'broker@<host>' in self.pids, then polls
        is_amq_active() up to nine times, one second apart, returning as soon
        as the broker answers.
        """
        broker_host = self.ducc_properties.get('ducc.broker.hostname')
        print('Starting broker on %s' % (broker_host,))
        lines = self.ssh(broker_host, True, "'", self.DUCC_HOME + '/admin/ducc.py', '-c', 'broker', "'")
        while 1:
            line = lines.readline().strip()
            if not line:
                break
            if line.startswith('PID'):
                toks = line.split(' ')          # second token is the PID
                print('Broker on %s PID %s' % (broker_host, toks[1]))
                self.pids.put('broker@' + broker_host, toks[1])
                lines.close()
                break

        for i in range(0, 9):
            if self.is_amq_active():
                return
            print('Waiting for broker ..... %s' % (i,))
            time.sleep(1)

    def start_component(self, args):
        """
        Start one management component (or an agent named as a component).

        args is a (ducc, component, or_parms) triple.  component may be
        qualified as 'name@node'; otherwise the node defaults to the
        'ducc.head' property.  Returns a list of message tuples describing
        what happened; an unrecognized component yields a single
        'Unrecognized component' message and nothing is started.
        """
        ducc, component, or_parms = args
        msgs = []

        node = self.ducc_properties.get('ducc.head')
        com = component
        if com.find('@') >= 0:
            com, node = com.split('@')

        if (com == 'ws') and (node == 'local') and (self.webserver_node != 'localhost'):
            if self.webserver_node is not None:
                node = self.webserver_node
                component = com + '@' + node

        if (com in self.default_components) or (com == 'agent'):
            msgs.append((node, 'Starting', com))
        else:
            msgs.append(('Unrecognized component', component))
            return msgs

        if or_parms is None:
            or_parms = '--or_parms='
        else:
            or_parms = '--or_parms=' + or_parms

        if node == 'local':
            node = self.localhost

        lines = self.ssh(node, True, "'", self.DUCC_HOME + '/admin/ducc.py', '-c', com, '-b', or_parms,
                         '-d', str(time.time()), '--nodup', "'")

        # Capture whatever the remote python shell emits because it may be
        # useful, and drop the pipe as soon as the PID message shows up.
        while 1:
            line = lines.readline().strip()
            if not line:
                break
            if line.startswith('PID'):
                toks = line.split(' ')          # second token is the PID
                msgs.append((' PID', toks[1]))
                # This method runs on thread-pool workers, so the shared pid
                # table is updated under the same lock the other writers use.
                self.pidlock.acquire()
                try:
                    self.pids.put(com + '@' + node, toks[1])
                finally:
                    self.pidlock.release()
                lines.close()
                break
            if line.startswith('WARN'):
                msgs.append((' ', line))

        if com in self.default_components:
            # tracks where the management processes are
            self.pidlock.acquire()
            try:
                self.pids.put(com, com + '@' + node)
            finally:
                self.pidlock.release()

        return msgs

    def start_one_agent(self, args):
        """
        Start an agent on the host given as args[0].

        Reads the remote output line by line.  A 'PID ...' line records the
        agent PID under 'agent@<host>' and ends the scan; lines containing
        'tty' are ignored; every other non-blank line is passed to ssh_ok()
        and reported.  Returns a list of message tuples.
        """
        host = args[0]
        msgs = []
        spacer = ' '
        msgs.append((host, ""))
        lines = self.ssh(host, True, "'", self.DUCC_HOME + '/admin/ducc.py', '-c', 'agent', '-b',
                         '-d', str(time.time()), '--nodup', "'")
        for line in lines:
            line = line.strip()
            if not line:
                # a blank line carries no information and has no first token
                continue

            if line.startswith('PID'):
                toks = line.split(' ')
                pid = toks[1]
                self.pidlock.acquire()
                try:
                    self.pids.put('agent@' + host, pid)
                finally:
                    self.pidlock.release()
                lines.close()
                msgs.append((spacer, 'DUCC Agent started PID', pid))
                break

            if 'tty' in line:                   # ssh junk if mesg is set
                continue

            toks = line.split()
            sshmsgs = self.ssh_ok(host, line)
            if sshmsgs is not None:
                for m in sshmsgs:
                    print('[S] %s' % (m,))

            if toks[0] == 'NOTOK':
                msgs.append((spacer, 'NOTOK Not started:', ' '.join(toks[1:])))
            else:
                msgs.append((spacer, line))

        return msgs

    def verify_required_directories(self):
        """Create DUCC_HOME/history, DUCC_HOME/state and DUCC_HOME/logs when missing."""
        for subdir in ('history', 'state', 'logs'):
            path = self.DUCC_HOME + '/' + subdir
            if not os.path.exists(path):
                print('Initializing %s' % (path,))
                os.mkdir(path)

    def usage(self, *msg):
        """Print the optional message followed by the usage text, then exit with status 1."""
        if msg and msg[0] is not None:
            print(' '.join(msg))

        usage_text = (
            "Usage:",
            " start_ducc [options]",
            " If no options are given, all DUCC processes are started, using the default",
            " nodelist, DUCC_HOME/resources/ducc.nodes. ",
            "",
            "Options:",
            " -n --nodelist nodefile",
            " Start agents on the nodes in the nodefile. Multiple nodefiles may be specified:",
            "",
            " start_ducc -n foo.nodes -n bar.nodes -n baz.nodes",
            "",
            " -c, --component component",
            " Start a specific DUCC component, optionally on a specific node. If the component name",
            " is qualified with a nodename, the component is started on that node. To qualify a",
            " component name with a destination node, use the notation component@nodename.",
            " Multiple components may be specified:",
            "",
            " start_ducc -c sm -c pm -c rm@node1 -c or@node2 -c agent@remote1 -c agent@remote2",
            "",
            " Components include:",
            " rm - resource manager",
            " or - orchestrator",
            " pm - process manager",
            " sm - services manager",
            " ws - web server",
            " agent - node agent",
            ' head = { or, pm, rm, sm, ws, db, broker }',
            "",
            " --nothreading",
            " Disable multithreaded operation if it would otherwise be used",
            "",
            " Choose none or one of the following two options, which is only effective when the orchestrator (or) component is started.",
            " When specified here it supersedes that specified for ducc.orchestrator.start.type in ducc.properties.",
            " When not specified here or in ducc.properties, the default is --warm.",
            "",
            " --warm",
            " Do NOT force active Jobs, Services, and Reservations to Completed state.",
            "",
            " --cold",
            " Force active Jobs, Services, and Reservations to Completed state.",
            "",
            "Examples:",
            " Start all DUCC processes, using custom nodelists:",
            " start_ducc -n foo.nodes -n bar.nodes",
            "",
            " Start just agents on a specific set of nodes:",
            " start_ducc -n foo.nodes -n bar.nodes",
            "",
            " Start the webserver on node 'bingle':",
            " start_ducc -c ws@bingle",
        )
        for text in usage_text:
            print(text)
        sys.exit(1)

    def invalid(self, *msg):
        """Print the optional message and a pointer to the help options, then exit with status 1."""
        if msg and msg[0] is not None:
            print('')
            print(' '.join(msg))
        print('')
        print("For usage run")
        print(" start_ducc -h")
        print('or')
        print(' start_ducc --help')
        sys.exit(1)

    def _abort_startup(self):
        """Shut the thread pool down, report the exception being handled, and exit with status 1."""
        self.threadpool.quit()
        print('%s DUCC may not be started correctly.' % (sys.exc_info()[0],))
        sys.exit(1)

    def main(self, argv):
        """
        Parse the command line and start the requested DUCC processes.

        With neither -c nor -n given, the default components and default
        nodefiles are used.  Start order is: broker (only when automanaged
        and requested), database, the 'or' component, agents from every
        nodefile, then the remaining components.  PIDs collected along the
        way are written to self.pid_file when there are any.
        """
        self.verify_head()

        if not self.verify_jvm():
            sys.exit(1)

        self.set_duccling_version()

        nodefiles = []
        components = []
        or_parms = self.ducc_properties.get('ducc.orchestrator.start.type')
        self.pids = Properties()
        self.pids.load_if_exists(self.pid_file)

        try:
            opts, args = getopt.getopt(argv, 'c:mn:sh?v',
                                       ['component=', 'help', 'nodelist=', 'cold', 'warm', 'nothreading'])
        except getopt.GetoptError:
            self.invalid('Invalid arguments', ' '.join(argv))

        if len(args) > 0:
            self.invalid('Invalid extra args: ', ' '.join(args))

        for (o, a) in opts:
            if o in ('-c', '--component'):
                if a.strip() == 'head':
                    components.extend(['or', 'pm', 'rm', 'sm', 'ws', 'db', 'broker'])
                else:
                    components.append(a)
            elif o in ('-n', '--nodelist'):
                nodefiles.append(a)
            elif o == '--nothreading':
                self.disable_threading()
            elif o in ('--cold', '--warm'):
                or_parms = o[2:]                # (strip the leading --)
            elif o == '-v':
                print(self.version())
                sys.exit(0)
            elif o in ('-h', '--help', '-?'):
                self.usage(None)
            else:
                self.invalid('bad arg: ', o, 'in:', ' '.join(argv))

        if not self.installed():
            print("Head node is not initialized. Have you run ducc_post_install?")
            return

        environ = self.show_ducc_environment()
        for e in environ:
            print(e)

        # no args, or just -s - make equivalent of -management and -nodefile=DUCC.HOME/resources/ducc.nodes
        if (len(components) == 0) and (len(nodefiles) == 0):
            nodefiles = self.default_nodefiles
            components = self.default_components

        self.verify_required_directories()

        if not verify_master_node(self.ducc_properties):
            print('FAIL: Cannot run javac to run java verification')
            return

        # make sure all the nodefiles exist and are readable
        ok = True
        nodes = {}
        n_nodes = 0
        for n in nodefiles:
            n_nodes, nodes = self.read_nodefile(n, nodes)

        for (nf, nl) in nodes.items():
            if nl is None:
                print("Can't read nodefile %s" % (nf,))
                ok = False

        if ok and (nodefiles == self.default_nodefiles):
            if self.verify_class_configuration(nodefiles[0], False):
                print("OK: Class configuration checked")
            else:
                print("NOTOK: Bad configuration, cannot start.")
                ok = False

        if not ok:
            sys.exit(1)

        if not self.verify_limits():
            print("Limits too low to run DUCC")
            sys.exit(1)

        # activeMQ needs to be started externally before starting any DUCC processes
        if self.automanage and ('broker' in components):
            if self.is_amq_active():
                print('ActiveMQ broker is already running on host and port: %s NOT restarting'
                      % (self.broker_host + ':' + self.broker_port,))
            else:
                try:
                    self.start_broker()
                except:
                    # deliberately broad: any failure here, including an
                    # interrupt, must stop the start-up
                    print('%s DUCC may not be started correctly.' % (sys.exc_info()[0],))
                    sys.exit(1)

        if 'db' in components:
            try:
                if not self.db_start():
                    print("Failed to start or connect to the database.")
                    sys.exit(1)
            except Exception:
                print("%s Can't start the database." % (sys.exc_info()[0],))
                sys.exit(1)

        if self.is_amq_active():
            print('ActiveMQ broker is found on configured host and port: %s'
                  % (self.broker_host + ':' + self.broker_port,))
        else:
            print('ActiveMQ broker is required but cannot be found on %s'
                  % (self.broker_host + ':' + self.broker_port,))
            sys.exit(1)

        ducc = Ducc()

        print('Starting %s agents' % (n_nodes,))
        self.threadpool = ThreadPool(n_nodes + 5)      # a few more for the head processes
        self.pidlock = threading.Lock()

        # start 'or' first to field system log requests
        for com in components:
            if com == 'or':
                try:
                    self.threadpool.invoke(self.start_component, ducc, com, or_parms)
                except:
                    # deliberately broad so the pool is always shut down
                    self._abort_startup()
                # give 'or' a small head start
                time.sleep(2)

        for (nodefile, nodelist) in nodes.items():
            print('********** Starting agents from file %s' % (nodefile,))
            try:
                for node in nodelist:
                    self.threadpool.invoke(self.start_one_agent, node)
            except:
                # deliberately broad so the pool is always shut down
                self._abort_startup()

        if len(components) != 0:
            print('Starting %s' % (or_parms,))

            for com in components:
                if com in ('broker', 'db', 'or'):
                    continue                    # already started
                try:
                    self.threadpool.invoke(self.start_component, ducc, com, or_parms)
                except:
                    # deliberately broad so the pool is always shut down
                    self._abort_startup()

        self.threadpool.quit()

        if len(self.pids) > 0:
            self.pids.write(self.pid_file)
        return


if __name__ == "__main__":
    # First check if ducc_post_install has been run
    DUCC_HOME = find_ducc_home()
    propsfile = DUCC_HOME + '/resources/site.ducc.properties'
    if not os.path.exists(propsfile):
        print("\n>> ERROR >> Missing site.ducc.properties -- please run ducc_post_install\n")
        sys.exit(99)

    starter = StartDucc()
    starter.main(sys.argv[1:])