module Parallel

Constants

INTERRUPT_SIGNAL
Stop
VERSION

Public Class Methods

each(array, options={}, &block) click to toggle source
# File lib/parallel.rb, line 151
def each(array, options={}, &block)
  map(array, options.merge(:preserve_results => false), &block)
  array
end
each_with_index(array, options={}, &block) click to toggle source
# File lib/parallel.rb, line 156
def each_with_index(array, options={}, &block)
  each(array, options.merge(:with_index => true), &block)
end
in_processes(options = {}, &block) click to toggle source
# File lib/parallel.rb, line 145
def in_processes(options = {}, &block)
  count, options = extract_count_from_options(options)
  count ||= processor_count
  map(0...count, options.merge(:in_processes => count), &block)
end
in_threads(options={:count => 2}) { |i| ... } click to toggle source
# File lib/parallel.rb, line 128
def in_threads(options={:count => 2})
  count, options = extract_count_from_options(options)

  out = []
  threads = []

  count.times do |i|
    threads[i] = Thread.new do
      out[i] = yield(i)
    end
  end

  kill_on_ctrl_c(threads, options) { wait_for_threads(threads) }

  out
end
map(array, options = {}, &block) click to toggle source
# File lib/parallel.rb, line 160
def map(array, options = {}, &block)
  options[:mutex] = Mutex.new

  if RUBY_PLATFORM =~ /java/ and not options[:in_processes]
    method = :in_threads
    size = options[method] || processor_count
  elsif options[:in_threads]
    method = :in_threads
    size = options[method]
  else
    method = :in_processes
    if Process.respond_to?(:fork)
      size = options[method] || processor_count
    else
      $stderr.puts "Warning: Process.fork is not supported by this Ruby"
      size = 0
    end
  end

  items = ItemWrapper.new(array, options[:mutex])

  size = [items.producer? ? size : items.size, size].min

  options[:return_results] = (options[:preserve_results] != false || !!options[:finish])
  add_progress_bar!(items, options)

  if size == 0
    work_direct(items, options, &block)
  elsif method == :in_threads
    work_in_threads(items, options.merge(:count => size), &block)
  else
    work_in_processes(items, options.merge(:count => size), &block)
  end
end
map_with_index(array, options={}, &block) click to toggle source
# File lib/parallel.rb, line 195
def map_with_index(array, options={}, &block)
  map(array, options.merge(:with_index => true), &block)
end

Private Class Methods

add_progress_bar!(items, options) click to toggle source
# File lib/parallel.rb, line 201
def add_progress_bar!(items, options)
  if title = options[:progress]
    raise "Progressbar and producers don't mix" if items.producer?
    require 'ruby-progressbar'
    progress = ProgressBar.create(
      :title => title,
      :total => items.size,
      :format => '%t |%E | %B | %a'
    )
    old_finish = options[:finish]
    options[:finish] = lambda do |item, i, result|
      old_finish.call(item, i, result) if old_finish
      progress.increment
    end
  end
end
call_with_index(item, index, options, &block) click to toggle source
# File lib/parallel.rb, line 424
def call_with_index(item, index, options, &block)
  args = [item]
  args << index if options[:with_index]
  if options[:return_results]
    block.call(*args)
  else
    block.call(*args)
    nil # avoid GC overhead of passing large results around
  end
end
create_workers(items, options, &block) click to toggle source
# File lib/parallel.rb, line 294
def create_workers(items, options, &block)
  workers = []
  Array.new(options[:count]).each do
    workers << worker(items, options.merge(:started_workers => workers), &block)
  end
  workers
end
extract_count_from_options(options) click to toggle source

options is either a Integer or a Hash with :count

# File lib/parallel.rb, line 361
def extract_count_from_options(options)
  if options.is_a?(Hash)
    count = options[:count]
  else
    count = options
    options = {}
  end
  [count, options]
end
handle_exception(exception, results) click to toggle source
# File lib/parallel.rb, line 354
def handle_exception(exception, results)
  return nil if [Parallel::Break, Parallel::Kill].include? exception.class
  raise exception if exception
  results
end
kill_on_ctrl_c(things, options) { || ... } click to toggle source

kill all these pids or threads if user presses Ctrl+c

# File lib/parallel.rb, line 372
def kill_on_ctrl_c(things, options)
  @to_be_killed ||= []
  old_interrupt = nil
  signal = options.fetch(:interrupt_signal, INTERRUPT_SIGNAL)

  if @to_be_killed.empty?
    old_interrupt = trap_interrupt(signal) do
      $stderr.puts 'Parallel execution interrupted, exiting ...'
      @to_be_killed.flatten.compact.each { |thing| kill_that_thing!(thing) }
    end
  end

  @to_be_killed << things

  yield
ensure
  @to_be_killed.pop # free threads for GC and do not kill pids that could be used for new processes
  restore_interrupt(old_interrupt, signal) if @to_be_killed.empty?
end
kill_that_thing!(thing) click to toggle source
# File lib/parallel.rb, line 411
def kill_that_thing!(thing)
  if thing.is_a?(Thread)
    thing.kill
  else
    begin
      Process.kill(:KILL, thing)
    rescue Errno::ESRCH
      # some linux systems already automatically killed the children at this point
      # so we just ignore them not being there
    end
  end
end
process_incoming_jobs(read, write, items, options, &block) click to toggle source
# File lib/parallel.rb, line 329
def process_incoming_jobs(read, write, items, options, &block)
  while !read.eof?
    data = Marshal.load(read)
    item, index = items.unpack(data)
    result = begin
      call_with_index(item, index, options, &block)
    rescue StandardError => e
      ExceptionWrapper.new(e)
    end
    Marshal.dump(result, write)
  end
end
restore_interrupt(old, signal) click to toggle source
# File lib/parallel.rb, line 407
def restore_interrupt(old, signal)
  Signal.trap signal, old
end
trap_interrupt(signal) { || ... } click to toggle source
# File lib/parallel.rb, line 392
def trap_interrupt(signal)
  old = Signal.trap signal, 'IGNORE'

  Signal.trap signal do
    yield
    if old == "DEFAULT"
      raise Interrupt
    else
      old.call
    end
  end

  old
end
wait_for_threads(threads) click to toggle source
# File lib/parallel.rb, line 342
def wait_for_threads(threads)
  interrupted = threads.compact.map do |t|
    begin
      t.join
      nil
    rescue Interrupt => e
      e # thread died, do not stop other threads
    end
  end.compact
  raise interrupted.first if interrupted.first
end
with_instrumentation(item, index, options) { || ... } click to toggle source
# File lib/parallel.rb, line 435
def with_instrumentation(item, index, options)
  on_start = options[:start]
  on_finish = options[:finish]
  options[:mutex].synchronize { on_start.call(item, index) } if on_start
  result = yield
  result unless options[:preserve_results] == false
ensure
  options[:mutex].synchronize { on_finish.call(item, index, result) } if on_finish
end
work_direct(items, options) { |e,i| ... } click to toggle source
# File lib/parallel.rb, line 219
def work_direct(items, options)
  results = []
  items.each_with_index do |e,i|
    results << (options[:with_index] ? yield(e,i) : yield(e))
  end
  results
end
work_in_processes(items, options, &blk) click to toggle source
# File lib/parallel.rb, line 252
def work_in_processes(items, options, &blk)
  workers = create_workers(items, options, &blk)
  results = []
  exception = nil

  kill_on_ctrl_c(workers.map(&:pid), options) do
    in_threads(options) do |i|
      worker = workers[i]
      worker.thread = Thread.current

      begin
        loop do
          break if exception
          item, index = items.next
          break unless index

          output = with_instrumentation item, index, options do
            worker.work(items.pack(item, index))
          end

          if ExceptionWrapper === output
            exception = output.exception
            if Parallel::Kill === exception
              (workers - [worker]).each do |w|
                kill_that_thing!(w.thread)
                kill_that_thing!(w.pid)
              end
            end
          else
            results[index] = output
          end
        end
      ensure
        worker.close_pipes
        worker.wait # if it goes zombie, rather wait here to be able to debug
      end
    end
  end

  handle_exception(exception, results)
end
work_in_threads(items, options, &block) click to toggle source
# File lib/parallel.rb, line 227
def work_in_threads(items, options, &block)
  results = []
  exception = nil

  in_threads(options) do
    # as long as there are more items, work on one of them
    loop do
      break if exception
      item, index = items.next
      break unless index

      begin
        results[index] = with_instrumentation item, index, options do
          call_with_index(item, index, options, &block)
        end
      rescue StandardError => e
        exception = e
        break
      end
    end
  end

  handle_exception(exception, results)
end
worker(items, options, &block) click to toggle source
# File lib/parallel.rb, line 302
def worker(items, options, &block)
  # use less memory on REE
  GC.copy_on_write_friendly = true if GC.respond_to?(:copy_on_write_friendly=)

  child_read, parent_write = IO.pipe
  parent_read, child_write = IO.pipe

  pid = Process.fork do
    begin
      options.delete(:started_workers).each(&:close_pipes)

      parent_write.close
      parent_read.close

      process_incoming_jobs(child_read, child_write, items, options, &block)
    ensure
      child_read.close
      child_write.close
    end
  end

  child_read.close
  child_write.close

  Worker.new(parent_read, parent_write, pid)
end