Skip to content

Commit

Permalink
Update specs for lb
Browse files Browse the repository at this point in the history
  • Loading branch information
comandeo-mongo committed Aug 13, 2024
1 parent cc834f5 commit 2526fb6
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 67 deletions.
4 changes: 2 additions & 2 deletions lib/mongo/collection/view/map_reduce.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def each
result = send_fetch_query_with_connection(connection, session) unless inline?
else
result = send_initial_query(server, context)
result = send_fetch_query(session) unless inline?
result = send_fetch_query(server, session) unless inline?
end
@cursor = Cursor.new(view, result, server, session: session)
if block_given?
Expand Down Expand Up @@ -326,7 +326,7 @@ def fetch_query_op(session)
Operation::Find.new(spec)
end

def send_fetch_query(session)
def send_fetch_query(server, session)
fetch_query_op(session).execute(server, context: Operation::Context.new(client: client, session: session))
end

Expand Down
3 changes: 3 additions & 0 deletions lib/mongo/cursor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ def initialize(view, result, server, options = {})
# @api private
attr_reader :initial_result

# @api private
attr_reader :connection

# Finalize the cursor for garbage collection. Schedules this cursor to be included
# in a killCursors operation executed by the Cluster's CursorReaper.
#
Expand Down
75 changes: 15 additions & 60 deletions spec/integration/cursor_pinning_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,73 +52,28 @@
context 'lb' do
require_topology :load_balanced

# In load-balanced topology, we cannot create new connections to a
# particular service.
# In load-balanced topology, a cursor retains the connection used to create
# it until the cursor is closed.

context 'when no connection is available' do
context 'when connection is available' do
require_multi_mongos

it 'raises ConnectionCheckOutTimeout' do
server.pool.size.should == 0
let(:client) { authorized_client.with(max_pool_size: 2) }

it 'does not return connection to the pool if cursor not drained' do
expect(server.pool).not_to receive(:check_in)
enum = collection.find({}, batch_size: 1).to_enum
# Still zero because we haven't iterated
server.pool.size.should == 0

# Get the first element only; cursor is not drained, so there should
# be no check_in of the connection.
enum.next
server.pool.size.should == 1

# Grab the connection that was used
server.with_connection do
# This requires a new connection, but we cannot make one.
lambda do
enum.next
end.should raise_error(Mongo::Error::ConnectionCheckOutTimeout)

server.pool.size.should == 1
end
end
end

context 'when connection is available' do
require_multi_mongos

let(:client) { authorized_client.with(max_pool_size: 4) }

it 'uses the available connection' do
server.pool.size.should == 0

# Create 4 connections.

enums = []
connections = []
connection_ids = []

4.times do
view = collection.find({}, batch_size: 1)
enum = view.to_enum

enum.next

enums << enum
connection_ids << view.cursor.initial_result.connection_global_id
connections << server.pool.check_out
end

connection_ids.uniq.length.should be > 1

server.pool.size.should == 4

connections.each do |c|
server.pool.check_in(c)
end

# At this point, in theory, all connections are equally likely to
# be chosen, but we have cursors referencing more than one
# distinct service.
# Iterate each cursor to ensure they all continue to work.
enums.each do |enum|
enum.next
end
it 'returns connection to the pool when cursor is drained' do
view = collection.find({}, batch_size: 1)
enum = view.to_enum
expect_any_instance_of(Mongo::Cursor).to receive(:check_in_connection)
# Drain the cursor
enum.each { |it| it.nil? }
end
end
end
Expand Down
23 changes: 18 additions & 5 deletions spec/mongo/cursor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,11 @@

before do
expect(cursor).to receive(:get_more_operation).and_return(op).ordered
expect(op).to receive(:execute).and_raise(Mongo::Error::SocketError).ordered
if SpecConfig.instance.connect_options[:connect] == :load_balanced
expect(op).to receive(:execute_with_connection).and_raise(Mongo::Error::SocketError).ordered
else
expect(op).to receive(:execute).and_raise(Mongo::Error::SocketError).ordered
end
end

it 'raises the error' do
Expand Down Expand Up @@ -621,6 +625,9 @@
allow(reply).to receive(:connection_description).and_return(conn_desc)
allow(reply).to receive(:cursor_id).and_return(42)
allow(reply).to receive(:connection_global_id).and_return(1)
if SpecConfig.instance.connect_options[:connect] == :load_balanced
allow(reply).to receive(:connection).and_return(nil)
end
end
end

Expand Down Expand Up @@ -774,10 +781,16 @@

it 'does not raise an error' do
cursor
server.with_connection do |conn|
expect(conn).to receive(:deliver)
.at_least(:once)
.and_raise(Mongo::Error::SocketError, "test error")
if SpecConfig.instance.connect_options[:connect] == :load_balanced
expect(cursor.connection).to receive(:deliver)
.at_least(:once)
.and_raise(Mongo::Error::SocketError, "test error")
else
server.with_connection do |conn|
expect(conn).to receive(:deliver)
.at_least(:once)
.and_raise(Mongo::Error::SocketError, "test error")
end
end
expect do
cursor.close
Expand Down

0 comments on commit 2526fb6

Please sign in to comment.