class Delayed::Worker
Constants
- DEFAULT_DEFAULT_PRIORITY
- DEFAULT_DELAY_JOBS
- DEFAULT_LOG_LEVEL
- DEFAULT_MAX_ATTEMPTS
- DEFAULT_MAX_RUN_TIME
- DEFAULT_QUEUES
- DEFAULT_READ_AHEAD
- DEFAULT_SLEEP_DELAY
Attributes
name_prefix[RW]
name_prefix is ignored if name is set directly.
Public Class Methods
after_fork()
click to toggle source
# File lib/delayed/worker.rb, line 92
# Re-opens (in append mode, with sync enabled) every file recorded in
# @files_to_reopen, then calls backend.after_fork.
#
# Re-opening is best effort: a file that cannot be re-opened is skipped so the
# remaining files and the backend hook are still processed.
def self.after_fork
  # @files_to_reopen is only populated by before_fork; tolerate after_fork
  # being called without it instead of failing on nil.
  (@files_to_reopen || []).each do |file|
    begin
      file.reopen file.path, "a+"
      file.sync = true
    rescue StandardError
      # Deliberately ignored (best effort). Only StandardError is swallowed so
      # that signals and SystemExit are not silently discarded here.
    end
  end
  backend.after_fork
end
backend=(backend)
click to toggle source
# File lib/delayed/worker.rb, line 67
# Sets the job backend.
#
# When given a Symbol, the matching serialization and backend files are
# required and the symbol is resolved to the
# Delayed::Backend::<Name>::Job constant. Any other value is used as-is.
# The resolved backend is stored and also published as ::Delayed::Job.
def self.backend=(backend)
  if backend.is_a?(Symbol)
    adapter = backend
    require "delayed/serialization/#{adapter}"
    require "delayed/backend/#{adapter}"
    backend = "Delayed::Backend::#{adapter.to_s.classify}::Job".constantize
  end

  @@backend = backend

  # Re-assigning the Job constant would otherwise emit a warning.
  silence_warnings do
    ::Delayed.const_set(:Job, backend)
  end
end
before_fork()
click to toggle source
# File lib/delayed/worker.rb, line 81
# Records every File object that is currently open (only the first time it is
# called) so that after_fork can re-open them, then calls backend.before_fork.
def self.before_fork
  @files_to_reopen ||= ObjectSpace.each_object(File).reject { |file| file.closed? }
  backend.before_fork
end
guess_backend()
click to toggle source
# File lib/delayed/worker.rb, line 77
# Deprecated no-op: only emits a deprecation warning.
def self.guess_backend
  deprecation_notice = "[DEPRECATION] guess_backend is deprecated. Please remove it from your code."
  warn deprecation_notice
end
lifecycle()
click to toggle source
# File lib/delayed/worker.rb, line 105
# Returns the memoized Delayed::Lifecycle instance, creating it on first use.
def self.lifecycle
  return @lifecycle if @lifecycle

  @lifecycle = Delayed::Lifecycle.new
end
new(options={})
click to toggle source
# File lib/delayed/worker.rb, line 109
# Builds a worker.
#
# options - Hash of settings:
#           :quiet sets the instance's quiet flag (defaults to true when the
#           key is absent); :min_priority, :max_priority, :sleep_delay,
#           :read_ahead, :queues and :exit_on_complete are forwarded to the
#           class-level setters of the same name, but only when the key is
#           present.
#
# Every registered plugin class is instantiated once per worker.
def initialize(options={})
  @quiet = options.fetch(:quiet, true)
  @failed_reserve_count = 0

  class_level_options = [:min_priority, :max_priority, :sleep_delay, :read_ahead, :queues, :exit_on_complete]
  class_level_options.each do |option|
    next unless options.has_key?(option)
    self.class.send("#{option}=", options[option])
  end

  self.plugins.each(&:new)
end
reset()
click to toggle source
# File lib/delayed/worker.rb, line 33
# Restores the listed class-level settings to their DEFAULT_* constants.
def self.reset
  defaults = [
    [:sleep_delay,      DEFAULT_SLEEP_DELAY],
    [:max_attempts,     DEFAULT_MAX_ATTEMPTS],
    [:max_run_time,     DEFAULT_MAX_RUN_TIME],
    [:default_priority, DEFAULT_DEFAULT_PRIORITY],
    [:delay_jobs,       DEFAULT_DELAY_JOBS],
    [:queues,           DEFAULT_QUEUES],
    [:read_ahead,       DEFAULT_READ_AHEAD]
  ]

  # Assign in the order listed above, through the regular setters.
  defaults.each { |setting, value| send("#{setting}=", value) }
end
Public Instance Methods
failed(job)
click to toggle source
# File lib/delayed/worker.rb, line 234
# Finalizes a job that will not be retried.
#
# Inside the :failure lifecycle callbacks it runs the job's own :failure hook
# and then either destroys the job or marks it as failed, depending on the
# class-level destroy_failed_jobs setting.
#
# Returns whatever the lifecycle's run_callbacks returns.
def failed(job)
  self.class.lifecycle.run_callbacks(:failure, self, job) do
    begin
      job.hook(:failure)
    rescue StandardError => error
      # A broken failure hook must not prevent the job from being finalized
      # below; otherwise the job would be left neither destroyed nor failed.
      say "Error when running failure callback: #{error}", 'error'
      say Array(error.backtrace).join("\n"), 'error'
    end
    self.class.destroy_failed_jobs ? job.destroy : job.fail!
  end
end
job_say(job, text, level = DEFAULT_LOG_LEVEL)
click to toggle source
# File lib/delayed/worker.rb, line 241
# Logs +text+ through #say, prefixed with the job's name and id.
#
# job   - object responding to #name and #id.
# text  - message to log.
# level - log level forwarded unchanged to #say.
def job_say(job, text, level = DEFAULT_LOG_LEVEL)
  say("Job #{job.name} (id=#{job.id}) #{text}", level)
end
max_attempts(job)
click to toggle source
# File lib/delayed/worker.rb, line 258
# Returns the job's own max_attempts when it is set, otherwise the
# class-level max_attempts.
def max_attempts(job)
  per_job_limit = job.max_attempts
  return per_job_limit if per_job_limit

  self.class.max_attempts
end
name()
click to toggle source
Every worker has a unique name, which by default is built from the hostname and the pid of the process. There are some advantages to overriding this with something that survives worker restarts: workers can then safely resume working on tasks that are locked by themselves, because the worker will assume that it crashed before.
# File lib/delayed/worker.rb, line 124
# Returns the worker's name.
#
# An explicitly assigned name wins. Otherwise the name is built from
# @name_prefix, the hostname and the process id; if building that string
# raises a StandardError, the hostname part is left out.
def name
  return @name unless @name.nil?

  begin
    "#{@name_prefix}host:#{Socket.gethostname} pid:#{Process.pid}"
  rescue StandardError
    "#{@name_prefix}pid:#{Process.pid}"
  end
end
name=(val)
click to toggle source
Sets the name of the worker. Setting the name to nil restores the default worker name.
# File lib/delayed/worker.rb, line 131
# Assigns the worker's name.
#
# val - the new name; nil makes #name fall back to the generated default.
def name=(val)
  @name = val
end
reschedule(job, time = nil)
click to toggle source
Reschedule the job in the future (when a job fails). Uses an exponential scale depending on the number of failed attempts.
# File lib/delayed/worker.rb, line 222
# Records one more attempt on the job and either schedules a retry or gives up.
#
# job  - the job that just failed.
# time - when to run it again; defaults to job.reschedule_at.
#
# When the incremented attempt count reaches max_attempts(job), the job is
# handed to #failed instead of being retried.
def reschedule(job, time = nil)
  attempts_so_far = (job.attempts += 1)

  if attempts_so_far >= max_attempts(job)
    job_say job, "REMOVED permanently because of #{job.attempts} consecutive failures", 'error'
    return failed(job)
  end

  job.run_at = time || job.reschedule_at
  job.unlock
  job.save!
end
run(job)
click to toggle source
# File lib/delayed/worker.rb, line 204
# Runs a single job under the class-level max_run_time timeout.
#
# Returns true when the job ran and was destroyed, false when it failed.
# The result is always a strict boolean because #work_off counts outcomes
# with `when true` / `when false` and treats anything else as "no work".
def run(job)
  job_say job, 'RUNNING'
  runtime = Benchmark.realtime do
    Timeout.timeout(self.class.max_run_time.to_i, WorkerTimeout) { job.invoke_job }
    job.destroy
  end
  job_say job, 'COMPLETED after %.4f' % runtime
  true # did work
rescue DeserializationError => error
  # The job cannot be loaded, so it is failed without rescheduling.
  job.last_error = "#{error.message}\n#{Array(error.backtrace).join("\n")}"
  failed(job)
  false # work failed; do not leak failed(job)'s return value to the caller
rescue Exception => error
  # Deliberately broad: whatever the job raises (including the timeout) is
  # routed through the :error callbacks and the retry logic.
  self.class.lifecycle.run_callbacks(:error, self, job) { handle_failed_job(job, error) }
  false # work failed
end
say(text, level = DEFAULT_LOG_LEVEL)
click to toggle source
# File lib/delayed/worker.rb, line 246
# Writes a message prefixed with the worker name to stdout (unless the worker
# is quiet) and to the logger (when one is configured).
#
# text  - message to log.
# level - a String naming the logger method ('info', 'error', ...), a Symbol
#         of the same name, or a Logger::Severity integer.
#
# Returns nil when no logger is configured, otherwise the logger call's result.
def say(text, level = DEFAULT_LOG_LEVEL)
  text = "[Worker(#{name})] #{text}"
  puts text unless @quiet
  return unless logger

  # TODO: Deprecate use of Fixnum log levels
  level =
    case level
    when String then level
    when Symbol then level.to_s # previously resolved to "" and raised NoMethodError
    else
      # Map a Logger::Severity value (e.g. Logger::ERROR) to its method name.
      Logger::Severity.constants.detect { |i| Logger::Severity.const_get(i) == level }.to_s.downcase
    end

  logger.send(level, "#{Time.now.strftime('%FT%T%z')}: #{text}")
end
start()
click to toggle source
# File lib/delayed/worker.rb, line 135
# Runs the worker loop until it is told to stop.
#
# Installs TERM and INT handlers that request a stop (and optionally raise a
# SignalException, depending on the class-level raise_signal_exceptions
# setting), then repeatedly calls #work_off inside the :execute and :loop
# lifecycle callbacks. When a pass processes nothing, the worker either exits
# (exit_on_complete) or sleeps for sleep_delay.
def start
  trap('TERM') do
    # Logging may need to take a lock, which Ruby does not allow inside a trap
    # handler, so the message is emitted from a short-lived thread.
    Thread.new { say 'Exiting...' }
    stop
    raise SignalException, 'TERM' if self.class.raise_signal_exceptions
  end

  trap('INT') do
    Thread.new { say 'Exiting...' }
    stop
    # A setting of :term means "raise for TERM only".
    raise SignalException, 'INT' if self.class.raise_signal_exceptions && self.class.raise_signal_exceptions != :term
  end

  say "Starting job worker"

  self.class.lifecycle.run_callbacks(:execute, self) do
    loop do
      self.class.lifecycle.run_callbacks(:loop, self) do
        @realtime = Benchmark.realtime do
          @result = work_off
        end
      end

      # @result is [successes, failures] as returned by work_off.
      count = @result.sum

      if count.zero?
        if self.class.exit_on_complete
          say "No more jobs available. Exiting"
          break
        else
          sleep(self.class.sleep_delay) unless stop?
        end
      else
        say "#{count} jobs processed at %.4f j/s, %d failed" % [count / @realtime, @result.last]
      end

      break if stop?
    end
  end
end
stop()
click to toggle source
# File lib/delayed/worker.rb, line 176
# Flags the worker for shutdown; #stop? reports the flag.
def stop
  @exit = true
end
stop?()
click to toggle source
# File lib/delayed/worker.rb, line 180
# Returns true once a stop has been requested, false otherwise (always a
# strict boolean).
def stop?
  @exit ? true : false
end
work_off(num = 100)
click to toggle source
Do num jobs and return stats on success/failure. Exit early if interrupted.
# File lib/delayed/worker.rb, line 186
# Runs up to +num+ jobs and reports how they went.
#
# num - maximum number of jobs to attempt (default 100).
#
# Stops early when no job could be run or when a stop has been requested.
#
# Returns a two-element Array: [successes, failures].
def work_off(num = 100)
  succeeded = 0
  errored = 0

  num.times do
    outcome = reserve_and_run_one_job
    case outcome
    when true  then succeeded += 1
    when false then errored += 1
    else break # nothing could be run
    end

    break if stop? # shutting down
  end

  [succeeded, errored]
end
Protected Instance Methods
handle_failed_job(job, error)
click to toggle source
# File lib/delayed/worker.rb, line 264
# Records the error on the job, logs the failure and hands the job to
# #reschedule.
#
# job   - the job whose execution raised.
# error - the exception that was raised.
#
# Returns the result of #reschedule.
def handle_failed_job(job, error)
  # backtrace is nil for an exception that carries none; Array() keeps the
  # error handler itself from raising in that case.
  job.last_error = "#{error.message}\n#{Array(error.backtrace).join("\n")}"
  job_say job, "FAILED (#{job.attempts} prior attempts) with #{error.class.name}: #{error.message}", 'error'
  reschedule(job)
end
reserve_and_run_one_job()
click to toggle source
Run the next job we can get an exclusive lock on. If no jobs are left, we return nil.
# File lib/delayed/worker.rb, line 272
# Reserves one job and runs it inside the :perform lifecycle callbacks.
#
# Returns nil when no job could be reserved, otherwise whatever the lifecycle's
# run_callbacks returns for the #run call.
def reserve_and_run_one_job
  job = reserve_job
  return unless job

  self.class.lifecycle.run_callbacks(:perform, self, job) do
    run(job)
  end
end
reserve_job()
click to toggle source
# File lib/delayed/worker.rb, line 277
# Asks the backend for the next job for this worker.
#
# On success the consecutive-failure counter is reset and the job (or nil) is
# returned. On any exception the error is logged, the backend is given the
# chance to recover, and nil is returned; the tenth consecutive failure raises
# FatalBackendError instead.
def reserve_job
  begin
    reserved = Delayed::Job.reserve(self)
  rescue Exception => failure
    say "Error while reserving job: #{failure}"
    Delayed::Job.recover_from(failure)
    @failed_reserve_count += 1
    raise FatalBackendError if @failed_reserve_count >= 10
    return nil
  end

  @failed_reserve_count = 0
  reserved
end