Skip to content

Commit 2d435a9

Browse files
KJ TsanaktsidisKJTsanaktsidis
authored andcommitted
Add an explicit #watch method to RedisClient::Cluster
This returns a "watcher" object, which can either be used for three things: * To add keys to be watched on the same connection (by calling #watch * To begin a MULTI transaction on the connection (by calling #multi) * To UNWATCH the connection and return it to its original state (by calling... #unwatch) This means that the following pattern becomes possible in redis-cluster-client: ``` client.watch(["{slot}k1", "{slot}k2"]) do |watcher| # Further reads can be performed with client directly; this is # perfectly safe and they will be individually redirected if required # as normal. # If a read on a slot being watched is redirected, that's also OK, # because it means the final EXEC will fail (since the watch got # modified). current_value = client.call('GET', '{slot}k1') some_other_thing = client.call('GET', '{slot}something_unwatched') # You can add more keys to the watch if required # This could raise a redireciton error, and cause the whole watch # block to be re-attempted watcher.watch('{slot}differet_key') different_value = client.call('GET', '{slot}different_key') if do_transaction? # Once you're ready to perform a transaction, you can use multi... watcher.multi do |tx| # tx is now a pipeliend RedisClient::Cluster::Transaction # instance, like normal multi tx.call('SET', '{slot}k1', 'new_value') tx.call('SET', '{slot}k2', 'new_value') end # At this point, the transaction is committed else # You can abort the transaction by calling unwatch # (this will also happen if an exception is thrown) watcher.unwatch end end ``` This interface is what's required to make redis-clustering/redis-rb work correctly, I think.
1 parent 7eff2f2 commit 2d435a9

File tree

4 files changed

+67
-38
lines changed

4 files changed

+67
-38
lines changed

lib/redis_client/cluster.rb

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -91,19 +91,16 @@ def pipelined
9191
pipeline.execute
9292
end
9393

94-
def multi(watch: nil)
94+
def multi(watch: nil, &block)
9595
if watch.nil? || watch.empty?
9696
transaction = ::RedisClient::Cluster::Transaction.new(@router, @command_builder)
9797
yield transaction
9898
return transaction.execute
9999
end
100100

101-
::RedisClient::Cluster::OptimisticLocking.new(@router).watch(watch) do |c, slot, asking|
102-
transaction = ::RedisClient::Cluster::Transaction.new(
103-
@router, @command_builder, node: c, slot: slot, asking: asking
104-
)
105-
yield transaction
106-
transaction.execute
101+
watcher = ::RedisClient::Cluster::OptimisticLocking.new(@router, @command_builder)
102+
watcher.watch(watch) do
103+
watcher.multi(&block)
107104
end
108105
end
109106

@@ -128,6 +125,17 @@ def close
128125

129126
private
130127

128+
# This API is called by redis-clustering/redis-rb, but requries further refinement before we commit
129+
# to making it part of redis-cluster-client's official public API.
130+
def watch(keys)
131+
raise ArgumentError, "#{self.class.name}#watch requires a block for the initial watch" unless block_given?
132+
133+
watcher = ::RedisClient::Cluster::OptimisticLocking.new(@router, @command_builder)
134+
watcher.watch(keys) do
135+
yield watcher
136+
end
137+
end
138+
131139
def process_with_arguments(key, hashtag) # rubocop:disable Metrics/CyclomaticComplexity
132140
raise ArgumentError, 'Only one of key or hashtag may be provided' if key && hashtag
133141

lib/redis_client/cluster/optimistic_locking.rb

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,38 +6,71 @@
66
class RedisClient
77
class Cluster
88
class OptimisticLocking
9-
def initialize(router)
9+
def initialize(router, command_builder)
1010
@router = router
11+
@command_builder = command_builder
12+
@slot = nil
13+
@conn = nil
1114
@asking = false
1215
end
1316

14-
def watch(keys)
15-
slot = find_slot(keys)
16-
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" if slot.nil?
17+
def watch(keys, &block)
18+
if @conn
19+
# We're already watching, and the caller wants to watch additional keys
20+
add_to_watch(keys)
21+
else
22+
# First call to #watch
23+
start_watch(keys, &block)
24+
end
25+
end
26+
27+
def unwatch
28+
@conn.call('UNWATCH')
29+
end
30+
31+
def multi
32+
transaction = ::RedisClient::Cluster::Transaction.new(
33+
@router, @command_builder, node: @conn, slot: @slot, asking: @asking
34+
)
35+
yield transaction
36+
transaction.execute
37+
end
38+
39+
private
40+
41+
def start_watch(keys)
42+
@slot = find_slot(keys)
43+
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" if @slot.nil?
1744

1845
# We have not yet selected a node for this transaction, initially, which means we can handle
1946
# redirections freely initially (i.e. for the first WATCH call)
20-
node = @router.find_primary_node_by_slot(slot)
47+
node = @router.find_primary_node_by_slot(@slot)
2148
handle_redirection(node, retry_count: 1) do |nd|
2249
nd.with do |c|
2350
c.ensure_connected_cluster_scoped(retryable: false) do
24-
c.call('ASKING') if @asking
25-
c.call('WATCH', *keys)
51+
@conn = c
52+
@conn.call('ASKING') if @asking
53+
@conn.call('WATCH', *keys)
2654
begin
27-
yield(c, slot, @asking)
55+
yield(c, @slot, @asking)
2856
rescue ::RedisClient::ConnectionError
2957
# No need to unwatch on a connection error.
3058
raise
3159
rescue StandardError
32-
c.call('UNWATCH')
60+
unwatch
3361
raise
3462
end
3563
end
3664
end
3765
end
3866
end
3967

40-
private
68+
def add_to_watch(keys)
69+
slot = find_slot(keys)
70+
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "inconsistent watch: #{keys.join(' ')}" if slot != @slot
71+
72+
@conn.call('WATCH', *keys)
73+
end
4174

4275
def handle_redirection(node, retry_count: 1, &blk)
4376
@router.handle_redirection(node, retry_count: retry_count) do |nd|

lib/redis_client/cluster/router.rb

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi
4646
when 'memory' then send_memory_command(method, command, args, &block)
4747
when 'script' then send_script_command(method, command, args, &block)
4848
when 'pubsub' then send_pubsub_command(method, command, args, &block)
49-
when 'watch' then send_watch_command(command, &block)
5049
when 'acl', 'auth', 'bgrewriteaof', 'bgsave', 'quit', 'save'
5150
@node.call_all(method, command, args).first.then(&TSF.call(block))
5251
when 'flushall', 'flushdb'
@@ -311,19 +310,6 @@ def send_pubsub_command(method, command, args, &block) # rubocop:disable Metrics
311310
end
312311
end
313312

314-
# for redis-rb
315-
def send_watch_command(command)
316-
raise ::RedisClient::Cluster::Transaction::ConsistencyError, 'A block required. And you need to use the block argument as a client for the transaction.' unless block_given?
317-
318-
::RedisClient::Cluster::OptimisticLocking.new(self).watch(command[1..]) do |c, slot|
319-
transaction = ::RedisClient::Cluster::Transaction.new(
320-
self, @command_builder, node: c, slot: slot
321-
)
322-
yield transaction
323-
transaction.execute
324-
end
325-
end
326-
327313
def update_cluster_info!
328314
@node.reload!
329315
end

test/redis_client/test_cluster.rb

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -461,20 +461,22 @@ def test_transaction_in_race_condition
461461
def test_transaction_with_dedicated_watch_command
462462
@client.call('MSET', '{key}1', '0', '{key}2', '0')
463463

464-
got = @client.call('WATCH', '{key}1', '{key}2') do |tx|
465-
tx.call('ECHO', 'START')
466-
tx.call('SET', '{key}1', '1')
467-
tx.call('SET', '{key}2', '2')
468-
tx.call('ECHO', 'FINISH')
464+
got = @client.send(:watch, ['{key}1', '{key}2']) do |watcher|
465+
watcher.multi do |tx|
466+
tx.call('ECHO', 'START')
467+
tx.call('SET', '{key}1', '1')
468+
tx.call('SET', '{key}2', '2')
469+
tx.call('ECHO', 'FINISH')
470+
end
469471
end
470472

471473
assert_equal(%w[START OK OK FINISH], got)
472474
assert_equal(%w[1 2], @client.call('MGET', '{key}1', '{key}2'))
473475
end
474476

475477
def test_transaction_with_dedicated_watch_command_without_block
476-
assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do
477-
@client.call('WATCH', '{key}1', '{key}2')
478+
assert_raises(ArgumentError) do
479+
@client.send(:watch, ['{key}1', '{key}2'])
478480
end
479481
end
480482

0 commit comments

Comments
 (0)