From c3e8ef9a2965010b0bb063f9e5220bbaa8716d4f Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Thu, 11 May 2023 16:31:15 +0300 Subject: [PATCH 01/28] test watch support with redis PR --- .github/workflows/ci.yml | 8 ++--- deps/common/redismodule.h | 37 ++++++++++++++------- src/raft.c | 59 +++++++++++++++++++++++---------- src/redisraft.h | 1 + tests/integration/test_multi.py | 46 +++++++++++++++++++++++++ 5 files changed, 117 insertions(+), 34 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index eac440496..1791fcf6b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -17,8 +17,8 @@ jobs: - name: Checkout Redis uses: actions/checkout@v2 with: - repository: 'redis/redis' - ref: 'unstable' + repository: 'sjpotter/redis' + ref: '32622170376b50d13cfffce9cae7f69627fee381' path: 'redis' - name: Build Redis run: cd redis && make -j 4 gcov @@ -54,8 +54,8 @@ jobs: - name: Checkout Redis uses: actions/checkout@v2 with: - repository: 'redis/redis' - ref: 'unstable' + repository: 'sjpotter/redis' + ref: '32622170376b50d13cfffce9cae7f69627fee381' path: 'redis' - name: Build Redis run: cd redis && make -j 4 SANITIZER=address diff --git a/deps/common/redismodule.h b/deps/common/redismodule.h index 2f3bb8586..a7065b851 100644 --- a/deps/common/redismodule.h +++ b/deps/common/redismodule.h @@ -41,7 +41,7 @@ typedef long long ustime_t; /* API versions. */ #define REDISMODULE_APIVER_1 1 -/* Version of the RedisModuleTypeMethods structure. Once the RedisModuleTypeMethods +/* Version of the RedisModuleTypeMethods structure. Once the RedisModuleTypeMethods * structure is changed, this version number needs to be changed synchronistically. */ #define REDISMODULE_TYPE_METHOD_VERSION 5 @@ -309,6 +309,10 @@ typedef uint64_t RedisModuleTimerID; * Use RedisModule_GetModuleOptionsAll instead. */ #define _REDISMODULE_OPTIONS_FLAGS_NEXT (1<<4) +/* RM_GetClientFlags */ +#define REDISMODULE_CLIENT_FLAG_DIRTY_CAS (1<<0) /* Watched keys modified. EXEC will fail. */ +#define REDISMODULE_CLIENT_FLAG_DIRTY_EXEC (1<<1) /* EXEC will fail for errors while queueing */ + /* Definitions for RedisModule_SetCommandInfo. */ typedef enum { @@ -587,7 +591,7 @@ static const RedisModuleEvent /* Deprecated since Redis 7.0, not used anymore. */ __attribute__ ((deprecated)) RedisModuleEvent_ReplBackup = { - REDISMODULE_EVENT_REPL_BACKUP, + REDISMODULE_EVENT_REPL_BACKUP, 1 }, RedisModuleEvent_ReplAsyncLoad = { @@ -880,6 +884,7 @@ typedef struct RedisModuleCommandFilter RedisModuleCommandFilter; typedef struct RedisModuleServerInfoData RedisModuleServerInfoData; typedef struct RedisModuleScanCursor RedisModuleScanCursor; typedef struct RedisModuleUser RedisModuleUser; +typedef struct RedisModuleClient RedisModuleClient; typedef struct RedisModuleKeyOptCtx RedisModuleKeyOptCtx; typedef struct RedisModuleRdbStream RedisModuleRdbStream; @@ -976,7 +981,7 @@ REDISMODULE_API int (*RedisModule_GetSelectedDb)(RedisModuleCtx *ctx) REDISMODUL REDISMODULE_API int (*RedisModule_SelectDb)(RedisModuleCtx *ctx, int newid) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_KeyExists)(RedisModuleCtx *ctx, RedisModuleString *keyname) REDISMODULE_ATTR; REDISMODULE_API RedisModuleKey * (*RedisModule_OpenKey)(RedisModuleCtx *ctx, RedisModuleString *keyname, int mode) REDISMODULE_ATTR; -REDISMODULE_API int (*RedisModule_GetOpenKeyModesAll)() REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_GetOpenKeyModesAll)(void) REDISMODULE_ATTR; REDISMODULE_API void (*RedisModule_CloseKey)(RedisModuleKey *kp) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_KeyType)(RedisModuleKey *kp) REDISMODULE_ATTR; REDISMODULE_API size_t (*RedisModule_ValueLength)(RedisModuleKey *kp) REDISMODULE_ATTR; @@ -1098,7 +1103,7 @@ REDISMODULE_API int (*RedisModule_SetClientNameById)(uint64_t id, RedisModuleStr REDISMODULE_API int (*RedisModule_PublishMessage)(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_PublishMessageShard)(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_GetContextFlags)(RedisModuleCtx *ctx) REDISMODULE_ATTR; -REDISMODULE_API int (*RedisModule_AvoidReplicaTraffic)() REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_AvoidReplicaTraffic)(void) REDISMODULE_ATTR; REDISMODULE_API void * (*RedisModule_PoolAlloc)(RedisModuleCtx *ctx, size_t bytes) REDISMODULE_ATTR; REDISMODULE_API RedisModuleType * (*RedisModule_CreateDataType)(RedisModuleCtx *ctx, const char *name, int encver, RedisModuleTypeMethods *typemethods) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_ModuleTypeSetValue)(RedisModuleKey *key, RedisModuleType *mt, void *value) REDISMODULE_ATTR; @@ -1201,17 +1206,17 @@ REDISMODULE_API RedisModuleBlockedClient * (*RedisModule_BlockClientOnKeys)(Redi REDISMODULE_API RedisModuleBlockedClient * (*RedisModule_BlockClientOnKeysWithFlags)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata, int flags) REDISMODULE_ATTR; REDISMODULE_API void (*RedisModule_SignalKeyAsReady)(RedisModuleCtx *ctx, RedisModuleString *key) REDISMODULE_ATTR; REDISMODULE_API RedisModuleString * (*RedisModule_GetBlockedClientReadyKey)(RedisModuleCtx *ctx) REDISMODULE_ATTR; -REDISMODULE_API RedisModuleScanCursor * (*RedisModule_ScanCursorCreate)() REDISMODULE_ATTR; +REDISMODULE_API RedisModuleScanCursor * (*RedisModule_ScanCursorCreate)(void) REDISMODULE_ATTR; REDISMODULE_API void (*RedisModule_ScanCursorRestart)(RedisModuleScanCursor *cursor) REDISMODULE_ATTR; REDISMODULE_API void (*RedisModule_ScanCursorDestroy)(RedisModuleScanCursor *cursor) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_Scan)(RedisModuleCtx *ctx, RedisModuleScanCursor *cursor, RedisModuleScanCB fn, void *privdata) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_ScanKey)(RedisModuleKey *key, RedisModuleScanCursor *cursor, RedisModuleScanKeyCB fn, void *privdata) REDISMODULE_ATTR; -REDISMODULE_API int (*RedisModule_GetContextFlagsAll)() REDISMODULE_ATTR; -REDISMODULE_API int (*RedisModule_GetModuleOptionsAll)() REDISMODULE_ATTR; -REDISMODULE_API int (*RedisModule_GetKeyspaceNotificationFlagsAll)() REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_GetContextFlagsAll)(void) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_GetModuleOptionsAll)(void) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_GetKeyspaceNotificationFlagsAll)(void) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_IsSubEventSupported)(RedisModuleEvent event, uint64_t subevent) REDISMODULE_ATTR; -REDISMODULE_API int (*RedisModule_GetServerVersion)() REDISMODULE_ATTR; -REDISMODULE_API int (*RedisModule_GetTypeMethodVersion)() REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_GetServerVersion)(void) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_GetTypeMethodVersion)(void) REDISMODULE_ATTR; REDISMODULE_API void (*RedisModule_Yield)(RedisModuleCtx *ctx, int flags, const char *busy_reply) REDISMODULE_ATTR; REDISMODULE_API RedisModuleBlockedClient * (*RedisModule_BlockClient)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) REDISMODULE_ATTR; REDISMODULE_API void * (*RedisModule_BlockClientGetPrivateData)(RedisModuleBlockedClient *blocked_client) REDISMODULE_ATTR; @@ -1234,7 +1239,7 @@ REDISMODULE_API void (*RedisModule_ThreadSafeContextUnlock)(RedisModuleCtx *ctx) REDISMODULE_API int (*RedisModule_SubscribeToKeyspaceEvents)(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc cb) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_AddPostNotificationJob)(RedisModuleCtx *ctx, RedisModulePostNotificationJobFunc callback, void *pd, void (*free_pd)(void*)) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_NotifyKeyspaceEvent)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) REDISMODULE_ATTR; -REDISMODULE_API int (*RedisModule_GetNotifyKeyspaceEvents)() REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_GetNotifyKeyspaceEvents)(void) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_BlockedClientDisconnected)(RedisModuleCtx *ctx) REDISMODULE_ATTR; REDISMODULE_API void (*RedisModule_RegisterClusterMessageReceiver)(RedisModuleCtx *ctx, uint8_t type, RedisModuleClusterMessageReceiver callback) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_SendClusterMessage)(RedisModuleCtx *ctx, const char *target_id, uint8_t type, const char *msg, uint32_t len) REDISMODULE_ATTR; @@ -1263,7 +1268,7 @@ REDISMODULE_API int (*RedisModule_Fork)(RedisModuleForkDoneHandler cb, void *use REDISMODULE_API void (*RedisModule_SendChildHeartbeat)(double progress) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_ExitFromChild)(int retcode) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_KillForkChild)(int child_pid) REDISMODULE_ATTR; -REDISMODULE_API float (*RedisModule_GetUsedMemoryRatio)() REDISMODULE_ATTR; +REDISMODULE_API float (*RedisModule_GetUsedMemoryRatio)(void) REDISMODULE_ATTR; REDISMODULE_API size_t (*RedisModule_MallocSize)(void* ptr) REDISMODULE_ATTR; REDISMODULE_API size_t (*RedisModule_MallocUsableSize)(void *ptr) REDISMODULE_ATTR; REDISMODULE_API size_t (*RedisModule_MallocSizeString)(RedisModuleString* str) REDISMODULE_ATTR; @@ -1309,6 +1314,10 @@ REDISMODULE_API RedisModuleRdbStream *(*RedisModule_RdbStreamCreateFromFile)(con REDISMODULE_API void (*RedisModule_RdbStreamFree)(RedisModuleRdbStream *stream) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_RdbLoad)(RedisModuleCtx *ctx, RedisModuleRdbStream *stream, int flags) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_RdbSave)(RedisModuleCtx *ctx, RedisModuleRdbStream *stream, int flags) REDISMODULE_ATTR; +REDISMODULE_API RedisModuleClient * (*RedisModule_CreateModuleClient)(RedisModuleCtx *ctx, const RedisModuleUser *rmu) REDISMODULE_ATTR; +REDISMODULE_API void (*RedisModule_FreeModuleClient)(RedisModuleCtx *ctx, RedisModuleClient *client) REDISMODULE_ATTR; +REDISMODULE_API void (*RedisModule_SetContextClient)(RedisModuleCtx *ctx, RedisModuleClient *client) REDISMODULE_ATTR; +REDISMODULE_API uint64_t (*RedisModule_GetClientFlags)(RedisModuleClient *client) REDISMODULE_ATTR; #define RedisModule_IsAOFClient(id) ((id) == UINT64_MAX) @@ -1669,6 +1678,10 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(RdbStreamFree); REDISMODULE_GET_API(RdbLoad); REDISMODULE_GET_API(RdbSave); + REDISMODULE_GET_API(CreateModuleClient); + REDISMODULE_GET_API(FreeModuleClient); + REDISMODULE_GET_API(SetContextClient); + REDISMODULE_GET_API(GetClientFlags); if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR; RedisModule_SetModuleAttribs(ctx,name,ver,apiver); diff --git a/src/raft.c b/src/raft.c index 6b057f226..d75875100 100644 --- a/src/raft.c +++ b/src/raft.c @@ -298,7 +298,7 @@ static RRStatus handleSharding(RedisRaftCtx *rr, RedisModuleCtx *ctx, RaftRedisC /* returns the client session object for this CommandArray if applicable * starts/creates it if necessary */ -static void *getClientSession(RedisRaftCtx *rr, RaftRedisCommandArray *cmds, bool local) +static void *getClientSession(RedisRaftCtx *rr, RedisModuleUser *user, RaftRedisCommandArray *cmds, bool local) { unsigned long long id = cmds->client_id; int nokey; @@ -314,6 +314,7 @@ static void *getClientSession(RedisRaftCtx *rr, RaftRedisCommandArray *cmds, boo client_session = RedisModule_Alloc(sizeof(ClientSession)); client_session->client_id = id; client_session->local = local; + client_session->client = RedisModule_CreateModuleClient(rr->ctx, user); RedisModule_DictSetC(rr->client_session_dict, &id, sizeof(id), client_session); } } @@ -321,6 +322,21 @@ static void *getClientSession(RedisRaftCtx *rr, RaftRedisCommandArray *cmds, boo return client_session; } +static void freeClientSession(RedisRaftCtx *rr, ClientSession *client_session) +{ + RedisModule_FreeModuleClient(rr->ctx, client_session->client); + RedisModule_Free(client_session); +} + +static void endClientSession(RedisRaftCtx *rr, unsigned long long id) +{ + ClientSession *client_session = NULL; + RedisModule_DictDelC(rr->client_session_dict, &id, sizeof(id), &client_session); + if (client_session) { + freeClientSession(rr, client_session); + } +} + RedisModuleUser *RaftGetACLUser(RedisModuleCtx *ctx, RedisRaftCtx *rr, RaftRedisCommandArray *cmds) { int nokey; @@ -376,6 +392,7 @@ RedisModuleCallReply *RaftExecuteCommandArray(RedisRaftCtx *rr, RedisModuleCallReply *reply = NULL; RedisModuleUser *user = NULL; RedisModuleCtx *ctx = req ? req->ctx : rr->ctx; + bool is_multi_session = false; if (cmds->acl) { user = RaftGetACLUser(rr->ctx, rr, cmds); @@ -391,7 +408,7 @@ RedisModuleCallReply *RaftExecuteCommandArray(RedisRaftCtx *rr, return NULL; /* sharding error, so even if blocking command, don't */ } - ClientSession *client_session = getClientSession(rr, cmds, req != NULL); + ClientSession *client_session = getClientSession(rr, user, cmds, req != NULL); (void) client_session; /* unused for now */ if (cmds->cmd_flags & CMD_SPEC_BLOCKING) { @@ -412,6 +429,17 @@ RedisModuleCallReply *RaftExecuteCommandArray(RedisRaftCtx *rr, * (although no harm is done). */ if (i == 0 && cmdlen == 5 && !strncasecmp(cmd, "MULTI", 5)) { + if (client_session) { + uint64_t flags = RedisModule_GetClientFlags(client_session->client); + if (flags) { + if (req) { + RedisModule_ReplyWithNull(req->ctx); + } + endClientSession(rr, client_session->client_id); + return NULL; + } + is_multi_session = true; + } if (req) { RedisModule_ReplyWithArray(req->ctx, cmds->len - 1); } @@ -434,7 +462,7 @@ RedisModuleCallReply *RaftExecuteCommandArray(RedisRaftCtx *rr, * When we have an ACL, we will have a user set on the context, so need "C" */ char *resp_call_fmt; - if (cmds->cmd_flags & CMD_SPEC_MULTI) { + if (client_session) { /* don't block inside a MULTI */ resp_call_fmt = cmds->acl ? "CE0v" : "E0v"; } else { @@ -443,7 +471,11 @@ RedisModuleCallReply *RaftExecuteCommandArray(RedisRaftCtx *rr, enterRedisModuleCall(); RedisModule_SetContextUser(ctx, user); + if (client_session) { + RedisModule_SetContextClient(ctx, client_session->client); + } reply = RedisModule_Call(ctx, cmd, resp_call_fmt, &c->argv[1], c->argc - 1); + RedisModule_SetContextClient(ctx, NULL); RedisModule_SetContextUser(ctx, NULL); exitRedisModuleCall(); rr->entered_eval = old_entered_eval; @@ -465,6 +497,11 @@ RedisModuleCallReply *RaftExecuteCommandArray(RedisRaftCtx *rr, } } + if (is_multi_session) { + LOG_WARNING("ending session"); + endClientSession(rr, client_session->client_id); + } + /* if blocking (this won't be NULL), return it to the caller, to setup callback / saving state */ return reply; } @@ -591,20 +628,6 @@ static void unlockDeleteKeys(RedisRaftCtx *rr, raft_entry_t *entry, RaftReq *req } } -static void freeClientSession(void *client_session) -{ - RedisModule_Free(client_session); -} - -static void endClientSession(RedisRaftCtx *rr, unsigned long long id) -{ - void *client_session = NULL; - RedisModule_DictDelC(rr->client_session_dict, &id, sizeof(id), &client_session); - if (client_session) { - freeClientSession(client_session); - } -} - static void handleEndClientSession(RedisRaftCtx *rr, raft_entry_t *entry, RaftReq *req) { RedisModule_Assert(entry->type == RAFT_LOGTYPE_END_SESSION); @@ -626,7 +649,7 @@ void clearClientSessions(RedisRaftCtx *rr) if (client_session->local) { RedisModule_DeauthenticateAndCloseClient(rr->ctx, client_session->client_id); } - freeClientSession(client_session); + freeClientSession(rr, client_session); } RedisModule_DictIteratorStop(iter); RedisModule_FreeDict(rr->ctx, rr->client_session_dict); diff --git a/src/redisraft.h b/src/redisraft.h index 69e0bf64e..217f3846a 100644 --- a/src/redisraft.h +++ b/src/redisraft.h @@ -774,6 +774,7 @@ typedef struct ClientState { typedef struct ClientSession { raft_session_t client_id; + RedisModuleClient *client; bool local; } ClientSession; diff --git a/tests/integration/test_multi.py b/tests/integration/test_multi.py index 07d6cf47a..bbce7cab9 100644 --- a/tests/integration/test_multi.py +++ b/tests/integration/test_multi.py @@ -240,3 +240,49 @@ def test_watch_within_multi(cluster): conn.execute('watch', 'x') assert conn.execute('get', 'key1') == b'QUEUED' assert conn.execute('exec') == [b'1', b'1'] + +def test_multi_watch_without_modification(cluster): + cluster.create(3) + node = cluster.leader_node() + node.execute('set', 'key1', 1) + + conn = RawConnection(cluster.node(1).client) + + assert conn.execute('watch', 'key1') == b'OK' + assert conn.execute('multi') == b'OK' + assert conn.execute('get', 'key1') == b'QUEUED' + assert conn.execute('get', 'key1') == b'QUEUED' + assert conn.execute('exec') == [b'1', b'1'] + +def test_multi_watch_with_modification(cluster): + cluster.create(3) + node = cluster.leader_node() + node.execute('set', 'key1', 1) + + conn = RawConnection(cluster.node(1).client) + + assert conn.execute('watch', 'key1') == b'OK' + assert conn.execute('multi') == b'OK' + assert conn.execute('get', 'key1') == b'QUEUED' + assert conn.execute('get', 'key1') == b'QUEUED' + assert cluster.execute('set', 'key1', 2) + assert conn.execute('exec') is None + +def test_multi_watch_cleared_after_exec(cluster): + cluster.create(3) + node = cluster.leader_node() + node.execute('set', 'key1', 1) + + conn = RawConnection(cluster.node(1).client) + + assert conn.execute('watch', 'key1') == b'OK' + assert conn.execute('multi') == b'OK' + assert conn.execute('get', 'key1') == b'QUEUED' + assert conn.execute('get', 'key1') == b'QUEUED' + assert conn.execute('exec') == [b'1', b'1'] + + assert conn.execute('multi') == b'OK' + assert conn.execute('get', 'key1') == b'QUEUED' + assert conn.execute('get', 'key1') == b'QUEUED' + assert cluster.execute('set', 'key1', 2) + assert conn.execute('exec') == [b'2', b'2'] From afa54404c2119fa70220983b5c3ef531d5bfd959 Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Thu, 11 May 2023 17:09:24 +0300 Subject: [PATCH 02/28] linter --- tests/integration/test_multi.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/integration/test_multi.py b/tests/integration/test_multi.py index bbce7cab9..bbae60e7b 100644 --- a/tests/integration/test_multi.py +++ b/tests/integration/test_multi.py @@ -241,6 +241,7 @@ def test_watch_within_multi(cluster): assert conn.execute('get', 'key1') == b'QUEUED' assert conn.execute('exec') == [b'1', b'1'] + def test_multi_watch_without_modification(cluster): cluster.create(3) node = cluster.leader_node() @@ -254,6 +255,7 @@ def test_multi_watch_without_modification(cluster): assert conn.execute('get', 'key1') == b'QUEUED' assert conn.execute('exec') == [b'1', b'1'] + def test_multi_watch_with_modification(cluster): cluster.create(3) node = cluster.leader_node() @@ -268,6 +270,7 @@ def test_multi_watch_with_modification(cluster): assert cluster.execute('set', 'key1', 2) assert conn.execute('exec') is None + def test_multi_watch_cleared_after_exec(cluster): cluster.create(3) node = cluster.leader_node() From 833cefcdea9d2e2e6e0161664b8ebb8b9ae39a5a Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Thu, 11 May 2023 17:12:29 +0300 Subject: [PATCH 03/28] fix blocking commands in multi --- src/raft.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/raft.c b/src/raft.c index d75875100..9daea5ac0 100644 --- a/src/raft.c +++ b/src/raft.c @@ -462,7 +462,7 @@ RedisModuleCallReply *RaftExecuteCommandArray(RedisRaftCtx *rr, * When we have an ACL, we will have a user set on the context, so need "C" */ char *resp_call_fmt; - if (client_session) { + if (cmds->cmd_flags & CMD_SPEC_MULTI || client_session) { /* don't block inside a MULTI */ resp_call_fmt = cmds->acl ? "CE0v" : "E0v"; } else { From aa9ba04271df4b4ee986a2930f28091069b2e431 Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Thu, 11 May 2023 19:52:03 +0300 Subject: [PATCH 04/28] change acl/user handling for usage with persistent clients --- deps/common/redismodule.h | 5 ++++- src/raft.c | 18 +++++++++++------- src/snapshot.c | 1 + 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/deps/common/redismodule.h b/deps/common/redismodule.h index a7065b851..027007ee5 100644 --- a/deps/common/redismodule.h +++ b/deps/common/redismodule.h @@ -1314,10 +1314,11 @@ REDISMODULE_API RedisModuleRdbStream *(*RedisModule_RdbStreamCreateFromFile)(con REDISMODULE_API void (*RedisModule_RdbStreamFree)(RedisModuleRdbStream *stream) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_RdbLoad)(RedisModuleCtx *ctx, RedisModuleRdbStream *stream, int flags) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_RdbSave)(RedisModuleCtx *ctx, RedisModuleRdbStream *stream, int flags) REDISMODULE_ATTR; -REDISMODULE_API RedisModuleClient * (*RedisModule_CreateModuleClient)(RedisModuleCtx *ctx, const RedisModuleUser *rmu) REDISMODULE_ATTR; +REDISMODULE_API RedisModuleClient * (*RedisModule_CreateModuleClient)(RedisModuleCtx *ctx) REDISMODULE_ATTR; REDISMODULE_API void (*RedisModule_FreeModuleClient)(RedisModuleCtx *ctx, RedisModuleClient *client) REDISMODULE_ATTR; REDISMODULE_API void (*RedisModule_SetContextClient)(RedisModuleCtx *ctx, RedisModuleClient *client) REDISMODULE_ATTR; REDISMODULE_API uint64_t (*RedisModule_GetClientFlags)(RedisModuleClient *client) REDISMODULE_ATTR; +REDISMODULE_API void (*RedisModule_SetClientUser)(RedisModuleClient *client, RedisModuleUser *user) REDISMODULE_ATTR; #define RedisModule_IsAOFClient(id) ((id) == UINT64_MAX) @@ -1682,6 +1683,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(FreeModuleClient); REDISMODULE_GET_API(SetContextClient); REDISMODULE_GET_API(GetClientFlags); + REDISMODULE_GET_API(SetClientUser); + if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR; RedisModule_SetModuleAttribs(ctx,name,ver,apiver); diff --git a/src/raft.c b/src/raft.c index 9daea5ac0..7c604182a 100644 --- a/src/raft.c +++ b/src/raft.c @@ -298,7 +298,7 @@ static RRStatus handleSharding(RedisRaftCtx *rr, RedisModuleCtx *ctx, RaftRedisC /* returns the client session object for this CommandArray if applicable * starts/creates it if necessary */ -static void *getClientSession(RedisRaftCtx *rr, RedisModuleUser *user, RaftRedisCommandArray *cmds, bool local) +static void *getClientSession(RedisRaftCtx *rr, RaftRedisCommandArray *cmds, bool local) { unsigned long long id = cmds->client_id; int nokey; @@ -314,7 +314,7 @@ static void *getClientSession(RedisRaftCtx *rr, RedisModuleUser *user, RaftRedis client_session = RedisModule_Alloc(sizeof(ClientSession)); client_session->client_id = id; client_session->local = local; - client_session->client = RedisModule_CreateModuleClient(rr->ctx, user); + client_session->client = RedisModule_CreateModuleClient(rr->ctx); RedisModule_DictSetC(rr->client_session_dict, &id, sizeof(id), client_session); } } @@ -408,8 +408,7 @@ RedisModuleCallReply *RaftExecuteCommandArray(RedisRaftCtx *rr, return NULL; /* sharding error, so even if blocking command, don't */ } - ClientSession *client_session = getClientSession(rr, user, cmds, req != NULL); - (void) client_session; /* unused for now */ + ClientSession *client_session = getClientSession(rr, cmds, req != NULL); if (cmds->cmd_flags & CMD_SPEC_BLOCKING) { replaceBlockingTimeout(cmds); @@ -470,13 +469,18 @@ RedisModuleCallReply *RaftExecuteCommandArray(RedisRaftCtx *rr, } enterRedisModuleCall(); - RedisModule_SetContextUser(ctx, user); if (client_session) { - RedisModule_SetContextClient(ctx, client_session->client); + RedisModule_SetClientUser(client_session->client, user); + } else { + RedisModule_SetContextUser(ctx, user); } reply = RedisModule_Call(ctx, cmd, resp_call_fmt, &c->argv[1], c->argc - 1); + if (client_session) { + RedisModule_SetClientUser(client_session->client, NULL); + } else { + RedisModule_SetContextUser(ctx, NULL); + } RedisModule_SetContextClient(ctx, NULL); - RedisModule_SetContextUser(ctx, NULL); exitRedisModuleCall(); rr->entered_eval = old_entered_eval; diff --git a/src/snapshot.c b/src/snapshot.c index 3be9b5196..9c63ee99a 100644 --- a/src/snapshot.c +++ b/src/snapshot.c @@ -588,6 +588,7 @@ static void clientSessionRDBLoad(RedisModuleIO *rdb) unsigned long long id = RedisModule_LoadUnsigned(rdb); client_session->client_id = id; client_session->local = false; + client_session->client = RedisModule_CreateModuleClient(rr->ctx); RedisModule_DictSetC(rr->client_session_dict, &id, sizeof(id), client_session); } } From 6d7ac5d8405140cec00bdc431b9bf872e9e1837a Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Thu, 11 May 2023 20:42:22 +0300 Subject: [PATCH 05/28] fix --- src/raft.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/raft.c b/src/raft.c index 7c604182a..2e9dea362 100644 --- a/src/raft.c +++ b/src/raft.c @@ -471,12 +471,14 @@ RedisModuleCallReply *RaftExecuteCommandArray(RedisRaftCtx *rr, enterRedisModuleCall(); if (client_session) { RedisModule_SetClientUser(client_session->client, user); + RedisModule_SetContextClient(ctx, client_session->client); } else { RedisModule_SetContextUser(ctx, user); } reply = RedisModule_Call(ctx, cmd, resp_call_fmt, &c->argv[1], c->argc - 1); if (client_session) { RedisModule_SetClientUser(client_session->client, NULL); + RedisModule_SetContextClient(ctx, NULL); } else { RedisModule_SetContextUser(ctx, NULL); } From f3e4d5e6e91e637467ee49e0ab3ccd116e490a2a Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Tue, 16 May 2023 12:19:34 +0300 Subject: [PATCH 06/28] add dirty flag support to snapshot/restore and use at apply time --- .github/workflows/ci.yml | 4 +- src/raft.c | 14 +++- src/redisraft.h | 1 + src/snapshot.c | 6 ++ tests/integration/test_multi.py | 115 +++++++++++++++++++++++++++++++- 5 files changed, 133 insertions(+), 7 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1791fcf6b..2ed948bcb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,7 +18,7 @@ jobs: uses: actions/checkout@v2 with: repository: 'sjpotter/redis' - ref: '32622170376b50d13cfffce9cae7f69627fee381' + ref: '686799e2fff207ccbb1c4cf8d546cb76e6f77fe3' path: 'redis' - name: Build Redis run: cd redis && make -j 4 gcov @@ -55,7 +55,7 @@ jobs: uses: actions/checkout@v2 with: repository: 'sjpotter/redis' - ref: '32622170376b50d13cfffce9cae7f69627fee381' + ref: '686799e2fff207ccbb1c4cf8d546cb76e6f77fe3' path: 'redis' - name: Build Redis run: cd redis && make -j 4 SANITIZER=address diff --git a/src/raft.c b/src/raft.c index 2e9dea362..1cf39a823 100644 --- a/src/raft.c +++ b/src/raft.c @@ -314,6 +314,7 @@ static void *getClientSession(RedisRaftCtx *rr, RaftRedisCommandArray *cmds, boo client_session = RedisModule_Alloc(sizeof(ClientSession)); client_session->client_id = id; client_session->local = local; + client_session->dirty = false; client_session->client = RedisModule_CreateModuleClient(rr->ctx); RedisModule_DictSetC(rr->client_session_dict, &id, sizeof(id), client_session); } @@ -375,6 +376,16 @@ void handleUnblock(RedisModuleCtx *ctx, RedisModuleCallReply *reply, void *priva freeBlockedCommand(bc); } +static bool isClientSessionDirty(ClientSession *client_session) +{ + uint64_t flags = RedisModule_GetClientFlags(client_session->client); + if (client_session->dirty || flags) { + return true; + } + + return false; +} + /* Execute all commands in a specified RaftRedisCommandArray. * * If reply_ctx is non-NULL, replies are delivered to it. @@ -429,8 +440,7 @@ RedisModuleCallReply *RaftExecuteCommandArray(RedisRaftCtx *rr, */ if (i == 0 && cmdlen == 5 && !strncasecmp(cmd, "MULTI", 5)) { if (client_session) { - uint64_t flags = RedisModule_GetClientFlags(client_session->client); - if (flags) { + if (isClientSessionDirty(client_session)) { if (req) { RedisModule_ReplyWithNull(req->ctx); } diff --git a/src/redisraft.h b/src/redisraft.h index 217f3846a..35ff49cb3 100644 --- a/src/redisraft.h +++ b/src/redisraft.h @@ -776,6 +776,7 @@ typedef struct ClientSession { raft_session_t client_id; RedisModuleClient *client; bool local; + bool dirty; } ClientSession; /* common.c */ diff --git a/src/snapshot.c b/src/snapshot.c index 9c63ee99a..d3516cb69 100644 --- a/src/snapshot.c +++ b/src/snapshot.c @@ -587,6 +587,7 @@ static void clientSessionRDBLoad(RedisModuleIO *rdb) ClientSession *client_session = RedisModule_Alloc(sizeof(ClientSession)); unsigned long long id = RedisModule_LoadUnsigned(rdb); client_session->client_id = id; + client_session->dirty = RedisModule_LoadUnsigned(rdb); client_session->local = false; client_session->client = RedisModule_CreateModuleClient(rr->ctx); RedisModule_DictSetC(rr->client_session_dict, &id, sizeof(id), client_session); @@ -708,6 +709,11 @@ static void clientSessionRDBSave(RedisModuleIO *rdb) ClientSession *client_session; while (RedisModule_DictNextC(iter, NULL, (void **) &client_session) != NULL) { RedisModule_SaveUnsigned(rdb, client_session->client_id); + if (RedisModule_GetClientFlags(client_session->client)) { + RedisModule_SaveUnsigned(rdb, 1); + } else { + RedisModule_SaveUnsigned(rdb, 0); + } } RedisModule_DictIteratorStop(iter); } diff --git a/tests/integration/test_multi.py b/tests/integration/test_multi.py index bbae60e7b..0a43cd640 100644 --- a/tests/integration/test_multi.py +++ b/tests/integration/test_multi.py @@ -253,7 +253,14 @@ def test_multi_watch_without_modification(cluster): assert conn.execute('multi') == b'OK' assert conn.execute('get', 'key1') == b'QUEUED' assert conn.execute('get', 'key1') == b'QUEUED' - assert conn.execute('exec') == [b'1', b'1'] + assert conn.execute('set', 'key2', 1) == b'QUEUED' + assert conn.execute('exec') == [b'1', b'1', b'OK'] + + cluster.wait_for_unanimity() + + for i in range(1, 3): + val = cluster.node(i).raft_debug_exec("get", "key2") + assert val == b'1' def test_multi_watch_with_modification(cluster): @@ -267,9 +274,16 @@ def test_multi_watch_with_modification(cluster): assert conn.execute('multi') == b'OK' assert conn.execute('get', 'key1') == b'QUEUED' assert conn.execute('get', 'key1') == b'QUEUED' + assert conn.execute('set', 'key2', 1) == b'QUEUED' assert cluster.execute('set', 'key1', 2) assert conn.execute('exec') is None + cluster.wait_for_unanimity() + + for i in range(1, 3): + val = cluster.node(i).raft_debug_exec("get", "key2") + assert val is None + def test_multi_watch_cleared_after_exec(cluster): cluster.create(3) @@ -282,10 +296,105 @@ def test_multi_watch_cleared_after_exec(cluster): assert conn.execute('multi') == b'OK' assert conn.execute('get', 'key1') == b'QUEUED' assert conn.execute('get', 'key1') == b'QUEUED' - assert conn.execute('exec') == [b'1', b'1'] + assert conn.execute('set', 'key2', 1) == b'QUEUED' + assert conn.execute('exec') == [b'1', b'1', b'OK'] + + cluster.wait_for_unanimity() + + for i in range(1, 3): + val = cluster.node(i).raft_debug_exec("get", "key2") + assert val == b'1' assert conn.execute('multi') == b'OK' assert conn.execute('get', 'key1') == b'QUEUED' assert conn.execute('get', 'key1') == b'QUEUED' + assert conn.execute('set', 'key2', 2) == b'QUEUED' + assert cluster.execute('set', 'key1', 2) + assert conn.execute('exec') == [b'2', b'2', b'OK'] + + cluster.wait_for_unanimity() + + for i in range(1, 3): + val = cluster.node(i).raft_debug_exec("get", "key2") + assert val == b'2' + + +def test_multi_watch_with_restart_clean(cluster): + cluster.create(3) + node = cluster.leader_node() + node.execute('set', 'key1', 1) + + conn = RawConnection(cluster.node(1).client) + + assert conn.execute('watch', 'key1') == b'OK' + assert conn.execute('multi') == b'OK' + assert conn.execute('set', 'key2', 1) == b'QUEUED' + + cluster.node(2).restart() + cluster.node(2).wait_for_election() + cluster.node(3).restart() + cluster.node(3).wait_for_election() + cluster.wait_for_unanimity() + + assert conn.execute('exec') == [b'OK'] + + cluster.wait_for_unanimity() + + for i in range(1, 3): + val = cluster.node(i).raft_debug_exec("get", "key2") + assert val == b'1' + + +def test_multi_watch_with_restart_dirty(cluster): + cluster.create(3) + node = cluster.leader_node() + node.execute('set', 'key1', 1) + + conn = RawConnection(cluster.node(1).client) + + assert conn.execute('watch', 'key1') == b'OK' + assert conn.execute('multi') == b'OK' + assert conn.execute('set', 'key2', 1) == b'QUEUED' assert cluster.execute('set', 'key1', 2) - assert conn.execute('exec') == [b'2', b'2'] + + cluster.node(2).restart() + cluster.node(2).wait_for_election() + cluster.node(3).restart() + cluster.node(3).wait_for_election() + cluster.wait_for_unanimity() + + assert conn.execute('exec') is None + + cluster.wait_for_unanimity() + + for i in range(1, 3): + val = cluster.node(i).raft_debug_exec("get", "key2") + assert val is None + + +def test_multi_watch_with_dirty_after_restart(cluster): + cluster.create(3) + node = cluster.leader_node() + node.execute('set', 'key1', 1) + + conn = RawConnection(cluster.node(1).client) + + assert conn.execute('watch', 'key1') == b'OK' + assert conn.execute('multi') == b'OK' + assert conn.execute('set', 'key2', 1) == b'QUEUED' + + cluster.node(2).restart() + cluster.node(2).wait_for_election() + cluster.node(3).restart() + cluster.node(3).wait_for_election() + cluster.wait_for_unanimity() + + assert cluster.execute('set', 'key1', 2) + + assert conn.execute('exec') is None + + cluster.wait_for_unanimity() + + for i in range(1, 3): + val = cluster.node(i).raft_debug_exec("get", "key2") + assert val is None \ No newline at end of file From 468cd213cb843291d13bbcb6b871a169f7db5c7b Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Tue, 16 May 2023 12:50:42 +0300 Subject: [PATCH 07/28] linter --- tests/integration/test_multi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_multi.py b/tests/integration/test_multi.py index 0a43cd640..016e88ca7 100644 --- a/tests/integration/test_multi.py +++ b/tests/integration/test_multi.py @@ -397,4 +397,4 @@ def test_multi_watch_with_dirty_after_restart(cluster): for i in range(1, 3): val = cluster.node(i).raft_debug_exec("get", "key2") - assert val is None \ No newline at end of file + assert val is None From 874857815732edbc0e2f7d6fc62341d2952268b3 Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Wed, 17 May 2023 16:08:06 +0300 Subject: [PATCH 08/28] update redis pr --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2ed948bcb..c761b2e8b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,7 +18,7 @@ jobs: uses: actions/checkout@v2 with: repository: 'sjpotter/redis' - ref: '686799e2fff207ccbb1c4cf8d546cb76e6f77fe3' + ref: 'e138e7445e62ddedddf4477b0724ae431f200ea1' path: 'redis' - name: Build Redis run: cd redis && make -j 4 gcov @@ -55,7 +55,7 @@ jobs: uses: actions/checkout@v2 with: repository: 'sjpotter/redis' - ref: '686799e2fff207ccbb1c4cf8d546cb76e6f77fe3' + ref: 'e138e7445e62ddedddf4477b0724ae431f200ea1' path: 'redis' - name: Build Redis run: cd redis && make -j 4 SANITIZER=address From 2bd623593567506382ab02ea99d8d43a3151fa24 Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Thu, 18 May 2023 10:45:40 +0300 Subject: [PATCH 09/28] add some more tests for blocking commands in multi with and without watch --- tests/integration/test_multi.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/tests/integration/test_multi.py b/tests/integration/test_multi.py index 016e88ca7..0632ad853 100644 --- a/tests/integration/test_multi.py +++ b/tests/integration/test_multi.py @@ -398,3 +398,28 @@ def test_multi_watch_with_dirty_after_restart(cluster): for i in range(1, 3): val = cluster.node(i).raft_debug_exec("get", "key2") assert val is None + + +def test_multi_with_blocking_commands(cluster): + cluster.create(3) + node = cluster.leader_node() + node.execute('set', 'key1', 1) + + conn = RawConnection(cluster.node(1).client) + + assert conn.execute('multi') == b'OK' + assert conn.execute('blpop', 'key2', 0) == b'QUEUED' + assert conn.execute('exec') == [None] + + +def test_multi_with_blocking_command_and_watch(cluster): + cluster.create(3) + node = cluster.leader_node() + node.execute('set', 'key1', 1) + + conn = RawConnection(cluster.node(1).client) + + assert conn.execute('watch', 'key1') == b'OK' + assert conn.execute('multi') == b'OK' + assert conn.execute('blpop', 'key2', 0) == b'QUEUED' + assert conn.execute('exec') == [None] From 61790dde1ea708ca3ac3a136820f73d201873ec1 Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Thu, 18 May 2023 10:48:09 +0300 Subject: [PATCH 10/28] remove duplication from tests --- tests/integration/test_multi.py | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/tests/integration/test_multi.py b/tests/integration/test_multi.py index 0632ad853..124269e05 100644 --- a/tests/integration/test_multi.py +++ b/tests/integration/test_multi.py @@ -3,7 +3,7 @@ Licensed under your choice of the Redis Source Available License 2.0 (RSALv2) or the Server Side Public License v1 (SSPLv1). """ - +import pytest from pytest import raises from redis.exceptions import ExecAbortError, ResponseError @@ -400,26 +400,16 @@ def test_multi_watch_with_dirty_after_restart(cluster): assert val is None -def test_multi_with_blocking_commands(cluster): +@pytest.mark.parametrize("with_watch", [False, True]) +def test_multi_with_blocking_commands(cluster, with_watch): cluster.create(3) node = cluster.leader_node() node.execute('set', 'key1', 1) conn = RawConnection(cluster.node(1).client) - assert conn.execute('multi') == b'OK' - assert conn.execute('blpop', 'key2', 0) == b'QUEUED' - assert conn.execute('exec') == [None] - - -def test_multi_with_blocking_command_and_watch(cluster): - cluster.create(3) - node = cluster.leader_node() - node.execute('set', 'key1', 1) - - conn = RawConnection(cluster.node(1).client) - - assert conn.execute('watch', 'key1') == b'OK' + if with_watch: + assert conn.execute('watch', 'key1') == b'OK' assert conn.execute('multi') == b'OK' assert conn.execute('blpop', 'key2', 0) == b'QUEUED' assert conn.execute('exec') == [None] From 28859f2c9458aa753d97896fdac898aa8ca80df7 Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Thu, 18 May 2023 14:14:08 +0300 Subject: [PATCH 11/28] update redis pr --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c761b2e8b..afbf988dd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,7 +18,7 @@ jobs: uses: actions/checkout@v2 with: repository: 'sjpotter/redis' - ref: 'e138e7445e62ddedddf4477b0724ae431f200ea1' + ref: '4e3b21045c8ba7f5d7b6d021cca3f51389a8b6c9' path: 'redis' - name: Build Redis run: cd redis && make -j 4 gcov @@ -55,7 +55,7 @@ jobs: uses: actions/checkout@v2 with: repository: 'sjpotter/redis' - ref: 'e138e7445e62ddedddf4477b0724ae431f200ea1' + ref: '4e3b21045c8ba7f5d7b6d021cca3f51389a8b6c9' path: 'redis' - name: Build Redis run: cd redis && make -j 4 SANITIZER=address From f3169e08a3a41166626a69a5ff0d99935225be03 Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Sun, 21 May 2023 12:29:21 +0300 Subject: [PATCH 12/28] add tests for unwatch and discard note - we don't handle discard correctly yet (tearing down session) --- tests/integration/test_multi.py | 59 +++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/tests/integration/test_multi.py b/tests/integration/test_multi.py index 124269e05..5254c4259 100644 --- a/tests/integration/test_multi.py +++ b/tests/integration/test_multi.py @@ -309,6 +309,7 @@ def test_multi_watch_cleared_after_exec(cluster): assert conn.execute('get', 'key1') == b'QUEUED' assert conn.execute('get', 'key1') == b'QUEUED' assert conn.execute('set', 'key2', 2) == b'QUEUED' + # "dirty" the key that was formerly watched, but previous exec should have cleare dit assert cluster.execute('set', 'key1', 2) assert conn.execute('exec') == [b'2', b'2', b'OK'] @@ -319,6 +320,64 @@ def test_multi_watch_cleared_after_exec(cluster): assert val == b'2' +def test_multi_watch_cleared_after_discard(cluster): + cluster.create(3) + node = cluster.leader_node() + node.execute('set', 'key1', 1) + + conn = RawConnection(cluster.node(1).client) + + assert conn.execute('watch', 'key1') == b'OK' + assert conn.execute('multi') == b'OK' + assert conn.execute('get', 'key1') == b'QUEUED' + assert conn.execute('get', 'key1') == b'QUEUED' + assert conn.execute('set', 'key2', 1) == b'QUEUED' + assert conn.execute('discard') == b'OK' + + cluster.wait_for_unanimity() + + for i in range(1, 3): + val = cluster.node(i).raft_debug_exec("get", "key2") + assert val is None + + assert conn.execute('multi') == b'OK' + assert conn.execute('get', 'key1') == b'QUEUED' + assert conn.execute('get', 'key1') == b'QUEUED' + assert conn.execute('set', 'key2', 2) == b'QUEUED' + # "dirty" the key that was formerly watched, but previous exec should have cleare dit + assert cluster.execute('set', 'key1', 2) + assert conn.execute('exec') == [b'2', b'2', b'OK'] + + cluster.wait_for_unanimity() + + for i in range(1, 3): + val = cluster.node(i).raft_debug_exec("get", "key2") + assert val == b'2' + + +def test_multi_watch_cleared_after_unwatch(cluster): + cluster.create(3) + node = cluster.leader_node() + node.execute('set', 'key1', 1) + + conn = RawConnection(cluster.node(1).client) + + assert conn.execute('watch', 'key1') == b'OK' + assert conn.execute('unwatch') == b'OK' + assert conn.execute('multi') == b'OK' + assert conn.execute('get', 'key1') == b'QUEUED' + assert conn.execute('get', 'key1') == b'QUEUED' + assert conn.execute('set', 'key2', 1) == b'QUEUED' + assert cluster.execute('set', 'key1', 2) + assert conn.execute('exec') == [b'2', b'2', b'OK'] + + cluster.wait_for_unanimity() + + for i in range(1, 3): + val = cluster.node(i).raft_debug_exec("get", "key2") + assert val == b'1' + + def test_multi_watch_with_restart_clean(cluster): cluster.create(3) node = cluster.leader_node() From 464c36f1c5ac588912f5f3b5a7dbf3594e7773d7 Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Sun, 21 May 2023 12:37:40 +0300 Subject: [PATCH 13/28] add discard support to clear watches --- src/clientstate.c | 1 + src/multi.c | 3 ++- src/redisraft.c | 2 +- src/redisraft.h | 2 ++ 4 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/clientstate.c b/src/clientstate.c index 6cc3257eb..e35855bc6 100644 --- a/src/clientstate.c +++ b/src/clientstate.c @@ -20,6 +20,7 @@ ClientState *ClientStateGet(RedisRaftCtx *rr, RedisModuleCtx *ctx) void ClientStateAlloc(RedisRaftCtx *rr, unsigned long long client_id) { ClientState *clientState = RedisModule_Calloc(sizeof(ClientState), 1); + clientState->client_id = client_id; int ret = RedisModule_DictSetC(rr->client_state, &client_id, sizeof(client_id), clientState); RedisModule_Assert(ret == REDISMODULE_OK); } diff --git a/src/multi.c b/src/multi.c index 551cc61d7..4e64d7b54 100644 --- a/src/multi.c +++ b/src/multi.c @@ -96,7 +96,8 @@ bool MultiHandleCommand(RedisRaftCtx *rr, } MultiStateReset(multiState); - RedisModule_ReplyWithSimpleString(ctx, "OK"); + RaftReq *req = RaftReqInit(ctx, RR_END_SESSION); + appendEndClientSession(rr, req, clientState->client_id, "UNWATCH"); return true; } diff --git a/src/redisraft.c b/src/redisraft.c index 35af658d0..b2decced9 100644 --- a/src/redisraft.c +++ b/src/redisraft.c @@ -587,7 +587,7 @@ static void handleClientCommand(RedisRaftCtx *rr, RedisModuleCtx *ctx, RaftRedis RedisModule_ReplyWithError(ctx, "ERR RedisRaft should only handle CLIENT UNBLOCK commands"); } -static void appendEndClientSession(RedisRaftCtx *rr, RaftReq *req, unsigned long long id, char *reason) +void appendEndClientSession(RedisRaftCtx *rr, RaftReq *req, unsigned long long id, char *reason) { raft_entry_t *entry = raft_entry_new(strlen(reason) + 1); entry->type = RAFT_LOGTYPE_END_SESSION; diff --git a/src/redisraft.h b/src/redisraft.h index 35ff49cb3..1fcb8831b 100644 --- a/src/redisraft.h +++ b/src/redisraft.h @@ -738,6 +738,7 @@ typedef struct MultiState { } MultiState; typedef struct ClientState { + unsigned long long client_id; MultiState multi_state; bool asking; /* we record "watched" at append time, for 2 reasons @@ -832,6 +833,7 @@ RRStatus RaftRedisDeserializeTimeout(const void *buf, size_t buf_size, raft_inde /* redisraft.c */ RRStatus RedisRaftCtxInit(RedisRaftCtx *rr, RedisModuleCtx *ctx); void RedisRaftCtxClear(RedisRaftCtx *rr); +void appendEndClientSession(RedisRaftCtx *rr, RaftReq *req, unsigned long long id, char *reason); /* raft.c */ void RaftReqFree(RaftReq *req); From 3fd3e091b29d8bbbf50db4ae96523fe32a129f0a Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Sun, 21 May 2023 13:27:44 +0300 Subject: [PATCH 14/28] linter --- tests/integration/test_multi.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_multi.py b/tests/integration/test_multi.py index 5254c4259..fae281f53 100644 --- a/tests/integration/test_multi.py +++ b/tests/integration/test_multi.py @@ -309,7 +309,8 @@ def test_multi_watch_cleared_after_exec(cluster): assert conn.execute('get', 'key1') == b'QUEUED' assert conn.execute('get', 'key1') == b'QUEUED' assert conn.execute('set', 'key2', 2) == b'QUEUED' - # "dirty" the key that was formerly watched, but previous exec should have cleare dit + # "dirty" the key that was formerly watched, + # but previous exec should have cleared it assert cluster.execute('set', 'key1', 2) assert conn.execute('exec') == [b'2', b'2', b'OK'] @@ -344,7 +345,8 @@ def test_multi_watch_cleared_after_discard(cluster): assert conn.execute('get', 'key1') == b'QUEUED' assert conn.execute('get', 'key1') == b'QUEUED' assert conn.execute('set', 'key2', 2) == b'QUEUED' - # "dirty" the key that was formerly watched, but previous exec should have cleare dit + # "dirty" the key that was formerly watched, + # but previous exec should have cleared it assert cluster.execute('set', 'key1', 2) assert conn.execute('exec') == [b'2', b'2', b'OK'] From 752e45c78b3615730d1e5e6076dce31ba43e4c7e Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Sun, 21 May 2023 17:08:01 +0300 Subject: [PATCH 15/28] add test for exec abort situation with watches --- src/multi.c | 15 ++++++++++--- src/raft.c | 7 ++++-- src/redisraft.c | 2 +- src/redisraft.h | 6 +++++ tests/integration/test_multi.py | 40 +++++++++++++++++++++++++++++++++ 5 files changed, 64 insertions(+), 6 deletions(-) diff --git a/src/multi.c b/src/multi.c index 4e64d7b54..65d4e03e1 100644 --- a/src/multi.c +++ b/src/multi.c @@ -75,7 +75,12 @@ bool MultiHandleCommand(RedisRaftCtx *rr, if (multiState->error) { MultiStateReset(multiState); - RedisModule_ReplyWithError(ctx, "EXECABORT Transaction discarded because of previous errors."); + if (clientState->watched) { + RaftReq *req = RaftReqInit(ctx, RR_END_SESSION); + appendEndClientSession(rr, req, clientState->client_id, SESSION_END_EXECABORT); + } else { + RedisModule_ReplyWithError(ctx, EXECABORT_ERR); + } return true; } @@ -96,8 +101,12 @@ bool MultiHandleCommand(RedisRaftCtx *rr, } MultiStateReset(multiState); - RaftReq *req = RaftReqInit(ctx, RR_END_SESSION); - appendEndClientSession(rr, req, clientState->client_id, "UNWATCH"); + if (clientState->watched) { + RaftReq *req = RaftReqInit(ctx, RR_END_SESSION); + appendEndClientSession(rr, req, clientState->client_id, SESSION_END_DISCARD); + } else { + RedisModule_ReplyWithSimpleString(ctx, "OK"); + } return true; } diff --git a/src/raft.c b/src/raft.c index 1cf39a823..ce0b13d02 100644 --- a/src/raft.c +++ b/src/raft.c @@ -514,7 +514,6 @@ RedisModuleCallReply *RaftExecuteCommandArray(RedisRaftCtx *rr, } if (is_multi_session) { - LOG_WARNING("ending session"); endClientSession(rr, client_session->client_id); } @@ -652,7 +651,11 @@ static void handleEndClientSession(RedisRaftCtx *rr, raft_entry_t *entry, RaftRe endClientSession(rr, id); if (req) { - RedisModule_ReplyWithSimpleString(req->ctx, "OK"); + if (strncmp(entry->data, SESSION_END_EXECABORT, entry->data_len) == 0) { + RedisModule_ReplyWithError(req->ctx, EXECABORT_ERR); + } else { + RedisModule_ReplyWithSimpleString(req->ctx, "OK"); + } RaftReqFree(req); } } diff --git a/src/redisraft.c b/src/redisraft.c index b2decced9..72e5e87e7 100644 --- a/src/redisraft.c +++ b/src/redisraft.c @@ -677,7 +677,7 @@ static bool handleUnwatch(RedisRaftCtx *rr, RedisModuleCtx *ctx, RaftRedisComman if (cmd_len == 7 && strncasecmp(cmd, "UNWATCH", 7) == 0) { RaftReq *req = RaftReqInit(ctx, RR_END_SESSION); unsigned long long id = RedisModule_GetClientId(ctx); - appendEndClientSession(rr, req, id, "UNWATCH"); + appendEndClientSession(rr, req, id, SESSION_END_UNWATCH); return true; } diff --git a/src/redisraft.h b/src/redisraft.h index 1fcb8831b..2623d0e6b 100644 --- a/src/redisraft.h +++ b/src/redisraft.h @@ -780,6 +780,12 @@ typedef struct ClientSession { bool dirty; } ClientSession; +#define SESSION_END_DISCONNECT "DISCONNECT" +#define SESSION_END_UNWATCH "UNWATCH" +#define SESSION_END_DISCARD "DISCARD" +#define SESSION_END_EXECABORT "EXECABORT" +#define EXECABORT_ERR "EXECABORT Transaction discarded because of previous errors." + /* common.c */ void joinLinkIdleCallback(Connection *conn); void joinLinkFreeCallback(void *privdata); diff --git a/tests/integration/test_multi.py b/tests/integration/test_multi.py index fae281f53..30adb7a71 100644 --- a/tests/integration/test_multi.py +++ b/tests/integration/test_multi.py @@ -275,6 +275,7 @@ def test_multi_watch_with_modification(cluster): assert conn.execute('get', 'key1') == b'QUEUED' assert conn.execute('get', 'key1') == b'QUEUED' assert conn.execute('set', 'key2', 1) == b'QUEUED' + # "dirty" the key that was watched, should kill transaction assert cluster.execute('set', 'key1', 2) assert conn.execute('exec') is None @@ -357,6 +358,45 @@ def test_multi_watch_cleared_after_discard(cluster): assert val == b'2' +def test_multi_watch_cleared_after_execabort(cluster): + cluster.create(3) + node = cluster.leader_node() + node.execute('set', 'key1', 1) + + conn = RawConnection(cluster.node(1).client) + + assert conn.execute('watch', 'key1') == b'OK' + assert conn.execute('multi') == b'OK' + with raises(ResponseError, match=".*unknown command 'nonexistentcommand'.*"): + conn.execute('nonexistentcommand') + assert conn.execute('get', 'key1') == b'QUEUED' + assert conn.execute('get', 'key1') == b'QUEUED' + assert conn.execute('set', 'key2', 1) == b'QUEUED' + with raises(ResponseError, match="Transaction discarded because of previous errors."): + conn.execute('exec') + + cluster.wait_for_unanimity() + + for i in range(1, 3): + val = cluster.node(i).raft_debug_exec("get", "key2") + assert val is None + + assert conn.execute('multi') == b'OK' + assert conn.execute('get', 'key1') == b'QUEUED' + assert conn.execute('get', 'key1') == b'QUEUED' + assert conn.execute('set', 'key2', 2) == b'QUEUED' + # "dirty" the key that was formerly watched, + # but previous exec should have cleared it + assert cluster.execute('set', 'key1', 2) + assert conn.execute('exec') == [b'2', b'2', b'OK'] + + cluster.wait_for_unanimity() + + for i in range(1, 3): + val = cluster.node(i).raft_debug_exec("get", "key2") + assert val == b'2' + + def test_multi_watch_cleared_after_unwatch(cluster): cluster.create(3) node = cluster.leader_node() From 7dbff8b2b39de4fde5e562caf0a5bc30b4fafc6e Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Mon, 22 May 2023 20:24:22 +0300 Subject: [PATCH 16/28] enable most watch/multi/exec tests --- src/commands.c | 4 +++- tests/redis-suite/skip.txt | 31 ++++++++++++++++++------------- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/src/commands.c b/src/commands.c index aa6f03bc5..bc10c3d55 100644 --- a/src/commands.c +++ b/src/commands.c @@ -72,7 +72,9 @@ static const CommandSpec commands[] = { /* Admin commands - bypassed */ {"auth", CMD_SPEC_DONT_INTERCEPT }, - {"ping", CMD_SPEC_DONT_INTERCEPT }, + /* queued by multi, need to intercept */ +// {"ping", CMD_SPEC_DONT_INTERCEPT }, + {"ping", CMD_SPEC_READONLY }, {"hello", CMD_SPEC_DONT_INTERCEPT }, {"module", CMD_SPEC_DONT_INTERCEPT }, {"config", CMD_SPEC_DONT_INTERCEPT }, diff --git a/tests/redis-suite/skip.txt b/tests/redis-suite/skip.txt index aafcecab8..b3379d1e2 100644 --- a/tests/redis-suite/skip.txt +++ b/tests/redis-suite/skip.txt @@ -1,5 +1,6 @@ --- doesn't work, as command appears as "raft", not "blpop" +-- doesn't work, as command appears as "raft", not the comand Blocking command accounted only once in commandstats after timeout +command stats for MULTI -- Streams not supported -- See: https://github.com/RedisLabs/redisraft/issues/59 @@ -22,6 +23,7 @@ Timedout script link is still usable after Lua returns /function kill /script kill /test wrong subcommand +/.*script timeout.* -- RAFT command prefix shows up in SLOWLOG. SLOWLOG - Rewritten commands are logged as their original command @@ -39,18 +41,21 @@ UNLINK can reclaim memory in background -- ACL test fails because we prepend "raft" string to the command Script ACL check --- WATCH (multi/exec) not supported -/.*MULTI.* -/.*EXEC.* -/.*WATCH.* -SMOVE only notify dstset when the addition is successful -FLUSHALL is able to touch the watched keys -FLUSHDB is able to touch the watched keys -client evicted due to watched key list -FLUSHALL does not touch non affected keys -FLUSHDB does not touch non affected keys -SWAPDB is able to touch the watched keys that exist -SWAPDB is able to touch the watched keys that do not exist + +-- MULTI/EXEC is currently read-write in RedisRaft +EXEC with only read commands should not be rejected when OOM + +-- RedisRaft can't intercept differently when inside a multi, so these are broken for now +-- in general, we'd not want to intercept them, but they need to be intercepted within a multi +-- where they will error out in a specific way +-- https://github.com/redis/redis/issues/12210 +MULTI with SAVE +MULTI with SHUTDOWN +MULTI with config error + +-- pubsub can't allow ping to be intercepted, but needs to be intercepted for multi tests +-- where part of the multi response +/.*Sub PING on.* -- After fixing this: https://github.com/RedisLabs/redisraft/issues/367 -- We don't need to skip this test as it doesn't actually configure a replica. From d64fff32ffe600c88be6acaba937fa8904d0605b Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Mon, 22 May 2023 21:47:05 +0300 Subject: [PATCH 17/28] fix --- deps/common/redismodule.h | 8 +++++++- src/raft.c | 2 +- src/snapshot.c | 3 ++- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/deps/common/redismodule.h b/deps/common/redismodule.h index 027007ee5..10ee8af1d 100644 --- a/deps/common/redismodule.h +++ b/deps/common/redismodule.h @@ -312,6 +312,10 @@ typedef uint64_t RedisModuleTimerID; /* RM_GetClientFlags */ #define REDISMODULE_CLIENT_FLAG_DIRTY_CAS (1<<0) /* Watched keys modified. EXEC will fail. */ #define REDISMODULE_CLIENT_FLAG_DIRTY_EXEC (1<<1) /* EXEC will fail for errors while queueing */ +/* Next client flag, must be updated when adding new flags above! +This flag should not be used directly by the module. + * Use RedisModule_GetClientFlagsAll instead. */ +#define _REDISMODULE_CLIENT_FLAGS_NEXT (1<<2) /* Definitions for RedisModule_SetCommandInfo. */ @@ -1315,9 +1319,10 @@ REDISMODULE_API void (*RedisModule_RdbStreamFree)(RedisModuleRdbStream *stream) REDISMODULE_API int (*RedisModule_RdbLoad)(RedisModuleCtx *ctx, RedisModuleRdbStream *stream, int flags) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_RdbSave)(RedisModuleCtx *ctx, RedisModuleRdbStream *stream, int flags) REDISMODULE_ATTR; REDISMODULE_API RedisModuleClient * (*RedisModule_CreateModuleClient)(RedisModuleCtx *ctx) REDISMODULE_ATTR; -REDISMODULE_API void (*RedisModule_FreeModuleClient)(RedisModuleCtx *ctx, RedisModuleClient *client) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_FreeModuleClient)(RedisModuleCtx *ctx, RedisModuleClient *client) REDISMODULE_ATTR; REDISMODULE_API void (*RedisModule_SetContextClient)(RedisModuleCtx *ctx, RedisModuleClient *client) REDISMODULE_ATTR; REDISMODULE_API uint64_t (*RedisModule_GetClientFlags)(RedisModuleClient *client) REDISMODULE_ATTR; +REDISMODULE_API uint64_t (*RedisModule_GetClientFlagsAll)(void) REDISMODULE_ATTR; REDISMODULE_API void (*RedisModule_SetClientUser)(RedisModuleClient *client, RedisModuleUser *user) REDISMODULE_ATTR; #define RedisModule_IsAOFClient(id) ((id) == UINT64_MAX) @@ -1683,6 +1688,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(FreeModuleClient); REDISMODULE_GET_API(SetContextClient); REDISMODULE_GET_API(GetClientFlags); + REDISMODULE_GET_API(GetClientFlagsAll); REDISMODULE_GET_API(SetClientUser); diff --git a/src/raft.c b/src/raft.c index ce0b13d02..490336a20 100644 --- a/src/raft.c +++ b/src/raft.c @@ -379,7 +379,7 @@ void handleUnblock(RedisModuleCtx *ctx, RedisModuleCallReply *reply, void *priva static bool isClientSessionDirty(ClientSession *client_session) { uint64_t flags = RedisModule_GetClientFlags(client_session->client); - if (client_session->dirty || flags) { + if (client_session->dirty || flags & REDISMODULE_CLIENT_FLAG_DIRTY_CAS) { return true; } diff --git a/src/snapshot.c b/src/snapshot.c index d3516cb69..05b3dbe5e 100644 --- a/src/snapshot.c +++ b/src/snapshot.c @@ -709,7 +709,8 @@ static void clientSessionRDBSave(RedisModuleIO *rdb) ClientSession *client_session; while (RedisModule_DictNextC(iter, NULL, (void **) &client_session) != NULL) { RedisModule_SaveUnsigned(rdb, client_session->client_id); - if (RedisModule_GetClientFlags(client_session->client)) { + uint64_t flags = RedisModule_GetClientFlags(client_session->client); + if (flags & REDISMODULE_CLIENT_FLAG_DIRTY_CAS) { RedisModule_SaveUnsigned(rdb, 1); } else { RedisModule_SaveUnsigned(rdb, 0); From 0eeddad76487b6991c6c17dddbe322703f3c5c6d Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Mon, 22 May 2023 22:01:34 +0300 Subject: [PATCH 18/28] update git ref for redis --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index afbf988dd..d543721b1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,7 +18,7 @@ jobs: uses: actions/checkout@v2 with: repository: 'sjpotter/redis' - ref: '4e3b21045c8ba7f5d7b6d021cca3f51389a8b6c9' + ref: '1631f37d242613c128ed2406d2f6fd5a83eaf954' path: 'redis' - name: Build Redis run: cd redis && make -j 4 gcov @@ -55,7 +55,7 @@ jobs: uses: actions/checkout@v2 with: repository: 'sjpotter/redis' - ref: '4e3b21045c8ba7f5d7b6d021cca3f51389a8b6c9' + ref: '1631f37d242613c128ed2406d2f6fd5a83eaf954' path: 'redis' - name: Build Redis run: cd redis && make -j 4 SANITIZER=address From e9315488839b5a8eb5345ef4aa88ab088aa9ad14 Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Tue, 23 May 2023 15:33:07 +0300 Subject: [PATCH 19/28] comment ping changes + update redissuite skip test list/comments --- src/commands.c | 5 ++--- tests/redis-suite/skip.txt | 13 ++++++++----- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/commands.c b/src/commands.c index bc10c3d55..cc6a1585f 100644 --- a/src/commands.c +++ b/src/commands.c @@ -72,9 +72,8 @@ static const CommandSpec commands[] = { /* Admin commands - bypassed */ {"auth", CMD_SPEC_DONT_INTERCEPT }, - /* queued by multi, need to intercept */ -// {"ping", CMD_SPEC_DONT_INTERCEPT }, - {"ping", CMD_SPEC_READONLY }, + /* queued by multi, need to intercept in multi, but can't do that right now with redis */ + {"ping", CMD_SPEC_DONT_INTERCEPT }, {"hello", CMD_SPEC_DONT_INTERCEPT }, {"module", CMD_SPEC_DONT_INTERCEPT }, {"config", CMD_SPEC_DONT_INTERCEPT }, diff --git a/tests/redis-suite/skip.txt b/tests/redis-suite/skip.txt index b3379d1e2..3aa08b00e 100644 --- a/tests/redis-suite/skip.txt +++ b/tests/redis-suite/skip.txt @@ -41,7 +41,6 @@ UNLINK can reclaim memory in background -- ACL test fails because we prepend "raft" string to the command Script ACL check - -- MULTI/EXEC is currently read-write in RedisRaft EXEC with only read commands should not be rejected when OOM @@ -52,10 +51,14 @@ EXEC with only read commands should not be rejected when OOM MULTI with SAVE MULTI with SHUTDOWN MULTI with config error - --- pubsub can't allow ping to be intercepted, but needs to be intercepted for multi tests --- where part of the multi response -/.*Sub PING on.* +-- same as above, but these all relate to PING usage inside of MULTI +MULTI / EXEC basics +EXEC works on WATCHed key not modified +After successful EXEC key is no longer watched +After failed EXEC key is no longer watched +It is possible to UNWATCH +FLUSHALL does not touch non affected keys +FLUSHDB does not touch non affected keys -- After fixing this: https://github.com/RedisLabs/redisraft/issues/367 -- We don't need to skip this test as it doesn't actually configure a replica. From 418ac5b6735cc65215580534aabe38f319445fda Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Tue, 23 May 2023 15:34:44 +0300 Subject: [PATCH 20/28] update redis ref --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d543721b1..af3dc0cad 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,7 +18,7 @@ jobs: uses: actions/checkout@v2 with: repository: 'sjpotter/redis' - ref: '1631f37d242613c128ed2406d2f6fd5a83eaf954' + ref: '78c96c1443f1037ad087e069ade10b822512910f' path: 'redis' - name: Build Redis run: cd redis && make -j 4 gcov @@ -55,7 +55,7 @@ jobs: uses: actions/checkout@v2 with: repository: 'sjpotter/redis' - ref: '1631f37d242613c128ed2406d2f6fd5a83eaf954' + ref: '78c96c1443f1037ad087e069ade10b822512910f' path: 'redis' - name: Build Redis run: cd redis && make -j 4 SANITIZER=address From 90fe66ca516795545a937a0680e4ec50d371e7c3 Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Tue, 23 May 2023 17:04:03 +0300 Subject: [PATCH 21/28] pass almost all tests --- .github/workflows/ci.yml | 4 ++-- deps/common/redismodule.h | 2 ++ src/commands.c | 7 +++---- src/multi.c | 10 ++++++---- src/redisraft.c | 9 +++++++++ src/redisraft.h | 21 +++++++++++---------- tests/redis-suite/skip.txt | 15 +-------------- 7 files changed, 34 insertions(+), 34 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index af3dc0cad..7cded3d85 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,7 +18,7 @@ jobs: uses: actions/checkout@v2 with: repository: 'sjpotter/redis' - ref: '78c96c1443f1037ad087e069ade10b822512910f' + ref: 'ed754fe01934cebf2ed3343276ef564c92c4c74f' path: 'redis' - name: Build Redis run: cd redis && make -j 4 gcov @@ -55,7 +55,7 @@ jobs: uses: actions/checkout@v2 with: repository: 'sjpotter/redis' - ref: '78c96c1443f1037ad087e069ade10b822512910f' + ref: 'ed754fe01934cebf2ed3343276ef564c92c4c74f' path: 'redis' - name: Build Redis run: cd redis && make -j 4 SANITIZER=address diff --git a/deps/common/redismodule.h b/deps/common/redismodule.h index 10ee8af1d..b0bb8fb37 100644 --- a/deps/common/redismodule.h +++ b/deps/common/redismodule.h @@ -1268,6 +1268,7 @@ REDISMODULE_API RedisModuleString * (*RedisModule_CommandFilterArgGet)(RedisModu REDISMODULE_API int (*RedisModule_CommandFilterArgInsert)(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_CommandFilterArgReplace)(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_CommandFilterArgDelete)(RedisModuleCommandFilterCtx *fctx, int pos) REDISMODULE_ATTR; +REDISMODULE_API unsigned long long (*RedisModule_CommandFilterGetClientId)(RedisModuleCommandFilterCtx *fctx) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_Fork)(RedisModuleForkDoneHandler cb, void *user_data) REDISMODULE_ATTR; REDISMODULE_API void (*RedisModule_SendChildHeartbeat)(double progress) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_ExitFromChild)(int retcode) REDISMODULE_ATTR; @@ -1634,6 +1635,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(CommandFilterArgInsert); REDISMODULE_GET_API(CommandFilterArgReplace); REDISMODULE_GET_API(CommandFilterArgDelete); + REDISMODULE_GET_API(CommandFilterGetClientId); REDISMODULE_GET_API(Fork); REDISMODULE_GET_API(SendChildHeartbeat); REDISMODULE_GET_API(ExitFromChild); diff --git a/src/commands.c b/src/commands.c index cc6a1585f..b71965f76 100644 --- a/src/commands.c +++ b/src/commands.c @@ -72,14 +72,13 @@ static const CommandSpec commands[] = { /* Admin commands - bypassed */ {"auth", CMD_SPEC_DONT_INTERCEPT }, - /* queued by multi, need to intercept in multi, but can't do that right now with redis */ - {"ping", CMD_SPEC_DONT_INTERCEPT }, + {"ping", CMD_SPEC_INTERCEPT_IN_MULTI }, {"hello", CMD_SPEC_DONT_INTERCEPT }, {"module", CMD_SPEC_DONT_INTERCEPT }, - {"config", CMD_SPEC_DONT_INTERCEPT }, + {"config", CMD_SPEC_INTERCEPT_IN_MULTI }, {"monitor", CMD_SPEC_DONT_INTERCEPT }, {"command", CMD_SPEC_DONT_INTERCEPT }, - {"shutdown", CMD_SPEC_DONT_INTERCEPT }, + {"shutdown", CMD_SPEC_INTERCEPT_IN_MULTI }, {"quit", CMD_SPEC_DONT_INTERCEPT }, {"slowlog", CMD_SPEC_DONT_INTERCEPT }, {"acl", CMD_SPEC_DONT_INTERCEPT }, diff --git a/src/multi.c b/src/multi.c index 65d4e03e1..8f04303ed 100644 --- a/src/multi.c +++ b/src/multi.c @@ -130,10 +130,12 @@ bool MultiHandleCommand(RedisRaftCtx *rr, return true; } - if (cmd_flags & CMD_SPEC_DONT_INTERCEPT) { - RedisModule_ReplyWithError(ctx, "ERR not supported by RedisRaft inside MULTI/EXEC"); - multiState->error = true; - return true; + if (cmd_flags & CMD_SPEC_INTERCEPT_IN_MULTI) { + if (cmd_len == 8 && !strncasecmp(cmd_str, "SHUTDOWN", 8)) { + RedisModule_ReplyWithError(ctx, "ERR Command not allowed inside a transaction"); + multiState->error = true; + return true; + } } if (RedisModule_GetUsedMemoryRatio() > 1.0) { diff --git a/src/redisraft.c b/src/redisraft.c index 72e5e87e7..e1dfcabda 100644 --- a/src/redisraft.c +++ b/src/redisraft.c @@ -1725,6 +1725,15 @@ static void interceptRedisCommands(RedisModuleCommandFilterCtx *filter) if (flags != -1 && (flags & CMD_SPEC_DONT_INTERCEPT)) return; + if (flags != -1 && (flags & CMD_SPEC_INTERCEPT_IN_MULTI)) { + unsigned long long id = RedisModule_CommandFilterGetClientId(filter); + ClientState *clientState = ClientStateGetById(rr, id); + + if (clientState == NULL || !clientState->multi_state.active) { + return; + } + } + size_t len; const char *str = RedisModule_StringPtrLen(cmd, &len); diff --git a/src/redisraft.h b/src/redisraft.h index 2623d0e6b..d421bb42f 100644 --- a/src/redisraft.h +++ b/src/redisraft.h @@ -676,16 +676,17 @@ typedef struct { unsigned int flags; /* Command flags, see CMD_SPEC_* */ } CommandSpec; -#define CMD_SPEC_READONLY (1 << 1) /* Command is a read-only command */ -#define CMD_SPEC_WRITE (1 << 2) /* Command is a (potentially) write command */ -#define CMD_SPEC_UNSUPPORTED (1 << 3) /* Command is not supported, should be rejected */ -#define CMD_SPEC_DONT_INTERCEPT (1 << 4) /* Command should not be intercepted to RAFT */ -#define CMD_SPEC_SORT_REPLY (1 << 5) /* Command output should be sorted within a lua script */ -#define CMD_SPEC_RANDOM (1 << 6) /* Commands that are always random */ -#define CMD_SPEC_SCRIPTS (1 << 7) /* Commands that have script/function flags */ -#define CMD_SPEC_BLOCKING (1 << 8) /* Blocking command */ -#define CMD_SPEC_MULTI (1 << 9) /* a MULTI */ -#define CMD_SPEC_SUBCOMMAND (1 << 10) /* a command with subcommand specs */ +#define CMD_SPEC_READONLY (1 << 1) /* Command is a read-only command */ +#define CMD_SPEC_WRITE (1 << 2) /* Command is a (potentially) write command */ +#define CMD_SPEC_UNSUPPORTED (1 << 3) /* Command is not supported, should be rejected */ +#define CMD_SPEC_DONT_INTERCEPT (1 << 4) /* Command should not be intercepted to RAFT */ +#define CMD_SPEC_SORT_REPLY (1 << 5) /* Command output should be sorted within a lua script */ +#define CMD_SPEC_RANDOM (1 << 6) /* Commands that are always random */ +#define CMD_SPEC_SCRIPTS (1 << 7) /* Commands that have script/function flags */ +#define CMD_SPEC_BLOCKING (1 << 8) /* Blocking command */ +#define CMD_SPEC_MULTI (1 << 9) /* a MULTI */ +#define CMD_SPEC_SUBCOMMAND (1 << 10) /* a command with subcommand specs */ +#define CMD_SPEC_INTERCEPT_IN_MULTI (1 << 11) /* only intecept this command within a MULTI */ /* Command filtering re-entrancy counter handling. * diff --git a/tests/redis-suite/skip.txt b/tests/redis-suite/skip.txt index 3aa08b00e..5758752be 100644 --- a/tests/redis-suite/skip.txt +++ b/tests/redis-suite/skip.txt @@ -44,21 +44,8 @@ Script ACL check -- MULTI/EXEC is currently read-write in RedisRaft EXEC with only read commands should not be rejected when OOM --- RedisRaft can't intercept differently when inside a multi, so these are broken for now --- in general, we'd not want to intercept them, but they need to be intercepted within a multi --- where they will error out in a specific way --- https://github.com/redis/redis/issues/12210 +-- SAVE is in general unsupported in RedisRaft so can ignore it in multi as well MULTI with SAVE -MULTI with SHUTDOWN -MULTI with config error --- same as above, but these all relate to PING usage inside of MULTI -MULTI / EXEC basics -EXEC works on WATCHed key not modified -After successful EXEC key is no longer watched -After failed EXEC key is no longer watched -It is possible to UNWATCH -FLUSHALL does not touch non affected keys -FLUSHDB does not touch non affected keys -- After fixing this: https://github.com/RedisLabs/redisraft/issues/367 -- We don't need to skip this test as it doesn't actually configure a replica. From b3708cbb0cf9e067c1211737fe18078d90d6713e Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Tue, 23 May 2023 17:06:18 +0300 Subject: [PATCH 22/28] another monitor test that fails due to raft prefix --- tests/redis-suite/skip.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/redis-suite/skip.txt b/tests/redis-suite/skip.txt index 5758752be..1e8cac3aa 100644 --- a/tests/redis-suite/skip.txt +++ b/tests/redis-suite/skip.txt @@ -34,6 +34,7 @@ MONITOR can log executed commands MONITOR can log commands issued by the scripting engine MONITOR can log commands issued by functions MONITOR correctly handles multi-exec cases +MONITOR log blocked command only once -- TODO: check what's wrong UNLINK can reclaim memory in background From b6332c2161ecfc467b17f460cff50056bb081975 Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Tue, 23 May 2023 17:51:40 +0300 Subject: [PATCH 23/28] linter issues --- src/redisraft.h | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/src/redisraft.h b/src/redisraft.h index d421bb42f..2d7f34fd1 100644 --- a/src/redisraft.h +++ b/src/redisraft.h @@ -676,17 +676,17 @@ typedef struct { unsigned int flags; /* Command flags, see CMD_SPEC_* */ } CommandSpec; -#define CMD_SPEC_READONLY (1 << 1) /* Command is a read-only command */ -#define CMD_SPEC_WRITE (1 << 2) /* Command is a (potentially) write command */ -#define CMD_SPEC_UNSUPPORTED (1 << 3) /* Command is not supported, should be rejected */ -#define CMD_SPEC_DONT_INTERCEPT (1 << 4) /* Command should not be intercepted to RAFT */ -#define CMD_SPEC_SORT_REPLY (1 << 5) /* Command output should be sorted within a lua script */ -#define CMD_SPEC_RANDOM (1 << 6) /* Commands that are always random */ -#define CMD_SPEC_SCRIPTS (1 << 7) /* Commands that have script/function flags */ -#define CMD_SPEC_BLOCKING (1 << 8) /* Blocking command */ -#define CMD_SPEC_MULTI (1 << 9) /* a MULTI */ -#define CMD_SPEC_SUBCOMMAND (1 << 10) /* a command with subcommand specs */ -#define CMD_SPEC_INTERCEPT_IN_MULTI (1 << 11) /* only intecept this command within a MULTI */ +#define CMD_SPEC_READONLY (1 << 1) /* Command is a read-only command */ +#define CMD_SPEC_WRITE (1 << 2) /* Command is a (potentially) write command */ +#define CMD_SPEC_UNSUPPORTED (1 << 3) /* Command is not supported, should be rejected */ +#define CMD_SPEC_DONT_INTERCEPT (1 << 4) /* Command should not be intercepted to RAFT */ +#define CMD_SPEC_SORT_REPLY (1 << 5) /* Command output should be sorted within a lua script */ +#define CMD_SPEC_RANDOM (1 << 6) /* Commands that are always random */ +#define CMD_SPEC_SCRIPTS (1 << 7) /* Commands that have script/function flags */ +#define CMD_SPEC_BLOCKING (1 << 8) /* Blocking command */ +#define CMD_SPEC_MULTI (1 << 9) /* a MULTI */ +#define CMD_SPEC_SUBCOMMAND (1 << 10) /* a command with subcommand specs */ +#define CMD_SPEC_INTERCEPT_IN_MULTI (1 << 11) /* only intecept this command within a MULTI */ /* Command filtering re-entrancy counter handling. * @@ -782,9 +782,10 @@ typedef struct ClientSession { } ClientSession; #define SESSION_END_DISCONNECT "DISCONNECT" -#define SESSION_END_UNWATCH "UNWATCH" -#define SESSION_END_DISCARD "DISCARD" -#define SESSION_END_EXECABORT "EXECABORT" +#define SESSION_END_UNWATCH "UNWATCH" +#define SESSION_END_DISCARD "DISCARD" +#define SESSION_END_EXECABORT "EXECABORT" + #define EXECABORT_ERR "EXECABORT Transaction discarded because of previous errors." /* common.c */ From 0ca22ba9fa0e6557e950e60d8ab95cd7190e9b45 Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Wed, 24 May 2023 11:07:46 +0300 Subject: [PATCH 24/28] remove redundant call --- src/raft.c | 1 - 1 file changed, 1 deletion(-) diff --git a/src/raft.c b/src/raft.c index 490336a20..54211905d 100644 --- a/src/raft.c +++ b/src/raft.c @@ -492,7 +492,6 @@ RedisModuleCallReply *RaftExecuteCommandArray(RedisRaftCtx *rr, } else { RedisModule_SetContextUser(ctx, NULL); } - RedisModule_SetContextClient(ctx, NULL); exitRedisModuleCall(); rr->entered_eval = old_entered_eval; From cebaa5de3a0fe94c54b53faf91465adc76e65f9e Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Wed, 24 May 2023 11:39:44 +0300 Subject: [PATCH 25/28] py linter --- tests/integration/test_multi.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_multi.py b/tests/integration/test_multi.py index 30adb7a71..c23702f9f 100644 --- a/tests/integration/test_multi.py +++ b/tests/integration/test_multi.py @@ -367,12 +367,12 @@ def test_multi_watch_cleared_after_execabort(cluster): assert conn.execute('watch', 'key1') == b'OK' assert conn.execute('multi') == b'OK' - with raises(ResponseError, match=".*unknown command 'nonexistentcommand'.*"): - conn.execute('nonexistentcommand') + with raises(ResponseError, match=".*unknown command 'notexistcmd'.*"): + conn.execute('notexistcmd') assert conn.execute('get', 'key1') == b'QUEUED' assert conn.execute('get', 'key1') == b'QUEUED' assert conn.execute('set', 'key2', 1) == b'QUEUED' - with raises(ResponseError, match="Transaction discarded because of previous errors."): + with raises(ResponseError, match="Transaction discarded.*"): conn.execute('exec') cluster.wait_for_unanimity() From 86233a6682e66bcafd5061c3701f6bb778fb1909 Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Wed, 24 May 2023 12:09:31 +0300 Subject: [PATCH 26/28] fix spelling error --- tests/redis-suite/skip.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/redis-suite/skip.txt b/tests/redis-suite/skip.txt index 1e8cac3aa..e62313428 100644 --- a/tests/redis-suite/skip.txt +++ b/tests/redis-suite/skip.txt @@ -1,4 +1,4 @@ --- doesn't work, as command appears as "raft", not the comand +-- doesn't work, as command appears as "raft", not the command Blocking command accounted only once in commandstats after timeout command stats for MULTI From 10795b67d8479a074427463cba175fe3ba1740b7 Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Tue, 30 May 2023 12:52:43 +0300 Subject: [PATCH 27/28] fix blocking commands when sessions are active + test --- src/raft.c | 2 +- tests/integration/test_blocking.py | 26 ++++++++++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/src/raft.c b/src/raft.c index 54211905d..b13d61316 100644 --- a/src/raft.c +++ b/src/raft.c @@ -471,7 +471,7 @@ RedisModuleCallReply *RaftExecuteCommandArray(RedisRaftCtx *rr, * When we have an ACL, we will have a user set on the context, so need "C" */ char *resp_call_fmt; - if (cmds->cmd_flags & CMD_SPEC_MULTI || client_session) { + if (cmds->cmd_flags & CMD_SPEC_MULTI) { /* don't block inside a MULTI */ resp_call_fmt = cmds->acl ? "CE0v" : "E0v"; } else { diff --git a/tests/integration/test_blocking.py b/tests/integration/test_blocking.py index 09d274d5d..bfc937a37 100644 --- a/tests/integration/test_blocking.py +++ b/tests/integration/test_blocking.py @@ -414,3 +414,29 @@ def test_blocking_with_timeout_after_unblock(cluster): val = cluster.node(i).raft_debug_exec("lrange", "x", 0, -1) assert type(val) == list assert len(val) == 0 + + +def test_blocking_with_watch(cluster): + cluster.create(3) + + c1 = cluster.leader_node().client.connection_pool.get_connection('c1') + c1.send_command('watch', 'x') + assert c1.read_response() == b'OK' + c1.send_command('blpop', 'x', 0) + c2 = cluster.leader_node().client.connection_pool.get_connection('c2') + c2.send_command('watch', 'x') + assert c2.read_response() == b'OK' + c2.send_command('blpop', 'x', 0) + + cluster.leader_node().execute("lpush", "x", 1) + cluster.leader_node().execute("lpush", "x", 2) + cluster.leader_node().execute("lpush", "x", 3) + + cluster.wait_for_unanimity() + + assert c1.read_response() == [b'x', b'1'] + assert c2.read_response() == [b'x', b'2'] + + for i in range(1, 3): + val = cluster.node(i).raft_debug_exec("lrange", "x", 0, -1) + assert val == [b'3'] \ No newline at end of file From 6013dbacce324ada0dc50bb6c153ed068a5e36d2 Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Tue, 30 May 2023 14:44:32 +0300 Subject: [PATCH 28/28] linter --- tests/integration/test_blocking.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_blocking.py b/tests/integration/test_blocking.py index bfc937a37..d2d92cbe8 100644 --- a/tests/integration/test_blocking.py +++ b/tests/integration/test_blocking.py @@ -439,4 +439,4 @@ def test_blocking_with_watch(cluster): for i in range(1, 3): val = cluster.node(i).raft_debug_exec("lrange", "x", 0, -1) - assert val == [b'3'] \ No newline at end of file + assert val == [b'3']