Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/unstable' into shutdown_failover
Browse files Browse the repository at this point in the history
Signed-off-by: Binbin <[email protected]>
  • Loading branch information
enjoy-binbin committed Oct 27, 2024
2 parents 0cc7ebb + 5d2ff85 commit 366e082
Show file tree
Hide file tree
Showing 15 changed files with 296 additions and 50 deletions.
23 changes: 16 additions & 7 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -1230,7 +1230,7 @@ void clusterReset(int hard) {
if (nodeIsReplica(myself)) {
clusterSetNodeAsPrimary(myself);
replicationUnsetPrimary();
emptyData(-1, EMPTYDB_NO_FLAGS, NULL);
emptyData(-1, server.lazyfree_lazy_user_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS, NULL);
}

/* Close slots, reset manual failover state. */
Expand Down Expand Up @@ -4433,11 +4433,18 @@ int clusterGetReplicaRank(void) {
void clusterLogCantFailover(int reason) {
char *msg;
static time_t lastlog_time = 0;
time_t now = time(NULL);

/* Don't log if we have the same reason for some time. */
if (reason == server.cluster->cant_failover_reason &&
time(NULL) - lastlog_time < CLUSTER_CANT_FAILOVER_RELOG_PERIOD)
/* General logging suppression if the same reason has occurred recently. */
if (reason == server.cluster->cant_failover_reason && now - lastlog_time < CLUSTER_CANT_FAILOVER_RELOG_PERIOD) {
return;
}

/* Special case: If the failure reason is due to data age, log 10 times less frequently. */
if (reason == server.cluster->cant_failover_reason && reason == CLUSTER_CANT_FAILOVER_DATA_AGE &&
now - lastlog_time < 10 * CLUSTER_CANT_FAILOVER_RELOG_PERIOD) {
return;
}

server.cluster->cant_failover_reason = reason;

Expand Down Expand Up @@ -5806,6 +5813,8 @@ const char *clusterGetMessageTypeString(int type) {
return "unknown";
}

/* Get the slot from robj and return it. If the slot is not valid,
* return -1 and send an error to the client. */
int getSlotOrReply(client *c, robj *o) {
long long slot;

Expand Down Expand Up @@ -6531,7 +6540,7 @@ int clusterCommandSpecial(client *c) {
memset(slots, 0, CLUSTER_SLOTS);
/* Check that all the arguments are parseable.*/
for (j = 2; j < c->argc; j++) {
if ((slot = getSlotOrReply(c, c->argv[j])) == C_ERR) {
if ((slot = getSlotOrReply(c, c->argv[j])) == -1) {
zfree(slots);
return 1;
}
Expand Down Expand Up @@ -6564,11 +6573,11 @@ int clusterCommandSpecial(client *c) {
/* Check that all the arguments are parseable and that all the
* slots are not already busy. */
for (j = 2; j < c->argc; j += 2) {
if ((startslot = getSlotOrReply(c, c->argv[j])) == C_ERR) {
if ((startslot = getSlotOrReply(c, c->argv[j])) == -1) {
zfree(slots);
return 1;
}
if ((endslot = getSlotOrReply(c, c->argv[j + 1])) == C_ERR) {
if ((endslot = getSlotOrReply(c, c->argv[j + 1])) == -1) {
zfree(slots);
return 1;
}
Expand Down
4 changes: 2 additions & 2 deletions src/cluster_slot_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,8 @@ void clusterSlotStatsCommand(client *c) {
if (c->argc == 5 && !strcasecmp(c->argv[2]->ptr, "slotsrange")) {
/* CLUSTER SLOT-STATS SLOTSRANGE start-slot end-slot */
int startslot, endslot;
if ((startslot = getSlotOrReply(c, c->argv[3])) == C_ERR ||
(endslot = getSlotOrReply(c, c->argv[4])) == C_ERR) {
if ((startslot = getSlotOrReply(c, c->argv[3])) == -1 ||
(endslot = getSlotOrReply(c, c->argv[4])) == -1) {
return;
}
if (startslot > endslot) {
Expand Down
11 changes: 9 additions & 2 deletions src/commands.def
Original file line number Diff line number Diff line change
Expand Up @@ -6408,6 +6408,7 @@ struct COMMAND_STRUCT ACL_Subcommands[] = {
/* BGSAVE history.
 * One {version, description} row per release that changed the command:
 * SCHEDULE was added in 3.2.2 and CANCEL in 8.1.0.
 * NOTE(review): the number that follows BGSAVE_History in the "bgsave"
 * MAKE_CMD entry looks like the row count of this table -- confirm and keep
 * the two in sync when adding a row. */
commandHistory BGSAVE_History[] = {
{"3.2.2","Added the `SCHEDULE` option."},
{"8.1.0","Added the `CANCEL` option."},
};
#endif

Expand All @@ -6421,9 +6422,15 @@ commandHistory BGSAVE_History[] = {
#define BGSAVE_Keyspecs NULL
#endif

/* BGSAVE operation argument table.
 * Alternatives for the optional "operation" argument of BGSAVE, which is
 * declared as ARG_TYPE_ONEOF in BGSAVE_Args and points here via .subargs:
 * either the SCHEDULE token (since 3.2.2) or the CANCEL token (since 8.1.0).
 * NOTE(review): the 2 passed to MAKE_ARG for "operation" is presumably the
 * number of entries in this table -- confirm and keep the two in sync. */
struct COMMAND_ARG BGSAVE_operation_Subargs[] = {
{MAKE_ARG("schedule",ARG_TYPE_PURE_TOKEN,-1,"SCHEDULE",NULL,"3.2.2",CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("cancel",ARG_TYPE_PURE_TOKEN,-1,"CANCEL",NULL,"8.1.0",CMD_ARG_NONE,0,NULL)},
};

/* BGSAVE argument table */
struct COMMAND_ARG BGSAVE_Args[] = {
{MAKE_ARG("schedule",ARG_TYPE_PURE_TOKEN,-1,"SCHEDULE",NULL,"3.2.2",CMD_ARG_OPTIONAL,0,NULL)},
{MAKE_ARG("operation",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,2,NULL),.subargs=BGSAVE_operation_Subargs},
};

/********** COMMAND COUNT ********************/
Expand Down Expand Up @@ -10989,7 +10996,7 @@ struct COMMAND_STRUCT serverCommandTable[] = {
/* server */
{MAKE_CMD("acl","A container for Access List Control commands.","Depends on subcommand.","6.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,ACL_History,0,ACL_Tips,0,NULL,-2,CMD_SENTINEL,0,ACL_Keyspecs,0,NULL,0),.subcommands=ACL_Subcommands},
{MAKE_CMD("bgrewriteaof","Asynchronously rewrites the append-only file to disk.","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,BGREWRITEAOF_History,0,BGREWRITEAOF_Tips,0,bgrewriteaofCommand,1,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT,0,BGREWRITEAOF_Keyspecs,0,NULL,0)},
{MAKE_CMD("bgsave","Asynchronously saves the database(s) to disk.","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,BGSAVE_History,1,BGSAVE_Tips,0,bgsaveCommand,-1,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT,0,BGSAVE_Keyspecs,0,NULL,1),.args=BGSAVE_Args},
{MAKE_CMD("bgsave","Asynchronously saves the database(s) to disk.","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,BGSAVE_History,2,BGSAVE_Tips,0,bgsaveCommand,-1,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT,0,BGSAVE_Keyspecs,0,NULL,1),.args=BGSAVE_Args},
{MAKE_CMD("command","Returns detailed information about all commands.","O(N) where N is the total number of commands","2.8.13",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,COMMAND_History,0,COMMAND_Tips,1,commandCommand,-1,CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,COMMAND_Keyspecs,0,NULL,0),.subcommands=COMMAND_Subcommands},
{MAKE_CMD("config","A container for server configuration commands.","Depends on subcommand.","2.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,CONFIG_History,0,CONFIG_Tips,0,NULL,-2,0,0,CONFIG_Keyspecs,0,NULL,0),.subcommands=CONFIG_Subcommands},
{MAKE_CMD("dbsize","Returns the number of keys in the database.","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,DBSIZE_History,0,DBSIZE_Tips,2,dbsizeCommand,1,CMD_READONLY|CMD_FAST,ACL_CATEGORY_KEYSPACE,DBSIZE_Keyspecs,0,NULL,0)},
Expand Down
30 changes: 26 additions & 4 deletions src/commands/bgsave.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
[
"3.2.2",
"Added the `SCHEDULE` option."
],
[
"8.1.0",
"Added the `CANCEL` option."
]
],
"command_flags": [
Expand All @@ -19,11 +23,23 @@
],
"arguments": [
{
"name": "schedule",
"token": "SCHEDULE",
"type": "pure-token",
"name": "operation",
"type": "oneof",
"optional": true,
"since": "3.2.2"
"arguments": [
{
"name": "schedule",
"token": "SCHEDULE",
"type": "pure-token",
"since": "3.2.2"
},
{
"name": "cancel",
"token": "CANCEL",
"type": "pure-token",
"since": "8.1.0"
}
]
}
],
"reply_schema": {
Expand All @@ -33,6 +49,12 @@
},
{
"const": "Background saving scheduled"
},
{
"const": "Background saving cancelled"
},
{
"const": "Scheduled background saving cancelled"
}
]
}
Expand Down
9 changes: 9 additions & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,13 @@ configEnum propagation_error_behavior_enum[] = {
{"panic-on-replicas", PROPAGATION_ERR_BEHAVIOR_PANIC_ON_REPLICAS},
{NULL, 0}};

/* Accepted values of the "log-format" config and the LOG_FORMAT_* constant
 * each name maps to. Ends with a {NULL, 0} entry, as the other configEnum
 * tables in this file do. */
configEnum log_format_enum[] = {{"legacy", LOG_FORMAT_LEGACY}, {"logfmt", LOG_FORMAT_LOGFMT}, {NULL, 0}};

/* Accepted values of the "log-timestamp-format" config and the
 * LOG_TIMESTAMP_* constant each name maps to. Ends with a {NULL, 0} entry,
 * as the other configEnum tables in this file do. */
configEnum log_timestamp_format_enum[] = {{"legacy", LOG_TIMESTAMP_LEGACY},
{"iso8601", LOG_TIMESTAMP_ISO8601},
{"milliseconds", LOG_TIMESTAMP_MILLISECONDS},
{NULL, 0}};

/* Output buffer limits presets. */
clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = {
{0, 0, 0}, /* normal */
Expand Down Expand Up @@ -3191,6 +3198,8 @@ standardConfig static_configs[] = {
createEnumConfig("propagation-error-behavior", NULL, MODIFIABLE_CONFIG, propagation_error_behavior_enum, server.propagation_error_behavior, PROPAGATION_ERR_BEHAVIOR_IGNORE, NULL, NULL),
createEnumConfig("shutdown-on-sigint", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, shutdown_on_sig_enum, server.shutdown_on_sigint, 0, isValidShutdownOnSigFlags, NULL),
createEnumConfig("shutdown-on-sigterm", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, shutdown_on_sig_enum, server.shutdown_on_sigterm, 0, isValidShutdownOnSigFlags, NULL),
createEnumConfig("log-format", NULL, MODIFIABLE_CONFIG, log_format_enum, server.log_format, LOG_FORMAT_LEGACY, NULL, NULL),
createEnumConfig("log-timestamp-format", NULL, MODIFIABLE_CONFIG, log_timestamp_format_enum, server.log_timestamp_format, LOG_TIMESTAMP_LEGACY, NULL, NULL),

/* Integer configs */
createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.dbnum, 16, INTEGER_CONFIG, NULL, NULL),
Expand Down
2 changes: 1 addition & 1 deletion src/io_threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ void initIOThreads(void) {

int trySendReadToIOThreads(client *c) {
if (server.active_io_threads_num <= 1) return C_ERR;
/* If IO thread is areadty reading, return C_OK to make sure the main thread will not handle it. */
/* If IO thread is already reading, return C_OK to make sure the main thread will not handle it. */
if (c->io_read_state != CLIENT_IDLE) return C_OK;
/* Currently, replica/master writes are not offloaded and are processed synchronously. */
if (c->flag.primary || getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR;
Expand Down
2 changes: 2 additions & 0 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -2669,6 +2669,8 @@ void processInlineBuffer(client *c) {

/* Create an Object for all arguments. */
for (c->argc = 0, j = 0; j < argc; j++) {
/* Strings returned from sdssplitargs() may have unused capacity that we can trim. */
argv[j] = sdsRemoveFreeSpace(argv[j], 1);
c->argv[c->argc] = createObject(OBJ_STRING, argv[j]);
c->argc++;
c->argv_len_sum += sdslen(argv[j]);
Expand Down
21 changes: 21 additions & 0 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -1355,6 +1355,7 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {
sdsfree(slot_info);
goto werr;
}
written += res;
last_slot = curr_slot;
sdsfree(slot_info);
}
Expand Down Expand Up @@ -3689,6 +3690,21 @@ void bgsaveCommand(client *c) {
if (c->argc > 1) {
if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr, "schedule")) {
schedule = 1;
} else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr, "cancel")) {
/* Terminates an in progress BGSAVE */
if (server.child_type == CHILD_TYPE_RDB) {
/* There is an ongoing bgsave */
serverLog(LL_NOTICE, "Background saving will be aborted due to user request");
killRDBChild();
addReplyStatus(c, "Background saving cancelled");
} else if (server.rdb_bgsave_scheduled == 1) {
serverLog(LL_NOTICE, "Scheduled background saving will be cancelled due to user request");
server.rdb_bgsave_scheduled = 0;
addReplyStatus(c, "Scheduled background saving cancelled");
} else {
addReplyError(c, "Background saving is currently not in progress or scheduled");
}
return;
} else {
addReplyErrorObject(c, shared.syntaxerr);
return;
Expand All @@ -3703,6 +3719,11 @@ void bgsaveCommand(client *c) {
} else if (hasActiveChildProcess() || server.in_exec) {
if (schedule || server.in_exec) {
server.rdb_bgsave_scheduled = 1;
if (schedule) {
serverLog(LL_NOTICE, "Background saving scheduled due to user request");
} else {
serverLog(LL_NOTICE, "Background saving scheduled to run after transaction execution");
}
addReplyStatus(c, "Background saving scheduled");
} else {
addReplyError(c, "Another child process is active (AOF?): can't BGSAVE right now. "
Expand Down
11 changes: 7 additions & 4 deletions src/rdma.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ typedef struct rdma_listener {
* handler into pending list */
static list *pending_list;

static rdma_listener *rdma_listeners;

static ConnectionType CT_RDMA;

static int valkey_rdma_rx_size = VALKEY_RDMA_DEFAULT_RX_SIZE;
Expand Down Expand Up @@ -748,7 +750,7 @@ static rdma_listener *rdmaFdToListener(connListener *listener, int fd) {
for (int i = 0; i < listener->count; i++) {
if (listener->fd[i] != fd) continue;

return (rdma_listener *)listener->priv + i;
return &rdma_listeners[i];
}

return NULL;
Expand Down Expand Up @@ -1537,7 +1539,7 @@ int connRdmaListen(connListener *listener) {
bindaddr = default_bindaddr;
}

listener->priv = rdma_listener = zcalloc_num(bindaddr_count, sizeof(*rdma_listener));
rdma_listeners = rdma_listener = zcalloc_num(bindaddr_count, sizeof(*rdma_listener));
for (j = 0; j < bindaddr_count; j++) {
char *addr = bindaddr[j];
int optional = *addr == '-';
Expand Down Expand Up @@ -1757,13 +1759,14 @@ static int rdmaChangeListener(void) {

aeDeleteFileEvent(server.el, listener->fd[i], AE_READABLE);
listener->fd[i] = -1;
struct rdma_listener *rdma_listener = (struct rdma_listener *)listener->priv + i;
struct rdma_listener *rdma_listener = &rdma_listeners[i];
rdma_destroy_id(rdma_listener->cm_id);
rdma_destroy_event_channel(rdma_listener->cm_channel);
}

listener->count = 0;
zfree(listener->priv);
zfree(rdma_listeners);
rdma_listeners = NULL;

closeListener(listener);

Expand Down
16 changes: 13 additions & 3 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1741,6 +1741,7 @@ void updateReplicasWaitingBgsave(int bgsaveerr, int type) {
client *replica = ln->value;

if (replica->repl_state == REPLICA_STATE_WAIT_BGSAVE_END) {
int repldbfd;
struct valkey_stat buf;

if (bgsaveerr != C_OK) {
Expand Down Expand Up @@ -1790,17 +1791,26 @@ void updateReplicasWaitingBgsave(int bgsaveerr, int type) {
}
replica->repl_start_cmd_stream_on_ack = 1;
} else {
if ((replica->repldbfd = open(server.rdb_filename, O_RDONLY)) == -1 ||
valkey_fstat(replica->repldbfd, &buf) == -1) {
repldbfd = open(server.rdb_filename, O_RDONLY);
if (repldbfd == -1) {
freeClientAsync(replica);
serverLog(LL_WARNING, "SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
serverLog(LL_WARNING, "SYNC failed. Can't open DB after BGSAVE: %s", strerror(errno));
continue;
}
if (valkey_fstat(repldbfd, &buf) == -1) {
freeClientAsync(replica);
serverLog(LL_WARNING, "SYNC failed. Can't stat DB after BGSAVE: %s", strerror(errno));
close(repldbfd);
continue;
}
replica->repldbfd = repldbfd;
replica->repldboff = 0;
replica->repldbsize = buf.st_size;
replica->repl_state = REPLICA_STATE_SEND_BULK;
replica->replpreamble = sdscatprintf(sdsempty(), "$%lld\r\n", (unsigned long long)replica->repldbsize);

/* When repl_state changes to REPLICA_STATE_SEND_BULK, we will release
* the resources in freeClient. */
connSetWriteHandler(replica->conn, NULL);
if (connSetWriteHandler(replica->conn, sendBulkToReplica) == C_ERR) {
freeClientAsync(replica);
Expand Down
Loading

0 comments on commit 366e082

Please sign in to comment.