Skip to content

Commit

Permalink
Remove expired members from subscription topic stored in Redis set map
Browse files Browse the repository at this point in the history
  • Loading branch information
kirills-morozovs committed Aug 16, 2024
1 parent b7ba38c commit 10f59eb
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 6 deletions.
24 changes: 20 additions & 4 deletions src/Subscriptions/Storage/RedisStorageManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ public function subscribersByTopic(string $topic): Collection
return new Collection();
}

// Store all keys as missing keys to remove the ones which are expired later.
$missingKeys = $subscriberIds;

// Since we store the individual subscribers with a prefix,
// but not in the set, we have to add the prefix here.
$subscriberIds = array_map([$this, 'channelKey'], $subscriberIds);
Expand All @@ -73,22 +76,35 @@ public function subscribersByTopic(string $topic): Collection
// This is like using multiple get calls (getSubscriber uses the get command).
$subscribers = $this->connection->command('mget', [$subscriberIds]);

return (new Collection($subscribers))
$subscribersCollection = (new Collection($subscribers))
->filter()
->map(static function (?string $subscriber): ?Subscriber {
// Some entries may be expired
->map(static function (?string $subscriber) use (&$missingKeys): ?Subscriber {
// Some entries may be expired.
if ($subscriber === null) {
return null;
}

// Other entries may contain invalid values
try {
return unserialize($subscriber);
$subscriber = unserialize($subscriber);

// This key exists so remove it from the list of missing keys.
$missingKeys = array_diff($missingKeys, [$subscriber->channel]);

return $subscriber;
} catch (\ErrorException) {
return null;
}
})
->filter();

// Remove expired subscribers from the set of subscribers of this topic.
$this->connection->command('srem', [
$this->topicKey($topic),
...$missingKeys,
]);

return $subscribersCollection;
}

public function storeSubscriber(Subscriber $subscriber, string $topic): void
Expand Down
7 changes: 5 additions & 2 deletions tests/Unit/Subscriptions/Storage/RedisStorageManagerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -195,18 +195,20 @@ public function testSubscribersByTopic(): void
$subscriber2,
];

$redisConnection->expects($this->exactly(2))
$redisConnection->expects($this->exactly(3))
->method('command')
->with(...$this->withConsecutive(
['smembers', ["graphql.topic.{$topic}"]],
['mget', [[
'graphql.subscriber.foo1',
'graphql.subscriber.foo2',
'graphql.subscriber.foo3',
'graphql.subscriber.foo4',
]]],
['srem', ["graphql.topic.{$topic}", 'foo3', 'foo4']],
))
->willReturnOnConsecutiveCalls(
['foo1', 'foo2', 'foo3'],
['foo1', 'foo2', 'foo3', 'foo4'],
[
serialize($subscriber1),
serialize($subscriber2),
Expand All @@ -217,6 +219,7 @@ public function testSubscribersByTopic(): void
// mget non-existing-entry
false,
],
null,
);

$this->assertEquals(
Expand Down

0 comments on commit 10f59eb

Please sign in to comment.