Skip to content

Commit

Permalink
Refactor to use dedicated sync mechanisms
Browse files Browse the repository at this point in the history
Signed-off-by: Jacob Murphy <[email protected]>
  • Loading branch information
murphyjacob4 committed Jan 19, 2025
1 parent db18295 commit 6e8bdb5
Show file tree
Hide file tree
Showing 19 changed files with 836 additions and 575 deletions.
8 changes: 4 additions & 4 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -2191,10 +2191,10 @@ static int rewriteFunctions(rio *aof) {
return 0;
}

int shouldFilterSlot(int slot, void * privdata) {
if (privdata == NULL) return 0;
int slotFilterPredicate(int slot, void * privdata) {
if (privdata == NULL) return 1;
unsigned char *slot_bitmap = (unsigned char *)privdata;
return !bitmapTestBit(slot_bitmap, slot);
return bitmapTestBit(slot_bitmap, slot);
}

int rewriteAppendOnlyFileRio(rio *aof, slotBitmap slot_bitmap) {
Expand Down Expand Up @@ -2227,7 +2227,7 @@ int rewriteAppendOnlyFileRio(rio *aof, slotBitmap slot_bitmap) {
if (slot_bitmap == NULL || isSlotBitmapEmpty(slot_bitmap)) {
kvs_it = kvstoreIteratorInit(db->keys);
} else {
kvs_it = kvstoreFilteredIteratorInit(db->keys, &shouldFilterSlot, slot_bitmap);
kvs_it = kvstoreFilteredIteratorInit(db->keys, &slotFilterPredicate, slot_bitmap);
}
/* Iterate this DB writing every entry */
void *next;
Expand Down
2 changes: 1 addition & 1 deletion src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ void freeClientBlockingState(client *c) {
* and will be processed when the client is unblocked. */
void blockClient(client *c, int btype) {
/* Replication clients should never be blocked unless pause or module */
serverAssert(!(c->flag.replication_source && btype != BLOCKED_MODULE && btype != BLOCKED_POSTPONE));
serverAssert(!(c->flag.replicated && btype != BLOCKED_MODULE && btype != BLOCKED_POSTPONE));

initClientBlockingState(c);

Expand Down
14 changes: 7 additions & 7 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,7 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int

/* We handle all the cases as if they were EXEC commands, so we have
* a common code path for everything */
if (cmd->proc == execCommand) {
if (c && cmd->proc == execCommand) {
/* If CLIENT_MULTI flag is not set EXEC is just going to return an
* error. */
if (!c->flag.multi) return myself;
Expand All @@ -1040,11 +1040,11 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
mc.cmd = cmd;
}

uint64_t cmd_flags = getCommandFlags(c);
uint64_t cmd_flags = c ? getCommandFlags(c) : cmd->flags;

/* Only valid for sharded pubsub as regular pubsub can operate on any node and bypasses this layer. */
int pubsubshard_included =
(cmd_flags & CMD_PUBSUB) || (c->cmd->proc == execCommand && (c->mstate->cmd_flags & CMD_PUBSUB));
(cmd_flags & CMD_PUBSUB) || (c && c->cmd->proc == execCommand && (c->mstate->cmd_flags & CMD_PUBSUB));

/* Check that all the keys are in the same hash slot, and obtain this
* slot and the node associated. */
Expand Down Expand Up @@ -1089,7 +1089,7 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
* can safely serve the request, otherwise we return a TRYAGAIN
* error). To do so we set the importing/migrating state and
* increment a counter for every missing key. */
if (clusterNodeIsPrimary(myself) || c->flag.readonly) {
if (clusterNodeIsPrimary(myself) || (c && c->flag.readonly)) {
if (n == clusterNodeGetPrimary(myself) && getMigratingSlotDest(slot) != NULL) {
migrating_slot = 1;
} else if (getImportingSlotSource(slot) != NULL) {
Expand Down Expand Up @@ -1184,7 +1184,7 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
* request as "ASKING", we can serve the request. However if the request
* involves multiple keys and we don't have them all, the only option is
* to send a TRYAGAIN error. */
if (importing_slot && (c->flag.asking || cmd_flags & CMD_ASKING)) {
if (importing_slot && (c && (c->flag.asking || cmd_flags & CMD_ASKING))) {
if (multiple_keys && missing_keys) {
if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
return NULL;
Expand All @@ -1197,8 +1197,8 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
* node is a replica and the request is about a hash slot our primary
* is serving, we can reply without redirection. */
int is_write_command =
(cmd_flags & CMD_WRITE) || (c->cmd->proc == execCommand && (c->mstate->cmd_flags & CMD_WRITE));
if ((c->flag.readonly || pubsubshard_included) && !is_write_command && clusterNodeIsReplica(myself) &&
(cmd_flags & CMD_WRITE) || (c && c->cmd->proc == execCommand && (c->mstate->cmd_flags & CMD_WRITE));
if (((c && c->flag.readonly) || pubsubshard_included) && !is_write_command && clusterNodeIsReplica(myself) &&
clusterNodeGetPrimary(myself) == n) {
return myself;
}
Expand Down
5 changes: 4 additions & 1 deletion src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,10 @@ void bitmapSetAllBits(unsigned char *bitmap, int len);
int slotBitmapCompare(slotBitmap bitmap, slotBitmap other);
int isSlotBitmapEmpty(slotBitmap bitmap);
int getSlotOrReply(client *c, robj *o);
void clusterSlotMigrationDoneSyncing(long long initial_offset);
void clusterSlotImportDoneSyncing(long long initial_offset);
void clusterSlotMigrationHandleClientClose(client *c);
void clusterFeedSlotMigration(int dbid, robj **argv, int argc);
int clusterShouldWriteToSlotMigrationTarget(void);

/* functions with shared implementations */
int clusterNodeIsMyself(clusterNode *n);
Expand Down
Loading

0 comments on commit 6e8bdb5

Please sign in to comment.