#!/usr/bin/env python # ----------------------------------------------------------------------- # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # ----------------------------------------------------------------------- import os import sys from time import time import getopt import signal from ducc_util import DuccUtil from properties import Properties from local_hooks import verify_slave_node from local_hooks import verify_master_node #from ducc_util import ThreadWorker from ducc_util import ThreadPool class CheckDucc(DuccUtil): def __init__(self): DuccUtil.__init__(self) self.badnodes = [] def validate(self, checkdate): verify_slave_node(checkdate, self.ducc_properties) self.check_clock_skew(checkdate) self.verify_jvm() self.verify_limits() (viable, elevated, safe) = self.verify_duccling() self.duccling_ok(viable, elevated, safe) if ( not safe or not viable ): print 'NOTOK ducc_ling is not installed correctly.' return def verify_database(self): if ( self.db_bypass == True ): return True ret = self.db_alive(1) if ( ret ): print 'The database is running' else: print 'The database is not running' def verify_activemq(self): if ( self.is_amq_active() ): print 'ActiveMQ is found listening at', self.broker_protocol + "://" + self.broker_host + ':' + self.broker_port return True return False def check_node(self, args): messages = [] spacer = ' ' node = args[0] messages.append((' ')) messages.append(('Checking', node, '...')) if(self.ssh_operational(node)): text = "ssh is operational to "+node #print text else: text = "ssh is NOT operational to "+node print text messages.append((spacer, text)) return messages response = self.find_ducc_process(node) # a tuple, (True|False, proclist) if ( not response[0] ): messages.append((spacer, "No response.")) return messages proclist = response[1] # a list of tuples, tuple is (component, pid, user) if ( len(proclist) > 0 ): for proc in proclist: component = proc[0] pid = proc[1] found_user = proc[2] signal = self.kill_signal if ( component == 'orchestrator' ): component = 'or' if ( component == 'database' ): if ( signal != None ): if ( self.kill_db9 == False ): signal = '-QUIT' process_id = found_user + ' ' + component + '@' + node + ' PID ' + pid if ( signal != None ) : if ( self.user != found_user ): messages.append((spacer, "Not killing someone else's process.", process_id)) elif ( component == 'unknown-java' ): messages.append((spacer, 'Not killing non-ducc process', process_id)) else: messages.append((spacer, 'Killing (' + signal + ')', process_id)) self.kill_process(node, proc, signal) self.pids.delete(pid) process_changes = True else: messages.append((spacer, 'Found', process_id)) full_name = component + '@' + node if ( component == 'agent' ): self.pids.put(full_name, pid) if ( component in self.default_components ): self.pids.put(full_name, pid) self.pids.put(component, full_name) else: messages.append((spacer, 'no processes found.')) if ( self.kill_signal == None ): response = "Node health checks return." lines = self.ssh(node, True, self.DUCC_HOME + "/admin/check_ducc", "-x", str(int(time()))) while 1: line = lines.readline() if ( 'signal' in line ): response = "Node health did not complete: " + line self.badnodes.append(node) # these next two filter junk if 'mesg' is running in a shell rc if ( 'stdin: is not a tty' in line ): continue if ( 'mesg' in line ): continue if ( not line ): break line = line.strip() messages.append((spacer, line)) #messages.append((spacer, '[]', line)) messages.append((spacer, response)) return messages def signalHandler(self, signum, frame): print "-------- Caught signal", signum, "--------" if ( len(self.badnodes) != 0 ): print "Health checks on these nodes did not return:" for n in self.badnodes: print n, print '' sys.exit(1) def usage(self, msg): if ( msg != None ): print msg print "Usage:" print " check_ducc [options]" print " If no options are given this is the equivalent of:" print "" print " check_ducc -n ../resources/ducc.nodes" print "" print "Options:" print " -n --nodelist nodefile" print " Check for agents on the nodes in nodefile. This option may be specified multiple time" print " for multiple nodefiles. The 'local' node is always checked" print "" print " -c --configuration" print " Do basic sanity checking on the configuration only. Note that configuration checking is always" print " performed with most options. The [-c, --configuration] option does ONLY configuration checking." print "" print " -k --kill" print " Force-kill any DUCC process you find on a node (if normal stop_ducc isn't working. This" print " uses kill -KILL (-9) for all daemons, except database which uses -QUIT (3)," print " and only kills processes owned by the invoking user." print "" print " --db-9" print " Use signal -KILL (-9) to kill database, rather than the default -QUIT (-3)" print "" print " -i --int" print " Force-kill any DUCC process you find on a node (if normal stop_ducc isn't working. This" print " uses kill -INT (-2) and only kills processes owned by the invoking user." print "" print " -q --quit" print " Force-kill any DUCC process you find on a node (if normal stop_ducc isn't working. This" print " uses kill -QUIT (-3) and only kills processes owned by the invoking user." print "" print " -p --pids" print " Rewrite the PID file. The PID file is always rewritten if any changes to processes are made. Sometimes" print " the PID file needs rebuilding. This option causes the file to be rebuilt regardless of" print " changes." print "" print " -x localdate" print " Validate the local installation, called via ssh usually. The date is the dat on the calling machine." print "" print " --nothreading" print " Disable multithreaded operation if it would otherwise be used" print "" print " -v --verbose" print " If specified, print the validated configuration to the console." print "" print " -? prints this message." sys.exit(1) def main(self, argv): try: opts, args = getopt.getopt(argv, 'cikn:opqx:h?v', ['configuration', 'nodelist=', 'int', 'quit', 'kill', 'db-9', 'pids', 'verbose', 'nothreading', ]) except: self.usage("Invalid arguments " + ' '.join(argv)) nodefiles = [] self.user = os.environ['LOGNAME'] self.kill_signal = None self.kill_db9 = False redo_pids = False process_changes = False do_validate = False checkdate = 0 config_only = False verbose = False for ( o, a ) in opts: if o in ('-c', '--configuration'): config_only = True elif o in ('-n', '--nodelist'): nodefiles.append(a) elif o in ('-i', '--int'): if ( self.kill_signal != None ): print 'Conflicting kill signals: -INT and', self.kill_signal return self.kill_signal = '-INT' elif o in ('-q', '--quit'): if ( self.kill_signal != None ): print 'Conflicting kill signals: -QUIT and', self.kill_signal return self.kill_signal = '-QUIT' elif o in ('-k', '--kill'): if ( self.kill_signal != None ): print 'Conflicting kill signals: -KILL and', self.kill_signal return self.kill_signal = '-KILL' elif o in ('--db-9'): self.kill_db9 = True elif o in ( '--nothreading' ): self.disable_threading() elif o in ('-p', '--pids'): redo_pids = True elif o in ('-x'): # intended to be called recursively from check_ducc, NOT from the command line do_validate = True checkdate = float(a) elif o in ('-v', '--verbose'): verbose = True elif o in ('-h', '-?', '--help'): self.usage(None) else: print 'badarg', a usage('bad arg: ' + a) if not self.installed(): print "Head node is not initialized. Have you run ducc_post_install?" return if ( do_validate ): # if validating, ONLY validate, called via ssh usually self.validate(checkdate) return # When called directly must be from the head node self.verify_head() self.set_duccling_version() os.system('cat ' + self.DUCC_HOME + '/state/duccling.version') # not -x option, do this only on local node env = self.show_ducc_environment() for e in env: print e jvm = self.ducc_properties.get('ducc.jvm') if ( jvm == None ): print 'WARN: ducc.jvm is not specified in ducc.properties. Default is simply "java" which may not work on all nodes.' if ( not verify_master_node(self.ducc_properties) ): print 'FAIL: Cannot verify master mode' return if ( not self.verify_activemq() ): print 'ActiveMQ broker is not running on', self.broker_protocol + "://" + self.broker_host + ':' + self.broker_port self.verify_database() # init the PID file self.pids = Properties() self.pids.load_if_exists(self.pid_file) # read the nodelists if ( len(nodefiles) == 0 ): nodefiles = self.default_nodefiles check_nodepools = True else: # if using other than the fully configured set of nodes we can't reliably check nodepools # because anything other than the full set of nodes may be missing something check_nodepools = False nodes = {} n_nodes = 0 for nf in nodefiles: n_nodes, nodes = self.read_nodefile(nf, nodes) # # add in the local host if needed, and the webserver node # localnodes = [] if ( not self.localhost in nodes ): localnodes.append(self.localhost) if ( not (self.webserver_node in ['localhost', self.localhost, None]) ): localnodes.append(self.webserver_node) if ( len(localnodes) > 0 ): nodes['local'] = localnodes self.verify_jvm() if ( config_only ): if ( nodefiles != self.default_nodefiles): print "NOTOK: Config check only works with full, default nodefile:", self.default_nodefiles return if self.verify_class_configuration(nodefiles[0], verbose): print "OK: Class configuration checked" else: print "NOTOK: Errors in class or node configuration." if self.verify_head_failover_configuration(): print "OK: Failover configuration checked" else: print "NOTOK: Errors in failover configuration." return # checking starts here print "Checking", n_nodes, "nodes" self.threadpool = ThreadPool(n_nodes + 5) # more for the head processes checked = {} signal.signal(signal.SIGINT, self.signalHandler) try: for (nodefile, nodelist) in nodes.items(): if ( nodelist == None ): # loading the nodes prints the necessary message continue for node in nodelist: if ( checked.has_key(node) ): continue checked[node] = node self.threadpool.invoke(self.check_node, node) except: self.threadpool.quit() print sys.exc_info()[0], "Exiting." sys.exit(1) self.threadpool.quit() if ( self.kill_signal != None ): print 'Stopping broker' self.stop_broker() print 'Stopping database' self.db_stop() if ( len(self.pids) == 0): if ( os.path.exists(self.pid_file) ): os.remove(self.pid_file) elif (process_changes or redo_pids): self.pids.write(self.pid_file) if __name__ == "__main__": checker = CheckDucc() checker.main(sys.argv[1:])