GCS=1 enabling GCS-specific functionality. #10277

Open · wants to merge 1 commit into base: main
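The gate used in the diff below is `option_env!("GCS")`, which captures the environment variable at compile time rather than at run time. The following is a minimal sketch of that behavior only; the function name and printed strings are illustrative and not part of the PR.

// Sketch: compile-time semantics of `option_env!`, which the diff relies on.
// Nothing here comes from the PR except the variable name "GCS".
fn delete_path() -> &'static str {
    // `option_env!("GCS")` is `Some(..)` only if GCS was set in the
    // environment of the build (e.g. `GCS=1 cargo build`); setting it when
    // launching the binary has no effect.
    if option_env!("GCS").is_some() {
        "gcs: one DeleteObject request per key"
    } else {
        "aws: batched DeleteObjects requests"
    }
}

fn main() {
    println!("{}", delete_path());
}

Under that reading, the GCS-specific deletion path exists only in binaries built with `GCS=1`; a runtime switch would instead need something like a `std::env::var` check or a configuration flag.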
141 changes: 94 additions & 47 deletions libs/remote_storage/src/s3_bucket.rs
@@ -14,6 +14,8 @@ use std::{
    time::{Duration, SystemTime},
};

+use aws_sdk_s3::error::ProvideErrorMetadata;
+
use anyhow::{anyhow, Context as _};
use aws_config::{
    default_provider::credentials::DefaultCredentialsChain,
@@ -355,60 +357,105 @@ impl S3Bucket {
        let kind = RequestKind::Delete;
        let mut cancel = std::pin::pin!(cancel.cancelled());

-        for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE_S3) {
-            let started_at = start_measuring_requests(kind);
-
-            let req = self
-                .client
-                .delete_objects()
-                .bucket(self.bucket_name.clone())
-                .delete(
-                    Delete::builder()
-                        .set_objects(Some(chunk.to_vec()))
-                        .build()
-                        .context("build request")?,
-                )
-                .send();
-
-            let resp = tokio::select! {
-                resp = req => resp,
-                _ = tokio::time::sleep(self.timeout) => return Err(TimeoutOrCancel::Timeout.into()),
-                _ = &mut cancel => return Err(TimeoutOrCancel::Cancel.into()),
-            };
-
-            let started_at = ScopeGuard::into_inner(started_at);
-            crate::metrics::BUCKET_METRICS
-                .req_seconds
-                .observe_elapsed(kind, &resp, started_at);
-
-            let resp = resp.context("request deletion")?;
-            crate::metrics::BUCKET_METRICS
-                .deleted_objects_total
-                .inc_by(chunk.len() as u64);
-
-            if let Some(errors) = resp.errors {
-                // Log a bounded number of the errors within the response:
-                // these requests can carry 1000 keys so logging each one
-                // would be too verbose, especially as errors may lead us
-                // to retry repeatedly.
-                const LOG_UP_TO_N_ERRORS: usize = 10;
-                for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
-                    tracing::warn!(
-                        "DeleteObjects key {} failed: {}: {}",
-                        e.key.as_ref().map(Cow::from).unwrap_or("".into()),
-                        e.code.as_ref().map(Cow::from).unwrap_or("".into()),
-                        e.message.as_ref().map(Cow::from).unwrap_or("".into())
-                    );
-                }
-
-                return Err(anyhow::anyhow!(
-                    "Failed to delete {}/{} objects",
-                    errors.len(),
-                    chunk.len(),
-                ));
-            }
-        }
-        Ok(())
+        if option_env!("GCS").is_some() {
+            // GCS
+            for oid in delete_objects {
+                let started_at = start_measuring_requests(kind);
+
+                let req = self
+                    .client
+                    .delete_object()
+                    .bucket(self.bucket_name.clone())
+                    .key(oid.key())
+                    .send();
+
+                let resp = tokio::select! {
+                    resp = req => resp,
+                    _ = tokio::time::sleep(self.timeout) => return Err(TimeoutOrCancel::Timeout.into()),
+                    _ = &mut cancel => return Err(TimeoutOrCancel::Cancel.into()),
+                };
+
+                let started_at = ScopeGuard::into_inner(started_at);
+                crate::metrics::BUCKET_METRICS
+                    .req_seconds
+                    .observe_elapsed(kind, &resp, started_at);
+
+                match resp {
+                    Ok(_) => {
+                        crate::metrics::BUCKET_METRICS
+                            .deleted_objects_total
+                            .inc_by(1 as u64);
+                    }
+                    Err(e) => {
+                        if matches!(e.code(), Some("404") | Some("NoSuchKey")) {
+                            tracing::warn!("Ignoring. No such key to delete {}", oid.key());
+                        } else {
+                            return Err(anyhow::anyhow!(format!(
+                                "Failed to delete object {}",
+                                oid.key()
+                            )));
+                        }
+                    }
+                }
+            }
+            Ok(())
+        } else {
+            // AWS
+            for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE_S3) {
+                let started_at = start_measuring_requests(kind);
+
+                let req = self
+                    .client
+                    .delete_objects()
+                    .bucket(self.bucket_name.clone())
+                    .delete(
+                        Delete::builder()
+                            .set_objects(Some(chunk.to_vec()))
+                            .build()
+                            .context("build request")?,
+                    )
+                    .send();
+
+                let resp = tokio::select! {
+                    resp = req => resp,
+                    _ = tokio::time::sleep(self.timeout) => return Err(TimeoutOrCancel::Timeout.into()),
+                    _ = &mut cancel => return Err(TimeoutOrCancel::Cancel.into()),
+                };
+
+                let started_at = ScopeGuard::into_inner(started_at);
+                crate::metrics::BUCKET_METRICS
+                    .req_seconds
+                    .observe_elapsed(kind, &resp, started_at);
+
+                let resp = resp.context("request deletion")?;
+                crate::metrics::BUCKET_METRICS
+                    .deleted_objects_total
+                    .inc_by(chunk.len() as u64);
+
+                if let Some(errors) = resp.errors {
+                    // Log a bounded number of the errors within the response:
+                    // these requests can carry 1000 keys so logging each one
+                    // would be too verbose, especially as errors may lead us
+                    // to retry repeatedly.
+                    const LOG_UP_TO_N_ERRORS: usize = 10;
+                    for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
+                        tracing::warn!(
+                            "DeleteObjects key {} failed: {}: {}",
+                            e.key.as_ref().map(Cow::from).unwrap_or("".into()),
+                            e.code.as_ref().map(Cow::from).unwrap_or("".into()),
+                            e.message.as_ref().map(Cow::from).unwrap_or("".into())
+                        );
+                    }
+
+                    return Err(anyhow::anyhow!(
+                        "Failed to delete {}/{} objects",
+                        errors.len(),
+                        chunk.len(),
+                    ));
+                }
+            }
+            Ok(())
+        }
    }

    pub fn bucket_name(&self) -> &str {
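Two details of the GCS branch are worth calling out. The new `use aws_sdk_s3::error::ProvideErrorMetadata;` import is what makes `.code()` available on SDK errors, and the branch uses it to treat an already-missing key ("404" / "NoSuchKey") as success so that retried deletions stay idempotent. The helper below is a hypothetical sketch of that check, not code from the PR:

// Sketch only: classifying SDK delete errors the way the GCS branch does.
use aws_sdk_s3::error::ProvideErrorMetadata;

/// Returns true when a delete failure just means the object is already gone.
/// GCS's S3-compatible endpoint appears to surface this as a bare "404",
/// while S3-style responses use "NoSuchKey"; both are safe to ignore on delete.
fn is_missing_key_error<E: ProvideErrorMetadata>(err: &E) -> bool {
    matches!(err.code(), Some("404") | Some("NoSuchKey"))
}

The per-key `delete_object` calls, rather than the batched `delete_objects` used on the AWS path, are presumably needed because GCS's S3-interoperability (XML) API does not implement the multi-object delete request, so issuing one DeleteObject per key is the portable option, at the cost of one round trip per object.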