diff --git a/subxt/src/backend/chain_head/follow_stream.rs b/subxt/src/backend/chain_head/follow_stream.rs index 4e21e65d11..916488a6ab 100644 --- a/subxt/src/backend/chain_head/follow_stream.rs +++ b/subxt/src/backend/chain_head/follow_stream.rs @@ -72,10 +72,6 @@ enum InnerStreamState { ReceivingEvents(FollowEventStream), /// We received a stop event. We'll send one on and restart the stream. Stopped, - /// The backend was closed and tell everything to shutdown. - BackendClosed, - /// Report error. - Error(Option), /// The stream is finished and will not restart (likely due to an error). Finished, } @@ -88,9 +84,7 @@ impl std::fmt::Debug for InnerStreamState { Self::Ready(_) => write!(f, "Ready(..)"), Self::ReceivingEvents(_) => write!(f, "ReceivingEvents(..)"), Self::Stopped { .. } => write!(f, "Stopped"), - Self::BackendClosed => write!(f, "BackendClosed"), Self::Finished => write!(f, "Finished"), - Self::Error(_) => write!(f, "Error"), } } } @@ -160,16 +154,9 @@ impl Stream for FollowStream { continue; } - // Finish forever if there's an error which is done as follows: - // - // 1) Send the FollowEvent::BackendClosed message which tells to subscriber that subscription is closed. - // 2) Send the error on the backend driver stream. - // 3) Finish the stream. - // - this.stream = InnerStreamState::Error(Some(e)); - return Poll::Ready(Some(Ok(FollowStreamMsg::Event( - FollowEvent::BackendClosed, - )))); + // Finish forever if there's an error, passing it on. + this.stream = InnerStreamState::Finished; + return Poll::Ready(Some(Err(e))); } } } @@ -192,21 +179,13 @@ impl Stream for FollowStream { continue; } Poll::Ready(Some(Ok(ev))) => { - match ev { - FollowEvent::Stop => { - // A stop event means the stream has ended, so start - // over after passing on the stop message. - this.stream = InnerStreamState::Stopped; - continue; - } - FollowEvent::BackendClosed => { - this.stream = InnerStreamState::BackendClosed; - continue; - } - _ => { - return Poll::Ready(Some(Ok(FollowStreamMsg::Event(ev)))); - } + if let FollowEvent::Stop = ev { + // A stop event means the stream has ended, so start + // over after passing on the stop message. + this.stream = InnerStreamState::Stopped; + continue; } + return Poll::Ready(Some(Ok(FollowStreamMsg::Event(ev)))); } Poll::Ready(Some(Err(e))) => { // Re-start if a reconnecting backend was enabled. @@ -225,14 +204,6 @@ impl Stream for FollowStream { this.stream = InnerStreamState::New; return Poll::Ready(Some(Ok(FollowStreamMsg::Event(FollowEvent::Stop)))); } - InnerStreamState::BackendClosed => { - this.stream = InnerStreamState::Finished; - } - InnerStreamState::Error(e) => { - let e = e.take().expect("should always be Some"); - this.stream = InnerStreamState::Finished; - return Poll::Ready(Some(Err(e))); - } InnerStreamState::Finished => { return Poll::Ready(None); } diff --git a/subxt/src/backend/chain_head/follow_stream_driver.rs b/subxt/src/backend/chain_head/follow_stream_driver.rs index fad7fa0e16..b86d4ff8e5 100644 --- a/subxt/src/backend/chain_head/follow_stream_driver.rs +++ b/subxt/src/backend/chain_head/follow_stream_driver.rs @@ -196,6 +196,13 @@ impl Shared { pub fn done(&self) { let mut shared = self.0.lock().unwrap(); shared.done = true; + + // Wake up all subscribers so they can see that we're done. + for details in shared.subscribers.values_mut() { + if let Some(waker) = details.waker.take() { + waker.wake(); + } + } } /// Cleanup a subscription. @@ -317,8 +324,7 @@ impl Shared { FollowStreamMsg::Event(ev @ FollowEvent::BestBlockChanged(_)) => { shared.block_events_for_new_subscriptions.push_back(ev); } - FollowStreamMsg::Event(FollowEvent::Stop) - | FollowStreamMsg::Event(FollowEvent::BackendClosed) => { + FollowStreamMsg::Event(FollowEvent::Stop) => { // On a stop event, clear everything. Wait for resubscription and new ready/initialised events. shared.block_events_for_new_subscriptions.clear(); shared.current_subscription_id = None; @@ -461,14 +467,7 @@ where (self.f)(FollowEvent::Initialized(init)) } - FollowStreamMsg::Event(ev) => { - if matches!(ev, FollowEvent::BackendClosed) { - self.is_done = true; - return Poll::Ready(None); - }; - - (self.f)(ev) - } + FollowStreamMsg::Event(ev) => (self.f)(ev), }; if block_refs.is_empty() { diff --git a/subxt/src/backend/chain_head/follow_stream_unpin.rs b/subxt/src/backend/chain_head/follow_stream_unpin.rs index 317782190f..0ae0ef0a89 100644 --- a/subxt/src/backend/chain_head/follow_stream_unpin.rs +++ b/subxt/src/backend/chain_head/follow_stream_unpin.rs @@ -223,18 +223,6 @@ impl Stream for FollowStreamUnpin { FollowStreamMsg::Event(FollowEvent::Stop) } - FollowStreamMsg::Event(FollowEvent::BackendClosed) => { - // clear out "old" things that are no longer applicable since - // the subscription has ended (a new one will be created under the hood, at - // which point we'll get given a new subscription ID. - this.subscription_id = None; - this.pinned.clear(); - this.unpin_futs.clear(); - this.unpin_flags.lock().unwrap().clear(); - this.next_rel_block_age = 0; - - FollowStreamMsg::Event(FollowEvent::BackendClosed) - } // These events aren't interesting; we just forward them on: FollowStreamMsg::Event(FollowEvent::OperationBodyDone(details)) => { FollowStreamMsg::Event(FollowEvent::OperationBodyDone(details)) diff --git a/subxt/src/backend/chain_head/mod.rs b/subxt/src/backend/chain_head/mod.rs index 621c8c510e..aad0bd0f35 100644 --- a/subxt/src/backend/chain_head/mod.rs +++ b/subxt/src/backend/chain_head/mod.rs @@ -128,16 +128,16 @@ impl ChainHeadBackendBuilder { } let (backend, mut driver) = self.build(client); - spawn(async move { + // NOTE: we need to poll the driver until it's done i.e returns None + // to ensure that the backend is shutdown properly. while let Some(res) = driver.next().await { - if let Err(e) = res { - if !e.is_disconnected_will_reconnect() { - tracing::debug!(target: "subxt", "chainHead driver was closed: {e}"); - break; - } + if let Err(err) = res { + tracing::debug!(target: "subxt", "ChainHeadBackendDriver got error={err}"); } } + + tracing::debug!(target: "subxt", "ChainHeadBackendDriver closed"); }); backend @@ -614,7 +614,7 @@ impl Backend for ChainHeadBackend { ); } } - FollowEvent::Stop | FollowEvent::BackendClosed => { + FollowEvent::Stop => { // If we get this event, we'll lose all of our existing pinned blocks and have a gap // in which we may lose the finalized block that the TX is in. For now, just error if // this happens, to prevent the case in which we never see a finalized block and wait diff --git a/subxt/src/backend/chain_head/rpc_methods.rs b/subxt/src/backend/chain_head/rpc_methods.rs index b7adc15a4e..dff0a86503 100644 --- a/subxt/src/backend/chain_head/rpc_methods.rs +++ b/subxt/src/backend/chain_head/rpc_methods.rs @@ -354,12 +354,6 @@ pub enum FollowEvent { /// The subscription is dropped and no further events /// will be generated. Stop, - /// Internal state which should not be exposed to the user - /// and not part of the JSON-RPC spec for the `chainHead_v1_follow`. - /// - /// The backend was closed and tell everything to shutdown. - #[doc(hidden)] - BackendClosed, } /// Contain information about the latest finalized block. @@ -655,13 +649,9 @@ impl Stream for FollowSubscription { let res = self.sub.poll_next_unpin(cx); - match &res { + if let Poll::Ready(Some(Ok(FollowEvent::Stop))) = &res { // No more events will occur after this one. - Poll::Ready(Some(Ok(FollowEvent::Stop))) - | Poll::Ready(Some(Ok(FollowEvent::BackendClosed))) => { - self.done = true; - } - _ => {} + self.done = true; } res