Skip to content

Commit

Permalink
simplify hacky code
Browse files Browse the repository at this point in the history
  • Loading branch information
niklasad1 committed Oct 10, 2024
1 parent cd713b9 commit 97a505d
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 79 deletions.
47 changes: 9 additions & 38 deletions subxt/src/backend/chain_head/follow_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,6 @@ enum InnerStreamState<Hash> {
ReceivingEvents(FollowEventStream<Hash>),
/// We received a stop event. We'll send one on and restart the stream.
Stopped,
/// The backend was closed and tell everything to shutdown.
BackendClosed,
/// Report error.
Error(Option<Error>),
/// The stream is finished and will not restart (likely due to an error).
Finished,
}
Expand All @@ -88,9 +84,7 @@ impl<Hash> std::fmt::Debug for InnerStreamState<Hash> {
Self::Ready(_) => write!(f, "Ready(..)"),
Self::ReceivingEvents(_) => write!(f, "ReceivingEvents(..)"),
Self::Stopped { .. } => write!(f, "Stopped"),
Self::BackendClosed => write!(f, "BackendClosed"),
Self::Finished => write!(f, "Finished"),
Self::Error(_) => write!(f, "Error"),
}
}
}
Expand Down Expand Up @@ -160,16 +154,9 @@ impl<Hash> Stream for FollowStream<Hash> {
continue;
}

// Finish forever if there's an error which is done as follows:
//
// 1) Send the FollowEvent::BackendClosed message which tells to subscriber that subscription is closed.
// 2) Send the error on the backend driver stream.
// 3) Finish the stream.
//
this.stream = InnerStreamState::Error(Some(e));
return Poll::Ready(Some(Ok(FollowStreamMsg::Event(
FollowEvent::BackendClosed,
))));
// Finish forever if there's an error, passing it on.
this.stream = InnerStreamState::Finished;
return Poll::Ready(Some(Err(e)));
}
}
}
Expand All @@ -192,21 +179,13 @@ impl<Hash> Stream for FollowStream<Hash> {
continue;
}
Poll::Ready(Some(Ok(ev))) => {
match ev {
FollowEvent::Stop => {
// A stop event means the stream has ended, so start
// over after passing on the stop message.
this.stream = InnerStreamState::Stopped;
continue;
}
FollowEvent::BackendClosed => {
this.stream = InnerStreamState::BackendClosed;
continue;
}
_ => {
return Poll::Ready(Some(Ok(FollowStreamMsg::Event(ev))));
}
if let FollowEvent::Stop = ev {
// A stop event means the stream has ended, so start
// over after passing on the stop message.
this.stream = InnerStreamState::Stopped;
continue;
}
return Poll::Ready(Some(Ok(FollowStreamMsg::Event(ev))));
}
Poll::Ready(Some(Err(e))) => {
// Re-start if a reconnecting backend was enabled.
Expand All @@ -225,14 +204,6 @@ impl<Hash> Stream for FollowStream<Hash> {
this.stream = InnerStreamState::New;
return Poll::Ready(Some(Ok(FollowStreamMsg::Event(FollowEvent::Stop))));
}
InnerStreamState::BackendClosed => {
this.stream = InnerStreamState::Finished;
}
InnerStreamState::Error(e) => {
let e = e.take().expect("should always be Some");
this.stream = InnerStreamState::Finished;
return Poll::Ready(Some(Err(e)));
}
InnerStreamState::Finished => {
return Poll::Ready(None);
}
Expand Down
19 changes: 9 additions & 10 deletions subxt/src/backend/chain_head/follow_stream_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,13 @@ impl<Hash: BlockHash> Shared<Hash> {
pub fn done(&self) {
let mut shared = self.0.lock().unwrap();
shared.done = true;

// Wake up all subscribers so they can see that we're done.
for details in shared.subscribers.values_mut() {
if let Some(waker) = details.waker.take() {
waker.wake();
}
}
}

/// Cleanup a subscription.
Expand Down Expand Up @@ -317,8 +324,7 @@ impl<Hash: BlockHash> Shared<Hash> {
FollowStreamMsg::Event(ev @ FollowEvent::BestBlockChanged(_)) => {
shared.block_events_for_new_subscriptions.push_back(ev);
}
FollowStreamMsg::Event(FollowEvent::Stop)
| FollowStreamMsg::Event(FollowEvent::BackendClosed) => {
FollowStreamMsg::Event(FollowEvent::Stop) => {
// On a stop event, clear everything. Wait for resubscription and new ready/initialised events.
shared.block_events_for_new_subscriptions.clear();
shared.current_subscription_id = None;
Expand Down Expand Up @@ -461,14 +467,7 @@ where

(self.f)(FollowEvent::Initialized(init))
}
FollowStreamMsg::Event(ev) => {
if matches!(ev, FollowEvent::BackendClosed) {
self.is_done = true;
return Poll::Ready(None);
};

(self.f)(ev)
}
FollowStreamMsg::Event(ev) => (self.f)(ev),
};

if block_refs.is_empty() {
Expand Down
12 changes: 0 additions & 12 deletions subxt/src/backend/chain_head/follow_stream_unpin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,18 +223,6 @@ impl<Hash: BlockHash> Stream for FollowStreamUnpin<Hash> {

FollowStreamMsg::Event(FollowEvent::Stop)
}
FollowStreamMsg::Event(FollowEvent::BackendClosed) => {
// clear out "old" things that are no longer applicable since
// the subscription has ended (a new one will be created under the hood, at
// which point we'll get given a new subscription ID.
this.subscription_id = None;
this.pinned.clear();
this.unpin_futs.clear();
this.unpin_flags.lock().unwrap().clear();
this.next_rel_block_age = 0;

FollowStreamMsg::Event(FollowEvent::BackendClosed)
}
// These events aren't interesting; we just forward them on:
FollowStreamMsg::Event(FollowEvent::OperationBodyDone(details)) => {
FollowStreamMsg::Event(FollowEvent::OperationBodyDone(details))
Expand Down
14 changes: 7 additions & 7 deletions subxt/src/backend/chain_head/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,16 +128,16 @@ impl<T: Config> ChainHeadBackendBuilder<T> {
}

let (backend, mut driver) = self.build(client);

spawn(async move {
// NOTE: we need to poll the driver until it's done i.e returns None
// to ensure that the backend is shutdown properly.
while let Some(res) = driver.next().await {
if let Err(e) = res {
if !e.is_disconnected_will_reconnect() {
tracing::debug!(target: "subxt", "chainHead driver was closed: {e}");
break;
}
if let Err(err) = res {
tracing::debug!(target: "subxt", "ChainHeadBackendDriver got error={err}");
}
}

tracing::debug!(target: "subxt", "ChainHeadBackendDriver closed");
});

backend
Expand Down Expand Up @@ -614,7 +614,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for ChainHeadBackend<T> {
);
}
}
FollowEvent::Stop | FollowEvent::BackendClosed => {
FollowEvent::Stop => {
// If we get this event, we'll lose all of our existing pinned blocks and have a gap
// in which we may lose the finalized block that the TX is in. For now, just error if
// this happens, to prevent the case in which we never see a finalized block and wait
Expand Down
14 changes: 2 additions & 12 deletions subxt/src/backend/chain_head/rpc_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,12 +354,6 @@ pub enum FollowEvent<Hash> {
/// The subscription is dropped and no further events
/// will be generated.
Stop,
/// Internal state which should not be exposed to the user
/// and not part of the JSON-RPC spec for the `chainHead_v1_follow`.
///
/// The backend was closed and tell everything to shutdown.
#[doc(hidden)]
BackendClosed,
}

/// Contain information about the latest finalized block.
Expand Down Expand Up @@ -655,13 +649,9 @@ impl<Hash: BlockHash> Stream for FollowSubscription<Hash> {

let res = self.sub.poll_next_unpin(cx);

match &res {
if let Poll::Ready(Some(Ok(FollowEvent::Stop))) = &res {
// No more events will occur after this one.
Poll::Ready(Some(Ok(FollowEvent::Stop)))
| Poll::Ready(Some(Ok(FollowEvent::BackendClosed))) => {
self.done = true;
}
_ => {}
self.done = true;
}

res
Expand Down

0 comments on commit 97a505d

Please sign in to comment.