class Sequel::ThreadedConnectionPool
A connection pool allowing multi-threaded access to a pool of connections. This is the default connection pool used by Sequel
.
Constants
- USE_WAITER
Attributes
A hash with thread keys and connection values for currently allocated connections.
An array of connections that are available for use by the pool.
The maximum number of connections this pool will create (per shard/server if sharding).
Public Class Methods
The following additional options are respected:
- :max_connections
-
The maximum number of connections the connection pool will open (default 4)
- :pool_timeout
-
The amount of seconds to wait to acquire a connection before raising a PoolTimeoutError (default 5)
Sequel::ConnectionPool::new
# File lib/sequel/connection_pool/threaded.rb 25 def initialize(db, opts = OPTS) 26 super 27 @max_size = Integer(opts[:max_connections] || 4) 28 raise(Sequel::Error, ':max_connections must be positive') if @max_size < 1 29 @mutex = Mutex.new 30 @connection_handling = opts[:connection_handling] 31 @available_connections = [] 32 @allocated = {} 33 @timeout = Float(opts[:pool_timeout] || 5) 34 @waiter = ConditionVariable.new 35 end
Public Instance Methods
Yield all of the available connections, and the one currently allocated to this thread. This will not yield connections currently allocated to other threads, as it is not safe to operate on them. This holds the mutex while it is yielding all of the available connections, which means that until the method's block returns, the pool is locked.
# File lib/sequel/connection_pool/threaded.rb 42 def all_connections 43 hold do |c| 44 sync do 45 yield c 46 @available_connections.each{|conn| yield conn} 47 end 48 end 49 end
Removes all connections currently available, optionally yielding each connection to the given block. This method has the effect of disconnecting from the database, assuming that no connections are currently being used. If you want to be able to disconnect connections that are currently in use, use the ShardedThreadedConnectionPool, which can do that. This connection pool does not, for performance reasons. To use the sharded pool, pass the servers: {}
option when connecting to the database.
Once a connection is requested using hold
, the connection pool creates new connections to the database.
# File lib/sequel/connection_pool/threaded.rb 61 def disconnect(opts=OPTS) 62 conns = nil 63 sync do 64 conns = @available_connections.dup 65 @available_connections.clear 66 @waiter.signal 67 end 68 conns.each{|conn| disconnect_connection(conn)} 69 end
Chooses the first available connection, or if none are available, creates a new connection. Passes the connection to the supplied block:
pool.hold {|conn| conn.execute('DROP TABLE posts')}
Pool#hold is re-entrant, meaning it can be called recursively in the same thread without blocking.
If no connection is immediately available and the pool is already using the maximum number of connections, Pool#hold will block until a connection is available or the timeout expires. If the timeout expires before a connection can be acquired, a Sequel::PoolTimeout is raised.
# File lib/sequel/connection_pool/threaded.rb 84 def hold(server=nil) 85 t = Thread.current 86 if conn = owned_connection(t) 87 return yield(conn) 88 end 89 begin 90 conn = acquire(t) 91 yield conn 92 rescue Sequel::DatabaseDisconnectError, *@error_classes => e 93 if disconnect_error?(e) 94 oconn = conn 95 conn = nil 96 disconnect_connection(oconn) if oconn 97 sync do 98 @allocated.delete(t) 99 @waiter.signal 100 end 101 end 102 raise 103 ensure 104 if conn 105 sync{release(t)} 106 if @connection_handling == :disconnect 107 disconnect_connection(conn) 108 end 109 end 110 end 111 end
# File lib/sequel/connection_pool/threaded.rb 113 def pool_type 114 :threaded 115 end
The total number of connections opened, either available or allocated. The calling code should not have the mutex before calling this.
# File lib/sequel/connection_pool/threaded.rb 119 def size 120 @mutex.synchronize{_size} 121 end
Private Instance Methods
The total number of connections opened, either available or allocated. The calling code should already have the mutex before calling this.
# File lib/sequel/connection_pool/threaded.rb 127 def _size 128 @allocated.length + @available_connections.length 129 end
Assigns a connection to the supplied thread, if one is available. The calling code should NOT already have the mutex when calling this.
This should return a connection is one is available within the timeout, or nil if a connection could not be acquired within the timeout.
# File lib/sequel/connection_pool/threaded.rb 137 def acquire(thread) 138 if conn = assign_connection(thread) 139 return conn 140 end 141 142 timeout = @timeout 143 timer = Sequel.start_timer 144 145 sync do 146 @waiter.wait(@mutex, timeout) 147 if conn = next_available 148 return(@allocated[thread] = conn) 149 end 150 end 151 152 until conn = assign_connection(thread) 153 elapsed = Sequel.elapsed_seconds_since(timer) 154 raise_pool_timeout(elapsed) if elapsed > timeout 155 156 # :nocov: 157 # It's difficult to get to this point, it can only happen if there is a race condition 158 # where a connection cannot be acquired even after the thread is signalled by the condition variable 159 sync do 160 @waiter.wait(@mutex, timeout - elapsed) 161 if conn = next_available 162 return(@allocated[thread] = conn) 163 end 164 end 165 # :nocov: 166 end 167 168 conn 169 end
Assign a connection to the thread, or return nil if one cannot be assigned. The caller should NOT have the mutex before calling this.
# File lib/sequel/connection_pool/threaded.rb 173 def assign_connection(thread) 174 allocated = @allocated 175 do_make_new = false 176 to_disconnect = nil 177 178 sync do 179 if conn = next_available 180 return(allocated[thread] = conn) 181 end 182 183 if (n = _size) >= (max = @max_size) 184 allocated.keys.each do |t| 185 unless t.alive? 186 (to_disconnect ||= []) << @allocated.delete(t) 187 end 188 end 189 n = nil 190 end 191 192 if (n || _size) < max 193 do_make_new = allocated[thread] = true 194 end 195 end 196 197 if to_disconnect 198 to_disconnect.each{|dconn| disconnect_connection(dconn)} 199 end 200 201 # Connect to the database outside of the connection pool mutex, 202 # as that can take a long time and the connection pool mutex 203 # shouldn't be locked while the connection takes place. 204 if do_make_new 205 begin 206 conn = make_new(:default) 207 sync{allocated[thread] = conn} 208 ensure 209 unless conn 210 sync{allocated.delete(thread)} 211 end 212 end 213 end 214 215 conn 216 end
Return a connection to the pool of available connections, returns the connection. The calling code should already have the mutex before calling this.
# File lib/sequel/connection_pool/threaded.rb 220 def checkin_connection(conn) 221 @available_connections << conn 222 conn 223 end
Return the next available connection in the pool, or nil if there is not currently an available connection. The calling code should already have the mutex before calling this.
# File lib/sequel/connection_pool/threaded.rb 228 def next_available 229 case @connection_handling 230 when :stack 231 @available_connections.pop 232 else 233 @available_connections.shift 234 end 235 end
Returns the connection owned by the supplied thread, if any. The calling code should NOT already have the mutex before calling this.
# File lib/sequel/connection_pool/threaded.rb 239 def owned_connection(thread) 240 sync{@allocated[thread]} 241 end
Create the maximum number of connections immediately. The calling code should NOT have the mutex before calling this.
# File lib/sequel/connection_pool/threaded.rb 245 def preconnect(concurrent = false) 246 enum = (max_size - _size).times 247 248 conns = if concurrent 249 enum.map{Thread.new{make_new(:default)}}.map(&:value) 250 else 251 enum.map{make_new(:default)} 252 end 253 254 sync{conns.each{|conn| checkin_connection(conn)}} 255 end
Raise a PoolTimeout error showing the current timeout, the elapsed time, and the database's name (if any).
# File lib/sequel/connection_pool/threaded.rb 259 def raise_pool_timeout(elapsed) 260 name = db.opts[:name] 261 raise ::Sequel::PoolTimeout, "timeout: #{@timeout}, elapsed: #{elapsed}#{", database name: #{name}" if name}" 262 end
Releases the connection assigned to the supplied thread back to the pool. The calling code should already have the mutex before calling this.
# File lib/sequel/connection_pool/threaded.rb 266 def release(thread) 267 conn = @allocated.delete(thread) 268 269 unless @connection_handling == :disconnect 270 checkin_connection(conn) 271 end 272 273 @waiter.signal 274 nil 275 end
Yield to the block while inside the mutex. The calling code should NOT already have the mutex before calling this.
# File lib/sequel/connection_pool/threaded.rb 279 def sync 280 @mutex.synchronize{yield} 281 end