diff --git a/subxt/src/backend/chain_head/follow_stream_driver.rs b/subxt/src/backend/chain_head/follow_stream_driver.rs index 702ac5cdb0..8490c11799 100644 --- a/subxt/src/backend/chain_head/follow_stream_driver.rs +++ b/subxt/src/backend/chain_head/follow_stream_driver.rs @@ -196,6 +196,13 @@ impl Shared { pub fn done(&self) { let mut shared = self.0.lock().unwrap(); shared.done = true; + + // Wake up all subscribers so they get notified that the backend was closed + for details in shared.subscribers.values_mut() { + if let Some(waker) = details.waker.take() { + waker.wake(); + } + } } /// Cleanup a subscription. diff --git a/subxt/src/backend/chain_head/mod.rs b/subxt/src/backend/chain_head/mod.rs index 9c70df5bff..aacdb452f6 100644 --- a/subxt/src/backend/chain_head/mod.rs +++ b/subxt/src/backend/chain_head/mod.rs @@ -128,16 +128,16 @@ impl ChainHeadBackendBuilder { } let (backend, mut driver) = self.build(client); - spawn(async move { + // NOTE: we need to poll the driver until it's done i.e returns None + // to ensure that the backend is shutdown properly. while let Some(res) = driver.next().await { - if let Err(e) = res { - if !e.is_disconnected_will_reconnect() { - tracing::debug!(target: "subxt", "chainHead driver was closed: {e}"); - break; - } + if let Err(err) = res { + tracing::debug!(target: "subxt", "chainHead backend error={err}"); } } + + tracing::debug!(target: "subxt", "chainHead backend was closed"); }); backend diff --git a/subxt/src/backend/chain_head/rpc_methods.rs b/subxt/src/backend/chain_head/rpc_methods.rs index 900efa361c..dff0a86503 100644 --- a/subxt/src/backend/chain_head/rpc_methods.rs +++ b/subxt/src/backend/chain_head/rpc_methods.rs @@ -308,7 +308,7 @@ impl ChainHeadRpcMethods { /// The following events are related to operations: /// - OperationBodyDone: The response of the `chainHead_body` /// - OperationCallDone: The response of the `chainHead_call` -/// - OperationStorageItems: Items produced by the `chianHead_storage` +/// - OperationStorageItems: Items produced by the `chainHead_storage` /// - OperationWaitingForContinue: Generated after OperationStorageItems and requires the user to /// call `chainHead_continue` /// - OperationStorageDone: The `chainHead_storage` method has produced all the results @@ -651,7 +651,7 @@ impl Stream for FollowSubscription { if let Poll::Ready(Some(Ok(FollowEvent::Stop))) = &res { // No more events will occur after this one. - self.done = true + self.done = true; } res diff --git a/subxt/src/backend/mod.rs b/subxt/src/backend/mod.rs index eeb66eb85d..9fe15d0347 100644 --- a/subxt/src/backend/mod.rs +++ b/subxt/src/backend/mod.rs @@ -893,15 +893,7 @@ mod test { } fn build_backend_spawn_background(rpc_client: impl RpcClientT) -> ChainHeadBackend { - let (backend, mut driver) = build_backend(rpc_client); - tokio::spawn(async move { - while let Some(val) = driver.next().await { - if let Err(e) = val { - eprintln!("Error driving unstable backend: {e}; terminating client"); - } - } - }); - backend + ChainHeadBackend::builder().build_with_background_driver(rpc_client) } fn runtime_spec() -> RuntimeSpec {