#!/usr/bin/env python # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # from qpidstore import jerr, jrnl, janal import optparse, os, sys #== class StoreChk ============================================================ class StoreChk(object): """ This class: 1. Reads a journal jinf file, and from its info: 2. Analyzes the journal data files to determine which is the last to be written, then 3. Reads and analyzes all the records in the journal files. The only public method is run() which kicks off the analysis. """ def __init__(self): """Constructor""" # params self.opts = None self._jdir = None # recovery analysis objects # self._jrnl_info = None # self.jrnl_rdr = None self._process_args() self._jrnl_info = jrnl.JrnlInfo(self._jdir, self.opts.bfn) # FIXME: This is a hack... find an elegant way of getting the file size to jrec! jrnl.JRNL_FILE_SIZE = self._jrnl_info.get_jrnl_file_size_bytes() self.jrnl_anal = janal.JrnlAnalyzer(self._jrnl_info) self.jrnl_rdr = janal.JrnlReader(self._jrnl_info, self.jrnl_anal, self.opts.qflag, self.opts.rflag, self.opts.vflag) def run(self): """Run the store check""" if not self.opts.qflag: print self._jrnl_info print self.jrnl_anal self.jrnl_rdr.run() self._report() def _report(self): """Print the results of the store check""" if not self.opts.qflag: print print " === REPORT ====" print print "Records: %8d non-transactional" % \ (self.jrnl_rdr.get_msg_cnt() - self.jrnl_rdr.get_txn_msg_cnt()) print " %8d transactional" % self.jrnl_rdr.get_txn_msg_cnt() print " %8d total" % self.jrnl_rdr.get_msg_cnt() print print "Transactions: %8d aborts" % self.jrnl_rdr.get_abort_cnt() print " %8d commits" % self.jrnl_rdr.get_commit_cnt() print " %8d total" % (self.jrnl_rdr.get_abort_cnt() + self.jrnl_rdr.get_commit_cnt()) print if self.jrnl_rdr.emap().size() > 0: print "Remaining enqueued records (sorted by rid): " rid_list = self.jrnl_rdr.emap().rids() rid_list.sort() for rid in rid_list: l = self.jrnl_rdr.emap().get(rid) locked = "" if l[2]: locked += " (locked)" print " fid=%d %s%s" % (l[0], l[1], locked) print "WARNING: Enqueue-Dequeue mismatch, %d enqueued records remain." % self.jrnl_rdr.emap().size() else: print "No remaining enqueued records found (emap empty)." print if self.jrnl_rdr.tmap().size() > 0: txn_rec_cnt = 0 print "Incomplete transactions: " for xid in self.jrnl_rdr.tmap().xids(): jrnl.Utils.format_xid(xid) recs = self.jrnl_rdr.tmap().get(xid) for l in recs: print " fid=%d %s" % (l[0], l[1]) print " Total: %d records for %s" % (len(recs), jrnl.Utils.format_xid(xid)) print txn_rec_cnt += len(recs) print "WARNING: Incomplete transactions found, %d xids remain containing a total of %d records." % \ (self.jrnl_rdr.tmap().size(), txn_rec_cnt) else: print "No incomplete transactions found (tmap empty)." print print "%d enqueues, %d journal records processed." % \ (self.jrnl_rdr.get_msg_cnt(), self.jrnl_rdr.get_rec_cnt()) def _process_args(self): """Process the command-line arguments""" opt = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0") opt.add_option("-b", "--base-filename", action="store", dest="bfn", default="JournalData", help="Base filename for old journal files") opt.add_option("-q", "--quiet", action="store_true", dest="qflag", help="Quiet (suppress all non-error output)") opt.add_option("-r", "--records", action="store_true", dest="rflag", help="Print all records and transactions (including consumed/closed)") opt.add_option("-v", "--verbose", action="store_true", dest="vflag", help="Verbose output") (self.opts, args) = opt.parse_args() if len(args) == 0: opt.error("No journal directory argument") elif len(args) > 1: opt.error("Too many positional arguments: %s" % args) if self.opts.qflag and self.opts.rflag: opt.error("Quiet (-q/--quiet) and record (-r/--records) options are mutually exclusive") if self.opts.qflag and self.opts.vflag: opt.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are mutually exclusive") self._jdir = args[0] if not os.path.exists(self._jdir): opt.error("Journal path \"%s\" does not exist" % self._jdir) #== class CsvStoreChk ========================================================= class CsvStoreChk(StoreChk): """ This class, in addition to analyzing a journal, can compare the journal footprint (ie enqueued/dequeued/transaction record counts) to expected values from a CSV file. This can be used for additional automated testing, and is currently in use in the long store tests for journal encode testing. """ # CSV file cols TEST_NUM_COL = 0 NUM_MSGS_COL = 5 MIN_MSG_SIZE_COL = 7 MAX_MSG_SIZE_COL = 8 MIN_XID_SIZE_COL = 9 MAX_XID_SIZE_COL = 10 AUTO_DEQ_COL = 11 TRANSIENT_COL = 12 EXTERN_COL = 13 COMMENT_COL = 20 def __init__(self): """Constructor""" StoreChk.__init__(self) # csv params self.num_msgs = None self.msg_len = None self.auto_deq = None self.xid_len = None self.transient = None self.extern = None self._warning = [] self.jrnl_rdr.set_callbacks(self, CsvStoreChk._csv_pre_run_chk, CsvStoreChk._csv_enq_chk, CsvStoreChk._csv_deq_chk, CsvStoreChk._csv_txn_chk, CsvStoreChk._csv_post_run_chk) self._get_csv_test() def _get_csv_test(self): """Get a test from the CSV reader""" if self.opts.csvfn != None and self.opts.tnum != None: tparams = self._read_csv_file(self.opts.csvfn, self.opts.tnum) if tparams == None: print "ERROR: Test %d not found in CSV file \"%s\"" % (self.opts.tnum, self.opts.csvfn) sys.exit(1) self.num_msgs = tparams["num_msgs"] if tparams["min_size"] == tparams["max_size"]: self.msg_len = tparams["max_size"] else: self.msg_len = 0 self.auto_deq = tparams["auto_deq"] if tparams["xid_min_size"] == tparams["xid_max_size"]: self.xid_len = tparams["xid_max_size"] else: self.xid_len = 0 self.transient = tparams["transient"] self.extern = tparams["extern"] def _read_csv_file(self, filename, tnum): """Read the CSV test parameter file""" try: csvf = open(filename, "r") except IOError: print "ERROR: Unable to open CSV file \"%s\"" % filename sys.exit(1) for line in csvf: str_list = line.strip().split(",") if len(str_list[0]) > 0 and str_list[0][0] != "\"": try: if (int(str_list[self.TEST_NUM_COL]) == tnum): return { "num_msgs": int(str_list[self.NUM_MSGS_COL]), "min_size": int(str_list[self.MIN_MSG_SIZE_COL]), "max_size": int(str_list[self.MAX_MSG_SIZE_COL]), "auto_deq": not (str_list[self.AUTO_DEQ_COL] == "FALSE" or str_list[self.AUTO_DEQ_COL] == "0"), "xid_min_size": int(str_list[self.MIN_XID_SIZE_COL]), "xid_max_size": int(str_list[self.MAX_XID_SIZE_COL]), "transient": not (str_list[self.TRANSIENT_COL] == "FALSE" or str_list[self.TRANSIENT_COL] == "0"), "extern": not (str_list[self.EXTERN_COL] == "FALSE" or str_list[self.EXTERN_COL] == "0"), "comment": str_list[self.COMMENT_COL] } except Exception: pass return None def _process_args(self): """Process command-line arguments""" opt = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0") opt.add_option("-b", "--base-filename", action="store", dest="bfn", default="JournalData", help="Base filename for old journal files") opt.add_option("-c", "--csv-filename", action="store", dest="csvfn", help="CSV filename containing test parameters") opt.add_option("-q", "--quiet", action="store_true", dest="qflag", help="Quiet (suppress all non-error output)") opt.add_option("-r", "--records", action="store_true", dest="rflag", help="Print all records and transactions (including consumed/closed)") opt.add_option("-t", "--test-num", action="store", type="int", dest="tnum", help="Test number from CSV file - only valid if CSV file named") opt.add_option("-v", "--verbose", action="store_true", dest="vflag", help="Verbose output") (self.opts, args) = opt.parse_args() if len(args) == 0: opt.error("No journal directory argument") elif len(args) > 1: opt.error("Too many positional arguments: %s" % args) if self.opts.qflag and self.opts.rflag: opt.error("Quiet (-q/--quiet) and record (-r/--records) options are mutually exclusive") if self.opts.qflag and self.opts.vflag: opt.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are mutually exclusive") self._jdir = args[0] if not os.path.exists(self._jdir): opt.error("Journal path \"%s\" does not exist" % self._jdir) # Callbacks for checking against CSV test parameters. Return False if ok, True to raise error. #@staticmethod def _csv_pre_run_chk(csv_store_chk): """Check performed before a test runs""" if csv_store_chk.num_msgs == None: return if csv_store_chk.jrnl_anal.is_empty() and csv_store_chk.num_msgs > 0: raise jerr.AllJrnlFilesEmptyCsvError(csv_store_chk.get_opts().tnum, csv_store_chk.num_msgs) return False _csv_pre_run_chk = staticmethod(_csv_pre_run_chk) #@staticmethod def _csv_enq_chk(csv_store_chk, hdr): """Check performed before each enqueue operation""" #if csv_store_chk.num_msgs == None: return # if csv_store_chk.extern != None: if csv_store_chk.extern != hdr.extern: raise jerr.ExternFlagCsvError(csv_store_chk.opts.tnum, csv_store_chk.extern) if hdr.extern and hdr.data != None: raise jerr.ExternFlagWithDataCsvError(csv_store_chk.opts.tnum) if csv_store_chk.msg_len != None and csv_store_chk.msg_len > 0 and hdr.data != None and \ len(hdr.data) != csv_store_chk.msg_len: raise jerr.MessageLengthCsvError(csv_store_chk.opts.tnum, csv_store_chk.msg_len, len(hdr.data)) if csv_store_chk.xid_len != None and csv_store_chk.xid_len > 0 and len(hdr.xid) != csv_store_chk.xid_len: raise jerr.XidLengthCsvError(csv_store_chk.opts.tnum, csv_store_chk.xid_len, len(hdr.xid)) if csv_store_chk.transient != None and hdr.transient != csv_store_chk.transient: raise jerr.TransactionCsvError(csv_store_chk.opts.tnum, csv_store_chk.transient) return False _csv_enq_chk = staticmethod(_csv_enq_chk) #@staticmethod def _csv_deq_chk(csv_store_chk, hdr): """Check performed before each dequeue operation""" if csv_store_chk.auto_deq != None and not csv_store_chk.auto_deq: raise jerr.JWarning("[CSV %d] WARNING: Dequeue record rid=%d found in non-dequeue test - ignoring." % (csv_store_chk.opts.tnum, hdr.rid)) #self._warning.append("[CSV %d] WARNING: Dequeue record rid=%d found in non-dequeue test - ignoring." % # (csv_store_chk.opts.tnum, hdr.rid)) return False _csv_deq_chk = staticmethod(_csv_deq_chk) #@staticmethod def _csv_txn_chk(csv_store_chk, hdr): """Check performed before each transaction commit/abort""" return False _csv_txn_chk = staticmethod(_csv_txn_chk) #@staticmethod def _csv_post_run_chk(csv_store_chk): """Cehck performed after the completion of the test""" # Exclude this check if lastFileFlag is set - the count may be less than the number of msgs sent because # of journal overwriting if csv_store_chk.num_msgs != None and not csv_store_chk.jrnl_rdr.is_last_file() and \ csv_store_chk.num_msgs != csv_store_chk.jrnl_rdr.get_msg_cnt(): raise jerr.NumMsgsCsvError(csv_store_chk.opts.tnum, csv_store_chk.num_msgs, csv_store_chk.jrnl_rdr.get_msg_cnt()) return False _csv_post_run_chk = staticmethod(_csv_post_run_chk) #============================================================================== # main program #============================================================================== if __name__ == "__main__": M = CsvStoreChk() try: M.run() except Exception, e: sys.exit(e)