From e9634042f38770d63e688f065b9d2ef1bd869b54 Mon Sep 17 00:00:00 2001 From: Leo P Date: Tue, 17 May 2022 00:15:24 -0400 Subject: [PATCH] sharded pubsub support --- async.c | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/async.c b/async.c index 3dad137b9..a3f4f5da2 100644 --- a/async.c +++ b/async.c @@ -414,7 +414,7 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, dict *callbacks; redisCallback *cb = NULL; dictEntry *de; - int pvariant; + int pvariant, svariant; char *stype; sds sname = NULL; @@ -426,7 +426,7 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, assert(reply->element[0]->type == REDIS_REPLY_STRING); stype = reply->element[0]->str; pvariant = (tolower(stype[0]) == 'p') ? 1 : 0; - + svariant = (reply->element[0]->len > 3 && (strncasecmp(stype, "ssu"/*bscrscribe*/, 3) == 0 || strncasecmp(stype, "sun"/*subscribe*/, 3) == 0)) ? 1 : 0; if (pvariant) callbacks = ac->sub.patterns; else @@ -444,11 +444,11 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, } /* If this is an subscribe reply decrease pending counter. */ - if (strcasecmp(stype+pvariant,"subscribe") == 0) { + if (strcasecmp(stype+pvariant+svariant,"subscribe") == 0) { assert(cb != NULL); cb->pending_subs -= 1; - } else if (strcasecmp(stype+pvariant,"unsubscribe") == 0) { + } else if (strcasecmp(stype+pvariant+svariant,"unsubscribe") == 0) { if (cb == NULL) ac->sub.pending_unsubs -= 1; else if (cb->pending_subs == 0) @@ -762,7 +762,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void dictIterator it; dictEntry *de; redisCallback *existcb; - int pvariant, hasnext; + int pvariant, svariant, hasnext; const char *cstr, *astr; size_t clen, alen; const char *p; @@ -783,8 +783,9 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void assert(p != NULL); hasnext = (p[0] == '$'); pvariant = (tolower(cstr[0]) == 'p') ? 1 : 0; - cstr += pvariant; - clen -= pvariant; + svariant = clen > 3 && (strncasecmp(cstr, "ssu"/*bscribe*/, 3) == 0 || strncasecmp(cstr, "sun"/*subscribe*/, 3) == 0); + cstr += pvariant + svariant; + clen -= pvariant + svariant; if (hasnext && strncasecmp(cstr,"subscribe\r\n",11) == 0) { c->flags |= REDIS_SUBSCRIBED;