Create a new set of connection pools.
By default, the pool manager uses the original seed list passed to the connection object, accessible via connection.seeds. In addition, the user may pass a list of seed nodes discovered in real time. The union of these lists is used when attempting to connect, with the newly discovered nodes tried first.
# File lib/mongo/util/pool_manager.rb, line 15
#
# Build a pool manager bound to +connection+.
#
# connection - object responding to +seeds+; that list is stored as the
#              original seed list.
# seeds      - optional list of additional seed nodes (defaults to an
#              empty list).
#
# The manager starts out flagged as never having connected.
def initialize(connection, seeds=[])
  @connection = connection
  @original_seeds, @seeds = connection.seeds, seeds
  @previously_connected = false
end
We're healthy if all members are pingable and if the view of the replica set returned by isMaster is equivalent to our view. If any of these isn't the case, set @refresh_required to true, and return.
# File lib/mongo/util/pool_manager.rb, line 44
#
# Compare the cached member list with the configuration reported by a live
# seed node and set @refresh_required to true when they disagree.
#
# The flag is raised when:
# * no seed node can be reached (ConnectionFailure from get_valid_seed_node),
# * the seed returns no configuration,
# * the number of reported hosts differs from the number of known members,
# * a reported host has no matching member, or the member fails
#   validate_existing_member.
#
# The flag is never reset to false here. The return value is always nil;
# callers should consult refresh_required?.
def check_connection_health
  begin
    seed = get_valid_seed_node
  rescue ConnectionFailure
    @refresh_required = true
    return
  end

  begin
    config = seed.set_config

    healthy = false
    if config && config['hosts'].length == @members.length
      healthy = config['hosts'].all? do |host|
        member = @members.detect { |m| m.address == host }
        member && validate_existing_member(member)
      end
    end

    @refresh_required = true unless healthy
  ensure
    # Always release the seed, including when a member check raises.
    seed.close
  end

  nil
end
# File lib/mongo/util/pool_manager.rb, line 91
#
# Close the primary pool, every secondary pool and every known member.
#
# opts - options hash forwarded to each pool's +close+ (members are closed
#        without arguments).
#
# Closing is best effort: a ConnectionFailure raised by one pool or member
# is swallowed so that the remaining ones are still closed. Any other
# exception propagates. Returns nil.
def close(opts={})
  targets = []
  targets << [@primary_pool, [opts]] if @primary_pool
  if @secondary_pools
    @secondary_pools.each { |pool| targets << [pool, [opts]] }
  end
  if @members
    @members.each { |member| targets << [member, []] }
  end

  targets.each do |target, args|
    begin
      target.close(*args)
    rescue ConnectionFailure
      # Already unreachable; keep closing the rest.
    end
  end

  nil
end
# File lib/mongo/util/pool_manager.rb, line 87
#
# True when every existing pool reports itself closed.
#
# The list returned by +pools+ can contain nil (for example when no primary
# pool has been assigned); such entries are skipped instead of raising
# NoMethodError. With no pools at all the result is true.
def closed?
  pools.compact.all? { |pool| pool.closed? }
end
# File lib/mongo/util/pool_manager.rb, line 26
#
# (Re)build the manager's state from the replica set.
#
# If a previous connect succeeded, everything is closed first. State is then
# reset, members are contacted, pools are created, the discovered seeds are
# cached, and the read pool and tag mappings are chosen. The member list and
# the "previously connected" flag are stored only after all steps succeed.
def connect
  if @previously_connected
    close
  end
  initialize_data

  nodes = connect_to_members
  initialize_pools(nodes)
  cache_discovered_seeds(nodes)
  set_read_pool
  set_tag_mappings

  @members = nodes
  @previously_connected = true
end
# File lib/mongo/util/pool_manager.rb, line 22
#
# Short description of this manager: the object id in hexadecimal followed
# by the current value of @seeds.
def inspect
  hex_id = object_id.to_s(16)
  "<Mongo::PoolManager:0x#{hex_id} @seeds=#{@seeds}>"
end
Whether the replica set connection should initiate a full refresh.
# File lib/mongo/util/pool_manager.rb, line 83
#
# Report the current value of the @refresh_required flag, exactly as stored
# (it may be nil if the flag has not been set yet).
def refresh_required?
  return @refresh_required
end
The set of nodes that this class has discovered and successfully connected to.
# File lib/mongo/util/pool_manager.rb, line 115
#
# Return the stored seed list, or an empty array when none is stored.
def seeds
  return [] unless @seeds
  @seeds
end
# File lib/mongo/util/pool_manager.rb, line 207
#
# Record +member+ as the primary: mark its last known state, remember its
# host/port pair, create its connection pool (sized and timed from the
# connection's settings) and register the member's tags against that pool.
def assign_primary(member)
  member.last_state = :primary
  @primary = member.host_port

  conn = self.connection
  @primary_pool = Pool.new(conn, member.host, member.port,
                           :size => conn.pool_size,
                           :timeout => conn.pool_timeout,
                           :node => member)

  associate_tags_with_pool(member.tags, @primary_pool)
end
# File lib/mongo/util/pool_manager.rb, line 217
#
# Record +member+ as a secondary: mark its last known state, append its
# host/port pair to the secondaries list, create a connection pool for it
# (sized and timed from the connection's settings), add that pool to the
# secondary pools and register the member's tags against it.
def assign_secondary(member)
  member.last_state = :secondary
  @secondaries << member.host_port

  conn = self.connection
  secondary_pool = Pool.new(conn, member.host, member.port,
                            :size => conn.pool_size,
                            :timeout => conn.pool_timeout,
                            :node => member)

  @secondary_pools << secondary_pool
  associate_tags_with_pool(member.tags, secondary_pool)
end
# File lib/mongo/util/pool_manager.rb, line 300
#
# Replace the stored seed list with the host/port value of each given member.
def cache_discovered_seeds(members)
  discovered = members.collect do |node|
    node.host_port
  end
  @seeds = discovered
end
Connect to each member of the replica set as reported by the given seed node, and return them as a list of Mongo::Node objects.
# File lib/mongo/util/pool_manager.rb, line 163
#
# Connect to every host listed by a valid seed node and return the nodes
# that both connected and produced a configuration.
#
# A node that connects but returns no configuration is closed before being
# discarded, matching what get_valid_seed_node does. The seed node is always
# closed, even if contacting a member raises.
#
# Returns an Array of Mongo::Node.
# Raises ConnectionFailure when no member could be used (and propagates any
# ConnectionFailure raised by get_valid_seed_node).
def connect_to_members
  members = []
  seed = get_valid_seed_node

  begin
    seed.node_list.each do |host|
      node = Mongo::Node.new(self.connection, host)
      next unless node.connect

      if node.set_config
        members << node
      else
        # Connected but unusable: release it rather than dropping it open.
        node.close
      end
    end
  ensure
    seed.close
  end

  if members.empty?
    raise ConnectionFailure, "Failed to connect to any given member."
  end

  members
end
Iterate through the list of provided seed nodes until we've gotten a response from the replica set we're trying to connect to.
If we don't get a response, raise an exception.
# File lib/mongo/util/pool_manager.rb, line 280
#
# Walk the seed list and return the first node that both connects and yields
# a configuration. A node that connects without yielding a configuration is
# closed before moving on.
#
# Raises ConnectionFailure, listing the seeds tried, when none qualifies.
def get_valid_seed_node
  seed_list.each do |seed|
    candidate = Mongo::Node.new(self.connection, seed)
    next unless candidate.connect
    return candidate if candidate.set_config
    candidate.close
  end

  raise ConnectionFailure, "Cannot connect to a replica set using seeds " +
    "#{seed_list.map {|s| "#{s[0]}:#{s[1]}" }.join(', ')}"
end
# File lib/mongo/util/pool_manager.rb, line 145
#
# Reset all per-connection bookkeeping to its empty state: no primary, no
# read or secondary pool, empty lists, sets and maps, and the refresh flag
# cleared.
def initialize_data
  @primary = @primary_pool = @read_pool = @secondary_pool = nil
  @arbiters, @secondaries, @secondary_pools = [], [], []
  @hosts, @members = Set.new, Set.new
  @tags_to_pools, @tag_map = {}, {}
  @refresh_required = false
end
Initialize the connection pools for the primary and secondary nodes.
# File lib/mongo/util/pool_manager.rb, line 191
#
# Register every member's host string, then hand primaries to assign_primary
# and not-yet-known secondaries to assign_secondary. Finally take the maximum
# BSON size (falling back to Mongo::DEFAULT_MAX_BSON_SIZE) and the arbiter
# list from the first member.
def initialize_pools(members)
  members.each do |member|
    @hosts << member.host_string

    if member.primary?
      assign_primary(member)
      next
    end

    next unless member.secondary?
    assign_secondary(member) unless @secondaries.include?(member.host_port)
  end

  reported_size = members.first.config['maxBsonObjectSize']
  @max_bson_size = reported_size || Mongo::DEFAULT_MAX_BSON_SIZE
  @arbiters = members.first.arbiters
end
# File lib/mongo/util/pool_manager.rb, line 255
#
# Pick a random pool from the nearest non-empty ping bucket.
#
# Pools are bucketed by +ping_time+: 0..150, then 150..1000 (a value of
# exactly 150 lands in the first bucket because it is tested first), then
# everything else. The first bucket holding at least one pool is used.
#
# Returns one pool from that bucket, or nil when +pool_set+ is empty.
def nearby_pool_from_set(pool_set)
  ping_ranges = Array.new(3) { [] }

  pool_set.each do |pool|
    case pool.ping_time
    when 0..150
      ping_ranges[0] << pool
    when 150..1000
      ping_ranges[1] << pool
    else
      ping_ranges[2] << pool
    end
  end

  # detect replaces a `for` loop that relied on its variable leaking out.
  nearest = ping_ranges.detect { |bucket| !bucket.empty? }
  return nil unless nearest

  nearest[rand(nearest.length)]
end
# File lib/mongo/util/pool_manager.rb, line 121
#
# List the primary pool followed by every secondary pool. The primary slot
# is included as-is, so it is nil when no primary pool is set.
def pools
  [@primary_pool].push(*@secondary_pools)
end
# File lib/mongo/util/pool_manager.rb, line 296
#
# Union of the discovered seeds and the original seeds, discovered seeds
# first, without duplicates.
#
# Either list may be nil and is then treated as empty, in line with the
# +seeds+ reader, which already tolerates a nil @seeds.
def seed_list
  (@seeds || []) | (@original_seeds || [])
end
Pick a node from the set of possible secondaries. If more than one node is available, use the ping time to figure out which nodes to choose from.
# File lib/mongo/util/pool_manager.rb, line 243
#
# Choose the pool used for reads.
#
# With no secondary pools the primary pool is used and @secondary_pool is
# left untouched. With exactly one secondary pool that pool is used. With
# several, nearby_pool_from_set picks one. In the latter two cases the
# choice is stored as both @read_pool and @secondary_pool.
def set_read_pool
  return @read_pool = @primary_pool if @secondary_pools.empty?

  chosen =
    if @secondary_pools.size == 1
      @secondary_pools[0]
    else
      nearby_pool_from_set(@secondary_pools)
    end

  @secondary_pool = @read_pool = chosen
end
If there's more than one pool associated with a given tag, choose a close one using the bucket method.
# File lib/mongo/util/pool_manager.rb, line 230
#
# Fill @tag_map with one pool per tag: the only pool when a tag has exactly
# one, otherwise whichever pool nearby_pool_from_set selects from the list.
def set_tag_mappings
  @tags_to_pools.each do |tag, candidates|
    @tag_map[tag] =
      if candidates.length == 1
        candidates.first
      else
        nearby_pool_from_set(candidates)
      end
  end
end
# File lib/mongo/util/pool_manager.rb, line 125
#
# Check that +member+ still plays the role it had when last seen.
#
# Returns false when the member yields no configuration. Otherwise returns
# true only if it is primary and was last seen as primary, or it is not
# primary, was last seen as secondary and still reports secondary.
def validate_existing_member(member)
  return false unless member.set_config

  unchanged =
    if member.primary?
      # A node that is primary now must also have been primary before.
      member.last_state == :primary
    else
      member.last_state == :secondary && member.secondary?
    end

  unchanged ? true : false
end