From 6a45d05769eabd3289bc249d8399d1b2daa8321c Mon Sep 17 00:00:00 2001 From: Nipunn Koorapati Date: Fri, 27 Sep 2024 13:54:51 -0700 Subject: [PATCH] Avoid resending the same value twice Comes at the cost of storing a value in memory. Seems not worth? Perhaps there's an alternate strategy where instead of holding a single massive broadcast sender, we could have multiple sub-senders (one for each query) and only broadcast on those. --- rustfmt.toml | 15 +++++++++++++++ src/client/subscription.rs | 15 ++++++++++++--- src/client/worker.rs | 3 ++- 3 files changed, 29 insertions(+), 4 deletions(-) create mode 100644 rustfmt.toml diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..8fa5544 --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1,15 @@ +use_field_init_shorthand = true +use_try_shorthand = true +match_block_trailing_comma = true + +# Nightly only options: +unstable_features = true +condense_wildcard_suffixes = true +format_strings = true +imports_granularity = "Crate" +reorder_impl_items = true +imports_layout = "Vertical" +group_imports = "StdExternalCrate" +wrap_comments = true +normalize_comments = false +error_on_line_overflow = true \ No newline at end of file diff --git a/src/client/subscription.rs b/src/client/subscription.rs index b361d3f..3c6e1e7 100644 --- a/src/client/subscription.rs +++ b/src/client/subscription.rs @@ -44,7 +44,8 @@ pub struct QuerySubscription { pub(super) subscriber_id: SubscriberId, pub(super) request_sender: UnboundedSender, pub(super) watch: BroadcastStream, - pub(super) initial: Option, + pub(super) sent_initial_value: bool, + pub(super) last_value: Option, } impl QuerySubscription { /// Returns an identifier for this subscription based on its query and args. @@ -84,8 +85,11 @@ impl Stream for QuerySubscription { mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> task::Poll> { - if let Some(initial) = self.initial.take() { - return task::Poll::Ready(Some(initial)); + if !self.sent_initial_value { + self.sent_initial_value = true; + if let Some(value) = self.last_value.clone() { + return task::Poll::Ready(Some(value)); + } } loop { return match self.watch.poll_next_unpin(cx) { @@ -97,6 +101,11 @@ impl Stream for QuerySubscription { // No result yet in the query result set. Keep polling. continue; }; + if Some(value) == self.last_value.as_ref() { + // Redundant + continue; + } + self.last_value = Some(value.clone()); task::Poll::Ready(Some(value.clone())) }, task::Poll::Ready(None) => task::Poll::Ready(None), diff --git a/src/client/worker.rs b/src/client/worker.rs index efa1689..560f195 100644 --- a/src/client/worker.rs +++ b/src/client/worker.rs @@ -168,7 +168,8 @@ async fn _worker_once( subscriber_id, request_sender, watch, - initial: base_client.latest_results().get(&subscriber_id).cloned(), + sent_initial_value: false, + last_value: base_client.latest_results().get(&subscriber_id).cloned(), }; let _ = tx.send(subscription); },