class EventMachine::Pool

A simple async resource pool based on a resource and work queue. Resources are enqueued and work waits for resources to become available.

@example

require 'em-http-request'

EM.run do
  pool  = EM::Pool.new
  spawn = lambda { pool.add EM::HttpRequest.new('http://example.org') }
  10.times { spawn[] }
  done, scheduled = 0, 0

  check = lambda do
    done += 1
    if done >= scheduled
      EM.stop
    end
  end

  pool.on_error { |conn| spawn[] }

  100.times do |i|
    scheduled += 1
    pool.perform do |conn|
      req = conn.get :path => '/', :keepalive => true

      req.callback do
        p [:success, conn.object_id, i, req.response.size]
        check[]
      end

      req.errback { check[] }

      req
    end
  end
end

Resources are expected to be controlled by an object responding to a deferrable/completion style API with callback and errback blocks.

Public Class Methods

new() click to toggle source
# File lib/em/pool.rb, line 45
def initialize
  @resources = EM::Queue.new
  @removed = []
  @contents = []
  @on_error = nil
end

Public Instance Methods

add(resource) click to toggle source
# File lib/em/pool.rb, line 52
def add resource
  @contents << resource
  requeue resource
end
contents() click to toggle source

Returns a list for introspection purposes only. You should NEVER call modification or work oriented methods on objects in this list. A good example use case is periodic statistics collection against a set of connection resources.

@example

pool.contents.inject(0) { |sum, connection| connection.num_bytes }
# File lib/em/pool.rb, line 69
def contents
  @contents.dup
end
num_waiting() click to toggle source

A peek at the number of enqueued jobs waiting for resources

# File lib/em/pool.rb, line 107
def num_waiting
  @resources.num_waiting
end
on_error(*a, &b) click to toggle source

Define a default catch-all for when the deferrables returned by work blocks enter a failed state. By default all that happens is that the resource is returned to the pool. If #on_error is defined, this block is responsible for re-adding the resource to the pool if it is still usable. In other words, it is generally assumed that #on_error blocks explicitly handle the rest of the lifetime of the resource.

# File lib/em/pool.rb, line 79
def on_error *a, &b
  @on_error = EM::Callback(*a, &b)
end
perform(*a, &b) click to toggle source

Perform a given call-able object or block. The callable object will be called with a resource from the pool as soon as one is available, and is expected to return a deferrable.

The deferrable will have callback and errback added such that when the deferrable enters a finished state, the object is returned to the pool.

If #on_error is defined, then objects are not automatically returned to the pool.

# File lib/em/pool.rb, line 92
def perform(*a, &b)
  work = EM::Callback(*a, &b)

  @resources.pop do |resource|
    if removed? resource
      @removed.delete resource
      reschedule work
    else
      process work, resource
    end
  end
end
Also aliased as: reschedule
remove(resource) click to toggle source
# File lib/em/pool.rb, line 57
def remove resource
  @contents.delete resource
  @removed << resource
end
removed?(resource) click to toggle source

Removed will show resources in a partial pruned state. Resources in the removed list may not appear in the contents list if they are currently in use.

# File lib/em/pool.rb, line 114
def removed? resource
  @removed.include? resource
end
reschedule(*a, &b)
Alias for: perform

Protected Instance Methods

completion(deferrable, resource) click to toggle source
# File lib/em/pool.rb, line 134
def completion deferrable, resource
  deferrable.callback { requeue resource }
  deferrable.errback  { failure resource }
end
failure(resource) click to toggle source
# File lib/em/pool.rb, line 123
def failure resource
  if @on_error
    @contents.delete resource
    @on_error.call resource
    # Prevent users from calling a leak.
    @removed.delete resource
  else
    requeue resource
  end
end
process(work, resource) click to toggle source
# File lib/em/pool.rb, line 139
def process work, resource
  deferrable = work.call resource
  if deferrable.kind_of?(EM::Deferrable)
    completion deferrable, resource
  else
    raise ArgumentError, "deferrable expected from work"
  end
rescue
  failure resource
  raise
end
requeue(resource) click to toggle source
# File lib/em/pool.rb, line 119
def requeue resource
  @resources.push resource
end