#!/bin/sh # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """:" work_dir=$(dirname $0) base_name=$(basename $0) original_dir=$PWD cd $work_dir if [ $HOD_PYTHON_HOME ]; then exec $HOD_PYTHON_HOME -u -OO $base_name ${1+"$@"} elif [ -e /usr/bin/python ]; then exec /usr/bin/python -u -OO $base_name ${1+"$@"} elif [ -e /usr/local/bin/python ]; then exec /usr/local/bin/python -u -OO $base_name ${1+"$@"} else exec python -u -OO $base_name ${1+"$@"} fi ":""" """The executable to be used by the user""" import sys, os, re, pwd, threading, sys, random, time, pprint, shutil, time, re from pprint import pformat from optparse import OptionParser myName = os.path.basename(sys.argv[0]) myName = re.sub(".*/", "", myName) binDirectory = os.path.realpath(sys.argv[0]) rootDirectory = re.sub("/bin/.*", "", binDirectory) libDirectory = rootDirectory sys.path.append(libDirectory) from hodlib.Common.threads import simpleCommand from hodlib.Common.util import local_fqdn, tar, filter_warnings,\ get_exception_string, get_exception_error_string from hodlib.Common.logger import hodLog from hodlib.Common.logger import getLogger from hodlib.HodRing.hodRing import createMRSystemDirectoryManager filter_warnings() reVersion = re.compile(".*(\d+_\d+).*") reHdfsURI = re.compile("(hdfs://.*?:\d+)(.*)") VERSION = None if os.path.exists("./VERSION"): vFile = open("./VERSION", 'r') VERSION = vFile.readline() vFile.close() def __archive_logs(conf, log): # need log-destination-uri, __hadoopLogDirs, temp-dir status = True logUri = conf['log-destination-uri'] hadoopLogDirs = conf['hadoop-log-dirs'] if logUri: try: if hadoopLogDirs: date = time.localtime() for logDir in hadoopLogDirs: (head, tail) = os.path.split(logDir) (head, logType) = os.path.split(head) tarBallFile = "%s-%s-%04d%02d%02d%02d%02d%02d-%s.tar.gz" % ( logType, local_fqdn(), date[0], date[1], date[2], date[3], date[4], date[5], random.randint(0,1000)) if logUri.startswith('file://'): tarBallFile = os.path.join(logUri[7:], tarBallFile) else: tarBallFile = os.path.join(conf['temp-dir'], tarBallFile) log.debug('archiving log files to: %s' % tarBallFile) status = tar(tarBallFile, logDir, ['*',]) log.info('archive %s status: %s' % (tarBallFile, status)) if status and \ logUri.startswith('hdfs://'): __copy_archive_to_dfs(conf, tarBallFile) log.info("copying archive to dfs finished") dict = {} except: log.error(get_exception_string()) status = False return status def __copy_archive_to_dfs(conf, archiveFile): # need log-destination-uri, hadoopCommandstring and/or pkgs hdfsURIMatch = reHdfsURI.match(conf['log-destination-uri']) (head, tail) = os.path.split(archiveFile) destFile = os.path.join(hdfsURIMatch.group(2), conf['user-id'], 'hod-logs', conf['service-id'], tail) log.info("copying archive %s to DFS %s ..." % (archiveFile, destFile)) hadoopCmd = conf['hadoop-command-string'] if conf['pkgs']: hadoopCmd = os.path.join(conf['pkgs'], 'bin', 'hadoop') copyCommand = "%s dfs -fs %s -copyFromLocal %s %s" % (hadoopCmd, hdfsURIMatch.group(1), archiveFile, destFile) log.debug(copyCommand) copyThread = simpleCommand('hadoop', copyCommand) copyThread.start() copyThread.wait() copyThread.join() log.debug(pprint.pformat(copyThread.output())) os.unlink(archiveFile) def unpack(): parser = OptionParser() option_list=["--log-destination-uri", "--hadoop-log-dirs", \ "--temp-dir", "--hadoop-command-string", "--pkgs", "--user-id", \ "--service-id", "--hodring-debug", "--hodring-log-dir", \ "--hodring-syslog-address", "--hodring-cleanup-list", \ "--jt-pid", "--mr-sys-dir", "--fs-name", "--hadoop-path"] regexp = re.compile("^--") for opt in option_list: parser.add_option(opt,dest=regexp.sub("",opt),action="store") option_list.append("--hodring-stream") parser.add_option("--hodring-stream",dest="hodring-stream",metavar="bool",\ action="store_true") (options, args) = parser.parse_args() _options= {} _options['hodring'] = {} for opt in dir(options): if "--"+opt in option_list: _options[opt] = getattr(options,opt) if _options.has_key('hadoop-log-dirs') and _options['hadoop-log-dirs']: _options['hadoop-log-dirs'] = _options['hadoop-log-dirs'].split(",") if _options.has_key('hodring-syslog-address') and _options['hodring-syslog-address']: _options['hodring']['syslog-address'] = \ _options['hodring-syslog-address'].split(':') _options['hodring']['debug'] = int(_options['hodring-debug']) _options['hodring']['log-dir'] = _options['hodring-log-dir'] _options['hodring']['stream'] = _options['hodring-stream'] _options['hodring']['userid'] = _options['user-id'] os.putenv('PBS_JOBID', _options['service-id'] ) return _options if __name__ == '__main__': log = None try: conf = unpack() # Use the same log as hodring log = getLogger(conf['hodring'],'hodring') log.debug("Logger initialised successfully") mrSysDirManager = createMRSystemDirectoryManager(conf, log) if mrSysDirManager is not None: mrSysDirManager.removeMRSystemDirectory() status = __archive_logs(conf,log) log.info("Archive status : %s" % status) list = conf['hodring-cleanup-list'].split(',') log.info("now removing %s" % list) for dir in list: if os.path.exists(dir): log.debug('removing %s' % (dir)) shutil.rmtree(dir, True) log.debug("done") log.info("Cleanup successfully completed") except Exception, e: if log: log.info("Stack trace:\n%s\n%s" %(get_exception_error_string(),get_exception_string()))