#!/usr/bin/env python # ----------------------------------------------------------------------- # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # ----------------------------------------------------------------------- import os import sys import subprocess import datetime import getopt import time import db_util from ducc_util import DuccUtil class DuccRmQProcesses(DuccUtil): def usage(self, *msg): if ( msg[0] != None ): print ' '.join(msg) print 'Usage:' print ' q_processes options' print '' print 'Where options include:' print ' -j --job jobid sho all processes for this job' print ' -n --node show all processes on a node' print ' -f --from date include only process since this date' print ' -t --to date include processes only up to this date' print '' print 'Notes:' print ' Omit -f and -t to get all processes.' print '' print ' If -f OR -t is specified, you MUST specifiy a node (-n) as well.' print '' print '' print ' Date formats:' print ' mm/dd/yy Use this to specify everything on a given day' print ' mm/dd/yy.hh:mm Use this to specify a specific hour and minute.' print ' Hours use the military (24-hour) clock.' print '' print 'Examples:' print '' print ' Show all work on bluejbob on Feb 15 2015 between noon and 4PM' print ' q_processes --node bluejbob --from 2/14/15.12:00 --to 2/14/15.16:00' print '' print ' Show all history for job 1234' print ' q_processes --job 1234' print '' print ' Show history for job 1234 on node bluebob' print ' q_processes --job 1234 --node bluebob' sys.exit(0) def parse_date(self, dat): if ( '.' in dat ): fmt = '%m/%d/%y.%H:%M' else: fmt = '%m/%d/%y' d = datetime.datetime.strptime(dat, fmt) return int(time.mktime(d.timetuple()))*1000 def get_date(self, dat): return datetime.datetime.fromtimestamp(dat) def main(self, argv): node = None fromt = None tot = None jobid = None conjunction = 'WHERE' print 'argv', argv try: opts, args = getopt.getopt(argv, 'f:j:n:t:h?', ['from=', 'to=', 'job=', 'node=', 'help', ]) except: self.usage("Invalid arguments " + ' '.join(argv)) for ( o, a ) in opts: if o in ('-n', '--node'): node = a elif o in ('-f', '--from'): fromt = self.parse_date(a) elif o in ('-t', '--to'): tot = self.parse_date(a) elif o in ('-j', '--job'): jobid = a elif o in ('-h', '-?', '--help'): self.usage(None) if ( ( fromt != None ) and ( tot != None ) and ( node == None ) ): self.usage("Node must be specified when a date range is specified.") query = ['select * from ducc.processes'] if ( node != None ): query.append(conjunction) conjunction = 'AND' query.append("node='" + node + "'") if ( fromt != None ): query.append(conjunction) conjunction = 'AND' query.append("start > " + str(fromt)) if ( jobid != None ): query.append(conjunction) conjunction = 'AND' query.append("job_id = " + jobid) if ( tot != None ): if ( fromt == None ): usage("--from must be specified if --to is also specified.") query.append(conjunction) conjunction = 'AND' query.append("stop < " + str(tot)) if ( fromt != None ): query.append("ALLOW FILTERING") query = '"' + ' '.join(query) + '"' DH = self.DUCC_HOME dbn = self.ducc_properties.get('ducc.database.host') CMD = [DH + '/cassandra-server/bin/cqlsh', dbn, '-u', 'guest', '-p', 'guest', '-e', query] CMD = ' '.join(CMD) print CMD lines = [] proc = subprocess.Popen(CMD, bufsize=0, stdout=subprocess.PIPE, shell=True) for line in proc.stdout: # print '[]', line.strip() lines.append(line) header, lines = db_util.parse(lines, 'job_id') for line in lines: line['start'] = str(self.get_date(int(line['start'])/1000)) line['stop'] = str(self.get_date(int(line['stop'])/1000)) db_util.format(header, lines) return if __name__ == "__main__": stopper = DuccRmQProcesses() stopper.main(sys.argv[1:])