Skip to content

feat: Add SpawnService and SpawnedReqwestConnector for running requests on a different runtime #332

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
May 6, 2025
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ jobs:
echo "LOCALSTACK_CONTAINER=$(docker run -d -p 4566:4566 localstack/localstack:4.0.3)" >> $GITHUB_ENV
echo "EC2_METADATA_CONTAINER=$(docker run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2)" >> $GITHUB_ENV
aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket
aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket-for-spawn
aws --endpoint-url=http://localhost:4566 s3api create-bucket --bucket test-object-lock --object-lock-enabled-for-bucket
aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5

Expand All @@ -150,8 +151,9 @@ jobs:
- name: Run object_store tests
run: cargo test --features=aws,azure,gcp,http

# Don't rerun doc tests (some of them rely on features other than aws)
- name: Run object_store tests (AWS native conditional put)
run: cargo test --features=aws
run: cargo test --lib --tests --features=aws
env:
AWS_CONDITIONAL_PUT: etag
AWS_COPY_IF_NOT_EXISTS: multipart
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ rusty-tags.vi
.flatbuffers/
.idea/
.vscode
.zed
.devcontainer
venv/*
# created by doctests
Expand Down
1 change: 1 addition & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ Or directly with:

```shell
aws s3 mb s3://test-bucket --endpoint-url=http://localhost:4566
aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket-for-spawn
aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
```

Expand Down
54 changes: 54 additions & 0 deletions src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@ impl MultipartStore for AmazonS3 {
mod tests {
use super::*;
use crate::client::get::GetClient;
use crate::client::SpawnedReqwestConnector;
use crate::integration::*;
use crate::tests::*;
use crate::ClientOptions;
Expand Down Expand Up @@ -820,4 +821,57 @@ mod tests {
store.delete(location).await.unwrap();
}
}

/// Integration test that ensures I/O is done on an alternate threadpool
/// when using the `SpawnedReqwestConnector`.
#[test]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also wrote this end-to-end integration test to ensure that everything works correctly when hooked up to an ObjectStore implementation (and it does!)

fn s3_alternate_threadpool_spawned_request_connector() {
maybe_skip_integration!();
// oneshot channel used to tell the io runtime's thread when it may exit
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();

// Runtime with I/O enabled
let io_runtime = tokio::runtime::Builder::new_current_thread()
.enable_all() // <-- turns on IO
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you comment out this line the test fails

.build()
.unwrap();

// Runtime without I/O enabled
let non_io_runtime = tokio::runtime::Builder::new_current_thread()
// note: no call to enable_all
.build()
.unwrap();

// run the io runtime in a different thread
// (the handle is cloned first because `io_runtime` itself is moved into the thread)
let io_handle = io_runtime.handle().clone();
let thread_handle = std::thread::spawn(move || {
// park this thread inside the io runtime until the shutdown signal arrives
io_runtime.block_on(async move {
shutdown_rx.await.unwrap();
});
});

// store whose HTTP client is created by the connector under test
let store = AmazonS3Builder::from_env()
// use different bucket to avoid collisions with other tests
.with_bucket_name("test-bucket-for-spawn")
.with_http_connector(SpawnedReqwestConnector::new(io_handle))
.build()
.unwrap();

// run a request on the non io runtime -- will fail if the connector
// does not spawn the request to the io runtime
non_io_runtime
.block_on(async move {
let path = Path::from("alternate_threadpool/test.txt");
store.delete(&path).await.ok(); // remove the file if it exists from prior runs
// round trip: write the object, read it back, and compare the bytes
store.put(&path, "foo".into()).await?;
let res = store.get(&path).await?.bytes().await?;
assert_eq!(res.as_ref(), b"foo");
store.delete(&path).await?; // cleanup
Ok(()) as Result<()>
})
.expect("failed to run request on non io runtime");

// shutdown the io runtime and thread
// (`.ok()` ignores a send error, i.e. the receiver already being gone)
shutdown_tx.send(()).ok();
thread_handle.join().expect("runtime thread panicked");
}
}
3 changes: 1 addition & 2 deletions src/client/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::client::connection::HttpErrorKind;
use crate::client::{HttpClient, HttpError, HttpRequest, HttpRequestBody};
use crate::client::{HttpClient, HttpError, HttpErrorKind, HttpRequest, HttpRequestBody};
use http::header::{InvalidHeaderName, InvalidHeaderValue};
use http::uri::InvalidUri;
use http::{HeaderName, HeaderValue, Method, Uri};
Expand Down
14 changes: 13 additions & 1 deletion src/client/body.rs → src/client/http/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::client::connection::{HttpError, HttpErrorKind};
use crate::client::{HttpError, HttpErrorKind};
use crate::{collect_bytes, PutPayload};
use bytes::Bytes;
use futures::stream::BoxStream;
Expand Down Expand Up @@ -203,6 +203,18 @@ impl HttpResponseBody {
}
}

/// Exposes an `HttpResponseBody` through the `Body` trait by delegating to
/// the body it wraps.
impl Body for HttpResponseBody {
    type Data = Bytes;
    type Error = HttpError;

    /// Polls the wrapped body (`self.0`) for its next frame.
    fn poll_frame(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        let inner = Pin::new(&mut self.0);
        inner.poll_frame(cx)
    }
}

impl From<Bytes> for HttpResponseBody {
fn from(value: Bytes) -> Self {
Self::new(Full::new(value).map_err(|e| match e {}))
Expand Down
65 changes: 63 additions & 2 deletions src/client/connection.rs → src/client/http/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
// specific language governing permissions and limitations
// under the License.

use crate::client::body::{HttpRequest, HttpResponse};
use crate::client::builder::{HttpRequestBuilder, RequestBuilderError};
use crate::client::HttpResponseBody;
use crate::client::{HttpRequest, HttpResponse, HttpResponseBody};
use crate::ClientOptions;
use async_trait::async_trait;
use http::{Method, Uri};
use http_body_util::BodyExt;
use std::error::Error;
use std::sync::Arc;
use tokio::runtime::Handle;

/// An HTTP protocol error
///
Expand Down Expand Up @@ -298,6 +298,67 @@ impl HttpConnector for ReqwestConnector {
}
}

/// [`reqwest::Client`] connector that performs all I/O on the provided tokio
/// [`Runtime`] (thread pool).
///
/// This adapter is most useful when you wish to segregate I/O from CPU bound
/// work: requests can be issued from a [`Runtime`] that is busy with CPU bound
/// work, while the network I/O is carried out on the runtime given to this
/// connector.
///
/// The runtime behind the provided handle needs its I/O driver turned on (for
/// example via `tokio::runtime::Builder::enable_all`).
///
/// [`Runtime`]: tokio::runtime::Runtime
///
/// # Example: Spawning requests on a separate runtime
///
/// ```
/// # use std::sync::Arc;
/// # use tokio::runtime::Runtime;
/// # use object_store::azure::MicrosoftAzureBuilder;
/// # use object_store::client::SpawnedReqwestConnector;
/// # use object_store::ObjectStore;
/// # fn get_io_runtime() -> Runtime {
/// # tokio::runtime::Builder::new_current_thread().build().unwrap()
/// # }
/// # fn main() -> Result<(), object_store::Error> {
/// // create a tokio runtime for I/O.
/// let io_runtime: Runtime = get_io_runtime();
/// // configure a store using the runtime.
/// let handle = io_runtime.handle().clone(); // get a handle to the same runtime
/// let store: Arc<dyn ObjectStore> = Arc::new(
/// MicrosoftAzureBuilder::new()
/// .with_http_connector(SpawnedReqwestConnector::new(handle))
/// .with_container_name("my_container")
/// .with_account("my_account")
/// .build()?
/// );
/// // any requests made using store will be spawned on the io_runtime
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
#[allow(missing_copy_implementations)]
#[cfg(not(target_arch = "wasm32"))]
pub struct SpawnedReqwestConnector {
// handle to the runtime that requests are spawned onto
runtime: Handle,
}

#[cfg(not(target_arch = "wasm32"))]
impl SpawnedReqwestConnector {
/// Create a new [`SpawnedReqwestConnector`] with the provided [`Handle`] to
/// a tokio [`Runtime`]
///
/// [`Runtime`]: tokio::runtime::Runtime
pub fn new(runtime: Handle) -> Self {
Self { runtime }
}
}

#[cfg(not(target_arch = "wasm32"))]
impl HttpConnector for SpawnedReqwestConnector {
    /// Creates an [`HttpClient`] backed by a `SpawnService` that wraps the
    /// client built from `options` together with a clone of this connector's
    /// runtime handle.
    ///
    /// Returns an error if the client cannot be built from `options`.
    fn connect(&self, options: &ClientOptions) -> crate::Result<HttpClient> {
        let client = options.client()?;
        let io_handle = self.runtime.clone();
        let service = super::SpawnService::new(client, io_handle);
        Ok(HttpClient::new(service))
    }
}

#[cfg(all(target_arch = "wasm32", target_os = "wasi"))]
pub(crate) fn http_connector(
custom: Option<Arc<dyn HttpConnector>>,
Expand Down
27 changes: 27 additions & 0 deletions src/client/http/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! HTTP client abstraction

// Request / response body types (e.g. `HttpResponseBody`).
mod body;
pub use body::*;

// Errors and connectors (e.g. `HttpError`, `HttpConnector`,
// `ReqwestConnector`, `SpawnedReqwestConnector`).
mod connection;
pub use connection::*;

// NOTE(review): presumably defines `SpawnService`, which
// `SpawnedReqwestConnector` uses via `super::SpawnService` -- confirm.
mod spawn;
pub use spawn::*;
Loading