Skip to content

Commit

Permalink
Add local ingestion client
Browse files Browse the repository at this point in the history
  • Loading branch information
lxfind committed Oct 19, 2024
1 parent b80a6ba commit cd8fb48
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 22 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/sui-indexer-alt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,6 @@ sui-types.workspace = true
[dev-dependencies]
rand.workspace = true
wiremock.workspace = true
tempfile.workspace = true

sui-types = { workspace = true, features = ["test-utils"] }
12 changes: 12 additions & 0 deletions crates/sui-indexer-alt/src/ingestion/client.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::ingestion::local_client::LocalIngestionClient;
use crate::ingestion::remote_client::RemoteIngestionClient;
use crate::ingestion::Error as IngestionError;
use crate::ingestion::Result as IngestionResult;
use crate::metrics::IndexerMetrics;
use backoff::Error as BE;
use backoff::ExponentialBackoff;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use sui_storage::blob::Blob;
Expand Down Expand Up @@ -40,6 +42,16 @@ impl IngestionClient {
Ok(IngestionClient { client, metrics })
}

pub(crate) fn new_local(path: PathBuf, metrics: Arc<IndexerMetrics>) -> Self {
let client = Arc::new(LocalIngestionClient::new(path));
IngestionClient { client, metrics }
}

/// Repeatedly retries transient errors with an
/// exponential backoff (up to [MAX_RETRY_INTERVAL]), but will immediately return on:
///
/// - non-transient errors, which include all client errors, except timeouts and rate limiting.
/// - cancellation of the supplied `cancel` token.
pub(crate) async fn fetch(
&self,
checkpoint: CheckpointSequenceNumber,
Expand Down
3 changes: 3 additions & 0 deletions crates/sui-indexer-alt/src/ingestion/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,7 @@ pub enum Error {

#[error("Shutdown signal received, stopping ingestion service")]
Cancelled,

#[error(transparent)]
IoError(#[from] std::io::Error),
}
67 changes: 67 additions & 0 deletions crates/sui-indexer-alt/src/ingestion/local_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::ingestion::client::IngestionClientTrait;
use crate::ingestion::Error as IngestionError;
use axum::body::Bytes;
use std::path::PathBuf;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;

pub struct LocalIngestionClient {
path: PathBuf,
}

impl LocalIngestionClient {
pub fn new(path: PathBuf) -> Self {
LocalIngestionClient { path }
}
}

#[async_trait::async_trait]
impl IngestionClientTrait for LocalIngestionClient {
async fn fetch(
&self,
checkpoint: CheckpointSequenceNumber,
) -> Result<Bytes, backoff::Error<IngestionError>> {
let path = self.path.join(format!("{}.chk", checkpoint));
let bytes = tokio::fs::read(path).await.map_err(|e| {
if e.kind() == std::io::ErrorKind::NotFound {
backoff::Error::permanent(IngestionError::NotFound(checkpoint))
} else {
backoff::Error::transient(IngestionError::IoError(e))
}
})?;
Ok(Bytes::from(bytes))
}
}

#[cfg(test)]
pub(crate) mod tests {
use crate::ingestion::client::IngestionClient;
use crate::ingestion::remote_client::tests::test_checkpoint_data;
use crate::metrics::tests::test_metrics;
use std::sync::Arc;
use sui_storage::blob::{Blob, BlobEncoding};
use tokio_util::sync::CancellationToken;

#[tokio::test]
async fn local_test_fetch() {
let tempdir = tempfile::tempdir().unwrap().into_path();
let path = tempdir.join("1.chk");
let test_checkpoint = test_checkpoint_data(1);
tokio::fs::write(&path, &test_checkpoint).await.unwrap();

let metrics = Arc::new(test_metrics());
let local_client = IngestionClient::new_local(tempdir, metrics);
let checkpoint = local_client
.fetch(1, &CancellationToken::new())
.await
.unwrap();
assert_eq!(
Blob::encode(&*checkpoint, BlobEncoding::Bcs)
.unwrap()
.to_bytes(),
test_checkpoint
);
}
}
23 changes: 18 additions & 5 deletions crates/sui-indexer-alt/src/ingestion/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{sync::Arc, time::Duration};

use crate::ingestion::client::IngestionClient;
use crate::ingestion::error::{Error, Result};
use crate::metrics::IndexerMetrics;
use backoff::backoff::Constant;
use futures::{future::try_join_all, stream, StreamExt, TryStreamExt};
use mysten_metrics::spawn_monitored_task;
use std::path::PathBuf;
use std::{sync::Arc, time::Duration};
use sui_types::full_checkpoint_content::CheckpointData;
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_util::sync::CancellationToken;
Expand All @@ -17,6 +17,7 @@ use url::Url;

mod client;
pub mod error;
mod local_client;
mod remote_client;

pub struct IngestionService {
Expand All @@ -31,7 +32,11 @@ pub struct IngestionService {
pub struct IngestionConfig {
/// Remote Store to fetch checkpoints from.
#[arg(long)]
remote_store_url: Url,
remote_store_url: Option<Url>,

/// Path to the local ingestion directory.
#[arg(long)]
local_ingestion_path: Option<PathBuf>,

/// Maximum size of checkpoint backlog across all workers downstream of the ingestion service.
#[arg(long, default_value_t = 5000)]
Expand All @@ -57,7 +62,14 @@ impl IngestionService {
metrics: Arc<IndexerMetrics>,
cancel: CancellationToken,
) -> Result<Self> {
let client = IngestionClient::new_remote(config.remote_store_url.clone(), metrics.clone())?;
// TODO: Potentially support a hybrid mode where we can fetch from both local and remote.
let client = if let Some(url) = config.remote_store_url.as_ref() {
IngestionClient::new_remote(url.clone(), metrics.clone())?
} else if let Some(path) = config.local_ingestion_path.as_ref() {
IngestionClient::new_local(path.clone(), metrics.clone())
} else {
panic!("Either remote_store_url or local_ingestion_path must be provided");
};
let subscribers = Vec::new();
Ok(Self {
client,
Expand Down Expand Up @@ -202,7 +214,8 @@ mod tests {
) -> IngestionService {
IngestionService::new(
IngestionConfig {
remote_store_url: Url::parse(&uri).unwrap(),
remote_store_url: Some(Url::parse(&uri).unwrap()),
local_ingestion_path: None,
buffer_size,
concurrency,
retry_interval: Duration::from_millis(200),
Expand Down
31 changes: 14 additions & 17 deletions crates/sui-indexer-alt/src/ingestion/remote_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,7 @@ impl RemoteIngestionClient {

#[async_trait::async_trait]
impl IngestionClientTrait for RemoteIngestionClient {
/// Fetch a checkpoint from the remote store. Repeatedly retries transient errors with an
/// exponential backoff (up to [MAX_RETRY_INTERVAL]), but will immediately return on:
///
/// - non-transient errors, which include all client errors, except timeouts and rate limiting.
/// - cancellation of the supplied `cancel` token.
/// Fetch a checkpoint from the remote store.
///
/// Transient errors include:
///
Expand Down Expand Up @@ -108,12 +104,14 @@ impl IngestionClientTrait for RemoteIngestionClient {

#[cfg(test)]
pub(crate) mod tests {
use std::sync::Mutex;

use super::*;
use crate::ingestion::client::IngestionClient;
use crate::metrics::tests::test_metrics;
use axum::http::StatusCode;
use rand::{rngs::StdRng, SeedableRng};
use sui_storage::blob::BlobEncoding;
use std::sync::Mutex;
use sui_storage::blob::{Blob, BlobEncoding};
use sui_types::full_checkpoint_content::CheckpointData;
use sui_types::{
crypto::KeypairTraits,
gas::GasCostSummary,
Expand All @@ -124,13 +122,12 @@ pub(crate) mod tests {
supported_protocol_versions::ProtocolConfig,
utils::make_committee_key,
};
use tokio_util::sync::CancellationToken;
use wiremock::{
matchers::{method, path_regex},
Mock, MockServer, Request, Respond, ResponseTemplate,
};

use super::*;

const RNG_SEED: [u8; 32] = [
21, 23, 199, 200, 234, 250, 252, 178, 94, 15, 202, 178, 62, 186, 88, 137, 233, 192, 130,
157, 179, 179, 65, 9, 31, 249, 221, 123, 225, 112, 199, 247,
Expand Down Expand Up @@ -185,7 +182,7 @@ pub(crate) mod tests {
.to_bytes()
}

fn test_client(uri: String) -> IngestionClient {
fn remote_test_client(uri: String) -> IngestionClient {
IngestionClient::new_remote(Url::parse(&uri).unwrap(), Arc::new(test_metrics())).unwrap()
}

Expand All @@ -194,7 +191,7 @@ pub(crate) mod tests {
let server = MockServer::start().await;
respond_with(&server, status(StatusCode::NOT_FOUND)).await;

let client = test_client(server.uri());
let client = remote_test_client(server.uri());
let error = client
.fetch(42, &CancellationToken::new())
.await
Expand All @@ -208,7 +205,7 @@ pub(crate) mod tests {
let server = MockServer::start().await;
respond_with(&server, status(StatusCode::IM_A_TEAPOT)).await;

let client = test_client(server.uri());
let client = remote_test_client(server.uri());
let error = client
.fetch(42, &CancellationToken::new())
.await
Expand Down Expand Up @@ -243,7 +240,7 @@ pub(crate) mod tests {
})
.await;

let client = test_client(server.uri());
let client = remote_test_client(server.uri());
let error = client.fetch(42, &cancel.clone()).await.unwrap_err();

assert!(matches!(error, Error::Cancelled));
Expand Down Expand Up @@ -276,7 +273,7 @@ pub(crate) mod tests {
})
.await;

let client = test_client(server.uri());
let client = remote_test_client(server.uri());
let error = client
.fetch(42, &CancellationToken::new())
.await
Expand Down Expand Up @@ -307,7 +304,7 @@ pub(crate) mod tests {
})
.await;

let client = test_client(server.uri());
let client = remote_test_client(server.uri());
let error = client
.fetch(42, &CancellationToken::new())
.await
Expand Down Expand Up @@ -336,7 +333,7 @@ pub(crate) mod tests {
})
.await;

let client = test_client(server.uri());
let client = remote_test_client(server.uri());
let checkpoint = client.fetch(42, &CancellationToken::new()).await.unwrap();
assert_eq!(42, checkpoint.checkpoint_summary.sequence_number)
}
Expand Down

0 comments on commit cd8fb48

Please sign in to comment.