#!/usr/bin/env python # ----------------------------------------------------------------------- # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # ----------------------------------------------------------------------- import os import sys import time import getopt import glob from ducc_util import DuccUtil from properties import * from ducc import Ducc class StopDucc(DuccUtil): def stop_component(self, component, force): if ( (component == 'broker') and self.automanage ): print 'Stopping broker' self.stop_broker() return if ( component == 'db' ): print 'Stopping database' self.db_stop() return # # If it's an unqualified management component, we need to get it's qualified name # if ( component in self.default_components ): if ( self.pids.has_key(component) ): component = self.pids.get(component) else: print 'Skipping', component, 'not in pids file.' 
return # # If the name is not qualified we've got a problem, everything in the pids file is qualified # if ( component.find('@') >= 0 ): com, target_node = component.split('@') else: self.invalid("Must specify hostname when stopping", component) # # If despite all that we can't find the pid, we need to run check_ducc # if ( not self.pids.has_key(component) ): print "Cannot find PID for component", component, ". Run check_ducc -p to refresh PIDS and then rerun stop_ducc." return pid = self.pids.get(component) if ( force ): print 'Stopping component', com, 'on node', target_node, 'with PID', pid, 'forcibly (kill -9)' self.nohup(['ssh', target_node, 'kill', '-KILL', pid], False) pass else: print 'Stopping component', com, 'on node', target_node, 'with PID', pid self.nohup(['ssh', target_node, 'kill', '-INT', pid], False) # clear the short name if it exists, and the long name self.pids.delete(com) self.pids.delete(component) def quiesce_agents(self, components, nodes): allnodes = [] for ( nf, nl ) in nodes.items(): allnodes = allnodes + nl for c in components: if ( c.find('@') >= 0 ): com, target_node = c.split('@') allnodes.append(target_node) else: self.invalid("Must specify hostname when stopping", component) qparm = ','.join(allnodes) print 'Quiescing', qparm DUCC_JVM_OPTS = ' -Dducc.deploy.configuration=' + self.DUCC_HOME + "/resources/ducc.properties " DUCC_JVM_OPTS = DUCC_JVM_OPTS + ' -DDUCC_HOME=' + self.DUCC_HOME DUCC_JVM_OPTS = DUCC_JVM_OPTS + ' -Dducc.head=' + self.ducc_properties.get('ducc.head') self.spawn(self.java(), DUCC_JVM_OPTS, 'org.apache.uima.ducc.common.main.DuccAdmin', '--quiesceAgents', qparm) # NOTE: quiesce does not actually cause agents to terminate so we don't update the PIDs file return def stop_agents(self, node, force): self.stop_component('agent@' + node.strip(), force) def usage(self, msg): if ( msg != None ): print msg print 'stop_ducc [options]' print ' If no options are given, this help screen is shown.' 
print '' print 'Options:' print ' -a --all' print ' Stop all the DUCC processes, including agents and management processes.' print '' print ' -n --nodelist nodefile' print ' Stop agents on the nodes in the nodefile. Multiple nodefiles may be specified:' print '' print ' stop_ducc -n foo.nodes -n bar.nodes -n baz.nodes' print '' print ' -c --component component' print ' Stop a specific component. The component may be qualified with the node name' print ' using the @ symbol: component@node.' print '' print ' stop_ducc -c rm@foonode' print ' stop_ducc -c agent@barnode -c or' print '' print ' Components include:' print ' agent - node agent' print ' broker - AMQ broker' print ' db - database' print ' or - orchestrator' print ' pm - process manager' print ' rm - resource manager' print ' sm - services manager' print ' ws - web server' print ' head = { or, pm, rm, sm, ws, db, broker }' print '' print ' -w --wait' print ' Time to wait for everything to come down, in seconds. Default is 60.' print '' print ' -k --kill' print ' Stop the component forcibly and immediately using kill -9. Use this only if a' print ' normal stop does not work (e.g. the process may be hung).' 
print '' print ' --nothreading' print ' Disable multithreaded operation if it would otherwise be used' print '' sys.exit(1) def invalid(self, *msg): if ( msg[0] != None ): print ' '.join(msg) print "For usage run" print " stop_ducc -h" print 'or' print ' stop_ducc --help' sys.exit(1) def main(self, argv): self.verify_head() if ( len(argv) == 0 ): self.usage(None) components = [] nodefiles = [] do_agents = False do_components = False force = False quiesce = False all = False wait_time = 60 try: opts, args = getopt.getopt(argv, 'ac:n:kn:w:qh?v', ['all', 'component=', 'help', 'nodelist=', 'kill', 'quiesce', 'nothreading', 'wait']) except: self.invalid('Invalid arguments ' + ' '.join(argv)) if (len(args) > 0): self.invalid('Invalid extra args: ', ' '.join(args)) for ( o, a ) in opts: if o in ('-c', '--component' ): if (a.strip() == 'head'): components.append('or') components.append('pm') components.append('rm') components.append('sm') components.append('ws') components.append('db') components.append('broker') else: components.append(a) do_components = True elif o in ( '-a', '--all' ): all = True components = self.default_components elif o in ( '-n', '--nodelist' ): nodefiles.append(a) do_agents = True elif o in ( '-k', '--kill' ): force = True elif o in ( '-q', '--quiesce' ): quiesce = True elif o in ( '-w', '--wait' ): wait_time = int(a) elif o in ( '--nothreading' ): self.disable_threading() elif ( o == '-v' ) : print self.version() sys.exit(0) elif o in ( '-h', '--help' ): self.usage(None) elif ( o == '-?'): self.usage(None) else: self.invalid('bad arg: ' + o) if ( quiesce ): if ( all ): self.invalid("May not quiesce 'all'."); if ( force ): self.invalid("May not both quiesce and force."); for c in components: if ( not c.startswith('agent') ): self.invalid("Only agents may be quiesced.") # avaid confusion by insuring that if 'all', then nothing else is specified if ( all and ( do_components ) ): self.invalid("The --all option is mutually exclusive with --component") 
# 'all' means everything. we use broadcast. should use check_ducc to make sure # it actually worked, and find the stragglers. if ( all ): if ( not force ) : self.clean_shutdown() # Agents may wait up to 60 secs for processes to quiesce print "Waiting " + str(wait_time) + " seconds to broadcast agent shutdown." time.sleep(wait_time) if ( self.automanage ): print "Stopping broker" self.stop_broker() print "Stopping database" self.db_stop() if ( os.path.exists(self.pid_file) ): os.remove(self.pid_file) return else: if ( len(nodefiles) == 0 ): nodefiles = self.default_nodefiles self.pids = Properties() sc = set(components) sb = set(['broker', 'db']) read_pids = True if ( sc.issubset(sb) ): read_pids = False # The broker and db do not set the pid file if ( read_pids ): try: self.pids.load(self.pid_file) pass except PropertiesException, (inst): print inst.msg print '' print 'Run check_ducc -p to refresh the PIDs file, or check_ducc -k to search for and', print 'kill all DUCC processes.' print '' sys.exit(1) # # if not 'all', we use nodefiles and component names # # make sure all the nodefiles exist and are readable ok = True nodes = {} n_nodes = 0 for n in nodefiles: n_nodes, nodes = self.read_nodefile(n, nodes) for ( nf, nl ) in nodes.items(): if ( nl == None ): # die early if the parameters are wrong print "Can't read nodefile", nf ok = False if ( not ok ): sys.exit(1) if ( quiesce ): self.quiesce_agents(components, nodes) else: for (nf, nl) in nodes.items(): for n in nl: self.stop_agents(n, force) host = self.localhost.split('.')[0] for c in components: c = c.strip() if(c in ('pm','rm','sm','ws')): c = c+'@'+host self.stop_component(c, force) time.sleep(2) for c in components: c = c.strip() if(c in ('or')): c = c+'@'+host self.stop_component(c, force) time.sleep(2) for c in components: c = c.strip() if(c in ('db','broker')): self.stop_component(c, force) if ( read_pids ): if ( len(self.pids) > 0 ): self.pids.write(self.pid_file) else: os.remove(self.pid_file) return 
if __name__ == "__main__":
    # Script entry point: pass the command-line arguments (without the
    # program name) to a fresh StopDucc instance.
    StopDucc().main(sys.argv[1:])