From 10f59eb911abd4f9b7c89816ad183a49fdc9bfdf Mon Sep 17 00:00:00 2001 From: kirills-morozovs Date: Fri, 16 Aug 2024 17:23:38 +0300 Subject: [PATCH] Remove expired members from subscription topic stored in Redis set map --- .../Storage/RedisStorageManager.php | 24 +++++++++++++++---- .../Storage/RedisStorageManagerTest.php | 7 ++++-- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/src/Subscriptions/Storage/RedisStorageManager.php b/src/Subscriptions/Storage/RedisStorageManager.php index 4f945ba82..c22e39bb5 100644 --- a/src/Subscriptions/Storage/RedisStorageManager.php +++ b/src/Subscriptions/Storage/RedisStorageManager.php @@ -65,6 +65,9 @@ public function subscribersByTopic(string $topic): Collection return new Collection(); } + // Store all keys as missing keys to remove the ones which are expired later. + $missingKeys = $subscriberIds; + // Since we store the individual subscribers with a prefix, // but not in the set, we have to add the prefix here. $subscriberIds = array_map([$this, 'channelKey'], $subscriberIds); @@ -73,22 +76,35 @@ public function subscribersByTopic(string $topic): Collection // This is like using multiple get calls (getSubscriber uses the get command). $subscribers = $this->connection->command('mget', [$subscriberIds]); - return (new Collection($subscribers)) + $subscribersCollection = (new Collection($subscribers)) ->filter() - ->map(static function (?string $subscriber): ?Subscriber { - // Some entries may be expired + ->map(static function (?string $subscriber) use (&$missingKeys): ?Subscriber { + // Some entries may be expired. if ($subscriber === null) { return null; } // Other entries may contain invalid values try { - return unserialize($subscriber); + $subscriber = unserialize($subscriber); + + // This key exists so remove it from the list of missing keys. + $missingKeys = array_diff($missingKeys, [$subscriber->channel]); + + return $subscriber; } catch (\ErrorException) { return null; } }) ->filter(); + + // Remove expired subscribers from the set of subscribers of this topic. + $this->connection->command('srem', [ + $this->topicKey($topic), + ...$missingKeys, + ]); + + return $subscribersCollection; } public function storeSubscriber(Subscriber $subscriber, string $topic): void diff --git a/tests/Unit/Subscriptions/Storage/RedisStorageManagerTest.php b/tests/Unit/Subscriptions/Storage/RedisStorageManagerTest.php index d9b92180d..8dacc0b12 100644 --- a/tests/Unit/Subscriptions/Storage/RedisStorageManagerTest.php +++ b/tests/Unit/Subscriptions/Storage/RedisStorageManagerTest.php @@ -195,7 +195,7 @@ public function testSubscribersByTopic(): void $subscriber2, ]; - $redisConnection->expects($this->exactly(2)) + $redisConnection->expects($this->exactly(3)) ->method('command') ->with(...$this->withConsecutive( ['smembers', ["graphql.topic.{$topic}"]], @@ -203,10 +203,12 @@ public function testSubscribersByTopic(): void 'graphql.subscriber.foo1', 'graphql.subscriber.foo2', 'graphql.subscriber.foo3', + 'graphql.subscriber.foo4', ]]], + ['srem', ["graphql.topic.{$topic}", 'foo3', 'foo4']], )) ->willReturnOnConsecutiveCalls( - ['foo1', 'foo2', 'foo3'], + ['foo1', 'foo2', 'foo3', 'foo4'], [ serialize($subscriber1), serialize($subscriber2), @@ -217,6 +219,7 @@ public function testSubscribersByTopic(): void // mget non-existing-entry false, ], + null, ); $this->assertEquals(