From 1a8875a60221ba31feb1673cc0e22a5e467e6dbd Mon Sep 17 00:00:00 2001 From: Gilad Chase Date: Fri, 20 Dec 2024 23:11:06 +0200 Subject: [PATCH] chore(starknet_l1_provider): connect l1 scraper to infra Logic changes: - added event identifiers to track into base layer constants, so that l1 provider can import them and choose to track them. - add one line in scraper for sending the events to the provider. --- config/sequencer/default_config.json | 50 ++++++++++++++++ crates/papyrus_base_layer/src/constants.rs | 10 ++++ crates/papyrus_node/src/config/pointers.rs | 1 + .../starknet_l1_provider/src/communication.rs | 13 ++++- crates/starknet_l1_provider/src/errors.rs | 9 --- crates/starknet_l1_provider/src/l1_scraper.rs | 57 ++++++++++++++++++- crates/starknet_l1_provider/src/lib.rs | 23 +++++++- crates/starknet_l1_provider_types/src/lib.rs | 16 ++++++ .../starknet_sequencer_node/src/components.rs | 22 ++++++- .../src/config/component_config.rs | 4 ++ .../src/config/node_config.rs | 5 ++ crates/starknet_sequencer_node/src/servers.rs | 20 ++++++- 12 files changed, 213 insertions(+), 17 deletions(-) delete mode 100644 crates/starknet_l1_provider/src/errors.rs diff --git a/config/sequencer/default_config.json b/config/sequencer/default_config.json index 31c75a3d44..d649b47020 100644 --- a/config/sequencer/default_config.json +++ b/config/sequencer/default_config.json @@ -439,6 +439,36 @@ "privacy": "Public", "value": "0.0.0.0:8080" }, + "components.l1_scraper.execution_mode": { + "description": "The component execution mode.", + "privacy": "Public", + "value": "Enabled" + }, + "components.l1_scraper.remote_client_config.#is_none": { + "description": "Flag for an optional field.", + "privacy": "TemporaryValue", + "value": true + }, + "components.l1_scraper.remote_client_config.idle_connections": { + "description": "The maximum number of idle connections to keep alive.", + "privacy": "Public", + "value": 18446744073709551615 + }, + "components.l1_scraper.remote_client_config.idle_timeout": { + "description": "The duration in seconds to keep an idle connection open before closing.", + "privacy": "Public", + "value": 90 + }, + "components.l1_scraper.remote_client_config.retries": { + "description": "The max number of retries for sending a message.", + "privacy": "Public", + "value": 3 + }, + "components.l1_scraper.remote_client_config.socket": { + "description": "The remote component server socket.", + "privacy": "Public", + "value": "0.0.0.0:8080" + }, "components.mempool.execution_mode": { "description": "The component execution mode.", "privacy": "Public", @@ -859,6 +889,26 @@ "privacy": "Public", "value": 100 }, + "l1_scraper_config.chain_id": { + "description": "The chain to follow. For more details see https://docs.starknet.io/documentation/architecture_and_concepts/Blocks/transactions/#chain-id.", + "pointer_target": "chain_id", + "privacy": "Public" + }, + "l1_scraper_config.finality": { + "description": "Number of blocks to wait for finality", + "privacy": "Public", + "value": 0 + }, + "l1_scraper_config.l1_block_to_start_scraping_from": { + "description": "Last L1 block number processed by the scraper", + "privacy": "Public", + "value": 0 + }, + "l1_scraper_config.polling_interval": { + "description": "Interval in Seconds between each scraping attempt of L1.", + "privacy": "Public", + "value": 1 + }, "mempool_p2p_config.network_buffer_size": { "description": "Network buffer size.", "privacy": "Public", diff --git a/crates/papyrus_base_layer/src/constants.rs b/crates/papyrus_base_layer/src/constants.rs index 74be4f72ae..c4fb044df7 100644 --- a/crates/papyrus_base_layer/src/constants.rs +++ b/crates/papyrus_base_layer/src/constants.rs @@ -1 +1,11 @@ +use alloy_sol_types::SolEvent; + +use crate::ethereum_base_layer_contract::Starknet; + pub type EventIdentifier = &'static str; + +pub const LOG_MESSAGE_TO_L2_EVENT_IDENTIFIER: &str = Starknet::LogMessageToL2::SIGNATURE; +pub const CONSUMED_MESSAGE_TO_L1_EVENT_IDENTIFIER: &str = Starknet::ConsumedMessageToL1::SIGNATURE; +pub const MESSAGE_TO_L2_CANCELLATION_STARTED_EVENT_IDENTIFIER: &str = + Starknet::MessageToL2CancellationStarted::SIGNATURE; +pub const MESSAGE_TO_L2_CANCELED_EVENT_IDENTIFIER: &str = Starknet::MessageToL2Canceled::SIGNATURE; diff --git a/crates/papyrus_node/src/config/pointers.rs b/crates/papyrus_node/src/config/pointers.rs index e1ee0ef10b..1c87e2703f 100644 --- a/crates/papyrus_node/src/config/pointers.rs +++ b/crates/papyrus_node/src/config/pointers.rs @@ -62,6 +62,7 @@ pub static CONFIG_POINTERS: LazyLock = LazyLock::new(|| { "network.chain_id", "rpc.chain_id", "storage.db_config.chain_id", + "l1_scraper_config.chain_id", ]) ), ( diff --git a/crates/starknet_l1_provider/src/communication.rs b/crates/starknet_l1_provider/src/communication.rs index ced6603500..60c335a028 100644 --- a/crates/starknet_l1_provider/src/communication.rs +++ b/crates/starknet_l1_provider/src/communication.rs @@ -5,7 +5,11 @@ use starknet_sequencer_infra::component_definitions::{ ComponentRequestAndResponseSender, ComponentRequestHandler, }; -use starknet_sequencer_infra::component_server::{LocalComponentServer, RemoteComponentServer}; +use starknet_sequencer_infra::component_server::{ + LocalComponentServer, + RemoteComponentServer, + WrapperServer, +}; use tracing::instrument; use crate::L1Provider; @@ -18,6 +22,10 @@ pub type L1ProviderRequestAndResponseSender = pub type LocalL1ProviderClient = LocalComponentClient; pub type RemoteL1ProviderClient = RemoteComponentClient; +use crate::l1_scraper::L1Scraper; + +pub type L1ScraperServer = WrapperServer>; + #[async_trait] impl ComponentRequestHandler for L1Provider { #[instrument(skip(self))] @@ -26,6 +34,9 @@ impl ComponentRequestHandler for L1Provid L1ProviderRequest::GetTransactions(n_txs) => { L1ProviderResponse::GetTransactions(self.get_txs(n_txs)) } + L1ProviderRequest::AddEvents(events) => { + L1ProviderResponse::AddEvents(self.process_l1_events(events)) + } } } } diff --git a/crates/starknet_l1_provider/src/errors.rs b/crates/starknet_l1_provider/src/errors.rs deleted file mode 100644 index 40ba30db83..0000000000 --- a/crates/starknet_l1_provider/src/errors.rs +++ /dev/null @@ -1,9 +0,0 @@ -use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerError; -use thiserror::Error; - -// TODO(Gilad): move to scraper module once it's created. -#[derive(Error, Debug)] -pub enum L1ScraperError { - #[error(transparent)] - BaseLayer(#[from] EthereumBaseLayerError), -} diff --git a/crates/starknet_l1_provider/src/l1_scraper.rs b/crates/starknet_l1_provider/src/l1_scraper.rs index e5a335c1a1..903196f219 100644 --- a/crates/starknet_l1_provider/src/l1_scraper.rs +++ b/crates/starknet_l1_provider/src/l1_scraper.rs @@ -1,17 +1,25 @@ +use std::any::type_name; +use std::collections::BTreeMap; use std::time::Duration; +use async_trait::async_trait; use papyrus_base_layer::constants::EventIdentifier; use papyrus_base_layer::{BaseLayerContract, L1Event}; use papyrus_config::converters::deserialize_seconds_to_duration; +use papyrus_config::dumping::{ser_param, SerializeConfig}; use papyrus_config::validators::validate_ascii; +use papyrus_config::{ParamPath, ParamPrivacyInput, SerializedParam}; use serde::{Deserialize, Serialize}; use starknet_api::core::ChainId; use starknet_api::executable_transaction::L1HandlerTransaction as ExecutableL1HandlerTransaction; use starknet_api::StarknetApiError; +use starknet_l1_provider_types::errors::L1ProviderClientError; use starknet_l1_provider_types::{Event, SharedL1ProviderClient}; +use starknet_sequencer_infra::component_definitions::ComponentStarter; +use starknet_sequencer_infra::errors::ComponentError; use thiserror::Error; use tokio::time::sleep; -use tracing::error; +use tracing::{error, info}; use validator::Validate; type L1ScraperResult = Result>; @@ -71,11 +79,13 @@ impl L1Scraper { .map(|event| self.event_from_raw_l1_event(event)) .collect::, _>>()?; + self.l1_provider_client.add_events(events).await?; self.last_block_number_processed = latest_l1_block_number + 1; - todo!("send {events:?} to provider"); + + Ok(()) } - async fn _run(&mut self) -> L1ScraperResult<(), B> { + async fn run(&mut self) -> L1ScraperResult<(), B> { loop { sleep(self.config.polling_interval).await; // TODO: retry. @@ -98,6 +108,14 @@ impl L1Scraper { } } +#[async_trait] +impl ComponentStarter for L1Scraper { + async fn start(&mut self) -> Result<(), ComponentError> { + info!("Starting component {}.", type_name::()); + self.run().await.map_err(|_| ComponentError::InternalComponentError) + } +} + #[derive(Clone, Debug, Serialize, Deserialize, Validate, PartialEq)] pub struct L1ScraperConfig { // TODO: make this config into trait. @@ -120,10 +138,43 @@ impl Default for L1ScraperConfig { } } +impl SerializeConfig for L1ScraperConfig { + fn dump(&self) -> BTreeMap { + BTreeMap::from([ + ser_param( + "l1_block_to_start_scraping_from", + &0, + "Last L1 block number processed by the scraper", + ParamPrivacyInput::Public, + ), + ser_param( + "finality", + &0, + "Number of blocks to wait for finality", + ParamPrivacyInput::Public, + ), + ser_param( + "polling_interval", + &self.polling_interval.as_secs(), + "Interval in Seconds between each scraping attempt of L1.", + ParamPrivacyInput::Public, + ), + ser_param( + "chain_id", + &self.chain_id, + "The chain to follow. For more details see https://docs.starknet.io/documentation/architecture_and_concepts/Blocks/transactions/#chain-id.", + ParamPrivacyInput::Public, + ), + ]) + } +} + #[derive(Error, Debug)] pub enum L1ScraperError { #[error("Base layer error: {0}")] BaseLayer(T::Error), + #[error(transparent)] + NetworkError(#[from] L1ProviderClientError), #[error("Failed to calculate hash: {0}")] HashCalculationError(StarknetApiError), } diff --git a/crates/starknet_l1_provider/src/lib.rs b/crates/starknet_l1_provider/src/lib.rs index 4ce40abb66..97c5bbc686 100644 --- a/crates/starknet_l1_provider/src/lib.rs +++ b/crates/starknet_l1_provider/src/lib.rs @@ -1,5 +1,4 @@ pub mod communication; -pub mod errors; pub mod l1_scraper; #[cfg(test)] @@ -9,6 +8,13 @@ use std::collections::BTreeMap; use std::time::Duration; use indexmap::{IndexMap, IndexSet}; +use papyrus_base_layer::constants::{ + EventIdentifier, + CONSUMED_MESSAGE_TO_L1_EVENT_IDENTIFIER, + LOG_MESSAGE_TO_L2_EVENT_IDENTIFIER, + MESSAGE_TO_L2_CANCELED_EVENT_IDENTIFIER, + MESSAGE_TO_L2_CANCELLATION_STARTED_EVENT_IDENTIFIER, +}; use papyrus_config::converters::deserialize_milliseconds_to_duration; use papyrus_config::dumping::{ser_param, SerializeConfig}; use papyrus_config::{ParamPath, ParamPrivacyInput, SerializedParam}; @@ -16,7 +22,7 @@ use serde::{Deserialize, Serialize}; use starknet_api::executable_transaction::L1HandlerTransaction; use starknet_api::transaction::TransactionHash; use starknet_l1_provider_types::errors::L1ProviderError; -use starknet_l1_provider_types::{L1ProviderResult, ValidationStatus}; +use starknet_l1_provider_types::{Event, L1ProviderResult, ValidationStatus}; use starknet_sequencer_infra::component_definitions::ComponentStarter; use validator::Validate; @@ -69,6 +75,10 @@ impl L1Provider { ) } + pub fn process_l1_events(&mut self, _events: Vec) -> L1ProviderResult<()> { + todo!() + } + // TODO: pending formal consensus API, guessing the API here to keep things moving. // TODO: consider adding block number, it isn't strictly necessary, but will help debugging. pub fn validation_start(&mut self) -> L1ProviderResult<()> { @@ -213,6 +223,15 @@ impl SerializeConfig for L1ProviderConfig { } } +pub const fn event_identifiers_to_track() -> &'static [EventIdentifier] { + &[ + LOG_MESSAGE_TO_L2_EVENT_IDENTIFIER, + CONSUMED_MESSAGE_TO_L1_EVENT_IDENTIFIER, + MESSAGE_TO_L2_CANCELLATION_STARTED_EVENT_IDENTIFIER, + MESSAGE_TO_L2_CANCELED_EVENT_IDENTIFIER, + ] +} + pub fn create_l1_provider(_config: L1ProviderConfig) -> L1Provider { L1Provider { state: ProviderState::Propose, ..Default::default() } } diff --git a/crates/starknet_l1_provider_types/src/lib.rs b/crates/starknet_l1_provider_types/src/lib.rs index 541fab1c91..c2823edf71 100644 --- a/crates/starknet_l1_provider_types/src/lib.rs +++ b/crates/starknet_l1_provider_types/src/lib.rs @@ -30,11 +30,13 @@ pub enum ValidationStatus { #[derive(Clone, Debug, Serialize, Deserialize)] pub enum L1ProviderRequest { GetTransactions(usize), + AddEvents(Vec), } #[derive(Clone, Debug, Serialize, Deserialize)] pub enum L1ProviderResponse { GetTransactions(L1ProviderResult>), + AddEvents(L1ProviderResult<()>), } /// Serves as the provider's shared interface. Requires `Send + Sync` to allow transferring and @@ -43,6 +45,7 @@ pub enum L1ProviderResponse { #[async_trait] pub trait L1ProviderClient: Send + Sync { async fn get_txs(&self, n_txs: usize) -> L1ProviderClientResult>; + async fn add_events(&self, events: Vec) -> L1ProviderClientResult<()>; async fn validate(&self, _tx_hash: TransactionHash) -> L1ProviderClientResult; } @@ -63,6 +66,19 @@ where L1ProviderError ) } + + #[instrument(skip(self))] + async fn add_events(&self, events: Vec) -> L1ProviderClientResult<()> { + let request = L1ProviderRequest::AddEvents(events); + let response = self.send(request).await; + handle_response_variants!( + L1ProviderResponse, + AddEvents, + L1ProviderClientError, + L1ProviderError + ) + } + async fn validate( &self, _tx_hash: TransactionHash, diff --git a/crates/starknet_sequencer_node/src/components.rs b/crates/starknet_sequencer_node/src/components.rs index 82ca63000b..2821364518 100644 --- a/crates/starknet_sequencer_node/src/components.rs +++ b/crates/starknet_sequencer_node/src/components.rs @@ -1,8 +1,10 @@ +use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerContract; use starknet_batcher::batcher::{create_batcher, Batcher}; use starknet_consensus_manager::consensus_manager::ConsensusManager; use starknet_gateway::gateway::{create_gateway, Gateway}; use starknet_http_server::http_server::{create_http_server, HttpServer}; -use starknet_l1_provider::{create_l1_provider, L1Provider}; +use starknet_l1_provider::l1_scraper::L1Scraper; +use starknet_l1_provider::{create_l1_provider, event_identifiers_to_track, L1Provider}; use starknet_mempool::communication::{create_mempool, MempoolCommunicationWrapper}; use starknet_mempool_p2p::create_p2p_propagator_and_runner; use starknet_mempool_p2p::propagator::MempoolP2pPropagator; @@ -27,6 +29,7 @@ pub struct SequencerNodeComponents { pub consensus_manager: Option, pub gateway: Option, pub http_server: Option, + pub l1_scraper: Option>, pub l1_provider: Option, pub mempool: Option, pub monitoring_endpoint: Option, @@ -150,11 +153,28 @@ pub fn create_node_components( ReactiveComponentExecutionMode::Disabled | ReactiveComponentExecutionMode::Remote => None, }; + let l1_scraper = match config.components.l1_scraper.execution_mode { + ActiveComponentExecutionMode::Enabled => { + let l1_provider_client = clients.get_l1_provider_shared_client().unwrap(); + let l1_scraper_config = config.l1_scraper_config.clone(); + let base_layer = EthereumBaseLayerContract::new(config.base_layer_config.clone()); + + Some(L1Scraper::new( + l1_scraper_config, + l1_provider_client, + base_layer, + event_identifiers_to_track(), + )) + } + ActiveComponentExecutionMode::Disabled => None, + }; + SequencerNodeComponents { batcher, consensus_manager, gateway, http_server, + l1_scraper, l1_provider, mempool, monitoring_endpoint, diff --git a/crates/starknet_sequencer_node/src/config/component_config.rs b/crates/starknet_sequencer_node/src/config/component_config.rs index b18a046e8c..04538d8afd 100644 --- a/crates/starknet_sequencer_node/src/config/component_config.rs +++ b/crates/starknet_sequencer_node/src/config/component_config.rs @@ -33,6 +33,8 @@ pub struct ComponentConfig { #[validate] pub http_server: ActiveComponentExecutionConfig, #[validate] + pub l1_scraper: ActiveComponentExecutionConfig, + #[validate] pub monitoring_endpoint: ActiveComponentExecutionConfig, } @@ -45,6 +47,7 @@ impl SerializeConfig for ComponentConfig { append_sub_config_name(self.http_server.dump(), "http_server"), append_sub_config_name(self.mempool.dump(), "mempool"), append_sub_config_name(self.l1_provider.dump(), "l1_provider"), + append_sub_config_name(self.l1_scraper.dump(), "l1_scraper"), append_sub_config_name(self.mempool_p2p.dump(), "mempool_p2p"), append_sub_config_name(self.monitoring_endpoint.dump(), "monitoring_endpoint"), append_sub_config_name(self.state_sync.dump(), "state_sync"), @@ -64,6 +67,7 @@ impl ComponentConfig { mempool_p2p: ReactiveComponentExecutionConfig::disabled(), state_sync: ReactiveComponentExecutionConfig::disabled(), l1_provider: ReactiveComponentExecutionConfig::disabled(), + l1_scraper: ActiveComponentExecutionConfig::default(), consensus_manager: ActiveComponentExecutionConfig::disabled(), http_server: ActiveComponentExecutionConfig::disabled(), monitoring_endpoint: ActiveComponentExecutionConfig::disabled(), diff --git a/crates/starknet_sequencer_node/src/config/node_config.rs b/crates/starknet_sequencer_node/src/config/node_config.rs index c6ac538764..4f8b7181dd 100644 --- a/crates/starknet_sequencer_node/src/config/node_config.rs +++ b/crates/starknet_sequencer_node/src/config/node_config.rs @@ -24,6 +24,7 @@ use starknet_batcher::VersionedConstantsOverrides; use starknet_consensus_manager::config::ConsensusManagerConfig; use starknet_gateway::config::{GatewayConfig, RpcStateReaderConfig}; use starknet_http_server::config::HttpServerConfig; +use starknet_l1_provider::l1_scraper::L1ScraperConfig; use starknet_l1_provider::L1ProviderConfig; use starknet_mempool_p2p::config::MempoolP2pConfig; use starknet_monitoring_endpoint::config::MonitoringEndpointConfig; @@ -49,6 +50,7 @@ pub static CONFIG_POINTERS: LazyLock = LazyLock::new(|| { set_pointing_param_paths(&[ "batcher_config.block_builder_config.chain_info.chain_id", "batcher_config.storage.db_config.chain_id", + "l1_scraper_config.chain_id", "consensus_manager_config.consensus_config.chain_id", "consensus_manager_config.consensus_config.network_config.chain_id", "gateway_config.chain_info.chain_id", @@ -129,6 +131,8 @@ pub struct SequencerNodeConfig { #[validate] pub l1_provider_config: L1ProviderConfig, #[validate] + pub l1_scraper_config: L1ScraperConfig, + #[validate] pub mempool_p2p_config: MempoolP2pConfig, #[validate] pub monitoring_endpoint_config: MonitoringEndpointConfig, @@ -157,6 +161,7 @@ impl SerializeConfig for SequencerNodeConfig { ), append_sub_config_name(self.state_sync_config.dump(), "state_sync_config"), append_sub_config_name(self.l1_provider_config.dump(), "l1_provider_config"), + append_sub_config_name(self.l1_scraper_config.dump(), "l1_scraper_config"), ]; sub_configs.into_iter().flatten().collect() diff --git a/crates/starknet_sequencer_node/src/servers.rs b/crates/starknet_sequencer_node/src/servers.rs index 42f2a0bbab..2734bdc380 100644 --- a/crates/starknet_sequencer_node/src/servers.rs +++ b/crates/starknet_sequencer_node/src/servers.rs @@ -2,11 +2,16 @@ use std::future::pending; use std::pin::Pin; use futures::{Future, FutureExt}; +use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerContract; use starknet_batcher::communication::{LocalBatcherServer, RemoteBatcherServer}; use starknet_consensus_manager::communication::ConsensusManagerServer; use starknet_gateway::communication::{LocalGatewayServer, RemoteGatewayServer}; use starknet_http_server::communication::HttpServer; -use starknet_l1_provider::communication::{LocalL1ProviderServer, RemoteL1ProviderServer}; +use starknet_l1_provider::communication::{ + L1ScraperServer, + LocalL1ProviderServer, + RemoteL1ProviderServer, +}; use starknet_mempool::communication::{LocalMempoolServer, RemoteMempoolServer}; use starknet_mempool_p2p::propagator::{ LocalMempoolP2pPropagatorServer, @@ -48,6 +53,7 @@ struct LocalServers { struct WrapperServers { pub(crate) consensus_manager: Option>, pub(crate) http_server: Option>, + pub(crate) l1_scraper: Option>>, pub(crate) monitoring_endpoint: Option>, pub(crate) mempool_p2p_runner: Option>, pub(crate) state_sync_runner: Option>, @@ -318,11 +324,15 @@ fn create_wrapper_servers( &config.components.consensus_manager.execution_mode, components.consensus_manager ); + let http_server = create_wrapper_server!( &config.components.http_server.execution_mode, components.http_server ); + let l1_scraper_server = + create_wrapper_server!(&config.components.l1_scraper.execution_mode, components.l1_scraper); + let monitoring_endpoint_server = create_wrapper_server!( &config.components.monitoring_endpoint.execution_mode, components.monitoring_endpoint @@ -340,6 +350,7 @@ fn create_wrapper_servers( WrapperServers { consensus_manager: consensus_manager_server, http_server, + l1_scraper: l1_scraper_server, monitoring_endpoint: monitoring_endpoint_server, mempool_p2p_runner: mempool_p2p_runner_server, state_sync_runner: state_sync_runner_server, @@ -399,6 +410,9 @@ pub async fn run_component_servers(servers: SequencerNodeServers) -> anyhow::Res // StateSyncRunner server. let state_sync_runner_future = get_server_future(servers.wrapper_servers.state_sync_runner); + // L1Scraper server. + let l1_scraper_server = get_server_future(servers.wrapper_servers.l1_scraper); + // L1Provider server. let local_l1_provider_future = get_server_future(servers.local_servers.l1_provider); let remote_l1_provider_future = get_server_future(servers.remote_servers.l1_provider); @@ -483,6 +497,10 @@ pub async fn run_component_servers(servers: SequencerNodeServers) -> anyhow::Res error!("State Sync Runner Server stopped."); res? } + res = l1_scraper_server => { + error!("L1 Scraper stopped."); + res + } res = local_l1_provider_handle => { error!("Local L1 Provider Server stopped."); res?