Skip to content

E2E MULTI/EXEC/WATCH Support #619

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 28 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
c3e8ef9
test watch support with redis PR
sjpotter May 11, 2023
afa5440
linter
sjpotter May 11, 2023
833cefc
fix blocking commands in multi
sjpotter May 11, 2023
aa9ba04
change acl/user handling for usage with persistent clients
sjpotter May 11, 2023
6d7ac5d
fix
sjpotter May 11, 2023
f3e4d5e
add dirty flag support to snapshot/restore and use at apply time
sjpotter May 16, 2023
468cd21
linter
sjpotter May 16, 2023
8748578
update redis pr
sjpotter May 17, 2023
2bd6235
add some more tests for blocking commands in multi with and without w…
sjpotter May 18, 2023
61790dd
remove duplication from tests
sjpotter May 18, 2023
28859f2
update redis pr
sjpotter May 18, 2023
f3169e0
add tests for unwatch and discard
sjpotter May 21, 2023
464c36f
add discard support to clear watches
sjpotter May 21, 2023
3fd3e09
linter
sjpotter May 21, 2023
752e45c
add test for exec abort situation with watches
sjpotter May 21, 2023
7dbff8b
enable most watch/multi/exec tests
sjpotter May 22, 2023
d64fff3
fix
sjpotter May 22, 2023
0eeddad
update git ref for redis
sjpotter May 22, 2023
e931548
comment ping changes + update redissuite skip test list/comments
sjpotter May 23, 2023
418ac5b
update redis ref
sjpotter May 23, 2023
90fe66c
pass almost all tests
sjpotter May 23, 2023
b3708cb
another monitor test that fails due to raft prefix
sjpotter May 23, 2023
b6332c2
linter issues
sjpotter May 23, 2023
0ca22ba
remove redundant call
sjpotter May 24, 2023
cebaa5d
py linter
sjpotter May 24, 2023
86233a6
fix spelling error
sjpotter May 24, 2023
10795b6
fix blocking commands when sessions are active + test
sjpotter May 30, 2023
6013dba
linter
sjpotter May 30, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ jobs:
- name: Checkout Redis
uses: actions/checkout@v2
with:
repository: 'redis/redis'
ref: 'unstable'
repository: 'sjpotter/redis'
ref: 'ed754fe01934cebf2ed3343276ef564c92c4c74f'
path: 'redis'
- name: Build Redis
run: cd redis && make -j 4 gcov
Expand Down Expand Up @@ -54,8 +54,8 @@ jobs:
- name: Checkout Redis
uses: actions/checkout@v2
with:
repository: 'redis/redis'
ref: 'unstable'
repository: 'sjpotter/redis'
ref: 'ed754fe01934cebf2ed3343276ef564c92c4c74f'
path: 'redis'
- name: Build Redis
run: cd redis && make -j 4 SANITIZER=address
Expand Down
48 changes: 36 additions & 12 deletions deps/common/redismodule.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ typedef long long ustime_t;
/* API versions. */
#define REDISMODULE_APIVER_1 1

/* Version of the RedisModuleTypeMethods structure. Once the RedisModuleTypeMethods
/* Version of the RedisModuleTypeMethods structure. Once the RedisModuleTypeMethods
* structure is changed, this version number needs to be changed synchronistically. */
#define REDISMODULE_TYPE_METHOD_VERSION 5

Expand Down Expand Up @@ -309,6 +309,14 @@ typedef uint64_t RedisModuleTimerID;
* Use RedisModule_GetModuleOptionsAll instead. */
#define _REDISMODULE_OPTIONS_FLAGS_NEXT (1<<4)

/* RM_GetClientFlags */
#define REDISMODULE_CLIENT_FLAG_DIRTY_CAS (1<<0) /* Watched keys modified. EXEC will fail. */
#define REDISMODULE_CLIENT_FLAG_DIRTY_EXEC (1<<1) /* EXEC will fail for errors while queueing */
/* Next client flag, must be updated when adding new flags above!
This flag should not be used directly by the module.
* Use RedisModule_GetClientFlagsAll instead. */
#define _REDISMODULE_CLIENT_FLAGS_NEXT (1<<2)

/* Definitions for RedisModule_SetCommandInfo. */

typedef enum {
Expand Down Expand Up @@ -587,7 +595,7 @@ static const RedisModuleEvent
/* Deprecated since Redis 7.0, not used anymore. */
__attribute__ ((deprecated))
RedisModuleEvent_ReplBackup = {
REDISMODULE_EVENT_REPL_BACKUP,
REDISMODULE_EVENT_REPL_BACKUP,
1
},
RedisModuleEvent_ReplAsyncLoad = {
Expand Down Expand Up @@ -880,6 +888,7 @@ typedef struct RedisModuleCommandFilter RedisModuleCommandFilter;
typedef struct RedisModuleServerInfoData RedisModuleServerInfoData;
typedef struct RedisModuleScanCursor RedisModuleScanCursor;
typedef struct RedisModuleUser RedisModuleUser;
typedef struct RedisModuleClient RedisModuleClient;
typedef struct RedisModuleKeyOptCtx RedisModuleKeyOptCtx;
typedef struct RedisModuleRdbStream RedisModuleRdbStream;

Expand Down Expand Up @@ -976,7 +985,7 @@ REDISMODULE_API int (*RedisModule_GetSelectedDb)(RedisModuleCtx *ctx) REDISMODUL
REDISMODULE_API int (*RedisModule_SelectDb)(RedisModuleCtx *ctx, int newid) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_KeyExists)(RedisModuleCtx *ctx, RedisModuleString *keyname) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleKey * (*RedisModule_OpenKey)(RedisModuleCtx *ctx, RedisModuleString *keyname, int mode) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetOpenKeyModesAll)() REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetOpenKeyModesAll)(void) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_CloseKey)(RedisModuleKey *kp) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_KeyType)(RedisModuleKey *kp) REDISMODULE_ATTR;
REDISMODULE_API size_t (*RedisModule_ValueLength)(RedisModuleKey *kp) REDISMODULE_ATTR;
Expand Down Expand Up @@ -1098,7 +1107,7 @@ REDISMODULE_API int (*RedisModule_SetClientNameById)(uint64_t id, RedisModuleStr
REDISMODULE_API int (*RedisModule_PublishMessage)(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_PublishMessageShard)(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetContextFlags)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_AvoidReplicaTraffic)() REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_AvoidReplicaTraffic)(void) REDISMODULE_ATTR;
REDISMODULE_API void * (*RedisModule_PoolAlloc)(RedisModuleCtx *ctx, size_t bytes) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleType * (*RedisModule_CreateDataType)(RedisModuleCtx *ctx, const char *name, int encver, RedisModuleTypeMethods *typemethods) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_ModuleTypeSetValue)(RedisModuleKey *key, RedisModuleType *mt, void *value) REDISMODULE_ATTR;
Expand Down Expand Up @@ -1201,17 +1210,17 @@ REDISMODULE_API RedisModuleBlockedClient * (*RedisModule_BlockClientOnKeys)(Redi
REDISMODULE_API RedisModuleBlockedClient * (*RedisModule_BlockClientOnKeysWithFlags)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata, int flags) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_SignalKeyAsReady)(RedisModuleCtx *ctx, RedisModuleString *key) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleString * (*RedisModule_GetBlockedClientReadyKey)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleScanCursor * (*RedisModule_ScanCursorCreate)() REDISMODULE_ATTR;
REDISMODULE_API RedisModuleScanCursor * (*RedisModule_ScanCursorCreate)(void) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_ScanCursorRestart)(RedisModuleScanCursor *cursor) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_ScanCursorDestroy)(RedisModuleScanCursor *cursor) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_Scan)(RedisModuleCtx *ctx, RedisModuleScanCursor *cursor, RedisModuleScanCB fn, void *privdata) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_ScanKey)(RedisModuleKey *key, RedisModuleScanCursor *cursor, RedisModuleScanKeyCB fn, void *privdata) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetContextFlagsAll)() REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetModuleOptionsAll)() REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetKeyspaceNotificationFlagsAll)() REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetContextFlagsAll)(void) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetModuleOptionsAll)(void) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetKeyspaceNotificationFlagsAll)(void) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_IsSubEventSupported)(RedisModuleEvent event, uint64_t subevent) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetServerVersion)() REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetTypeMethodVersion)() REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetServerVersion)(void) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetTypeMethodVersion)(void) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_Yield)(RedisModuleCtx *ctx, int flags, const char *busy_reply) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleBlockedClient * (*RedisModule_BlockClient)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) REDISMODULE_ATTR;
REDISMODULE_API void * (*RedisModule_BlockClientGetPrivateData)(RedisModuleBlockedClient *blocked_client) REDISMODULE_ATTR;
Expand All @@ -1234,7 +1243,7 @@ REDISMODULE_API void (*RedisModule_ThreadSafeContextUnlock)(RedisModuleCtx *ctx)
REDISMODULE_API int (*RedisModule_SubscribeToKeyspaceEvents)(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc cb) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_AddPostNotificationJob)(RedisModuleCtx *ctx, RedisModulePostNotificationJobFunc callback, void *pd, void (*free_pd)(void*)) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_NotifyKeyspaceEvent)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetNotifyKeyspaceEvents)() REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetNotifyKeyspaceEvents)(void) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_BlockedClientDisconnected)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_RegisterClusterMessageReceiver)(RedisModuleCtx *ctx, uint8_t type, RedisModuleClusterMessageReceiver callback) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_SendClusterMessage)(RedisModuleCtx *ctx, const char *target_id, uint8_t type, const char *msg, uint32_t len) REDISMODULE_ATTR;
Expand All @@ -1259,11 +1268,12 @@ REDISMODULE_API RedisModuleString * (*RedisModule_CommandFilterArgGet)(RedisModu
REDISMODULE_API int (*RedisModule_CommandFilterArgInsert)(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_CommandFilterArgReplace)(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_CommandFilterArgDelete)(RedisModuleCommandFilterCtx *fctx, int pos) REDISMODULE_ATTR;
REDISMODULE_API unsigned long long (*RedisModule_CommandFilterGetClientId)(RedisModuleCommandFilterCtx *fctx) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_Fork)(RedisModuleForkDoneHandler cb, void *user_data) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_SendChildHeartbeat)(double progress) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_ExitFromChild)(int retcode) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_KillForkChild)(int child_pid) REDISMODULE_ATTR;
REDISMODULE_API float (*RedisModule_GetUsedMemoryRatio)() REDISMODULE_ATTR;
REDISMODULE_API float (*RedisModule_GetUsedMemoryRatio)(void) REDISMODULE_ATTR;
REDISMODULE_API size_t (*RedisModule_MallocSize)(void* ptr) REDISMODULE_ATTR;
REDISMODULE_API size_t (*RedisModule_MallocUsableSize)(void *ptr) REDISMODULE_ATTR;
REDISMODULE_API size_t (*RedisModule_MallocSizeString)(RedisModuleString* str) REDISMODULE_ATTR;
Expand Down Expand Up @@ -1309,6 +1319,12 @@ REDISMODULE_API RedisModuleRdbStream *(*RedisModule_RdbStreamCreateFromFile)(con
REDISMODULE_API void (*RedisModule_RdbStreamFree)(RedisModuleRdbStream *stream) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_RdbLoad)(RedisModuleCtx *ctx, RedisModuleRdbStream *stream, int flags) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_RdbSave)(RedisModuleCtx *ctx, RedisModuleRdbStream *stream, int flags) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleClient * (*RedisModule_CreateModuleClient)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_FreeModuleClient)(RedisModuleCtx *ctx, RedisModuleClient *client) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_SetContextClient)(RedisModuleCtx *ctx, RedisModuleClient *client) REDISMODULE_ATTR;
REDISMODULE_API uint64_t (*RedisModule_GetClientFlags)(RedisModuleClient *client) REDISMODULE_ATTR;
REDISMODULE_API uint64_t (*RedisModule_GetClientFlagsAll)(void) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_SetClientUser)(RedisModuleClient *client, RedisModuleUser *user) REDISMODULE_ATTR;

#define RedisModule_IsAOFClient(id) ((id) == UINT64_MAX)

Expand Down Expand Up @@ -1619,6 +1635,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(CommandFilterArgInsert);
REDISMODULE_GET_API(CommandFilterArgReplace);
REDISMODULE_GET_API(CommandFilterArgDelete);
REDISMODULE_GET_API(CommandFilterGetClientId);
REDISMODULE_GET_API(Fork);
REDISMODULE_GET_API(SendChildHeartbeat);
REDISMODULE_GET_API(ExitFromChild);
Expand Down Expand Up @@ -1669,6 +1686,13 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(RdbStreamFree);
REDISMODULE_GET_API(RdbLoad);
REDISMODULE_GET_API(RdbSave);
REDISMODULE_GET_API(CreateModuleClient);
REDISMODULE_GET_API(FreeModuleClient);
REDISMODULE_GET_API(SetContextClient);
REDISMODULE_GET_API(GetClientFlags);
REDISMODULE_GET_API(GetClientFlagsAll);
REDISMODULE_GET_API(SetClientUser);


if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR;
RedisModule_SetModuleAttribs(ctx,name,ver,apiver);
Expand Down
1 change: 1 addition & 0 deletions src/clientstate.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ ClientState *ClientStateGet(RedisRaftCtx *rr, RedisModuleCtx *ctx)
void ClientStateAlloc(RedisRaftCtx *rr, unsigned long long client_id)
{
ClientState *clientState = RedisModule_Calloc(sizeof(ClientState), 1);
clientState->client_id = client_id;
int ret = RedisModule_DictSetC(rr->client_state, &client_id, sizeof(client_id), clientState);
RedisModule_Assert(ret == REDISMODULE_OK);
}
Expand Down
6 changes: 3 additions & 3 deletions src/commands.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ static const CommandSpec commands[] = {

/* Admin commands - bypassed */
{"auth", CMD_SPEC_DONT_INTERCEPT },
{"ping", CMD_SPEC_DONT_INTERCEPT },
{"ping", CMD_SPEC_INTERCEPT_IN_MULTI },
{"hello", CMD_SPEC_DONT_INTERCEPT },
{"module", CMD_SPEC_DONT_INTERCEPT },
{"config", CMD_SPEC_DONT_INTERCEPT },
{"config", CMD_SPEC_INTERCEPT_IN_MULTI },
{"monitor", CMD_SPEC_DONT_INTERCEPT },
{"command", CMD_SPEC_DONT_INTERCEPT },
{"shutdown", CMD_SPEC_DONT_INTERCEPT },
{"shutdown", CMD_SPEC_INTERCEPT_IN_MULTI },
{"quit", CMD_SPEC_DONT_INTERCEPT },
{"slowlog", CMD_SPEC_DONT_INTERCEPT },
{"acl", CMD_SPEC_DONT_INTERCEPT },
Expand Down
24 changes: 18 additions & 6 deletions src/multi.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,12 @@ bool MultiHandleCommand(RedisRaftCtx *rr,

if (multiState->error) {
MultiStateReset(multiState);
RedisModule_ReplyWithError(ctx, "EXECABORT Transaction discarded because of previous errors.");
if (clientState->watched) {
RaftReq *req = RaftReqInit(ctx, RR_END_SESSION);
appendEndClientSession(rr, req, clientState->client_id, SESSION_END_EXECABORT);
} else {
RedisModule_ReplyWithError(ctx, EXECABORT_ERR);
}
return true;
}

Expand All @@ -96,7 +101,12 @@ bool MultiHandleCommand(RedisRaftCtx *rr,
}

MultiStateReset(multiState);
RedisModule_ReplyWithSimpleString(ctx, "OK");
if (clientState->watched) {
RaftReq *req = RaftReqInit(ctx, RR_END_SESSION);
appendEndClientSession(rr, req, clientState->client_id, SESSION_END_DISCARD);
} else {
RedisModule_ReplyWithSimpleString(ctx, "OK");
}

return true;
}
Expand All @@ -120,10 +130,12 @@ bool MultiHandleCommand(RedisRaftCtx *rr,
return true;
}

if (cmd_flags & CMD_SPEC_DONT_INTERCEPT) {
RedisModule_ReplyWithError(ctx, "ERR not supported by RedisRaft inside MULTI/EXEC");
multiState->error = true;
return true;
if (cmd_flags & CMD_SPEC_INTERCEPT_IN_MULTI) {
if (cmd_len == 8 && !strncasecmp(cmd_str, "SHUTDOWN", 8)) {
RedisModule_ReplyWithError(ctx, "ERR Command not allowed inside a transaction");
multiState->error = true;
return true;
}
}

if (RedisModule_GetUsedMemoryRatio() > 1.0) {
Expand Down
Loading