From 2fd0cdd3e983fdc03322b3181ea4a1a0d4edb55b Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 8 Oct 2024 16:44:06 +0200 Subject: [PATCH 1/9] FollowEvent::stop include backend closed or not --- subxt/src/backend/chain_head/follow_stream.rs | 56 +++++++++++++------ .../chain_head/follow_stream_driver.rs | 50 +++++++++++++++-- .../backend/chain_head/follow_stream_unpin.rs | 4 +- subxt/src/backend/chain_head/mod.rs | 2 +- subxt/src/backend/chain_head/rpc_methods.rs | 7 ++- subxt/src/backend/mod.rs | 25 +++++---- 6 files changed, 106 insertions(+), 38 deletions(-) diff --git a/subxt/src/backend/chain_head/follow_stream.rs b/subxt/src/backend/chain_head/follow_stream.rs index f68d3ba0be..89f7e1163b 100644 --- a/subxt/src/backend/chain_head/follow_stream.rs +++ b/subxt/src/backend/chain_head/follow_stream.rs @@ -71,7 +71,9 @@ enum InnerStreamState { /// We are polling for, and receiving events from the stream. ReceivingEvents(FollowEventStream), /// We received a stop event. We'll send one on and restart the stream. - Stopped, + Stopped { restart: bool }, + /// Report error. + Error(Option), /// The stream is finished and will not restart (likely due to an error). Finished, } @@ -83,8 +85,9 @@ impl std::fmt::Debug for InnerStreamState { Self::Initializing(_) => write!(f, "Initializing(..)"), Self::Ready(_) => write!(f, "Ready(..)"), Self::ReceivingEvents(_) => write!(f, "ReceivingEvents(..)"), - Self::Stopped => write!(f, "Stopped"), + Self::Stopped { .. } => write!(f, "Stopped"), Self::Finished => write!(f, "Finished"), + Self::Error(_) => write!(f, "Error"), } } } @@ -150,13 +153,20 @@ impl Stream for FollowStream { Poll::Ready(Err(e)) => { // Re-start if a reconnecting backend was enabled. if e.is_disconnected_will_reconnect() { - this.stream = InnerStreamState::Stopped; + this.stream = InnerStreamState::Stopped { restart: true }; continue; } - // Finish forever if there's an error, passing it on. - this.stream = InnerStreamState::Finished; - return Poll::Ready(Some(Err(e))); + // Finish forever if there's an error which is done as follows: + // + // 1) Send the FollowEvent::Stop message which is propagated to the subscriber + // 2) Send the error on the stream + // 3) Finish the stream + // + this.stream = InnerStreamState::Error(Some(e)); + return Poll::Ready(Some(Ok(FollowStreamMsg::Event( + FollowEvent::Stop { restart: false }, + )))); } } } @@ -175,14 +185,14 @@ impl Stream for FollowStream { Poll::Ready(None) => { // No error happened but the stream ended; restart and // pass on a Stop message anyway. - this.stream = InnerStreamState::Stopped; + this.stream = InnerStreamState::Stopped { restart: true }; continue; } Poll::Ready(Some(Ok(ev))) => { - if let FollowEvent::Stop = ev { + if let FollowEvent::Stop { restart } = ev { // A stop event means the stream has ended, so start // over after passing on the stop message. - this.stream = InnerStreamState::Stopped; + this.stream = InnerStreamState::Stopped { restart }; continue; } return Poll::Ready(Some(Ok(FollowStreamMsg::Event(ev)))); @@ -190,7 +200,7 @@ impl Stream for FollowStream { Poll::Ready(Some(Err(e))) => { // Re-start if a reconnecting backend was enabled. if e.is_disconnected_will_reconnect() { - this.stream = InnerStreamState::Stopped; + this.stream = InnerStreamState::Stopped { restart: true }; continue; } @@ -200,9 +210,21 @@ impl Stream for FollowStream { } } } - InnerStreamState::Stopped => { - this.stream = InnerStreamState::New; - return Poll::Ready(Some(Ok(FollowStreamMsg::Event(FollowEvent::Stop)))); + InnerStreamState::Stopped { restart } => { + let restart = *restart; + if restart { + this.stream = InnerStreamState::New; + } else { + this.stream = InnerStreamState::Finished; + } + return Poll::Ready(Some(Ok(FollowStreamMsg::Event(FollowEvent::Stop { + restart, + })))); + } + InnerStreamState::Error(e) => { + let e = e.take().expect("should always be Some"); + this.stream = InnerStreamState::Finished; + return Poll::Ready(Some(Err(e))); } InnerStreamState::Finished => { return Poll::Ready(None); @@ -303,8 +325,8 @@ pub mod test { [ Ok(ev_initialized(1)), // Stop should lead to a drop and resubscribe: - Ok(FollowEvent::Stop), - Ok(FollowEvent::Stop), + Ok(FollowEvent::Stop { restart: true }), + Ok(FollowEvent::Stop { restart: true }), Ok(ev_new_block(1, 2)), // Nothing should be emitted after an error: Err(Error::Other("ended".to_owned())), @@ -321,9 +343,9 @@ pub mod test { vec![ FollowStreamMsg::Ready("sub_id_0".to_owned()), FollowStreamMsg::Event(ev_initialized(1)), - FollowStreamMsg::Event(FollowEvent::Stop), + FollowStreamMsg::Event(FollowEvent::Stop { restart: true }), FollowStreamMsg::Ready("sub_id_2".to_owned()), - FollowStreamMsg::Event(FollowEvent::Stop), + FollowStreamMsg::Event(FollowEvent::Stop { restart: true }), FollowStreamMsg::Ready("sub_id_3".to_owned()), FollowStreamMsg::Event(ev_new_block(1, 2)), ] diff --git a/subxt/src/backend/chain_head/follow_stream_driver.rs b/subxt/src/backend/chain_head/follow_stream_driver.rs index 702ac5cdb0..8544d320d6 100644 --- a/subxt/src/backend/chain_head/follow_stream_driver.rs +++ b/subxt/src/backend/chain_head/follow_stream_driver.rs @@ -317,7 +317,7 @@ impl Shared { FollowStreamMsg::Event(ev @ FollowEvent::BestBlockChanged(_)) => { shared.block_events_for_new_subscriptions.push_back(ev); } - FollowStreamMsg::Event(FollowEvent::Stop) => { + FollowStreamMsg::Event(FollowEvent::Stop { .. }) => { // On a stop event, clear everything. Wait for resubscription and new ready/initialised events. shared.block_events_for_new_subscriptions.clear(); shared.current_subscription_id = None; @@ -460,7 +460,14 @@ where (self.f)(FollowEvent::Initialized(init)) } - FollowStreamMsg::Event(ev) => (self.f)(ev), + FollowStreamMsg::Event(ev) => { + if matches!(ev, FollowEvent::Stop { restart: false }) { + self.is_done = true; + return Poll::Ready(None); + }; + + (self.f)(ev) + } }; if block_refs.is_empty() { @@ -655,7 +662,7 @@ mod test { Ok(ev_new_block(0, 1)), Ok(ev_best_block(1)), Ok(ev_finalized([1], [])), - Ok(FollowEvent::Stop), + Ok(FollowEvent::Stop { restart: true }), Ok(ev_initialized(1)), Ok(ev_finalized([2], [])), Err(Error::Other("ended".to_owned())), @@ -700,7 +707,7 @@ mod test { || { [ Ok(ev_initialized(0)), - Ok(FollowEvent::Stop), + Ok(FollowEvent::Stop { restart: true }), // Emulate that we missed some blocks. Ok(ev_initialized(13)), Ok(ev_finalized([14], [])), @@ -742,4 +749,39 @@ mod test { ) ); } + + #[tokio::test] + async fn subscribe_finalized_blocks_no_restart() { + let mut driver = test_follow_stream_driver_getter( + || { + [ + Ok(ev_initialized(0)), + Ok(FollowEvent::Stop { restart: false }), + ] + }, + 10, + ); + + let handle = driver.handle(); + + tokio::spawn(async move { while driver.next().await.is_some() {} }); + + let f = |ev| match ev { + FollowEvent::Finalized(ev) => ev.finalized_block_hashes, + FollowEvent::Initialized(ev) => ev.finalized_block_hashes, + _ => vec![], + }; + + let stream = FollowStreamFinalizedHeads::new(handle.subscribe(), f); + let evs: Vec<_> = stream.try_collect().await.unwrap(); + + assert_eq!(evs.len(), 1); + assert_eq!( + evs[0], + ( + "sub_id_0".to_string(), + vec![BlockRef::new(H256::from_low_u64_le(0))] + ) + ); + } } diff --git a/subxt/src/backend/chain_head/follow_stream_unpin.rs b/subxt/src/backend/chain_head/follow_stream_unpin.rs index 0ae0ef0a89..b2fb6c11c6 100644 --- a/subxt/src/backend/chain_head/follow_stream_unpin.rs +++ b/subxt/src/backend/chain_head/follow_stream_unpin.rs @@ -211,7 +211,7 @@ impl Stream for FollowStreamUnpin { pruned_block_hashes: pruned_block_refs, })) } - FollowStreamMsg::Event(FollowEvent::Stop) => { + FollowStreamMsg::Event(FollowEvent::Stop { restart }) => { // clear out "old" things that are no longer applicable since // the subscription has ended (a new one will be created under the hood, at // which point we'll get given a new subscription ID. @@ -221,7 +221,7 @@ impl Stream for FollowStreamUnpin { this.unpin_flags.lock().unwrap().clear(); this.next_rel_block_age = 0; - FollowStreamMsg::Event(FollowEvent::Stop) + FollowStreamMsg::Event(FollowEvent::Stop { restart }) } // These events aren't interesting; we just forward them on: FollowStreamMsg::Event(FollowEvent::OperationBodyDone(details)) => { diff --git a/subxt/src/backend/chain_head/mod.rs b/subxt/src/backend/chain_head/mod.rs index 9c70df5bff..592ee09482 100644 --- a/subxt/src/backend/chain_head/mod.rs +++ b/subxt/src/backend/chain_head/mod.rs @@ -614,7 +614,7 @@ impl Backend for ChainHeadBackend { ); } } - FollowEvent::Stop => { + FollowEvent::Stop { .. } => { // If we get this event, we'll lose all of our existing pinned blocks and have a gap // in which we may lose the finalized block that the TX is in. For now, just error if // this happens, to prevent the case in which we never see a finalized block and wait diff --git a/subxt/src/backend/chain_head/rpc_methods.rs b/subxt/src/backend/chain_head/rpc_methods.rs index 900efa361c..6bd75fc2dc 100644 --- a/subxt/src/backend/chain_head/rpc_methods.rs +++ b/subxt/src/backend/chain_head/rpc_methods.rs @@ -353,7 +353,10 @@ pub enum FollowEvent { OperationError(OperationError), /// The subscription is dropped and no further events /// will be generated. - Stop, + Stop { + /// Whether the stopped subscription will be restarted. + restart: bool, + }, } /// Contain information about the latest finalized block. @@ -649,7 +652,7 @@ impl Stream for FollowSubscription { let res = self.sub.poll_next_unpin(cx); - if let Poll::Ready(Some(Ok(FollowEvent::Stop))) = &res { + if let Poll::Ready(Some(Ok(FollowEvent::Stop { .. }))) = &res { // No more events will occur after this one. self.done = true } diff --git a/subxt/src/backend/mod.rs b/subxt/src/backend/mod.rs index eeb66eb85d..53fafc9cf8 100644 --- a/subxt/src/backend/mod.rs +++ b/subxt/src/backend/mod.rs @@ -893,15 +893,7 @@ mod test { } fn build_backend_spawn_background(rpc_client: impl RpcClientT) -> ChainHeadBackend { - let (backend, mut driver) = build_backend(rpc_client); - tokio::spawn(async move { - while let Some(val) = driver.next().await { - if let Err(e) = val { - eprintln!("Error driving unstable backend: {e}; terminating client"); - } - } - }); - backend + ChainHeadBackend::builder().build_with_background_driver(rpc_client) } fn runtime_spec() -> RuntimeSpec { @@ -1023,7 +1015,10 @@ mod test { )]; let mock_subscription_data = vec![( "chainHead_v1_storage", - Message::Many(Ok(vec![Ok(operation_error("Id1")), Ok(FollowEvent::Stop)])), + Message::Many(Ok(vec![ + Ok(operation_error("Id1")), + Ok(FollowEvent::Stop { restart: true }), + ])), )]; let rpc_client = setup_mock_rpc_client(false) .add_method("chainHead_v1_storage", |data, sub, _| { @@ -1187,7 +1182,10 @@ mod test { ), ( "chainHead_v1_storage", - Message::Many(Ok(vec![Ok(operation_error("Id1")), Ok(FollowEvent::Stop)])), + Message::Many(Ok(vec![ + Ok(operation_error("Id1")), + Ok(FollowEvent::Stop { restart: true }), + ])), ), ( "chainHead_v1_storage", @@ -1350,7 +1348,10 @@ mod test { let mock_data = vec![ ( "chainHead_v1_storage", - Message::Many(Ok(vec![Ok(operation_error("Id1")), Ok(FollowEvent::Stop)])), + Message::Many(Ok(vec![ + Ok(operation_error("Id1")), + Ok(FollowEvent::Stop { restart: true }), + ])), ), ( "chainHead_v1_storage", From be5fb904e65bbe51a047ffb293c15b80240b4a84 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 9 Oct 2024 10:25:59 +0200 Subject: [PATCH 2/9] simplify code: enum variant BackendClosed --- subxt/src/backend/chain_head/follow_stream.rs | 63 ++++++++++--------- .../chain_head/follow_stream_driver.rs | 16 ++--- .../backend/chain_head/follow_stream_unpin.rs | 16 ++++- subxt/src/backend/chain_head/mod.rs | 2 +- subxt/src/backend/chain_head/rpc_methods.rs | 13 ++-- subxt/src/backend/mod.rs | 15 +---- 6 files changed, 67 insertions(+), 58 deletions(-) diff --git a/subxt/src/backend/chain_head/follow_stream.rs b/subxt/src/backend/chain_head/follow_stream.rs index 89f7e1163b..4e21e65d11 100644 --- a/subxt/src/backend/chain_head/follow_stream.rs +++ b/subxt/src/backend/chain_head/follow_stream.rs @@ -71,7 +71,9 @@ enum InnerStreamState { /// We are polling for, and receiving events from the stream. ReceivingEvents(FollowEventStream), /// We received a stop event. We'll send one on and restart the stream. - Stopped { restart: bool }, + Stopped, + /// The backend was closed and tell everything to shutdown. + BackendClosed, /// Report error. Error(Option), /// The stream is finished and will not restart (likely due to an error). @@ -86,6 +88,7 @@ impl std::fmt::Debug for InnerStreamState { Self::Ready(_) => write!(f, "Ready(..)"), Self::ReceivingEvents(_) => write!(f, "ReceivingEvents(..)"), Self::Stopped { .. } => write!(f, "Stopped"), + Self::BackendClosed => write!(f, "BackendClosed"), Self::Finished => write!(f, "Finished"), Self::Error(_) => write!(f, "Error"), } @@ -153,19 +156,19 @@ impl Stream for FollowStream { Poll::Ready(Err(e)) => { // Re-start if a reconnecting backend was enabled. if e.is_disconnected_will_reconnect() { - this.stream = InnerStreamState::Stopped { restart: true }; + this.stream = InnerStreamState::Stopped; continue; } // Finish forever if there's an error which is done as follows: // - // 1) Send the FollowEvent::Stop message which is propagated to the subscriber - // 2) Send the error on the stream - // 3) Finish the stream + // 1) Send the FollowEvent::BackendClosed message which tells to subscriber that subscription is closed. + // 2) Send the error on the backend driver stream. + // 3) Finish the stream. // this.stream = InnerStreamState::Error(Some(e)); return Poll::Ready(Some(Ok(FollowStreamMsg::Event( - FollowEvent::Stop { restart: false }, + FollowEvent::BackendClosed, )))); } } @@ -185,22 +188,30 @@ impl Stream for FollowStream { Poll::Ready(None) => { // No error happened but the stream ended; restart and // pass on a Stop message anyway. - this.stream = InnerStreamState::Stopped { restart: true }; + this.stream = InnerStreamState::Stopped; continue; } Poll::Ready(Some(Ok(ev))) => { - if let FollowEvent::Stop { restart } = ev { - // A stop event means the stream has ended, so start - // over after passing on the stop message. - this.stream = InnerStreamState::Stopped { restart }; - continue; + match ev { + FollowEvent::Stop => { + // A stop event means the stream has ended, so start + // over after passing on the stop message. + this.stream = InnerStreamState::Stopped; + continue; + } + FollowEvent::BackendClosed => { + this.stream = InnerStreamState::BackendClosed; + continue; + } + _ => { + return Poll::Ready(Some(Ok(FollowStreamMsg::Event(ev)))); + } } - return Poll::Ready(Some(Ok(FollowStreamMsg::Event(ev)))); } Poll::Ready(Some(Err(e))) => { // Re-start if a reconnecting backend was enabled. if e.is_disconnected_will_reconnect() { - this.stream = InnerStreamState::Stopped { restart: true }; + this.stream = InnerStreamState::Stopped; continue; } @@ -210,16 +221,12 @@ impl Stream for FollowStream { } } } - InnerStreamState::Stopped { restart } => { - let restart = *restart; - if restart { - this.stream = InnerStreamState::New; - } else { - this.stream = InnerStreamState::Finished; - } - return Poll::Ready(Some(Ok(FollowStreamMsg::Event(FollowEvent::Stop { - restart, - })))); + InnerStreamState::Stopped => { + this.stream = InnerStreamState::New; + return Poll::Ready(Some(Ok(FollowStreamMsg::Event(FollowEvent::Stop)))); + } + InnerStreamState::BackendClosed => { + this.stream = InnerStreamState::Finished; } InnerStreamState::Error(e) => { let e = e.take().expect("should always be Some"); @@ -325,8 +332,8 @@ pub mod test { [ Ok(ev_initialized(1)), // Stop should lead to a drop and resubscribe: - Ok(FollowEvent::Stop { restart: true }), - Ok(FollowEvent::Stop { restart: true }), + Ok(FollowEvent::Stop), + Ok(FollowEvent::Stop), Ok(ev_new_block(1, 2)), // Nothing should be emitted after an error: Err(Error::Other("ended".to_owned())), @@ -343,9 +350,9 @@ pub mod test { vec![ FollowStreamMsg::Ready("sub_id_0".to_owned()), FollowStreamMsg::Event(ev_initialized(1)), - FollowStreamMsg::Event(FollowEvent::Stop { restart: true }), + FollowStreamMsg::Event(FollowEvent::Stop), FollowStreamMsg::Ready("sub_id_2".to_owned()), - FollowStreamMsg::Event(FollowEvent::Stop { restart: true }), + FollowStreamMsg::Event(FollowEvent::Stop), FollowStreamMsg::Ready("sub_id_3".to_owned()), FollowStreamMsg::Event(ev_new_block(1, 2)), ] diff --git a/subxt/src/backend/chain_head/follow_stream_driver.rs b/subxt/src/backend/chain_head/follow_stream_driver.rs index 8544d320d6..fad7fa0e16 100644 --- a/subxt/src/backend/chain_head/follow_stream_driver.rs +++ b/subxt/src/backend/chain_head/follow_stream_driver.rs @@ -317,7 +317,8 @@ impl Shared { FollowStreamMsg::Event(ev @ FollowEvent::BestBlockChanged(_)) => { shared.block_events_for_new_subscriptions.push_back(ev); } - FollowStreamMsg::Event(FollowEvent::Stop { .. }) => { + FollowStreamMsg::Event(FollowEvent::Stop) + | FollowStreamMsg::Event(FollowEvent::BackendClosed) => { // On a stop event, clear everything. Wait for resubscription and new ready/initialised events. shared.block_events_for_new_subscriptions.clear(); shared.current_subscription_id = None; @@ -461,7 +462,7 @@ where (self.f)(FollowEvent::Initialized(init)) } FollowStreamMsg::Event(ev) => { - if matches!(ev, FollowEvent::Stop { restart: false }) { + if matches!(ev, FollowEvent::BackendClosed) { self.is_done = true; return Poll::Ready(None); }; @@ -662,7 +663,7 @@ mod test { Ok(ev_new_block(0, 1)), Ok(ev_best_block(1)), Ok(ev_finalized([1], [])), - Ok(FollowEvent::Stop { restart: true }), + Ok(FollowEvent::Stop), Ok(ev_initialized(1)), Ok(ev_finalized([2], [])), Err(Error::Other("ended".to_owned())), @@ -707,7 +708,7 @@ mod test { || { [ Ok(ev_initialized(0)), - Ok(FollowEvent::Stop { restart: true }), + Ok(FollowEvent::Stop), // Emulate that we missed some blocks. Ok(ev_initialized(13)), Ok(ev_finalized([14], [])), @@ -753,12 +754,7 @@ mod test { #[tokio::test] async fn subscribe_finalized_blocks_no_restart() { let mut driver = test_follow_stream_driver_getter( - || { - [ - Ok(ev_initialized(0)), - Ok(FollowEvent::Stop { restart: false }), - ] - }, + || [Ok(ev_initialized(0)), Ok(FollowEvent::BackendClosed)], 10, ); diff --git a/subxt/src/backend/chain_head/follow_stream_unpin.rs b/subxt/src/backend/chain_head/follow_stream_unpin.rs index b2fb6c11c6..317782190f 100644 --- a/subxt/src/backend/chain_head/follow_stream_unpin.rs +++ b/subxt/src/backend/chain_head/follow_stream_unpin.rs @@ -211,7 +211,7 @@ impl Stream for FollowStreamUnpin { pruned_block_hashes: pruned_block_refs, })) } - FollowStreamMsg::Event(FollowEvent::Stop { restart }) => { + FollowStreamMsg::Event(FollowEvent::Stop) => { // clear out "old" things that are no longer applicable since // the subscription has ended (a new one will be created under the hood, at // which point we'll get given a new subscription ID. @@ -221,7 +221,19 @@ impl Stream for FollowStreamUnpin { this.unpin_flags.lock().unwrap().clear(); this.next_rel_block_age = 0; - FollowStreamMsg::Event(FollowEvent::Stop { restart }) + FollowStreamMsg::Event(FollowEvent::Stop) + } + FollowStreamMsg::Event(FollowEvent::BackendClosed) => { + // clear out "old" things that are no longer applicable since + // the subscription has ended (a new one will be created under the hood, at + // which point we'll get given a new subscription ID. + this.subscription_id = None; + this.pinned.clear(); + this.unpin_futs.clear(); + this.unpin_flags.lock().unwrap().clear(); + this.next_rel_block_age = 0; + + FollowStreamMsg::Event(FollowEvent::BackendClosed) } // These events aren't interesting; we just forward them on: FollowStreamMsg::Event(FollowEvent::OperationBodyDone(details)) => { diff --git a/subxt/src/backend/chain_head/mod.rs b/subxt/src/backend/chain_head/mod.rs index 592ee09482..621c8c510e 100644 --- a/subxt/src/backend/chain_head/mod.rs +++ b/subxt/src/backend/chain_head/mod.rs @@ -614,7 +614,7 @@ impl Backend for ChainHeadBackend { ); } } - FollowEvent::Stop { .. } => { + FollowEvent::Stop | FollowEvent::BackendClosed => { // If we get this event, we'll lose all of our existing pinned blocks and have a gap // in which we may lose the finalized block that the TX is in. For now, just error if // this happens, to prevent the case in which we never see a finalized block and wait diff --git a/subxt/src/backend/chain_head/rpc_methods.rs b/subxt/src/backend/chain_head/rpc_methods.rs index 6bd75fc2dc..18bb352885 100644 --- a/subxt/src/backend/chain_head/rpc_methods.rs +++ b/subxt/src/backend/chain_head/rpc_methods.rs @@ -308,7 +308,7 @@ impl ChainHeadRpcMethods { /// The following events are related to operations: /// - OperationBodyDone: The response of the `chainHead_body` /// - OperationCallDone: The response of the `chainHead_call` -/// - OperationStorageItems: Items produced by the `chianHead_storage` +/// - OperationStorageItems: Items produced by the `chainHead_storage` /// - OperationWaitingForContinue: Generated after OperationStorageItems and requires the user to /// call `chainHead_continue` /// - OperationStorageDone: The `chainHead_storage` method has produced all the results @@ -353,10 +353,13 @@ pub enum FollowEvent { OperationError(OperationError), /// The subscription is dropped and no further events /// will be generated. - Stop { - /// Whether the stopped subscription will be restarted. - restart: bool, - }, + Stop, + /// Internal state which should not be exposed to the user + /// and not part of the JSON-RPC spec for the `chainHead_v1_follow`. + /// + /// The backend was closed and tell everything to shutdown. + #[doc(hidden)] + BackendClosed, } /// Contain information about the latest finalized block. diff --git a/subxt/src/backend/mod.rs b/subxt/src/backend/mod.rs index 53fafc9cf8..9fe15d0347 100644 --- a/subxt/src/backend/mod.rs +++ b/subxt/src/backend/mod.rs @@ -1015,10 +1015,7 @@ mod test { )]; let mock_subscription_data = vec![( "chainHead_v1_storage", - Message::Many(Ok(vec![ - Ok(operation_error("Id1")), - Ok(FollowEvent::Stop { restart: true }), - ])), + Message::Many(Ok(vec![Ok(operation_error("Id1")), Ok(FollowEvent::Stop)])), )]; let rpc_client = setup_mock_rpc_client(false) .add_method("chainHead_v1_storage", |data, sub, _| { @@ -1182,10 +1179,7 @@ mod test { ), ( "chainHead_v1_storage", - Message::Many(Ok(vec![ - Ok(operation_error("Id1")), - Ok(FollowEvent::Stop { restart: true }), - ])), + Message::Many(Ok(vec![Ok(operation_error("Id1")), Ok(FollowEvent::Stop)])), ), ( "chainHead_v1_storage", @@ -1348,10 +1342,7 @@ mod test { let mock_data = vec![ ( "chainHead_v1_storage", - Message::Many(Ok(vec![ - Ok(operation_error("Id1")), - Ok(FollowEvent::Stop { restart: true }), - ])), + Message::Many(Ok(vec![Ok(operation_error("Id1")), Ok(FollowEvent::Stop)])), ), ( "chainHead_v1_storage", From cd713b98223d97b2d40e697baacd7ac517c87b5b Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 9 Oct 2024 10:47:13 +0200 Subject: [PATCH 3/9] check both stopped and backend closed --- subxt/src/backend/chain_head/rpc_methods.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/subxt/src/backend/chain_head/rpc_methods.rs b/subxt/src/backend/chain_head/rpc_methods.rs index 18bb352885..b7adc15a4e 100644 --- a/subxt/src/backend/chain_head/rpc_methods.rs +++ b/subxt/src/backend/chain_head/rpc_methods.rs @@ -655,9 +655,13 @@ impl Stream for FollowSubscription { let res = self.sub.poll_next_unpin(cx); - if let Poll::Ready(Some(Ok(FollowEvent::Stop { .. }))) = &res { + match &res { // No more events will occur after this one. - self.done = true + Poll::Ready(Some(Ok(FollowEvent::Stop))) + | Poll::Ready(Some(Ok(FollowEvent::BackendClosed))) => { + self.done = true; + } + _ => {} } res From 97a505d58798ab2bae4d776204abbafc76ad9cf7 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 10 Oct 2024 09:59:30 +0200 Subject: [PATCH 4/9] simplify hacky code --- subxt/src/backend/chain_head/follow_stream.rs | 47 ++++--------------- .../chain_head/follow_stream_driver.rs | 19 ++++---- .../backend/chain_head/follow_stream_unpin.rs | 12 ----- subxt/src/backend/chain_head/mod.rs | 14 +++--- subxt/src/backend/chain_head/rpc_methods.rs | 14 +----- 5 files changed, 27 insertions(+), 79 deletions(-) diff --git a/subxt/src/backend/chain_head/follow_stream.rs b/subxt/src/backend/chain_head/follow_stream.rs index 4e21e65d11..916488a6ab 100644 --- a/subxt/src/backend/chain_head/follow_stream.rs +++ b/subxt/src/backend/chain_head/follow_stream.rs @@ -72,10 +72,6 @@ enum InnerStreamState { ReceivingEvents(FollowEventStream), /// We received a stop event. We'll send one on and restart the stream. Stopped, - /// The backend was closed and tell everything to shutdown. - BackendClosed, - /// Report error. - Error(Option), /// The stream is finished and will not restart (likely due to an error). Finished, } @@ -88,9 +84,7 @@ impl std::fmt::Debug for InnerStreamState { Self::Ready(_) => write!(f, "Ready(..)"), Self::ReceivingEvents(_) => write!(f, "ReceivingEvents(..)"), Self::Stopped { .. } => write!(f, "Stopped"), - Self::BackendClosed => write!(f, "BackendClosed"), Self::Finished => write!(f, "Finished"), - Self::Error(_) => write!(f, "Error"), } } } @@ -160,16 +154,9 @@ impl Stream for FollowStream { continue; } - // Finish forever if there's an error which is done as follows: - // - // 1) Send the FollowEvent::BackendClosed message which tells to subscriber that subscription is closed. - // 2) Send the error on the backend driver stream. - // 3) Finish the stream. - // - this.stream = InnerStreamState::Error(Some(e)); - return Poll::Ready(Some(Ok(FollowStreamMsg::Event( - FollowEvent::BackendClosed, - )))); + // Finish forever if there's an error, passing it on. + this.stream = InnerStreamState::Finished; + return Poll::Ready(Some(Err(e))); } } } @@ -192,21 +179,13 @@ impl Stream for FollowStream { continue; } Poll::Ready(Some(Ok(ev))) => { - match ev { - FollowEvent::Stop => { - // A stop event means the stream has ended, so start - // over after passing on the stop message. - this.stream = InnerStreamState::Stopped; - continue; - } - FollowEvent::BackendClosed => { - this.stream = InnerStreamState::BackendClosed; - continue; - } - _ => { - return Poll::Ready(Some(Ok(FollowStreamMsg::Event(ev)))); - } + if let FollowEvent::Stop = ev { + // A stop event means the stream has ended, so start + // over after passing on the stop message. + this.stream = InnerStreamState::Stopped; + continue; } + return Poll::Ready(Some(Ok(FollowStreamMsg::Event(ev)))); } Poll::Ready(Some(Err(e))) => { // Re-start if a reconnecting backend was enabled. @@ -225,14 +204,6 @@ impl Stream for FollowStream { this.stream = InnerStreamState::New; return Poll::Ready(Some(Ok(FollowStreamMsg::Event(FollowEvent::Stop)))); } - InnerStreamState::BackendClosed => { - this.stream = InnerStreamState::Finished; - } - InnerStreamState::Error(e) => { - let e = e.take().expect("should always be Some"); - this.stream = InnerStreamState::Finished; - return Poll::Ready(Some(Err(e))); - } InnerStreamState::Finished => { return Poll::Ready(None); } diff --git a/subxt/src/backend/chain_head/follow_stream_driver.rs b/subxt/src/backend/chain_head/follow_stream_driver.rs index fad7fa0e16..b86d4ff8e5 100644 --- a/subxt/src/backend/chain_head/follow_stream_driver.rs +++ b/subxt/src/backend/chain_head/follow_stream_driver.rs @@ -196,6 +196,13 @@ impl Shared { pub fn done(&self) { let mut shared = self.0.lock().unwrap(); shared.done = true; + + // Wake up all subscribers so they can see that we're done. + for details in shared.subscribers.values_mut() { + if let Some(waker) = details.waker.take() { + waker.wake(); + } + } } /// Cleanup a subscription. @@ -317,8 +324,7 @@ impl Shared { FollowStreamMsg::Event(ev @ FollowEvent::BestBlockChanged(_)) => { shared.block_events_for_new_subscriptions.push_back(ev); } - FollowStreamMsg::Event(FollowEvent::Stop) - | FollowStreamMsg::Event(FollowEvent::BackendClosed) => { + FollowStreamMsg::Event(FollowEvent::Stop) => { // On a stop event, clear everything. Wait for resubscription and new ready/initialised events. shared.block_events_for_new_subscriptions.clear(); shared.current_subscription_id = None; @@ -461,14 +467,7 @@ where (self.f)(FollowEvent::Initialized(init)) } - FollowStreamMsg::Event(ev) => { - if matches!(ev, FollowEvent::BackendClosed) { - self.is_done = true; - return Poll::Ready(None); - }; - - (self.f)(ev) - } + FollowStreamMsg::Event(ev) => (self.f)(ev), }; if block_refs.is_empty() { diff --git a/subxt/src/backend/chain_head/follow_stream_unpin.rs b/subxt/src/backend/chain_head/follow_stream_unpin.rs index 317782190f..0ae0ef0a89 100644 --- a/subxt/src/backend/chain_head/follow_stream_unpin.rs +++ b/subxt/src/backend/chain_head/follow_stream_unpin.rs @@ -223,18 +223,6 @@ impl Stream for FollowStreamUnpin { FollowStreamMsg::Event(FollowEvent::Stop) } - FollowStreamMsg::Event(FollowEvent::BackendClosed) => { - // clear out "old" things that are no longer applicable since - // the subscription has ended (a new one will be created under the hood, at - // which point we'll get given a new subscription ID. - this.subscription_id = None; - this.pinned.clear(); - this.unpin_futs.clear(); - this.unpin_flags.lock().unwrap().clear(); - this.next_rel_block_age = 0; - - FollowStreamMsg::Event(FollowEvent::BackendClosed) - } // These events aren't interesting; we just forward them on: FollowStreamMsg::Event(FollowEvent::OperationBodyDone(details)) => { FollowStreamMsg::Event(FollowEvent::OperationBodyDone(details)) diff --git a/subxt/src/backend/chain_head/mod.rs b/subxt/src/backend/chain_head/mod.rs index 621c8c510e..aad0bd0f35 100644 --- a/subxt/src/backend/chain_head/mod.rs +++ b/subxt/src/backend/chain_head/mod.rs @@ -128,16 +128,16 @@ impl ChainHeadBackendBuilder { } let (backend, mut driver) = self.build(client); - spawn(async move { + // NOTE: we need to poll the driver until it's done i.e returns None + // to ensure that the backend is shutdown properly. while let Some(res) = driver.next().await { - if let Err(e) = res { - if !e.is_disconnected_will_reconnect() { - tracing::debug!(target: "subxt", "chainHead driver was closed: {e}"); - break; - } + if let Err(err) = res { + tracing::debug!(target: "subxt", "ChainHeadBackendDriver got error={err}"); } } + + tracing::debug!(target: "subxt", "ChainHeadBackendDriver closed"); }); backend @@ -614,7 +614,7 @@ impl Backend for ChainHeadBackend { ); } } - FollowEvent::Stop | FollowEvent::BackendClosed => { + FollowEvent::Stop => { // If we get this event, we'll lose all of our existing pinned blocks and have a gap // in which we may lose the finalized block that the TX is in. For now, just error if // this happens, to prevent the case in which we never see a finalized block and wait diff --git a/subxt/src/backend/chain_head/rpc_methods.rs b/subxt/src/backend/chain_head/rpc_methods.rs index b7adc15a4e..dff0a86503 100644 --- a/subxt/src/backend/chain_head/rpc_methods.rs +++ b/subxt/src/backend/chain_head/rpc_methods.rs @@ -354,12 +354,6 @@ pub enum FollowEvent { /// The subscription is dropped and no further events /// will be generated. Stop, - /// Internal state which should not be exposed to the user - /// and not part of the JSON-RPC spec for the `chainHead_v1_follow`. - /// - /// The backend was closed and tell everything to shutdown. - #[doc(hidden)] - BackendClosed, } /// Contain information about the latest finalized block. @@ -655,13 +649,9 @@ impl Stream for FollowSubscription { let res = self.sub.poll_next_unpin(cx); - match &res { + if let Poll::Ready(Some(Ok(FollowEvent::Stop))) = &res { // No more events will occur after this one. - Poll::Ready(Some(Ok(FollowEvent::Stop))) - | Poll::Ready(Some(Ok(FollowEvent::BackendClosed))) => { - self.done = true; - } - _ => {} + self.done = true; } res From d8494fe730afe723ad4b95ee91c9ccf2cd6dcb89 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 10 Oct 2024 10:03:33 +0200 Subject: [PATCH 5/9] remove old test --- subxt/src/backend/chain_head/follow_stream.rs | 2 +- .../chain_head/follow_stream_driver.rs | 30 ------------------- 2 files changed, 1 insertion(+), 31 deletions(-) diff --git a/subxt/src/backend/chain_head/follow_stream.rs b/subxt/src/backend/chain_head/follow_stream.rs index 916488a6ab..f68d3ba0be 100644 --- a/subxt/src/backend/chain_head/follow_stream.rs +++ b/subxt/src/backend/chain_head/follow_stream.rs @@ -83,7 +83,7 @@ impl std::fmt::Debug for InnerStreamState { Self::Initializing(_) => write!(f, "Initializing(..)"), Self::Ready(_) => write!(f, "Ready(..)"), Self::ReceivingEvents(_) => write!(f, "ReceivingEvents(..)"), - Self::Stopped { .. } => write!(f, "Stopped"), + Self::Stopped => write!(f, "Stopped"), Self::Finished => write!(f, "Finished"), } } diff --git a/subxt/src/backend/chain_head/follow_stream_driver.rs b/subxt/src/backend/chain_head/follow_stream_driver.rs index b86d4ff8e5..a77782bc8b 100644 --- a/subxt/src/backend/chain_head/follow_stream_driver.rs +++ b/subxt/src/backend/chain_head/follow_stream_driver.rs @@ -749,34 +749,4 @@ mod test { ) ); } - - #[tokio::test] - async fn subscribe_finalized_blocks_no_restart() { - let mut driver = test_follow_stream_driver_getter( - || [Ok(ev_initialized(0)), Ok(FollowEvent::BackendClosed)], - 10, - ); - - let handle = driver.handle(); - - tokio::spawn(async move { while driver.next().await.is_some() {} }); - - let f = |ev| match ev { - FollowEvent::Finalized(ev) => ev.finalized_block_hashes, - FollowEvent::Initialized(ev) => ev.finalized_block_hashes, - _ => vec![], - }; - - let stream = FollowStreamFinalizedHeads::new(handle.subscribe(), f); - let evs: Vec<_> = stream.try_collect().await.unwrap(); - - assert_eq!(evs.len(), 1); - assert_eq!( - evs[0], - ( - "sub_id_0".to_string(), - vec![BlockRef::new(H256::from_low_u64_le(0))] - ) - ); - } } From 87eec09665e75f9edac33de2ea5a767a8618ef1b Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 10 Oct 2024 10:27:50 +0200 Subject: [PATCH 6/9] Update subxt/src/backend/chain_head/follow_stream_driver.rs --- subxt/src/backend/chain_head/follow_stream_driver.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/subxt/src/backend/chain_head/follow_stream_driver.rs b/subxt/src/backend/chain_head/follow_stream_driver.rs index a77782bc8b..f498b1c992 100644 --- a/subxt/src/backend/chain_head/follow_stream_driver.rs +++ b/subxt/src/backend/chain_head/follow_stream_driver.rs @@ -197,7 +197,7 @@ impl Shared { let mut shared = self.0.lock().unwrap(); shared.done = true; - // Wake up all subscribers so they can see that we're done. + // Wake up all subscribers so they get notified that backend was closed for details in shared.subscribers.values_mut() { if let Some(waker) = details.waker.take() { waker.wake(); From bb2dd0be6a1aae63391043c98087481f29bfb050 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 10 Oct 2024 10:28:39 +0200 Subject: [PATCH 7/9] Update subxt/src/backend/chain_head/mod.rs --- subxt/src/backend/chain_head/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/subxt/src/backend/chain_head/mod.rs b/subxt/src/backend/chain_head/mod.rs index aad0bd0f35..6761e37b77 100644 --- a/subxt/src/backend/chain_head/mod.rs +++ b/subxt/src/backend/chain_head/mod.rs @@ -133,7 +133,7 @@ impl ChainHeadBackendBuilder { // to ensure that the backend is shutdown properly. while let Some(res) = driver.next().await { if let Err(err) = res { - tracing::debug!(target: "subxt", "ChainHeadBackendDriver got error={err}"); + tracing::debug!(target: "subxt", "chainHead backend error={err}"); } } From 1c27f82ec1f99434d9be8d8f7f47148dffe05f46 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 10 Oct 2024 10:29:01 +0200 Subject: [PATCH 8/9] Update subxt/src/backend/chain_head/mod.rs --- subxt/src/backend/chain_head/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/subxt/src/backend/chain_head/mod.rs b/subxt/src/backend/chain_head/mod.rs index 6761e37b77..aacdb452f6 100644 --- a/subxt/src/backend/chain_head/mod.rs +++ b/subxt/src/backend/chain_head/mod.rs @@ -137,7 +137,7 @@ impl ChainHeadBackendBuilder { } } - tracing::debug!(target: "subxt", "ChainHeadBackendDriver closed"); + tracing::debug!(target: "subxt", "chainHead backend was closed"); }); backend From e9ac91072864f7fa98b3eedddd2b74e778e16944 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 10 Oct 2024 15:31:27 +0200 Subject: [PATCH 9/9] Update subxt/src/backend/chain_head/follow_stream_driver.rs --- subxt/src/backend/chain_head/follow_stream_driver.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/subxt/src/backend/chain_head/follow_stream_driver.rs b/subxt/src/backend/chain_head/follow_stream_driver.rs index f498b1c992..8490c11799 100644 --- a/subxt/src/backend/chain_head/follow_stream_driver.rs +++ b/subxt/src/backend/chain_head/follow_stream_driver.rs @@ -197,7 +197,7 @@ impl Shared { let mut shared = self.0.lock().unwrap(); shared.done = true; - // Wake up all subscribers so they get notified that backend was closed + // Wake up all subscribers so they get notified that the backend was closed for details in shared.subscribers.values_mut() { if let Some(waker) = details.waker.take() { waker.wake();