#!/usr/bin/env python

# -----------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# -----------------------------------------------------------------------

import os
import sys

from ducc_util import DuccUtil
from properties import Properties
from properties import Property
from ducc_util_out import *

from optparse import OptionParser

from shutil import copy2
from shutil import copymode
from tempfile import mkstemp
from shutil import move
from os import remove

# single timestamp shared by every backup file name and property comment
# produced during one run
common_timestamp = get_timestamp()

from ducc_logger import DuccLogger
logger = DuccLogger()

# -----------------------------------------------------------------------
# Extend OptionParser class
class ExtendedOptionParser(OptionParser):
    """OptionParser that prints its epilog verbatim."""

    # override epilog formatter so
    # that newlines are not deleted!
    def format_epilog(self, formatter):
        return self.epilog

# -----------------------------------------------------------------------
# epilog for --help
def get_epilog():
    """Return the multi-line help epilog shown by --help."""
    parts = [
        '\n',
        'Run this command on the host which will become the new DUCC head node.',
        '\n',
        '\n',
        'Prerequisites:',
        '\n',
        '1. the current ducc.head node in site.ducc.properties is up',
        '\n',
        '2. the head daemons (broker, database, or, pm, rm, sm, ws) on the ducc.head node are down (e.g. stop_ducc -c head)',
        '\n',
        '\n',
        'Operation:',
        '\n',
        'To the extent possible, the cluster will be checked to see if ',
        'it is safe to edit the site.ducc.properties file, and if so ',
        'then a backup of the original file is made then the ',
        'requisite changes are made to realize the head node move. ',
        'Likewise, the appropriate ducc.nodes file is backed-up and updated as necessary.',
        '\n',
    ]
    return ''.join(parts)

class MoveDucc(DuccUtil):
    """Command line utility that moves the DUCC head node to this host.

    The steps, in the order run by main(): parse options, load
    site.ducc.properties, validate the target, check that the current head
    node's daemons are down (or that --offline was given when the node is
    unreachable), check nodepool compatibility, back up the properties
    file, then rewrite the properties file and the head's nodepool file.

    Attributes such as DUCC_HOME, localhost and the broker_* fields, and
    helpers such as ssh_operational(), find_ducc_process(),
    is_amq_active(), db_alive(), get_nodepool() and get_nodepool_file()
    are not defined here; presumably they are inherited from DuccUtil.
    """

    # exit code
    code = 0

    # if current ducc host is offline (ie unreachable):
    #   when True proceed with move anyway
    #   when False abort the move (the default)
    offline = False

    # exit
    def exit(self):
        """Log the exit code and terminate the process with it."""
        text = 'exit code='+str(self.code)
        logger.error(text)
        sys.exit(self.code)

    # abort
    def abort(self):
        """Log that the move was not performed, then exit."""
        message = 'move not performed'
        logger.error(message)
        self.exit()

    # display by way of debug site.ducc.properties
    def dump_properties(self,name,props):
        """Log every key=value pair of props at debug level under name."""
        logger.debug(name+':')
        for key in props.get_keys():
            value = props.get(key)
            logger.debug(key+'='+value)

    # initialize
    def initialize(self):
        """Locate and load site.ducc.properties from DUCC_HOME/resources."""
        self.site_root = self.DUCC_HOME+'/resources/'
        self.site_stem = 'site.ducc.properties'
        self.site_path = self.site_root+self.site_stem
        self.site_props = Properties()
        self.site_props.load(self.site_path)
        self.dump_properties(self.site_stem, self.site_props)

    # parse command line
    def parse_cmdline(self, argv=None):
        """Parse command line options and set self.offline and self.target.

        argv: optional list of arguments; when None, optparse falls back
              to sys.argv[1:].
        """
        parser = ExtendedOptionParser(epilog=get_epilog())
        width = 45
        parser.formatter.help_position = width
        parser.formatter.max_help_position = width
        parser.add_option('-d','--debug', action='store_true', dest='flag_debug', default=False,
            help='display debugging messages')
        parser.add_option('-o','--offline', action='store_true', dest='flag_offline', default=False,
            help='indicate current DUCC head node is offline, Note: USE THIS OPTION WITH EXTREME CAUTION else risk corrupting database')
        parser.add_option('-q','--quiet', action='store_true', dest='flag_quiet', default=False,
            help='do not display informational messages')
        #parser.add_option('-t','--target', action='store', dest='target', default=self.localhost,
        #    help='the desired new DUCC head node, default='+self.localhost)
        (options, args) = parser.parse_args(argv)
        if options.flag_debug:
            debug_on()
        if not options.flag_quiet:
            info_on()
        self.offline = options.flag_offline
        # the --target option is disabled above, so the parsed options have
        # no 'target' attribute and the local host becomes the target
        self.target = getattr(options, 'target', self.localhost)
        # keep only the short (unqualified) host name
        self.target = self.target.split('.')[0]

    # fetch required property from site.ducc.properties
    def get_required_prop(self,props,stem,key):
        """Return props.get(key); abort with exit code 1 when it is missing.

        stem is the properties file name, used only in the warning text.
        """
        value = props.get(key)
        if value is None:
            message = key+' not found in '+stem
            logger.warn(message)
            self.code = 1
            self.abort()
        return value

    # assure target is legal
    def vette_target(self):
        """Abort unless the target differs from ducc.head and is listed in
        ducc.head.failover."""
        props = self.site_props
        stem = self.site_stem
        key = 'ducc.head'
        value = self.get_required_prop(props,stem,key)
        if value == self.target:
            message = 'target '+self.target+' is already ducc.head'
            logger.warn(message)
            self.code = 1
            self.abort()
        key = 'ducc.head.failover'
        value = self.get_required_prop(props,stem,key)
        # NOTE(review): this is a substring test, so target 'node1' also
        # matches a failover entry 'node10' -- confirm the failover list
        # format before tightening this to an exact host comparison
        if self.target not in value:
            message = 'target '+self.target+' not found in ducc.head.failover='+str(value)
            logger.warn(message)
            self.code = 1
            self.abort()

    # assure daemon is down
    def vette_daemon(self, node, user, tuples, daemon):
        """Flag failure (self.code = 1) for each running instance of daemon
        owned by user.

        Each entry of tuples is indexed as (daemon name, pid, user).
        """
        for t in tuples:
            if t[2] == user and t[0] == daemon:
                message = 'node='+node+' pid='+t[1]+' user='+t[2]+' '+daemon+'=up'
                logger.warn(message)
                self.code = 1

    # assure broker is down
    def vette_broker(self):
        """Flag failure (self.code = 1) when ActiveMQ is still active."""
        if self.is_amq_active():
            message = 'ActiveMQ listening at ' +self.broker_protocol + "://" + self.broker_host + ':' + self.broker_port
            logger.warn(message)
            self.code = 1
        else:
            message = 'ActiveMQ down'
            logger.debug(message)

    # assure database is down
    def vette_db(self):
        """Flag failure (self.code = 1) when the database is still alive."""
        retry = 1
        verbose = False
        if self.db_alive(retry,verbose):
            message = 'database alive'
            logger.warn(message)
            # must be recorded on the instance: vette_head() inspects
            # self.code to decide whether the move has to be aborted
            self.code = 1
        else:
            message = 'database down'
            logger.debug(message)

    # head node is not reachable
    def abort_unreachable(self):
        """Hint at the --offline flag, then abort with exit code 1."""
        hint = 'hint: use flag "-o or --offline"'
        logger.info(hint)
        self.code = 1
        self.abort()

    # warn if --offline flag is ignored
    def is_offline_honored(self):
        """Warn that --offline was requested but is being ignored."""
        if self.offline:
            message = '--offline request not honored'
            logger.warn(message)

    # assure head node daemons are down or
    # require --offline flag when head node is unreachable
    def vette_head(self):
        """Check the current head node before allowing the move.

        When the node is reachable by ssh, every head daemon, the broker
        and the database must be down, otherwise the move is aborted.
        When it is not reachable the move is aborted unless --offline was
        given.  Sets self.head to the current ducc.head value.
        """
        props = self.site_props
        key = 'ducc.head'
        prop = props.get_property(key)
        node = prop.v
        self.head = node
        message = 'node='+node+' '+'checking ducc head node daemons status'
        logger.info(message)
        operational = self.ssh_operational(node)
        if operational:
            # the node can be checked, so --offline is not needed
            self.is_offline_honored()
            hint = 'hint: run "stop_ducc -c head" or "check_ducc -k"'
            user = os.environ['LOGNAME']
            (_found, tuples) = self.find_ducc_process(node)
            logger.debug('ducc processes:'+str(tuples))
            for daemon in ('broker', 'database', 'orchestrator', 'pm', 'rm', 'sm', 'ws'):
                self.vette_daemon(node, user, tuples, daemon)
            self.vette_broker()
            self.vette_db()
            if self.code > 0:
                logger.info(hint)
                self.abort()
            else:
                message = 'node='+node+' '+'ducc head node daemons are down'
                logger.info(message)
        else:
            message = 'node='+node+' '+'not reachable'
            if not self.offline:
                logger.warn(message)
                self.abort_unreachable()
            else:
                logger.info(message)

    # assure node pool compatibility
    def vette_nodepool(self):
        """Abort unless the head and target nodepools are compatible.

        Compatible means both nodes are in the same nodepool, or the head
        is in a named nodepool while the target is in the default one (a
        "nodepool swap").
        """
        default = '--default--'
        np_head = str(self.get_nodepool(self.head,default))
        message = 'nodepool[head]='+np_head
        logger.debug(message)
        np_target = str(self.get_nodepool(self.target,default))
        message = 'nodepool[target]='+np_target
        logger.debug(message)
        if np_head != default and np_target == default:
            message = 'node='+self.head+' nodepool='+np_head
            logger.info(message)
            message = 'node='+self.target+' nodepool='+np_target
            logger.info(message)
            message = 'nodepool swap'
            logger.debug(message)
        elif np_head != np_target:
            message = 'node='+self.head+' nodepool='+np_head
            logger.warn(message)
            message = 'node='+self.target+' nodepool='+np_target
            logger.warn(message)
            message = 'nodepools do not match'
            logger.warn(message)
            self.code = 1
            self.abort()

    # backup file
    def backup_file(self,root,stem):
        """Copy root+stem to root+stem.<timestamp>; blanks in the timestamp
        are replaced by '@'."""
        timestamp = str(common_timestamp).replace(' ','@')
        back = stem+'.'+timestamp
        message = 'creating backup file='+back
        logger.info(message)
        f_src = root+stem
        f_tgt = root+back
        copy2(f_src,f_tgt)

    # backup existing site.ducc.properties file
    def config_backup(self):
        """Back up the site.ducc.properties file."""
        self.backup_file(self.site_root,self.site_stem)

    # debug property changed
    def tell_change(self,key,old,new):
        """Log a property change at debug level."""
        message = key+'='+old+'->'+new
        logger.debug(message)

    # warn property not changed
    def tell_no_change(self,key,old):
        """Warn that a property was left unchanged."""
        message = key+'='+old+'->'+'no change'
        logger.warn(message)

    # update properties
    def update_site_ducc_properties(self):
        """Point ducc.head, ducc.database.host and ducc.cluster.name at the
        target and write site.ducc.properties.

        The file is written only when the database host (short name) is
        the original head node; otherwise the move is aborted with exit
        code 1 and nothing is written.
        """
        comment = [ '# moved '+str(common_timestamp) ]
        path = self.site_path
        props = self.site_props
        changes = 0
        # ducc.head
        key = 'ducc.head'
        prop = props.get_property(key)
        old = prop.v
        new = self.target
        prop.v = new
        prop.c = comment
        self.tell_change(key,old,new)
        changes = changes + 1
        self.orig_head = old.split('.')[0]
        # ducc.database.host
        key = 'ducc.database.host'
        prop = props.get_property(key)
        old = prop.v
        self.orig_database = old.split('.')[0]
        # compare short names, exactly as the write check below does, so a
        # fully qualified database host is not left pointing at the old
        # head while the file is still written
        if self.orig_database == self.orig_head:
            new = self.target
            prop.v = new
            prop.c = comment
            self.tell_change(key,old,new)
            changes = changes + 1
        else:
            self.tell_no_change(key,old)
        # name
        key = 'ducc.cluster.name'
        prop = props.get_property(key)
        old = prop.v
        if self.orig_head in old:
            new = old.replace(self.orig_head,self.target)
            prop.v = new
            prop.c = comment
            self.tell_change(key,old,new)
            changes = changes + 1
        # write file
        if self.orig_head == self.orig_database:
            props.write(path)
            message = self.site_stem+' '+'updates='+str(changes)
            logger.info(message)
        else:
            message = 'head:'+self.orig_head+' does not match '+'database:'+self.orig_database
            logger.error(message)
            # record the failure so the process exits non-zero
            self.code = 1
            self.abort()

    # edit nodefile
    def edit_nodefile(self,source_file_path,before,after):
        """Rewrite source_file_path, replacing each line whose stripped
        text equals before with after.

        The new content is written to a temporary file which then replaces
        the original; the original permission bits are carried over.
        """
        fd, target_file_path = mkstemp()
        try:
            # wrap the descriptor returned by mkstemp so that it is closed
            # instead of being leaked
            with os.fdopen(fd, 'w') as target_file:
                with open(source_file_path, 'r') as source_file:
                    for line in source_file:
                        if line.strip() == before:
                            target_file.write(after+'\n')
                            logger.info(before+' -> '+after)
                        else:
                            target_file.write(line)
            # mkstemp creates the file readable by its owner only; keep
            # the permissions the nodefile had
            copymode(source_file_path, target_file_path)
        except Exception:
            # the original is still intact here; drop the partial copy
            remove(target_file_path)
            raise
        remove(source_file_path)
        move(target_file_path, source_file_path)

    # update nodefile
    def update_nodefile(self):
        """Back up the head node's nodepool file and replace the head entry
        with the target."""
        default = ''
        np_head = str(self.get_nodepool_file(self.head,default))
        np_target = str(self.get_nodepool_file(self.target,default))
        message = 'file[head]='+np_head
        logger.debug(message)
        if np_head != 'null':
            np_head_file = np_head.rsplit('/',1)[1]
            # NOTE(review): the backup is taken from site_root, which
            # assumes the nodepool file lives in that directory -- confirm
            self.backup_file(self.site_root,np_head_file)
            message = 'updating '+np_head_file
            logger.info(message)
            self.edit_nodefile(np_head,self.head,self.target)
        message = 'file[target]='+np_target
        logger.debug(message)
        #if(not np_target == 'null'):
        #    np_target_file = np_target.rsplit('/',1)[1]
        #    self.backup_file(self.site_root,np_target_file)
        #message = 'nodepool updated'
        #logger.info(message)

    # perform move and notify of success
    def config_update(self):
        """Apply the property and nodefile updates and report success."""
        self.update_site_ducc_properties()
        self.update_nodefile()
        message = 'move completed'
        logger.info(message)

    def main(self, argv):
        """Run the complete head node move using command line argv."""
        self.parse_cmdline(argv)
        self.initialize()
        self.vette_target()
        self.vette_head()
        self.vette_nodepool()
        self.config_backup()
        self.config_update()

if __name__ == '__main__':
    instance = MoveDucc()
    instance.main(sys.argv[1:])