Skip to content

Commit

Permalink
Sort out tokio tests
Browse files Browse the repository at this point in the history
  • Loading branch information
durch committed Jun 24, 2024
1 parent e61a410 commit 7182a4f
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 46 deletions.
3 changes: 2 additions & 1 deletion s3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ bytes = { version = "1.2" }
block_on_proc = { version = "0.2", optional = true }

[features]
default = ["tags", "use-tokio-native-tls", "fail-on-err"]
default = ["tokio-rustls-tls"]
# default = ["tags", "use-tokio-native-tls", "fail-on-err"]
use-tokio-native-tls = [
"with-tokio",
"aws-creds/native-tls",
Expand Down
103 changes: 62 additions & 41 deletions s3/src/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ use crate::bucket_ops::{BucketConfiguration, CreateBucketResponse};
use crate::command::{Command, Multipart};
use crate::creds::Credentials;
use crate::region::Region;
#[cfg(feature = "with-tokio")]
use crate::request::tokio_backend::{client, HttpsConnector};
#[cfg(any(feature = "with-tokio", feature = "use-tokio-native-tls"))]
use crate::request::tokio_backend::client;
#[cfg(any(feature = "use-tokio-native-tls", feature = "tokio-rustls-tls"))]
use crate::request::tokio_backend::HttpsConnector;
use crate::request::ResponseData;
#[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
use crate::request::ResponseDataStream;
Expand Down Expand Up @@ -106,8 +108,14 @@ pub struct Bucket {
pub request_timeout: Option<Duration>,
path_style: bool,
listobjects_v2: bool,
#[cfg(feature = "with-tokio")]
#[cfg(any(feature = "use-tokio-native-tls", feature = "tokio-rustls-tls"))]
http_client: Arc<hyper::Client<HttpsConnector<hyper::client::HttpConnector>>>,
#[cfg(all(
feature = "with-tokio",
not(feature = "use-tokio-native-tls"),
not(feature = "tokio-rustls-tls")
))]
http_client: Arc<hyper::Client<hyper::client::HttpConnector>>,
}

impl Bucket {
Expand All @@ -126,7 +134,16 @@ impl Bucket {
}
}

#[cfg(feature = "with-tokio")]
#[cfg(all(
feature = "with-tokio",
not(feature = "use-tokio-native-tls"),
not(feature = "tokio-rustls-tls")
))]
pub fn http_client(&self) -> Arc<hyper::Client<hyper::client::HttpConnector>> {
Arc::clone(&self.http_client)
}

#[cfg(any(feature = "use-tokio-native-tls", feature = "tokio-rustls-tls"))]
pub fn http_client(&self) -> Arc<hyper::Client<HttpsConnector<hyper::client::HttpConnector>>> {
Arc::clone(&self.http_client)
}
Expand Down Expand Up @@ -224,7 +241,7 @@ impl Bucket {
&self,
post_policy: PostPolicy<'a>,
) -> Result<PresignedPost, S3Error> {
post_policy.sign(self.clone()).await
post_policy.sign(Box::new(self.clone())).await
}

/// Get a presigned url for putting object to a given path
Expand Down Expand Up @@ -557,8 +574,12 @@ impl Bucket {
///
/// let bucket = Bucket::new(bucket_name, region, credentials).unwrap();
/// ```
pub fn new(name: &str, region: Region, credentials: Credentials) -> Result<Bucket, S3Error> {
Ok(Bucket {
pub fn new(
name: &str,
region: Region,
credentials: Credentials,
) -> Result<Box<Bucket>, S3Error> {
Ok(Box::new(Bucket {
name: name.into(),
region,
credentials: Arc::new(RwLock::new(credentials)),
Expand All @@ -567,9 +588,9 @@ impl Bucket {
request_timeout: DEFAULT_REQUEST_TIMEOUT,
path_style: false,
listobjects_v2: true,
#[cfg(feature = "with-tokio")]
#[cfg(any(feature = "use-tokio-native-tls", feature = "with-tokio"))]
http_client: Arc::new(client(DEFAULT_REQUEST_TIMEOUT)?),
})
}))
}

/// Instantiate a public existing `Bucket`.
Expand All @@ -593,13 +614,13 @@ impl Bucket {
request_timeout: DEFAULT_REQUEST_TIMEOUT,
path_style: false,
listobjects_v2: true,
#[cfg(feature = "with-tokio")]
#[cfg(any(feature = "use-tokio-native-tls", feature = "with-tokio"))]
http_client: Arc::new(client(DEFAULT_REQUEST_TIMEOUT)?),
})
}

pub fn with_path_style(&self) -> Bucket {
Bucket {
pub fn with_path_style(&self) -> Box<Bucket> {
Box::new(Bucket {
name: self.name.clone(),
region: self.region.clone(),
credentials: self.credentials.clone(),
Expand All @@ -608,9 +629,9 @@ impl Bucket {
request_timeout: self.request_timeout,
path_style: true,
listobjects_v2: self.listobjects_v2,
#[cfg(feature = "with-tokio")]
#[cfg(any(feature = "use-tokio-native-tls", feature = "with-tokio"))]
http_client: self.http_client.clone(),
}
})
}

pub fn with_extra_headers(&self, extra_headers: HeaderMap) -> Result<Bucket, S3Error> {
Expand All @@ -623,7 +644,7 @@ impl Bucket {
request_timeout: self.request_timeout,
path_style: self.path_style,
listobjects_v2: self.listobjects_v2,
#[cfg(feature = "with-tokio")]
#[cfg(any(feature = "use-tokio-native-tls", feature = "with-tokio"))]
http_client: self.http_client.clone(),
})
}
Expand All @@ -641,13 +662,13 @@ impl Bucket {
request_timeout: self.request_timeout,
path_style: self.path_style,
listobjects_v2: self.listobjects_v2,
#[cfg(feature = "with-tokio")]
#[cfg(any(feature = "use-tokio-native-tls", feature = "with-tokio"))]
http_client: self.http_client.clone(),
})
}

pub fn with_request_timeout(&self, request_timeout: Duration) -> Result<Bucket, S3Error> {
Ok(Bucket {
pub fn with_request_timeout(&self, request_timeout: Duration) -> Result<Box<Bucket>, S3Error> {
Ok(Box::new(Bucket {
name: self.name.clone(),
region: self.region.clone(),
credentials: self.credentials.clone(),
Expand All @@ -656,9 +677,9 @@ impl Bucket {
request_timeout: Some(request_timeout),
path_style: self.path_style,
listobjects_v2: self.listobjects_v2,
#[cfg(feature = "with-tokio")]
#[cfg(any(feature = "use-tokio-native-tls", feature = "with-tokio"))]
http_client: Arc::new(client(Some(request_timeout))?),
})
}))
}

pub fn with_listobjects_v1(&self) -> Bucket {
Expand All @@ -671,7 +692,7 @@ impl Bucket {
request_timeout: self.request_timeout,
path_style: self.path_style,
listobjects_v2: false,
#[cfg(feature = "with-tokio")]
#[cfg(any(feature = "use-tokio-native-tls", feature = "with-tokio"))]
http_client: self.http_client.clone(),
}
}
Expand Down Expand Up @@ -2492,7 +2513,7 @@ mod test {
.unwrap()
}

fn test_aws_bucket() -> Bucket {
fn test_aws_bucket() -> Box<Bucket> {
Bucket::new(
"rust-s3-test",
"eu-central-1".parse().unwrap(),
Expand All @@ -2501,7 +2522,7 @@ mod test {
.unwrap()
}

fn test_wasabi_bucket() -> Bucket {
fn test_wasabi_bucket() -> Box<Bucket> {
Bucket::new(
"rust-s3",
"wa-eu-central-1".parse().unwrap(),
Expand All @@ -2510,7 +2531,7 @@ mod test {
.unwrap()
}

fn test_gc_bucket() -> Bucket {
fn test_gc_bucket() -> Box<Bucket> {
let mut bucket = Bucket::new(
"rust-s3",
Region::Custom {
Expand All @@ -2524,7 +2545,7 @@ mod test {
bucket
}

fn test_minio_bucket() -> Bucket {
fn test_minio_bucket() -> Box<Bucket> {
Bucket::new(
"rust-s3",
Region::Custom {
Expand All @@ -2538,11 +2559,11 @@ mod test {
}

#[allow(dead_code)]
fn test_digital_ocean_bucket() -> Bucket {
fn test_digital_ocean_bucket() -> Box<Bucket> {
Bucket::new("rust-s3", Region::DoFra1, test_digital_ocean_credentials()).unwrap()
}

fn test_r2_bucket() -> Bucket {
fn test_r2_bucket() -> Box<Bucket> {
Bucket::new(
"rust-s3",
Region::R2 {
Expand Down Expand Up @@ -2680,7 +2701,7 @@ mod test {
)
)]
async fn streaming_big_aws_put_head_get_delete_object() {
streaming_test_put_get_delete_big_object(test_aws_bucket()).await;
streaming_test_put_get_delete_big_object(*test_aws_bucket()).await;
}

#[ignore]
Expand All @@ -2700,7 +2721,7 @@ mod test {
)
)]
async fn streaming_big_gc_put_head_get_delete_object() {
streaming_test_put_get_delete_big_object(test_gc_bucket()).await;
streaming_test_put_get_delete_big_object(*test_gc_bucket()).await;
}

#[ignore]
Expand All @@ -2713,7 +2734,7 @@ mod test {
)
)]
async fn streaming_big_minio_put_head_get_delete_object() {
streaming_test_put_get_delete_big_object(test_minio_bucket()).await;
streaming_test_put_get_delete_big_object(*test_minio_bucket()).await;
}

// Test multi-part upload
Expand Down Expand Up @@ -2838,7 +2859,7 @@ mod test {
}

#[maybe_async::maybe_async]
async fn streaming_test_put_get_delete_small_object(bucket: Bucket) {
async fn streaming_test_put_get_delete_small_object(bucket: Box<Bucket>) {
init();
let remote_path = "+stream_test_small";
let content: Vec<u8> = object(1000);
Expand Down Expand Up @@ -2949,7 +2970,7 @@ mod test {
))]
#[test]
fn aws_put_head_get_delete_object_blocking() {
put_head_get_list_delete_object_blocking(test_aws_bucket())
put_head_get_list_delete_object_blocking(*test_aws_bucket())
}

#[ignore]
Expand All @@ -2959,7 +2980,7 @@ mod test {
))]
#[test]
fn gc_put_head_get_delete_object_blocking() {
put_head_get_list_delete_object_blocking(test_gc_bucket())
put_head_get_list_delete_object_blocking(*test_gc_bucket())
}

#[ignore]
Expand All @@ -2969,7 +2990,7 @@ mod test {
))]
#[test]
fn wasabi_put_head_get_delete_object_blocking() {
put_head_get_list_delete_object_blocking(test_wasabi_bucket())
put_head_get_list_delete_object_blocking(*test_wasabi_bucket())
}

#[ignore]
Expand All @@ -2979,7 +3000,7 @@ mod test {
))]
#[test]
fn minio_put_head_get_delete_object_blocking() {
put_head_get_list_delete_object_blocking(test_minio_bucket())
put_head_get_list_delete_object_blocking(*test_minio_bucket())
}

#[ignore]
Expand All @@ -2989,7 +3010,7 @@ mod test {
))]
#[test]
fn digital_ocean_put_head_get_delete_object_blocking() {
put_head_get_list_delete_object_blocking(test_digital_ocean_bucket())
put_head_get_list_delete_object_blocking(*test_digital_ocean_bucket())
}

#[ignore]
Expand All @@ -3002,7 +3023,7 @@ mod test {
)
)]
async fn aws_put_head_get_delete_object() {
put_head_get_delete_object(test_aws_bucket(), true).await;
put_head_get_delete_object(*test_aws_bucket(), true).await;
}

#[ignore]
Expand All @@ -3021,7 +3042,7 @@ mod test {
)
)]
async fn gc_test_put_head_get_delete_object() {
put_head_get_delete_object(test_gc_bucket(), true).await;
put_head_get_delete_object(*test_gc_bucket(), true).await;
}

#[ignore]
Expand All @@ -3034,7 +3055,7 @@ mod test {
)
)]
async fn wasabi_test_put_head_get_delete_object() {
put_head_get_delete_object(test_wasabi_bucket(), true).await;
put_head_get_delete_object(*test_wasabi_bucket(), true).await;
}

#[ignore]
Expand All @@ -3047,7 +3068,7 @@ mod test {
)
)]
async fn minio_test_put_head_get_delete_object() {
put_head_get_delete_object(test_minio_bucket(), true).await;
put_head_get_delete_object(*test_minio_bucket(), true).await;
}

// Keeps failing on tokio-rustls-tls
Expand All @@ -3074,7 +3095,7 @@ mod test {
)
)]
async fn r2_test_put_head_get_delete_object() {
put_head_get_delete_object(test_r2_bucket(), false).await;
put_head_get_delete_object(*test_r2_bucket(), false).await;
}

#[maybe_async::test(
Expand Down
2 changes: 1 addition & 1 deletion s3/src/bucket_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ impl BucketConfiguration {

#[allow(dead_code)]
pub struct CreateBucketResponse {
pub bucket: Bucket,
pub bucket: Box<Bucket>,
pub response_text: String,
pub response_code: u16,
}
Expand Down
6 changes: 3 additions & 3 deletions s3/src/post_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl<'a> PostPolicy<'a> {
}

#[maybe_async::maybe_async]
pub async fn sign(&self, bucket: Bucket) -> Result<PresignedPost, S3Error> {
pub async fn sign(&self, bucket: Box<Bucket>) -> Result<PresignedPost, S3Error> {
use hmac::Mac;

bucket.credentials_refresh().await?;
Expand Down Expand Up @@ -429,7 +429,7 @@ mod test {

use serde_json::json;

fn test_bucket() -> Bucket {
fn test_bucket() -> Box<Bucket> {
Bucket::new(
"rust-s3",
Region::UsEast1,
Expand All @@ -445,7 +445,7 @@ mod test {
.unwrap()
}

fn test_bucket_with_security_token() -> Bucket {
fn test_bucket_with_security_token() -> Box<Bucket> {
Bucket::new(
"rust-s3",
Region::UsEast1,
Expand Down
Loading

0 comments on commit 7182a4f

Please sign in to comment.