Skip to content

Commit 5514e58

Browse files
committed
Add with_tokio_runtime to HTTP stores
1 parent ff670c5 commit 5514e58

File tree

13 files changed

+365
-141
lines changed

13 files changed

+365
-141
lines changed

object_store/src/aws/client.rs

+10-10
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@ use crate::aws::checksum::Checksum;
1919
use crate::aws::credential::{AwsCredential, CredentialExt, CredentialProvider};
2020
use crate::aws::STRICT_PATH_ENCODE_SET;
2121
use crate::client::pagination::stream_paginated;
22-
use crate::client::retry::RetryExt;
22+
use crate::client::retry::{ClientConfig, RetryExt};
2323
use crate::multipart::UploadPart;
2424
use crate::path::DELIMITER;
2525
use crate::util::{format_http_range, format_prefix};
2626
use crate::{
2727
BoxStream, ClientOptions, ListResult, MultipartId, ObjectMeta, Path, Result,
28-
RetryConfig, StreamExt,
28+
StreamExt,
2929
};
3030
use base64::prelude::BASE64_STANDARD;
3131
use base64::Engine;
@@ -208,7 +208,7 @@ pub struct S3Config {
208208
pub bucket: String,
209209
pub bucket_endpoint: String,
210210
pub credentials: Box<dyn CredentialProvider>,
211-
pub retry_config: RetryConfig,
211+
pub client_config: ClientConfig,
212212
pub client_options: ClientOptions,
213213
pub sign_payload: bool,
214214
pub checksum: Option<Checksum>,
@@ -271,7 +271,7 @@ impl S3Client {
271271
self.config.sign_payload,
272272
None,
273273
)
274-
.send_retry(&self.config.retry_config)
274+
.send_retry(&self.config.client_config)
275275
.await
276276
.context(GetRequestSnafu {
277277
path: path.as_ref(),
@@ -317,7 +317,7 @@ impl S3Client {
317317
self.config.sign_payload,
318318
payload_sha256,
319319
)
320-
.send_retry(&self.config.retry_config)
320+
.send_retry(&self.config.client_config)
321321
.await
322322
.context(PutRequestSnafu {
323323
path: path.as_ref(),
@@ -345,7 +345,7 @@ impl S3Client {
345345
self.config.sign_payload,
346346
None,
347347
)
348-
.send_retry(&self.config.retry_config)
348+
.send_retry(&self.config.client_config)
349349
.await
350350
.context(DeleteRequestSnafu {
351351
path: path.as_ref(),
@@ -370,7 +370,7 @@ impl S3Client {
370370
self.config.sign_payload,
371371
None,
372372
)
373-
.send_retry(&self.config.retry_config)
373+
.send_retry(&self.config.client_config)
374374
.await
375375
.context(CopyRequestSnafu {
376376
path: from.as_ref(),
@@ -422,7 +422,7 @@ impl S3Client {
422422
self.config.sign_payload,
423423
None,
424424
)
425-
.send_retry(&self.config.retry_config)
425+
.send_retry(&self.config.client_config)
426426
.await
427427
.context(ListRequestSnafu)?
428428
.bytes()
@@ -476,7 +476,7 @@ impl S3Client {
476476
self.config.sign_payload,
477477
None,
478478
)
479-
.send_retry(&self.config.retry_config)
479+
.send_retry(&self.config.client_config)
480480
.await
481481
.context(CreateMultipartRequestSnafu)?
482482
.bytes()
@@ -521,7 +521,7 @@ impl S3Client {
521521
self.config.sign_payload,
522522
None,
523523
)
524-
.send_retry(&self.config.retry_config)
524+
.send_retry(&self.config.client_config)
525525
.await
526526
.context(CompleteMultipartRequestSnafu)?;
527527

object_store/src/aws/credential.rs

+18-18
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@
1616
// under the License.
1717

1818
use crate::aws::STRICT_ENCODE_SET;
19-
use crate::client::retry::RetryExt;
19+
use crate::client::retry::{ClientConfig, RetryExt};
2020
use crate::client::token::{TemporaryToken, TokenCache};
2121
use crate::util::hmac_sha256;
22-
use crate::{Result, RetryConfig};
22+
use crate::Result;
2323
use bytes::Buf;
2424
use chrono::{DateTime, Utc};
2525
use futures::future::BoxFuture;
@@ -328,7 +328,7 @@ impl CredentialProvider for StaticCredentialProvider {
328328
pub struct InstanceCredentialProvider {
329329
pub cache: TokenCache<Arc<AwsCredential>>,
330330
pub client: Client,
331-
pub retry_config: RetryConfig,
331+
pub client_config: ClientConfig,
332332
pub imdsv1_fallback: bool,
333333
pub metadata_endpoint: String,
334334
}
@@ -338,7 +338,7 @@ impl CredentialProvider for InstanceCredentialProvider {
338338
Box::pin(self.cache.get_or_insert_with(|| {
339339
instance_creds(
340340
&self.client,
341-
&self.retry_config,
341+
&self.client_config,
342342
&self.metadata_endpoint,
343343
self.imdsv1_fallback,
344344
)
@@ -361,15 +361,15 @@ pub struct WebIdentityProvider {
361361
pub session_name: String,
362362
pub endpoint: String,
363363
pub client: Client,
364-
pub retry_config: RetryConfig,
364+
pub client_config: ClientConfig,
365365
}
366366

367367
impl CredentialProvider for WebIdentityProvider {
368368
fn get_credential(&self) -> BoxFuture<'_, Result<Arc<AwsCredential>>> {
369369
Box::pin(self.cache.get_or_insert_with(|| {
370370
web_identity(
371371
&self.client,
372-
&self.retry_config,
372+
&self.client_config,
373373
&self.token_path,
374374
&self.role_arn,
375375
&self.session_name,
@@ -405,7 +405,7 @@ impl From<InstanceCredentials> for AwsCredential {
405405
/// <https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials>
406406
async fn instance_creds(
407407
client: &Client,
408-
retry_config: &RetryConfig,
408+
config: &ClientConfig,
409409
endpoint: &str,
410410
imdsv1_fallback: bool,
411411
) -> Result<TemporaryToken<Arc<AwsCredential>>, StdError> {
@@ -417,7 +417,7 @@ async fn instance_creds(
417417
let token_result = client
418418
.request(Method::PUT, token_url)
419419
.header("X-aws-ec2-metadata-token-ttl-seconds", "600") // 10 minute TTL
420-
.send_retry(retry_config)
420+
.send_retry(config)
421421
.await;
422422

423423
let token = match token_result {
@@ -438,7 +438,7 @@ async fn instance_creds(
438438
role_request = role_request.header(AWS_EC2_METADATA_TOKEN_HEADER, token);
439439
}
440440

441-
let role = role_request.send_retry(retry_config).await?.text().await?;
441+
let role = role_request.send_retry(config).await?.text().await?;
442442

443443
let creds_url = format!("{endpoint}/{CREDENTIALS_PATH}/{role}");
444444
let mut creds_request = client.request(Method::GET, creds_url);
@@ -447,7 +447,7 @@ async fn instance_creds(
447447
}
448448

449449
let creds: InstanceCredentials =
450-
creds_request.send_retry(retry_config).await?.json().await?;
450+
creds_request.send_retry(config).await?.json().await?;
451451

452452
let now = Utc::now();
453453
let ttl = (creds.expiration - now).to_std().unwrap_or_default();
@@ -491,7 +491,7 @@ impl From<AssumeRoleCredentials> for AwsCredential {
491491
/// <https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts-technical-overview.html>
492492
async fn web_identity(
493493
client: &Client,
494-
retry_config: &RetryConfig,
494+
config: &ClientConfig,
495495
token_path: &str,
496496
role_arn: &str,
497497
session_name: &str,
@@ -510,7 +510,7 @@ async fn web_identity(
510510
("Version", "2011-06-15"),
511511
("WebIdentityToken", &token),
512512
])
513-
.send_retry(retry_config)
513+
.send_retry(config)
514514
.await?
515515
.bytes()
516516
.await?;
@@ -722,7 +722,7 @@ mod tests {
722722
// For example https://github.com/aws/amazon-ec2-metadata-mock
723723
let endpoint = env::var("EC2_METADATA_ENDPOINT").unwrap();
724724
let client = Client::new();
725-
let retry_config = RetryConfig::default();
725+
let config = ClientConfig::default();
726726

727727
// Verify only allows IMDSv2
728728
let resp = client
@@ -737,7 +737,7 @@ mod tests {
737737
"Ensure metadata endpoint is set to only allow IMDSv2"
738738
);
739739

740-
let creds = instance_creds(&client, &retry_config, &endpoint, false)
740+
let creds = instance_creds(&client, &config, &endpoint, false)
741741
.await
742742
.unwrap();
743743

@@ -762,7 +762,7 @@ mod tests {
762762

763763
let endpoint = server.url();
764764
let client = Client::new();
765-
let retry_config = RetryConfig::default();
765+
let config = ClientConfig::default();
766766

767767
// Test IMDSv2
768768
server.push_fn(|req| {
@@ -788,7 +788,7 @@ mod tests {
788788
Response::new(Body::from(r#"{"AccessKeyId":"KEYID","Code":"Success","Expiration":"2022-08-30T10:51:04Z","LastUpdated":"2022-08-30T10:21:04Z","SecretAccessKey":"SECRET","Token":"TOKEN","Type":"AWS-HMAC"}"#))
789789
});
790790

791-
let creds = instance_creds(&client, &retry_config, endpoint, true)
791+
let creds = instance_creds(&client, &config, endpoint, true)
792792
.await
793793
.unwrap();
794794

@@ -821,7 +821,7 @@ mod tests {
821821
Response::new(Body::from(r#"{"AccessKeyId":"KEYID","Code":"Success","Expiration":"2022-08-30T10:51:04Z","LastUpdated":"2022-08-30T10:21:04Z","SecretAccessKey":"SECRET","Token":"TOKEN","Type":"AWS-HMAC"}"#))
822822
});
823823

824-
let creds = instance_creds(&client, &retry_config, endpoint, true)
824+
let creds = instance_creds(&client, &config, endpoint, true)
825825
.await
826826
.unwrap();
827827

@@ -838,7 +838,7 @@ mod tests {
838838
);
839839

840840
// Should fail
841-
instance_creds(&client, &retry_config, endpoint, false)
841+
instance_creds(&client, &config, endpoint, false)
842842
.await
843843
.unwrap_err();
844844
}

object_store/src/aws/mod.rs

+34-7
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use std::ops::Range;
4444
use std::str::FromStr;
4545
use std::sync::Arc;
4646
use tokio::io::AsyncWrite;
47+
use tokio::runtime::Handle;
4748
use tracing::info;
4849
use url::Url;
4950

@@ -53,6 +54,7 @@ use crate::aws::credential::{
5354
AwsCredential, CredentialProvider, InstanceCredentialProvider,
5455
StaticCredentialProvider, WebIdentityProvider,
5556
};
57+
use crate::client::retry::ClientConfig;
5658
use crate::multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart};
5759
use crate::util::str_is_truthy;
5860
use crate::{
@@ -407,7 +409,7 @@ pub struct AmazonS3Builder {
407409
endpoint: Option<String>,
408410
token: Option<String>,
409411
url: Option<String>,
410-
retry_config: RetryConfig,
412+
client_config: ClientConfig,
411413
imdsv1_fallback: bool,
412414
virtual_hosted_style_request: bool,
413415
unsigned_payload: bool,
@@ -850,7 +852,17 @@ impl AmazonS3Builder {
850852

851853
/// Set the retry configuration
852854
pub fn with_retry(mut self, retry_config: RetryConfig) -> Self {
853-
self.retry_config = retry_config;
855+
self.client_config.retry = retry_config;
856+
self
857+
}
858+
859+
/// Set the tokio runtime to use to perform IO
860+
///
861+
/// This allows isolating IO into a dedicated [`Runtime`](tokio::runtime::Runtime) either
862+
/// to ensure acceptable scheduling jitter in the presence of CPU-bound tasks, or to allow
863+
/// using `object_store` outside of a tokio context
864+
pub fn with_tokio_runtime(mut self, runtime: Handle) -> Self {
865+
self.client_config.runtime = Some(runtime);
854866
self
855867
}
856868

@@ -978,7 +990,7 @@ impl AmazonS3Builder {
978990
role_arn,
979991
endpoint,
980992
client,
981-
retry_config: self.retry_config.clone(),
993+
client_config: self.client_config.clone(),
982994
}) as _
983995
}
984996
_ => match self.profile {
@@ -996,7 +1008,7 @@ impl AmazonS3Builder {
9961008
Box::new(InstanceCredentialProvider {
9971009
cache: Default::default(),
9981010
client: client_options.client()?,
999-
retry_config: self.retry_config.clone(),
1011+
client_config: self.client_config.clone(),
10001012
imdsv1_fallback: self.imdsv1_fallback,
10011013
metadata_endpoint: self
10021014
.metadata_endpoint
@@ -1031,7 +1043,7 @@ impl AmazonS3Builder {
10311043
bucket,
10321044
bucket_endpoint,
10331045
credentials,
1034-
retry_config: self.retry_config,
1046+
client_config: self.client_config,
10351047
client_options: self.client_options,
10361048
sign_payload: !self.unsigned_payload,
10371049
checksum: self.checksum_algorithm,
@@ -1063,8 +1075,8 @@ fn profile_credentials(
10631075
mod tests {
10641076
use super::*;
10651077
use crate::tests::{
1066-
get_nonexistent_object, list_uses_directories_correctly, list_with_delimiter,
1067-
put_get_delete_list_opts, rename_and_copy, stream_get,
1078+
dedicated_tokio, get_nonexistent_object, list_uses_directories_correctly,
1079+
list_with_delimiter, put_get_delete_list_opts, rename_and_copy, stream_get,
10681080
};
10691081
use bytes::Bytes;
10701082
use std::collections::HashMap;
@@ -1285,6 +1297,21 @@ mod tests {
12851297
assert!(builder.is_err());
12861298
}
12871299

1300+
#[test]
1301+
fn s3_test_non_tokio() {
1302+
let (handle, shutdown) = dedicated_tokio();
1303+
let config = maybe_skip_integration!();
1304+
let integration = config.with_tokio_runtime(handle).build().unwrap();
1305+
futures::executor::block_on(async move {
1306+
put_get_delete_list_opts(&integration, true).await;
1307+
list_uses_directories_correctly(&integration).await;
1308+
list_with_delimiter(&integration).await;
1309+
rename_and_copy(&integration).await;
1310+
stream_get(&integration).await;
1311+
});
1312+
shutdown();
1313+
}
1314+
12881315
#[tokio::test]
12891316
async fn s3_test() {
12901317
let config = maybe_skip_integration!();

0 commit comments

Comments
 (0)