Skip to content

Commit

Permalink
refactor(0.7.x): 0.7.5 with exposed non-retryable s3_fetchers (#98)
Browse files Browse the repository at this point in the history
* refactor(0.7.x): 0.7.5 with exposed non-retryable s3_fetchers

* LakeError error to debug message
  • Loading branch information
khorolets authored Dec 19, 2023
1 parent d9db7f1 commit 554c493
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 76 deletions.
10 changes: 8 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased](https://github.com/near/near-lake-framework/compare/v0.7.4...HEAD)
## [Unreleased](https://github.com/near/near-lake-framework/compare/v0.7.5...HEAD)

## [0.7.3](https://github.com/near/near-lake-framework/compare/v0.7.2...0.7.4)
## [0.7.5](https://github.com/near/near-lake-framework/compare/v0.7.4...0.7.5)

* Refactor `s3_fetchers` module to allow exposing the underlying functionality:
* `s3_fetchers::fetch_block` (without retrying)
* `s3_fetchers::fetch_shard` (without retrying)

## [0.7.4](https://github.com/near/near-lake-framework/compare/v0.7.3...0.7.4)

* Upgrade all `aws` crates to the latest version
* Underlying AWS error will now be printed
Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@ rust-version = "1.58.1"

# cargo-workspaces
[workspace.metadata.workspaces]
version = "0.7.4"
version = "0.7.5"

[dependencies]
anyhow = "1.0.51"
aws-config = { version = "1.0.0", features = ["behavior-version-latest"] }
aws-types = "1.0.0"
aws-credential-types = "1.0.0"
aws-sdk-s3 = "0.39.1"
aws-smithy-types = "1.0.1"
async-stream = "0.3.3"
async-trait = "0.1.64"
derive_builder = "0.11.2"
Expand All @@ -35,7 +36,6 @@ near-indexer-primitives = "0.17"

[dev-dependencies]
aws-smithy-http = "0.60.0"
aws-smithy-types = "1.0.1"

[lib]
doctest = false
179 changes: 110 additions & 69 deletions src/s3_fetchers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,50 +142,71 @@ pub(crate) async fn fetch_streamer_message(
}

/// Fetches the block data JSON from AWS S3 and returns the `BlockView`
pub async fn fetch_block_or_retry(
pub async fn fetch_block(
lake_s3_client: &impl S3Client,
s3_bucket_name: &str,
block_height: crate::types::BlockHeight,
) -> Result<
near_indexer_primitives::views::BlockView,
crate::types::LakeError<aws_sdk_s3::operation::get_object::GetObjectError>,
> {
let body_bytes = loop {
match lake_s3_client
.get_object(s3_bucket_name, &format!("{:0>12}/block.json", block_height))
.await
{
Ok(response) => {
match response.body.collect().await {
Ok(bytes_stream) => break bytes_stream.into_bytes(),
Err(err) => {
tracing::debug!(
target: crate::LAKE_FRAMEWORK,
"Failed to read bytes from the block #{:0>12} response. Retrying immediately.\n{:#?}",
block_height,
err,
);
}
};
}
Err(err) => {
tracing::debug!(
target: crate::LAKE_FRAMEWORK,
"Failed to get {:0>12}/block.json. Retrying immediately\n{:#?}",
block_height,
err
);
}
};
};
let body_bytes = lake_s3_client
.get_object(s3_bucket_name, &format!("{:0>12}/block.json", block_height))
.await?
.body
.collect()
.await?
.into_bytes();

Ok(serde_json::from_slice::<
near_indexer_primitives::views::BlockView,
>(body_bytes.as_ref())?)
}

/// Fetches the block data JSON from AWS S3 and returns the `BlockView` retrying until it succeeds (indefinitely)
pub async fn fetch_block_or_retry(
lake_s3_client: &impl S3Client,
s3_bucket_name: &str,
block_height: crate::types::BlockHeight,
) -> Result<
near_indexer_primitives::views::BlockView,
crate::types::LakeError<aws_sdk_s3::operation::get_object::GetObjectError>,
> {
loop {
match fetch_block(lake_s3_client, s3_bucket_name, block_height).await {
Ok(block_view) => break Ok(block_view),
Err(err) => match err {
crate::types::LakeError::AwsError { .. } => {
tracing::debug!(
target: crate::LAKE_FRAMEWORK,
"Block #{:0>12} not found. Retrying in immediately...\n{:#?}",
block_height,
err,
);
}
crate::types::LakeError::AwsSmithyError { .. } => {
tracing::debug!(
target: crate::LAKE_FRAMEWORK,
"Failed to read bytes from the block #{:0>12} response. Retrying immediately.\n{:#?}",
block_height,
err,
);
}
_ => {
tracing::debug!(
target: crate::LAKE_FRAMEWORK,
"Failed to fetch block #{}, retrying immediately\n{:#?}",
block_height,
err
);
}
},
}
}
}

/// Fetches the shard data JSON from AWS S3 and returns the `IndexerShard`
pub async fn fetch_shard_or_retry(
pub async fn fetch_shard(
lake_s3_client: &impl S3Client,
s3_bucket_name: &str,
block_height: crate::types::BlockHeight,
Expand All @@ -194,48 +215,68 @@ pub async fn fetch_shard_or_retry(
near_indexer_primitives::IndexerShard,
crate::types::LakeError<aws_sdk_s3::operation::get_object::GetObjectError>,
> {
let body_bytes = loop {
match lake_s3_client
.get_object(
s3_bucket_name,
&format!("{:0>12}/shard_{}.json", block_height, shard_id),
)
.await
{
Ok(response) => {
let body_bytes = match response.body.collect().await {
Ok(body) => body.into_bytes(),
Err(err) => {
tracing::debug!(
target: crate::LAKE_FRAMEWORK,
"Failed to read the {:0>12}/shard_{}.json. Retrying in 1s...\n {:#?}",
block_height,
shard_id,
err,
);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
continue;
}
};

break body_bytes;
}
Err(err) => {
tracing::debug!(
target: crate::LAKE_FRAMEWORK,
"Failed to fetch shard #{}, retrying immediately\n{:#?}",
shard_id,
err
);
}
}
};
let body_bytes = lake_s3_client
.get_object(
s3_bucket_name,
&format!("{:0>12}/shard_{}.json", block_height, shard_id),
)
.await?
.body
.collect()
.await?
.into_bytes();

Ok(serde_json::from_slice::<
near_indexer_primitives::IndexerShard,
>(body_bytes.as_ref())?)
}

/// Fetches the shard data JSON from AWS S3 and returns the `IndexerShard` retrying until it succeeds (indefinitely)
pub async fn fetch_shard_or_retry(
lake_s3_client: &impl S3Client,
s3_bucket_name: &str,
block_height: crate::types::BlockHeight,
shard_id: u64,
) -> Result<
near_indexer_primitives::IndexerShard,
crate::types::LakeError<aws_sdk_s3::operation::get_object::GetObjectError>,
> {
loop {
match fetch_shard(lake_s3_client, s3_bucket_name, block_height, shard_id).await {
Ok(shard) => break Ok(shard),
Err(err) => match err {
crate::types::LakeError::AwsError { .. } => {
tracing::debug!(
target: crate::LAKE_FRAMEWORK,
"Shard {} of block #{:0>12} not found. Retrying in immediately...\n{:#?}",
shard_id,
block_height,
err,
);
}
crate::types::LakeError::AwsSmithyError { .. } => {
tracing::debug!(
target: crate::LAKE_FRAMEWORK,
"Failed to read bytes from the shard {} of block #{:0>12} response. Retrying immediately.\n{:#?}",
shard_id,
block_height,
err,
);
}
_ => {
tracing::debug!(
target: crate::LAKE_FRAMEWORK,
"Failed to fetch shard {} of block #{}, retrying immediately\n{:#?}",
shard_id,
block_height,
err
);
}
},
}
}
}

#[cfg(test)]
mod test {
use super::*;
Expand All @@ -255,7 +296,7 @@ mod test {
impl S3Client for LakeS3Client {
async fn get_object(
&self,
bucket: &str,
_bucket: &str,
prefix: &str,
) -> Result<
aws_sdk_s3::operation::get_object::GetObjectOutput,
Expand All @@ -269,8 +310,8 @@ mod test {

async fn list_objects(
&self,
bucket: &str,
start_after: &str,
_bucket: &str,
_start_after: &str,
) -> Result<
aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output,
aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error>,
Expand Down
11 changes: 8 additions & 3 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,19 +115,24 @@ impl LakeConfigBuilder {
#[allow(clippy::enum_variant_names)]
#[derive(thiserror::Error, Debug)]
pub enum LakeError<E> {
#[error("Failed to parse structure from JSON: {error_message}")]
#[error("Failed to parse structure from JSON: {error_message:?}")]
ParseError {
#[from]
error_message: serde_json::Error,
},
#[error("AWS S3 error: {error}")]
#[error("AWS S3 error: {error:?}")]
AwsError {
#[from]
error: aws_sdk_s3::error::SdkError<E>,
},
#[error("Failed to convert integer")]
#[error("Failed to convert integer: {error:?}")]
IntConversionError {
#[from]
error: std::num::TryFromIntError,
},
#[error("AWS Smithy byte_stream error: {error:?}")]
AwsSmithyError {
#[from]
error: aws_smithy_types::byte_stream::error::Error,
},
}

0 comments on commit 554c493

Please sign in to comment.