From 6831e94cadcd248a61fe9e472e7bbe129cdcb224 Mon Sep 17 00:00:00 2001
From: Xun Li
Date: Mon, 21 Oct 2024 09:37:29 -0700
Subject: [PATCH] Add local ingestion client to indexer-alt (#19924)

## Description

This PR adds an `IngestionClientTrait` that abstracts over checkpoint fetching.
The existing HTTP-based implementation moves into `RemoteIngestionClient`, and
a new `LocalIngestionClient` is added to read checkpoints from local files.
The retry logic is kept in a top-level struct, which calls into the trait
implementations; each implementation decides which error variant to return for
the various failure cases.

## Test plan

Added a test.

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to your
changes, release notes aren't required.

For each box you select, include information after the relevant heading that
describes the impact of your changes that a user might notice and any actions
they must take to implement updates.

- [ ] Protocol:
- [ ] Nodes (Validators and Full nodes):
- [ ] Indexer:
- [ ] JSON-RPC:
- [ ] GraphQL:
- [ ] CLI:
- [ ] Rust SDK:
- [ ] REST API:
---
 Cargo.lock                                    |   1 +
 crates/sui-indexer-alt/Cargo.toml             |   1 +
 .../sui-indexer-alt/src/ingestion/client.rs   | 399 +++---------------
 crates/sui-indexer-alt/src/ingestion/error.rs |   4 +-
 .../src/ingestion/local_client.rs             |  65 +++
 crates/sui-indexer-alt/src/ingestion/mod.rs   |  45 +-
 .../src/ingestion/remote_client.rs            | 292 +++++++++++++
 .../src/ingestion/test_utils.rs               |  56 +++
 8 files changed, 517 insertions(+), 346 deletions(-)
 create mode 100644 crates/sui-indexer-alt/src/ingestion/local_client.rs
 create mode 100644 crates/sui-indexer-alt/src/ingestion/remote_client.rs
 create mode 100644 crates/sui-indexer-alt/src/ingestion/test_utils.rs

diff --git a/Cargo.lock b/Cargo.lock
index f05294dc4f1f3..763f789031c53 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -13884,6 +13884,7 @@ dependencies = [
  "sui-storage",
  "sui-types",
  "telemetry-subscribers",
+ "tempfile",
  "thiserror",
  "tokio",
  "tokio-stream",
diff --git a/crates/sui-indexer-alt/Cargo.toml b/crates/sui-indexer-alt/Cargo.toml
index 83305df9fc2cb..c80b0b45808d5 100644
--- a/crates/sui-indexer-alt/Cargo.toml
+++ b/crates/sui-indexer-alt/Cargo.toml
@@ -39,5 +39,6 @@ sui-types.workspace = true
 
 [dev-dependencies]
 rand.workspace = true
 wiremock.workspace = true
+tempfile.workspace = true
 sui-types = { workspace = true, features = ["test-utils"] }
 
diff --git a/crates/sui-indexer-alt/src/ingestion/client.rs b/crates/sui-indexer-alt/src/ingestion/client.rs
index 4edb2999d869a..b16a7c51daef1 100644
--- a/crates/sui-indexer-alt/src/ingestion/client.rs
+++ b/crates/sui-indexer-alt/src/ingestion/client.rs
@@ -1,133 +1,108 @@
 // Copyright (c) Mysten Labs, Inc.
 // SPDX-License-Identifier: Apache-2.0
 
-use std::{sync::Arc, time::Duration};
-
+use crate::ingestion::local_client::LocalIngestionClient;
+use crate::ingestion::remote_client::RemoteIngestionClient;
+use crate::ingestion::Error as IngestionError;
+use crate::ingestion::Result as IngestionResult;
+use crate::metrics::IndexerMetrics;
+use backoff::Error as BE;
 use backoff::ExponentialBackoff;
-use reqwest::{Client, StatusCode};
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::time::Duration;
 use sui_storage::blob::Blob;
 use sui_types::full_checkpoint_content::CheckpointData;
+use tokio_util::bytes::Bytes;
 use tokio_util::sync::CancellationToken;
-use tracing::{debug, error};
+use tracing::debug;
 use url::Url;
 
-use crate::ingestion::error::{Error, Result};
-use crate::metrics::IndexerMetrics;
-
 /// Wait at most this long between retries for transient errors.
 const MAX_TRANSIENT_RETRY_INTERVAL: Duration = Duration::from_secs(60);
 
+#[async_trait::async_trait]
+pub(crate) trait IngestionClientTrait: Send + Sync {
+    async fn fetch(&self, checkpoint: u64) -> FetchResult;
+}
+
+#[derive(thiserror::Error, Debug)]
+pub enum FetchError {
+    #[error("Checkpoint not found")]
+    NotFound,
+    #[error("Failed to fetch checkpoint due to permanent error: {0}")]
+    Permanent(#[from] anyhow::Error),
+    #[error("Failed to fetch checkpoint due to {reason}: {error}")]
+    Transient {
+        reason: &'static str,
+        #[source]
+        error: anyhow::Error,
+    },
+}
+
+pub type FetchResult = Result<Bytes, FetchError>;
+
 #[derive(Clone)]
 pub(crate) struct IngestionClient {
-    url: Url,
-    client: Client,
+    client: Arc<dyn IngestionClientTrait>,
     /// Wrap the metrics in an `Arc` to keep copies of the client cheap.
     metrics: Arc<IndexerMetrics>,
 }
 
 impl IngestionClient {
-    pub(crate) fn new(url: Url, metrics: Arc<IndexerMetrics>) -> Result<Self> {
-        Ok(Self {
-            url,
-            client: Client::builder().build()?,
-            metrics,
-        })
+    pub(crate) fn new_remote(url: Url, metrics: Arc<IndexerMetrics>) -> IngestionResult<Self> {
+        let client = Arc::new(RemoteIngestionClient::new(url)?);
+        Ok(IngestionClient { client, metrics })
+    }
+
+    pub(crate) fn new_local(path: PathBuf, metrics: Arc<IndexerMetrics>) -> Self {
+        let client = Arc::new(LocalIngestionClient::new(path));
+        IngestionClient { client, metrics }
     }
 
-    /// Fetch a checkpoint from the remote store. Repeatedly retries transient errors with an
-    /// exponential backoff (up to [MAX_RETRY_INTERVAL]), but will immediately return on:
-    ///
-    /// - non-transient errors, which include all client errors, except timeouts and rate limiting.
+    /// Repeatedly retries transient errors with an exponential backoff (up to
+    /// [MAX_TRANSIENT_RETRY_INTERVAL]). Transient errors are either defined by the client
+    /// implementation, which returns a `FetchError::Transient` error variant, or detected
+    /// within this function, if we fail to deserialize the result as [CheckpointData].
+    ///
+    /// The function will immediately return on:
+    ///
+    /// - non-transient errors determined by the client implementation; this includes both the
+    ///   `FetchError::NotFound` and `FetchError::Permanent` variants.
     /// - cancellation of the supplied `cancel` token.
-    ///
-    /// Transient errors include:
-    ///
-    /// - failures to issue a request, (network errors, redirect issues, etc)
-    /// - request timeouts,
-    /// - rate limiting,
-    /// - server errors (5xx),
-    /// - issues getting a full response and deserializing it as [CheckpointData].
     pub(crate) async fn fetch(
         &self,
         checkpoint: u64,
         cancel: &CancellationToken,
-    ) -> Result<Arc<CheckpointData>> {
-        // SAFETY: The path being joined is statically known to be valid.
-        let url = self
-            .url
-            .join(&format!("/{checkpoint}.chk"))
-            .expect("Unexpected invalid URL");
-
+    ) -> IngestionResult<Arc<CheckpointData>> {
+        let client = self.client.clone();
         let request = move || {
-            let url = url.clone();
+            let client = client.clone();
             async move {
-                use backoff::Error as BE;
                 if cancel.is_cancelled() {
-                    return Err(BE::permanent(Error::Cancelled));
+                    return Err(BE::permanent(IngestionError::Cancelled));
                 }
 
-                let response = self.client.get(url).send().await.map_err(|e| {
-                    self.metrics
-                        .inc_retry(checkpoint, "request", Error::ReqwestError(e))
-                })?;
-
-                match response.status() {
-                    code if code.is_success() => {
-                        // Failure to extract all the bytes from the payload, or to deserialize the
-                        // checkpoint from them is considered a transient error -- the store being
-                        // fetched from needs to be corrected, and ingestion will keep retrying it
-                        // until it is.
-                        let bytes = response.bytes().await.map_err(|e| {
-                            self.metrics
-                                .inc_retry(checkpoint, "bytes", Error::ReqwestError(e))
-                        })?;
-
-                        self.metrics.total_ingested_bytes.inc_by(bytes.len() as u64);
-                        let data: CheckpointData = Blob::from_bytes(&bytes).map_err(|e| {
-                            self.metrics.inc_retry(
-                                checkpoint,
-                                "deserialization",
-                                Error::DeserializationError(checkpoint, e),
-                            )
-                        })?;
-
-                        Ok(data)
-                    }
-
-                    // Treat 404s as a special case so we can match on this error type.
-                    code @ StatusCode::NOT_FOUND => {
-                        debug!(checkpoint, %code, "Checkpoint not found");
-                        Err(BE::permanent(Error::NotFound(checkpoint)))
+                let bytes = client.fetch(checkpoint).await.map_err(|err| match err {
+                    FetchError::NotFound => BE::permanent(IngestionError::NotFound(checkpoint)),
+                    FetchError::Permanent(error) => {
+                        BE::permanent(IngestionError::FetchError(checkpoint, error))
                     }
-
-                    // Timeouts are a client error but they are usually transient.
-                    code @ StatusCode::REQUEST_TIMEOUT => Err(self.metrics.inc_retry(
+                    FetchError::Transient { reason, error } => self.metrics.inc_retry(
                         checkpoint,
-                        "timeout",
-                        Error::HttpError(checkpoint, code),
-                    )),
-
-                    // Rate limiting is also a client error, but the backoff will eventually widen the
-                    // interval appropriately.
-                    code @ StatusCode::TOO_MANY_REQUESTS => Err(self.metrics.inc_retry(
-                        checkpoint,
-                        "too_many_requests",
-                        Error::HttpError(checkpoint, code),
-                    )),
+                        reason,
+                        IngestionError::FetchError(checkpoint, error),
+                    ),
+                })?;
 
-                    // Assume that if the server is facing difficulties, it will recover eventually.
-                    code if code.is_server_error() => Err(self.metrics.inc_retry(
+                self.metrics.total_ingested_bytes.inc_by(bytes.len() as u64);
+                let data: CheckpointData = Blob::from_bytes(&bytes).map_err(|e| {
+                    self.metrics.inc_retry(
                         checkpoint,
-                        "server_error",
-                        Error::HttpError(checkpoint, code),
-                    )),
+                        "deserialization",
+                        IngestionError::DeserializationError(checkpoint, e),
+                    )
+                })?;
 
-                    // For everything else, assume it's a permanent error and don't retry.
-                    code => {
-                        error!(checkpoint, %code, "Permanent error, giving up!");
-                        Err(BE::permanent(Error::HttpError(checkpoint, code)))
-                    }
-                }
+                Ok(data)
             }
         };
@@ -178,239 +153,3 @@ impl IngestionClient {
         Ok(Arc::new(data))
     }
 }
-
-#[cfg(test)]
-pub(crate) mod tests {
-    use std::sync::Mutex;
-
-    use rand::{rngs::StdRng, SeedableRng};
-    use sui_storage::blob::BlobEncoding;
-    use sui_types::{
-        crypto::KeypairTraits,
-        gas::GasCostSummary,
-        messages_checkpoint::{
-            CertifiedCheckpointSummary, CheckpointContents, CheckpointSummary,
-            SignedCheckpointSummary,
-        },
-        supported_protocol_versions::ProtocolConfig,
-        utils::make_committee_key,
-    };
-    use wiremock::{
-        matchers::{method, path_regex},
-        Mock, MockServer, Request, Respond, ResponseTemplate,
-    };
-
-    use crate::metrics::tests::test_metrics;
-
-    use super::*;
-
-    const RNG_SEED: [u8; 32] = [
-        21, 23, 199, 200, 234, 250, 252, 178, 94, 15, 202, 178, 62, 186, 88, 137, 233, 192, 130,
-        157, 179, 179, 65, 9, 31, 249, 221, 123, 225, 112, 199, 247,
-    ];
-
-    pub(crate) async fn respond_with(server: &MockServer, response: impl Respond + 'static) {
-        Mock::given(method("GET"))
-            .and(path_regex(r"/\d+.chk"))
-            .respond_with(response)
-            .mount(server)
-            .await;
-    }
-
-    pub(crate) fn status(code: StatusCode) -> ResponseTemplate {
-        ResponseTemplate::new(code.as_u16())
-    }
-
-    pub(crate) fn test_checkpoint_data(cp: u64) -> Vec<u8> {
-        let mut rng = StdRng::from_seed(RNG_SEED);
-        let (keys, committee) = make_committee_key(&mut rng);
-        let contents = CheckpointContents::new_with_digests_only_for_tests(vec![]);
-        let summary = CheckpointSummary::new(
-            &ProtocolConfig::get_for_max_version_UNSAFE(),
-            0,
-            cp,
-            0,
-            &contents,
-            None,
-            GasCostSummary::default(),
-            None,
-            0,
-            Vec::new(),
-        );
-
-        let sign_infos: Vec<_> = keys
-            .iter()
-            .map(|k| {
-                let name = k.public().into();
-                SignedCheckpointSummary::sign(committee.epoch, &summary, k, name)
-            })
-            .collect();
-
-        let checkpoint_data = CheckpointData {
-            checkpoint_summary: CertifiedCheckpointSummary::new(summary, sign_infos, &committee)
-                .unwrap(),
-            checkpoint_contents: contents,
-            transactions: vec![],
-        };
-
-        Blob::encode(&checkpoint_data, BlobEncoding::Bcs)
-            .unwrap()
-            .to_bytes()
-    }
-
-    fn test_client(uri: String) -> IngestionClient {
-        IngestionClient::new(Url::parse(&uri).unwrap(), Arc::new(test_metrics())).unwrap()
-    }
-
-    #[tokio::test]
-    async fn fail_on_not_found() {
-        let server = MockServer::start().await;
-        respond_with(&server, status(StatusCode::NOT_FOUND)).await;
-
-        let client = test_client(server.uri());
-        let error = client
-            .fetch(42, &CancellationToken::new())
-            .await
-            .unwrap_err();
-
-        assert!(matches!(error, Error::NotFound(42)));
-    }
-
-    #[tokio::test]
-    async fn fail_on_client_error() {
-        let server = MockServer::start().await;
-        respond_with(&server, status(StatusCode::IM_A_TEAPOT)).await;
-
-        let client = test_client(server.uri());
-        let error = client
-            .fetch(42, &CancellationToken::new())
-            .await
-            .unwrap_err();
-
-        assert!(matches!(
-            error,
-            Error::HttpError(42, StatusCode::IM_A_TEAPOT)
-        ));
-    }
-
-    /// Even if the server is repeatedly returning transient errors, it is possible to cancel the
-    /// fetch request via its cancellation token.
-    #[tokio::test]
-    async fn fail_on_cancel() {
-        let cancel = CancellationToken::new();
-        let server = MockServer::start().await;
-
-        // This mock server repeatedly returns internal server errors, but will also send a
-        // cancellation with the second request (this is a bit of a contrived test set-up).
-        let times: Mutex<u64> = Mutex::new(0);
-        let server_cancel = cancel.clone();
-        respond_with(&server, move |_: &Request| {
-            let mut times = times.lock().unwrap();
-            *times += 1;
-
-            if *times > 2 {
-                server_cancel.cancel();
-            }
-
-            status(StatusCode::INTERNAL_SERVER_ERROR)
-        })
-        .await;
-
-        let client = test_client(server.uri());
-        let error = client.fetch(42, &cancel.clone()).await.unwrap_err();
-
-        assert!(matches!(error, Error::Cancelled));
-    }
-
-    /// Assume that failures to send the request to the remote store are due to temporary
-    /// connectivity issues, and retry them.
-    #[tokio::test]
-    async fn retry_on_request_error() {
-        let server = MockServer::start().await;
-
-        let times: Mutex<u64> = Mutex::new(0);
-        respond_with(&server, move |r: &Request| {
-            let mut times = times.lock().unwrap();
-            *times += 1;
-            match (*times, r.url.path()) {
-                // The first request will trigger a redirect to 0.chk no matter what the original
-                // request was for -- triggering a request error.
-                (1, _) => status(StatusCode::MOVED_PERMANENTLY).append_header("Location", "/0.chk"),
-
-                // Set-up checkpoint 0 as an infinite redirect loop.
-                (_, "/0.chk") => {
-                    status(StatusCode::MOVED_PERMANENTLY).append_header("Location", r.url.as_str())
-                }
-
-                // Subsequently, requests will fail with a permanent error, this is what we expect
-                // to see.
-                _ => status(StatusCode::IM_A_TEAPOT),
-            }
-        })
-        .await;
-
-        let client = test_client(server.uri());
-        let error = client
-            .fetch(42, &CancellationToken::new())
-            .await
-            .unwrap_err();
-
-        assert!(
-            matches!(error, Error::HttpError(42, StatusCode::IM_A_TEAPOT),),
-            "{error}"
-        );
-    }
-
-    /// Assume that certain errors will recover by themselves, and keep retrying with an
-    /// exponential back-off. These errors include: 5xx (server) errors, 408 (timeout), and 429
-    /// (rate limiting).
-    #[tokio::test]
-    async fn retry_on_transient_server_error() {
-        let server = MockServer::start().await;
-        let times: Mutex<u64> = Mutex::new(0);
-        respond_with(&server, move |_: &Request| {
-            let mut times = times.lock().unwrap();
-            *times += 1;
-            status(match *times {
-                1 => StatusCode::INTERNAL_SERVER_ERROR,
-                2 => StatusCode::REQUEST_TIMEOUT,
-                3 => StatusCode::TOO_MANY_REQUESTS,
-                _ => StatusCode::IM_A_TEAPOT,
-            })
-        })
-        .await;
-
-        let client = test_client(server.uri());
-        let error = client
-            .fetch(42, &CancellationToken::new())
-            .await
-            .unwrap_err();
-
-        assert!(matches!(
-            error,
-            Error::HttpError(42, StatusCode::IM_A_TEAPOT)
-        ));
-    }
-
-    /// Treat deserialization failure as another kind of transient error -- all checkpoint data
-    /// that is fetched should be valid (deserializable as a `CheckpointData`).
-    #[tokio::test]
-    async fn retry_on_deserialization_error() {
-        let server = MockServer::start().await;
-        let times: Mutex<u64> = Mutex::new(0);
-        respond_with(&server, move |_: &Request| {
-            let mut times = times.lock().unwrap();
-            *times += 1;
-            if *times < 3 {
-                status(StatusCode::OK).set_body_bytes(vec![])
-            } else {
-                status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42))
-            }
-        })
-        .await;
-
-        let client = test_client(server.uri());
-        let checkpoint = client.fetch(42, &CancellationToken::new()).await.unwrap();
-        assert_eq!(42, checkpoint.checkpoint_summary.sequence_number)
-    }
-}
diff --git a/crates/sui-indexer-alt/src/ingestion/error.rs b/crates/sui-indexer-alt/src/ingestion/error.rs
index 78fff94d46d8f..17cafe495aa80 100644
--- a/crates/sui-indexer-alt/src/ingestion/error.rs
+++ b/crates/sui-indexer-alt/src/ingestion/error.rs
@@ -1,8 +1,6 @@
 // Copyright (c) Mysten Labs, Inc.
 // SPDX-License-Identifier: Apache-2.0
 
-use reqwest::StatusCode;
-
 pub type Result<T> = std::result::Result<T, Error>;
 
 #[derive(thiserror::Error, Debug)]
@@ -14,7 +12,7 @@ pub enum Error {
     DeserializationError(u64, #[source] anyhow::Error),
 
     #[error("Failed to fetch checkpoint {0}: {1}")]
-    HttpError(u64, StatusCode),
+    FetchError(u64, #[source] anyhow::Error),
 
     #[error(transparent)]
     ReqwestError(#[from] reqwest::Error),
diff --git a/crates/sui-indexer-alt/src/ingestion/local_client.rs b/crates/sui-indexer-alt/src/ingestion/local_client.rs
new file mode 100644
index 0000000000000..2efb6708939ff
--- /dev/null
+++ b/crates/sui-indexer-alt/src/ingestion/local_client.rs
@@ -0,0 +1,65 @@
+// Copyright (c) Mysten Labs, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+use crate::ingestion::client::{FetchError, FetchResult, IngestionClientTrait};
+use axum::body::Bytes;
+use std::path::PathBuf;
+
+pub struct LocalIngestionClient {
+    path: PathBuf,
+}
+
+impl LocalIngestionClient {
+    pub fn new(path: PathBuf) -> Self {
+        LocalIngestionClient { path }
+    }
+}
+
+#[async_trait::async_trait]
+impl IngestionClientTrait for LocalIngestionClient {
+    async fn fetch(&self, checkpoint: u64) -> FetchResult {
+        let path = self.path.join(format!("{}.chk", checkpoint));
+        let bytes = tokio::fs::read(path).await.map_err(|e| {
+            if e.kind() == std::io::ErrorKind::NotFound {
+                FetchError::NotFound
+            } else {
+                FetchError::Transient {
+                    reason: "io_error",
+                    error: e.into(),
+                }
+            }
+        })?;
+        Ok(Bytes::from(bytes))
+    }
+}
+
+#[cfg(test)]
+pub(crate) mod tests {
+    use crate::ingestion::client::IngestionClient;
+    use crate::ingestion::test_utils::test_checkpoint_data;
+    use crate::metrics::tests::test_metrics;
+    use std::sync::Arc;
+    use sui_storage::blob::{Blob, BlobEncoding};
+    use tokio_util::sync::CancellationToken;
+
+    #[tokio::test]
+    async fn local_test_fetch() {
+        let tempdir = tempfile::tempdir().unwrap().into_path();
+        let path = tempdir.join("1.chk");
+        let test_checkpoint = test_checkpoint_data(1);
+        tokio::fs::write(&path, &test_checkpoint).await.unwrap();
+
+        let metrics = Arc::new(test_metrics());
+        let local_client = IngestionClient::new_local(tempdir, metrics);
+        let checkpoint = local_client
+            .fetch(1, &CancellationToken::new())
+            .await
+            .unwrap();
+        assert_eq!(
+            Blob::encode(&*checkpoint, BlobEncoding::Bcs)
+                .unwrap()
+                .to_bytes(),
+            test_checkpoint
+        );
+    }
+}
diff --git a/crates/sui-indexer-alt/src/ingestion/mod.rs b/crates/sui-indexer-alt/src/ingestion/mod.rs
index 385b2026a5acc..0e726cccc891e 100644
--- a/crates/sui-indexer-alt/src/ingestion/mod.rs
+++ b/crates/sui-indexer-alt/src/ingestion/mod.rs
@@ -1,23 +1,26 @@
 // Copyright (c) Mysten Labs, Inc.
 // SPDX-License-Identifier: Apache-2.0
 
-use std::{sync::Arc, time::Duration};
-
+use crate::ingestion::client::IngestionClient;
+use crate::ingestion::error::{Error, Result};
+use crate::metrics::IndexerMetrics;
 use backoff::backoff::Constant;
-use client::IngestionClient;
 use futures::{future::try_join_all, stream, StreamExt, TryStreamExt};
 use mysten_metrics::spawn_monitored_task;
+use std::path::PathBuf;
+use std::{sync::Arc, time::Duration};
 use sui_types::full_checkpoint_content::CheckpointData;
 use tokio::{sync::mpsc, task::JoinHandle};
 use tokio_util::sync::CancellationToken;
 use tracing::{debug, error, info};
 use url::Url;
 
-use crate::ingestion::error::{Error, Result};
-use crate::metrics::IndexerMetrics;
-
 mod client;
 pub mod error;
+mod local_client;
+mod remote_client;
+#[cfg(test)]
+mod test_utils;
 
 pub struct IngestionService {
     config: IngestionConfig,
@@ -30,8 +33,13 @@ pub struct IngestionService {
 #[derive(clap::Args, Debug, Clone)]
 pub struct IngestionConfig {
     /// Remote Store to fetch checkpoints from.
-    #[arg(long)]
-    remote_store_url: Url,
+    #[arg(long, required = true, group = "source")]
+    remote_store_url: Option<Url>,
+
+    /// Path to the local ingestion directory.
+    /// If both remote_store_url and local_ingestion_path are provided, remote_store_url will be used.
+    #[arg(long, required = true, group = "source")]
+    local_ingestion_path: Option<PathBuf>,
 
     /// Maximum size of checkpoint backlog across all workers downstream of the ingestion service.
     #[arg(long, default_value_t = 5000)]
@@ -57,9 +65,18 @@ impl IngestionService {
         metrics: Arc<IndexerMetrics>,
         cancel: CancellationToken,
     ) -> Result<Self> {
+        // TODO: Potentially support a hybrid mode where we can fetch from both local and remote.
+        let client = if let Some(url) = config.remote_store_url.as_ref() {
+            IngestionClient::new_remote(url.clone(), metrics.clone())?
+        } else if let Some(path) = config.local_ingestion_path.as_ref() {
+            IngestionClient::new_local(path.clone(), metrics.clone())
+        } else {
+            panic!("Either remote_store_url or local_ingestion_path must be provided");
+        };
+        let subscribers = Vec::new();
         Ok(Self {
-            client: IngestionClient::new(config.remote_store_url.clone(), metrics.clone())?,
-            subscribers: Vec::new(),
+            client,
+            subscribers,
             config,
             metrics,
             cancel,
         })
     }
@@ -83,7 +100,7 @@ impl IngestionService {
     ///
     /// - If a subscriber is lagging (not receiving checkpoints fast enough), it will eventually
     ///   provide back-pressure to the ingestion service, which will stop fetching new checkpoints.
-    /// - If a subscriber closes its channel, the ingestion service will intepret that as a signal
+    /// - If a subscriber closes its channel, the ingestion service will interpret that as a signal
/// /// If ingestion reaches the leading edge of the network, it will encounter checkpoints that do @@ -187,7 +204,8 @@ mod tests { use reqwest::StatusCode; use wiremock::{MockServer, Request}; - use crate::ingestion::client::tests::{respond_with, status, test_checkpoint_data}; + use crate::ingestion::remote_client::tests::{respond_with, status}; + use crate::ingestion::test_utils::test_checkpoint_data; use crate::metrics::tests::test_metrics; use super::*; @@ -200,7 +218,8 @@ mod tests { ) -> IngestionService { IngestionService::new( IngestionConfig { - remote_store_url: Url::parse(&uri).unwrap(), + remote_store_url: Some(Url::parse(&uri).unwrap()), + local_ingestion_path: None, buffer_size, concurrency, retry_interval: Duration::from_millis(200), diff --git a/crates/sui-indexer-alt/src/ingestion/remote_client.rs b/crates/sui-indexer-alt/src/ingestion/remote_client.rs new file mode 100644 index 0000000000000..c4f91fee57990 --- /dev/null +++ b/crates/sui-indexer-alt/src/ingestion/remote_client.rs @@ -0,0 +1,292 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use crate::ingestion::client::{FetchError, FetchResult, IngestionClientTrait}; +use crate::ingestion::Result as IngestionResult; +use reqwest::{Client, StatusCode}; +use tracing::{debug, error}; +use url::Url; + +#[derive(thiserror::Error, Debug, Eq, PartialEq)] +pub enum HttpError { + #[error("HTTP error with status code: {0}")] + Http(StatusCode), +} + +fn status_code_to_error(code: StatusCode) -> anyhow::Error { + HttpError::Http(code).into() +} + +pub(crate) struct RemoteIngestionClient { + url: Url, + client: Client, +} + +impl RemoteIngestionClient { + pub(crate) fn new(url: Url) -> IngestionResult { + Ok(Self { + url, + client: Client::builder().build()?, + }) + } +} + +#[async_trait::async_trait] +impl IngestionClientTrait for RemoteIngestionClient { + /// Fetch a checkpoint from the remote store. + /// + /// Transient errors include: + /// + /// - failures to issue a request, (network errors, redirect issues, etc) + /// - request timeouts, + /// - rate limiting, + /// - server errors (5xx), + /// - issues getting a full response. + async fn fetch(&self, checkpoint: u64) -> FetchResult { + // SAFETY: The path being joined is statically known to be valid. + let url = self + .url + .join(&format!("/{checkpoint}.chk")) + .expect("Unexpected invalid URL"); + + let response = self + .client + .get(url) + .send() + .await + .map_err(|e| FetchError::Transient { + reason: "request", + error: e.into(), + })?; + + match response.status() { + code if code.is_success() => { + // Failure to extract all the bytes from the payload, or to deserialize the + // checkpoint from them is considered a transient error -- the store being + // fetched from needs to be corrected, and ingestion will keep retrying it + // until it is. + response.bytes().await.map_err(|e| FetchError::Transient { + reason: "bytes", + error: e.into(), + }) + } + + // Treat 404s as a special case so we can match on this error type. + code @ StatusCode::NOT_FOUND => { + debug!(checkpoint, %code, "Checkpoint not found"); + Err(FetchError::NotFound) + } + + // Timeouts are a client error but they are usually transient. + code @ StatusCode::REQUEST_TIMEOUT => Err(FetchError::Transient { + reason: "timeout", + error: status_code_to_error(code), + }), + + // Rate limiting is also a client error, but the backoff will eventually widen the + // interval appropriately. 
+            code @ StatusCode::TOO_MANY_REQUESTS => Err(FetchError::Transient {
+                reason: "too_many_requests",
+                error: status_code_to_error(code),
+            }),
+
+            // Assume that if the server is facing difficulties, it will recover eventually.
+            code if code.is_server_error() => Err(FetchError::Transient {
+                reason: "server_error",
+                error: status_code_to_error(code),
+            }),
+
+            // For everything else, assume it's a permanent error and don't retry.
+            code => {
+                error!(checkpoint, %code, "Permanent error, giving up!");
+                Err(FetchError::Permanent(status_code_to_error(code)))
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+pub(crate) mod tests {
+    use super::*;
+    use crate::ingestion::client::IngestionClient;
+    use crate::ingestion::error::Error;
+    use crate::ingestion::test_utils::test_checkpoint_data;
+    use crate::metrics::tests::test_metrics;
+    use axum::http::StatusCode;
+    use std::sync::{Arc, Mutex};
+    use tokio_util::sync::CancellationToken;
+    use wiremock::{
+        matchers::{method, path_regex},
+        Mock, MockServer, Request, Respond, ResponseTemplate,
+    };
+
+    pub(crate) async fn respond_with(server: &MockServer, response: impl Respond + 'static) {
+        Mock::given(method("GET"))
+            .and(path_regex(r"/\d+.chk"))
+            .respond_with(response)
+            .mount(server)
+            .await;
+    }
+
+    pub(crate) fn status(code: StatusCode) -> ResponseTemplate {
+        ResponseTemplate::new(code.as_u16())
+    }
+
+    fn remote_test_client(uri: String) -> IngestionClient {
+        IngestionClient::new_remote(Url::parse(&uri).unwrap(), Arc::new(test_metrics())).unwrap()
+    }
+
+    fn assert_http_error(error: Error, checkpoint: u64, code: StatusCode) {
+        let Error::FetchError(c, inner) = error else {
+            panic!("Expected FetchError, got: {:?}", error);
+        };
+        assert_eq!(c, checkpoint);
+        let Some(http_error) = inner.downcast_ref::<HttpError>() else {
+            panic!("Expected HttpError, got: {:?}", inner);
+        };
+        assert_eq!(http_error, &HttpError::Http(code));
+    }
+
+    #[tokio::test]
+    async fn fail_on_not_found() {
+        let server = MockServer::start().await;
+        respond_with(&server, status(StatusCode::NOT_FOUND)).await;
+
+        let client = remote_test_client(server.uri());
+        let error = client
+            .fetch(42, &CancellationToken::new())
+            .await
+            .unwrap_err();
+
+        assert!(matches!(error, Error::NotFound(42)));
+    }
+
+    #[tokio::test]
+    async fn fail_on_client_error() {
+        let server = MockServer::start().await;
+        respond_with(&server, status(StatusCode::IM_A_TEAPOT)).await;
+
+        let client = remote_test_client(server.uri());
+        let error = client
+            .fetch(42, &CancellationToken::new())
+            .await
+            .unwrap_err();
+
+        assert_http_error(error, 42, StatusCode::IM_A_TEAPOT);
+    }
+
+    /// Even if the server is repeatedly returning transient errors, it is possible to cancel the
+    /// fetch request via its cancellation token.
+    #[tokio::test]
+    async fn fail_on_cancel() {
+        let cancel = CancellationToken::new();
+        let server = MockServer::start().await;
+
+        // This mock server repeatedly returns internal server errors, but will also send a
+        // cancellation with the second request (this is a bit of a contrived test set-up).
+        let times: Mutex<u64> = Mutex::new(0);
+        let server_cancel = cancel.clone();
+        respond_with(&server, move |_: &Request| {
+            let mut times = times.lock().unwrap();
+            *times += 1;
+
+            if *times > 2 {
+                server_cancel.cancel();
+            }
+
+            status(StatusCode::INTERNAL_SERVER_ERROR)
+        })
+        .await;
+
+        let client = remote_test_client(server.uri());
+        let error = client.fetch(42, &cancel.clone()).await.unwrap_err();
+
+        assert!(matches!(error, Error::Cancelled));
+    }
+
+    /// Assume that failures to send the request to the remote store are due to temporary
+    /// connectivity issues, and retry them.
+    #[tokio::test]
+    async fn retry_on_request_error() {
+        let server = MockServer::start().await;
+
+        let times: Mutex<u64> = Mutex::new(0);
+        respond_with(&server, move |r: &Request| {
+            let mut times = times.lock().unwrap();
+            *times += 1;
+            match (*times, r.url.path()) {
+                // The first request will trigger a redirect to 0.chk no matter what the original
+                // request was for -- triggering a request error.
+                (1, _) => status(StatusCode::MOVED_PERMANENTLY).append_header("Location", "/0.chk"),
+
+                // Set-up checkpoint 0 as an infinite redirect loop.
+                (_, "/0.chk") => {
+                    status(StatusCode::MOVED_PERMANENTLY).append_header("Location", r.url.as_str())
+                }
+
+                // Subsequently, requests will fail with a permanent error, this is what we expect
+                // to see.
+                _ => status(StatusCode::IM_A_TEAPOT),
+            }
+        })
+        .await;
+
+        let client = remote_test_client(server.uri());
+        let error = client
+            .fetch(42, &CancellationToken::new())
+            .await
+            .unwrap_err();
+
+        assert_http_error(error, 42, StatusCode::IM_A_TEAPOT);
+    }
+
+    /// Assume that certain errors will recover by themselves, and keep retrying with an
+    /// exponential back-off. These errors include: 5xx (server) errors, 408 (timeout), and 429
+    /// (rate limiting).
+    #[tokio::test]
+    async fn retry_on_transient_server_error() {
+        let server = MockServer::start().await;
+        let times: Mutex<u64> = Mutex::new(0);
+        respond_with(&server, move |_: &Request| {
+            let mut times = times.lock().unwrap();
+            *times += 1;
+            status(match *times {
+                1 => StatusCode::INTERNAL_SERVER_ERROR,
+                2 => StatusCode::REQUEST_TIMEOUT,
+                3 => StatusCode::TOO_MANY_REQUESTS,
+                _ => StatusCode::IM_A_TEAPOT,
+            })
+        })
+        .await;
+
+        let client = remote_test_client(server.uri());
+        let error = client
+            .fetch(42, &CancellationToken::new())
+            .await
+            .unwrap_err();
+
+        assert_http_error(error, 42, StatusCode::IM_A_TEAPOT);
+    }
+
+    /// Treat deserialization failure as another kind of transient error -- all checkpoint data
+    /// that is fetched should be valid (deserializable as a `CheckpointData`).
+    #[tokio::test]
+    async fn retry_on_deserialization_error() {
+        let server = MockServer::start().await;
+        let times: Mutex<u64> = Mutex::new(0);
+        respond_with(&server, move |_: &Request| {
+            let mut times = times.lock().unwrap();
+            *times += 1;
+            if *times < 3 {
+                status(StatusCode::OK).set_body_bytes(vec![])
+            } else {
+                status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42))
+            }
+        })
+        .await;
+
+        let client = remote_test_client(server.uri());
+        let checkpoint = client.fetch(42, &CancellationToken::new()).await.unwrap();
+        assert_eq!(42, checkpoint.checkpoint_summary.sequence_number)
+    }
+}
diff --git a/crates/sui-indexer-alt/src/ingestion/test_utils.rs b/crates/sui-indexer-alt/src/ingestion/test_utils.rs
new file mode 100644
index 0000000000000..99f130927d0bf
--- /dev/null
+++ b/crates/sui-indexer-alt/src/ingestion/test_utils.rs
@@ -0,0 +1,56 @@
+// Copyright (c) Mysten Labs, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+use rand::prelude::StdRng;
+use rand::SeedableRng;
+use sui_storage::blob::{Blob, BlobEncoding};
+use sui_types::crypto::KeypairTraits;
+use sui_types::full_checkpoint_content::CheckpointData;
+use sui_types::gas::GasCostSummary;
+use sui_types::messages_checkpoint::{
+    CertifiedCheckpointSummary, CheckpointContents, CheckpointSummary, SignedCheckpointSummary,
+};
+use sui_types::supported_protocol_versions::ProtocolConfig;
+use sui_types::utils::make_committee_key;
+
+const RNG_SEED: [u8; 32] = [
+    21, 23, 199, 200, 234, 250, 252, 178, 94, 15, 202, 178, 62, 186, 88, 137, 233, 192, 130, 157,
+    179, 179, 65, 9, 31, 249, 221, 123, 225, 112, 199, 247,
+];
+
+pub(crate) fn test_checkpoint_data(cp: u64) -> Vec<u8> {
+    let mut rng = StdRng::from_seed(RNG_SEED);
+    let (keys, committee) = make_committee_key(&mut rng);
+    let contents = CheckpointContents::new_with_digests_only_for_tests(vec![]);
+    let summary = CheckpointSummary::new(
+        &ProtocolConfig::get_for_max_version_UNSAFE(),
+        0,
+        cp,
+        0,
+        &contents,
+        None,
+        GasCostSummary::default(),
+        None,
+        0,
+        Vec::new(),
+    );
+
+    let sign_infos: Vec<_> = keys
+        .iter()
+        .map(|k| {
+            let name = k.public().into();
+            SignedCheckpointSummary::sign(committee.epoch, &summary, k, name)
+        })
+        .collect();
+
+    let checkpoint_data = CheckpointData {
+        checkpoint_summary: CertifiedCheckpointSummary::new(summary, sign_infos, &committee)
+            .unwrap(),
+        checkpoint_contents: contents,
+        transactions: vec![],
+    };
+
+    Blob::encode(&checkpoint_data, BlobEncoding::Bcs)
+        .unwrap()
+        .to_bytes()
+}
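
---

For illustration only (not part of the patch): a minimal sketch of what a third `IngestionClientTrait` implementation could look like under this abstraction, serving checkpoint bytes from an in-memory map. The struct name `InMemoryIngestionClient` is hypothetical; `IngestionClientTrait`, `FetchError`, and `FetchResult` are the types introduced in `client.rs` above.

```rust
use std::collections::HashMap;

use tokio_util::bytes::Bytes;

use crate::ingestion::client::{FetchError, FetchResult, IngestionClientTrait};

/// Hypothetical client that serves checkpoints from a pre-populated map,
/// e.g. for tests that should not touch the network or the filesystem.
pub struct InMemoryIngestionClient {
    checkpoints: HashMap<u64, Bytes>,
}

#[async_trait::async_trait]
impl IngestionClientTrait for InMemoryIngestionClient {
    async fn fetch(&self, checkpoint: u64) -> FetchResult {
        // A missing checkpoint maps onto the same `NotFound` variant used by
        // the local and remote clients, so the retry logic in
        // `IngestionClient::fetch` treats every implementation uniformly.
        self.checkpoints
            .get(&checkpoint)
            .cloned()
            .ok_or(FetchError::NotFound)
    }
}
```

Because retry, metrics, and deserialization all live in the shared `IngestionClient`, an implementation like this only has to decide which `FetchError` variant each failure maps to.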