Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(conductor): make firm, soft readers subtasks #1926

Merged
merged 3 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions crates/astria-conductor/src/celestia/builder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Boilerplate to construct a [`super::Reader`] via a type-state builder.

Check warning on line 1 in crates/astria-conductor/src/celestia/builder.rs

View workflow job for this annotation

GitHub Actions / Code Freeze

Code Freeze in Effect - Bypassed

This file is under code freeze.

use std::time::Duration;

Expand All @@ -10,17 +10,21 @@
use tendermint_rpc::HttpClient as SequencerClient;
use tokio_util::sync::CancellationToken;

use super::Reader;
use super::{
Reader,
ReconstructedBlock,
};
use crate::{
executor,
executor::StateReceiver,
metrics::Metrics,
};

pub(crate) struct Builder {
pub(crate) celestia_block_time: Duration,
pub(crate) celestia_http_endpoint: String,
pub(crate) celestia_token: Option<String>,
pub(crate) executor: executor::Handle,
pub(crate) firm_blocks: tokio::sync::mpsc::Sender<Box<ReconstructedBlock>>,
pub(crate) rollup_state: StateReceiver,
pub(crate) sequencer_cometbft_client: SequencerClient,
pub(crate) sequencer_requests_per_second: u32,
pub(crate) expected_celestia_chain_id: String,
Expand All @@ -36,13 +40,14 @@
celestia_block_time,
celestia_http_endpoint,
celestia_token,
executor,
sequencer_cometbft_client,
sequencer_requests_per_second,
expected_celestia_chain_id,
expected_sequencer_chain_id,
shutdown,
metrics,
firm_blocks,
rollup_state,
} = self;

let celestia_client = create_celestia_client(celestia_http_endpoint, celestia_token)
Expand All @@ -51,7 +56,8 @@
Ok(Reader {
celestia_block_time,
celestia_client,
executor,
firm_blocks,
rollup_state,
sequencer_cometbft_client,
sequencer_requests_per_second,
expected_celestia_chain_id,
Expand Down
120 changes: 53 additions & 67 deletions crates/astria-conductor/src/celestia/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{

Check warning on line 1 in crates/astria-conductor/src/celestia/mod.rs

View workflow job for this annotation

GitHub Actions / Code Freeze

Code Freeze in Effect - Bypassed

This file is under code freeze.
cmp::max,
sync::Arc,
time::Duration,
Expand Down Expand Up @@ -58,15 +58,11 @@
trace,
trace_span,
warn,
Instrument as _,
};

use crate::{
block_cache::GetSequencerHeight,
executor::{
FirmSendError,
FirmTrySendError,
StateIsInit,
},
metrics::Metrics,
utils::flatten,
};
Expand Down Expand Up @@ -95,10 +91,7 @@
BlobVerifier,
},
};
use crate::{
block_cache::BlockCache,
executor,
};
use crate::block_cache::BlockCache;

/// Sequencer Block information reconstructed from Celestia blobs.
///
Expand Down Expand Up @@ -138,8 +131,11 @@
/// Client to fetch heights and blocks from Celestia.
celestia_client: CelestiaClient,

/// The channel used to send messages to the executor task.
executor: executor::Handle,
/// The channel to forward firm blocks to the executor.
firm_blocks: mpsc::Sender<Box<ReconstructedBlock>>,

/// The channel to read updates of the rollup state from.
rollup_state: crate::executor::StateReceiver,

/// The client to get the sequencer namespace and verify blocks.
sequencer_cometbft_client: SequencerClient,
Expand All @@ -162,7 +158,7 @@

impl Reader {
pub(crate) async fn run_until_stopped(mut self) -> eyre::Result<()> {
let (executor, sequencer_chain_id) = select!(
let sequencer_chain_id = select!(
() = self.shutdown.clone().cancelled_owned() => {
info_span!("conductor::celestia::Reader::run_until_stopped").in_scope(||
info!("received shutdown signal while waiting for Celestia reader task to initialize")
Expand All @@ -175,16 +171,14 @@
}
);

RunningReader::from_parts(self, executor, sequencer_chain_id)
RunningReader::from_parts(self, sequencer_chain_id)
.wrap_err("failed entering run loop")?
.run_until_stopped()
.await
}

#[instrument(skip_all, err)]
async fn initialize(
&mut self,
) -> eyre::Result<(executor::Handle<StateIsInit>, tendermint::chain::Id)> {
async fn initialize(&mut self) -> eyre::Result<tendermint::chain::Id> {
let validate_celestia_chain_id = async {
let actual_celestia_chain_id = get_celestia_chain_id(&self.celestia_client)
.await
Expand All @@ -196,14 +190,8 @@
`{actual_celestia_chain_id}`"
);
Ok(())
};

let wait_for_init_executor = async {
self.executor
.wait_for_init()
.await
.wrap_err("handle to executor failed while waiting for it being initialized")
};
}
.in_current_span();

let get_and_validate_sequencer_chain_id = async {
let actual_sequencer_chain_id =
Expand All @@ -217,18 +205,18 @@
actual: `{actual_sequencer_chain_id}`"
);
Ok(actual_sequencer_chain_id)
};
}
.in_current_span();

try_join!(
validate_celestia_chain_id,
wait_for_init_executor,
get_and_validate_sequencer_chain_id
)
.map(|((), executor_init, sequencer_chain_id)| (executor_init, sequencer_chain_id))
.map(|((), sequencer_chain_id)| sequencer_chain_id)
}
}

#[instrument(skip_all, err)]
#[instrument(skip_all, err, ret(Display))]
async fn get_celestia_chain_id(
celestia_client: &CelestiaClient,
) -> eyre::Result<celestia_tendermint::chain::Id> {
Expand Down Expand Up @@ -263,8 +251,11 @@
// Client to fetch heights and blocks from Celestia.
celestia_client: CelestiaClient,

/// The channel used to send messages to the executor task.
executor: executor::Handle<StateIsInit>,
/// The channel to forward firm blocks to the executor.
firm_blocks: mpsc::Sender<Box<ReconstructedBlock>>,

/// The channel to read updates of the rollup state from.
rollup_state: crate::executor::StateReceiver,

/// Token to listen for Conductor being shut down.
shutdown: CancellationToken,
Expand All @@ -280,7 +271,8 @@
/// capacity again. Used as a back pressure mechanism so that this task does not fetch more
/// blobs if there is no capacity in the executor to execute them against the rollup in
/// time.
enqueued_block: Fuse<BoxFuture<'static, Result<u64, FirmSendError>>>,
enqueued_block:
Fuse<BoxFuture<'static, Result<u64, mpsc::error::SendError<Box<ReconstructedBlock>>>>>,
SuperFluffy marked this conversation as resolved.
Show resolved Hide resolved

/// The latest observed head height of the Celestia network. Set by values read from
/// the `latest_height` stream.
Expand Down Expand Up @@ -323,7 +315,6 @@
impl RunningReader {
fn from_parts(
exposed_reader: Reader,
mut executor: executor::Handle<StateIsInit>,
sequencer_chain_id: tendermint::chain::Id,
) -> eyre::Result<Self> {
let Reader {
Expand All @@ -333,21 +324,23 @@
shutdown,
sequencer_requests_per_second,
metrics,
firm_blocks,
rollup_state,
..
} = exposed_reader;
let block_cache =
BlockCache::with_next_height(executor.next_expected_firm_sequencer_height())
BlockCache::with_next_height(rollup_state.next_expected_firm_sequencer_height())
.wrap_err("failed constructing sequential block cache")?;

let latest_heights = stream_latest_heights(celestia_client.clone(), celestia_block_time);
let rollup_id = executor.rollup_id();
let rollup_id = rollup_state.rollup_id();
let rollup_namespace = astria_core::celestia::namespace_v0_from_rollup_id(rollup_id);
let sequencer_namespace =
astria_core::celestia::namespace_v0_from_sha256_of_bytes(sequencer_chain_id.as_bytes());

let celestia_next_height = executor.celestia_base_block_height();
let celestia_reference_height = executor.celestia_base_block_height();
let celestia_variance = executor.celestia_block_variance();
let celestia_next_height = rollup_state.celestia_base_block_height();
let celestia_reference_height = rollup_state.celestia_base_block_height();
let celestia_variance = rollup_state.celestia_block_variance();

Ok(Self {
block_cache,
Expand All @@ -357,7 +350,8 @@
),
celestia_client,
enqueued_block: Fuse::terminated(),
executor,
firm_blocks,
rollup_state,
latest_heights,
shutdown,
reconstruction_tasks: JoinMap::new(),
Expand Down Expand Up @@ -498,7 +492,7 @@
rollup_id: self.rollup_id,
rollup_namespace: self.rollup_namespace,
sequencer_namespace: self.sequencer_namespace,
executor: self.executor.clone(),
rollup_state: self.rollup_state.clone(),
metrics: self.metrics,
};
self.reconstruction_tasks.spawn(height, task.execute());
Expand All @@ -520,28 +514,20 @@
#[instrument(skip_all)]
fn forward_block_to_executor(&mut self, block: ReconstructedBlock) -> eyre::Result<()> {
let celestia_height = block.celestia_height;
match self.executor.try_send_firm_block(block) {
match self.firm_blocks.try_send(block.into()) {
Ok(()) => self.advance_reference_celestia_height(celestia_height),
Err(FirmTrySendError::Channel {
source,
}) => match source {
mpsc::error::TrySendError::Full(block) => {
trace!(
"executor channel is full; rescheduling block fetch until the channel \
opens up"
);
self.enqueued_block =
enqueue_block(self.executor.clone(), block).boxed().fuse();
}
mpsc::error::TrySendError::Closed(_) => {
bail!("exiting because executor channel is closed");
}
},
Err(FirmTrySendError::NotSet) => bail!(
"exiting because executor was configured without firm commitments; this Celestia \
reader should have never been started"
),
}
Err(mpsc::error::TrySendError::Full(block)) => {
trace!(
"executor channel is full; rescheduling block fetch until the channel opens up"
);
self.enqueued_block = enqueue_block(self.firm_blocks.clone(), block)
.boxed()
.fuse();
}
Err(mpsc::error::TrySendError::Closed(_)) => {
bail!("exiting because executor channel is closed");
}
};
Ok(())
}

Expand Down Expand Up @@ -574,7 +560,7 @@
rollup_id: RollupId,
rollup_namespace: Namespace,
sequencer_namespace: Namespace,
executor: executor::Handle<StateIsInit>,
rollup_state: crate::executor::StateReceiver,
metrics: &'static Metrics,
}

Expand All @@ -593,7 +579,7 @@
rollup_id,
rollup_namespace,
sequencer_namespace,
executor,
rollup_state,
metrics,
} = self;

Expand Down Expand Up @@ -633,7 +619,7 @@
"decoded Sequencer header and rollup info from raw Celestia blobs",
);

let verified_blobs = verify_metadata(blob_verifier, decoded_blobs, executor).await;
let verified_blobs = verify_metadata(blob_verifier, decoded_blobs, rollup_state).await;

metrics.record_sequencer_blocks_metadata_verified_per_celestia_fetch(
verified_blobs.len_header_blobs(),
Expand Down Expand Up @@ -671,15 +657,15 @@

#[instrument(skip_all, err)]
async fn enqueue_block(
executor: executor::Handle<StateIsInit>,
firm_blocks_tx: mpsc::Sender<Box<ReconstructedBlock>>,
block: Box<ReconstructedBlock>,
) -> Result<u64, FirmSendError> {
) -> Result<u64, mpsc::error::SendError<Box<ReconstructedBlock>>> {
let celestia_height = block.celestia_height;
executor.send_firm_block(block).await?;
firm_blocks_tx.send(block).await?;
Ok(celestia_height)
}

#[instrument(skip_all, err)]
#[instrument(skip_all, err, ret(Display))]
async fn get_sequencer_chain_id(client: SequencerClient) -> eyre::Result<tendermint::chain::Id> {
use sequencer_client::Client as _;

Expand Down
8 changes: 2 additions & 6 deletions crates/astria-conductor/src/celestia/verify.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{

Check warning on line 1 in crates/astria-conductor/src/celestia/verify.rs

View workflow job for this annotation

GitHub Actions / Code Freeze

Code Freeze in Effect - Bypassed

This file is under code freeze.
collections::HashMap,
sync::Arc,
time::Duration,
Expand Down Expand Up @@ -51,10 +51,6 @@
block_verifier,
convert::ConvertedBlobs,
};
use crate::executor::{
self,
StateIsInit,
};

pub(super) struct VerifiedBlobs {
celestia_height: u64,
Expand Down Expand Up @@ -99,15 +95,15 @@
pub(super) async fn verify_metadata(
blob_verifier: Arc<BlobVerifier>,
converted_blobs: ConvertedBlobs,
mut executor: executor::Handle<StateIsInit>,
rollup_state: crate::executor::StateReceiver,
) -> VerifiedBlobs {
let (celestia_height, header_blobs, rollup_blobs) = converted_blobs.into_parts();

let mut verification_tasks = JoinMap::new();
let mut verified_header_blobs = HashMap::with_capacity(header_blobs.len());

let next_expected_firm_sequencer_height =
executor.next_expected_firm_sequencer_height().value();
rollup_state.next_expected_firm_sequencer_height().value();

for (index, blob) in header_blobs.into_iter().enumerate() {
if blob.height().value() < next_expected_firm_sequencer_height {
Expand Down
Loading
Loading