class Delayed::Command

Constants

DIR_PWD

Attributes

worker_count[RW]
worker_pools[RW]

Public Class Methods

new(args) click to toggle source
# File lib/delayed/command.rb, line 18
def initialize(args) # rubocop:disable MethodLength
  @options = {
    :quiet => true,
    :pid_dir => "#{root}/tmp/pids",
    :log_dir => "#{root}/log"
  }

  @worker_count = 1
  @monitor = false

  opts = OptionParser.new do |opt|
    opt.banner = "Usage: #{File.basename($PROGRAM_NAME)} [options] start|stop|restart|run"

    opt.on('-h', '--help', 'Show this message') do
      puts opt
      exit 1
    end
    opt.on('-e', '--environment=NAME', 'Specifies the environment to run this delayed jobs under (test/development/production).') do |_e|
      STDERR.puts 'The -e/--environment option has been deprecated and has no effect. Use RAILS_ENV and see http://github.com/collectiveidea/delayed_job/issues/7'
    end
    opt.on('--min-priority N', 'Minimum priority of jobs to run.') do |n|
      @options[:min_priority] = n
    end
    opt.on('--max-priority N', 'Maximum priority of jobs to run.') do |n|
      @options[:max_priority] = n
    end
    opt.on('-n', '--number_of_workers=workers', 'Number of unique workers to spawn') do |worker_count|
      @worker_count = worker_count.to_i rescue 1
    end
    opt.on('--pid-dir=DIR', 'Specifies an alternate directory in which to store the process ids.') do |dir|
      @options[:pid_dir] = dir
    end
    opt.on('--log-dir=DIR', 'Specifies an alternate directory in which to store the delayed_job log.') do |dir|
      @options[:log_dir] = dir
    end
    opt.on('-i', '--identifier=n', 'A numeric identifier for the worker.') do |n|
      @options[:identifier] = n
    end
    opt.on('-m', '--monitor', 'Start monitor process.') do
      @monitor = true
    end
    opt.on('--sleep-delay N', 'Amount of time to sleep when no jobs are found') do |n|
      @options[:sleep_delay] = n.to_i
    end
    opt.on('--read-ahead N', 'Number of jobs from the queue to consider') do |n|
      @options[:read_ahead] = n
    end
    opt.on('-p', '--prefix NAME', 'String to be prefixed to worker process names') do |prefix|
      @options[:prefix] = prefix
    end
    opt.on('--queues=queues', 'Specify which queue DJ must look up for jobs') do |queues|
      @options[:queues] = queues.split(',')
    end
    opt.on('--queue=queue', 'Specify which queue DJ must look up for jobs') do |queue|
      @options[:queues] = queue.split(',')
    end
    opt.on('--pool=queue1[,queue2][:worker_count]', 'Specify queues and number of workers for a worker pool') do |pool|
      parse_worker_pool(pool)
    end
    opt.on('--exit-on-complete', 'Exit when no more jobs are available to run. This will exit if all jobs are scheduled to run in the future.') do
      @options[:exit_on_complete] = true
    end
    opt.on('--daemon-options a, b, c', Array, 'options to be passed through to daemons gem') do |daemon_options|
      @daemon_options = daemon_options
    end
  end
  @args = opts.parse!(args) + (@daemon_options || [])
end

Public Instance Methods

daemonize() click to toggle source
# File lib/delayed/command.rb, line 87
def daemonize # rubocop:disable PerceivedComplexity
  dir = @options[:pid_dir]
  FileUtils.mkdir_p(dir) unless File.exist?(dir)

  if worker_pools
    setup_pools
  elsif @options[:identifier]
    # rubocop:disable GuardClause
    if worker_count > 1
      raise ArgumentError, 'Cannot specify both --number-of-workers and --identifier'
    else
      run_process("delayed_job.#{@options[:identifier]}", @options)
    end
    # rubocop:enable GuardClause
  else
    worker_count.times do |worker_index|
      process_name = worker_count == 1 ? 'delayed_job' : "delayed_job.#{worker_index}"
      run_process(process_name, @options)
    end
  end
end
run(worker_name = nil, options = {}) click to toggle source
# File lib/delayed/command.rb, line 129
def run(worker_name = nil, options = {})
  Dir.chdir(root)

  Delayed::Worker.after_fork
  Delayed::Worker.logger ||= Logger.new(File.join(@options[:log_dir], 'delayed_job.log'))

  worker = Delayed::Worker.new(options)
  worker.name_prefix = "#{worker_name} "
  worker.start
rescue => e
  STDERR.puts e.message
  STDERR.puts e.backtrace
  ::Rails.logger.fatal(e) if rails_logger_defined?
  exit_with_error_status
end
run_process(process_name, options = {}) click to toggle source
# File lib/delayed/command.rb, line 121
def run_process(process_name, options = {})
  Delayed::Worker.before_fork
  Daemons.run_proc(process_name, :dir => options[:pid_dir], :dir_mode => :normal, :monitor => @monitor, :ARGV => @args) do |*_args|
    $0 = File.join(options[:prefix], process_name) if @options[:prefix]
    run process_name, options
  end
end
setup_pools() click to toggle source
# File lib/delayed/command.rb, line 109
def setup_pools
  worker_index = 0
  @worker_pools.each do |queues, worker_count|
    options = @options.merge(:queues => queues)
    worker_count.times do
      process_name = "delayed_job.#{worker_index}"
      run_process(process_name, options)
      worker_index += 1
    end
  end
end

Private Instance Methods

exit_with_error_status() click to toggle source
# File lib/delayed/command.rb, line 168
def exit_with_error_status
  exit 1
end
parse_worker_pool(pool) click to toggle source
# File lib/delayed/command.rb, line 147
def parse_worker_pool(pool)
  @worker_pools ||= []

  queues, worker_count = pool.split(':')
  queues = ['*', '', nil].include?(queues) ? [] : queues.split(',')
  worker_count = (worker_count || 1).to_i rescue 1
  @worker_pools << [queues, worker_count]
end
rails_logger_defined?() click to toggle source
# File lib/delayed/command.rb, line 164
def rails_logger_defined?
  defined?(::Rails.logger)
end
rails_root_defined?() click to toggle source
# File lib/delayed/command.rb, line 160
def rails_root_defined?
  defined?(::Rails.root)
end
root() click to toggle source
# File lib/delayed/command.rb, line 156
def root
  @root ||= rails_root_defined? ? ::Rails.root : DIR_PWD
end