From 597663eaffc665e9ecdc4a320d44c064855c6e4b Mon Sep 17 00:00:00 2001
From: john
Date: Fri, 3 Jan 2025 15:13:14 -0600
Subject: [PATCH] `GCS=1` enables GCS-specific object deletion

---
 libs/remote_storage/src/s3_bucket.rs | 141 ++++++++++++++++++---------
 1 file changed, 94 insertions(+), 47 deletions(-)

diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs
index d3f19f0b119a..ac28ab02ea03 100644
--- a/libs/remote_storage/src/s3_bucket.rs
+++ b/libs/remote_storage/src/s3_bucket.rs
@@ -14,6 +14,8 @@ use std::{
     time::{Duration, SystemTime},
 };
 
+use aws_sdk_s3::error::ProvideErrorMetadata;
+
 use anyhow::{anyhow, Context as _};
 use aws_config::{
     default_provider::credentials::DefaultCredentialsChain,
@@ -355,60 +357,105 @@ impl S3Bucket {
         let kind = RequestKind::Delete;
         let mut cancel = std::pin::pin!(cancel.cancelled());
 
-        for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE_S3) {
-            let started_at = start_measuring_requests(kind);
+        if option_env!("GCS").is_some() {
+            // GCS: delete keys one at a time; `GCS=1` at build time selects this path.
+            for oid in delete_objects {
+                let started_at = start_measuring_requests(kind);
 
-            let req = self
-                .client
-                .delete_objects()
-                .bucket(self.bucket_name.clone())
-                .delete(
-                    Delete::builder()
-                        .set_objects(Some(chunk.to_vec()))
-                        .build()
-                        .context("build request")?,
-                )
-                .send();
-
-            let resp = tokio::select! {
-                resp = req => resp,
-                _ = tokio::time::sleep(self.timeout) => return Err(TimeoutOrCancel::Timeout.into()),
-                _ = &mut cancel => return Err(TimeoutOrCancel::Cancel.into()),
-            };
+                let req = self
+                    .client
+                    .delete_object()
+                    .bucket(self.bucket_name.clone())
+                    .key(oid.key())
+                    .send();
 
-            let started_at = ScopeGuard::into_inner(started_at);
-            crate::metrics::BUCKET_METRICS
-                .req_seconds
-                .observe_elapsed(kind, &resp, started_at);
+                let resp = tokio::select! {
+                    resp = req => resp,
+                    _ = tokio::time::sleep(self.timeout) => return Err(TimeoutOrCancel::Timeout.into()),
+                    _ = &mut cancel => return Err(TimeoutOrCancel::Cancel.into()),
+                };
 
-            let resp = resp.context("request deletion")?;
-            crate::metrics::BUCKET_METRICS
-                .deleted_objects_total
-                .inc_by(chunk.len() as u64);
-
-            if let Some(errors) = resp.errors {
-                // Log a bounded number of the errors within the response:
-                // these requests can carry 1000 keys so logging each one
-                // would be too verbose, especially as errors may lead us
-                // to retry repeatedly.
-                const LOG_UP_TO_N_ERRORS: usize = 10;
-                for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
-                    tracing::warn!(
-                        "DeleteObjects key {} failed: {}: {}",
-                        e.key.as_ref().map(Cow::from).unwrap_or("".into()),
-                        e.code.as_ref().map(Cow::from).unwrap_or("".into()),
-                        e.message.as_ref().map(Cow::from).unwrap_or("".into())
-                    );
+                let started_at = ScopeGuard::into_inner(started_at);
+                crate::metrics::BUCKET_METRICS
+                    .req_seconds
+                    .observe_elapsed(kind, &resp, started_at);
+
+                match resp {
+                    Ok(_) => {
+                        crate::metrics::BUCKET_METRICS
+                            .deleted_objects_total
+                            .inc_by(1);
+                    }
+                    Err(e) => {
+                        if matches!(e.code(), Some("404") | Some("NoSuchKey")) {
+                            tracing::warn!("Ignoring: no such key to delete {}", oid.key());
+                        } else {
+                            return Err(anyhow::anyhow!(
+                                "Failed to delete object {}: {e}",
+                                oid.key()
+                            ));
+                        }
+                    }
                 }
+            }
+            Ok(())
+        } else {
+            // AWS: delete in batches of up to MAX_KEYS_PER_DELETE_S3 keys per request.
+            for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE_S3) {
+                let started_at = start_measuring_requests(kind);
 
-                return Err(anyhow::anyhow!(
-                    "Failed to delete {}/{} objects",
-                    errors.len(),
-                    chunk.len(),
-                ));
+                let req = self
+                    .client
+                    .delete_objects()
+                    .bucket(self.bucket_name.clone())
+                    .delete(
+                        Delete::builder()
+                            .set_objects(Some(chunk.to_vec()))
+                            .build()
+                            .context("build request")?,
+                    )
+                    .send();
+
+                let resp = tokio::select! {
+                    resp = req => resp,
+                    _ = tokio::time::sleep(self.timeout) => return Err(TimeoutOrCancel::Timeout.into()),
+                    _ = &mut cancel => return Err(TimeoutOrCancel::Cancel.into()),
+                };
+
+                let started_at = ScopeGuard::into_inner(started_at);
+                crate::metrics::BUCKET_METRICS
+                    .req_seconds
+                    .observe_elapsed(kind, &resp, started_at);
+
+                let resp = resp.context("request deletion")?;
+                crate::metrics::BUCKET_METRICS
+                    .deleted_objects_total
+                    .inc_by(chunk.len() as u64);
+
+                if let Some(errors) = resp.errors {
+                    // Log a bounded number of the errors within the response:
+                    // these requests can carry 1000 keys so logging each one
+                    // would be too verbose, especially as errors may lead us
+                    // to retry repeatedly.
+                    const LOG_UP_TO_N_ERRORS: usize = 10;
+                    for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
+                        tracing::warn!(
+                            "DeleteObjects key {} failed: {}: {}",
+                            e.key.as_ref().map(Cow::from).unwrap_or("".into()),
+                            e.code.as_ref().map(Cow::from).unwrap_or("".into()),
+                            e.message.as_ref().map(Cow::from).unwrap_or("".into())
+                        );
+                    }
+
+                    return Err(anyhow::anyhow!(
+                        "Failed to delete {}/{} objects",
+                        errors.len(),
+                        chunk.len(),
+                    ));
+                }
             }
+            Ok(())
         }
-        Ok(())
     }
 
     pub fn bucket_name(&self) -> &str {
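
Note, not part of the patch: a minimal sketch of the compile-time gating used above, with hypothetical names. `option_env!("GCS")` is resolved when the crate is compiled, so the GCS branch is chosen at build time (e.g. `GCS=1 cargo build`), not by setting the variable when the binary runs.

    fn main() {
        // With `GCS=1 cargo build`, option_env!("GCS") is Some("1") at compile
        // time and the first branch runs; with a plain `cargo build` it is None
        // and the second branch runs. The variable is read by rustc, not by the
        // running binary.
        let strategy = if option_env!("GCS").is_some() {
            "per-key DeleteObject requests (GCS path)"
        } else {
            "batched DeleteObjects requests (AWS path)"
        };
        println!("delete strategy: {strategy}");
    }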