#! /usr/bin/env python
# -----------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# -----------------------------------------------------------------------

# +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
# +
# + ducc_watcher
# +
# + purpose: send e-mail when a DUCC daemon overall state changes
# +
# +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

import ast
import datetime
import getpass
import json
import logging
import logging.handlers
import os
import smtplib
import socket
import string
import sys
import time

try:
    import urllib2
except ImportError:
    # Python 3: urllib.request offers the same build_opener()/open() API
    import urllib.request as urllib2

from optparse import OptionParser


# ----------------------------------------------
# Extend OptionParser class

class ExtendedOptionParser(OptionParser):
    """OptionParser that emits the epilog exactly as given."""

    # override epilog formatter so
    # that newlines are not deleted!
    def format_epilog(self, formatter):
        return self.epilog


# ----------------------------------------------

name = 'ducc_watcher'
version = 2.0

webserver_lifetime_millis = 0

webserver = 'Webserver'

head_daemons = [
    'Orchestrator',
    'ResourceManager',
    'Database',
    'Broker',
    'ProcessManager',
    'ServiceManager',
    webserver,
]

jda = 'JobDriverAllocation'

flag_info = True
flag_trace = False
logger = None

port = '42133'

path = None
log_file = None
state_file = None

flag_agents = False
# reservation class to check for job driver allocation (set from -j)
job_driver_allocation = None

mail_host = 'localhost'
email_list = None

list_errors = []
list_warns = []

url_timeout = 30

# filled in by validate_target()
target = None
ducc_url_base = None
ducc_url_servlet_system_daemons_data = None
ducc_url_servlet_reservations_data = None


# produce a time stamp
def get_timestamp():
    tod = time.time()
    return datetime.datetime.fromtimestamp(tod).strftime('%Y-%m-%d %H:%M:%S')


# get the host running this script
def get_host():
    return socket.gethostname()


# get the user running this script
def get_user():
    return getpass.getuser()


# make directories, if not already existing
def mkdirs(path):
    debug('mkdirs: path='+path)
    if os.path.exists(path):
        return
    try:
        os.makedirs(path)
    except Exception as e:
        exception(e)


# common log line layout: "<timestamp> <user>@<host> <level> <text>"
def _format_line(level, text):
    return get_timestamp()+' '+get_user()+'@'+get_host()+' '+level+' '+text


# info message to log
def info(text):
    line = _format_line('I', text)
    logger.info(line)
    return line


# trace message to log (only emitted when flag_trace is set)
def trace(text):
    line = _format_line('T', text)
    if flag_trace:
        logger.debug(line)
    return line


# debug message to log
def debug(text):
    line = _format_line('D', text)
    logger.debug(line)
    return line


# error message to log; also remembered for the e-mail body
def error(text):
    line = _format_line('E', text)
    logger.error(line)
    list_errors.append(line)
    return line


# warn message to log; also remembered for the e-mail body
def warn(text):
    line = _format_line('W', text)
    logger.warning(line)
    list_warns.append(line)
    return line


# _exit: log the exit code, best-effort notify the e-mail list, then exit
def _exit(code):
    text = 'exit code='+str(code)
    error(text)
    try:
        subject = 'DUCC'+' '+name+' '+text
        sender = get_user()+'@'+get_host()
        # list_errors already holds the failure reason and the exit line
        email_to_list(mail_host, subject, email_list, sender, get_errors())
    except Exception as e:
        # notification is best-effort; it must never prevent the exit
        exception(e)
    sys.exit(code)


# exception
def exception(e):
    return error(str(e))


# epilog for --help
def get_epilog():
    epilog = ''
    return epilog


# debug is normally not set
def validate_debug(options):
    if options.flag_debug:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.INFO)


# consider head node daemons only
# unless --agents is specified
def validate_agents(options):
    global flag_agents
    if options.flag_agents:
        flag_agents = True


# ignore job driver allocation
# unless --job-driver-allocation is specified
def validate_job_driver_allocation(options):
    global job_driver_allocation
    job_driver_allocation = options.job_driver_allocation


# use /tmp/<user> as log+state directory
# unless --path is specified
def validate_path(options):
    if options.path is None:
        options.path = '/tmp'+'/'+get_user()
    mkdirs(options.path)


# setup rotating log file handler with
# 8 versions of 8M bytes with base name
# ducc_watcher.<target>.log
def setup_log_file(options):
    log_file = options.path
    if not log_file.endswith('/'):
        log_file = log_file + '/'
    log_file = log_file + name + '.' + target + '.log'
    handler = logging.handlers.RotatingFileHandler(
        log_file, maxBytes=8*1024*1024, backupCount=8)
    logger.addHandler(handler)
    debug('log_file: '+log_file)


# ducc_watcher.<target>.state
def setup_state_file(options):
    global state_file
    state_file = options.path
    if not state_file.endswith('/'):
        state_file = state_file + '/'
    state_file = state_file + name + '.' + target + '.state'
    debug('state_file: '+state_file)


def normalize_target(text, default_port, protocol='http://'):
    """Return *text* as "host:port" with a leading *protocol* removed.

    The protocol is stripped BEFORE looking for ':' so that the colon in
    "http://" is not mistaken for a host:port separator; e.g.
    "http://host" becomes "host:<default_port>".
    """
    if text.startswith(protocol):
        text = text[len(protocol):]
    if ':' not in text:
        text = text + ':' + str(default_port)
    return text


# must specify --target host:port of WS for fetching
# of daemons status
def validate_target(options):
    global target
    global ducc_url_base
    global ducc_url_servlet_system_daemons_data
    global ducc_url_servlet_reservations_data
    protocol = 'http://'
    if options.target is None:
        error('required "target" not specified')
        _exit(1)
    target = normalize_target(options.target, port, protocol)
    ducc_url_base = protocol+target
    #
    servlet = '/ducc-servlet/json-format-aaData-daemons'
    ducc_url_servlet_system_daemons_data = protocol+target+servlet
    debug('target: '+ducc_url_servlet_system_daemons_data)
    #
    servlet = '/ducc-servlet/json-format-reservations'+'?maxRecords=1024&stateType=Active'
    ducc_url_servlet_reservations_data = protocol+target+servlet
    debug('target: '+ducc_url_servlet_reservations_data)


# mail host, if any
def validate_mail_host(options):
    global mail_host
    if options.mail_host is not None:
        mail_host = options.mail_host
    debug('mail-host: '+str(mail_host))


# list of e-mail recipients, if any
def validate_email_list(options):
    global email_list
    if options.email_list is not None:
        email_list = options.email_list.split()
    debug('email-list: '+str(email_list))


# parse command line
def parse_cmdline():
    parser = ExtendedOptionParser(epilog=get_epilog())
    width = 45
    parser.formatter.help_position = width
    parser.formatter.max_help_position = width
    parser.add_option('-a', '--agents', action='store_true', dest='flag_agents', default=False,
                      help='include agents')
    parser.add_option('-d', '--debug', action='store_true', dest='flag_debug', default=False,
                      help='display debugging messages')
    parser.add_option('-e', '--email-list', action='store', dest='email_list', default=None,
                      help='blank separated list of email addresses to receive down + error notifications')
    parser.add_option('-j', '--job-driver-allocation', action='store', dest='job_driver_allocation', default=None,
                      help='check job driver allocation for specified class')
    parser.add_option('-m', '--mail-host', action='store', dest='mail_host', default=None,
                      help='mail host (default='+mail_host+')')
    parser.add_option('-p', '--path', action='store', dest='path', default=None,
                      help='path to directory where log and state information are written, default is /tmp'+'/'+get_user())
    parser.add_option('-t', '--target', action='store', dest='target', default=None,
                      help='[REQUIRED] <host> with default port of '+port+' or <host>:<port>')
    (options, args) = parser.parse_args()
    #
    debug(str(options))
    debug(str(args))
    # -d
    validate_debug(options)
    # -e and -m come before -t so that _exit() can notify the
    # e-mail list when the required target is missing
    validate_email_list(options)
    validate_mail_host(options)
    # -t
    validate_target(options)
    # -p
    validate_path(options)
    # dependencies
    setup_log_file(options)
    setup_state_file(options)
    # -a
    validate_agents(options)
    # -j
    validate_job_driver_allocation(options)


# determine if named daemon is one of the head node ones
def is_head(key):
    return key in head_daemons


# determine if named entry is the job driver allocation pseudo-daemon
def is_jda(key):
    return key == jda


# head node daemon or job driver allocation (always reported)
def is_key(key):
    return is_head(key) or is_jda(key)
# get rid of noise. remove if
# 1. state is unknown
# 2. if is agent and agents are not wanted
def filter_state(state_dict):
    """Return a new dict holding only the entries worth reporting.

    Entries whose state is 'unknown' are dropped. Head node daemons and
    the job driver allocation entry (is_key) are kept; any other entry
    (e.g. agents) is kept only when flag_agents is set.
    """
    retVal = {}
    for key in state_dict:
        state = state_dict[key]
        if state == 'unknown':
            continue
        if is_key(key) or flag_agents:
            retVal[key] = state
    return retVal


# make sure the state file exists; if it cannot be read,
# (re)create it holding an empty dict
def init_state_previous():
    try:
        with open(state_file, 'r') as f:
            f.read()
    except Exception:
        with open(state_file, 'w') as f:
            f.write('{}'+'\n')


# read previous daemons state
def read_state_previous():
    global state_dict_previous
    state_dict_previous = {}
    try:
        with open(state_file, 'r') as f:
            s = f.read()
        # literal_eval only accepts Python literals, never arbitrary code
        state_dict_previous = ast.literal_eval(s)
        debug('state_previous(read): '+str(state_dict_previous))
        state_dict_previous = filter_state(state_dict_previous)
        debug('state_previous(filter): '+str(state_dict_previous))
    except Exception as e:
        error('unable to read state from '+state_file)
        exception(e)


# current becomes previous daemons state
def write_state_current():
    try:
        # mode 'w' already truncates; no seek/truncate needed
        with open(state_file, 'w') as f:
            f.write(str(state_dict_current)+'\n')
        debug('state_previous(write): '+str(state_dict_current))
    except Exception as e:
        error('unable to write state to '+state_file)
        exception(e)


# remove html decorations, e.g. '<span ...>up</span>' -> 'up'
def _undecorate(text):
    retVal = text
    if text is not None:
        item = text
        # drop the closing tag and everything after it
        index = item.find('</')
        if index > 0:
            item = item[:index]
        # drop the opening tag
        index = item.find('>')
        if index > 0:
            item = item[index+1:]
        retVal = item
    return retVal


# convert 'YYYY.MM.DD HH:MM:SS' (only the first two blank separated
# fields are used) to epoch milliseconds, interpreted as local time
def toMillis(dts):
    debug(dts)
    parts = dts.split(' ')
    d = parts[0]
    t = parts[1]
    dt = datetime.datetime.strptime(d+' '+t, '%Y.%m.%d %H:%M:%S')
    # time.mktime is the portable equivalent of strftime('%s')
    ms = int(time.mktime(dt.timetuple()))*1000
    debug(str(ms))
    return ms


# milliseconds elapsed between the given time stamp and now
def elapsedMillis(dts):
    then = toMillis(dts)
    now = 1000*time.time()
    diff = now - then
    text = str(dts)+' '+str(diff)
    debug(text)
    return diff
def read_response_text(response):
    """Return the whole body of an HTTP *response* as text, then close it.

    urllib2 (Python 2) yields str; urllib.request (Python 3) yields bytes,
    which are decoded as UTF-8 so they can be concatenated into log lines.
    """
    try:
        data = response.read()
    finally:
        response.close()
    if not isinstance(data, str):
        data = data.decode('utf-8', 'replace')
    return data


# fetch daemons state
# col[0] = Status
# col[1] = Daemon Name
# col[2] = Boot Time
# col[3] = Host IP
# col[4] = Host Name
def fetch_state_daemons():
    global state_dict_current
    global webserver_lifetime_millis
    state_dict_current = {}
    daemons = {}
    try:
        opener = urllib2.build_opener()
        if flag_agents:
            opener.addheaders.append(('Cookie', 'DUCCagents=show'))
        response = opener.open(ducc_url_servlet_system_daemons_data, timeout=url_timeout)
        data = read_response_text(response)
        jdata = json.loads(data)['aaData']
        for row in jdata:
            if len(row) > 4:
                status = _undecorate(row[0])
                daemon = row[1]
                date = row[2]
                if daemon == webserver:
                    webserver_lifetime_millis = elapsedMillis(date)
                if daemon == 'Agent':
                    # agents are distinguished by host name
                    daemon = daemon+'@'+row[4]
                daemons[daemon] = status
        for daemon in daemons:
            status = daemons[daemon]
            debug(daemon+':'+' '+status+' ')
            state_dict_current[daemon] = status
        debug('state_current(read): '+str(state_dict_current))
        state_dict_current = filter_state(state_dict_current)
        debug('state_current(filter): '+str(state_dict_current))
    except Exception as e:
        # force WS status to 'unreachable' whenever contact fails
        state_dict_current[webserver] = 'unreachable'
        error('unable to fetch data from '+ducc_url_servlet_system_daemons_data)
        exception(e)
    debug('state_current: '+str(state_dict_current))


# check if RM is 'up'
def rm_up():
    retVal = False
    debug(str(state_dict_current))
    if state_dict_current is not None:
        rm_state = get_state(state_dict_current, 'ResourceManager')
        if rm_state == 'up':
            retVal = True
    return retVal


# fetch job driver allocation: record 'assigned' when at least one
# reservation of class job_driver_allocation is in state 'Assigned',
# else 'not assigned'; only checked when -j was given and RM is up
def fetch_state_job_driver_allocation():
    try:
        if job_driver_allocation is None:
            return
        if not rm_up():
            return
        opener = urllib2.build_opener()
        debug(ducc_url_servlet_reservations_data)
        response = opener.open(ducc_url_servlet_reservations_data, timeout=url_timeout)
        data = read_response_text(response)
        debug(data)
        json_data = json.loads(data)
        count = 0
        if json_data is not None:
            reservations_list = json_data
            debug(str(len(reservations_list)))
            for reservation in reservations_list:
                res_id = reservation['id']
                rclass = reservation['rclass']
                state = reservation['state']
                if rclass == job_driver_allocation and state == 'Assigned':
                    count = count+1
                text = 'id:'+str(res_id)+' '+'class:'+rclass+' '+'state:'+state
                debug(text)
        if count == 0:
            state_dict_current[jda] = 'not assigned'
        else:
            state_dict_current[jda] = 'assigned'
        debug(state_dict_current[jda])
    except Exception as e:
        # force WS status to 'unreachable' whenever contact fails
        state_dict_current[webserver] = 'unreachable'
        error('unable to fetch data from '+ducc_url_servlet_reservations_data)
        exception(e)


# fetch current state
def fetch_state_current():
    fetch_state_daemons()
    fetch_state_job_driver_allocation()


# determine state summary overall:
# { "up", "up, JD allocation pending", "down", "unknown", "no data" },
# and list the daemons in each state
def summarize_state(state_dict):
    up = []
    down = []
    unknown = []
    jd = []
    for key in state_dict:
        state = state_dict.get(key, '?')
        if state in ('unreachable', 'down'):
            down.append(key)
        elif state == 'up':
            up.append(key)
        elif state == 'unknown':
            unknown.append(key)
        elif state == 'not assigned':
            jd = ['not assigned']
        elif state == 'assigned':
            jd = []
        else:
            warn(key+'='+state)
    # precedence: down > unknown > JD pending > up > no data
    if down:
        overall = 'down'
    elif unknown:
        overall = 'unknown'
    elif jd:
        overall = 'up, JD allocation pending...'
    elif up:
        overall = 'up'
    else:
        overall = 'no data'
    summary = {'overall': overall, 'up': up, 'down': down, 'unknown': unknown, 'jd': jd}
    debug(str(summary))
    return summary


# summarize state previous and current
def summarize():
    global summary_current
    global summary_previous
    summary_previous = summarize_state(state_dict_previous)
    info('previous: '+str(summary_previous))
    summary_current = summarize_state(state_dict_current)
    info('current: '+str(summary_current))


# send email; failures are logged, never raised
def email(HOST, SUBJECT, TO, FROM, TEXT):
    try:
        BODY = "\r\n".join((
            "From: %s" % FROM,
            "To: %s" % TO,
            "Subject: %s" % SUBJECT,
            "",
            TEXT,
        ))
        server = smtplib.SMTP(HOST)
        try:
            server.sendmail(FROM, [TO], BODY)
            server.quit()
        finally:
            # release the connection even when sendmail/quit failed
            server.close()
        info('sent: ['+TO+'] '+TEXT)
    except Exception as e:
        error('not sent: ['+TO+'] '+TEXT)
        exception(e)


# send email to each recipient in the list
def email_to_list(HOST, SUBJECT, TO_LIST, FROM, TEXT):
    if TO_LIST is None:
        info('e-mail list empty')
        return
    for TO in TO_LIST:
        email(HOST, SUBJECT, TO, FROM, TEXT)


# wait for WS to be up > 60 seconds before complaining about overall state 'unknown'
def is_reportable(overall_current):
    retVal = True
    try:
        if webserver_lifetime_millis < 60*1000 and overall_current == 'unknown':
            retVal = False
    except Exception:
        pass
    debug('reportable: '+str(retVal))
    return retVal


# e-mail message subject
def get_subject(status):
    return 'DUCC'+' '+'status='+status+' '+ducc_url_base


# one item per line, each newline terminated
def get_lines(LIST):
    return ''.join(item+'\n' for item in LIST)


def get_errors():
    return get_lines(list_errors)


def get_warnings():
    return get_lines(list_warns)


# items joined with (and each followed by) a blank
def toString(LIST):
    return ''.join(item+' ' for item in LIST)


# "<key>: <items>\n" for a non-empty list in summary_current, else ''
def add_details(key):
    details = ''
    result = get_state(summary_current, key)
    debug(key+'='+str(result))
    if result:
        details = key+':'+' '+toString(result)+'\n'
    return details


# e-mail message body
def get_body(text):
    sender = get_user()+'@'+get_host()
    body = '['+sender+']'+' '+name+' '+'reports'+' '+ducc_url_base+' '+'state change:'+' '+text+'\n\n'
    body = body+add_details('down')
    body = body+add_details('unknown')
    return body


# value for key in state_dict, or None when absent
def get_state(state_dict, key):
    try:
        state = state_dict[key]
    except (KeyError, TypeError):
        state = None
    debug(str(state))
    return state


# e-mail state changes, if any
def email_state_changes():
    key = 'overall'
    overall_previous = get_state(summary_previous, key)
    info('previous: '+key+'='+overall_previous)
    overall_current = get_state(summary_current, key)
    info('current: '+key+'='+overall_current)
    if overall_previous == overall_current:
        debug('no state change')
        return
    if not is_reportable(overall_current):
        debug('not reportable')
        return
    status = overall_current
    TIME = get_timestamp()
    SUBJECT = get_subject(status)
    FROM = get_user()+'@'+get_host()
    TEXT = TIME+' '+get_body(str(status))
    TEXT = get_warnings()+TEXT
    TEXT = get_errors()+TEXT
    email_to_list(mail_host, SUBJECT, email_list, FROM, TEXT)


# check for DUCC daemon status changes
def main(argv):
    global logger
    try:
        logger = logging.getLogger('logger')
        handler = logging.StreamHandler(sys.stdout)
        logger.addHandler(handler)
        parse_cmdline()
        init_state_previous()
        read_state_previous()
        fetch_state_current()
        summarize()
        write_state_current()
        email_state_changes()
    except Exception as e:
        error('exception in main')
        exception(e)


if __name__ == '__main__':
    main(sys.argv[1:])