From 67c9975a59367eddda18043211361f0cff4517dd Mon Sep 17 00:00:00 2001 From: KowalczykBartek Date: Wed, 11 Dec 2024 18:06:04 +0100 Subject: [PATCH 01/10] RESP3 support for monitor command Signed-off-by: KowalczykBartek --- src/replication.c | 9 ++++++++- src/server.c | 1 + src/server.h | 2 +- tests/unit/introspection.tcl | 14 ++++++++++++++ 4 files changed, 24 insertions(+), 2 deletions(-) diff --git a/src/replication.c b/src/replication.c index b5ce77f5e0..bef0b493f8 100644 --- a/src/replication.c +++ b/src/replication.c @@ -704,7 +704,14 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, listRewind(monitors, &li); while ((ln = listNext(&li))) { client *monitor = ln->value; - addReply(monitor, cmdobj); + if(monitor->resp > 2) { + monitor->flag.pushing = 1; + addReplyPushLen(monitor,2); + addReply(monitor,shared.monitorbulk); + addReply(monitor,cmdobj); + } else { + addReply(monitor,cmdobj); + } updateClientMemUsageAndBucket(monitor); } decrRefCount(cmdobj); diff --git a/src/server.c b/src/server.c index 1e38b5ac69..16780a3ef1 100644 --- a/src/server.c +++ b/src/server.c @@ -2035,6 +2035,7 @@ void createSharedObjects(void) { shared.ssubscribebulk = createStringObject("$10\r\nssubscribe\r\n", 17); shared.sunsubscribebulk = createStringObject("$12\r\nsunsubscribe\r\n", 19); shared.smessagebulk = createStringObject("$8\r\nsmessage\r\n", 14); + shared.monitorbulk = createStringObject("$7\r\nmonitor\r\n", 13); shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n", 17); shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n", 19); diff --git a/src/server.h b/src/server.h index 14a16593b0..df7296f957 100644 --- a/src/server.h +++ b/src/server.h @@ -1437,7 +1437,7 @@ struct sharedObjectsStruct { *xgroup, *xclaim, *script, *replconf, *eval, *persist, *set, *pexpireat, *pexpire, *time, *pxat, *absttl, *retrycount, *force, *justid, *entriesread, *lastid, *ping, *setid, *keepttl, *load, *createconsumer, *getack, *special_asterick, *special_equals, *default_username, *redacted, *ssubscribebulk, *sunsubscribebulk, - *smessagebulk, *select[PROTO_SHARED_SELECT_CMDS], *integers[OBJ_SHARED_INTEGERS], + *smessagebulk, *monitorbulk, *select[PROTO_SHARED_SELECT_CMDS], *integers[OBJ_SHARED_INTEGERS], *mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*\r\n" */ *bulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "$\r\n" */ *maphdr[OBJ_SHARED_BULKHDR_LEN], /* "%\r\n" */ diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index bafc46d4b7..4b696febab 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -299,6 +299,20 @@ start_server {tags {"introspection"}} { set _ $res } {*"set" "foo"*"get" "foo"*} + test {MONITOR should support RESP3 protocol} { + set rd [valkey_deferring_client] + $rd HELLO 3 + $rd read ; # Consume the HELLO reply + + $rd monitor + $rd read ; # Consume the MONITOR reply + + r set foo bar + + assert_match {monitor*"set"*"foo"*"bar"*} [$rd read] + $rd close + } + test {MONITOR can log commands issued by the scripting engine} { set rd [valkey_deferring_client] $rd monitor From 1fabef40d0f021cd5499ff674e6ea8d49f503212 Mon Sep 17 00:00:00 2001 From: KowalczykBartek Date: Fri, 13 Dec 2024 14:25:02 +0100 Subject: [PATCH 02/10] Add test case for monitor and tracking Signed-off-by: KowalczykBartek --- src/server.c | 4 +-- tests/unit/introspection.tcl | 54 ++++++++++++++++++++++++++++++++++-- 2 files changed, 54 insertions(+), 4 deletions(-) diff --git a/src/server.c b/src/server.c index 16780a3ef1..682912b03c 100644 --- a/src/server.c +++ b/src/server.c @@ -4259,10 +4259,10 @@ int processCommand(client *c) { return C_OK; } - /* Prevent a replica from sending commands that access the keyspace. + /* Prevent a replica (but not a monitor client) from sending commands that access the keyspace. * The main objective here is to prevent abuse of client pause check * from which replicas are exempt. */ - if (c->flag.replica && (is_may_replicate_command || is_write_command || is_read_command)) { + if ((c->flag.replica && !c->flag.monitor) && (is_may_replicate_command || is_write_command || is_read_command)) { rejectCommandFormat(c, "Replica can't interact with the keyspace"); return C_OK; } diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index 4b696febab..919e100926 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -306,13 +306,63 @@ start_server {tags {"introspection"}} { $rd monitor $rd read ; # Consume the MONITOR reply + $rd readraw 1 ; r set foo bar - - assert_match {monitor*"set"*"foo"*"bar"*} [$rd read] + assert_equal ">2" [$rd read] + assert_equal "\$7" [$rd read] + assert_equal "monitor" [$rd read] + assert_match {*"set"*"foo"*"bar"*} [$rd read] + $rd close } + test {MONITOR and CLIENT TRACKING should work on the same connection with RESP3} { + set rd1 [valkey_deferring_client] + set rd2 [valkey_deferring_client] + + $rd1 HELLO 3 + $rd1 read ; # Consume the HELLO reply + + $rd1 client tracking on + $rd1 read ; # Consume the TRACKING reply + + $rd1 monitor + $rd1 read ; # Consume the MONITOR reply + + $rd1 set foo bar + assert_equal "OK" [$rd1 read] + assert_match {monitor*"set"*"foo"*"bar"*} [$rd1 read] + + $rd1 readraw 1 ; + + $rd1 get foo + assert_equal "\$3" [$rd1 read] + assert_equal "bar" [$rd1 read] + + assert_equal ">2" [$rd1 read] + assert_equal "\$7" [$rd1 read] + assert_equal "monitor" [$rd1 read] + assert_match {*"get"*"foo"*} [$rd1 read] + + $rd2 set foo baz + + assert_equal ">2" [$rd1 read] + assert_equal "\$10" [$rd1 read] + assert_equal "invalidate" [$rd1 read] + assert_equal "*1" [$rd1 read] + assert_equal "\$3" [$rd1 read] + assert_equal "foo" [$rd1 read] + + assert_equal ">2" [$rd1 read] + assert_equal "\$7" [$rd1 read] + assert_equal "monitor" [$rd1 read] + assert_match {*"set"*"foo"*"baz"*} [$rd1 read] + + $rd1 close + $rd2 close + } + test {MONITOR can log commands issued by the scripting engine} { set rd [valkey_deferring_client] $rd monitor From dfadb79db922c1874206249330f85014f7f5108d Mon Sep 17 00:00:00 2001 From: KowalczykBartek Date: Fri, 13 Dec 2024 18:19:09 +0100 Subject: [PATCH 03/10] fix formatting Signed-off-by: KowalczykBartek --- src/replication.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/replication.c b/src/replication.c index bef0b493f8..de401b5847 100644 --- a/src/replication.c +++ b/src/replication.c @@ -704,7 +704,7 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, listRewind(monitors, &li); while ((ln = listNext(&li))) { client *monitor = ln->value; - if(monitor->resp > 2) { + if (monitor->resp > 2) { monitor->flag.pushing = 1; addReplyPushLen(monitor,2); addReply(monitor,shared.monitorbulk); From 15b283458b5a3f85a85ab863fef49e6e117b5503 Mon Sep 17 00:00:00 2001 From: KowalczykBartek Date: Fri, 13 Dec 2024 19:43:01 +0100 Subject: [PATCH 04/10] fix formatting Signed-off-by: KowalczykBartek --- src/replication.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/replication.c b/src/replication.c index de401b5847..f75b8c5164 100644 --- a/src/replication.c +++ b/src/replication.c @@ -706,11 +706,11 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, client *monitor = ln->value; if (monitor->resp > 2) { monitor->flag.pushing = 1; - addReplyPushLen(monitor,2); - addReply(monitor,shared.monitorbulk); - addReply(monitor,cmdobj); + addReplyPushLen(monitor, 2); + addReply(monitor, shared.monitorbulk); + addReply(monitor, cmdobj); } else { - addReply(monitor,cmdobj); + addReply(monitor, cmdobj); } updateClientMemUsageAndBucket(monitor); } From 493d8389bb9c7f71ef46b5be43e2b95d5d8d5875 Mon Sep 17 00:00:00 2001 From: KowalczykBartek Date: Fri, 13 Dec 2024 21:01:23 +0100 Subject: [PATCH 05/10] Add comment to readraw command Signed-off-by: KowalczykBartek --- tests/unit/introspection.tcl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index 919e100926..35176a1932 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -334,6 +334,8 @@ start_server {tags {"introspection"}} { assert_equal "OK" [$rd1 read] assert_match {monitor*"set"*"foo"*"bar"*} [$rd1 read] + # Because we need to verify exact RESP3 response correctness, + # we need to instruct valkey client to return raw, unprased response. $rd1 readraw 1 ; $rd1 get foo From 9325141af21042d0fff65ad254b9aae92e325d97 Mon Sep 17 00:00:00 2001 From: KowalczykBartek Date: Fri, 13 Dec 2024 23:44:05 +0100 Subject: [PATCH 06/10] typos everywhere ... Signed-off-by: KowalczykBartek --- tests/unit/introspection.tcl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index 35176a1932..5a62dcdc3d 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -335,7 +335,7 @@ start_server {tags {"introspection"}} { assert_match {monitor*"set"*"foo"*"bar"*} [$rd1 read] # Because we need to verify exact RESP3 response correctness, - # we need to instruct valkey client to return raw, unprased response. + # we need to instruct valkey client to return raw, unparsed response. $rd1 readraw 1 ; $rd1 get foo From 182b66e7ea61a20e983e1ac6f0b53d1f3cb5e541 Mon Sep 17 00:00:00 2001 From: KowalczykBartek Date: Sat, 14 Dec 2024 22:08:03 +0100 Subject: [PATCH 07/10] Align to review comments Signed-off-by: KowalczykBartek --- src/server.c | 11 +++++--- tests/unit/introspection.tcl | 54 ++++++++---------------------------- 2 files changed, 18 insertions(+), 47 deletions(-) diff --git a/src/server.c b/src/server.c index 682912b03c..a3e8857d0c 100644 --- a/src/server.c +++ b/src/server.c @@ -4259,10 +4259,10 @@ int processCommand(client *c) { return C_OK; } - /* Prevent a replica (but not a monitor client) from sending commands that access the keyspace. + /* Prevent a replica from sending commands that access the keyspace. * The main objective here is to prevent abuse of client pause check * from which replicas are exempt. */ - if ((c->flag.replica && !c->flag.monitor) && (is_may_replicate_command || is_write_command || is_read_command)) { + if (c->flag.replica && (is_may_replicate_command || is_write_command || is_read_command)) { rejectCommandFormat(c, "Replica can't interact with the keyspace"); return C_OK; } @@ -6182,8 +6182,11 @@ void monitorCommand(client *c) { return; } - /* ignore MONITOR if already replica or in monitor mode */ - if (c->flag.replica) return; + /* Gently notify the client that the monitor command has already been issued. */ + if (c->flag.replica) { + addReplyError(c, "The connection is already in monitoring mode."); + return; + } c->flag.replica = 1; c->flag.monitor = 1; diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index 5a62dcdc3d..ed9743128a 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -317,52 +317,20 @@ start_server {tags {"introspection"}} { $rd close } - test {MONITOR and CLIENT TRACKING should work on the same connection with RESP3} { - set rd1 [valkey_deferring_client] - set rd2 [valkey_deferring_client] - - $rd1 HELLO 3 - $rd1 read ; # Consume the HELLO reply - - $rd1 client tracking on - $rd1 read ; # Consume the TRACKING reply - - $rd1 monitor - $rd1 read ; # Consume the MONITOR reply - - $rd1 set foo bar - assert_equal "OK" [$rd1 read] - assert_match {monitor*"set"*"foo"*"bar"*} [$rd1 read] - - # Because we need to verify exact RESP3 response correctness, - # we need to instruct valkey client to return raw, unparsed response. - $rd1 readraw 1 ; - - $rd1 get foo - assert_equal "\$3" [$rd1 read] - assert_equal "bar" [$rd1 read] - - assert_equal ">2" [$rd1 read] - assert_equal "\$7" [$rd1 read] - assert_equal "monitor" [$rd1 read] - assert_match {*"get"*"foo"*} [$rd1 read] - - $rd2 set foo baz + test {multiple MONITOR commands should result in ERR} { + set rd [valkey_deferring_client] + $rd HELLO 3 + $rd read ; # Consume the HELLO reply - assert_equal ">2" [$rd1 read] - assert_equal "\$10" [$rd1 read] - assert_equal "invalidate" [$rd1 read] - assert_equal "*1" [$rd1 read] - assert_equal "\$3" [$rd1 read] - assert_equal "foo" [$rd1 read] + $rd readraw 1 ; - assert_equal ">2" [$rd1 read] - assert_equal "\$7" [$rd1 read] - assert_equal "monitor" [$rd1 read] - assert_match {*"set"*"foo"*"baz"*} [$rd1 read] + $rd monitor + assert_equal "+OK" [$rd read] - $rd1 close - $rd2 close + $rd monitor + assert_equal "-ERR The connection is already in monitoring mode." [$rd read] + + $rd close } test {MONITOR can log commands issued by the scripting engine} { From 9c68dd62fe54ce03e29c6dd040168c2db88aaa6c Mon Sep 17 00:00:00 2001 From: KowalczykBartek Date: Mon, 16 Dec 2024 12:14:31 +0100 Subject: [PATCH 08/10] fix review comments Signed-off-by: KowalczykBartek --- tests/unit/introspection.tcl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index ed9743128a..47d9ee9e15 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -306,7 +306,7 @@ start_server {tags {"introspection"}} { $rd monitor $rd read ; # Consume the MONITOR reply - $rd readraw 1 ; + $rd readraw 1; r set foo bar assert_equal ">2" [$rd read] @@ -322,7 +322,7 @@ start_server {tags {"introspection"}} { $rd HELLO 3 $rd read ; # Consume the HELLO reply - $rd readraw 1 ; + $rd readraw 1; $rd monitor assert_equal "+OK" [$rd read] From 0c9e0dcf8dd7b6cf97dc0f96309bb460e604fec6 Mon Sep 17 00:00:00 2001 From: KowalczykBartek Date: Mon, 16 Dec 2024 17:37:13 +0100 Subject: [PATCH 09/10] unable to make test passing Signed-off-by: KowalczykBartek --- src/replication.c | 2 ++ tests/unit/introspection.tcl | 20 ++++++++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/src/replication.c b/src/replication.c index f75b8c5164..bcad27110c 100644 --- a/src/replication.c +++ b/src/replication.c @@ -705,10 +705,12 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, while ((ln = listNext(&li))) { client *monitor = ln->value; if (monitor->resp > 2) { + struct ClientFlags old_flags = monitor->flag; monitor->flag.pushing = 1; addReplyPushLen(monitor, 2); addReply(monitor, shared.monitorbulk); addReply(monitor, cmdobj); + if (!old_flags.pushing) monitor->flag.pushing = 0; } else { addReply(monitor, cmdobj); } diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index 47d9ee9e15..ddfd85cac2 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -333,6 +333,26 @@ start_server {tags {"introspection"}} { $rd close } + test {MONITOR should came after PONG reply} { + set rd [valkey_deferring_client] + $rd HELLO 3 + $rd read ; # Consume the HELLO reply + + $rd monitor + $rd read ; # Consume the MONITOR reply + $rd readraw 1; + + r ping + + assert_equal "+pong" [$rd read] + assert_equal ">2" [$rd read] + assert_equal "\$7" [$rd read] + assert_equal "monitor" [$rd read] + assert_match {*"ping"*} [$rd read] + + $rd close + } + test {MONITOR can log commands issued by the scripting engine} { set rd [valkey_deferring_client] $rd monitor From eb27600718ee9b477cfd1081735d353c9496b2cb Mon Sep 17 00:00:00 2001 From: KowalczykBartek Date: Tue, 17 Dec 2024 00:50:22 +0100 Subject: [PATCH 10/10] fix the MONITOR should came after PONG reply test case Signed-off-by: KowalczykBartek --- tests/unit/introspection.tcl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index ddfd85cac2..483861517c 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -342,9 +342,9 @@ start_server {tags {"introspection"}} { $rd read ; # Consume the MONITOR reply $rd readraw 1; - r ping + $rd ping - assert_equal "+pong" [$rd read] + assert_equal "+PONG" [$rd read] assert_equal ">2" [$rd read] assert_equal "\$7" [$rd read] assert_equal "monitor" [$rd read]