module Parallel
Constants
- INTERRUPT_SIGNAL
- Stop
- VERSION
Public Class Methods
each(array, options={}, &block)
click to toggle source
# File lib/parallel.rb, line 151
# Runs the block for every element of +array+ by delegating to +map+ with
# :preserve_results => false, then hands back the +array+ that was passed in.
#
# array   - the collection to iterate over.
# options - Hash forwarded to +map+ (not modified; a merged copy is used).
#
# Returns the original +array+ argument.
def each(array, options = {}, &block)
  discard_results = options.merge(:preserve_results => false)
  map(array, discard_results, &block)
  array
end
each_with_index(array, options={}, &block)
click to toggle source
# File lib/parallel.rb, line 156
# Same as +each+, but with :with_index => true merged into the options.
#
# Returns whatever +each+ returns.
def each_with_index(array, options = {}, &block)
  indexed_options = options.merge(:with_index => true)
  each(array, indexed_options, &block)
end
in_processes(options = {}, &block)
click to toggle source
# File lib/parallel.rb, line 145
# Maps the block over the range 0...count using +map+ with :in_processes set
# to that count.
#
# options - an Integer count or a Hash with :count; when no count is given,
#           +processor_count+ is used.
#
# Returns whatever +map+ returns.
def in_processes(options = {}, &block)
  process_count, remaining = extract_count_from_options(options)
  process_count = processor_count unless process_count
  map((0...process_count), remaining.merge(:in_processes => process_count), &block)
end
in_threads(options={:count => 2}) { |i| ... }
click to toggle source
# File lib/parallel.rb, line 128
# Starts +count+ threads, yields each thread's index (0...count) to the block
# and collects the block's return values by index.
#
# options - an Integer thread count, or a Hash with :count. The Hash is also
#           passed on to +kill_on_ctrl_c+. A Hash without :count falls back
#           to 2 threads, the same value the default argument uses.
#
# Returns an Array with one block result per thread index.
def in_threads(options={:count => 2})
  count, options = extract_count_from_options(options)
  # A Hash lacking :count used to leave count nil and crash on nil.times.
  count ||= 2

  out = []
  threads = []

  count.times do |i|
    threads[i] = Thread.new do
      out[i] = yield(i)
    end
  end

  # Wait for all threads while the interrupt handler can kill them.
  kill_on_ctrl_c(threads, options) { wait_for_threads(threads) }

  out
end
map(array, options = {}, &block)
click to toggle source
# File lib/parallel.rb, line 160
# Applies the block to every item of +array+ using threads, forked processes
# or, when the worker count ends up 0, direct in-process execution.
#
# Worker selection:
# * RUBY_PLATFORM matches /java/ and :in_processes is not set -> threads,
#   sized by :in_threads or +processor_count+.
# * :in_threads is set -> threads of that size.
# * otherwise -> processes, sized by :in_processes or +processor_count+;
#   if Process.fork is unavailable a warning is printed and size is 0.
#
# array   - the source handed to ItemWrapper.
# options - Hash of options. It is duplicated first, so the caller's hash is
#           never modified.
#
# Returns the result of the selected work_* method.
def map(array, options = {}, &block)
  # Work on a copy: :mutex, :return_results and the :finish wrapper installed
  # by add_progress_bar! must not leak into a hash the caller may reuse.
  options = options.dup
  options[:mutex] = Mutex.new

  if RUBY_PLATFORM =~ /java/ && !options[:in_processes]
    method = :in_threads
    size = options[method] || processor_count
  elsif options[:in_threads]
    method = :in_threads
    size = options[method]
  else
    method = :in_processes
    if Process.respond_to?(:fork)
      size = options[method] || processor_count
    else
      $stderr.puts "Warning: Process.fork is not supported by this Ruby"
      size = 0
    end
  end

  items = ItemWrapper.new(array, options[:mutex])

  # Never start more workers than there are items (unless items is a producer).
  size = [items.producer? ? size : items.size, size].min

  # Results are needed when the caller wants them or a :finish hook receives them.
  options[:return_results] = (options[:preserve_results] != false || !!options[:finish])
  add_progress_bar!(items, options)

  if size == 0
    work_direct(items, options, &block)
  elsif method == :in_threads
    work_in_threads(items, options.merge(:count => size), &block)
  else
    work_in_processes(items, options.merge(:count => size), &block)
  end
end
map_with_index(array, options={}, &block)
click to toggle source
# File lib/parallel.rb, line 195
# Same as +map+, but with :with_index => true merged into the options.
#
# Returns whatever +map+ returns.
def map_with_index(array, options = {}, &block)
  indexed_options = options.merge(:with_index => true)
  map(array, indexed_options, &block)
end
Private Class Methods
add_progress_bar!(items, options)
click to toggle source
# File lib/parallel.rb, line 201
# When options[:progress] is set, creates a ProgressBar titled with that value
# and replaces options[:finish] with a lambda that first calls any previously
# configured :finish hook and then increments the bar.
#
# items   - object responding to +producer?+ and +size+.
# options - Hash; modified in place (:finish is overwritten).
#
# Returns nil when no :progress option is given, otherwise the new lambda.
# Raises RuntimeError when a progress bar is requested for a producer.
def add_progress_bar!(items, options)
  title = options[:progress]
  return unless title

  raise "Progressbar and producers don't mix" if items.producer?

  # Loaded lazily so the gem is only needed when a bar is requested.
  require 'ruby-progressbar'
  bar = ProgressBar.create(
    :title => title,
    :total => items.size,
    :format => '%t |%E | %B | %a'
  )

  previous_finish = options[:finish]
  options[:finish] = lambda do |item, i, result|
    previous_finish.call(item, i, result) if previous_finish
    bar.increment
  end
end
call_with_index(item, index, options, &block)
click to toggle source
# File lib/parallel.rb, line 424
# Calls the block with +item+ (and +index+ too when options[:with_index] is
# set).
#
# Returns the block's value when options[:return_results] is truthy,
# otherwise nil, so large results are not passed around needlessly.
def call_with_index(item, index, options, &block)
  arguments = options[:with_index] ? [item, index] : [item]
  value = block.call(*arguments)
  options[:return_results] ? value : nil
end
create_workers(items, options, &block)
click to toggle source
# File lib/parallel.rb, line 294
# Builds options[:count] workers through +worker+. Each call receives the
# list of workers created so far under :started_workers (the very array that
# is being filled).
#
# Returns the Array of created workers.
def create_workers(items, options, &block)
  Array.new(options[:count]).each_with_object([]) do |_, started|
    started << worker(items, options.merge(:started_workers => started), &block)
  end
end
extract_count_from_options(options)
click to toggle source
options is either an Integer or a Hash with :count
# File lib/parallel.rb, line 361
# Splits the flexible +options+ argument into a count and an options Hash.
#
# options - a Hash (count is read from :count, the Hash itself is returned)
#           or any other value, which is taken as the count.
#
# Returns a two-element Array: [count, options_hash].
def extract_count_from_options(options)
  return [options[:count], options] if options.is_a?(Hash)

  [options, {}]
end
handle_exception(exception, results)
click to toggle source
# File lib/parallel.rb, line 354
# Decides the final outcome of a run.
#
# exception - the exception captured during the run, or nil.
# results   - the collected results.
#
# Returns nil when the exception's class is exactly Parallel::Break or
# Parallel::Kill, re-raises any other exception, and returns +results+ when
# there was none.
def handle_exception(exception, results)
  silent_stops = [Parallel::Break, Parallel::Kill]

  if silent_stops.include?(exception.class)
    nil
  elsif exception
    raise exception
  else
    results
  end
end
kill_on_ctrl_c(things, options) { || ... }
click to toggle source
Kill all these pids or threads if the user presses Ctrl+C.
# File lib/parallel.rb, line 372
# Runs the block while +things+ (pids or threads) are registered for killing
# when the interrupt signal arrives.
#
# The signal is options[:interrupt_signal], defaulting to INTERRUPT_SIGNAL.
# The trap is installed only by the call that finds the registry empty, and
# the previous handler is restored once the registry is empty again.
#
# Returns the block's value.
def kill_on_ctrl_c(things, options)
  @to_be_killed ||= []
  previous_handler = nil
  signal = options.fetch(:interrupt_signal, INTERRUPT_SIGNAL)

  if @to_be_killed.empty?
    previous_handler = trap_interrupt(signal) do
      $stderr.puts 'Parallel execution interrupted, exiting ...'
      @to_be_killed.flatten.compact.each do |target|
        kill_that_thing!(target)
      end
    end
  end

  @to_be_killed.push(things)

  yield
ensure
  # free threads for GC and do not kill pids that could be used for new processes
  @to_be_killed.pop
  restore_interrupt(previous_handler, signal) if @to_be_killed.empty?
end
kill_that_thing!(thing)
click to toggle source
# File lib/parallel.rb, line 411
# Kills +thing+: a Thread is killed via Thread#kill, anything else is treated
# as a pid and sent the KILL signal.
#
# Returns the result of the kill call, or nil when the process no longer
# exists (Errno::ESRCH is swallowed).
def kill_that_thing!(thing)
  case thing
  when Thread
    thing.kill
  else
    begin
      Process.kill(:KILL, thing)
    rescue Errno::ESRCH
      # some linux systems already automatically killed the children at this
      # point, so a missing process is simply ignored
    end
  end
end
process_incoming_jobs(read, write, items, options, &block)
click to toggle source
# File lib/parallel.rb, line 329
# Job loop: until +read+ reaches EOF, loads a marshalled job from it, unpacks
# it through +items+, runs the block via +call_with_index+ and marshals the
# outcome to +write+. A StandardError raised by the block is sent back
# wrapped in an ExceptionWrapper instead of propagating.
#
# Returns nil.
def process_incoming_jobs(read, write, items, options, &block)
  until read.eof?
    payload = Marshal.load(read)
    item, index = items.unpack(payload)

    outcome =
      begin
        call_with_index(item, index, options, &block)
      rescue StandardError => error
        ExceptionWrapper.new(error)
      end

    Marshal.dump(outcome, write)
  end
end
restore_interrupt(old, signal)
click to toggle source
# File lib/parallel.rb, line 407
# Re-installs +old+ as the handler for +signal+.
#
# Returns what Signal.trap returns (the handler that was active before).
def restore_interrupt(old, signal)
  Signal.trap(signal, old)
end
trap_interrupt(signal) { || ... }
click to toggle source
# File lib/parallel.rb, line 392
# Installs a handler for +signal+ that first runs the given block and then
# defers to whatever handler was active before:
#
# * a callable previous handler is called,
# * a previous "IGNORE" disposition stays ignored,
# * anything else ("DEFAULT", nil, other non-callable values) raises
#   Interrupt.
#
# Returns the previous handler so it can be restored later.
def trap_interrupt(signal)
  # Ignore the signal briefly; this call also fetches the previous handler.
  old = Signal.trap signal, 'IGNORE'

  Signal.trap signal do
    yield
    if old.respond_to?(:call)
      old.call
    elsif old != 'IGNORE'
      # Previously only "DEFAULT" was handled and every other non-callable
      # value (nil, "IGNORE", ...) crashed with NoMethodError on old.call.
      raise Interrupt
    end
  end

  old
end
wait_for_threads(threads)
click to toggle source
# File lib/parallel.rb, line 342
# Joins every non-nil thread in +threads+. An Interrupt surfacing from a join
# does not stop the remaining joins; the first one seen is re-raised after
# all threads have been joined.
#
# Returns nil when no join raised Interrupt.
def wait_for_threads(threads)
  first_interrupt = nil

  threads.compact.each do |thread|
    begin
      thread.join
    rescue Interrupt => error
      # thread died, do not stop other threads
      first_interrupt ||= error
    end
  end

  raise first_interrupt if first_interrupt
end
with_instrumentation(item, index, options) { || ... }
click to toggle source
# File lib/parallel.rb, line 435
# Wraps one unit of work with the optional :start and :finish hooks.
#
# The :start hook is called with (item, index) before the block runs. The
# :finish hook is called with (item, index, result) from an ensure clause, so
# it also runs when the block raises (result is then nil). Both hook calls
# are made while holding options[:mutex].
#
# Returns the block's value, or nil when options[:preserve_results] is false.
def with_instrumentation(item, index, options)
  start_hook = options[:start]
  finish_hook = options[:finish]

  if start_hook
    options[:mutex].synchronize { start_hook.call(item, index) }
  end

  result = yield
  options[:preserve_results] == false ? nil : result
ensure
  if finish_hook
    options[:mutex].synchronize { finish_hook.call(item, index, result) }
  end
end
work_direct(items, options) { |e,i| ... }
click to toggle source
# File lib/parallel.rb, line 219
# Runs the block for every item in the calling thread, without any workers.
# The block receives (item, index) when options[:with_index] is set,
# otherwise just the item.
#
# NOTE(review): unlike the threaded/process paths, this method does not go
# through with_instrumentation, so :start/:finish hooks are not invoked
# here - confirm whether that is intended.
#
# Returns an Array of the block's results in iteration order.
def work_direct(items, options)
  collected = []

  items.each_with_index do |element, position|
    value =
      if options[:with_index]
        yield(element, position)
      else
        yield(element)
      end
    collected.push(value)
  end

  collected
end
work_in_processes(items, options, &blk)
click to toggle source
# File lib/parallel.rb, line 252
# Distributes items over worker processes. One thread per worker pulls the
# next item, sends it to its worker and stores the answer under the item's
# index.
#
# When a worker answers with an ExceptionWrapper, the wrapped exception is
# recorded and all threads stop taking new items. If that exception is a
# Parallel::Kill, the threads and pids of all other workers are killed.
# Each thread closes its worker's pipes and waits for it when done.
#
# Returns the outcome of +handle_exception+ for the recorded exception and
# the results.
def work_in_processes(items, options, &blk)
  workers = create_workers(items, options, &blk)
  results = []
  exception = nil

  kill_on_ctrl_c(workers.map(&:pid), options) do
    in_threads(options) do |i|
      worker = workers[i]
      worker.thread = Thread.current

      begin
        # Kernel#loop is kept on purpose: it also ends on StopIteration.
        loop do
          break if exception
          item, index = items.next
          break unless index

          output = with_instrumentation(item, index, options) do
            worker.work(items.pack(item, index))
          end

          unless ExceptionWrapper === output
            results[index] = output
            next
          end

          exception = output.exception
          if Parallel::Kill === exception
            (workers - [worker]).each do |other|
              kill_that_thing!(other.thread)
              kill_that_thing!(other.pid)
            end
          end
        end
      ensure
        worker.close_pipes
        # if it goes zombie, rather wait here to be able to debug
        worker.wait
      end
    end
  end

  handle_exception(exception, results)
end
work_in_threads(items, options, &block)
click to toggle source
# File lib/parallel.rb, line 227
# Distributes items over threads started by +in_threads+. Every thread keeps
# taking the next item and storing the block's result under the item's index
# until the items run out or some thread has recorded a StandardError.
#
# Returns the outcome of +handle_exception+ for the recorded exception and
# the results.
def work_in_threads(items, options, &block)
  results = []
  exception = nil

  in_threads(options) do
    # as long as there are more items, work on one of them
    # (Kernel#loop is kept on purpose: it also ends on StopIteration)
    loop do
      break if exception
      item, index = items.next
      break unless index

      begin
        results[index] = with_instrumentation(item, index, options) do
          call_with_index(item, index, options, &block)
        end
      rescue StandardError => error
        exception = error
        break
      end
    end
  end

  handle_exception(exception, results)
end
worker(items, options, &block)
click to toggle source
# File lib/parallel.rb, line 302
# Forks one worker process connected to the parent through two pipes.
#
# In the child: the pipes of previously started workers (taken out of
# options[:started_workers]) and the parent's pipe ends are closed, then
# +process_incoming_jobs+ runs on the child's ends, which are closed again
# afterwards. In the parent: the child's pipe ends are closed.
#
# Returns a Worker built from the parent's read end, write end and the pid.
def worker(items, options, &block)
  # use less memory on REE
  GC.copy_on_write_friendly = true if GC.respond_to?(:copy_on_write_friendly=)

  child_read, parent_write = IO.pipe
  parent_read, child_write = IO.pipe

  child_pid = Process.fork do
    begin
      options.delete(:started_workers).each { |sibling| sibling.close_pipes }

      parent_write.close
      parent_read.close

      process_incoming_jobs(child_read, child_write, items, options, &block)
    ensure
      child_read.close
      child_write.close
    end
  end

  child_read.close
  child_write.close

  Worker.new(parent_read, parent_write, child_pid)
end