#! /usr/bin/ruby1.8 require "pathname" ENV["GEM_HOME"] = nil if ENV["GEM_HOME"] and !Pathname.new(ENV["GEM_HOME"]).exist? ENV["GEM_PATH"] = nil if ENV["GEM_PATH"] and !Pathname.new(ENV["GEM_PATH"]).exist? require "rubygems" require "libisi" init_libisi daemon = false type = :fifo waiting_time = nil force = false force_user = nil count = nil migrate_redo = false action, params = optparse({ :text => "TODO: add text here", :actions => { "migrate DB" => "Migrate databse (use evironment variable VERSION to specify version)", "dump DB" => "Dump database schema", "load DB" => "Load database schema", "check DB" => "Check database version", "start SOURCECLASS" => "start", "stdin" => "Read logs from stdin (for syslog)", }}) {|o| o.on( "-d", "--daemon", "Run as daemon." ) do |daemon| end o.on( "-f", "--force", "Force prisma to run even if do_not_run_prisma is defined." ) do |force| end o.on( "-u", "--user USERNAME", "Force user root to access database (useful for migrations)." ) do |force_user| end o.on( "-c", "--count COUNT", "Amount of recrods to process per step.") do |c| count = c end o.on( "-w", "--waiting WAITINGTIME", "The time in seconds to wait if the queue table is empty.") do |w| waiting_time = w.to_i end o.on( "-r", "--redo COUNT", "Redo the latest migration, only valid for migrate task.") do |migrate_redo| end o.on( "--all", "Transform all messages in the queue and exit afterwards." ) do |a| type = :all end } cf = Pathname.new(__FILE__).dirname + "../conf/prisma/environment.rb" if cf.exist? require cf else require "/etc/prisma/environment.rb" end options = { :force_user => force_user } case action when "migrate" # migration options[:redo] = migrate_redo.to_i if migrate_redo Prisma::Database.migrate(params[:db], ENV["VERSION"] ? ENV["VERSION"].to_i : nil, options) when "dump" Prisma::Database.schema_dump(params[:db], options) when "load" Prisma::Database.schema_load(params[:db], options) when "check" Prisma::Database.load_all(options) Prisma::Database.check(params[:db], options) when "stdin" Prisma::Database.load_all(options) Prisma::Database.check("prisma" , options) raise "Stdin processing can not be run as daemon" if daemon source = FileMeta.new source.basename = "STDIN" source.save STDIN.each {|line| meta = PureMeta.new.prisma_initialize(source, {:message=>line}) meta.transform } when "start" if force; $do_not_run_prisma = false; end if $do_not_run_prisma $log.warn("$do_not_run_prisma=true, not starting prisma") exit 0 end Prisma::Database.load_all(options) if daemon Prisma::Util.define_new_logger("prisma") daemonize(:pid_file => "/var/run/prisma") end Signal.trap("TERM") do; $terminate = true; end Signal.trap("USR1") do; $terminate = true; $restart = true; end klass = eval(params[:sourceclass]) Prisma::Util.define_new_logger(params[:sourceclass]) source = nil $restart = true while $restart Prisma::Database.reconnect if daemon $terminate = false $restart = type != :all begin Prisma::Database.check("prisma" , options) Prisma::Database.check("pumpy" , options) $log.info{"Process #{type} records of class #{klass.name}, #{count} per step"} if source source.finished = true source.save end source = SourceDbMeta.new.prisma_initialize(type, klass, count,nil, false, waiting_time) $enable_dublette_recognition = source.may_contain_dublettes source.transform rescue ActiveRecord::Transactions::TransactionError raise $! rescue $log.error{ "Processing class #{klass.name} threw error #{$!}!" } for line in $!.backtrace $log.error{"#{line}"} end if Prisma::Database.check_connections $log.fatal{"Connections are good, so something bad happened. Will not risk to restart queue #{klass.name}."} $terminate = true $restart = false else $log.info{"At least one prisma connection is down."} connection_wait_count = 1 while not (Prisma::Database.check_connections or $terminate) wait_time = connection_wait_count wait_time = 30 if wait_time > 30 $log.warn{"#{connection_wait_count} Waiting #{wait_time} seconds."} Prisma::Util.save_sleep(wait_time) connection_wait_count += 1 end if !$terminate $log.info{"Connection are good again. Restarting queue #{klass.name}."} end end end end exit 0 else raise "Unexpected action #{action}" end