Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chainhead backend: notify subscribers when the backend is closed #1817

Merged
merged 10 commits into from
Oct 11, 2024
49 changes: 39 additions & 10 deletions subxt/src/backend/chain_head/follow_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ enum InnerStreamState<Hash> {
ReceivingEvents(FollowEventStream<Hash>),
/// We received a stop event. We'll send one on and restart the stream.
Stopped,
/// The backend was closed and tell everything to shutdown.
BackendClosed,
/// Report error.
Error(Option<Error>),
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
/// The stream is finished and will not restart (likely due to an error).
Finished,
}
Expand All @@ -83,8 +87,10 @@ impl<Hash> std::fmt::Debug for InnerStreamState<Hash> {
Self::Initializing(_) => write!(f, "Initializing(..)"),
Self::Ready(_) => write!(f, "Ready(..)"),
Self::ReceivingEvents(_) => write!(f, "ReceivingEvents(..)"),
Self::Stopped => write!(f, "Stopped"),
Self::Stopped { .. } => write!(f, "Stopped"),
Self::BackendClosed => write!(f, "BackendClosed"),
Self::Finished => write!(f, "Finished"),
Self::Error(_) => write!(f, "Error"),
}
}
}
Expand Down Expand Up @@ -154,9 +160,16 @@ impl<Hash> Stream for FollowStream<Hash> {
continue;
}

// Finish forever if there's an error, passing it on.
this.stream = InnerStreamState::Finished;
return Poll::Ready(Some(Err(e)));
// Finish forever if there's an error which is done as follows:
//
// 1) Send the FollowEvent::BackendClosed message which tells to subscriber that subscription is closed.
// 2) Send the error on the backend driver stream.
// 3) Finish the stream.
//
this.stream = InnerStreamState::Error(Some(e));
return Poll::Ready(Some(Ok(FollowStreamMsg::Event(
FollowEvent::BackendClosed,
))));
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand All @@ -179,13 +192,21 @@ impl<Hash> Stream for FollowStream<Hash> {
continue;
}
Poll::Ready(Some(Ok(ev))) => {
if let FollowEvent::Stop = ev {
// A stop event means the stream has ended, so start
// over after passing on the stop message.
this.stream = InnerStreamState::Stopped;
continue;
match ev {
FollowEvent::Stop => {
// A stop event means the stream has ended, so start
// over after passing on the stop message.
this.stream = InnerStreamState::Stopped;
continue;
}
FollowEvent::BackendClosed => {
this.stream = InnerStreamState::BackendClosed;
continue;
}
_ => {
return Poll::Ready(Some(Ok(FollowStreamMsg::Event(ev))));
}
}
return Poll::Ready(Some(Ok(FollowStreamMsg::Event(ev))));
}
Poll::Ready(Some(Err(e))) => {
// Re-start if a reconnecting backend was enabled.
Expand All @@ -204,6 +225,14 @@ impl<Hash> Stream for FollowStream<Hash> {
this.stream = InnerStreamState::New;
return Poll::Ready(Some(Ok(FollowStreamMsg::Event(FollowEvent::Stop))));
}
InnerStreamState::BackendClosed => {
this.stream = InnerStreamState::Finished;
}
InnerStreamState::Error(e) => {
let e = e.take().expect("should always be Some");
this.stream = InnerStreamState::Finished;
return Poll::Ready(Some(Err(e)));
}
InnerStreamState::Finished => {
return Poll::Ready(None);
}
Expand Down
42 changes: 40 additions & 2 deletions subxt/src/backend/chain_head/follow_stream_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,8 @@ impl<Hash: BlockHash> Shared<Hash> {
FollowStreamMsg::Event(ev @ FollowEvent::BestBlockChanged(_)) => {
shared.block_events_for_new_subscriptions.push_back(ev);
}
FollowStreamMsg::Event(FollowEvent::Stop) => {
FollowStreamMsg::Event(FollowEvent::Stop)
| FollowStreamMsg::Event(FollowEvent::BackendClosed) => {
// On a stop event, clear everything. Wait for resubscription and new ready/initialised events.
shared.block_events_for_new_subscriptions.clear();
shared.current_subscription_id = None;
Expand Down Expand Up @@ -460,7 +461,14 @@ where

(self.f)(FollowEvent::Initialized(init))
}
FollowStreamMsg::Event(ev) => (self.f)(ev),
FollowStreamMsg::Event(ev) => {
if matches!(ev, FollowEvent::BackendClosed) {
self.is_done = true;
return Poll::Ready(None);
};

(self.f)(ev)
}
};

if block_refs.is_empty() {
Expand Down Expand Up @@ -742,4 +750,34 @@ mod test {
)
);
}

#[tokio::test]
async fn subscribe_finalized_blocks_no_restart() {
let mut driver = test_follow_stream_driver_getter(
|| [Ok(ev_initialized(0)), Ok(FollowEvent::BackendClosed)],
10,
);

let handle = driver.handle();

tokio::spawn(async move { while driver.next().await.is_some() {} });

let f = |ev| match ev {
FollowEvent::Finalized(ev) => ev.finalized_block_hashes,
FollowEvent::Initialized(ev) => ev.finalized_block_hashes,
_ => vec![],
};

let stream = FollowStreamFinalizedHeads::new(handle.subscribe(), f);
let evs: Vec<_> = stream.try_collect().await.unwrap();

assert_eq!(evs.len(), 1);
assert_eq!(
evs[0],
(
"sub_id_0".to_string(),
vec![BlockRef::new(H256::from_low_u64_le(0))]
)
);
}
}
12 changes: 12 additions & 0 deletions subxt/src/backend/chain_head/follow_stream_unpin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,18 @@ impl<Hash: BlockHash> Stream for FollowStreamUnpin<Hash> {

FollowStreamMsg::Event(FollowEvent::Stop)
}
FollowStreamMsg::Event(FollowEvent::BackendClosed) => {
// clear out "old" things that are no longer applicable since
// the subscription has ended (a new one will be created under the hood, at
// which point we'll get given a new subscription ID.
this.subscription_id = None;
this.pinned.clear();
this.unpin_futs.clear();
this.unpin_flags.lock().unwrap().clear();
this.next_rel_block_age = 0;

FollowStreamMsg::Event(FollowEvent::BackendClosed)
}
// These events aren't interesting; we just forward them on:
FollowStreamMsg::Event(FollowEvent::OperationBodyDone(details)) => {
FollowStreamMsg::Event(FollowEvent::OperationBodyDone(details))
Expand Down
2 changes: 1 addition & 1 deletion subxt/src/backend/chain_head/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for ChainHeadBackend<T> {
);
}
}
FollowEvent::Stop => {
FollowEvent::Stop | FollowEvent::BackendClosed => {
// If we get this event, we'll lose all of our existing pinned blocks and have a gap
// in which we may lose the finalized block that the TX is in. For now, just error if
// this happens, to prevent the case in which we never see a finalized block and wait
Expand Down
16 changes: 13 additions & 3 deletions subxt/src/backend/chain_head/rpc_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ impl<T: Config> ChainHeadRpcMethods<T> {
/// The following events are related to operations:
/// - OperationBodyDone: The response of the `chainHead_body`
/// - OperationCallDone: The response of the `chainHead_call`
/// - OperationStorageItems: Items produced by the `chianHead_storage`
/// - OperationStorageItems: Items produced by the `chainHead_storage`
/// - OperationWaitingForContinue: Generated after OperationStorageItems and requires the user to
/// call `chainHead_continue`
/// - OperationStorageDone: The `chainHead_storage` method has produced all the results
Expand Down Expand Up @@ -354,6 +354,12 @@ pub enum FollowEvent<Hash> {
/// The subscription is dropped and no further events
/// will be generated.
Stop,
/// Internal state which should not be exposed to the user
/// and not part of the JSON-RPC spec for the `chainHead_v1_follow`.
///
/// The backend was closed and tell everything to shutdown.
#[doc(hidden)]
BackendClosed,
}

/// Contain information about the latest finalized block.
Expand Down Expand Up @@ -649,9 +655,13 @@ impl<Hash: BlockHash> Stream for FollowSubscription<Hash> {

let res = self.sub.poll_next_unpin(cx);

if let Poll::Ready(Some(Ok(FollowEvent::Stop))) = &res {
match &res {
// No more events will occur after this one.
self.done = true
Poll::Ready(Some(Ok(FollowEvent::Stop)))
| Poll::Ready(Some(Ok(FollowEvent::BackendClosed))) => {
self.done = true;
}
_ => {}
}

res
Expand Down
10 changes: 1 addition & 9 deletions subxt/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -893,15 +893,7 @@ mod test {
}

fn build_backend_spawn_background(rpc_client: impl RpcClientT) -> ChainHeadBackend<Conf> {
let (backend, mut driver) = build_backend(rpc_client);
tokio::spawn(async move {
while let Some(val) = driver.next().await {
if let Err(e) = val {
eprintln!("Error driving unstable backend: {e}; terminating client");
}
}
});
backend
ChainHeadBackend::builder().build_with_background_driver(rpc_client)
}

fn runtime_spec() -> RuntimeSpec {
Expand Down
Loading