From 2bd0e634ac274bedb00c78292f931cbbd853d78c Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 18 Sep 2024 17:26:52 -0500 Subject: [PATCH 1/2] Bust `_membership_stream_cache` cache when current state changes Fix https://github.com/element-hq/synapse/issues/17368 --- synapse/storage/_base.py | 4 +++- synapse/storage/databases/main/cache.py | 5 +++++ synapse/util/caches/stream_change_cache.py | 11 +++++++++++ tests/util/test_stream_change_cache.py | 16 ++++++++++++++++ 4 files changed, 35 insertions(+), 1 deletion(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e14d711c76..7251e72e3a 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -86,7 +86,9 @@ def process_replication_position( # noqa: B027 (no-op by design) """ def _invalidate_state_caches( - self, room_id: str, members_changed: Collection[str] + self, + room_id: str, + members_changed: Collection[str], ) -> None: """Invalidates caches that are based on the current state, but does not stream invalidations down replication. diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 37c865a8e7..930bdb5a4f 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -219,6 +219,8 @@ def process_replication_rows( room_id = row.keys[0] members_changed = set(row.keys[1:]) self._invalidate_state_caches(room_id, members_changed) + for user_id in members_changed: + self._membership_stream_cache.entity_has_changed(user_id, token) # type: ignore[attr-defined] elif row.cache_func == PURGE_HISTORY_CACHE_NAME: if row.keys is None: raise Exception( @@ -236,6 +238,7 @@ def process_replication_rows( room_id = row.keys[0] self._invalidate_caches_for_room_events(room_id) self._invalidate_caches_for_room(room_id) + self._membership_stream_cache.all_entities_changed(token) # type: ignore[attr-defined] else: self._attempt_to_invalidate_cache(row.cache_func, row.keys) @@ -275,6 +278,7 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None: self._attempt_to_invalidate_cache( "get_sliding_sync_rooms_for_user", None ) + self._membership_stream_cache.entity_has_changed(data.state_key, token) # type: ignore[attr-defined] elif data.type == EventTypes.RoomEncryption: self._attempt_to_invalidate_cache( "get_room_encryption", (data.room_id,) @@ -291,6 +295,7 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None: # Similar to the above, but the entire caches are invalidated. This is # unfortunate for the membership caches, but should recover quickly. self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined] + self._membership_stream_cache.all_entities_changed(token) # type: ignore[attr-defined] self._attempt_to_invalidate_cache("get_rooms_for_user", None) self._attempt_to_invalidate_cache("get_room_type", (data.room_id,)) self._attempt_to_invalidate_cache("get_room_encryption", (data.room_id,)) diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 16fcb00206..8965086a97 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -314,6 +314,17 @@ def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None: self._entity_to_key[entity] = stream_pos self._evict() + def all_entities_changed(self, stream_pos: int) -> None: + """ + Mark all entities as changed. This is useful when the cache is invalidated and + there may be some potential change for all of the entities. + """ + # All entities are at the same stream position now. + self._cache = SortedDict({stream_pos: set(self._entity_to_key.keys())}) + self._entity_to_key = { + entity: stream_pos for entity in self._entity_to_key.keys() + } + def _evict(self) -> None: """ Ensure the cache has not exceeded the maximum size. diff --git a/tests/util/test_stream_change_cache.py b/tests/util/test_stream_change_cache.py index af1199ef8a..da36ecaed3 100644 --- a/tests/util/test_stream_change_cache.py +++ b/tests/util/test_stream_change_cache.py @@ -251,3 +251,19 @@ def test_max_pos(self) -> None: # Unknown entities will return None self.assertEqual(cache.get_max_pos_of_last_change("not@here.website"), None) + + def test_all_entities_changed(self) -> None: + """ + `StreamChangeCache.all_entities_changed(...)` will mark all entites as changed. + """ + cache = StreamChangeCache("#test", 1, max_size=10) + + cache.entity_has_changed("user@foo.com", 2) + cache.entity_has_changed("bar@baz.net", 3) + cache.entity_has_changed("user@elsewhere.org", 4) + + cache.all_entities_changed(5) + + self.assertEqual(cache.get_max_pos_of_last_change("user@foo.com"), 5) + self.assertEqual(cache.get_max_pos_of_last_change("bar@baz.net"), 5) + self.assertEqual(cache.get_max_pos_of_last_change("user@elsewhere.org"), 5) From d327c62ae745f0c2af702031833ad0b0f639ec29 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 18 Sep 2024 17:33:18 -0500 Subject: [PATCH 2/2] Add changelog --- changelog.d/17732.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/17732.bugfix diff --git a/changelog.d/17732.bugfix b/changelog.d/17732.bugfix new file mode 100644 index 0000000000..572c13fc57 --- /dev/null +++ b/changelog.d/17732.bugfix @@ -0,0 +1 @@ +Fix membership caches not updating in state reset scenarios.