Skip to content

Commit 9546682

Browse files
author
KJ Tsanaktsidis
committed
Improve handling of retries around #watch
* It's important that all of a transaction actually happens on the same connection, with no transparent reconnection allowed inside ::RedisClient. So, we wrap a watch transaction in ensure_connected_cluster_scoped(retryable: false). * We don't need to call UNWATCH on connection errors (since the connection state is already broken). redis-rb and RedisClient don't do this either.
1 parent 597ef95 commit 9546682

File tree

2 files changed

+108
-5
lines changed

2 files changed

+108
-5
lines changed

lib/redis_client/cluster/optimistic_locking.rb

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,23 @@ def watch(keys)
1414
slot = find_slot(keys)
1515
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" if slot.nil?
1616

17+
# We have not yet selected a node for this transaction, initially, which means we can handle
18+
# redirections freely initially (i.e. for the first WATCH call)
1719
node = @router.find_primary_node_by_slot(slot)
1820
@router.handle_redirection(node, retry_count: 1) do |nd|
1921
nd.with do |c|
20-
c.call('WATCH', *keys)
21-
yield(c, slot)
22-
rescue StandardError
23-
c.call('UNWATCH')
24-
raise
22+
c.ensure_connected_cluster_scoped(retryable: false) do
23+
c.call('WATCH', *keys)
24+
begin
25+
yield(c, slot)
26+
rescue ::RedisClient::ConnectionError
27+
# No need to unwatch on a connection error.
28+
raise
29+
rescue StandardError
30+
c.call('UNWATCH')
31+
raise
32+
end
33+
end
2534
end
2635
end
2736
end

test/redis_client/test_cluster.rb

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,100 @@ def test_transaction_with_meaningless_watch
295295
assert_equal(%w[1 2], @client.call('MGET', '{key}1', '{key}2'))
296296
end
297297

298+
def test_transaction_does_not_pointlessly_unwatch_on_success
299+
@client.call('MSET', '{key}1', '0', '{key}2', '0')
300+
301+
@captured_commands.clear
302+
@client.multi(watch: %w[{key}1 {key}2]) do |tx|
303+
tx.call('SET', '{key}1', '1')
304+
tx.call('SET', '{key}2', '2')
305+
end
306+
assert_equal(%w[WATCH MULTI SET SET EXEC], @captured_commands.to_a.map(&:command).map(&:first))
307+
assert_equal(%w[1 2], @client.call('MGET', '{key}1', '{key}2'))
308+
end
309+
310+
def test_transaction_unwatches_on_error
311+
test_error = Class.new(StandardError)
312+
313+
@captured_commands.clear
314+
assert_raises(test_error) do
315+
@client.multi(watch: %w[{key}1 {key}2]) do
316+
raise test_error, 'error!'
317+
end
318+
end
319+
320+
assert_equal(%w[WATCH UNWATCH], @captured_commands.to_a.map(&:command).map(&:first))
321+
end
322+
323+
def test_transaction_does_not_unwatch_on_connection_error
324+
@captured_commands.clear
325+
assert_raises(RedisClient::ConnectionError) do
326+
@client.multi(watch: %w[{key}1 {key}2]) do |tx|
327+
tx.call('SET', '{key}1', 'x')
328+
tx.call('QUIT')
329+
end
330+
end
331+
command_list = @captured_commands.to_a.map(&:command).map(&:first)
332+
assert_includes(command_list, 'WATCH')
333+
refute_includes(command_list, 'UNWATCH')
334+
end
335+
336+
def test_transaction_does_not_retry_without_rewatching
337+
client2 = new_test_client
338+
339+
@client.call('SET', 'key', 'original_value')
340+
341+
assert_raises(RedisClient::ConnectionError) do
342+
@client.multi(watch: %w[key]) do |tx|
343+
# Simulate all the connections closing behind the router's back
344+
# Sending QUIT to redis makes the server side close the connection (and the client
345+
# side thus get a RedisClient::ConnectionError)
346+
node = @client.instance_variable_get(:@router).instance_variable_get(:@node)
347+
node.clients.each do |conn|
348+
conn.with(&:close)
349+
end
350+
351+
# Now the second client sets the value, which should make this watch invalid
352+
client2.call('SET', 'key', 'client2_value')
353+
354+
tx.call('SET', 'key', '@client_value')
355+
# Committing this transaction will fail, not silently reconnect (without the watch!)
356+
end
357+
end
358+
359+
# The transaction did not commit.
360+
assert_equal('client2_value', @client.call('GET', 'key'))
361+
end
362+
363+
def test_transaction_with_watch_retries_block
364+
client2 = new_test_client
365+
call_count = 0
366+
367+
@client.call('SET', 'key', 'original_value')
368+
369+
@client.multi(watch: %w[key]) do |tx|
370+
if call_count == 0
371+
# Simulate all the connections closing behind the router's back
372+
# Sending QUIT to redis makes the server side close the connection (and the client
373+
# side thus get a RedisClient::ConnectionError)
374+
node = @client.instance_variable_get(:@router).instance_variable_get(:@node)
375+
node.clients.each do |conn|
376+
conn.with(&:close)
377+
end
378+
379+
# Now the second client sets the value, which should make this watch invalid
380+
client2.call('SET', 'key', 'client2_value')
381+
end
382+
call_count += 1
383+
384+
tx.call('SET', 'key', "@client_value_#{call_count}")
385+
end
386+
387+
# The transaction did commit (but it was the second time)
388+
assert_equal('@client_value_2', @client.call('GET', 'key'))
389+
assert_equal(2, call_count)
390+
end
391+
298392
def test_transaction_with_error
299393
@client.call('SET', 'key1', 'x')
300394

0 commit comments

Comments
 (0)