Skip to content

Commit 0b0d64b

Browse files
Adds send_retry_with_idempotency and retry more kinds of transport errors (#5609)
1 parent f38283b commit 0b0d64b

File tree

10 files changed

+233
-144
lines changed

10 files changed

+233
-144
lines changed

object_store/src/aws/client.rs

+13-4
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ pub(crate) struct Request<'a> {
268268
builder: RequestBuilder,
269269
payload_sha256: Option<Vec<u8>>,
270270
use_session_creds: bool,
271+
idempotent: bool,
271272
}
272273

273274
impl<'a> Request<'a> {
@@ -285,6 +286,11 @@ impl<'a> Request<'a> {
285286
Self { builder, ..self }
286287
}
287288

289+
pub fn set_idempotent(mut self, idempotent: bool) -> Self {
290+
self.idempotent = idempotent;
291+
self
292+
}
293+
288294
pub async fn send(self) -> Result<Response, RequestError> {
289295
let credential = match self.use_session_creds {
290296
true => self.config.get_session_credential().await?,
@@ -298,7 +304,7 @@ impl<'a> Request<'a> {
298304
let path = self.path.as_ref();
299305
self.builder
300306
.with_aws_sigv4(credential.authorizer(), self.payload_sha256.as_deref())
301-
.send_retry(&self.config.retry_config)
307+
.send_retry_with_idempotency(&self.config.retry_config, self.idempotent)
302308
.await
303309
.context(RetrySnafu { path })
304310
}
@@ -360,6 +366,7 @@ impl S3Client {
360366
payload_sha256,
361367
config: &self.config,
362368
use_session_creds: true,
369+
idempotent: false,
363370
}
364371
}
365372

@@ -462,7 +469,7 @@ impl S3Client {
462469
.header(CONTENT_TYPE, "application/xml")
463470
.body(body)
464471
.with_aws_sigv4(credential.authorizer(), payload_sha256.as_deref())
465-
.send_retry(&self.config.retry_config)
472+
.send_retry_with_idempotency(&self.config.retry_config, false)
466473
.await
467474
.context(DeleteObjectsRequestSnafu {})?
468475
.bytes()
@@ -510,6 +517,7 @@ impl S3Client {
510517
config: &self.config,
511518
payload_sha256: None,
512519
use_session_creds: false,
520+
idempotent: false,
513521
}
514522
}
515523

@@ -522,7 +530,7 @@ impl S3Client {
522530
.request(Method::POST, url)
523531
.headers(self.config.encryption_headers.clone().into())
524532
.with_aws_sigv4(credential.authorizer(), None)
525-
.send_retry(&self.config.retry_config)
533+
.send_retry_with_idempotency(&self.config.retry_config, true)
526534
.await
527535
.context(CreateMultipartRequestSnafu)?
528536
.bytes()
@@ -547,6 +555,7 @@ impl S3Client {
547555
let response = self
548556
.put_request(path, data, false)
549557
.query(&[("partNumber", &part), ("uploadId", upload_id)])
558+
.set_idempotent(true)
550559
.send()
551560
.await?;
552561

@@ -582,7 +591,7 @@ impl S3Client {
582591
.query(&[("uploadId", upload_id)])
583592
.body(body)
584593
.with_aws_sigv4(credential.authorizer(), None)
585-
.send_retry(&self.config.retry_config)
594+
.send_retry_with_idempotency(&self.config.retry_config, true)
586595
.await
587596
.context(CompleteMultipartRequestSnafu)?;
588597

object_store/src/aws/credential.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -517,7 +517,7 @@ async fn instance_creds(
517517
let token_result = client
518518
.request(Method::PUT, token_url)
519519
.header("X-aws-ec2-metadata-token-ttl-seconds", "600") // 10 minute TTL
520-
.send_retry(retry_config)
520+
.send_retry_with_idempotency(retry_config, true)
521521
.await;
522522

523523
let token = match token_result {
@@ -607,7 +607,7 @@ async fn web_identity(
607607
("Version", "2011-06-15"),
608608
("WebIdentityToken", &token),
609609
])
610-
.send_retry(retry_config)
610+
.send_retry_with_idempotency(retry_config, true)
611611
.await?
612612
.bytes()
613613
.await?;

object_store/src/aws/dynamo.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,11 @@ impl DynamoCommit {
186186
to: &Path,
187187
) -> Result<()> {
188188
self.conditional_op(client, to, None, || async {
189-
client.copy_request(from, to).send().await?;
189+
client
190+
.copy_request(from, to)
191+
.set_idempotent(false)
192+
.send()
193+
.await?;
190194
Ok(())
191195
})
192196
.await

object_store/src/aws/mod.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ impl ObjectStore for AmazonS3 {
159159
}
160160

161161
match (opts.mode, &self.client.config.conditional_put) {
162-
(PutMode::Overwrite, _) => request.do_put().await,
162+
(PutMode::Overwrite, _) => request.set_idempotent(true).do_put().await,
163163
(PutMode::Create | PutMode::Update(_), None) => Err(Error::NotImplemented),
164164
(PutMode::Create, Some(S3ConditionalPut::ETagMatch)) => {
165165
match request.header(&IF_NONE_MATCH, "*").do_put().await {
@@ -268,7 +268,11 @@ impl ObjectStore for AmazonS3 {
268268
}
269269

270270
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
271-
self.client.copy_request(from, to).send().await?;
271+
self.client
272+
.copy_request(from, to)
273+
.set_idempotent(true)
274+
.send()
275+
.await?;
272276
Ok(())
273277
}
274278

object_store/src/azure/client.rs

+13-4
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ struct PutRequest<'a> {
172172
path: &'a Path,
173173
config: &'a AzureConfig,
174174
builder: RequestBuilder,
175+
idempotent: bool,
175176
}
176177

177178
impl<'a> PutRequest<'a> {
@@ -185,12 +186,17 @@ impl<'a> PutRequest<'a> {
185186
Self { builder, ..self }
186187
}
187188

189+
fn set_idempotent(mut self, idempotent: bool) -> Self {
190+
self.idempotent = idempotent;
191+
self
192+
}
193+
188194
async fn send(self) -> Result<Response> {
189195
let credential = self.config.get_credential().await?;
190196
let response = self
191197
.builder
192198
.with_azure_authorization(&credential, &self.config.account)
193-
.send_retry(&self.config.retry_config)
199+
.send_retry_with_idempotency(&self.config.retry_config, self.idempotent)
194200
.await
195201
.context(PutRequestSnafu {
196202
path: self.path.as_ref(),
@@ -239,6 +245,7 @@ impl AzureClient {
239245
path,
240246
builder,
241247
config: &self.config,
248+
idempotent: false,
242249
}
243250
}
244251

@@ -247,7 +254,7 @@ impl AzureClient {
247254
let builder = self.put_request(path, bytes);
248255

249256
let builder = match &opts.mode {
250-
PutMode::Overwrite => builder,
257+
PutMode::Overwrite => builder.set_idempotent(true),
251258
PutMode::Create => builder.header(&IF_NONE_MATCH, "*"),
252259
PutMode::Update(v) => {
253260
let etag = v.e_tag.as_ref().context(MissingETagSnafu)?;
@@ -271,6 +278,7 @@ impl AzureClient {
271278

272279
self.put_request(path, data)
273280
.query(&[("comp", "block"), ("blockid", &block_id)])
281+
.set_idempotent(true)
274282
.send()
275283
.await?;
276284

@@ -287,6 +295,7 @@ impl AzureClient {
287295
let response = self
288296
.put_request(path, BlockList { blocks }.to_xml().into())
289297
.query(&[("comp", "blocklist")])
298+
.set_idempotent(true)
290299
.send()
291300
.await?;
292301

@@ -340,7 +349,7 @@ impl AzureClient {
340349

341350
builder
342351
.with_azure_authorization(&credential, &self.config.account)
343-
.send_retry(&self.config.retry_config)
352+
.send_retry_with_idempotency(&self.config.retry_config, true)
344353
.await
345354
.map_err(|err| err.error(STORE, from.to_string()))?;
346355

@@ -373,7 +382,7 @@ impl AzureClient {
373382
.body(body)
374383
.query(&[("restype", "service"), ("comp", "userdelegationkey")])
375384
.with_azure_authorization(&credential, &self.config.account)
376-
.send_retry(&self.config.retry_config)
385+
.send_retry_with_idempotency(&self.config.retry_config, true)
377386
.await
378387
.context(DelegationKeyRequestSnafu)?
379388
.bytes()

object_store/src/azure/credential.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -615,7 +615,7 @@ impl TokenProvider for ClientSecretOAuthProvider {
615615
("scope", AZURE_STORAGE_SCOPE),
616616
("grant_type", "client_credentials"),
617617
])
618-
.send_retry(retry)
618+
.send_retry_with_idempotency(retry, true)
619619
.await
620620
.context(TokenRequestSnafu)?
621621
.json()
@@ -797,7 +797,7 @@ impl TokenProvider for WorkloadIdentityOAuthProvider {
797797
("scope", AZURE_STORAGE_SCOPE),
798798
("grant_type", "client_credentials"),
799799
])
800-
.send_retry(retry)
800+
.send_retry_with_idempotency(retry, true)
801801
.await
802802
.context(TokenRequestSnafu)?
803803
.json()

0 commit comments

Comments
 (0)