Skip to content

Commit

Permalink
chore(starknet_l1_provider): connect l1 scraper to infra
Browse files Browse the repository at this point in the history
Logic changes:
- added event identifiers to track into base layer constants, so that l1 provider
  can import them and choose to track them.
- add one line in scraper for sending the events to the provider.
  • Loading branch information
Gilad Chase committed Dec 24, 2024
1 parent a840e25 commit 1a8875a
Show file tree
Hide file tree
Showing 12 changed files with 213 additions and 17 deletions.
50 changes: 50 additions & 0 deletions config/sequencer/default_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,36 @@
"privacy": "Public",
"value": "0.0.0.0:8080"
},
"components.l1_scraper.execution_mode": {
"description": "The component execution mode.",
"privacy": "Public",
"value": "Enabled"
},
"components.l1_scraper.remote_client_config.#is_none": {
"description": "Flag for an optional field.",
"privacy": "TemporaryValue",
"value": true
},
"components.l1_scraper.remote_client_config.idle_connections": {
"description": "The maximum number of idle connections to keep alive.",
"privacy": "Public",
"value": 18446744073709551615
},
"components.l1_scraper.remote_client_config.idle_timeout": {
"description": "The duration in seconds to keep an idle connection open before closing.",
"privacy": "Public",
"value": 90
},
"components.l1_scraper.remote_client_config.retries": {
"description": "The max number of retries for sending a message.",
"privacy": "Public",
"value": 3
},
"components.l1_scraper.remote_client_config.socket": {
"description": "The remote component server socket.",
"privacy": "Public",
"value": "0.0.0.0:8080"
},
"components.mempool.execution_mode": {
"description": "The component execution mode.",
"privacy": "Public",
Expand Down Expand Up @@ -859,6 +889,26 @@
"privacy": "Public",
"value": 100
},
"l1_scraper_config.chain_id": {
"description": "The chain to follow. For more details see https://docs.starknet.io/documentation/architecture_and_concepts/Blocks/transactions/#chain-id.",
"pointer_target": "chain_id",
"privacy": "Public"
},
"l1_scraper_config.finality": {
"description": "Number of blocks to wait for finality",
"privacy": "Public",
"value": 0
},
"l1_scraper_config.l1_block_to_start_scraping_from": {
"description": "Last L1 block number processed by the scraper",
"privacy": "Public",
"value": 0
},
"l1_scraper_config.polling_interval": {
"description": "Interval in Seconds between each scraping attempt of L1.",
"privacy": "Public",
"value": 1
},
"mempool_p2p_config.network_buffer_size": {
"description": "Network buffer size.",
"privacy": "Public",
Expand Down
10 changes: 10 additions & 0 deletions crates/papyrus_base_layer/src/constants.rs
Original file line number Diff line number Diff line change
@@ -1 +1,11 @@
use alloy_sol_types::SolEvent;

use crate::ethereum_base_layer_contract::Starknet;

pub type EventIdentifier = &'static str;

pub const LOG_MESSAGE_TO_L2_EVENT_IDENTIFIER: &str = Starknet::LogMessageToL2::SIGNATURE;
pub const CONSUMED_MESSAGE_TO_L1_EVENT_IDENTIFIER: &str = Starknet::ConsumedMessageToL1::SIGNATURE;
pub const MESSAGE_TO_L2_CANCELLATION_STARTED_EVENT_IDENTIFIER: &str =
Starknet::MessageToL2CancellationStarted::SIGNATURE;
pub const MESSAGE_TO_L2_CANCELED_EVENT_IDENTIFIER: &str = Starknet::MessageToL2Canceled::SIGNATURE;
1 change: 1 addition & 0 deletions crates/papyrus_node/src/config/pointers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub static CONFIG_POINTERS: LazyLock<ConfigPointers> = LazyLock::new(|| {
"network.chain_id",
"rpc.chain_id",
"storage.db_config.chain_id",
"l1_scraper_config.chain_id",
])
),
(
Expand Down
13 changes: 12 additions & 1 deletion crates/starknet_l1_provider/src/communication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ use starknet_sequencer_infra::component_definitions::{
ComponentRequestAndResponseSender,
ComponentRequestHandler,
};
use starknet_sequencer_infra::component_server::{LocalComponentServer, RemoteComponentServer};
use starknet_sequencer_infra::component_server::{
LocalComponentServer,
RemoteComponentServer,
WrapperServer,
};
use tracing::instrument;

use crate::L1Provider;
Expand All @@ -18,6 +22,10 @@ pub type L1ProviderRequestAndResponseSender =
pub type LocalL1ProviderClient = LocalComponentClient<L1ProviderRequest, L1ProviderResponse>;
pub type RemoteL1ProviderClient = RemoteComponentClient<L1ProviderRequest, L1ProviderResponse>;

use crate::l1_scraper::L1Scraper;

pub type L1ScraperServer<B> = WrapperServer<L1Scraper<B>>;

#[async_trait]
impl ComponentRequestHandler<L1ProviderRequest, L1ProviderResponse> for L1Provider {
#[instrument(skip(self))]
Expand All @@ -26,6 +34,9 @@ impl ComponentRequestHandler<L1ProviderRequest, L1ProviderResponse> for L1Provid
L1ProviderRequest::GetTransactions(n_txs) => {
L1ProviderResponse::GetTransactions(self.get_txs(n_txs))
}
L1ProviderRequest::AddEvents(events) => {
L1ProviderResponse::AddEvents(self.process_l1_events(events))
}
}
}
}
9 changes: 0 additions & 9 deletions crates/starknet_l1_provider/src/errors.rs

This file was deleted.

57 changes: 54 additions & 3 deletions crates/starknet_l1_provider/src/l1_scraper.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
use std::any::type_name;
use std::collections::BTreeMap;
use std::time::Duration;

use async_trait::async_trait;
use papyrus_base_layer::constants::EventIdentifier;
use papyrus_base_layer::{BaseLayerContract, L1Event};
use papyrus_config::converters::deserialize_seconds_to_duration;
use papyrus_config::dumping::{ser_param, SerializeConfig};
use papyrus_config::validators::validate_ascii;
use papyrus_config::{ParamPath, ParamPrivacyInput, SerializedParam};
use serde::{Deserialize, Serialize};
use starknet_api::core::ChainId;
use starknet_api::executable_transaction::L1HandlerTransaction as ExecutableL1HandlerTransaction;
use starknet_api::StarknetApiError;
use starknet_l1_provider_types::errors::L1ProviderClientError;
use starknet_l1_provider_types::{Event, SharedL1ProviderClient};
use starknet_sequencer_infra::component_definitions::ComponentStarter;
use starknet_sequencer_infra::errors::ComponentError;
use thiserror::Error;
use tokio::time::sleep;
use tracing::error;
use tracing::{error, info};
use validator::Validate;

type L1ScraperResult<T, B> = Result<T, L1ScraperError<B>>;
Expand Down Expand Up @@ -71,11 +79,13 @@ impl<B: BaseLayerContract + Send + Sync> L1Scraper<B> {
.map(|event| self.event_from_raw_l1_event(event))
.collect::<L1ScraperResult<Vec<_>, _>>()?;

self.l1_provider_client.add_events(events).await?;
self.last_block_number_processed = latest_l1_block_number + 1;
todo!("send {events:?} to provider");

Ok(())
}

async fn _run(&mut self) -> L1ScraperResult<(), B> {
async fn run(&mut self) -> L1ScraperResult<(), B> {
loop {
sleep(self.config.polling_interval).await;
// TODO: retry.
Expand All @@ -98,6 +108,14 @@ impl<B: BaseLayerContract + Send + Sync> L1Scraper<B> {
}
}

#[async_trait]
impl<B: BaseLayerContract + Send + Sync> ComponentStarter for L1Scraper<B> {
async fn start(&mut self) -> Result<(), ComponentError> {
info!("Starting component {}.", type_name::<Self>());
self.run().await.map_err(|_| ComponentError::InternalComponentError)
}
}

#[derive(Clone, Debug, Serialize, Deserialize, Validate, PartialEq)]
pub struct L1ScraperConfig {
// TODO: make this config into trait.
Expand All @@ -120,10 +138,43 @@ impl Default for L1ScraperConfig {
}
}

impl SerializeConfig for L1ScraperConfig {
fn dump(&self) -> BTreeMap<ParamPath, SerializedParam> {
BTreeMap::from([
ser_param(
"l1_block_to_start_scraping_from",
&0,
"Last L1 block number processed by the scraper",
ParamPrivacyInput::Public,
),
ser_param(
"finality",
&0,
"Number of blocks to wait for finality",
ParamPrivacyInput::Public,
),
ser_param(
"polling_interval",
&self.polling_interval.as_secs(),
"Interval in Seconds between each scraping attempt of L1.",
ParamPrivacyInput::Public,
),
ser_param(
"chain_id",
&self.chain_id,
"The chain to follow. For more details see https://docs.starknet.io/documentation/architecture_and_concepts/Blocks/transactions/#chain-id.",
ParamPrivacyInput::Public,
),
])
}
}

#[derive(Error, Debug)]
pub enum L1ScraperError<T: BaseLayerContract + Send + Sync> {
#[error("Base layer error: {0}")]
BaseLayer(T::Error),
#[error(transparent)]
NetworkError(#[from] L1ProviderClientError),
#[error("Failed to calculate hash: {0}")]
HashCalculationError(StarknetApiError),
}
23 changes: 21 additions & 2 deletions crates/starknet_l1_provider/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
pub mod communication;
pub mod errors;
pub mod l1_scraper;

#[cfg(test)]
Expand All @@ -9,14 +8,21 @@ use std::collections::BTreeMap;
use std::time::Duration;

use indexmap::{IndexMap, IndexSet};
use papyrus_base_layer::constants::{
EventIdentifier,
CONSUMED_MESSAGE_TO_L1_EVENT_IDENTIFIER,
LOG_MESSAGE_TO_L2_EVENT_IDENTIFIER,
MESSAGE_TO_L2_CANCELED_EVENT_IDENTIFIER,
MESSAGE_TO_L2_CANCELLATION_STARTED_EVENT_IDENTIFIER,
};
use papyrus_config::converters::deserialize_milliseconds_to_duration;
use papyrus_config::dumping::{ser_param, SerializeConfig};
use papyrus_config::{ParamPath, ParamPrivacyInput, SerializedParam};
use serde::{Deserialize, Serialize};
use starknet_api::executable_transaction::L1HandlerTransaction;
use starknet_api::transaction::TransactionHash;
use starknet_l1_provider_types::errors::L1ProviderError;
use starknet_l1_provider_types::{L1ProviderResult, ValidationStatus};
use starknet_l1_provider_types::{Event, L1ProviderResult, ValidationStatus};
use starknet_sequencer_infra::component_definitions::ComponentStarter;
use validator::Validate;

Expand Down Expand Up @@ -69,6 +75,10 @@ impl L1Provider {
)
}

pub fn process_l1_events(&mut self, _events: Vec<Event>) -> L1ProviderResult<()> {
todo!()
}

// TODO: pending formal consensus API, guessing the API here to keep things moving.
// TODO: consider adding block number, it isn't strictly necessary, but will help debugging.
pub fn validation_start(&mut self) -> L1ProviderResult<()> {
Expand Down Expand Up @@ -213,6 +223,15 @@ impl SerializeConfig for L1ProviderConfig {
}
}

pub const fn event_identifiers_to_track() -> &'static [EventIdentifier] {
&[
LOG_MESSAGE_TO_L2_EVENT_IDENTIFIER,
CONSUMED_MESSAGE_TO_L1_EVENT_IDENTIFIER,
MESSAGE_TO_L2_CANCELLATION_STARTED_EVENT_IDENTIFIER,
MESSAGE_TO_L2_CANCELED_EVENT_IDENTIFIER,
]
}

pub fn create_l1_provider(_config: L1ProviderConfig) -> L1Provider {
L1Provider { state: ProviderState::Propose, ..Default::default() }
}
16 changes: 16 additions & 0 deletions crates/starknet_l1_provider_types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ pub enum ValidationStatus {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum L1ProviderRequest {
GetTransactions(usize),
AddEvents(Vec<Event>),
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum L1ProviderResponse {
GetTransactions(L1ProviderResult<Vec<L1HandlerTransaction>>),
AddEvents(L1ProviderResult<()>),
}

/// Serves as the provider's shared interface. Requires `Send + Sync` to allow transferring and
Expand All @@ -43,6 +45,7 @@ pub enum L1ProviderResponse {
#[async_trait]
pub trait L1ProviderClient: Send + Sync {
async fn get_txs(&self, n_txs: usize) -> L1ProviderClientResult<Vec<L1HandlerTransaction>>;
async fn add_events(&self, events: Vec<Event>) -> L1ProviderClientResult<()>;
async fn validate(&self, _tx_hash: TransactionHash)
-> L1ProviderClientResult<ValidationStatus>;
}
Expand All @@ -63,6 +66,19 @@ where
L1ProviderError
)
}

#[instrument(skip(self))]
async fn add_events(&self, events: Vec<Event>) -> L1ProviderClientResult<()> {
let request = L1ProviderRequest::AddEvents(events);
let response = self.send(request).await;
handle_response_variants!(
L1ProviderResponse,
AddEvents,
L1ProviderClientError,
L1ProviderError
)
}

async fn validate(
&self,
_tx_hash: TransactionHash,
Expand Down
22 changes: 21 additions & 1 deletion crates/starknet_sequencer_node/src/components.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerContract;
use starknet_batcher::batcher::{create_batcher, Batcher};
use starknet_consensus_manager::consensus_manager::ConsensusManager;
use starknet_gateway::gateway::{create_gateway, Gateway};
use starknet_http_server::http_server::{create_http_server, HttpServer};
use starknet_l1_provider::{create_l1_provider, L1Provider};
use starknet_l1_provider::l1_scraper::L1Scraper;
use starknet_l1_provider::{create_l1_provider, event_identifiers_to_track, L1Provider};
use starknet_mempool::communication::{create_mempool, MempoolCommunicationWrapper};
use starknet_mempool_p2p::create_p2p_propagator_and_runner;
use starknet_mempool_p2p::propagator::MempoolP2pPropagator;
Expand All @@ -27,6 +29,7 @@ pub struct SequencerNodeComponents {
pub consensus_manager: Option<ConsensusManager>,
pub gateway: Option<Gateway>,
pub http_server: Option<HttpServer>,
pub l1_scraper: Option<L1Scraper<EthereumBaseLayerContract>>,
pub l1_provider: Option<L1Provider>,
pub mempool: Option<MempoolCommunicationWrapper>,
pub monitoring_endpoint: Option<MonitoringEndpoint>,
Expand Down Expand Up @@ -150,11 +153,28 @@ pub fn create_node_components(
ReactiveComponentExecutionMode::Disabled | ReactiveComponentExecutionMode::Remote => None,
};

let l1_scraper = match config.components.l1_scraper.execution_mode {
ActiveComponentExecutionMode::Enabled => {
let l1_provider_client = clients.get_l1_provider_shared_client().unwrap();
let l1_scraper_config = config.l1_scraper_config.clone();
let base_layer = EthereumBaseLayerContract::new(config.base_layer_config.clone());

Some(L1Scraper::new(
l1_scraper_config,
l1_provider_client,
base_layer,
event_identifiers_to_track(),
))
}
ActiveComponentExecutionMode::Disabled => None,
};

SequencerNodeComponents {
batcher,
consensus_manager,
gateway,
http_server,
l1_scraper,
l1_provider,
mempool,
monitoring_endpoint,
Expand Down
Loading

0 comments on commit 1a8875a

Please sign in to comment.