Skip to content

Commit

Permalink
Bug fixes for SYNCSLOTS based implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Jacob Murphy <[email protected]>
  • Loading branch information
murphyjacob4 committed Jan 20, 2025
1 parent 6e8bdb5 commit 98de0a7
Show file tree
Hide file tree
Showing 11 changed files with 85 additions and 141 deletions.
21 changes: 0 additions & 21 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -815,27 +815,6 @@ unsigned int countKeysInSlot(unsigned int slot) {
return kvstoreHashtableSize(server.db->keys, slot);
}

/* Drop the keys of every slot whose bit is set in 'slot_bitmap'.
 *
 * Slots are visited in ascending order from 0 up to CLUSTER_SLOTS - 1, and
 * 'async' is forwarded unchanged to dropKeysInSlot() for each selected slot.
 *
 * Returns the sum of the values returned by dropKeysInSlot(). */
unsigned int dropKeysInSlotBitmap(slotBitmap slot_bitmap, int async) {
    unsigned int dropped = 0;
    int slot = 0;
    while (slot < CLUSTER_SLOTS) {
        if (bitmapTestBit(slot_bitmap, slot)) dropped += dropKeysInSlot(slot, async);
        slot++;
    }
    return dropped;
}

/* Empty a single hash slot of server.db.
 *
 * When 'async' is non-zero the work is handed to emptyHashtableAsync();
 * otherwise the slot's tables in both server.db->keys and
 * server.db->expires are emptied inline via kvstoreEmptyHashtable().
 *
 * Returns the size of the slot's keys table, sampled before emptying. */
unsigned int dropKeysInSlot(unsigned int hashslot, int async) {
    const unsigned int key_count = kvstoreHashtableSize(server.db->keys, hashslot);

    if (!async) {
        kvstoreEmptyHashtable(server.db->keys, hashslot, NULL);
        kvstoreEmptyHashtable(server.db->expires, hashslot, NULL);
        return key_count;
    }

    emptyHashtableAsync(server.db, hashslot);
    return key_count;
}

void clusterCommandHelp(client *c) {
const char *help[] = {
"COUNTKEYSINSLOT <slot>",
Expand Down
2 changes: 0 additions & 2 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,6 @@ client *createCachedResponseClient(int resp);
void deleteCachedResponseClient(client *recording_client);
void clearCachedClusterSlotsResponse(void);
unsigned int countKeysInSlot(unsigned int hashslot);
unsigned int dropKeysInSlotBitmap(slotBitmap slot_bitmap, int async);
unsigned int dropKeysInSlot(unsigned int hashslot, int async);
void bitmapToSlotRanges(unsigned char *bitmap, slotBitmap slot_bitmap_out);
int bitmapTestBit(unsigned char *bitmap, int pos);
void bitmapSetBit(unsigned char *bitmap, int pos);
Expand Down
127 changes: 67 additions & 60 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ const char *clusterGetMessageTypeString(int type);
void removeChannelsInSlot(unsigned int slot);
unsigned int countChannelsInSlot(unsigned int hashslot);
unsigned int delKeysInSlot(unsigned int hashslot);
unsigned int delKeysInSlotBitmap(slotBitmap bitmap);
void clusterAddNodeToShard(const char *shard_id, clusterNode *node);
list *clusterLookupNodeListByShardId(const char *shard_id);
void clusterRemoveNodeFromShard(clusterNode *node);
Expand Down Expand Up @@ -4424,10 +4425,10 @@ slotImport *clusterCreateSlotImportJob(clusterNode *source, slotBitmap slots) {
}

slotExport *clusterCreateSlotExportJob(client *c, slotBitmap slots) {
slotExport *result = (slotExport *) zmalloc(sizeof(slotExport));
slotExport *result = (slotExport *) zcalloc(sizeof(slotExport));
memcpy(result->slot_bitmap, slots, sizeof(slotBitmap));
result->state = SLOT_EXPORT_QUEUED;
result->pause_end = 0;
result->pause_end = -1;
result->client = c;
return result;
}
Expand Down Expand Up @@ -4484,17 +4485,14 @@ void clusterFeedSlotMigration(int dbid, robj **argv, int argc) {
unsigned long long prev_pending = curr_export->client->reply_bytes;
addReplyArrayLen(curr_export->client, argc);
for (i = 0; i < argc; i++) {
addReply(curr_export->client, argv[i]);
addReplyBulk(curr_export->client, argv[i]);
}
curr_export->syncslot_offset += curr_export->client->reply_bytes - prev_pending;
}

int clusterShouldWriteToSlotMigrationTarget() {
slotExport *curr_export = clusterGetCurrentSlotExport();
if (curr_export->state != SLOT_EXPORT_PAUSED) {
return 0;
}
return 1;
return curr_export && (curr_export->state == SLOT_EXPORT_PAUSE_AND_REPLY || curr_export->state == SLOT_EXPORT_PAUSED);
}

void clusterSlotMigrationHandleClientClose(client *c) {
Expand Down Expand Up @@ -4570,7 +4568,7 @@ void clusterProceedWithSlotImport(void) {
c->flag.authenticated = 1;
c->user = NULL; /* This client can do everything. */
c->querybuf = sdsempty(); /* Similar to primary, we use a dedicated query buf. */
initClientReplicationData(c); /* Used to track reploff */
initClientReplicationData(c);

curr_import->state = SLOT_IMPORT_SEND_AUTH;
continue;
Expand Down Expand Up @@ -4614,7 +4612,7 @@ void clusterProceedWithSlotImport(void) {
continue;
case SLOT_IMPORT_SEND_SYNCSLOTS:
/* Ensure we have a clean state for the SYNC. */
dropKeysInSlotBitmap(curr_import->slot_bitmap, 1);
delKeysInSlotBitmap(curr_import->slot_bitmap);

serverLog(LL_NOTICE, "Sending CLUSTER SYNCSLOTS START request to source node %.40s", curr_import->source_node->name);
char *syncslots_args[4] = {"CLUSTER", "SYNCSLOTS", "START", (char *)curr_import->slot_bitmap};
Expand All @@ -4632,7 +4630,7 @@ void clusterProceedWithSlotImport(void) {
connSetReadHandler(curr_import->client->conn, readQueryFromClient);
curr_import->state = SLOT_IMPORT_RECEIVE_SYNCSLOTS;
case SLOT_IMPORT_RECEIVE_SYNCSLOTS:
/* Nothing to do in this state. Waiting for CLUSTER SYNCSLOTS END to be processed. */
/* Nothing to do in this state. Waiting for CLUSTER SYNCSLOTS ENDSNAPSHOT to be processed. */
return;
case SLOT_IMPORT_PAUSE_OWNER:
curr_import->client->flag.replication_force_reply = 1;
Expand Down Expand Up @@ -4674,20 +4672,9 @@ void clusterProceedWithSlotImport(void) {
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
listDelNode(server.cluster->slot_import_jobs, curr_node);
continue;
case SLOT_IMPORT_REPLICA_TRACKING:
/* As a replica, we will simply apply the primaries updates
* from the slot migration source. However, if we are ever
* promoted to primary, we need to fail the migration to
* prevent leaked keys in the importing slots. */
if (clusterNodeIsPrimary(myself)) {
serverLog(LL_WARNING, "Promoted to primary during slot migration, failing the ongoing migration");
curr_import->state = SLOT_IMPORT_FAILED;
continue;
}
return;
case SLOT_IMPORT_FAILED:
listDelNode(server.cluster->slot_import_jobs, curr_node);
dropKeysInSlotBitmap(curr_import->slot_bitmap, server.repl_replica_lazy_flush);
delKeysInSlotBitmap(curr_import->slot_bitmap);
clusterFreeSlotImportJob(curr_import);
continue;
}
Expand All @@ -4700,7 +4687,7 @@ int childSnapshotForSyncSlot(int req, rio *rdb, void *privdata) {
rioWrite(rdb, "*3\r\n", 4);
rioWriteBulkString(rdb, "CLUSTER", 7);
rioWriteBulkString(rdb, "SYNCSLOTS", 9);
rioWriteBulkString(rdb, "END", 3);
rioWriteBulkString(rdb, "ENDSNAPSHOT", 11);
return retval;
}

Expand All @@ -4721,6 +4708,7 @@ void clusterProceedWithSlotExport(void) {
}
connection ** conns = zmalloc(sizeof(connection*));
*conns = curr_export->client->conn;
serverLog(LL_NOTICE, "Initiating snapshot to conn with fd %d", curr_export->client->conn->fd);
if (saveSnapshotToConnectionSockets(conns, 1, 1, 0, childSnapshotForSyncSlot, curr_export->slot_bitmap) != C_OK) {
serverLog(LL_WARNING, "Failed to start slot export to target");
curr_export->state = SLOT_EXPORT_FAILED;
Expand All @@ -4736,16 +4724,32 @@ void clusterProceedWithSlotExport(void) {
addReplyArrayLen(curr_export->client, 4);
addReplyBulkCBuffer(curr_export->client, "CLUSTER", 7);
addReplyBulkCBuffer(curr_export->client, "SYNCSLOTS", 9);
addReplyBulkCBuffer(curr_export->client, "PAUSEDAT", 8);
addReplyLongLong(curr_export->client, curr_export->syncslot_offset);
addReplyBulkCBuffer(curr_export->client, "PAUSEOFFSET", 11);
addReplyBulkLongLong(curr_export->client, curr_export->syncslot_offset);

/* Even though we just added replies, it's possible that, due to
* existing pending data, the client is not in the pending write
* queue. We enqueue it explicitly to work around this. */
putClientInPendingWriteQueue(curr_export->client);

curr_export->pause_end = mstime() + (CLUSTER_MF_TIMEOUT * CLUSTER_MF_PAUSE_MULT);
pauseActions(PAUSE_DURING_SLOT_MIGRATION, curr_export->pause_end, PAUSE_ACTIONS_CLIENT_WRITE_SET);

curr_export->state = SLOT_EXPORT_PAUSED;
continue;
case SLOT_EXPORT_PAUSED:
/* */
/* While paused, we simply want to check if we should unpause. */
if (curr_export->pause_end <= mstime()) {
/* Every CLUSTER_MF_TIMEOUT, the source node should
* re-attempt the pause. If we reach this point, it has not
* attempted the pause in that time, so we can assume it is
* dead and fail the migration. */
serverLog(LL_WARNING, "During slot export, unpausing self and cancelling export due to timeout.");
unpauseActions(PAUSE_DURING_SLOT_MIGRATION);
curr_export->state = SLOT_EXPORT_FAILED;
continue;
}
return;
case SLOT_EXPORT_FINISH:
case SLOT_EXPORT_FAILED:
listDelNode(server.cluster->slot_export_jobs, curr_node);
Expand Down Expand Up @@ -6714,9 +6718,18 @@ void removeChannelsInSlot(unsigned int slot) {
pubsubShardUnsubscribeAllChannelsInSlot(slot);
}

/* Delete the keys of every hash slot whose bit is set in 'bitmap'.
 *
 * Slots are processed in ascending order; each selected slot is passed to
 * delKeysInSlot(). Returns the total of the counts it reports. */
unsigned int delKeysInSlotBitmap(slotBitmap bitmap) {
    unsigned int total = 0;
    for (int slot = 0; slot < CLUSTER_SLOTS; slot++) {
        if (!bitmapTestBit(bitmap, slot)) continue;
        total += delKeysInSlot(slot);
    }
    return total;
}

/* Remove all the keys in the specified hash slot.
* The number of removed items is returned. */
// TODO(murphyjacob4) - can we just use this?
unsigned int delKeysInSlot(unsigned int hashslot) {
if (!countKeysInSlot(hashslot)) return 0;

Expand Down Expand Up @@ -7572,35 +7585,29 @@ int clusterCommandSpecial(client *c) {
}
c->flag.slot_migration_target = 1;
initClientReplicationData(c);
slotExport *job = clusterCreateSlotExportJob(c, c->argv[2]->ptr);
slotExport *job = clusterCreateSlotExportJob(c, c->argv[3]->ptr);
listAddNodeTail(server.cluster->slot_export_jobs, job);
clusterProceedWithSlotMigration();
} else if (!strcasecmp(c->argv[2]->ptr, "inform")) {
/* CLUSTER SYNCSLOTS INFORM <slot-bitmap> */
if (c->argc != 4) {
addReplyError(c, "CLUSTER SYNCSLOTS INFORM command requires exactly one argument");
return 1;
}
slotImport * to_enqueue = clusterCreateSlotImportJob(NULL, c->argv[2]->ptr);
to_enqueue->state = SLOT_IMPORT_REPLICA_TRACKING;
} else if (!strcasecmp(c->argv[2]->ptr, "end")) {
/* CLUSTER SYNCSLOTS END */
} else if (!strcasecmp(c->argv[2]->ptr, "endsnapshot")) {
/* CLUSTER SYNCSLOTS ENDSNAPSHOT */
if (c->argc != 3) {
addReplyError(c, "CLUSTER SYNCSLOTS END does not expect any arguments.");
addReplyError(c, "CLUSTER SYNCSLOTS ENDSNAPSHOT does not expect any arguments.");
return 1;
}
slotImport *curr_import = clusterGetCurrentSlotImport();
if (!curr_import || (curr_import->state != SLOT_IMPORT_RECEIVE_SYNCSLOTS && curr_import->state != SLOT_IMPORT_REPLICA_TRACKING)) {
addReplyError(c, "No ongoing CLUSTER SYNCSLOTS to end.");
if (c->flag.primary) {
/* Due to the proxying nature of replication from the source
* node through the target node to the target node's replicas,
* this message should simply be ignored. */
return 1;
}
if (curr_import->state != SLOT_IMPORT_REPLICA_TRACKING) {
/* Replicas will also receive this command through the replication
* stream, but it is not actionable. */
slotImport *curr_import = clusterGetCurrentSlotImport();
if (!curr_import || (curr_import->state != SLOT_IMPORT_RECEIVE_SYNCSLOTS)) {
addReplyError(c, "No ongoing snapshot to end.");
return 1;
}
if (curr_import->client != c) {
addReplyError(c, "This client is not the one that initiated the ongoing CLUSTER SYNCSLOTS.");
return 1;
}
curr_import->state = SLOT_IMPORT_PAUSE_OWNER;
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_SLOTMIGRATION);
Expand All @@ -7621,28 +7628,25 @@ int clusterCommandSpecial(client *c) {
} else if (slot_export->state != SLOT_EXPORT_SNAPSHOTTING) {
addReplyError(c, "SYNCSLOTS is not in the correct state for this command.");
return 1;
} else {
/* First pause. We want to flush the output buffer that was not allowed to
* flush during the snapshot. */
putClientInPendingWriteQueue(slot_export->client);
}
serverLog(LL_NOTICE, "Pause received by target during slot migration. Pausing and initiating stream of commands.");

slot_export->state = SLOT_EXPORT_PAUSE_AND_REPLY;
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_SLOTMIGRATION);
} else if (!strcasecmp(c->argv[2]->ptr, "pausedat")) {
/* CLUSTER SYNCSLOTS PAUSEDAT <offset> */
} else if (!strcasecmp(c->argv[2]->ptr, "pauseoffset")) {
/* CLUSTER SYNCSLOTS PAUSEOFFSET <offset> */
if (c->argc != 4) {
addReplyError(c, "CLUSTER SYNCSLOTS PAUSEDAT command requires exactly one argument.");
addReplyError(c, "CLUSTER SYNCSLOTS PAUSEOFFSET command requires exactly one argument.");
return 1;
}
slotImport *slot_import = clusterGetCurrentSlotImport();
if (!slot_import || slot_import->state != SLOT_IMPORT_WAITING_FOR_OFFSET) {
addReplyError(c, "No CLUSTER SYNCSLOTS is waiting for a PAUSEDAT response.");
addReplyError(c, "No CLUSTER SYNCSLOTS is waiting for a PAUSEOFFSET response.");
return 1;
}
long long offset;
if (getLongLongFromObject(c->argv[3]->ptr, &offset) != C_OK) {
addReplyError(c, "Failed to parse PAUSEDAT offset.");
if (getLongLongFromObject(c->argv[3], &offset) != C_OK) {
addReplyError(c, "Failed to parse PAUSEOFFSET offset.");
return 1;
}
slot_import->paused_at_offset = offset;
Expand Down Expand Up @@ -7693,14 +7697,14 @@ const char **clusterCommandExtendedHelp(void) {
"LINKS",
" Return information about all network links between this node and its peers.",
" Output format is an array where each array element is a map containing attributes of a link",
"MIGRATE SLOTSRANGE <start slot> <end slot> [<start slot> <end slot> ...] SHARD <shard-id>",
"MIGRATE SLOTSRANGE <start slot> <end slot> [<start slot> <end slot> ...]",
" Initiate server driven slot migration of all slot ranges to the designated shard.",
"SYNCSLOTS [START <slot-bitmap>|END|INFORM <slot-bitmap>|PAUSE|PAUSEDAT]",
"SYNCSLOTS [START <slot-bitmap>|ENDSNAPSHOT|PAUSE|PAUSEOFFSET <offset>]",
" Internal command. SYNCSLOTS START initiates send of an AOF formatted snapshot containing the",
" provided slot bitmap. SYNCSLOTS END terminates the AOF formatted snapshot, and after this",
" provided slot bitmap. SYNCSLOTS ENDSNAPSHOT terminates the AOF formatted snapshot, and after this",
" SYNCSLOTS PAUSE signals for this node to be paused and for a continuous stream of commands"
" for the slots to be replicated. SYNCSLOTS PAUSEDAT will be replied with the offset of remaining"
" commands. SYNCSLOTS INFORM is used to inform replicas that the operation is occurring.",
" for the slots to be replicated. SYNCSLOTS PAUSEOFFSET will be replied with the offset of remaining"
" commands.",
NULL};

return help;
Expand Down Expand Up @@ -7759,6 +7763,9 @@ int clusterAllowFailoverCmd(client *c) {

/* Turn this node into a primary: detach from the current primary and then
 * verify the cluster configuration against the local data set. The server
 * asserts if that verification does not return C_OK. */
void clusterPromoteSelfToPrimary(void) {
    replicationUnsetPrimary();
    /* verifyClusterConfigWithData will delete keys in unowned slots. This
     * could happen in the case of failover during a slot migration.
     *
     * The call has side effects, so it is made as its own statement and only
     * its result is asserted; the cleanup must never depend on how the
     * assertion macro evaluates its argument. */
    int verify_result = verifyClusterConfigWithData();
    serverAssert(verify_result == C_OK);
}

int detectAndUpdateCachedNodeHealth(void) {
Expand Down
1 change: 0 additions & 1 deletion src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,6 @@ struct _clusterNode {

typedef enum slotImportState {
SLOT_IMPORT_QUEUED,
SLOT_IMPORT_REPLICA_TRACKING, /* Replicas track the slot import as well */
SLOT_IMPORT_CONNECTING,
SLOT_IMPORT_SEND_AUTH,
SLOT_IMPORT_RECEIVE_AUTH,
Expand Down
29 changes: 6 additions & 23 deletions src/kvstore.c
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,12 @@ kvstore *kvstoreCreate(hashtableType *type, int num_hashtables_bits, int flags)

void kvstoreEmpty(kvstore *kvs, void(callback)(hashtable *)) {
for (int didx = 0; didx < kvs->num_hashtables; didx++) {
kvstoreEmptyHashtable(kvs, didx, callback);
hashtable *ht = kvstoreGetHashtable(kvs, didx);
if (!ht) continue;
kvstoreHashtableMetadata *metadata = (kvstoreHashtableMetadata *)hashtableMetadata(ht);
if (metadata->rehashing_node) metadata->rehashing_node = NULL;
hashtableEmpty(ht, callback);
freeHashtableIfNeeded(kvs, didx);
}

listEmpty(kvs->rehashing);
Expand All @@ -315,28 +320,6 @@ void kvstoreEmpty(kvstore *kvs, void(callback)(hashtable *)) {
kvs->overhead_hashtable_rehashing = 0;
}

/* Empty the hashtable stored at index 'didx' of 'kvs'.
 *
 * Does nothing when no hashtable exists at that index. Otherwise the
 * table's rehashing_node metadata pointer is cleared if set, the table is
 * emptied with hashtableEmpty() (which receives 'callback'), and
 * freeHashtableIfNeeded() is then invoked for the index. */
void kvstoreEmptyHashtable(kvstore *kvs, int didx, void(callback)(hashtable *)) {
    hashtable *table = kvstoreGetHashtable(kvs, didx);
    if (table == NULL) return;

    kvstoreHashtableMetadata *meta = (kvstoreHashtableMetadata *)hashtableMetadata(table);
    if (meta->rehashing_node != NULL) {
        meta->rehashing_node = NULL;
    }

    hashtableEmpty(table, callback);
    freeHashtableIfNeeded(kvs, didx);
}

/* Detach the hashtable at index 'didx' from 'kvs' and hand it to the caller.
 *
 * Returns NULL when there is no hashtable at that index. Otherwise the
 * table's rehashing_node metadata pointer is cleared if set, the kvstore
 * entry for 'didx' is set to NULL, allocated_hashtables is decremented, and
 * the detached table is returned. */
hashtable *kvstoreUnlinkHashtable(kvstore *kvs, int didx) {
    hashtable *detached = kvstoreGetHashtable(kvs, didx);

    if (detached != NULL) {
        /* Pause rehashing on the table that is about to be unlinked. */
        kvstoreHashtableMetadata *meta = (kvstoreHashtableMetadata *)hashtableMetadata(detached);
        if (meta->rehashing_node != NULL) {
            meta->rehashing_node = NULL;
        }

        kvs->hashtables[didx] = NULL;
        kvs->allocated_hashtables--;
    }

    return detached;
}

void kvstoreRelease(kvstore *kvs) {
for (int didx = 0; didx < kvs->num_hashtables; didx++) {
hashtable *ht = kvstoreGetHashtable(kvs, didx);
Expand Down
1 change: 0 additions & 1 deletion src/kvstore.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ typedef int(kvstoreIteratorPredicate)(int didx, void *privdata);
kvstore *kvstoreCreate(hashtableType *type, int num_hashtables_bits, int flags);
void kvstoreEmpty(kvstore *kvs, void(callback)(hashtable *));
void kvstoreEmptyHashtable(kvstore *kvs, int didx, void(callback)(hashtable *));
hashtable *kvstoreUnlinkHashtable(kvstore *kvs, int didx);
void kvstoreRelease(kvstore *kvs);
unsigned long long kvstoreSize(kvstore *kvs);
unsigned long kvstoreBuckets(kvstore *kvs);
Expand Down
Loading

0 comments on commit 98de0a7

Please sign in to comment.