From 71f8c34eede9a8e0fdab09b53ff4202d8bbaa434 Mon Sep 17 00:00:00 2001 From: zhenwei pi Date: Mon, 21 Oct 2024 16:11:27 +0800 Subject: [PATCH 01/15] RDMA: Fix listener priv opaque pointer (#1194) struct connListener.priv should be used by connection type specific data, static local listener data should not use this. A RDMA config structure is going to be introduced in the next step: ``` typedef struct serverRdmaContextConfig { char *bindaddr; int bindaddr_count; int port; int rx_size; int comp_vector; ... } serverRdmaContextConfig; ``` Then a builtin RDMA will be supported. Signed-off-by: zhenwei pi --- src/rdma.c | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/rdma.c b/src/rdma.c index dd6de395d0..15e23758b7 100644 --- a/src/rdma.c +++ b/src/rdma.c @@ -127,6 +127,8 @@ typedef struct rdma_listener { * handler into pending list */ static list *pending_list; +static rdma_listener *rdma_listeners; + static ConnectionType CT_RDMA; static int valkey_rdma_rx_size = VALKEY_RDMA_DEFAULT_RX_SIZE; @@ -748,7 +750,7 @@ static rdma_listener *rdmaFdToListener(connListener *listener, int fd) { for (int i = 0; i < listener->count; i++) { if (listener->fd[i] != fd) continue; - return (rdma_listener *)listener->priv + i; + return &rdma_listeners[i]; } return NULL; @@ -1537,7 +1539,7 @@ int connRdmaListen(connListener *listener) { bindaddr = default_bindaddr; } - listener->priv = rdma_listener = zcalloc_num(bindaddr_count, sizeof(*rdma_listener)); + rdma_listeners = rdma_listener = zcalloc_num(bindaddr_count, sizeof(*rdma_listener)); for (j = 0; j < bindaddr_count; j++) { char *addr = bindaddr[j]; int optional = *addr == '-'; @@ -1757,13 +1759,14 @@ static int rdmaChangeListener(void) { aeDeleteFileEvent(server.el, listener->fd[i], AE_READABLE); listener->fd[i] = -1; - struct rdma_listener *rdma_listener = (struct rdma_listener *)listener->priv + i; + struct rdma_listener *rdma_listener = &rdma_listeners[i]; rdma_destroy_id(rdma_listener->cm_id); 
rdma_destroy_event_channel(rdma_listener->cm_channel); } listener->count = 0; - zfree(listener->priv); + zfree(rdma_listeners); + rdma_listeners = NULL; closeListener(listener); From 29b83f1ac8dd80a9c3214c1e1f0ff3b7730fb612 Mon Sep 17 00:00:00 2001 From: ranshid <88133677+ranshid@users.noreply.github.com> Date: Mon, 21 Oct 2024 12:56:44 +0300 Subject: [PATCH 02/15] Introduce bgsave cancel (#757) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In some cases the bgsave child process can run for a long time, exhausting system resources. Although it is possible to kill the bgsave child process from the system shell, it is not always possible to allow OS-level access. This PR adds a new subcommand to the BGSAVE command. When a user issues `BGSAVE CANCEL`, one of two things happens: 1. If a bgsave child process is currently running, the child process is immediately killed, terminating any save/replication full sync process. 2. If a bgsave is SCHEDULED to run, the scheduled execution is cancelled. 
--------- Signed-off-by: ranshid Signed-off-by: ranshid <88133677+ranshid@users.noreply.github.com> Signed-off-by: Ran Shidlansik Signed-off-by: Binbin Co-authored-by: Binbin Co-authored-by: Viktor Söderqvist --- src/commands.def | 11 ++++++-- src/commands/bgsave.json | 30 +++++++++++++++++--- src/rdb.c | 20 ++++++++++++++ tests/integration/rdb.tcl | 58 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 113 insertions(+), 6 deletions(-) diff --git a/src/commands.def b/src/commands.def index cd9f8e2984..98a9b40f01 100644 --- a/src/commands.def +++ b/src/commands.def @@ -6408,6 +6408,7 @@ struct COMMAND_STRUCT ACL_Subcommands[] = { /* BGSAVE history */ commandHistory BGSAVE_History[] = { {"3.2.2","Added the `SCHEDULE` option."}, +{"8.0.0","Added the `CANCEL` option."}, }; #endif @@ -6421,9 +6422,15 @@ commandHistory BGSAVE_History[] = { #define BGSAVE_Keyspecs NULL #endif +/* BGSAVE operation argument table */ +struct COMMAND_ARG BGSAVE_operation_Subargs[] = { +{MAKE_ARG("schedule",ARG_TYPE_PURE_TOKEN,-1,"SCHEDULE",NULL,"3.2.2",CMD_ARG_NONE,0,NULL)}, +{MAKE_ARG("cancel",ARG_TYPE_PURE_TOKEN,-1,"CANCEL",NULL,"8.0.0",CMD_ARG_NONE,0,NULL)}, +}; + /* BGSAVE argument table */ struct COMMAND_ARG BGSAVE_Args[] = { -{MAKE_ARG("schedule",ARG_TYPE_PURE_TOKEN,-1,"SCHEDULE",NULL,"3.2.2",CMD_ARG_OPTIONAL,0,NULL)}, +{MAKE_ARG("operation",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,2,NULL),.subargs=BGSAVE_operation_Subargs}, }; /********** COMMAND COUNT ********************/ @@ -10989,7 +10996,7 @@ struct COMMAND_STRUCT serverCommandTable[] = { /* server */ {MAKE_CMD("acl","A container for Access List Control commands.","Depends on subcommand.","6.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,ACL_History,0,ACL_Tips,0,NULL,-2,CMD_SENTINEL,0,ACL_Keyspecs,0,NULL,0),.subcommands=ACL_Subcommands}, {MAKE_CMD("bgrewriteaof","Asynchronously rewrites the append-only file to 
disk.","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,BGREWRITEAOF_History,0,BGREWRITEAOF_Tips,0,bgrewriteaofCommand,1,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT,0,BGREWRITEAOF_Keyspecs,0,NULL,0)}, -{MAKE_CMD("bgsave","Asynchronously saves the database(s) to disk.","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,BGSAVE_History,1,BGSAVE_Tips,0,bgsaveCommand,-1,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT,0,BGSAVE_Keyspecs,0,NULL,1),.args=BGSAVE_Args}, +{MAKE_CMD("bgsave","Asynchronously saves the database(s) to disk.","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,BGSAVE_History,2,BGSAVE_Tips,0,bgsaveCommand,-1,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT,0,BGSAVE_Keyspecs,0,NULL,1),.args=BGSAVE_Args}, {MAKE_CMD("command","Returns detailed information about all commands.","O(N) where N is the total number of commands","2.8.13",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,COMMAND_History,0,COMMAND_Tips,1,commandCommand,-1,CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,COMMAND_Keyspecs,0,NULL,0),.subcommands=COMMAND_Subcommands}, {MAKE_CMD("config","A container for server configuration commands.","Depends on subcommand.","2.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,CONFIG_History,0,CONFIG_Tips,0,NULL,-2,0,0,CONFIG_Keyspecs,0,NULL,0),.subcommands=CONFIG_Subcommands}, {MAKE_CMD("dbsize","Returns the number of keys in the database.","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,DBSIZE_History,0,DBSIZE_Tips,2,dbsizeCommand,1,CMD_READONLY|CMD_FAST,ACL_CATEGORY_KEYSPACE,DBSIZE_Keyspecs,0,NULL,0)}, diff --git a/src/commands/bgsave.json b/src/commands/bgsave.json index f73d8a89b5..cf1c920474 100644 --- a/src/commands/bgsave.json +++ b/src/commands/bgsave.json @@ -10,6 +10,10 @@ [ "3.2.2", "Added the `SCHEDULE` option." + ], + [ + "8.0.0", + "Added the `CANCEL` option." 
] ], "command_flags": [ @@ -19,11 +23,23 @@ ], "arguments": [ { - "name": "schedule", - "token": "SCHEDULE", - "type": "pure-token", + "name": "operation", + "type": "oneof", "optional": true, - "since": "3.2.2" + "arguments": [ + { + "name": "schedule", + "token": "SCHEDULE", + "type": "pure-token", + "since": "3.2.2" + }, + { + "name": "cancel", + "token": "CANCEL", + "type": "pure-token", + "since": "8.0.0" + } + ] } ], "reply_schema": { @@ -33,6 +49,12 @@ }, { "const": "Background saving scheduled" + }, + { + "const": "Background saving cancelled" + }, + { + "const": "Scheduled background saving cancelled" } ] } diff --git a/src/rdb.c b/src/rdb.c index bc2d03e86c..e5ec4d8f3e 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3689,6 +3689,21 @@ void bgsaveCommand(client *c) { if (c->argc > 1) { if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr, "schedule")) { schedule = 1; + } else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr, "cancel")) { + /* Terminates an in progress BGSAVE */ + if (server.child_type == CHILD_TYPE_RDB) { + /* There is an ongoing bgsave */ + serverLog(LL_NOTICE, "Background saving will be aborted due to user request"); + killRDBChild(); + addReplyStatus(c, "Background saving cancelled"); + } else if (server.rdb_bgsave_scheduled == 1) { + serverLog(LL_NOTICE, "Scheduled background saving will be cancelled due to user request"); + server.rdb_bgsave_scheduled = 0; + addReplyStatus(c, "Scheduled background saving cancelled"); + } else { + addReplyError(c, "Background saving is currently not in progress or scheduled"); + } + return; } else { addReplyErrorObject(c, shared.syntaxerr); return; @@ -3703,6 +3718,11 @@ void bgsaveCommand(client *c) { } else if (hasActiveChildProcess() || server.in_exec) { if (schedule || server.in_exec) { server.rdb_bgsave_scheduled = 1; + if (schedule) { + serverLog(LL_NOTICE, "Background saving scheduled due to user request"); + } else { + serverLog(LL_NOTICE, "Background saving scheduled to run after transaction execution"); 
+ } addReplyStatus(c, "Background saving scheduled"); } else { addReplyError(c, "Another child process is active (AOF?): can't BGSAVE right now. " diff --git a/tests/integration/rdb.tcl b/tests/integration/rdb.tcl index e3f92bf521..61cb0cea7e 100644 --- a/tests/integration/rdb.tcl +++ b/tests/integration/rdb.tcl @@ -170,6 +170,64 @@ start_server {} { } assert_equal [s rdb_changes_since_last_save] 0 } + + test {bgsave cancel aborts save} { + r config set save "" + # Generating RDB will take some 100 seconds + r config set rdb-key-save-delay 1000000 + populate 100 "" 16 + + r bgsave + wait_for_condition 50 100 { + [s rdb_bgsave_in_progress] == 1 + } else { + fail "bgsave did not start in time" + } + set fork_child_pid [get_child_pid 0] + + assert {[r bgsave cancel] eq {Background saving cancelled}} + set temp_rdb [file join [lindex [r config get dir] 1] temp-${fork_child_pid}.rdb] + # Temp rdb must be deleted + wait_for_condition 50 100 { + ![file exists $temp_rdb] + } else { + fail "bgsave temp file was not deleted after cancel" + } + + # Make sure no save is running and that bgsave return an error + wait_for_condition 50 100 { + [s rdb_bgsave_in_progress] == 0 + } else { + fail "bgsave is currently running" + } + assert_error "ERR Background saving is currently not in progress or scheduled" {r bgsave cancel} + } + + test {bgsave cancel schedulled request} { + r config set save "" + # Generating RDB will take some 100 seconds + r config set rdb-key-save-delay 1000000 + populate 100 "" 16 + + # start a long AOF child + r bgrewriteaof + wait_for_condition 50 100 { + [s aof_rewrite_in_progress] == 1 + } else { + fail "aof not started" + } + + # Make sure cancel return valid status + assert {[r bgsave schedule] eq {Background saving scheduled}} + + # Cancel the scheduled save + assert {[r bgsave cancel] eq {Scheduled background saving cancelled}} + + # Make sure a second call to bgsave cancel return an error + assert_error "ERR Background saving is currently not in 
progress or scheduled" {r bgsave cancel} + } + + } test {client freed during loading} { From 5885dc56bdb40b3e0ea9b3d20a8bb08c7f2c3157 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Mon, 21 Oct 2024 16:04:47 +0200 Subject: [PATCH 03/15] Fix BGSAVE CANCEL since and history fields (#1200) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes wrong "since" and "history" introduced in #757. --------- Signed-off-by: Viktor Söderqvist --- src/commands/bgsave.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/commands/bgsave.json b/src/commands/bgsave.json index cf1c920474..6b4688ba57 100644 --- a/src/commands/bgsave.json +++ b/src/commands/bgsave.json @@ -12,7 +12,7 @@ "Added the `SCHEDULE` option." ], [ - "8.0.0", + "8.1.0", "Added the `CANCEL` option." ] ], @@ -37,7 +37,7 @@ "name": "cancel", "token": "CANCEL", "type": "pure-token", - "since": "8.0.0" + "since": "8.1.0" } ] } From 771918e4bf52406519dd66e7421de00bc6169d63 Mon Sep 17 00:00:00 2001 From: Shivshankar Date: Mon, 21 Oct 2024 16:48:29 -0400 Subject: [PATCH 04/15] Updating command.def by running the generate-command-code.py (#1203) Part of the https://github.com/valkey-io/valkey/pull/1200 PR, since the "since" field was changed. It looks like commands.def was not regenerated after those changes, which is causing the CI failure on unstable. 
Signed-off-by: Shivshankar-Reddy --- src/commands.def | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/commands.def b/src/commands.def index 98a9b40f01..791b30d540 100644 --- a/src/commands.def +++ b/src/commands.def @@ -6408,7 +6408,7 @@ struct COMMAND_STRUCT ACL_Subcommands[] = { /* BGSAVE history */ commandHistory BGSAVE_History[] = { {"3.2.2","Added the `SCHEDULE` option."}, -{"8.0.0","Added the `CANCEL` option."}, +{"8.1.0","Added the `CANCEL` option."}, }; #endif @@ -6425,7 +6425,7 @@ commandHistory BGSAVE_History[] = { /* BGSAVE operation argument table */ struct COMMAND_ARG BGSAVE_operation_Subargs[] = { {MAKE_ARG("schedule",ARG_TYPE_PURE_TOKEN,-1,"SCHEDULE",NULL,"3.2.2",CMD_ARG_NONE,0,NULL)}, -{MAKE_ARG("cancel",ARG_TYPE_PURE_TOKEN,-1,"CANCEL",NULL,"8.0.0",CMD_ARG_NONE,0,NULL)}, +{MAKE_ARG("cancel",ARG_TYPE_PURE_TOKEN,-1,"CANCEL",NULL,"8.1.0",CMD_ARG_NONE,0,NULL)}, }; /* BGSAVE argument table */ From 285064b114c80cce5bd4a48bf8d6493c9d0e0971 Mon Sep 17 00:00:00 2001 From: Shivshankar Date: Mon, 21 Oct 2024 22:54:40 -0400 Subject: [PATCH 05/15] fix typo (#1202) Signed-off-by: Shivshankar-Reddy --- src/io_threads.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/io_threads.c b/src/io_threads.c index b0368cf07b..f4471b96d0 100644 --- a/src/io_threads.c +++ b/src/io_threads.c @@ -319,7 +319,7 @@ void initIOThreads(void) { int trySendReadToIOThreads(client *c) { if (server.active_io_threads_num <= 1) return C_ERR; - /* If IO thread is areadty reading, return C_OK to make sure the main thread will not handle it. */ + /* If IO thread is already reading, return C_OK to make sure the main thread will not handle it. */ if (c->io_read_state != CLIENT_IDLE) return C_OK; /* Currently, replica/master writes are not offloaded and are processed synchronously. 
*/ if (c->flag.primary || getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR; From 5d70ccd70eb42db718e0176987891f91d21d29c8 Mon Sep 17 00:00:00 2001 From: Binbin Date: Wed, 23 Oct 2024 10:22:25 +0800 Subject: [PATCH 06/15] Make replica CLUSTER RESET flush async based on lazyfree-lazy-user-flush (#1190) Currently, if the replica has a lot of data, CLUSTER RESET will block for a while and report the slowlog, and it seems that there is no harm in making it async so external components can be easier when monitoring it. Signed-off-by: Binbin Co-authored-by: Ping Xie --- src/cluster_legacy.c | 2 +- valkey.conf | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 14f8a6bd1e..4e21d1473d 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -1230,7 +1230,7 @@ void clusterReset(int hard) { if (nodeIsReplica(myself)) { clusterSetNodeAsPrimary(myself); replicationUnsetPrimary(); - emptyData(-1, EMPTYDB_NO_FLAGS, NULL); + emptyData(-1, server.lazyfree_lazy_user_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS, NULL); } /* Close slots, reset manual failover state. */ diff --git a/valkey.conf b/valkey.conf index f485b42b1a..5960feb1b6 100644 --- a/valkey.conf +++ b/valkey.conf @@ -1308,7 +1308,11 @@ lazyfree-lazy-user-del yes # deletion, which can be controlled by passing the [SYNC|ASYNC] flags into the # commands. When neither flag is passed, this directive will be used to determine # if the data should be deleted asynchronously. - +# +# When a replica performs a node reset via CLUSTER RESET, the entire +# database content is removed to allow the node to become an empty primary. +# This directive also determines whether the data should be deleted asynchronously. +# # There are many problems with running flush synchronously. Even in single CPU # environments, the thread managers should balance between the freeing and # serving incoming requests. The default value is yes. 
From b803f7aeff4ffe43c866a52d9ea830add33b5834 Mon Sep 17 00:00:00 2001 From: Binbin Date: Wed, 23 Oct 2024 17:11:42 +0800 Subject: [PATCH 07/15] Cleaned up getSlotOrReply is return -1 instead of C_ERR (#1211) Minor cleanup since getSlotOrReply return -1 on error, not return C_ERR. Signed-off-by: Binbin --- src/cluster_legacy.c | 8 +++++--- src/cluster_slot_stats.c | 4 ++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 4e21d1473d..e56f1c2823 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -5805,6 +5805,8 @@ const char *clusterGetMessageTypeString(int type) { return "unknown"; } +/* Get the slot from robj and return it. If the slot is not valid, + * return -1 and send an error to the client. */ int getSlotOrReply(client *c, robj *o) { long long slot; @@ -6530,7 +6532,7 @@ int clusterCommandSpecial(client *c) { memset(slots, 0, CLUSTER_SLOTS); /* Check that all the arguments are parseable.*/ for (j = 2; j < c->argc; j++) { - if ((slot = getSlotOrReply(c, c->argv[j])) == C_ERR) { + if ((slot = getSlotOrReply(c, c->argv[j])) == -1) { zfree(slots); return 1; } @@ -6563,11 +6565,11 @@ int clusterCommandSpecial(client *c) { /* Check that all the arguments are parseable and that all the * slots are not already busy. 
*/ for (j = 2; j < c->argc; j += 2) { - if ((startslot = getSlotOrReply(c, c->argv[j])) == C_ERR) { + if ((startslot = getSlotOrReply(c, c->argv[j])) == -1) { zfree(slots); return 1; } - if ((endslot = getSlotOrReply(c, c->argv[j + 1])) == C_ERR) { + if ((endslot = getSlotOrReply(c, c->argv[j + 1])) == -1) { zfree(slots); return 1; } diff --git a/src/cluster_slot_stats.c b/src/cluster_slot_stats.c index 284208af54..b52692bd15 100644 --- a/src/cluster_slot_stats.c +++ b/src/cluster_slot_stats.c @@ -279,8 +279,8 @@ void clusterSlotStatsCommand(client *c) { if (c->argc == 5 && !strcasecmp(c->argv[2]->ptr, "slotsrange")) { /* CLUSTER SLOT-STATS SLOTSRANGE start-slot end-slot */ int startslot, endslot; - if ((startslot = getSlotOrReply(c, c->argv[3])) == C_ERR || - (endslot = getSlotOrReply(c, c->argv[4])) == C_ERR) { + if ((startslot = getSlotOrReply(c, c->argv[3])) == -1 || + (endslot = getSlotOrReply(c, c->argv[4])) == -1) { return; } if (startslot > endslot) { From c176de4251cb82eb2b005b40fe284b93bdaa7353 Mon Sep 17 00:00:00 2001 From: danish-mehmood <35922417+danish-mehmood@users.noreply.github.com> Date: Thu, 24 Oct 2024 02:30:42 +0500 Subject: [PATCH 08/15] Clarify the wording from dually to the more common doubly (#1214) Clarify documentation in ziplist.c Signed-off-by: danish-mehmood --- src/ziplist.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ziplist.c b/src/ziplist.c index d4f8b71699..608487fa2b 100644 --- a/src/ziplist.c +++ b/src/ziplist.c @@ -1,4 +1,4 @@ -/* The ziplist is a specially encoded dually linked list that is designed +/* The ziplist is a specially encoded doubly linked list that is designed * to be very memory efficient. It stores both strings and integer values, * where integers are encoded as actual integers instead of a series of * characters. 
It allows push and pop operations on either side of the list From c419524c05d7544636d5bc0cc0cb333052b0a517 Mon Sep 17 00:00:00 2001 From: muelstefamzn Date: Wed, 23 Oct 2024 16:56:32 -0700 Subject: [PATCH 09/15] Trim free space from inline command argument strings to avoid excess memory usage (#1213) The command argument strings created while parsing inline commands (see `processInlineBuffer()`) can contain free capacity. Since some commands, such as `SET`, store these strings in the database, that free capacity increases the memory usage. In the worst case, it could double the memory usage. This only occurs if the inline command format is used. The argument strings are built by appending character by character in `sdssplitargs()`. Regular RESP commands are not affected. This change trims the strings within `processInlineBuffer()`. ### Why `trimStringObjectIfNeeded()` within `object.c` is not solving this? When the command argument string is packed into an object, `trimStringObjectIfNeeded()` is called. This only trims the string if it is larger than `PROTO_MBULK_BIG_ARG` (32kB), as only strings larger than this would ever need trimming if the command is sent using the bulk string format. We could modify this condition, but that would potentially have a performance impact on commands using the bulk format. Since those make up the vast majority of executed commands, limiting this change to inline commands seems prudent. 
### Experiment Results * 1 million `SET [key] [value]` commands * Random keys (16 bytes) * 600 bytes values Memory usage without this change: ``` used_memory:1089327888 used_memory_human:1.01G used_memory_rss:1131696128 used_memory_rss_human:1.05G used_memory_peak:1089348264 used_memory_peak_human:1.01G used_memory_peak_perc:100.00% used_memory_overhead:49302800 used_memory_startup:911808 used_memory_dataset:1040025088 used_memory_dataset_perc:95.55% ``` Memory usage with this change: ``` used_memory:705327888 used_memory_human:672.65M used_memory_rss:718802944 used_memory_rss_human:685.50M used_memory_peak:705348256 used_memory_peak_human:672.67M used_memory_peak_perc:100.00% used_memory_overhead:49302800 used_memory_startup:911808 used_memory_dataset:656025088 used_memory_dataset_perc:93.13% ``` If the same experiment is repeated using the normal RESP array of bulk string format (`*3\r\n$3\r\nSET\r\n...`) then the memory usage is 672MB with and without of this change. If a replica is attached, its memory usage is 672MB with and without this change, since the replication link never uses inline commands. Signed-off-by: Stefan Mueller --- src/networking.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/networking.c b/src/networking.c index c24a95019b..6751f5c7b8 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2669,6 +2669,8 @@ void processInlineBuffer(client *c) { /* Create an Object for all arguments. */ for (c->argc = 0, j = 0; j < argc; j++) { + /* Strings returned from sdssplitargs() may have unused capacity that we can trim. 
*/ + argv[j] = sdsRemoveFreeSpace(argv[j], 1); c->argv[c->argc] = createObject(OBJ_STRING, argv[j]); c->argc++; c->argv_len_sum += sdslen(argv[j]); From a21fe718f46e5987dd9cf1ca698dce6d0d060795 Mon Sep 17 00:00:00 2001 From: Binbin Date: Thu, 24 Oct 2024 16:38:47 +0800 Subject: [PATCH 10/15] Limit CLUSTER_CANT_FAILOVER_DATA_AGE log to 10 times period (#1189) If a replica is step into data_age too old stage, it can not trigger the failover and currently it can not be automatically recovered and we will print a log every CLUSTER_CANT_FAILOVER_RELOG_PERIOD, which is every second. If the primary has not recovered or there is no manual failover, this log will flood the log file. In this case, limit its frequency to 10 times period, which is 10 seconds in our code. Also in this data_age too old stage, the repeated logs also can stand for the progress of the failover. See also #780 for more details about it. Signed-off-by: Binbin Co-authored-by: Ping Xie --- src/cluster_legacy.c | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index e56f1c2823..43d56b9a09 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -4433,11 +4433,18 @@ int clusterGetReplicaRank(void) { void clusterLogCantFailover(int reason) { char *msg; static time_t lastlog_time = 0; + time_t now = time(NULL); - /* Don't log if we have the same reason for some time. */ - if (reason == server.cluster->cant_failover_reason && - time(NULL) - lastlog_time < CLUSTER_CANT_FAILOVER_RELOG_PERIOD) + /* General logging suppression if the same reason has occurred recently. */ + if (reason == server.cluster->cant_failover_reason && now - lastlog_time < CLUSTER_CANT_FAILOVER_RELOG_PERIOD) { return; + } + + /* Special case: If the failure reason is due to data age, log 10 times less frequently. 
*/ + if (reason == server.cluster->cant_failover_reason && reason == CLUSTER_CANT_FAILOVER_DATA_AGE && + now - lastlog_time < 10 * CLUSTER_CANT_FAILOVER_RELOG_PERIOD) { + return; + } server.cluster->cant_failover_reason = reason; From 2956367731305f73005b44b26d6f665564731de3 Mon Sep 17 00:00:00 2001 From: Binbin Date: Thu, 24 Oct 2024 21:53:05 +0800 Subject: [PATCH 11/15] Maintain return value of rdbSaveDb after writing slot-info aux (#1222) All other writes in this function maintain the return value. Although the caller of rdbSaveDb does not rely on it, it is maintained here as well to be consistent with the other places, as is the function's duty. Signed-off-by: Binbin --- src/rdb.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/rdb.c b/src/rdb.c index e5ec4d8f3e..1c200e54f5 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1355,6 +1355,7 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) { sdsfree(slot_info); goto werr; } + written += res; last_slot = curr_slot; sdsfree(slot_info); } From 55bbbe09a3b37a98c907dbfbc225768ff253e84f Mon Sep 17 00:00:00 2001 From: zixuan zhao Date: Thu, 24 Oct 2024 18:36:32 -0400 Subject: [PATCH 12/15] Configurable log and timestamp formats (logfmt, ISO8601) (#1022) Add ability to configure log output format and timestamp format in the logs. This change adds two new configs: * `log-format`: Either legacy or logfmt (See https://brandur.org/logfmt) * `log-timestamp-format`: legacy, iso8601 or milliseconds (since the epoch). Related to #1006. Example: ``` $ ./valkey-server /home/zhaoz12/git/valkey/valkey/valkey.conf pid=109463 role=RDB/AOF timestamp="2024-09-10T20:37:25.738-04:00" level=warning message="WARNING Memory overcommit must be enabled! Without it, a background save or replication may fail under low memory condition. Being disabled, it can also cause failures without low memory condition, see https://github.com/jemalloc/jemalloc/issues/1328. 
To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect." pid=109463 role=RDB/AOF timestamp="2024-09-10T20:37:25.738-04:00" level=notice message="oO0OoO0OoO0Oo Valkey is starting oO0OoO0OoO0Oo" pid=109463 role=RDB/AOF timestamp="2024-09-10T20:37:25.738-04:00" level=notice message="Valkey version=255.255.255, bits=64, commit=affbea5d, modified=1, pid=109463, just started" pid=109463 role=RDB/AOF timestamp="2024-09-10T20:37:25.738-04:00" level=notice message="Configuration loaded" pid=109463 role=master timestamp="2024-09-10T20:37:25.738-04:00" level=notice message="monotonic clock: POSIX clock_gettime" pid=109463 role=master timestamp="2024-09-10T20:37:25.739-04:00" level=warning message="Failed to write PID file: Permission denied" ``` --------- Signed-off-by: azuredream --- src/config.c | 9 +++++ src/server.c | 103 +++++++++++++++++++++++++++++++++++++++++++++++---- src/server.h | 33 +++++++++++------ 3 files changed, 127 insertions(+), 18 deletions(-) diff --git a/src/config.c b/src/config.c index 7bee87946d..b3d760dd5c 100644 --- a/src/config.c +++ b/src/config.c @@ -152,6 +152,13 @@ configEnum propagation_error_behavior_enum[] = { {"panic-on-replicas", PROPAGATION_ERR_BEHAVIOR_PANIC_ON_REPLICAS}, {NULL, 0}}; +configEnum log_format_enum[] = {{"legacy", LOG_FORMAT_LEGACY}, {"logfmt", LOG_FORMAT_LOGFMT}, {NULL, 0}}; + +configEnum log_timestamp_format_enum[] = {{"legacy", LOG_TIMESTAMP_LEGACY}, + {"iso8601", LOG_TIMESTAMP_ISO8601}, + {"milliseconds", LOG_TIMESTAMP_MILLISECONDS}, + {NULL, 0}}; + /* Output buffer limits presets. 
*/ clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = { {0, 0, 0}, /* normal */ @@ -3190,6 +3197,8 @@ standardConfig static_configs[] = { createEnumConfig("propagation-error-behavior", NULL, MODIFIABLE_CONFIG, propagation_error_behavior_enum, server.propagation_error_behavior, PROPAGATION_ERR_BEHAVIOR_IGNORE, NULL, NULL), createEnumConfig("shutdown-on-sigint", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, shutdown_on_sig_enum, server.shutdown_on_sigint, 0, isValidShutdownOnSigFlags, NULL), createEnumConfig("shutdown-on-sigterm", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, shutdown_on_sig_enum, server.shutdown_on_sigterm, 0, isValidShutdownOnSigFlags, NULL), + createEnumConfig("log-format", NULL, MODIFIABLE_CONFIG, log_format_enum, server.log_format, LOG_FORMAT_LEGACY, NULL, NULL), + createEnumConfig("log-timestamp-format", NULL, MODIFIABLE_CONFIG, log_timestamp_format_enum, server.log_timestamp_format, LOG_TIMESTAMP_LEGACY, NULL, NULL), /* Integer configs */ createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.dbnum, 16, INTEGER_CONFIG, NULL, NULL), diff --git a/src/server.c b/src/server.c index 531fb07b76..f5ce4d21e7 100644 --- a/src/server.c +++ b/src/server.c @@ -109,11 +109,69 @@ const char *replstateToString(int replstate); * function of the server may be called from other threads. */ void nolocks_localtime(struct tm *tmp, time_t t, time_t tz, int dst); +/* Formats the timezone offset into a string. daylight_active indicates whether dst is active (1) + * or not (0). */ +void formatTimezone(char *buf, size_t buflen, int timezone, int daylight_active) { + serverAssert(buflen >= 7); + serverAssert(timezone >= -50400 && timezone <= 43200); + // Adjust the timezone for daylight saving, if active + int total_offset = (-1) * timezone + 3600 * daylight_active; + int hours = abs(total_offset / 3600); + int minutes = abs(total_offset % 3600) / 60; + buf[0] = total_offset >= 0 ? 
'+' : '-'; + buf[1] = '0' + hours / 10; + buf[2] = '0' + hours % 10; + buf[3] = ':'; + buf[4] = '0' + minutes / 10; + buf[5] = '0' + minutes % 10; + buf[6] = '\0'; +} + +bool hasInvalidLogfmtChar(const char *msg) { + if (msg == NULL) return false; + + for (int i = 0; msg[i] != '\0'; i++) { + if (msg[i] == '"' || msg[i] == '\n' || msg[i] == '\r') { + return true; + } + } + return false; +} + +/* Modifies the input string by: + * replacing \r and \n with whitespace + * replacing " with ' + * + * Parameters: + * safemsg - A char pointer where the modified message will be stored + * safemsglen - size of safemsg + * msg - The original message */ +void filterInvalidLogfmtChar(char *safemsg, size_t safemsglen, const char *msg) { + serverAssert(safemsglen == LOG_MAX_LEN); + if (msg == NULL) return; + + size_t index = 0; + while (index < safemsglen - 1 && msg[index] != '\0') { + if (msg[index] == '"') { + safemsg[index] = '\''; + } else if (msg[index] == '\n' || msg[index] == '\r') { + safemsg[index] = ' '; + } else { + safemsg[index] = msg[index]; + } + index++; + } + safemsg[index] = '\0'; +} + /* Low level logging. To use only for very big messages, otherwise * serverLog() is to prefer. 
*/ void serverLogRaw(int level, const char *msg) { const int syslogLevelMap[] = {LOG_DEBUG, LOG_INFO, LOG_NOTICE, LOG_WARNING}; const char *c = ".-*#"; + const char *verbose_level[] = {"debug", "info", "notice", "warning"}; + const char *roles[] = {"sentinel", "RDB/AOF", "replica", "primary"}; + const char *role_chars = "XCSM"; FILE *fp; char buf[64]; int rawmode = (level & LL_RAW); @@ -133,23 +191,54 @@ void serverLogRaw(int level, const char *msg) { } else { int off; struct timeval tv; - int role_char; pid_t pid = getpid(); int daylight_active = atomic_load_explicit(&server.daylight_active, memory_order_relaxed); gettimeofday(&tv, NULL); struct tm tm; nolocks_localtime(&tm, tv.tv_sec, server.timezone, daylight_active); - off = strftime(buf, sizeof(buf), "%d %b %Y %H:%M:%S.", &tm); - snprintf(buf + off, sizeof(buf) - off, "%03d", (int)tv.tv_usec / 1000); + switch (server.log_timestamp_format) { + case LOG_TIMESTAMP_LEGACY: + off = strftime(buf, sizeof(buf), "%d %b %Y %H:%M:%S.", &tm); + snprintf(buf + off, sizeof(buf) - off, "%03d", (int)tv.tv_usec / 1000); + break; + + case LOG_TIMESTAMP_ISO8601: + off = strftime(buf, sizeof(buf), "%Y-%m-%dT%H:%M:%S.", &tm); + char tzbuf[7]; + formatTimezone(tzbuf, sizeof(tzbuf), server.timezone, server.daylight_active); + snprintf(buf + off, sizeof(buf) - off, "%03d%s", (int)tv.tv_usec / 1000, tzbuf); + break; + + case LOG_TIMESTAMP_MILLISECONDS: + snprintf(buf, sizeof(buf), "%lld", (long long)tv.tv_sec * 1000 + (long long)tv.tv_usec / 1000); + break; + } + int role_index; if (server.sentinel_mode) { - role_char = 'X'; /* Sentinel. */ + role_index = 0; /* Sentinel. */ } else if (pid != server.pid) { - role_char = 'C'; /* RDB / AOF writing child. */ + role_index = 1; /* RDB / AOF writing child. */ } else { - role_char = (server.primary_host ? 'S' : 'M'); /* replica or Primary. */ + role_index = (server.primary_host ? 2 : 3); /* Replica or Primary. 
*/ + } + switch (server.log_format) { + case LOG_FORMAT_LOGFMT: + if (hasInvalidLogfmtChar(msg)) { + char safemsg[LOG_MAX_LEN]; + filterInvalidLogfmtChar(safemsg, LOG_MAX_LEN, msg); + fprintf(fp, "pid=%d role=%s timestamp=\"%s\" level=%s message=\"%s\"\n", (int)getpid(), roles[role_index], + buf, verbose_level[level], safemsg); + } else { + fprintf(fp, "pid=%d role=%s timestamp=\"%s\" level=%s message=\"%s\"\n", (int)getpid(), roles[role_index], + buf, verbose_level[level], msg); + } + break; + + case LOG_FORMAT_LEGACY: + fprintf(fp, "%d:%c %s %c %s\n", (int)getpid(), role_chars[role_index], buf, c[level], msg); + break; } - fprintf(fp, "%d:%c %s %c %s\n", (int)getpid(), role_char, buf, c[level], msg); } fflush(fp); diff --git a/src/server.h b/src/server.h index 280c439323..5cf56e9c86 100644 --- a/src/server.h +++ b/src/server.h @@ -567,6 +567,15 @@ typedef enum { #define PAUSE_ACTION_EVICT (1 << 3) #define PAUSE_ACTION_REPLICA (1 << 4) /* pause replica traffic */ +/* Sets log format */ +typedef enum { LOG_FORMAT_LEGACY = 0, + LOG_FORMAT_LOGFMT } log_format_type; + +/* Sets log timestamp format */ +typedef enum { LOG_TIMESTAMP_LEGACY = 0, + LOG_TIMESTAMP_ISO8601, + LOG_TIMESTAMP_MILLISECONDS } log_timestamp_type; + /* common sets of actions to pause/unpause */ #define PAUSE_ACTIONS_CLIENT_WRITE_SET \ (PAUSE_ACTION_CLIENT_WRITE | PAUSE_ACTION_EXPIRE | PAUSE_ACTION_EVICT | PAUSE_ACTION_REPLICA) @@ -1978,17 +1987,19 @@ struct valkeyServer { serverOpArray also_propagate; /* Additional command to propagate. */ int replication_allowed; /* Are we allowed to replicate? */ /* Logging */ - char *logfile; /* Path of log file */ - int syslog_enabled; /* Is syslog enabled? */ - char *syslog_ident; /* Syslog ident */ - int syslog_facility; /* Syslog facility */ - int crashlog_enabled; /* Enable signal handler for crashlog. - * disable for clean core dumps. 
*/ - int crashed; /* True if the server has crashed, used in catClientInfoString - * to indicate that no wait for IO threads is needed. */ - int memcheck_enabled; /* Enable memory check on crash. */ - int use_exit_on_panic; /* Use exit() on panic and assert rather than - * abort(). useful for Valgrind. */ + char *logfile; /* Path of log file */ + int syslog_enabled; /* Is syslog enabled? */ + char *syslog_ident; /* Syslog ident */ + int syslog_facility; /* Syslog facility */ + int crashlog_enabled; /* Enable signal handler for crashlog. + * disable for clean core dumps. */ + int crashed; /* True if the server has crashed, used in catClientInfoString + * to indicate that no wait for IO threads is needed. */ + int memcheck_enabled; /* Enable memory check on crash. */ + int use_exit_on_panic; /* Use exit() on panic and assert rather than + * abort(). useful for Valgrind. */ + int log_format; /* Print log in specific format */ + int log_timestamp_format; /* Timestamp format in log */ /* Shutdown */ int shutdown_timeout; /* Graceful shutdown time limit in seconds. */ int shutdown_on_sigint; /* Shutdown flags configured for SIGINT. */ From 9c60fcdae241a7e1dedc2f51d19491d168d66b9b Mon Sep 17 00:00:00 2001 From: Lipeng Zhu Date: Fri, 25 Oct 2024 17:13:28 +0800 Subject: [PATCH 13/15] Do security attack check only when command not found to reduce the critical path (#1212) When exploring the cycle distribution of the main thread with io-threads enabled, we found that this security attack check takes significant time in the main thread: **~3%** of cycles were spent on the command security check. This patch tries to completely avoid doing it in the hot path. We can do it only after we have looked up the command and it wasn't found, just before we call commandCheckExistence.
--------- Signed-off-by: Lipeng Zhu Co-authored-by: Wangyang Guo --- src/server.c | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/server.c b/src/server.c index f5ce4d21e7..e95012eefa 100644 --- a/src/server.c +++ b/src/server.c @@ -3916,12 +3916,6 @@ int processCommand(client *c) { reqresAppendRequest(c); } - /* Handle possible security attacks. */ - if (!strcasecmp(c->argv[0]->ptr, "host:") || !strcasecmp(c->argv[0]->ptr, "post")) { - securityWarningCommand(c); - return C_ERR; - } - /* If we're inside a module blocked context yielding that wants to avoid * processing clients, postpone the command. */ if (server.busy_module_yield_flags != BUSY_MODULE_YIELD_NONE && @@ -3936,6 +3930,13 @@ int processCommand(client *c) { * we do not have to repeat the same checks */ if (!client_reprocessing_command) { struct serverCommand *cmd = c->io_parsed_cmd ? c->io_parsed_cmd : lookupCommand(c->argv, c->argc); + if (!cmd) { + /* Handle possible security attacks. */ + if (!strcasecmp(c->argv[0]->ptr, "host:") || !strcasecmp(c->argv[0]->ptr, "post")) { + securityWarningCommand(c); + return C_ERR; + } + } c->cmd = c->lastcmd = c->realcmd = cmd; sds err; if (!commandCheckExistence(c, &err)) { From 4be09e434a3b5bf55e6e6b5a98f315720e31010f Mon Sep 17 00:00:00 2001 From: Shivshankar Date: Fri, 25 Oct 2024 08:03:59 -0400 Subject: [PATCH 14/15] Fix typo in valkey.conf file's shutdown section (#1224) Found typo "exists" ==> "exits" in valkey.conf in shutdown section. Signed-off-by: Shivshankar-Reddy --- valkey.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/valkey.conf b/valkey.conf index 5960feb1b6..5887468673 100644 --- a/valkey.conf +++ b/valkey.conf @@ -1580,7 +1580,7 @@ aof-timestamp-enabled no # Maximum time to wait for replicas when shutting down, in seconds. # # During shut down, a grace period allows any lagging replicas to catch up with -# the latest replication offset before the primary exists. 
This period can +# the latest replication offset before the primary exits. This period can # prevent data loss, especially for deployments without configured disk backups. # # The 'shutdown-timeout' value is the grace period's duration in seconds. It is From 5d2ff853a335af2bc2c1527da126b3e947269ad2 Mon Sep 17 00:00:00 2001 From: Binbin Date: Sun, 27 Oct 2024 15:23:00 +0800 Subject: [PATCH 15/15] Fix minor repldbfd leak in updateReplicasWaitingBgsave if fstat fails (#1226) In the old code, if fstat fails, replica->repldbfd will hold the fd and we are doing a free client. And in freeClient, we check and close only if repl_state == REPLICA_STATE_SEND_BULK. So if fstat fails, we will leak the fd. We can also extend freeClient to handle REPLICA_STATE_WAIT_BGSAVE_END as well, but here seems to be a more friendly (and safer) way. Signed-off-by: Binbin --- src/replication.c | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/replication.c b/src/replication.c index 63433de865..a92bb79984 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1741,6 +1741,7 @@ void updateReplicasWaitingBgsave(int bgsaveerr, int type) { client *replica = ln->value; if (replica->repl_state == REPLICA_STATE_WAIT_BGSAVE_END) { + int repldbfd; struct valkey_stat buf; if (bgsaveerr != C_OK) { @@ -1790,17 +1791,26 @@ void updateReplicasWaitingBgsave(int bgsaveerr, int type) { } replica->repl_start_cmd_stream_on_ack = 1; } else { - if ((replica->repldbfd = open(server.rdb_filename, O_RDONLY)) == -1 || - valkey_fstat(replica->repldbfd, &buf) == -1) { + repldbfd = open(server.rdb_filename, O_RDONLY); + if (repldbfd == -1) { freeClientAsync(replica); - serverLog(LL_WARNING, "SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno)); + serverLog(LL_WARNING, "SYNC failed. Can't open DB after BGSAVE: %s", strerror(errno)); continue; } + if (valkey_fstat(repldbfd, &buf) == -1) { + freeClientAsync(replica); + serverLog(LL_WARNING, "SYNC failed. 
Can't stat DB after BGSAVE: %s", strerror(errno)); + close(repldbfd); + continue; + } + replica->repldbfd = repldbfd; replica->repldboff = 0; replica->repldbsize = buf.st_size; replica->repl_state = REPLICA_STATE_SEND_BULK; replica->replpreamble = sdscatprintf(sdsempty(), "$%lld\r\n", (unsigned long long)replica->repldbsize); + /* When repl_state changes to REPLICA_STATE_SEND_BULK, we will release + * the resources in freeClient. */ connSetWriteHandler(replica->conn, NULL); if (connSetWriteHandler(replica->conn, sendBulkToReplica) == C_ERR) { freeClientAsync(replica);