feat(sdk): Live updates for pinned events timeline with replacements and redactions #3840
Conversation
Codecov Report
Attention: Patch coverage is
Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main    #3840      +/-   ##
==========================================
+ Coverage   84.06%   84.08%   +0.01%
==========================================
  Files         266      266
  Lines       27779    27812      +33
==========================================
+ Hits        23352    23385      +33
  Misses       4427     4427
```
Force-pushed from 664fb74 to 00199d9.
Looks like you're on the right track! I'd like a few test changes, and the main logic in the `EventCache` itself isn't tested independently. Please add some tests for that too, since it's a different layer than the timeline.
Also, please reword the PR title to follow our conventional commit guidelines :)
Thanks!
```rust
if let Err(err) = semaphore.acquire().await {
    warn!("error when loading pinned event: {err}");
    return None;
}
```
Pretty sure this is a no-op when the semaphore could be acquired, because the guard (in the `Ok()` value) is not bound, so it's immediately dropped. Can you add a test for this? (I would even recommend making a separate PR for this feature, since it seems unrelated to the current PR's purpose, and could land separately.)
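To illustrate the diagnosis: a minimal sketch of the fixed pattern, where binding the permit keeps it alive for the duration of the work. The wrapper name `load_with_permit` and the `fetch` parameter are hypothetical, not the PR's code:

```rust
use std::future::Future;
use tokio::sync::Semaphore;
use tracing::warn;

/// Hedged sketch: `fetch` stands in for the actual pinned-event request.
async fn load_with_permit<T>(
    semaphore: &Semaphore,
    fetch: impl Future<Output = Option<T>>,
) -> Option<T> {
    // Bind the permit so it's held until the end of this function; the
    // unbound `if let Err(..)` version dropped it immediately on success,
    // so the semaphore never actually limited concurrency.
    let _permit = match semaphore.acquire().await {
        Ok(permit) => permit,
        Err(err) => {
            warn!("error when loading pinned event: {err}");
            return None;
        }
    };

    fetch.await
}
```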
I have no idea how to test that, to be honest. Since there is no timeout, I think the execution will just get stuck there until a new permit is available.
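For what it's worth, one way to exercise the blocking behaviour without hanging the test might look like this; a standalone sketch, not taken from the PR:

```rust
use std::time::Duration;
use tokio::sync::Semaphore;

#[tokio::test]
async fn acquire_blocks_when_no_permits_left() {
    let semaphore = Semaphore::new(1);

    // Bind the only permit (an underscore-prefixed name still keeps it alive,
    // unlike a bare `_`, which would drop it immediately).
    let _held = semaphore.acquire().await.unwrap();

    // A second acquire should still be pending once the timeout elapses.
    let second =
        tokio::time::timeout(Duration::from_millis(50), semaphore.acquire()).await;
    assert!(second.is_err(), "acquire unexpectedly completed");
}
```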
I replaced this with a buffered stream now. It seems to work just as well, and there is no need for the semaphore anymore.
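A hedged sketch of what such a buffered-stream approach can look like; the function and constant names here are illustrative, not the PR's actual code:

```rust
use futures_util::{stream, StreamExt};

/// Hedged sketch: the stream itself caps concurrency, so no semaphore is
/// needed. `load_event` stands in for the per-event request.
async fn load_all<T, F, Fut>(event_ids: Vec<String>, load_event: F) -> Vec<T>
where
    F: Fn(String) -> Fut,
    Fut: std::future::Future<Output = Option<T>>,
{
    const MAX_CONCURRENT_REQUESTS: usize = 10;

    stream::iter(event_ids)
        .map(load_event)
        // At most MAX_CONCURRENT_REQUESTS futures are polled at once.
        .buffer_unordered(MAX_CONCURRENT_REQUESTS)
        // Drop events that failed to load.
        .filter_map(|maybe_event| async move { maybe_event })
        .collect()
        .await
}
```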
```
@@ -396,6 +404,49 @@ impl EventCacheInner {
            continue;
        };

        // Handle and cache events and relations
        let mut relations = self.relations.write().await;
```
Unfortunately, this is not the right place to add this; I suppose this should be in the same place where I inserted events into `Self::events`. Follow the breadcrumbs :)
I think I'll have to do that in a separate method then? I don't think I can have two `RwLockWriteGuard`s at the same time, one for `all_events` and another one for `relations`, so I don't think I'll be able to add this in `append_events_locked_impl`.
Oh, I just saw #3840 (comment).
```rust
let mut events = self.events.write().await;
for ev in &joined_room_update.timeline.events {
    if let Some(ev_id) = ev.event_id() {
        events.insert(ev_id, (room_id.clone(), ev.clone()));
    }
}
drop(events);
```
This can entirely be removed, because it's already done a few layers below.
```
@@ -501,6 +558,29 @@ impl RoomEventCache {
        None
    }

    /// Try to find an event by id in this room, along with all relations.
    pub async fn event_with_relations(&self, event_id: &EventId) -> Vec<SyncTimelineEvent> {
```
Can you return a `(SyncTimelineEvent, Vec<SyncTimelineEvent>)` instead, so it's clear where the target event is? (Right now there's nothing, either in the signature or the doc-comment, that makes it obvious it's the first one.)
```rust
/// Try to find an event by id in this room, along with all relations.
pub async fn event_with_relations(&self, event_id: &EventId) -> Vec<SyncTimelineEvent> {
    let relations = self.inner.relations.read().await;
    let relation_ids = relations.get(event_id);
```
Can you inline this where it's used, instead? It's not useful to preload it if the event couldn't be found.
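A hedged sketch of how the two suggestions above (the explicit return type and the inlined lookup) could combine. Field names follow the quoted snippets; the cached value layout and the overall shape are assumptions, not the merged code:

```rust
/// Try to find an event by id in this room, along with all its relations.
/// The target event is the first element of the returned tuple.
pub async fn event_with_relations(
    &self,
    event_id: &EventId,
) -> Option<(SyncTimelineEvent, Vec<SyncTimelineEvent>)> {
    // Bail out early if the target event isn't cached at all.
    let events = self.inner.events.read().await;
    let (_room_id, event) = events.get(event_id)?.clone();

    // The relations lookup is inlined here, so it only runs when the
    // target event was actually found.
    let relations = self.inner.relations.read().await;
    let related = relations
        .get(event_id)
        .into_iter()
        .flatten()
        .filter_map(|related_id| events.get(related_id).map(|(_, ev)| ev.clone()))
        .collect();

    Some((event, related))
}
```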
```rust
test_helper.server.reset().await;

// Load new pinned event contents from sync, where $2 is an edit of $1
let _ = Box::pin(
```
Why is the `Box::pin` needed? Can you do without it?
Oops, that's a leftover from one of my tests to check why the tests were stuck.
```rust
.await;

// The list is reloaded, so it's reset
assert_matches!(timeline_stream.next().await.unwrap(), VectorDiff::Clear);
```
I think there's an `assert_next` that exists and could replace most of these `assert_matches` invocations here and below, and in the other test(s) too?
For some reason, changing `assert_matches!(timeline_stream.next().await.unwrap(), *)` to `assert_next_matches!(timeline_stream, *)` makes one of the tests fail with:

```
assertion failed: stream is not ready, it's pending
```

Maybe I need to add a delay before it? It's quite weird.
Oh, it's because under the hood it calls `Stream::next_now_or_never`... That seems dangerous. I don't see any similar function for doing it while awaiting the next item.
We need to add a timeout then; otherwise, if the stream item doesn't come immediately, the test will only time out after `cargo-nextest`'s delay, which is a minute or longer.
Added a new `assert_next_matches_with_timeout!` macro for this. It's based on `assert_next_matches`, but instead of checking the immediate result of the stream, it waits with a small timeout which can be customised: b1fe70a
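For context, a sketch of what such a macro could look like. The real one is in b1fe70a; this version, including its 500 ms default, is illustrative:

```rust
/// Hedged sketch: await the next stream item with a (customisable) timeout
/// instead of polling once like `assert_next_matches!`, then match it.
macro_rules! assert_next_matches_with_timeout {
    ($stream:expr, $pattern:pat) => {
        // Illustrative default timeout of 500 ms.
        assert_next_matches_with_timeout!($stream, 500, $pattern)
    };
    ($stream:expr, $timeout_ms:expr, $pattern:pat) => {{
        let next = tokio::time::timeout(
            std::time::Duration::from_millis($timeout_ms),
            futures_util::StreamExt::next(&mut $stream),
        )
        .await
        .expect("timed out waiting for the next stream item")
        .expect("stream ended unexpectedly");
        assert_matches::assert_matches!(next, $pattern);
    }};
}
```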
```rust
relations
    .insert(orig_ev_id.to_owned(), BTreeSet::from_iter(vec![related_ev_id.to_owned()]));
}
drop(relations);
```
nit: no need to drop explicitly, it'll happen automatically at the end of the function's body.
Yep, copy&paste issue. It's gone now.
```
@@ -496,6 +504,18 @@ impl EventCacheInner {
            }
        }
    }

    async fn insert_or_update_relationship(&self, orig_ev_id: &EventId, related_ev_id: &EventId) {
```
nit: please fully spell out `event_id` for consistency, and `original` too.
(I, for one, would call these two parameters `original` and `related`, because including the parameter's type in its name is spurious when the type is right next to the variable name, but that's my preference :))
I'm removing this code, so it shouldn't be needed anymore.
```
@@ -186,6 +187,8 @@ pub fn load_pinned_events_benchmark(c: &mut Criterion) {

    group.bench_function(BenchmarkId::new("load_pinned_events", name), |b| {
        b.to_async(&runtime).iter(|| async {
            client.event_cache().subscribe().unwrap();
```
Is there really a need to do it for each iteration of the benchmark? I don't think so; it should be subscribed to once and for all.
It seems like `client.event_cache().subscribe().unwrap();` does some async operations, and the benchmark panics with:

```
there is no reactor running, must be called from the context of a Tokio 1.x runtime
```
Maybe I can wrap `Client` in an `Arc` and use `runtime.spawn_blocking` to run that code? Is there a better solution?
Tried fixing this in 807c875; please let me know if there's a better solution.
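For reference, a sketch of one way to subscribe once, before the measured loop; this is an assumption about the shape of the fix, and 807c875 may differ:

```rust
// Subscribe once, inside the runtime's context so `subscribe()` can spawn
// its background tasks, instead of re-subscribing on every iteration.
runtime.block_on(async {
    client.event_cache().subscribe().unwrap();
});

group.bench_function(BenchmarkId::new("load_pinned_events", name), |b| {
    b.to_async(&runtime).iter(|| async {
        // Benchmark body: the event cache is already subscribed.
        // ...
    });
});
```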
Force-pushed from d9880bd to b4f147f.
So, funny thing: it seems like when the devs reported
Force-pushed from a1d1a90 to b93ed11.
Force-pushed from b93ed11 to 7f97fba.
Force-pushed from 7f97fba to 335f94b.
Ok, so I reused this PR to hold only the part of the code related to the live updates for the pinned events timeline. It depends on a previous PR being merged, and it'll be followed by another PR containing the semaphore/buffered-futures changes.
Force-pushed from 335f94b to c86c842.
Force-pushed from c86c842 to b8f24bd.
The PR this one depended on has been merged and this one has been rebased, so it's ready for review again.
Force-pushed from b8f24bd to b6f4748.
We are almost good!
```rust
// This doesn't insert the event into the linked chunk. In the future there'll
// be no distinction between the linked chunk and the separate cache.
```
Why? Can we add a `TODO` and open an issue to track this, please?
The comment is just a copy of the one above it, written by @bnjbvr, since it follows the same logic. I think it's probably because we lack the context of where the `LinkedChunk` containing the events to save would go, or whether it even makes sense to add the events to one (these events can come from very different sources and may not be related enough to be added as a chunk of a timeline, if I understood how `LinkedChunk` works properly).
I guess the 'in the future' part means once we have a persistent event cache, but I'm not 100% sure.
Anyway, I can create an issue for that.
Excellent, thanks!
Looks good to me, thanks! Please rebase, and you can merge.
This way, any reactions, redactions, edits, etc. will be taken into account when building the timeline event.
This commit contains a new `assert_next_matches_with_timeout!` macro that will take a `Stream` and wait for a little while until its next item is ready, then match it to a pattern.
…g the same `RwLock` for the cache, which should be faster
…::event`, save its results to the event cache too.
Force-pushed from 4f57f54 to 7cfcbae.
This is a follow-up PR for #3870.
Changes:
- Have `TimelineEventBuilder` process synced events for `TimelineFocus::PinnedEvents` too.
- Use the `EventCache` from #3870 to load pinned events and other events related to them, so their final timeline event results contain the changes those would add (edits, redactions, reactions, etc.).
- Have `Room::event_with_context` also save its results to the cache.
- Update `room_bench` accordingly.

The end result of this is that the pinned events timeline now reacts to edits and redactions, and those values are kept even when the event list reloads.