#!/usr/bin/env python # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # from qpidstore import jerr, jrnl, janal import glob, optparse, os, sys, time #== class Resize ============================================================== class Resize(object): """ Creates a new store journal and copies records from old journal to new. The new journal may be of different size from the old one. The records are packed into the new journal (ie only remaining enqueued records and associated transactions - if any - are copied over without spaces between them). The default action is to push the old journal down into a 'bak' sub-directory and then create a new journal of the same size and pack it with the records from the old. However, it is possible to suppress the pushdown (using --no-pushdown), in which case either a new journal id (using --new-base-filename) or an old journal id (usnig --old-base-filename) must be supplied. In the former case,a new journal will be created using the new base file name alongside the old one. In the latter case, the old journal will be renamed to the supplied name, and the new one will take the default. Note that both can be specified together with the --no-pushdown option. To resize the journal, use the optional --num-jfiles and/or --jfile-size parameters. These should be large enough to write all the records or an error will result. If the size is large enough to write all records, but too small to keep below the enqueue threshold, a warning will be printed. Note that as any valid size will be accepted, a journal can also be shrunk, as long as it is sufficiently big to accept the transferred records. """ BAK_DIR = "bak" JFILE_SIZE_PGS_MIN = 1 JFILE_SIZE_PGS_MAX = 32768 NUM_JFILES_MIN = 4 NUM_JFILES_MAX = 64 def __init__(self): """Constructor""" self._opts = None self._jdir = None self._fname = None self._fnum = None self._file = None self._file_rec_wr_cnt = None self._filler_wr_cnt = None self._last_rec_fid = None self._last_rec_offs = None self._rec_wr_cnt = None self._jrnl_info = None self._jrnl_analysis = None self._jrnl_reader = None self._process_args() self._jrnl_info = jrnl.JrnlInfo(self._jdir, self._opts.bfn) # FIXME: This is a hack... find an elegant way of getting the file size to jrec! jrnl.JRNL_FILE_SIZE = self._jrnl_info.get_jrnl_file_size_bytes() self._jrnl_analysis = janal.JrnlAnalyzer(self._jrnl_info) self._jrnl_reader = janal.JrnlReader(self._jrnl_info, self._jrnl_analysis, self._opts.qflag, self._opts.rflag, self._opts.vflag) def run(self): """Perform the action of resizing the journal""" if not self._opts.qflag: print self._jrnl_analysis self._jrnl_reader.run() if self._opts.vflag: print self._jrnl_info if not self._opts.qflag: print self._jrnl_reader.report(self._opts.vflag, self._opts.rflag) self._handle_old_files() self._create_new_files() if not self._opts.qflag: print "Transferred %d records to new journal." % self._rec_wr_cnt self._chk_free() def _chk_free(self): """Check if sufficient space is available in resized journal to be able to enqueue. Raise a warning if not.""" if self._last_rec_fid == None or self._last_rec_offs == None: return wr_capacity_bytes = self._last_rec_fid * self._jrnl_info.get_jrnl_data_size_bytes() + self._last_rec_offs tot_capacity_bytes = self._jrnl_info.get_tot_jrnl_data_size_bytes() percent_full = 100.0 * wr_capacity_bytes / tot_capacity_bytes if percent_full > 80.0: raise jerr.JWarning("WARNING: Journal %s is %2.1f%% full and will likely not allow enqueuing of new records" " until some existing records are dequeued." % (self._jrnl_info.get_jrnl_id(), percent_full)) def _create_new_files(self): """Create new journal files""" # Assemble records to be transfered master_record_list = {} txn_record_list = self._jrnl_reader.txn_obj_list() if self._opts.vflag and self._jrnl_reader.emap().size() > 0: print "* Assembling %d records from emap" % self._jrnl_reader.emap().size() for tup in self._jrnl_reader.emap().get_rec_list(): hdr = tup[1] hdr.flags &= ~jrnl.Hdr.OWI_MASK # Turn off owi master_record_list[long(hdr.rid)] = hdr if hdr.xidsize > 0 and hdr.xid in txn_record_list: txn_hdr = txn_record_list[hdr.xid] del(txn_record_list[hdr.xid]) txn_hdr.flags &= ~jrnl.Hdr.OWI_MASK # Turn off owi master_record_list[long(txn_hdr.rid)] = txn_hdr if self._opts.vflag and self._jrnl_reader.tmap().size() > 0: print "* Assembling %d records from tmap" % self._jrnl_reader.tmap().size() for xid in self._jrnl_reader.tmap().xids(): for l in self._jrnl_reader.tmap().get(xid): hdr = l[1] hdr.flags &= ~jrnl.Hdr.OWI_MASK # Turn off owi master_record_list[hdr.rid] = hdr rid_list = master_record_list.keys() rid_list.sort() # get base filename bfn = self._opts.bfn if self._opts.nbfn != None: bfn = self._opts.nbfn # write jinf file self._jrnl_info.resize(self._opts.njf, self._opts.jfs) self._jrnl_info.write(self._jdir, bfn) # write records if self._opts.vflag: print "* Transferring records to new journal files" fro = self._jrnl_info.get_jrnl_sblk_size_bytes() while len(rid_list) > 0: hdr = master_record_list[rid_list.pop(0)] rec = hdr.encode() pos = 0 while pos < len(rec): if self._file == None or self._file.tell() >= self._jrnl_info.get_jrnl_file_size_bytes(): if self._file == None: rid = hdr.rid elif len(rid_list) == 0: rid = 0 else: rid = rid_list[0] if not self._rotate_file(rid, fro): raise jerr.JournalSpaceExceededError() if len(rec) - pos <= self._jrnl_info.get_jrnl_file_size_bytes() - self._file.tell(): self._file.write(rec[pos:]) self._fill_file(jrnl.Utils.size_in_bytes_to_blk(self._file.tell(), self._jrnl_info.get_jrnl_dblk_size_bytes())) pos = len(rec) fro = self._jrnl_info.get_jrnl_sblk_size_bytes() else: flen = self._jrnl_info.get_jrnl_file_size_bytes() - self._file.tell() self._file.write(rec[pos:pos + flen]) pos += flen rem = len(rec) - pos if rem <= self._jrnl_info.get_jrnl_data_size_bytes(): fro = (jrnl.Utils.size_in_bytes_to_blk(self._jrnl_info.get_jrnl_sblk_size_bytes() + rem, self._jrnl_info.get_jrnl_dblk_size_bytes())) else: fro = 0 self._rec_wr_cnt += 1 self._file_rec_wr_cnt += 1 self._fill_file(add_filler_recs = True) while self._rotate_file(): pass def _fill_file(self, to_posn = None, add_filler_recs = False): """Fill a file to a known offset""" if self._file == None: return if add_filler_recs: nfr = int(jrnl.Utils.rem_bytes_in_blk(self._file, self._jrnl_info.get_jrnl_sblk_size_bytes()) / self._jrnl_info.get_jrnl_dblk_size_bytes()) if nfr > 0: self._filler_wr_cnt = nfr for i in range(0, nfr): self._file.write("RHMx") self._fill_file(jrnl.Utils.size_in_bytes_to_blk(self._file.tell(), self._jrnl_info.get_jrnl_dblk_size_bytes())) self._last_rec_fid = self._fnum self._last_rec_offs = self._file.tell() if to_posn == None: to_posn = self._jrnl_info.get_jrnl_file_size_bytes() elif to_posn > self._jrnl_info.get_jrnl_file_size_bytes(): raise jerr.FillExceedsFileSizeError(to_posn, self._jrnl_info.get_jrnl_file_size_bytes()) diff = to_posn - self._file.tell() self._file.write(str("\0" * diff)) #DEBUG if self._file.tell() != to_posn: raise jerr.FillSizeError(self._file.tell(), to_posn) def _rotate_file(self, rid = None, fro = None): """Switch to the next logical file""" if self._file != None: self._file.close() if self._opts.vflag: if self._file_rec_wr_cnt == 0: print " (empty)" elif self._filler_wr_cnt == None: print " (%d records)" % self._file_rec_wr_cnt else: print " (%d records + %d filler(s))" % (self._file_rec_wr_cnt, self._filler_wr_cnt) if self._fnum == None: self._fnum = 0 self._rec_wr_cnt = 0 elif self._fnum == self._jrnl_info.get_num_jrnl_files() - 1: return False else: self._fnum += 1 self._file_rec_wr_cnt = 0 self._fname = os.path.join(self._jrnl_info.get_jrnl_dir(), "%s.%04x.jdat" % (self._jrnl_info.get_jrnl_base_name(), self._fnum)) if self._opts.vflag: print "* Opening file %s" % self._fname, self._file = open(self._fname, "w") if rid == None or fro == None: self._fill_file() else: now = time.time() fhdr = jrnl.FileHdr(0, "RHMf", jrnl.Hdr.HDR_VER, int(jrnl.Hdr.BIG_ENDIAN), 0, rid) fhdr.init(self._file, 0, self._fnum, self._fnum, fro, int(now), 1000000000*(now - int(now))) self._file.write(fhdr.encode()) self._fill_file(self._jrnl_info.get_jrnl_sblk_size_bytes()) return True def _handle_old_files(self): """Push old journal down into a backup directory""" target_dir = self._jdir if not self._opts.npd: target_dir = os.path.join(self._jdir, self.BAK_DIR) if os.path.exists(target_dir): if self._opts.vflag: print "* Pushdown directory %s exists, deleting content" % target_dir for fname in glob.glob(os.path.join(target_dir, "*")): os.unlink(fname) else: if self._opts.vflag: print "* Creating new pushdown directory %s" % target_dir os.mkdir(target_dir) if not self._opts.npd or self._opts.obfn != None: if self._opts.obfn != None and self._opts.vflag: print "* Renaming old journal files using base name %s" % self._opts.obfn # .jdat files for fname in glob.glob(os.path.join(self._jdir, "%s.*.jdat" % self._opts.bfn)): tbfn = os.path.basename(fname) if self._opts.obfn != None: per1 = tbfn.rfind(".") if per1 >= 0: per2 = tbfn.rfind(".", 0, per1) if per2 >= 0: tbfn = "%s%s" % (self._opts.obfn, tbfn[per2:]) os.rename(fname, os.path.join(target_dir, tbfn)) # .jinf file self._jrnl_info.write(target_dir, self._opts.obfn) os.unlink(os.path.join(self._jdir, "%s.jinf" % self._opts.bfn)) def _print_options(self): """Print program options""" if self._opts.vflag: print "Journal dir: %s" % self._jdir print "Options: Base filename: %s" % self._opts.bfn print " New base filename: %s" % self._opts.nbfn print " Old base filename: %s" % self._opts.obfn print " Pushdown: %s" % self._opts.npd print " No. journal files: %d" % self._opts.njf print " Journal file size: %d 64kiB blocks" % self._opts.jfs print " Show records flag: %s" % self._opts.rflag print " Verbose flag: %s" % True print def _process_args(self): """Process the command-line arguments""" opt = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0") opt.add_option("-b", "--base-filename", action="store", dest="bfn", default="JournalData", help="Base filename for old journal files") opt.add_option("-B", "--new-base-filename", action="store", dest="nbfn", help="Base filename for new journal files") opt.add_option("-n", "--no-pushdown", action="store_true", dest="npd", help="Suppress pushdown of old files into \"bak\" dir; old files will remain in existing dir") opt.add_option("-N", "--num-jfiles", action="store", type="int", dest="njf", default=8, help="Number of files for new journal (%d-%d)" % (self.NUM_JFILES_MIN, self.NUM_JFILES_MAX)) opt.add_option("-o", "--old-base-filename", action="store", dest="obfn", help="Base filename for old journal files") opt.add_option("-q", "--quiet", action="store_true", dest="qflag", help="Quiet (suppress all non-error output)") opt.add_option("-r", "--records", action="store_true", dest="rflag", help="Print remaining records and transactions") opt.add_option("-s", "--jfile-size-pgs", action="store", type="int", dest="jfs", default=24, help="Size of each new journal file in 64kiB blocks (%d-%d)" % (self.JFILE_SIZE_PGS_MIN, self.JFILE_SIZE_PGS_MAX)) opt.add_option("-v", "--verbose", action="store_true", dest="vflag", help="Verbose output") (self._opts, args) = opt.parse_args() if len(args) == 0: opt.error("No journal directory argument") elif len(args) > 1: opt.error("Too many positional arguments: %s" % args) if self._opts.qflag and self._opts.rflag: opt.error("Quiet (-q/--quiet) and record (-r/--records) options are mutually exclusive") if self._opts.qflag and self._opts.vflag: opt.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are mutually exclusive") if self._opts.njf != None and (self._opts.njf < self.NUM_JFILES_MIN or self._opts.njf > self.NUM_JFILES_MAX): opt.error("Number of files (%d) is out of range (%d-%d)" % (self._opts.njf, self.NUM_JFILES_MIN, self.NUM_JFILES_MAX)) if self._opts.jfs != None and (self._opts.jfs < self.JFILE_SIZE_PGS_MIN or self._opts.jfs > self.JFILE_SIZE_PGS_MAX): opt.error("File size (%d) is out of range (%d-%d)" % (self._opts.jfs, self.JFILE_SIZE_PGS_MIN, self.JFILE_SIZE_PGS_MAX)) if self._opts.npd != None and (self._opts.nbfn == None and self._opts.obfn == None): opt.error("If (-n/--no-pushdown) is used, then at least one of (-B/--new-base-filename) and" " (-o/--old-base-filename) must be used.") self._jdir = args[0] if not os.path.exists(self._jdir): opt.error("Journal path \"%s\" does not exist" % self._jdir) self._print_options() #============================================================================== # main program #============================================================================== if __name__ == "__main__": R = Resize() try: R.run() except Exception, e: sys.exit(e)