diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f227da0..954ee8a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -129,6 +129,7 @@ jobs: echo "LOCALSTACK_CONTAINER=$(docker run -d -p 4566:4566 localstack/localstack:4.0.3)" >> $GITHUB_ENV echo "EC2_METADATA_CONTAINER=$(docker run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2)" >> $GITHUB_ENV aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket + aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket-for-spawn aws --endpoint-url=http://localhost:4566 s3api create-bucket --bucket test-object-lock --object-lock-enabled-for-bucket aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 @@ -150,8 +151,9 @@ jobs: - name: Run object_store tests run: cargo test --features=aws,azure,gcp,http + # Don't rerun doc tests (some of them rely on features other than aws) - name: Run object_store tests (AWS native conditional put) - run: cargo test --features=aws + run: cargo test --lib --tests --features=aws env: AWS_CONDITIONAL_PUT: etag AWS_COPY_IF_NOT_EXISTS: multipart diff --git a/.gitignore b/.gitignore index 0788dae..1762202 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ rusty-tags.vi .flatbuffers/ .idea/ .vscode +.zed .devcontainer venv/* # created by doctests diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 06cbf6c..0cc703d 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -66,6 +66,7 @@ Or directly with: ```shell aws s3 mb s3://test-bucket --endpoint-url=http://localhost:4566 +aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket-for-spawn aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 ``` diff --git a/src/aws/mod.rs b/src/aws/mod.rs index b8175bd..6b9b228 100644 --- a/src/aws/mod.rs +++ b/src/aws/mod.rs @@ -497,6 +497,7 @@ impl MultipartStore for AmazonS3 { mod tests { use super::*; use crate::client::get::GetClient; + use crate::client::SpawnedReqwestConnector; use crate::integration::*; use crate::tests::*; use crate::ClientOptions; @@ -820,4 +821,57 @@ mod tests { store.delete(location).await.unwrap(); } } + + /// Integration test that ensures I/O is done on an alternate threadpool + /// when using the `SpawnedReqwestConnector`. + #[test] + fn s3_alternate_threadpool_spawned_request_connector() { + maybe_skip_integration!(); + let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + + // Runtime with I/O enabled + let io_runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() // <-- turns on IO + .build() + .unwrap(); + + // Runtime without I/O enabled + let non_io_runtime = tokio::runtime::Builder::new_current_thread() + // note: no call to enable_all + .build() + .unwrap(); + + // run the io runtime in a different thread + let io_handle = io_runtime.handle().clone(); + let thread_handle = std::thread::spawn(move || { + io_runtime.block_on(async move { + shutdown_rx.await.unwrap(); + }); + }); + + let store = AmazonS3Builder::from_env() + // use different bucket to avoid collisions with other tests + .with_bucket_name("test-bucket-for-spawn") + .with_http_connector(SpawnedReqwestConnector::new(io_handle)) + .build() + .unwrap(); + + // run a request on the non io runtime -- will fail if the connector + // does not spawn the request to the io runtime + non_io_runtime + .block_on(async move { + let path = Path::from("alternate_threadpool/test.txt"); + store.delete(&path).await.ok(); // remove the file if it exists from prior runs + store.put(&path, "foo".into()).await?; + let res = store.get(&path).await?.bytes().await?; + assert_eq!(res.as_ref(), b"foo"); + store.delete(&path).await?; // cleanup + Ok(()) as Result<()> + }) + .expect("failed to run request on non io runtime"); + + // shutdown the io runtime and thread + shutdown_tx.send(()).ok(); + thread_handle.join().expect("runtime thread panicked"); + } } diff --git a/src/client/builder.rs b/src/client/builder.rs index 8e18227..aca36a0 100644 --- a/src/client/builder.rs +++ b/src/client/builder.rs @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::client::connection::HttpErrorKind; -use crate::client::{HttpClient, HttpError, HttpRequest, HttpRequestBody}; +use crate::client::{HttpClient, HttpError, HttpErrorKind, HttpRequest, HttpRequestBody}; use http::header::{InvalidHeaderName, InvalidHeaderValue}; use http::uri::InvalidUri; use http::{HeaderName, HeaderValue, Method, Uri}; diff --git a/src/client/body.rs b/src/client/http/body.rs similarity index 95% rename from src/client/body.rs rename to src/client/http/body.rs index ed87972..e696904 100644 --- a/src/client/body.rs +++ b/src/client/http/body.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::client::connection::{HttpError, HttpErrorKind}; +use crate::client::{HttpError, HttpErrorKind}; use crate::{collect_bytes, PutPayload}; use bytes::Bytes; use futures::stream::BoxStream; @@ -203,6 +203,18 @@ impl HttpResponseBody { } } +impl Body for HttpResponseBody { + type Data = Bytes; + type Error = HttpError; + + fn poll_frame( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + Pin::new(&mut self.0).poll_frame(cx) + } +} + impl From for HttpResponseBody { fn from(value: Bytes) -> Self { Self::new(Full::new(value).map_err(|e| match e {})) diff --git a/src/client/connection.rs b/src/client/http/connection.rs similarity index 83% rename from src/client/connection.rs rename to src/client/http/connection.rs index ac4db5c..2b9da31 100644 --- a/src/client/connection.rs +++ b/src/client/http/connection.rs @@ -15,15 +15,15 @@ // specific language governing permissions and limitations // under the License. -use crate::client::body::{HttpRequest, HttpResponse}; use crate::client::builder::{HttpRequestBuilder, RequestBuilderError}; -use crate::client::HttpResponseBody; +use crate::client::{HttpRequest, HttpResponse, HttpResponseBody}; use crate::ClientOptions; use async_trait::async_trait; use http::{Method, Uri}; use http_body_util::BodyExt; use std::error::Error; use std::sync::Arc; +use tokio::runtime::Handle; /// An HTTP protocol error /// @@ -298,6 +298,67 @@ impl HttpConnector for ReqwestConnector { } } +/// [`reqwest::Client`] connector that performs all I/O on the provided tokio +/// [`Runtime`] (thread pool). +/// +/// This adapter is most useful when you wish to segregate I/O from CPU bound +/// work that may be happening on the [`Runtime`]. +/// +/// [`Runtime`]: tokio::runtime::Runtime +/// +/// # Example: Spawning requests on separate runtime +/// +/// ``` +/// # use std::sync::Arc; +/// # use tokio::runtime::Runtime; +/// # use object_store::azure::MicrosoftAzureBuilder; +/// # use object_store::client::SpawnedReqwestConnector; +/// # use object_store::ObjectStore; +/// # fn get_io_runtime() -> Runtime { +/// # tokio::runtime::Builder::new_current_thread().build().unwrap() +/// # } +/// # fn main() -> Result<(), object_store::Error> { +/// // create a tokio runtime for I/O. +/// let io_runtime: Runtime = get_io_runtime(); +/// // configure a store using the runtime. +/// let handle = io_runtime.handle().clone(); // get a handle to the same runtime +/// let store: Arc = Arc::new( +/// MicrosoftAzureBuilder::new() +/// .with_http_connector(SpawnedReqwestConnector::new(handle)) +/// .with_container_name("my_container") +/// .with_account("my_account") +/// .build()? +/// ); +/// // any requests made using store will be spawned on the io_runtime +/// # Ok(()) +/// # } +/// ``` +#[derive(Debug)] +#[allow(missing_copy_implementations)] +#[cfg(not(target_arch = "wasm32"))] +pub struct SpawnedReqwestConnector { + runtime: Handle, +} + +#[cfg(not(target_arch = "wasm32"))] +impl SpawnedReqwestConnector { + /// Create a new [`SpawnedReqwestConnector`] with the provided [`Handle`] to + /// a tokio [`Runtime`] + /// + /// [`Runtime`]: tokio::runtime::Runtime + pub fn new(runtime: Handle) -> Self { + Self { runtime } + } +} + +#[cfg(not(target_arch = "wasm32"))] +impl HttpConnector for SpawnedReqwestConnector { + fn connect(&self, options: &ClientOptions) -> crate::Result { + let spawn_service = super::SpawnService::new(options.client()?, self.runtime.clone()); + Ok(HttpClient::new(spawn_service)) + } +} + #[cfg(all(target_arch = "wasm32", target_os = "wasi"))] pub(crate) fn http_connector( custom: Option>, diff --git a/src/client/http/mod.rs b/src/client/http/mod.rs new file mode 100644 index 0000000..86e1e11 --- /dev/null +++ b/src/client/http/mod.rs @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! HTTP client abstraction + +mod body; +pub use body::*; + +mod connection; +pub use connection::*; + +mod spawn; +pub use spawn::*; diff --git a/src/client/http/spawn.rs b/src/client/http/spawn.rs new file mode 100644 index 0000000..c3f1e71 --- /dev/null +++ b/src/client/http/spawn.rs @@ -0,0 +1,167 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::client::{ + HttpError, HttpErrorKind, HttpRequest, HttpResponse, HttpResponseBody, HttpService, +}; +use async_trait::async_trait; +use bytes::Bytes; +use http::Response; +use http_body_util::BodyExt; +use hyper::body::{Body, Frame}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use thiserror::Error; +use tokio::runtime::Handle; +use tokio::task::JoinHandle; + +/// Spawn error +#[derive(Debug, Error)] +#[error("SpawnError")] +struct SpawnError {} + +impl From for HttpError { + fn from(value: SpawnError) -> Self { + Self::new(HttpErrorKind::Interrupted, value) + } +} + +/// Wraps a provided [`HttpService`] and runs it on a separate tokio runtime +/// +/// See example on [`SpawnedReqwestConnector`] +/// +/// [`SpawnedReqwestConnector`]: crate::client::http::SpawnedReqwestConnector +#[derive(Debug)] +pub struct SpawnService { + inner: T, + runtime: Handle, +} + +impl SpawnService { + /// Creates a new [`SpawnService`] from the provided + pub fn new(inner: T, runtime: Handle) -> Self { + Self { inner, runtime } + } +} + +#[async_trait] +impl HttpService for SpawnService { + async fn call(&self, req: HttpRequest) -> Result { + let inner = self.inner.clone(); + let (send, recv) = tokio::sync::oneshot::channel(); + + // We use an unbounded channel to prevent backpressure across the runtime boundary + // which could in turn starve the underlying IO operations + let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); + + let handle = SpawnHandle(self.runtime.spawn(async move { + let r = match HttpService::call(&inner, req).await { + Ok(resp) => resp, + Err(e) => { + let _ = send.send(Err(e)); + return; + } + }; + + let (parts, mut body) = r.into_parts(); + if send.send(Ok(parts)).is_err() { + return; + } + + while let Some(x) = body.frame().await { + sender.send(x).unwrap(); + } + })); + + let parts = recv.await.map_err(|_| SpawnError {})??; + + Ok(Response::from_parts( + parts, + HttpResponseBody::new(SpawnBody { + stream: receiver, + _worker: handle, + }), + )) + } +} + +/// A wrapper around a [`JoinHandle`] that aborts on drop +struct SpawnHandle(JoinHandle<()>); +impl Drop for SpawnHandle { + fn drop(&mut self) { + self.0.abort(); + } +} + +type StreamItem = Result, HttpError>; + +struct SpawnBody { + stream: tokio::sync::mpsc::UnboundedReceiver, + _worker: SpawnHandle, +} + +impl Body for SpawnBody { + type Data = Bytes; + type Error = HttpError; + + fn poll_frame(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.stream.poll_recv(cx) + } +} + +#[cfg(not(target_arch = "wasm32"))] +#[cfg(test)] +mod tests { + use super::*; + use crate::client::mock_server::MockServer; + use crate::client::retry::RetryExt; + use crate::client::HttpClient; + use crate::RetryConfig; + + async fn test_client(client: HttpClient) { + let (send, recv) = tokio::sync::oneshot::channel(); + + let mock = MockServer::new().await; + mock.push(Response::new("BANANAS".to_string())); + + let url = mock.url().to_string(); + let thread = std::thread::spawn(|| { + futures::executor::block_on(async move { + let retry = RetryConfig::default(); + let ret = client.get(url).send_retry(&retry).await.unwrap(); + let payload = ret.into_body().bytes().await.unwrap(); + assert_eq!(payload.as_ref(), b"BANANAS"); + let _ = send.send(()); + }) + }); + recv.await.unwrap(); + thread.join().unwrap(); + } + + #[tokio::test] + async fn test_spawn() { + let client = HttpClient::new(SpawnService::new(reqwest::Client::new(), Handle::current())); + test_client(client).await; + } + + #[tokio::test] + #[should_panic] + async fn test_no_spawn() { + let client = HttpClient::new(reqwest::Client::new()); + test_client(client).await; + } +} diff --git a/src/client/mod.rs b/src/client/mod.rs index 0c5dcc1..a71814b 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -15,7 +15,9 @@ // specific language governing permissions and limitations // under the License. -//! Generic utilities reqwest based ObjectStore implementations +//! Generic utilities for [`reqwest`] based [`ObjectStore`] implementations +//! +//! [`ObjectStore`]: crate::ObjectStore pub(crate) mod backoff; @@ -44,19 +46,12 @@ pub(crate) mod header; #[cfg(any(feature = "aws", feature = "gcp"))] pub(crate) mod s3; -mod body; -pub use body::{HttpRequest, HttpRequestBody, HttpResponse, HttpResponseBody}; - pub(crate) mod builder; - -mod connection; -pub(crate) use connection::http_connector; -#[cfg(not(all(target_arch = "wasm32", target_os = "wasi")))] -pub use connection::ReqwestConnector; -pub use connection::{HttpClient, HttpConnector, HttpError, HttpErrorKind, HttpService}; +mod http; #[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))] pub(crate) mod parts; +pub use http::*; use async_trait::async_trait; use reqwest::header::{HeaderMap, HeaderValue}; @@ -128,7 +123,7 @@ pub enum ClientConfigKey { ProxyExcludes, /// Randomize order addresses that the DNS resolution yields. /// - /// This will spread the connections accross more servers. + /// This will spread the connections across more servers. RandomizeAddresses, /// Request timeout /// diff --git a/src/client/retry.rs b/src/client/retry.rs index a3a1449..5aac450 100644 --- a/src/client/retry.rs +++ b/src/client/retry.rs @@ -19,8 +19,7 @@ use crate::client::backoff::{Backoff, BackoffConfig}; use crate::client::builder::HttpRequestBuilder; -use crate::client::connection::HttpErrorKind; -use crate::client::{HttpClient, HttpError, HttpRequest, HttpResponse}; +use crate::client::{HttpClient, HttpError, HttpErrorKind, HttpRequest, HttpResponse}; use crate::PutPayload; use futures::future::BoxFuture; use http::{Method, Uri}; diff --git a/src/lib.rs b/src/lib.rs index 66575b8..ffca24a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -497,6 +497,14 @@ //! [`rustls-native-certs`]: https://crates.io/crates/rustls-native-certs/ //! [`webpki-roots`]: https://crates.io/crates/webpki-roots //! +//! # Customizing HTTP Clients +//! +//! Many [`ObjectStore`] implementations permit customization of the HTTP client via +//! the [`HttpConnector`] trait and utilities in the [`client`] module. +//! Examples include injecting custom HTTP headers or using an alternate +//! tokio Runtime I/O requests. +//! +//! [`HttpConnector`]: client::HttpConnector #[cfg(feature = "aws")] pub mod aws;