module Parallel

Constants

VERSION

Public Instance Methods

each(array, options={}, &block) click to toggle source
# File lib/parallel.rb, line 80
# Runs the block for every element by delegating to +map+ with
# :preserve_results forced to false, then hands back the collection that
# was passed in (not the block results).
#
# array   - collection forwarded to +map+.
# options - Hash forwarded to +map+ after merging :preserve_results => false.
def each(array, options={}, &block)
  discarding = options.merge(:preserve_results => false)
  map(array, discarding, &block)
  array
end
each_with_index(array, options={}, &block) click to toggle source
# File lib/parallel.rb, line 85
# Delegates to +each+ with :with_index => true merged into the options.
#
# array   - collection forwarded to +each+.
# options - Hash forwarded to +each+ after the merge.
def each_with_index(array, options={}, &block)
  indexed_options = options.merge(:with_index => true)
  each(array, indexed_options, &block)
end
in_processes(options = {}, &block) click to toggle source
# File lib/parallel.rb, line 74
# Maps the range 0...count through the block via +map+ with
# :in_processes => count.
#
# options - an Integer count, or a Hash that may contain :count. When no
#           count is supplied, +processor_count+ is used.
def in_processes(options = {}, &block)
  requested, remaining = extract_count_from_options(options)
  process_total = requested || processor_count
  map(0...process_total, remaining.merge(:in_processes => process_total), &block)
end
in_threads(options={:count => 2}) { |i| ... } click to toggle source
# File lib/parallel.rb, line 57
# Runs the block once in each of +count+ threads and collects the results.
#
# options - an Integer thread count, or a Hash with :count. When the count
#           is missing (for example a Hash without :count) it falls back to
#           2, the same value used when no argument is given.
#
# Yields the zero-based thread number. Returns an Array whose slot i holds
# the block result of thread i.
def in_threads(options={:count => 2})
  count, _rest = extract_count_from_options(options)
  count ||= 2 # a Hash without :count previously failed on nil.times

  out = []
  threads = count.times.map do |i|
    Thread.new { out[i] = yield(i) }
  end

  # register the threads for Ctrl+C handling while waiting for them
  kill_on_ctrl_c(threads) { wait_for_threads(threads) }

  out
end
map(array, options = {}, &block) click to toggle source
# File lib/parallel.rb, line 89
# Maps +array+ through the block, choosing an execution strategy from the
# options.
#
# array   - anything responding to +to_a+ (Range and other Enumerables are
#           turned into an Array first).
# options - Hash; the keys read here are:
#           :in_threads   - number of threads to use.
#           :in_processes - number of processes to use.
#           When neither gives a size, +processor_count+ is used.
#
# Threads are used when :in_threads is set, or when RUBY_PLATFORM matches
# /java/ and :in_processes is not set; otherwise processes are used. If
# Process.fork is unavailable a warning is printed and the work runs
# directly. The worker count never exceeds the number of items; a count of
# zero also runs the work directly via +work_direct+.
def map(array, options = {}, &block)
  items = array.to_a # turn Range and other Enumerable-s into an Array

  java_platform = RUBY_PLATFORM =~ /java/
  threaded = (java_platform && !options[:in_processes]) || options[:in_threads]

  wanted =
    if threaded
      options[:in_threads] || processor_count
    elsif Process.respond_to?(:fork)
      options[:in_processes] || processor_count
    else
      $stderr.puts "Warning: Process.fork is not supported by this Ruby"
      0
    end
  worker_total = [items.size, wanted].min

  return work_direct(items, options, &block) if worker_total == 0

  sized_options = options.merge(:count => worker_total)
  if threaded
    work_in_threads(items, sized_options, &block)
  else
    work_in_processes(items, sized_options, &block)
  end
end
map_with_index(array, options={}, &block) click to toggle source
# File lib/parallel.rb, line 118
# Delegates to +map+ with :with_index => true merged into the options.
#
# array   - collection forwarded to +map+.
# options - Hash forwarded to +map+ after the merge.
def map_with_index(array, options={}, &block)
  indexed_options = options.merge(:with_index => true)
  map(array, indexed_options, &block)
end
physical_processor_count() click to toggle source

Number of physical processor cores on the current system.

# File lib/parallel.rb, line 174
# Number of physical processor cores on the current system.
#
# The value is computed once and memoized in @physical_processor_count.
# Falls back to +processor_count+ (the logical count) on platforms that are
# not handled here, or when the platform query yields nothing usable.
def physical_processor_count
  @physical_processor_count ||= begin
    ppc = case RbConfig::CONFIG["target_os"]
    when /darwin[12]/
      # darwin1x and darwin2x; macOS 11 and later report darwin20+
      # block form closes the pipe as soon as the output has been read
      IO.popen("/usr/sbin/sysctl -n hw.physicalcpu") { |io| io.read }.to_i
    when /linux/
      cores = {}  # unique physical ID / core ID combinations
      # kept as a String so a "core id" line that appears before any
      # "physical id" line still forms a valid key
      phy = "0"
      IO.read("/proc/cpuinfo").scan(/^physical id.*|^core id.*/) do |ln|
        if ln.start_with?("physical")
          phy = ln[/\d+/]
        elsif ln.start_with?("core")
          cores["#{phy}:#{ln[/\d+/]}"] = true
        end
      end
      cores.size
    when /mswin|mingw/
      require 'win32ole'
      result_set = WIN32OLE.connect("winmgmts://").ExecQuery(
          "select NumberOfCores from Win32_Processor")
      result_set.to_enum.collect(&:NumberOfCores).reduce(:+)
    else
      processor_count
    end
    # fall back to logical count if physical info is invalid (nil or zero)
    ppc && ppc > 0 ? ppc : processor_count
  end
end
processor_count() click to toggle source

Number of processors seen by the OS and used for process scheduling.

  • AIX: /usr/sbin/pmcycles (AIX 5+), /usr/sbin/lsdev

  • BSD: /sbin/sysctl

  • Cygwin: /proc/cpuinfo

  • Darwin: /usr/bin/hwprefs, /usr/sbin/sysctl

  • HP-UX: /usr/sbin/ioscan

  • IRIX: /usr/sbin/sysconf

  • Linux: /proc/cpuinfo

  • Minix 3+: /proc/cpuinfo

  • Solaris: /usr/sbin/psrinfo

  • Tru64 UNIX: /usr/sbin/psrinfo

  • UnixWare: /usr/sbin/psrinfo

# File lib/parallel.rb, line 136
# Number of processors seen by the OS and used for process scheduling.
#
# The first source that is available is used, in the order of the branches
# below; the result is memoized in @processor_count. When no known source
# exists a warning is printed to $stderr and 1 is returned.
def processor_count
  @processor_count ||= begin
    os_name = RbConfig::CONFIG["target_os"]
    # run a command and return its output; the block form of IO.popen
    # closes the pipe once the output has been read
    run = lambda { |command| IO.popen(command) { |pipe| pipe.read } }

    if os_name =~ /mingw|mswin/
      require 'win32ole'
      result = WIN32OLE.connect("winmgmts://").ExecQuery(
          "select NumberOfLogicalProcessors from Win32_Processor")
      result.to_enum.collect(&:NumberOfLogicalProcessors).reduce(:+)
    elsif File.readable?("/proc/cpuinfo")
      IO.read("/proc/cpuinfo").scan(/^processor/).size
    elsif File.executable?("/usr/bin/hwprefs")
      run.call("/usr/bin/hwprefs thread_count").to_i
    elsif File.executable?("/usr/sbin/psrinfo")
      run.call("/usr/sbin/psrinfo").scan(/^.*on-*line/).size
    elsif File.executable?("/usr/sbin/ioscan")
      run.call("/usr/sbin/ioscan -kC processor").scan(/^.*processor/).size
    elsif File.executable?("/usr/sbin/pmcycles")
      run.call("/usr/sbin/pmcycles -m").count("\n")
    elsif File.executable?("/usr/sbin/lsdev")
      run.call("/usr/sbin/lsdev -Cc processor -S 1").count("\n")
    elsif File.executable?("/usr/sbin/sysconf") and os_name =~ /irix/i
      run.call("/usr/sbin/sysconf NPROC_ONLN").to_i
    elsif File.executable?("/usr/sbin/sysctl")
      run.call("/usr/sbin/sysctl -n hw.ncpu").to_i
    elsif File.executable?("/sbin/sysctl")
      run.call("/sbin/sysctl -n hw.ncpu").to_i
    else
      $stderr.puts "Unknown platform: " + os_name
      $stderr.puts "Assuming 1 processor."
      1
    end
  end
end

Private Instance Methods

call_with_index(array, index, options, &block) click to toggle source
# File lib/parallel.rb, line 382
# Calls the block with the element at +index+, plus the index itself when
# options[:with_index] is set.
#
# Returns the block result, or nil when options[:preserve_results] is
# exactly false (avoids the GC overhead of passing large results around).
def call_with_index(array, index, options, &block)
  arguments = options[:with_index] ? [array[index], index] : [array[index]]
  outcome = block.call(*arguments)
  options[:preserve_results] == false ? nil : outcome
end
create_workers(items, options, &block) click to toggle source
# File lib/parallel.rb, line 276
# Builds options[:count] workers through +worker+ and returns them in an
# Array.
#
# Every +worker+ call receives the same, still-growing Array as
# :started_workers, so it can see the workers that were created before it.
def create_workers(items, options, &block)
  Array.new(options[:count]).inject([]) do |started, _slot|
    started << worker(items, options.merge(:started_workers => started), &block)
  end
end
extract_count_from_options(options) click to toggle source

options is either an Integer or a Hash with :count

# File lib/parallel.rb, line 340
# Splits the argument into a count and an options Hash.
#
# options - either a Hash (count is read from :count and the Hash itself is
#           returned) or anything else (treated as the count, paired with a
#           new empty Hash).
#
# Returns a two-element Array: [count, options].
def extract_count_from_options(options)
  return [options[:count], options] if options.is_a?(Hash)

  [options, {}]
end
handle_exception(exception, results) click to toggle source
# File lib/parallel.rb, line 333
# Decides what a finished run returns.
#
# exception - nil, or the exception captured while running.
# results   - the collected results.
#
# Returns nil when the exception's class is exactly Parallel::Break,
# re-raises any other exception, and returns +results+ when there was none.
def handle_exception(exception, results)
  if exception.class == Parallel::Break
    nil
  elsif exception
    raise exception
  else
    results
  end
end
kill_on_ctrl_c(things) { || ... } click to toggle source

Kill all these pids or threads if the user presses Ctrl+C.

# File lib/parallel.rb, line 351
# Registers +things+ (pids or threads) for killing on SIGINT while the
# given block runs, then yields.
#
# The first call creates the @to_be_killed list and installs a SIGINT trap
# that kills every registered thing and exits with status 1; later calls
# only append to the list. The entry is popped again when the block is done,
# whether it returned or raised.
#
# Returns the value of the block.
def kill_on_ctrl_c(things)
  if !defined?(@to_be_killed) || !@to_be_killed
    @to_be_killed = [things]
    Signal.trap(:SIGINT) do
      pending = @to_be_killed
      if pending.any?
        $stderr.puts 'Parallel execution interrupted, exiting ...'
        pending.flatten.compact.each { |victim| kill_that_thing!(victim) }
      end
      exit 1 # Quit with 'failed' signal
    end
  else
    @to_be_killed << things
  end
  yield
ensure
  # free threads for GC and do not kill pids that could be used for new processes
  @to_be_killed.pop
end
kill_that_thing!(thing) click to toggle source
# File lib/parallel.rb, line 369
# Kills a Thread with Thread#kill; anything else is treated as a pid and
# sent the KILL signal.
#
# A pid that no longer exists (Errno::ESRCH) is ignored.
def kill_that_thing!(thing)
  return thing.kill if thing.is_a?(Thread)

  begin
    Process.kill(:KILL, thing)
  rescue Errno::ESRCH
    # some linux systems already automatically killed the children at this point
    # so we just ignore them not being there
    nil
  end
end
process_incoming_jobs(read, write, items, options, &block) click to toggle source
# File lib/parallel.rb, line 311
# Job loop: until +read+ reaches EOF, loads a marshalled index from it, runs
# the block for that item through +call_with_index+, and dumps the
# marshalled result to +write+.
#
# Any exception raised while running the item is wrapped in an
# ExceptionWrapper and dumped in place of the result.
def process_incoming_jobs(read, write, items, options, &block)
  until read.eof?
    job_index = Marshal.load(read)
    outcome =
      begin
        call_with_index(items, job_index, options, &block)
      rescue Exception => error
        ExceptionWrapper.new(error)
      end
    Marshal.dump(outcome, write)
  end
end
wait_for_threads(threads) click to toggle source
# File lib/parallel.rb, line 323
# Joins every non-nil thread in +threads+.
#
# An Interrupt raised by a join is swallowed so that the remaining threads
# are still waited for. Returns the list of threads without nil entries.
def wait_for_threads(threads)
  present = threads.compact
  present.each do |thread|
    begin
      thread.join
    rescue Interrupt
      # thread died, do not stop other threads
      next
    end
  end
end
with_instrumentation(item, index, options) { || ... } click to toggle source
# File lib/parallel.rb, line 393
# Wraps the block with the optional callbacks found in +options+.
#
# options[:start]  - called with (item, index) before the block runs.
# options[:finish] - called with (item, index, result) after the block, even
#                    when the block raised (result is then nil).
#
# Returns the value of the block.
def with_instrumentation(item, index, options)
  result = nil
  finish_hook = nil
  begin
    start_hook = options[:start]
    finish_hook = options[:finish]
    start_hook.call(item, index) if start_hook
    result = yield
  ensure
    finish_hook.call(item, index, result) if finish_hook
  end
end
work_direct(array, options) { |e,i| ... } click to toggle source
# File lib/parallel.rb, line 206
# Runs the block for every element in the calling thread and returns the
# block results as an Array.
#
# The block receives (element, index) when options[:with_index] is set,
# otherwise just the element.
def work_direct(array, options)
  array.each_with_index.map do |element, position|
    options[:with_index] ? yield(element, position) : yield(element)
  end
end
work_in_processes(items, options, &blk) click to toggle source
# File lib/parallel.rb, line 241
# Maps +items+ through worker processes, with one feeding thread per worker.
#
# options[:count] workers are created through +create_workers+. Each thread
# repeatedly claims the next unprocessed index, hands it to its worker and
# stores the answer in the matching result slot. When a worker answers with
# an ExceptionWrapper, the wrapped exception is remembered and every thread
# stops claiming further items.
#
# Returns whatever +handle_exception+ returns for the exception and results.
def work_in_processes(items, options, &blk)
  workers = create_workers(items, options, &blk)
  current_index = -1
  results = []
  exception = nil
  # Thread.exclusive was removed in Ruby 3.0; a Mutex provides the same
  # atomic increment on every Ruby version.
  index_lock = Mutex.new

  kill_on_ctrl_c(workers.map(&:pid)) do
    in_threads(options[:count]) do |i|
      worker = workers[i]

      begin
        loop do
          break if exception
          index = index_lock.synchronize { current_index += 1 }
          break if index >= items.size

          output = with_instrumentation items[index], index, options do
            worker.work(index)
          end

          if ExceptionWrapper === output
            exception = output.exception
          else
            results[index] = output
          end
        end
      ensure
        worker.close_pipes
        worker.wait # if it goes zombie, rather wait here to be able to debug
      end
    end
  end

  handle_exception(exception, results)
end
work_in_threads(items, options, &block) click to toggle source
# File lib/parallel.rb, line 214
# Maps +items+ through the block using options[:count] threads.
#
# Each thread repeatedly claims the next unprocessed index and stores the
# block result in the matching slot of the result Array. The first exception
# raised while running an item is remembered and stops every thread from
# claiming further items.
#
# Returns whatever +handle_exception+ returns for the exception and results.
def work_in_threads(items, options, &block)
  results = []
  current = -1
  exception = nil
  # Thread.exclusive was removed in Ruby 3.0; a Mutex provides the same
  # atomic increment on every Ruby version.
  index_lock = Mutex.new

  in_threads(options[:count]) do
    # as long as there are more items, work on one of them
    loop do
      break if exception

      index = index_lock.synchronize { current += 1 }
      break if index >= items.size

      with_instrumentation items[index], index, options do
        begin
          results[index] = call_with_index(items, index, options, &block)
        rescue Exception => e
          # deliberately broad: whatever the block raises is handed to
          # handle_exception after all threads are done
          exception = e
          break
        end
      end
    end
  end

  handle_exception(exception, results)
end
worker(items, options, &block) click to toggle source
# File lib/parallel.rb, line 284
# Forks one child process that serves jobs over a pair of pipes and returns
# a Worker built from the parent's pipe ends and the child's pid.
#
# items   - passed through to +process_incoming_jobs+ inside the child.
# options - Hash; must contain :started_workers, which the child removes
#           from the Hash before calling +close_pipes+ on each entry.
# block   - passed through to +process_incoming_jobs+.
def worker(items, options, &block)
  # use less memory on REE
  GC.copy_on_write_friendly = true if GC.respond_to?(:copy_on_write_friendly=)

  # one pipe per direction: parent -> child and child -> parent
  child_read, parent_write = IO.pipe
  parent_read, child_write = IO.pipe

  pid = Process.fork do
    begin
      # child side: presumably releases the pipe ends inherited from the
      # workers that were started earlier (verify against Worker#close_pipes)
      options.delete(:started_workers).each(&:close_pipes)

      # the child only uses its own ends of the two pipes
      parent_write.close
      parent_read.close

      process_incoming_jobs(child_read, child_write, items, options, &block)
    ensure
      child_read.close
      child_write.close
    end
  end

  # parent side: the child's ends are not used here
  child_read.close
  child_write.close

  Worker.new(parent_read, parent_write, pid)
end