Skip to content

Commit

Permalink
feat(conductor): restart or stop at set height
Browse files Browse the repository at this point in the history
Co-authored-by: Ethan Oroshiba <[email protected]>
  • Loading branch information
SuperFluffy and ethanoroshiba committed Jan 24, 2025
1 parent 453f66c commit 7a41c53
Show file tree
Hide file tree
Showing 21 changed files with 1,887 additions and 281 deletions.
4 changes: 4 additions & 0 deletions crates/astria-conductor/src/block_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ impl<T> BlockCache<T> {
cache: self,
}
}

pub(crate) fn next_height_to_pop(&self) -> u64 {
self.next_height
}
}

impl<T: GetSequencerHeight> BlockCache<T> {
Expand Down
6 changes: 0 additions & 6 deletions crates/astria-conductor/src/celestia/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ pub(crate) struct Builder {
pub(crate) rollup_state: StateReceiver,
pub(crate) sequencer_cometbft_client: SequencerClient,
pub(crate) sequencer_requests_per_second: u32,
pub(crate) expected_celestia_chain_id: String,
pub(crate) expected_sequencer_chain_id: String,
pub(crate) shutdown: CancellationToken,
pub(crate) metrics: &'static Metrics,
}
Expand All @@ -42,8 +40,6 @@ impl Builder {
celestia_token,
sequencer_cometbft_client,
sequencer_requests_per_second,
expected_celestia_chain_id,
expected_sequencer_chain_id,
shutdown,
metrics,
firm_blocks,
Expand All @@ -60,8 +56,6 @@ impl Builder {
rollup_state,
sequencer_cometbft_client,
sequencer_requests_per_second,
expected_celestia_chain_id,
expected_sequencer_chain_id,
shutdown,
metrics,
})
Expand Down
29 changes: 19 additions & 10 deletions crates/astria-conductor/src/celestia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,6 @@ pub(crate) struct Reader {
/// (usually to verify block data retrieved from Celestia blobs).
sequencer_requests_per_second: u32,

/// The chain ID of the Celestia network the reader should be communicating with.
expected_celestia_chain_id: String,

/// The chain ID of the Sequencer the reader should be communicating with.
expected_sequencer_chain_id: String,

/// Token to listen for Conductor being shut down.
shutdown: CancellationToken,

Expand Down Expand Up @@ -179,28 +173,28 @@ impl Reader {

#[instrument(skip_all, err)]
async fn initialize(&mut self) -> eyre::Result<tendermint::chain::Id> {
let expected_celestia_chain_id = self.rollup_state.celestia_chain_id();
let validate_celestia_chain_id = async {
let actual_celestia_chain_id = get_celestia_chain_id(&self.celestia_client)
.await
.wrap_err("failed to fetch Celestia chain ID")?;
let expected_celestia_chain_id = &self.expected_celestia_chain_id;
ensure!(
self.expected_celestia_chain_id == actual_celestia_chain_id.as_str(),
expected_celestia_chain_id == actual_celestia_chain_id.as_str(),
"expected Celestia chain id `{expected_celestia_chain_id}` does not match actual: \
`{actual_celestia_chain_id}`"
);
Ok(())
}
.in_current_span();

let expected_sequencer_chain_id = self.rollup_state.sequencer_chain_id();
let get_and_validate_sequencer_chain_id = async {
let actual_sequencer_chain_id =
get_sequencer_chain_id(self.sequencer_cometbft_client.clone())
.await
.wrap_err("failed to get sequencer chain ID")?;
let expected_sequencer_chain_id = &self.expected_sequencer_chain_id;
ensure!(
self.expected_sequencer_chain_id == actual_sequencer_chain_id.to_string(),
expected_sequencer_chain_id == actual_sequencer_chain_id.as_str(),
"expected Celestia chain id `{expected_sequencer_chain_id}` does not match \
actual: `{actual_sequencer_chain_id}`"
);
Expand Down Expand Up @@ -384,6 +378,10 @@ impl RunningReader {
});

let reason = loop {
if self.has_reached_stop_height() {
break Ok("stop height reached");
}

self.schedule_new_blobs();

select!(
Expand Down Expand Up @@ -449,6 +447,17 @@ impl RunningReader {
}
}

/// The stop height is reached if a) the next height to be forwarded would be equal
/// or greater than the stop height, and b) there is no block currently in flight.
fn has_reached_stop_height(&self) -> bool {
self.rollup_state
.sequencer_stop_block_height()
.map_or(false, |stop_height| {
self.block_cache.next_height_to_pop() >= stop_height.get()
})
&& self.enqueued_block.is_terminated()
}

#[instrument(skip_all)]
fn cache_reconstructed_blocks(&mut self, reconstructed: ReconstructedBlocks) {
for block in reconstructed.blocks {
Expand Down
Loading

0 comments on commit 7a41c53

Please sign in to comment.