class Sequel::ThreadedConnectionPool

A connection pool allowing multi-threaded access to a pool of connections. This is the default connection pool used by Sequel.

Constants

USE_WAITER

Attributes

allocated[R]

A hash with thread keys and connection values for currently allocated connections.

available_connections[R]

An array of connections that are available for use by the pool.

max_size[R]

The maximum number of connections this pool will create (per shard/server if sharding).

Public Class Methods

new(db, opts = OPTS)

The following additional options are respected:

:max_connections

The maximum number of connections the connection pool will open (default 4)

:pool_timeout

The number of seconds to wait to acquire a connection before raising a Sequel::PoolTimeout (default 5)
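
As a rough illustration of how these two options are coerced and validated, here is a minimal sketch. The helper name parse_pool_options is hypothetical, and ArgumentError stands in for Sequel::Error so that the example runs without the sequel gem.

```ruby
# Hypothetical helper (not part of Sequel) mirroring the option handling
# in the constructor: coerce, apply defaults, reject non-positive sizes.
def parse_pool_options(opts = {})
  max_size = Integer(opts[:max_connections] || 4)
  # ArgumentError is a stand-in for Sequel::Error in this sketch.
  raise ArgumentError, ':max_connections must be positive' if max_size < 1
  timeout = Float(opts[:pool_timeout] || 5)
  { max_size: max_size, timeout: timeout }
end

defaults = parse_pool_options
custom   = parse_pool_options(max_connections: '10', pool_timeout: 2)

rejected = begin
  parse_pool_options(max_connections: 0)
  false
rescue ArgumentError
  true
end
```

Because the values pass through Integer() and Float(), string values such as '10' (for example from an environment variable) are accepted. In an application these options are normally given to Sequel.connect alongside the other connection options.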

Calls superclass method Sequel::ConnectionPool::new
# File lib/sequel/connection_pool/threaded.rb
def initialize(db, opts = OPTS)
  super
  @max_size = Integer(opts[:max_connections] || 4)
  raise(Sequel::Error, ':max_connections must be positive') if @max_size < 1
  @mutex = Mutex.new
  @connection_handling = opts[:connection_handling]
  @available_connections = []
  @allocated = {}
  @timeout = Float(opts[:pool_timeout] || 5)
  @waiter = ConditionVariable.new
end

Public Instance Methods

all_connections() { |c| ... }

Yield all of the available connections, and the one currently allocated to this thread. This will not yield connections currently allocated to other threads, as it is not safe to operate on them. The mutex is held while the connections are being yielded, so the pool stays locked until the method's block returns.

# File lib/sequel/connection_pool/threaded.rb
def all_connections
  hold do |c|
    sync do
      yield c
      @available_connections.each{|conn| yield conn}
    end
  end
end

disconnect(opts=OPTS)

Removes all connections currently available, optionally yielding each connection to the given block. This method has the effect of disconnecting from the database, assuming that no connections are currently being used. If you want to be able to disconnect connections that are currently in use, use the ShardedThreadedConnectionPool, which can do that. This connection pool does not, for performance reasons. To use the sharded pool, pass the servers: {} option when connecting to the database.

After a disconnect, the connection pool creates new connections to the database the next time a connection is requested using hold.
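
The pattern described here (empty the list of available connections while holding the lock, then close them after releasing it) can be sketched as follows. FakeConn and its close method are hypothetical stand-ins for real database connections; this is an illustration, not Sequel's own code.

```ruby
# Hypothetical connection object that only records whether it was closed.
FakeConn = Struct.new(:closed) do
  def close
    self.closed = true
  end
end

mutex     = Mutex.new
available = [FakeConn.new(false), FakeConn.new(false)]
in_use    = FakeConn.new(false) # checked out by a thread, so not in `available`

# Snapshot and clear the list while holding the lock...
conns = mutex.synchronize do
  snapshot = available.dup
  available.clear
  snapshot
end

# ...then do the potentially slow closing work without holding it.
conns.each(&:close)
```

Only connections that were sitting in the available list are closed. The connection that is in use is untouched, which matches the caveat above about connections currently being used.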

# File lib/sequel/connection_pool/threaded.rb
def disconnect(opts=OPTS)
  conns = nil
  sync do
    conns = @available_connections.dup
    @available_connections.clear
    @waiter.signal
  end
  conns.each{|conn| disconnect_connection(conn)}
end

hold(server=nil) { |conn| ... }

Chooses the first available connection, or if none are available, creates a new connection. Passes the connection to the supplied block:

pool.hold {|conn| conn.execute('DROP TABLE posts')}

Pool#hold is re-entrant, meaning it can be called recursively in the same thread without blocking.

If no connection is immediately available and the pool is already using the maximum number of connections, Pool#hold will block until a connection is available or the timeout expires. If the timeout expires before a connection can be acquired, a Sequel::PoolTimeout is raised.
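
To make the re-entrancy concrete, here is a minimal sketch of a pool that tracks which thread owns which connection. ReentrantPool is a hypothetical, heavily simplified class: it has no maximum size, no timeout and no disconnect handling, and it creates connections while holding the lock, which the real pool avoids.

```ruby
# Hypothetical, simplified pool used only to illustrate re-entrant hold.
class ReentrantPool
  attr_reader :created

  def initialize(&factory)
    @factory   = factory
    @mutex     = Mutex.new
    @available = []
    @allocated = {} # thread => connection
    @created   = 0
  end

  def hold
    t = Thread.current

    # Re-entrant path: this thread already owns a connection, so reuse it.
    owned = @mutex.synchronize { @allocated[t] }
    return yield(owned) if owned

    # First call in this thread: take an idle connection or create one.
    conn = @mutex.synchronize do
      @allocated[t] = @available.shift || begin
        @created += 1
        @factory.call
      end
    end

    begin
      yield conn
    ensure
      # Only the outermost hold returns the connection to the pool.
      @mutex.synchronize { @available << @allocated.delete(t) }
    end
  end
end

pool   = ReentrantPool.new { Object.new }
same   = pool.hold { |outer| pool.hold { |inner| outer.equal?(inner) } }
reused = pool.hold { |conn| conn }
```

The nested call receives the same object as the outer call and never waits, because the lookup by Thread.current succeeds before any attempt to acquire a new connection.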

# File lib/sequel/connection_pool/threaded.rb
def hold(server=nil)
  t = Thread.current
  if conn = owned_connection(t)
    return yield(conn)
  end
  begin
    conn = acquire(t)
    yield conn
  rescue Sequel::DatabaseDisconnectError, *@error_classes => e
    if disconnect_error?(e)
      oconn = conn
      conn = nil
      disconnect_connection(oconn) if oconn
      sync do
        @allocated.delete(t)
        @waiter.signal
      end
    end
    raise
  ensure
    if conn
      sync{release(t)}
      if @connection_handling == :disconnect
        disconnect_connection(conn)
      end
    end
  end
end

pool_type()

# File lib/sequel/connection_pool/threaded.rb
def pool_type
  :threaded
end

size()

The total number of connections opened, either available or allocated. The calling code should NOT already have the mutex before calling this.

# File lib/sequel/connection_pool/threaded.rb
def size
  @mutex.synchronize{_size}
end

Private Instance Methods

_size()

The total number of connections opened, either available or allocated. The calling code should already have the mutex before calling this.
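
The split between a public method that takes the mutex and a private one that assumes it is already held matters because Ruby's Mutex is not re-entrant. The sketch below uses a hypothetical Counter class (not part of Sequel) to show what happens when a thread that already holds the lock calls the locking variant.

```ruby
# Hypothetical class illustrating the "size locks, _size does not" convention.
class Counter
  def initialize
    @mutex = Mutex.new
    @items = []
  end

  # Takes the lock itself, so the caller must NOT already hold it.
  def size
    @mutex.synchronize { _size }
  end

  # Deliberately wrong: calls the locking method while holding the lock.
  def broken_size
    @mutex.synchronize { size }
  end

  private

  # Assumes the caller already holds the lock.
  def _size
    @items.length
  end
end

counter = Counter.new
ok = counter.size

error = begin
  counter.broken_size
rescue ThreadError => e
  e
end
```

Mutex#synchronize raises ThreadError when the current thread already holds the lock, which is why each method in this class documents whether the caller should or should not have the mutex.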

# File lib/sequel/connection_pool/threaded.rb
def _size
  @allocated.length + @available_connections.length
end

acquire(thread)

Assigns a connection to the supplied thread, if one is available. The calling code should NOT already have the mutex when calling this.

This returns a connection if one becomes available within the timeout. If no connection can be acquired within the timeout, a Sequel::PoolTimeout is raised.
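
The waiting logic can be sketched with a condition variable and a running count of elapsed time. The helper wait_for_item below is hypothetical and returns nil on timeout instead of raising; it is an illustration of the technique under those assumptions, not Sequel's code.

```ruby
# Hypothetical helper: wait up to `timeout` seconds for an item to appear.
# Returns the item, or nil if the timeout expires first.
def wait_for_item(items, mutex, waiter, timeout)
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  mutex.synchronize do
    loop do
      item = items.shift
      return item if item

      elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
      return nil if elapsed >= timeout

      # wait releases the mutex while sleeping and re-acquires it on wake-up.
      # Waking up does not guarantee an item is present, hence the loop and
      # the shrinking "timeout - elapsed" budget.
      waiter.wait(mutex, timeout - elapsed)
    end
  end
end

items  = []
mutex  = Mutex.new
waiter = ConditionVariable.new

# A second thread "releases a connection" shortly after we start waiting.
producer = Thread.new do
  sleep 0.05
  mutex.synchronize do
    items << :conn
    waiter.signal
  end
end

got = wait_for_item(items, mutex, waiter, 2.0)
producer.join
missed = wait_for_item(items, mutex, waiter, 0.05)
```

Subtracting the elapsed time before each wait keeps the total wait bounded by the configured timeout even if the thread is woken several times without finding a connection.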

# File lib/sequel/connection_pool/threaded.rb
def acquire(thread)
  if conn = assign_connection(thread)
    return conn
  end

  timeout = @timeout
  timer = Sequel.start_timer

  sync do
    @waiter.wait(@mutex, timeout)
    if conn = next_available
      return(@allocated[thread] = conn)
    end
  end

  until conn = assign_connection(thread)
    elapsed = Sequel.elapsed_seconds_since(timer)
    raise_pool_timeout(elapsed) if elapsed > timeout

    # :nocov:
    # It's difficult to get to this point, it can only happen if there is a race condition
    # where a connection cannot be acquired even after the thread is signalled by the condition variable
    sync do
      @waiter.wait(@mutex, timeout - elapsed)
      if conn = next_available
        return(@allocated[thread] = conn)
      end
    end
    # :nocov:
  end

  conn
end

assign_connection(thread)

Assign a connection to the thread, or return nil if one cannot be assigned. The caller should NOT have the mutex before calling this.
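
One step of this method is reclaiming connections whose owning threads have died, which frees room in a full pool. A minimal sketch of that step is shown below. The helper name reap_dead_threads and the symbols standing in for connections are hypothetical.

```ruby
# Hypothetical helper: remove entries owned by dead threads from the
# thread => connection hash and return the orphaned connections.
def reap_dead_threads(allocated, mutex)
  orphaned = []
  mutex.synchronize do
    # Iterate over a copy of the keys, since the hash is modified in the loop.
    allocated.keys.each do |t|
      orphaned << allocated.delete(t) unless t.alive?
    end
  end
  orphaned
end

mutex     = Mutex.new
allocated = {}

finished = Thread.new {}
finished.join # the thread is now dead

allocated[finished]       = :conn_a
allocated[Thread.current] = :conn_b

orphaned = reap_dead_threads(allocated, mutex)
```

The helper only collects the orphaned connections. Disconnecting them is left to the caller so that it can happen after the mutex has been released, which is also how the method orders these steps.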

# File lib/sequel/connection_pool/threaded.rb
def assign_connection(thread)
  allocated = @allocated
  do_make_new = false
  to_disconnect = nil

  sync do
    if conn = next_available
      return(allocated[thread] = conn)
    end

    if (n = _size) >= (max = @max_size)
      allocated.keys.each do |t|
        unless t.alive?
          (to_disconnect ||= []) << @allocated.delete(t)
        end
      end
      n = nil
    end

    if (n || _size) < max
      do_make_new = allocated[thread] = true
    end
  end

  if to_disconnect
    to_disconnect.each{|dconn| disconnect_connection(dconn)}
  end

  # Connect to the database outside of the connection pool mutex,
  # as that can take a long time and the connection pool mutex
  # shouldn't be locked while the connection takes place.
  if do_make_new
    begin
      conn = make_new(:default)
      sync{allocated[thread] = conn}
    ensure
      unless conn
        sync{allocated.delete(thread)}
      end
    end
  end

  conn
end

checkin_connection(conn)

Adds the connection back to the pool of available connections and returns the connection. The calling code should already have the mutex before calling this.

# File lib/sequel/connection_pool/threaded.rb
def checkin_connection(conn)
  @available_connections << conn
  conn
end

next_available()

Return the next available connection in the pool, or nil if there is not currently an available connection. The calling code should already have the mutex before calling this.
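
The effect of the connection handling setting on which connection is handed out next can be shown with plain arrays. In this sketch the standalone function takes the list and the setting as arguments, which is an assumption made so that the example is self-contained. Symbols stand in for connections.

```ruby
# Standalone version of the selection logic, for illustration only.
def next_available(available, connection_handling)
  case connection_handling
  when :stack
    available.pop   # most recently checked-in connection first
  else
    available.shift # oldest checked-in connection first
  end
end

as_queue = [:first_in, :second_in, :last_in]
as_stack = [:first_in, :second_in, :last_in]

from_queue = next_available(as_queue, nil)
from_stack = next_available(as_stack, :stack)
from_empty = next_available([], :stack)
```

With :stack the connection that was returned most recently is reused first, so rarely needed connections stay idle at the bottom. Any other value cycles through the connections in first-in, first-out order.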

# File lib/sequel/connection_pool/threaded.rb
def next_available
  case @connection_handling
  when :stack
    @available_connections.pop
  else
    @available_connections.shift
  end
end

owned_connection(thread)

Returns the connection owned by the supplied thread, if any. The calling code should NOT already have the mutex before calling this.

# File lib/sequel/connection_pool/threaded.rb
def owned_connection(thread)
  sync{@allocated[thread]}
end

preconnect(concurrent = false)

Create the maximum number of connections immediately. The calling code should NOT have the mutex before calling this.
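
The difference between the sequential and concurrent modes comes down to whether each connection is built in its own thread. A minimal sketch, using a hypothetical build_connections helper and a block in place of the real connection factory:

```ruby
# Hypothetical helper: build `count` connections either one after another
# or in parallel threads, returning them as an array in both cases.
def build_connections(count, concurrent: false, &factory)
  enum = count.times
  if concurrent
    # Start all threads first, then collect each thread's return value.
    # Thread#value joins the thread and re-raises any error it hit.
    enum.map { Thread.new { factory.call } }.map(&:value)
  else
    enum.map { factory.call }
  end
end

sequential = build_connections(3) { Object.new }
parallel   = build_connections(3, concurrent: true) do
  sleep 0.01 # simulate connection latency
  Object.new
end
```

When connecting is slow, the concurrent form takes roughly as long as one connection attempt rather than the sum of all of them, at the cost of briefly running one thread per connection.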

# File lib/sequel/connection_pool/threaded.rb
def preconnect(concurrent = false)
  enum = (max_size - _size).times

  conns = if concurrent
    enum.map{Thread.new{make_new(:default)}}.map(&:value)
  else
    enum.map{make_new(:default)}
  end

  sync{conns.each{|conn| checkin_connection(conn)}}
end

raise_pool_timeout(elapsed)

Raise a PoolTimeout error showing the current timeout, the elapsed time, and the database's name (if any).

# File lib/sequel/connection_pool/threaded.rb
def raise_pool_timeout(elapsed)
  name = db.opts[:name]
  raise ::Sequel::PoolTimeout, "timeout: #{@timeout}, elapsed: #{elapsed}#{", database name: #{name}" if name}"
end

release(thread)

Releases the connection assigned to the supplied thread back to the pool. The calling code should already have the mutex before calling this.

# File lib/sequel/connection_pool/threaded.rb
def release(thread)
  conn = @allocated.delete(thread)

  unless @connection_handling == :disconnect
    checkin_connection(conn)
  end

  @waiter.signal
  nil
end

sync() { || ... }

Yield to the block while inside the mutex. The calling code should NOT already have the mutex before calling this.

# File lib/sequel/connection_pool/threaded.rb
def sync
  @mutex.synchronize{yield}
end