Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(eigen-client-m0-implementation): grafana metrics #334

Open
wants to merge 8 commits into
base: eigen-client-m0-implementation
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 59 additions & 4 deletions core/node/da_clients/src/eigen/eigenda-integration.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,27 @@ cargo install --path zkstack_cli/crates/zkstack --force --locked
zkstack containers --observability true
```

3. Create `eigen_da` chain
3. Temporary metrics setup (until `era-observabilty` changes are also merged)

a. Setup the observability container at least once so the `era-observability` directory is cloned.

```bash
zkstack containers --observability true
```

b. Add `lambda` remote to the `era-observability` project:

```bash
cd era-observability && git remote add lambda https://github.com/lambdaclass/era-observability.git
```

c. Fetch and checkout the `eigenda` branch:

```bash
git fetch lambda && git checkout eigenda
```

4. Create `eigen_da` chain

```bash
zkstack chain create \
Expand All @@ -91,7 +111,7 @@ zkstack chain create \
--set-as-default false
```

4. Initialize created ecosystem
5. Initialize created ecosystem

```bash
zkstack ecosystem init \
Expand All @@ -107,7 +127,42 @@ zkstack ecosystem init \

You may enable observability here if you want to.

5. Start the server
6. Setup grafana dashboard for Data Availability

a. Get the running port of the eigen_da chain in the `chains/eigen_da/configs/general.yaml` file:

```yaml
prometheus:
listener_port: 3414 # <- this is the port
```

(around line 108)

Then modify the `era-observability/etc/prometheus/prometheus.yml` with the retrieved port:

```yaml
- job_name: 'zksync'
scrape_interval: 5s
honor_labels: true
static_configs:
- targets: ['host.docker.internal:3312'] # <- change this to the port
```

b. Enable the Data Availability Grafana dashboard

```bash
mv era-observability/additional_dashboards/EigenDA.json era-observability/dashboards/EigenDA.json
```

c. Restart the era-observability container

```bash
docker ps --filter "label=com.docker.compose.project=era-observability" -q | xargs docker restart
```

(this can also be done through the docker dashboard)

7. Start the server

```bash
zkstack server --chain eigen_da
Expand All @@ -125,7 +180,7 @@ And with the server running on one terminal, you can run the server integration
following command:

```bash
zkstack dev test --chain eigen_da
zkstack dev test integration --chain eigen_da
```

## Mainnet/Testnet setup
Expand Down
29 changes: 13 additions & 16 deletions core/node/da_clients/src/eigen/sdk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{str::FromStr, time::Duration};

use secp256k1::{ecdsa::RecoverableSignature, SecretKey};
use tokio::{sync::mpsc, time::Instant};
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt};
use tonic::{
transport::{Channel, ClientTlsConfig, Endpoint},
Streaming,
Expand Down Expand Up @@ -37,8 +37,6 @@ pub(crate) struct RawEigenClient {
pub(crate) const DATA_CHUNK_SIZE: usize = 32;

impl RawEigenClient {
pub(crate) const BUFFER_SIZE: usize = 1000;

pub async fn new(private_key: SecretKey, config: DisperserConfig) -> anyhow::Result<Self> {
let endpoint =
Endpoint::from_str(config.disperser_rpc.as_str())?.tls_config(ClientTlsConfig::new())?;
Expand Down Expand Up @@ -119,24 +117,25 @@ impl RawEigenClient {

async fn dispatch_blob_authenticated(&self, data: Vec<u8>) -> anyhow::Result<String> {
let mut client_clone = self.client.clone();
let (tx, rx) = mpsc::channel(Self::BUFFER_SIZE);
let (tx, rx) = mpsc::unbounded_channel();

let disperse_time = Instant::now();
let response_stream = client_clone.disperse_blob_authenticated(ReceiverStream::new(rx));
let response_stream =
client_clone.disperse_blob_authenticated(UnboundedReceiverStream::new(rx));
let padded_data = convert_by_padding_empty_byte(&data);

// 1. send DisperseBlobRequest
self.disperse_data(padded_data, &tx).await?;
self.disperse_data(padded_data, &tx)?;

// this await is blocked until the first response on the stream, so we only await after sending the `DisperseBlobRequest`
let mut response_stream = response_stream.await?.into_inner();
let mut response_stream = response_stream.await?;
let response_stream = response_stream.get_mut();

// 2. receive BlobAuthHeader
let blob_auth_header = self.receive_blob_auth_header(&mut response_stream).await?;
let blob_auth_header = self.receive_blob_auth_header(response_stream).await?;

// 3. sign and send BlobAuthHeader
self.submit_authentication_data(blob_auth_header.clone(), &tx)
.await?;
self.submit_authentication_data(blob_auth_header.clone(), &tx)?;

// 4. receive DisperseBlobReply
let reply = response_stream
Expand Down Expand Up @@ -183,10 +182,10 @@ impl RawEigenClient {
}
}

async fn disperse_data(
fn disperse_data(
&self,
data: Vec<u8>,
tx: &mpsc::Sender<disperser::AuthenticatedRequest>,
tx: &mpsc::UnboundedSender<disperser::AuthenticatedRequest>,
) -> anyhow::Result<()> {
let req = disperser::AuthenticatedRequest {
payload: Some(DisperseRequest(disperser::DisperseBlobRequest {
Expand All @@ -197,14 +196,13 @@ impl RawEigenClient {
};

tx.send(req)
.await
.map_err(|e| anyhow::anyhow!("Failed to send DisperseBlobRequest: {}", e))
}

async fn submit_authentication_data(
fn submit_authentication_data(
&self,
blob_auth_header: BlobAuthHeader,
tx: &mpsc::Sender<disperser::AuthenticatedRequest>,
tx: &mpsc::UnboundedSender<disperser::AuthenticatedRequest>,
) -> anyhow::Result<()> {
// TODO: replace challenge_parameter with actual auth header when it is available
let digest = zksync_basic_types::web3::keccak256(
Expand All @@ -228,7 +226,6 @@ impl RawEigenClient {
};

tx.send(req)
.await
.map_err(|e| anyhow::anyhow!("Failed to send AuthenticationData: {}", e))
}

Expand Down
Loading
Loading