Skip to content

Commit

Permalink
feat: wait for l1 sync to l2 genesis separately
Browse files Browse the repository at this point in the history
  • Loading branch information
merklefruit committed Aug 3, 2024
1 parent 6f17c8c commit d3eeed8
Showing 1 changed file with 70 additions and 44 deletions.
114 changes: 70 additions & 44 deletions kona/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,10 @@ pub(crate) struct KonaExEx<Node: FullNodeComponents> {
blob_provider: ExExBlobProvider,
/// The validator to verify newly derived payloads
validator: Box<dyn AttributesValidator + Send>,
/// The derivation pipeline
pipeline: LocalPipeline,
/// The current L2 block we are processing
l2_block_cursor: L2BlockInfo,
/// A map of L1 anchor blocks to their L2 cursor
l1_to_l2_block_cursor: HashMap<BlockInfo, L2BlockInfo>,
/// Whether the ExEx is synced to the L2 genesis
is_synced_to_l2_genesis: bool,
/// Whether we should advance the L2 block cursor
should_advance_l2_block_cursor: bool,
/// The number of derived payloads so far
Expand All @@ -68,10 +64,6 @@ impl<Node: FullNodeComponents> KonaExEx<Node> {

let l2_provider = AlloyL2ChainProvider::new_http(args.l2_rpc_url.clone(), cfg.clone());
let blob_provider = ExExBlobProvider::new_from_beacon_client(args.beacon_client_url);
let dap = EthereumDataSource::new(chain_provider.clone(), blob_provider.clone(), &cfg);

let attributes =
LocalAttributesBuilder::new(cfg.clone(), l2_provider.clone(), chain_provider.clone());

let validator: Box<dyn AttributesValidator + Send> = match args.validation_mode {
ValidationMode::Trusted => Box::new(TrustedValidator::new_http(
Expand All @@ -87,40 +79,49 @@ impl<Node: FullNodeComponents> KonaExEx<Node> {
)),
};

// Initialize the L2 cursor to the L2 genesis block
let l2_block_cursor = l2_genesis_info_from_cfg(&cfg);
let tip =
chain_provider.block_info_by_number(l2_block_cursor.l1_origin.number).await.unwrap();

// Finally create the derivation pipeline
let pipeline = new_local_pipeline(
cfg.clone(),
chain_provider.clone(),
l2_provider.clone(),
dap,
attributes,
tip,
);
// Initialize the rollup block cursor from the L2 genesis block
let l2_block_cursor = l2_genesis_info_from_config(&cfg);

Self {
cfg,
ctx,
validator,
pipeline,
chain_provider,
l2_provider,
blob_provider,
l2_block_cursor,
l1_to_l2_block_cursor: HashMap::new(),
is_synced_to_l2_genesis: false,
should_advance_l2_block_cursor: false,
derived_payloads_count: 0,
}
}

/// Initializes the derivation pipeline with the L2 origin block.
pub async fn init_pipeline(&mut self, origin_l1_block: BlockInfo) -> LocalPipeline {
let dap = EthereumDataSource::new(
self.chain_provider.clone(),
self.blob_provider.clone(),
&self.cfg,
);
let attributes = LocalAttributesBuilder::new(
self.cfg.clone(),
self.l2_provider.clone(),
self.chain_provider.clone(),
);

new_local_pipeline(
self.cfg.clone(),
self.chain_provider.clone(),
self.l2_provider.clone(),
dap,
attributes,
origin_l1_block,
)
}

/// Steps the L2 derivation pipeline and validates the prepared attributes.
async fn step_l2(&mut self) {
match self.pipeline.step(self.l2_block_cursor).await {
async fn step_l2(&mut self, pipeline: &mut LocalPipeline) {
match pipeline.step(self.l2_block_cursor).await {
StepResult::OriginAdvanceErr(err) => {
error!(target: "kona", %err, "Failed to advance origin");
}
Expand All @@ -131,14 +132,14 @@ impl<Node: FullNodeComponents> KonaExEx<Node> {
};

// Peek the the next prepared attributes and validate them
match self.pipeline.peek() {
match pipeline.peek() {
None => debug!(target: "kona", "No prepared attributes to validate"),
Some(attributes) => match self.validator.validate(attributes).await {
Ok(true) => info!(target: "kona", "Attributes validated"),
Ok(false) => {
warn!(target: "kona", "Attributes failed validation");
// If the validation fails, take the attributes out and continue
let _ = self.pipeline.next();
let _ = pipeline.next();
return;
}
Err(err) => {
Expand All @@ -151,7 +152,7 @@ impl<Node: FullNodeComponents> KonaExEx<Node> {
};

// Take the next attributes from the pipeline since they're valid.
let Some(attributes) = self.pipeline.next() else {
let Some(attributes) = pipeline.next() else {
error!(target: "kona", "Must have valid attributes");
return;
};
Expand All @@ -165,7 +166,7 @@ impl<Node: FullNodeComponents> KonaExEx<Node> {
self.derived_payloads_count,
attributes.parent.block_info.number as i64 + 1,
attributes.attributes.timestamp,
self.pipeline.origin().unwrap().number,
pipeline.origin().unwrap().number,
);
debug!(target: "kona", "attributes: {:#?}", attributes);
}
Expand All @@ -192,7 +193,11 @@ impl<Node: FullNodeComponents> KonaExEx<Node> {
Ok(())
}

async fn handle_exex_notification(&mut self, notification: ExExNotification) -> Result<()> {
async fn handle_exex_notification(
&mut self,
notification: ExExNotification,
pipeline: &mut LocalPipeline,
) -> Result<()> {
if let Some(reverted_chain) = notification.reverted_chain() {
self.chain_provider.commit(reverted_chain.clone());
let l1_block_info = info_from_header(&reverted_chain.tip().block);
Expand All @@ -212,7 +217,7 @@ impl<Node: FullNodeComponents> KonaExEx<Node> {

// Reset the pipeline to the previous L2 block cursor
self.l2_block_cursor = *l2_cursor;
if let Err(err) = self.pipeline.reset(l2_cursor.block_info, l1_block_info).await {
if let Err(err) = pipeline.reset(l2_cursor.block_info, l1_block_info).await {
bail!("Critical: Failed to reset pipeline: {:?}", err);
}
}
Expand All @@ -221,13 +226,6 @@ impl<Node: FullNodeComponents> KonaExEx<Node> {
let tip_number = committed_chain.tip().number; // TODO: ensure this is the right tip
self.chain_provider.commit(committed_chain);

if tip_number >= self.cfg.genesis.l1.number {
debug!(target: "kona", "Chain synced to rollup genesis with L1 block number: {}", tip_number);
self.is_synced_to_l2_genesis = true;
} else {
trace!(target: "kona", "Chain not yet synced to rollup genesis. L1 block number: {}", tip_number);
}

if let Err(err) = self.ctx.events.send(ExExEvent::FinishedHeight(tip_number)) {
bail!("Critical: Failed to send ExEx event: {:?}", err);
}
Expand All @@ -236,12 +234,40 @@ impl<Node: FullNodeComponents> KonaExEx<Node> {
Ok(())
}

/// Wait for the L2 genesis L1 block (aka "origin block") to be available in the L1 chain.
async fn wait_for_l2_genesis_l1_block(&mut self) -> Result<BlockInfo> {
loop {
if let Some(notification) = self.ctx.notifications.recv().await {
if let Some(committed_chain) = notification.committed_chain() {
let tip = info_from_header(&committed_chain.tip().block);
self.chain_provider.commit(committed_chain);

if tip.number >= self.cfg.genesis.l1.number {
debug!(target: "kona", "Chain synced to rollup genesis with L1 block number: {}", tip.number);
break Ok(tip);
} else {
trace!(target: "kona", "Chain not yet synced to rollup genesis. L1 block number: {}", tip.number);
}

if let Err(err) = self.ctx.events.send(ExExEvent::FinishedHeight(tip.number)) {
bail!("Critical: Failed to send ExEx event: {:?}", err);
}
}
}
}
}

/// Starts the Kona Execution Extension loop.
pub async fn start(mut self) -> Result<()> {
// Step 1: Wait for the L2 origin block to be available
let l2_genesis_l1_block = self.wait_for_l2_genesis_l1_block().await?;

// Step 2: Initialize the derivation pipeline with the L2 origin block
let mut pipeline = self.init_pipeline(l2_genesis_l1_block).await;

// Step 3: Start the main loop
loop {
if self.is_synced_to_l2_genesis {
self.step_l2().await;
}
self.step_l2(&mut pipeline).await;

if self.should_advance_l2_block_cursor {
if let Err(err) = self.advance_l2_cursor().await {
Expand All @@ -250,7 +276,7 @@ impl<Node: FullNodeComponents> KonaExEx<Node> {
}

if let Ok(notification) = self.ctx.notifications.try_recv() {
if let Err(err) = self.handle_exex_notification(notification).await {
if let Err(err) = self.handle_exex_notification(notification, &mut pipeline).await {
error!(target: "kona", ?err, "Failed to handle ExEx notification");
}
}
Expand Down Expand Up @@ -287,7 +313,7 @@ fn info_from_header(block: &reth::primitives::SealedBlock) -> BlockInfo {
}

/// Helper to extract L2 genesis block info from a rollup configuration
fn l2_genesis_info_from_cfg(cfg: &Arc<RollupConfig>) -> L2BlockInfo {
fn l2_genesis_info_from_config(cfg: &Arc<RollupConfig>) -> L2BlockInfo {
L2BlockInfo {
block_info: BlockInfo {
hash: cfg.genesis.l2.hash,
Expand Down

0 comments on commit d3eeed8

Please sign in to comment.