diff --git a/core/node/da_clients/src/eigen/eigenda-integration.md b/core/node/da_clients/src/eigen/eigenda-integration.md index 3dbe81e09525..985881d3f6c0 100644 --- a/core/node/da_clients/src/eigen/eigenda-integration.md +++ b/core/node/da_clients/src/eigen/eigenda-integration.md @@ -76,7 +76,27 @@ cargo install --path zkstack_cli/crates/zkstack --force --locked zkstack containers --observability true ``` -3. Create `eigen_da` chain +3. Temporary metrics setup (until `era-observabilty` changes are also merged) + +a. Setup the observability container at least once so the `era-observability` directory is cloned. + +```bash +zkstack containers --observability true +``` + +b. Add `lambda` remote to the `era-observability` project: + +```bash +cd era-observability && git remote add lambda https://github.com/lambdaclass/era-observability.git +``` + +c. Fetch and checkout the `eigenda` branch: + +```bash +git fetch lambda && git checkout eigenda +``` + +4. Create `eigen_da` chain ```bash zkstack chain create \ @@ -91,7 +111,7 @@ zkstack chain create \ --set-as-default false ``` -4. Initialize created ecosystem +5. Initialize created ecosystem ```bash zkstack ecosystem init \ @@ -107,7 +127,42 @@ zkstack ecosystem init \ You may enable observability here if you want to. -5. Start the server +6. Setup grafana dashboard for Data Availability + +a. Get the running port of the eigen_da chain in the `chains/eigen_da/configs/general.yaml` file: + +```yaml +prometheus: + listener_port: 3414 # <- this is the port +``` + +(around line 108) + +Then modify the `era-observability/etc/prometheus/prometheus.yml` with the retrieved port: + +```yaml +- job_name: 'zksync' + scrape_interval: 5s + honor_labels: true + static_configs: + - targets: ['host.docker.internal:3312'] # <- change this to the port +``` + +b. Enable the Data Availability Grafana dashboard + +```bash +mv era-observability/additional_dashboards/EigenDA.json era-observability/dashboards/EigenDA.json +``` + +c. Restart the era-observability container + +```bash +docker ps --filter "label=com.docker.compose.project=era-observability" -q | xargs docker restart +``` + +(this can also be done through the docker dashboard) + +7. 
Start the server ```bash zkstack server --chain eigen_da @@ -125,7 +180,7 @@ And with the server running on one terminal, you can run the server integration following command: ```bash -zkstack dev test --chain eigen_da +zkstack dev test integration --chain eigen_da ``` ## Mainnet/Testnet setup diff --git a/core/node/da_clients/src/eigen/sdk.rs b/core/node/da_clients/src/eigen/sdk.rs index b7ae37260ff3..d4b74f004a5e 100644 --- a/core/node/da_clients/src/eigen/sdk.rs +++ b/core/node/da_clients/src/eigen/sdk.rs @@ -2,7 +2,7 @@ use std::{str::FromStr, time::Duration}; use secp256k1::{ecdsa::RecoverableSignature, SecretKey}; use tokio::{sync::mpsc, time::Instant}; -use tokio_stream::{wrappers::ReceiverStream, StreamExt}; +use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt}; use tonic::{ transport::{Channel, ClientTlsConfig, Endpoint}, Streaming, @@ -37,8 +37,6 @@ pub(crate) struct RawEigenClient { pub(crate) const DATA_CHUNK_SIZE: usize = 32; impl RawEigenClient { - pub(crate) const BUFFER_SIZE: usize = 1000; - pub async fn new(private_key: SecretKey, config: DisperserConfig) -> anyhow::Result { let endpoint = Endpoint::from_str(config.disperser_rpc.as_str())?.tls_config(ClientTlsConfig::new())?; @@ -119,24 +117,25 @@ impl RawEigenClient { async fn dispatch_blob_authenticated(&self, data: Vec) -> anyhow::Result { let mut client_clone = self.client.clone(); - let (tx, rx) = mpsc::channel(Self::BUFFER_SIZE); + let (tx, rx) = mpsc::unbounded_channel(); let disperse_time = Instant::now(); - let response_stream = client_clone.disperse_blob_authenticated(ReceiverStream::new(rx)); + let response_stream = + client_clone.disperse_blob_authenticated(UnboundedReceiverStream::new(rx)); let padded_data = convert_by_padding_empty_byte(&data); // 1. send DisperseBlobRequest - self.disperse_data(padded_data, &tx).await?; + self.disperse_data(padded_data, &tx)?; // this await is blocked until the first response on the stream, so we only await after sending the `DisperseBlobRequest` - let mut response_stream = response_stream.await?.into_inner(); + let mut response_stream = response_stream.await?; + let response_stream = response_stream.get_mut(); // 2. receive BlobAuthHeader - let blob_auth_header = self.receive_blob_auth_header(&mut response_stream).await?; + let blob_auth_header = self.receive_blob_auth_header(response_stream).await?; // 3. sign and send BlobAuthHeader - self.submit_authentication_data(blob_auth_header.clone(), &tx) - .await?; + self.submit_authentication_data(blob_auth_header.clone(), &tx)?; // 4. 
receive DisperseBlobReply let reply = response_stream @@ -183,10 +182,10 @@ impl RawEigenClient { } } - async fn disperse_data( + fn disperse_data( &self, data: Vec, - tx: &mpsc::Sender, + tx: &mpsc::UnboundedSender, ) -> anyhow::Result<()> { let req = disperser::AuthenticatedRequest { payload: Some(DisperseRequest(disperser::DisperseBlobRequest { @@ -197,14 +196,13 @@ impl RawEigenClient { }; tx.send(req) - .await .map_err(|e| anyhow::anyhow!("Failed to send DisperseBlobRequest: {}", e)) } - async fn submit_authentication_data( + fn submit_authentication_data( &self, blob_auth_header: BlobAuthHeader, - tx: &mpsc::Sender, + tx: &mpsc::UnboundedSender, ) -> anyhow::Result<()> { // TODO: replace challenge_parameter with actual auth header when it is available let digest = zksync_basic_types::web3::keccak256( @@ -228,7 +226,6 @@ impl RawEigenClient { }; tx.send(req) - .await .map_err(|e| anyhow::anyhow!("Failed to send AuthenticationData: {}", e)) } diff --git a/core/node/da_dispatcher/src/da_dispatcher.rs b/core/node/da_dispatcher/src/da_dispatcher.rs index 52d2168c1cf7..f10b54db7dba 100644 --- a/core/node/da_dispatcher/src/da_dispatcher.rs +++ b/core/node/da_dispatcher/src/da_dispatcher.rs @@ -4,7 +4,14 @@ use anyhow::Context; use chrono::Utc; use futures::future::join_all; use rand::Rng; -use tokio::sync::{mpsc, watch::Receiver, Mutex, Notify}; +use tokio::{ + sync::{ + mpsc, + watch::{self, Receiver}, + Mutex, Notify, + }, + task::JoinSet, +}; use zksync_config::{configs::da_dispatcher::DEFAULT_MAX_CONCURRENT_REQUESTS, DADispatcherConfig}; use zksync_da_client::{ types::{DAError, InclusionData}, @@ -68,6 +75,7 @@ impl DataAvailabilityDispatcher { .max_concurrent_requests .unwrap_or(DEFAULT_MAX_CONCURRENT_REQUESTS) as usize, ); + let (shutdown_tx, mut shutdown_rx) = watch::channel(false); let next_expected_batch = Arc::new(Mutex::new(None)); @@ -75,7 +83,9 @@ impl DataAvailabilityDispatcher { let pool_clone = self.pool.clone(); let config_clone = self.config.clone(); let next_expected_batch_clone = next_expected_batch.clone(); - let pending_blobs_reader = tokio::spawn(async move { + let mut dispatcher_tasks = JoinSet::new(); + // This task reads pending blocks from the database + dispatcher_tasks.spawn(async move { // Used to avoid sending the same batch multiple times let mut pending_batches = HashSet::new(); loop { @@ -83,7 +93,6 @@ impl DataAvailabilityDispatcher { tracing::info!("Stop signal received, da_dispatcher is shutting down"); break; } - let mut conn = pool_clone.connection_tagged("da_dispatcher").await?; let batches = conn .data_availability_dal() @@ -96,7 +105,6 @@ impl DataAvailabilityDispatcher { if pending_batches.contains(&batch.l1_batch_number.0) { continue; } - // This should only happen once. // We can't assume that the first batch is always 1 because the dispatcher can be restarted // and resume from a different batch. 
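Aside on the `sdk.rs` change above: replacing the bounded `mpsc::channel` + `ReceiverStream` pair with `mpsc::unbounded_channel` + `UnboundedReceiverStream` is what lets `disperse_data` and `submit_authentication_data` drop the `.await` on `send` and become synchronous. A minimal sketch of that wiring (illustrative only; it assumes the `tokio` and `tokio-stream` crates the client already uses, and `main` plus the `u32` payload are invented):

```rust
// Illustrative sketch, not part of the PR: the channel/stream shape the client now
// uses, reduced to a toy payload type.
use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt};

#[tokio::main]
async fn main() {
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<u32>();

    // `UnboundedSender::send` is synchronous and never blocks, which is why the
    // sending helpers no longer need to be `async`.
    tx.send(1).expect("receiver dropped");
    tx.send(2).expect("receiver dropped");
    drop(tx); // closing the sender ends the stream

    // The receiver side is consumed as a `Stream`, the shape expected for the
    // request side of a client-streaming gRPC call.
    let mut requests = UnboundedReceiverStream::new(rx);
    while let Some(item) = requests.next().await {
        println!("sending request {item}");
    }
}
```

The trade-off is that an unbounded sender applies no backpressure, which is acceptable here because the authenticated dispersal exchange only ever places two messages on the request stream (the `DisperseBlobRequest` followed by the signed `AuthenticationData`).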
@@ -120,99 +128,139 @@ impl DataAvailabilityDispatcher { let client = self.client.clone(); let request_semaphore = self.request_semaphore.clone(); let notifier = Arc::new(Notify::new()); - let pending_blobs_sender = tokio::spawn(async move { - let mut spawned_requests = vec![]; + // This task sends blobs to the dispatcher + dispatcher_tasks.spawn(async move { + let mut spawned_requests = JoinSet::new(); let notifier = notifier.clone(); loop { if *stop_receiver.borrow() { break; } - - let batch = match rx.recv().await { - Some(batch) => batch, - None => continue, // Should never happen - }; - - // Block until we can send the request - let permit = request_semaphore.clone().acquire_owned().await?; - - let client = client.clone(); - let pool = pool.clone(); - let config = config.clone(); - let next_expected_batch = next_expected_batch.clone(); - let notifier = notifier.clone(); - let request = tokio::spawn(async move { - let _permit = permit; // move permit into scope - let dispatch_latency = METRICS.blob_dispatch_latency.start(); - let dispatch_response = - retry(config.max_retries(), batch.l1_batch_number, || { - client.dispatch_blob(batch.l1_batch_number.0, batch.pubdata.clone()) - }) - .await - .with_context(|| { - format!( - "failed to dispatch a blob with batch_number: {}, pubdata_len: {}", - batch.l1_batch_number, - batch.pubdata.len() - ) - })?; - let dispatch_latency_duration = dispatch_latency.observe(); - - let sent_at = Utc::now().naive_utc(); - - // Before saving the blob in the database, we need to be sure that we are doing it - // in the correct order. - while next_expected_batch - .lock() - .await - .map_or(true, |next_expected_batch| { - batch.l1_batch_number > next_expected_batch - }) - { - notifier.clone().notified().await; + tokio::select! { + Some(batch) = rx.recv() => { + let permit = request_semaphore.clone().acquire_owned().await?; + let client = client.clone(); + let pool = pool.clone(); + let config = config.clone(); + let next_expected_batch = next_expected_batch.clone(); + let notifier = notifier.clone(); + let shutdown_tx = shutdown_tx.clone(); + let shutdown_rx = shutdown_rx.clone(); + spawned_requests.spawn(async move { + let _permit = permit; // move permit into scope + let dispatch_latency = METRICS.blob_dispatch_latency.start(); + let result = retry(config.max_retries(), batch.l1_batch_number, || { + client.dispatch_blob(batch.l1_batch_number.0, batch.pubdata.clone()) + }) + .await; + if result.is_err() { + shutdown_tx.send(true)?; + notifier.notify_waiters(); + }; + let dispatch_response = + result + .with_context(|| { + format!( + "failed to dispatch a blob with batch_number: {}, pubdata_len: {}", + batch.l1_batch_number, + batch.pubdata.len() + ) + })?; + let dispatch_latency_duration = dispatch_latency.observe(); + + let sent_at = Utc::now().naive_utc(); + + // Before saving the blob in the database, we need to be sure that we are doing it + // in the correct order. 
+ while next_expected_batch + .lock() + .await + .map_or(true, |next_expected_batch| { + batch.l1_batch_number > next_expected_batch + }) + { + if *shutdown_rx.borrow() { + return Err(anyhow::anyhow!("Batch {} failed to disperse: Shutdown signal received", batch.l1_batch_number)); + } + notifier.clone().notified().await; + } + + let mut conn = pool.connection_tagged("da_dispatcher").await?; + conn.data_availability_dal() + .insert_l1_batch_da( + batch.l1_batch_number, + dispatch_response.blob_id.as_str(), + sent_at, + ) + .await?; + drop(conn); + + // Update the next expected batch number + next_expected_batch + .lock() + .await + .replace(batch.l1_batch_number + 1); + notifier.notify_waiters(); + + METRICS + .last_dispatched_l1_batch + .set(batch.l1_batch_number.0 as usize); + METRICS.blob_size.observe(batch.pubdata.len()); + METRICS.blobs_dispatched.inc_by(1); + METRICS.blobs_pending_dispatch.dec_by(1); + tracing::info!( + "Dispatched a DA for batch_number: {}, pubdata_size: {}, dispatch_latency: {dispatch_latency_duration:?}", + batch.l1_batch_number, + batch.pubdata.len(), + ); + Ok::<(), anyhow::Error>(()) + }); + } + // Check for shutdown signal + _ = shutdown_rx.changed() => { + if *shutdown_rx.borrow() { + println!("Shutdown signal received. Exiting message loop."); + break; + } + } + } + } + while let Some(next) = spawned_requests.join_next().await { + match next { + Ok(value) => + match value { + Ok(_) => (), + Err(err) => { + spawned_requests.shutdown().await; + return Err(err.into()); + } + } + , + Err(err) => { + spawned_requests.shutdown().await; + return Err(err.into()); } - - let mut conn = pool.connection_tagged("da_dispatcher").await?; - conn.data_availability_dal() - .insert_l1_batch_da( - batch.l1_batch_number, - dispatch_response.blob_id.as_str(), - sent_at, - ) - .await?; - drop(conn); - - // Update the next expected batch number - next_expected_batch - .lock() - .await - .replace(batch.l1_batch_number + 1); - notifier.notify_waiters(); - - METRICS - .last_dispatched_l1_batch - .set(batch.l1_batch_number.0 as usize); - METRICS.blob_size.observe(batch.pubdata.len()); - METRICS.blobs_dispatched.inc_by(1); - METRICS.blobs_pending_dispatch.dec_by(1); - tracing::info!( - "Dispatched a DA for batch_number: {}, pubdata_size: {}, dispatch_latency: {dispatch_latency_duration:?}", - batch.l1_batch_number, - batch.pubdata.len(), - ); - - Ok::<(), anyhow::Error>(()) - }); - spawned_requests.push(request); + } } - join_all(spawned_requests).await; Ok::<(), anyhow::Error>(()) }); - let results = join_all(vec![pending_blobs_reader, pending_blobs_sender]).await; - for result in results { - result??; + while let Some(next) = dispatcher_tasks.join_next().await { + match next { + Ok(value) => match value { + Ok(_) => (), + Err(err) => { + dispatcher_tasks.shutdown().await; + return Err(err.into()); + } + }, + Err(err) => { + dispatcher_tasks.shutdown().await; + return Err(err.into()); + } + } } + Ok(()) }
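For reference, the shutdown and error-propagation pattern the reworked dispatcher relies on, reduced to a standalone sketch: worker futures collected in a `JoinSet`, a `watch` channel used as a shutdown flag, and a `join_next` loop that surfaces the first error and aborts the rest. Everything below is illustrative; the task bodies and names are invented, and it assumes `tokio` (with the `full` feature set) and `anyhow` as the dispatcher already uses:

```rust
// Minimal sketch of the pattern only; the simulated work and the failure are made up.
use std::{sync::Arc, time::Duration};
use tokio::{sync::watch, task::JoinSet};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (shutdown_tx, shutdown_rx) = watch::channel(false);
    let shutdown_tx = Arc::new(shutdown_tx);
    let mut tasks = JoinSet::new();

    for i in 0..4u32 {
        let shutdown_tx = shutdown_tx.clone();
        let mut shutdown_rx = shutdown_rx.clone();
        tasks.spawn(async move {
            tokio::select! {
                // Simulated unit of work; task 2 fails and raises the shutdown flag.
                _ = tokio::time::sleep(Duration::from_millis(50 * u64::from(i))) => {
                    if i == 2 {
                        shutdown_tx.send(true)?;
                        anyhow::bail!("task {i} failed");
                    }
                    Ok(())
                }
                // Other tasks observe the flag and stop instead of hanging forever.
                _ = shutdown_rx.changed() => Ok(()),
            }
        });
    }

    // Surface the first error, abort everything still running, and propagate it,
    // mirroring the `join_next` / `shutdown` loops at the end of `dispatch`.
    while let Some(joined) = tasks.join_next().await {
        if let Err(err) = joined? {
            tasks.shutdown().await;
            return Err(err);
        }
    }
    Ok(())
}
```

Running the sketch ends with the error propagated from the failing task, while the remaining workers exit early through the shutdown flag instead of blocking on work that will never complete.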
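The other invariant the dispatcher preserves is that dispatch results are persisted strictly in batch order (the `next_expected_batch` wait loop guarded by the `Notify`). The sketch below shows the same in-order commit idea, deliberately simplified to a `watch` channel and `Receiver::wait_for` instead of the `Mutex<Option<_>>` + `Notify` pair the real code uses; batch numbers, the spawn order, and `main` are invented, and a reasonably recent `tokio` is assumed:

```rust
// Sketch of the in-order commit idea only; not the dispatcher's real types.
use std::sync::Arc;
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // Holds the batch number that is allowed to "commit" next.
    let (next_tx, next_rx) = watch::channel(1u32);
    let next_tx = Arc::new(next_tx);
    let mut handles = Vec::new();

    // Workers are spawned (and may finish their work) out of order,
    // yet they still commit 1, 2, 3.
    for batch in [3u32, 1, 2] {
        let next_tx = next_tx.clone();
        let mut next_rx = next_rx.clone();
        handles.push(tokio::spawn(async move {
            // Block until it is this batch's turn; `wait_for` re-checks on every update.
            next_rx.wait_for(|next| *next == batch).await.unwrap();
            println!("committing batch {batch}");
            // Unblock the next batch in line.
            next_tx.send(batch + 1).unwrap();
        }));
    }
    for handle in handles {
        handle.await.unwrap();
    }
}
```

Each worker may finish its simulated dispatch in any order, but the `wait_for` gate only lets the batch matching the expected number proceed, which is the same guarantee the ordered `insert_l1_batch_da` calls above depend on.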