#!/usr/bin/env python # ----------------------------------------------------------------------- # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # ----------------------------------------------------------------------- import glob import os import socket import subprocess import sys import tarfile from optparse import HelpFormatter from optparse import OptionGroup from optparse import OptionParser import logging from ducc_util import DuccUtil # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ # + # + db_tool # + # + purpose: save & restore snapshots of DUCC's Cassandra database # + # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ # Extend OptionParser class class DbToolOptionParser(OptionParser): # override epilog formatter so # that newlines are not deleted! def format_epilog(self, formatter): return self.epilog # A DUCC tool for creating/restoring Cassandra database backups. # See --help class Logger(): def __init__(self, options): self.options = options #Get the root logger logger = logging.getLogger() #Have to set the root logger level, it defaults to logging.WARNING logger.setLevel(logging.NOTSET) formatter = logging.Formatter('%(asctime)s %(message)s') logging_handler_out = logging.StreamHandler(sys.stdout) logging_handler_out.setLevel(logging.INFO) logging_handler_out.setFormatter(formatter) logger.addHandler(logging_handler_out) #logging_handler_err = logging.StreamHandler(sys.stderr) #logging_handler_err.setLevel(logging.WARNING) #logging_handler_err.setFormatter(formatter) #logger.addHandler(logging_handler_err) def info(self, text): if(self.options.var_quiet): return if(text == None): return if(text == ''): return for line in text.splitlines(): logging.info('I '+line) def debug(self, text): if(not self.options.var_verbose): return if(text == None): return if(text == ''): return for line in text.splitlines(): logging.debug('D '+line) def warn(self, text): if(text == None): return if(text == ''): return for line in text.splitlines(): logging.warn('W '+line) class DbTool(DuccUtil): # constants subdir_state = '/state' subdir_database = subdir_state+'/database' subdir_database_data = subdir_database+'/data' subdir_cassandra = '/cassandra-server' subdir_cassandra_bin = subdir_cassandra+'/bin' cmd_name_nodetool = 'nodetool' snapshot_name = 'SNAPSHOT' # keywords kw_save = 'save' kw_save_overwrite = 'save-overwrite' kw_restore = 'restore' kw_restore_overwrite = 'restore-overwrite' kw_verbose = 'verbose' kw_quiet = 'quiet' def __init__(self): DuccUtil.__init__(self) self.cmd_nodetool = self.DUCC_HOME+self.subdir_cassandra_bin+'/'+self.cmd_name_nodetool self.ducc_database = self.DUCC_HOME+self.subdir_database self.ducc_database_data = self.DUCC_HOME+self.subdir_database_data def terminate(self): text = 'unable to continue, processing terminated' self.logger.warn(text) raise SystemExit(1) def complete(self): text = 'processing completed successfully' self.logger.info(text) raise SystemExit(0) def get_epilog(self): epilog = '' epilog = epilog + '\n' epilog = epilog + 'Example:' epilog = epilog + '\n' epilog = epilog + 'bash-4.1$ ./db_tool --save-overwrite /backups/ducc/ducc-db.tar.gz' epilog = epilog + '\n' epilog = epilog + '2016-09-30 14:05:25,938 I save-overwrite' epilog = epilog + '\n' epilog = epilog + '2016-09-30 14:05:28,570 I Uptime (seconds) : 506' epilog = epilog + '\n' epilog = epilog + '2016-09-30 14:05:28,571 I database is up' epilog = epilog + '\n' epilog = epilog + '2016-09-30 14:05:28,571 I remove snapshot' epilog = epilog + '\n' epilog = epilog + '2016-09-30 14:05:31,152 I create snapshot' epilog = epilog + '\n' epilog = epilog + '2016-09-30 14:05:34,002 I remove /home/degenaro/ducc-db.tar.gz' epilog = epilog + '\n' epilog = epilog + '2016-09-30 14:05:34,015 I create /home/degenaro/ducc-db.tar.gz' epilog = epilog + '\n' epilog = epilog + '2016-09-30 14:05:34,499 I count[files]=303' epilog = epilog + '\n' epilog = epilog + '2016-09-30 14:05:34,499 I size[bytes]=177844' epilog = epilog + '\n' epilog = epilog + '2016-09-30 14:05:34,499 I processing completed successfully' epilog = epilog + '\n' return epilog # parse command line def parse(self, argv): self.parser = DbToolOptionParser(epilog=self.get_epilog()) self.parser.formatter.help_position = 36 self.parser.formatter.max_help_position = 36 self.parser.add_option('--'+self.kw_save, action='store', dest='var_save', default=None, metavar='PATH.tar.gz', help='create snapshot of database and save to specified tar.gz file; requires database to be up') self.parser.add_option('--'+self.kw_save_overwrite, action='store', dest='var_save_overwrite', default=None, metavar='PATH.tar.gz', help='create snapshot of database and save to specified tar.gz file, overwriting previous file if one exists; requires database to be up') self.parser.add_option('--'+self.kw_restore, action='store', dest='var_restore', default=None, metavar='PATH.tar.gz', help='extract snapshot from specified tar.gz file and restore database; requires database to be down') self.parser.add_option('--'+self.kw_restore_overwrite, action='store', dest='var_restore_overwrite', default=None, metavar='PATH.tar.gz', help='extract snapshot from specified tar.gz file and restore database, overwriting previous database if one exists; requires database to be down') self.parser.add_option('--'+self.kw_quiet, action='store_true', dest='var_quiet', default=False, help='print no informational messages') self.parser.add_option('--'+self.kw_verbose, action='store_true', dest='var_verbose', default=False, help='print extra debug messages') (self.options, self.args) = self.parser.parse_args() # at least one of { save, save-overwrite, restore } if (self.options.var_save == None) and (self.options.var_save_overwrite == None) and (self.options.var_restore == None) and (self.options.var_restore_overwrite == None): self.parser.error('option --'+self.kw_save+ ' or --'+self.kw_save_overwrite+ ' or --'+self.kw_restore+ ' or --'+self.kw_restore_overwrite+ ' must be specified') # exactly one of { save, save-overwrite, restore, restore-overwrite } if (self.options.var_save != None) and (self.options.var_save_overwrite != None): self.parser.error('options --'+self.kw_save+' and --'+self.kw_save_overwrite+' are mutually exclusive') if (self.options.var_save != None) and (self.options.var_restore != None): self.parser.error('options --'+self.kw_save+' and --'+self.kw_restore+' are mutually exclusive') if (self.options.var_save != None) and (self.options.var_restore_overwrite != None): self.parser.error('options --'+self.kw_save+' and --'+self.kw_restore_overwrite+' are mutually exclusive') if (self.options.var_save_overwrite != None) and (self.options.var_restore != None): self.parser.error('options --'+self.kw_save_overwrite+' and --'+self.kw_restore+' are mutually exclusive') if (self.options.var_save_overwrite != None) and (self.options.var_restore_overwrite != None): self.parser.error('options --'+self.kw_save_overwrite+' and --'+self.kw_restore_overwrite+' are mutually exclusive') if (self.options.var_restore != None) and (self.options.var_restore_overwrite != None): self.parser.error('options --'+self.kw_restore+' and --'+self.kw_restore_overwrite+' are mutually exclusive') # exactly one of { quiet, verbose } if (self.options.var_quiet) and (self.options.var_verbose): self.parser.error('options --'+self.kw_quiet+' and --'+self.kw_verbose+' are mutually exclusive') def db_status(self): status = 'unknown' p = subprocess.Popen([self.cmd_nodetool, 'info'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = p.communicate() if ((err != None) and (err != '')): self.logger.debug('err: '+err) if (err.startswith(self.cmd_name_nodetool+': Failed to connect')): status = 'down' elif ((out != None) and (out != '')): self.logger.debug('out: '+out) for line in out.splitlines(): if(line.startswith('Uptime')): text = ' '.join(line.split()) self.logger.info(text) status = 'up' break return status def assure_database_up(self): if(self.db_status() != 'up'): text = 'database is not up' self.logger.warn(text) self.terminate() text = 'database is up' self.logger.info(text) def assure_database_down(self): if(self.db_status() != 'down'): text = 'database is not down' self.logger.warn(text) self.terminate() text = 'database is down' self.logger.info(text) # remove snapshot def remove_snapshot(self): text = 'remove snapshot' self.logger.info(text) name = self.snapshot_name p = subprocess.Popen([self.cmd_nodetool, 'clearsnapshot', '-t', name], stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = p.communicate() self.logger.debug(out) if(err != None): if(err != ''): self.logger.warn(err) text = 'remove snapshot failure' self.logger.warn(text) self.terminate() # create snapshot def create_snapshot(self): text = 'create snapshot' self.logger.info(text) name = self.snapshot_name p = subprocess.Popen([self.cmd_nodetool, 'snapshot', '-t', name], stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = p.communicate() self.logger.debug(out) if(err != None): if(err != ''): self.logger.warn(err) text = 'create snapshot failure' self.logger.warn(text) self.terminate() # remove database directory def remove_directory(self): path = self.ducc_database if(not os.path.exists(path)): return text = 'remove '+path self.logger.info(text) p = subprocess.Popen(['/bin/rm', '-fr', path], stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = p.communicate() self.logger.debug(out) if(err != None): if(err != ''): self.logger.warn(err) text = 'remove '+path+' failure' self.logger.warn(text) self.terminate() # create database directory def create_directory(self): path = self.ducc_database_data if(os.path.exists(path)): return text = 'create '+path self.logger.info(text) os.makedirs(path) # remove tar.gz def remove_targz(self): if(not os.path.exists(self.targz)): return text = 'remove '+self.targz self.logger.info(text) p = subprocess.Popen(['/bin/rm', self.targz], stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = p.communicate() self.logger.debug(out) if(err != None): if(err != ''): self.logger.warn(err) text = 'remove '+self.targz+' failure' self.logger.warn(text) self.terminate() # create targz directory def makedirs_targz(self): path = self.targz.rsplit('/',1)[0] if(os.path.exists(path)): return text = 'makedirs '+path self.logger.info(text) os.makedirs(path) # create tar.gz def create_targz(self): text = 'create '+self.targz self.logger.info(text) source = self.ducc_database_data target = self.targz filelist = glob.glob(source+'/*/**/snapshots/SNAPSHOT/**') tarf = tarfile.open(target,'w:gz') count = 0 try: for file in filelist: relative_name = file.split(source+'/')[1] no_snapshot_name = relative_name.split('/snapshots/SNAPSHOT') restore_name = no_snapshot_name[0]+no_snapshot_name[1] tarf.add(file, restore_name) self.logger.debug('add: '+restore_name) count = count + 1 finally: tarf.close() self.logger.info('count[files]='+str(count)) size = os.path.getsize(self.targz) self.logger.info('size[bytes]='+str(size)) # check tar.gz def check_targz(self): text = 'check '+self.targz self.logger.info(text) target = self.targz p = subprocess.Popen(['/bin/tar', '-tzf', target], stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = p.communicate() self.logger.debug(out) if(err != None): if(err != ''): self.logger.warn(err) text = 'check '+self.targz+' failure' self.logger.warn(text) self.terminate() size = os.path.getsize(self.targz) self.logger.info('size[bytes]='+str(size)) #count = 0 #for line in out.splitlines(): # count = count + 1 #self.logger.info('count[files]='+str(count)) # extract tar.gz def extract_targz(self): text = 'extract '+self.targz self.logger.info(text) source = self.targz target = self.ducc_database_data p = subprocess.Popen(['/bin/tar', '-xvzf', source, '-C', target], stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = p.communicate() self.logger.debug(out) if(err != None): if(err != ''): self.logger.warn(err) text = 'extract '+self.targz+' failure' self.logger.warn(text) self.terminate() count = 0 for line in out.splitlines(): self.logger.debug("extract: "+line) count = count + 1 self.logger.info('count[files]='+str(count)) # steps for --save def save_virgin(self): text = 'save' self.logger.info(text) self.targz = self.options.var_save if(os.path.exists(self.targz)): text = self.targz+' '+'exists' self.logger.warn(text) self.terminate() self.save() # steps for save-overwrite def save_overwrite(self): text = self.kw_save_overwrite self.logger.info(text) self.targz = self.options.var_save_overwrite if(os.path.exists(self.targz)): if(not os.path.isfile(self.targz)): text = self.targz+' '+'not a file' self.logger.warn(text) self.terminate() if(not os.access(self.targz, os.W_OK)): text = self.targz+' '+'not writable' self.logger.warn(text) self.terminate() self.save() # common steps for --save and --save-overwrite def save(self): self.assure_database_up() self.remove_snapshot() self.create_snapshot() self.remove_targz() self.makedirs_targz() self.create_targz() self.complete() # steps for --restore def restore_virgin(self): text = 'restore' self.logger.info(text) self.targz = self.options.var_restore path = self.ducc_database if(os.path.exists(path)): text = path+' '+'exists' self.logger.warn(text) self.terminate() self.restore() # steps for --restore-overwrite def restore_overwrite(self): text = self.kw_restore_overwrite self.logger.info(text) self.targz = self.options.var_restore_overwrite path = self.ducc_database if(os.path.exists(path)): if(os.path.isfile(path)): text = path+' '+'not a directory' self.logger.warn(text) self.terminate() if(not os.access(path, os.W_OK)): text = path+' '+'not writable' self.logger.warn(text) self.terminate() self.restore() # common steps for --restore and --restore-overwrite def restore(self): text = self.kw_restore self.logger.debug(text) self.assure_database_down() self.check_targz() self.remove_directory() self.create_directory() self.extract_targz() self.complete() # record hostname where invoked def hostname(self): text = 'host='+socket.gethostbyaddr(socket.gethostname())[0] self.logger.info(text) # --save or --save-overwrite or --restore or --restore-overwrite def main(self, argv): self.parse(argv) self.logger = Logger(self.options) self.hostname() if(self.options.var_save != None): self.save_virgin() elif (self.options.var_save_overwrite != None): self.save_overwrite() elif (self.options.var_restore != None): self.restore_virgin() elif (self.options.var_restore_overwrite != None): self.restore_overwrite() else: text = 'huh?' self.logger.warn(text) if __name__ == '__main__': instance = DbTool() instance.main(sys.argv[1:])