Skip to content

Commit cdb7b6f

Browse files
authored
Update reqwest 0.12 and http 1.0 (#5536)
1 parent ff86119 commit cdb7b6f

File tree

6 files changed

+99
-78
lines changed

6 files changed

+99
-78
lines changed

object_store/Cargo.toml

+5-3
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,12 @@ walkdir = "2"
4545

4646
# Cloud storage support
4747
base64 = { version = "0.22", default-features = false, features = ["std"], optional = true }
48-
hyper = { version = "0.14", default-features = false, optional = true }
48+
hyper = { version = "1.2", default-features = false, optional = true }
4949
quick-xml = { version = "0.31.0", features = ["serialize", "overlapped-lists"], optional = true }
5050
serde = { version = "1.0", default-features = false, features = ["derive"], optional = true }
5151
serde_json = { version = "1.0", default-features = false, optional = true }
5252
rand = { version = "0.8", default-features = false, features = ["std", "std_rng"], optional = true }
53-
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls-native-roots"], optional = true }
53+
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-native-roots", "http2"], optional = true }
5454
ring = { version = "0.17", default-features = false, features = ["std"], optional = true }
5555
rustls-pemfile = { version = "2.0", default-features = false, features = ["std"], optional = true }
5656
tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time", "io-util"] }
@@ -69,7 +69,9 @@ tls-webpki-roots = ["reqwest?/rustls-tls-webpki-roots"]
6969

7070
[dev-dependencies] # In alphabetical order
7171
futures-test = "0.3"
72-
hyper = { version = "0.14.24", features = ["server"] }
72+
hyper = { version = "1.2", features = ["server"] }
73+
hyper-util = "0.1"
74+
http-body-util = "0.1"
7375
rand = "0.8"
7476
tempfile = "3.1.0"
7577

object_store/src/aws/credential.rs

+9-9
Original file line numberDiff line numberDiff line change
@@ -738,7 +738,7 @@ struct CreateSessionOutput {
738738
mod tests {
739739
use super::*;
740740
use crate::client::mock_server::MockServer;
741-
use hyper::{Body, Response};
741+
use hyper::Response;
742742
use reqwest::{Client, Method};
743743
use std::env;
744744

@@ -939,7 +939,7 @@ mod tests {
939939

940940
#[tokio::test]
941941
async fn test_mock() {
942-
let server = MockServer::new();
942+
let server = MockServer::new().await;
943943

944944
const IMDSV2_HEADER: &str = "X-aws-ec2-metadata-token";
945945

@@ -955,7 +955,7 @@ mod tests {
955955
server.push_fn(|req| {
956956
assert_eq!(req.uri().path(), "/latest/api/token");
957957
assert_eq!(req.method(), &Method::PUT);
958-
Response::new(Body::from("cupcakes"))
958+
Response::new("cupcakes".to_string())
959959
});
960960
server.push_fn(|req| {
961961
assert_eq!(
@@ -965,14 +965,14 @@ mod tests {
965965
assert_eq!(req.method(), &Method::GET);
966966
let t = req.headers().get(IMDSV2_HEADER).unwrap().to_str().unwrap();
967967
assert_eq!(t, "cupcakes");
968-
Response::new(Body::from("myrole"))
968+
Response::new("myrole".to_string())
969969
});
970970
server.push_fn(|req| {
971971
assert_eq!(req.uri().path(), "/latest/meta-data/iam/security-credentials/myrole");
972972
assert_eq!(req.method(), &Method::GET);
973973
let t = req.headers().get(IMDSV2_HEADER).unwrap().to_str().unwrap();
974974
assert_eq!(t, "cupcakes");
975-
Response::new(Body::from(r#"{"AccessKeyId":"KEYID","Code":"Success","Expiration":"2022-08-30T10:51:04Z","LastUpdated":"2022-08-30T10:21:04Z","SecretAccessKey":"SECRET","Token":"TOKEN","Type":"AWS-HMAC"}"#))
975+
Response::new(r#"{"AccessKeyId":"KEYID","Code":"Success","Expiration":"2022-08-30T10:51:04Z","LastUpdated":"2022-08-30T10:21:04Z","SecretAccessKey":"SECRET","Token":"TOKEN","Type":"AWS-HMAC"}"#.to_string())
976976
});
977977

978978
let creds = instance_creds(&client, &retry_config, endpoint, true)
@@ -989,7 +989,7 @@ mod tests {
989989
assert_eq!(req.method(), &Method::PUT);
990990
Response::builder()
991991
.status(StatusCode::FORBIDDEN)
992-
.body(Body::empty())
992+
.body(String::new())
993993
.unwrap()
994994
});
995995
server.push_fn(|req| {
@@ -999,13 +999,13 @@ mod tests {
999999
);
10001000
assert_eq!(req.method(), &Method::GET);
10011001
assert!(req.headers().get(IMDSV2_HEADER).is_none());
1002-
Response::new(Body::from("myrole"))
1002+
Response::new("myrole".to_string())
10031003
});
10041004
server.push_fn(|req| {
10051005
assert_eq!(req.uri().path(), "/latest/meta-data/iam/security-credentials/myrole");
10061006
assert_eq!(req.method(), &Method::GET);
10071007
assert!(req.headers().get(IMDSV2_HEADER).is_none());
1008-
Response::new(Body::from(r#"{"AccessKeyId":"KEYID","Code":"Success","Expiration":"2022-08-30T10:51:04Z","LastUpdated":"2022-08-30T10:21:04Z","SecretAccessKey":"SECRET","Token":"TOKEN","Type":"AWS-HMAC"}"#))
1008+
Response::new(r#"{"AccessKeyId":"KEYID","Code":"Success","Expiration":"2022-08-30T10:51:04Z","LastUpdated":"2022-08-30T10:21:04Z","SecretAccessKey":"SECRET","Token":"TOKEN","Type":"AWS-HMAC"}"#.to_string())
10091009
});
10101010

10111011
let creds = instance_creds(&client, &retry_config, endpoint, true)
@@ -1020,7 +1020,7 @@ mod tests {
10201020
server.push(
10211021
Response::builder()
10221022
.status(StatusCode::FORBIDDEN)
1023-
.body(Body::empty())
1023+
.body(String::new())
10241024
.unwrap(),
10251025
);
10261026

object_store/src/azure/credential.rs

+15-13
Original file line numberDiff line numberDiff line change
@@ -930,8 +930,8 @@ impl CredentialProvider for AzureCliCredential {
930930
#[cfg(test)]
931931
mod tests {
932932
use futures::executor::block_on;
933-
use hyper::body::to_bytes;
934-
use hyper::{Body, Response, StatusCode};
933+
use http_body_util::BodyExt;
934+
use hyper::{Response, StatusCode};
935935
use reqwest::{Client, Method};
936936
use tempfile::NamedTempFile;
937937

@@ -942,7 +942,7 @@ mod tests {
942942

943943
#[tokio::test]
944944
async fn test_managed_identity() {
945-
let server = MockServer::new();
945+
let server = MockServer::new().await;
946946

947947
std::env::set_var(MSI_SECRET_ENV_KEY, "env-secret");
948948

@@ -964,7 +964,7 @@ mod tests {
964964
assert_eq!(t, "env-secret");
965965
let t = req.headers().get("metadata").unwrap().to_str().unwrap();
966966
assert_eq!(t, "true");
967-
Response::new(Body::from(
967+
Response::new(
968968
r#"
969969
{
970970
"access_token": "TOKEN",
@@ -975,8 +975,9 @@ mod tests {
975975
"resource": "https://management.azure.com/",
976976
"token_type": "Bearer"
977977
}
978-
"#,
979-
))
978+
"#
979+
.to_string(),
980+
)
980981
});
981982

982983
let credential = ImdsManagedIdentityProvider::new(
@@ -999,7 +1000,7 @@ mod tests {
9991000

10001001
#[tokio::test]
10011002
async fn test_workload_identity() {
1002-
let server = MockServer::new();
1003+
let server = MockServer::new().await;
10031004
let tokenfile = NamedTempFile::new().unwrap();
10041005
let tenant = "tenant";
10051006
std::fs::write(tokenfile.path(), "federated-token").unwrap();
@@ -1012,10 +1013,10 @@ mod tests {
10121013
server.push_fn(move |req| {
10131014
assert_eq!(req.uri().path(), format!("/{tenant}/oauth2/v2.0/token"));
10141015
assert_eq!(req.method(), &Method::POST);
1015-
let body = block_on(to_bytes(req.into_body())).unwrap();
1016+
let body = block_on(async move { req.into_body().collect().await.unwrap().to_bytes() });
10161017
let body = String::from_utf8(body.to_vec()).unwrap();
10171018
assert!(body.contains("federated-token"));
1018-
Response::new(Body::from(
1019+
Response::new(
10191020
r#"
10201021
{
10211022
"access_token": "TOKEN",
@@ -1026,8 +1027,9 @@ mod tests {
10261027
"resource": "https://management.azure.com/",
10271028
"token_type": "Bearer"
10281029
}
1029-
"#,
1030-
))
1030+
"#
1031+
.to_string(),
1032+
)
10311033
});
10321034

10331035
let credential = WorkloadIdentityOAuthProvider::new(
@@ -1050,7 +1052,7 @@ mod tests {
10501052

10511053
#[tokio::test]
10521054
async fn test_no_credentials() {
1053-
let server = MockServer::new();
1055+
let server = MockServer::new().await;
10541056

10551057
let endpoint = server.url();
10561058
let store = MicrosoftAzureBuilder::new()
@@ -1068,7 +1070,7 @@ mod tests {
10681070
assert!(req.headers().get("Authorization").is_none());
10691071
Response::builder()
10701072
.status(StatusCode::NOT_FOUND)
1071-
.body(Body::from("not found"))
1073+
.body("not found".to_string())
10721074
.unwrap()
10731075
});
10741076

object_store/src/client/mock_server.rs

+48-34
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,23 @@
1717

1818
use futures::future::BoxFuture;
1919
use futures::FutureExt;
20-
use hyper::service::{make_service_fn, service_fn};
21-
use hyper::{Body, Request, Response, Server};
20+
use hyper::body::Incoming;
21+
use hyper::server::conn::http1;
22+
use hyper::service::service_fn;
23+
use hyper::{Request, Response};
24+
use hyper_util::rt::TokioIo;
2225
use parking_lot::Mutex;
2326
use std::collections::VecDeque;
2427
use std::convert::Infallible;
2528
use std::future::Future;
2629
use std::net::SocketAddr;
2730
use std::sync::Arc;
31+
use tokio::net::TcpListener;
2832
use tokio::sync::oneshot;
29-
use tokio::task::JoinHandle;
33+
use tokio::task::{JoinHandle, JoinSet};
3034

31-
pub type ResponseFn = Box<dyn FnOnce(Request<Body>) -> BoxFuture<'static, Response<Body>> + Send>;
35+
pub type ResponseFn =
36+
Box<dyn FnOnce(Request<Incoming>) -> BoxFuture<'static, Response<String>> + Send>;
3237

3338
/// A mock server
3439
pub struct MockServer {
@@ -39,39 +44,48 @@ pub struct MockServer {
3944
}
4045

4146
impl MockServer {
42-
pub fn new() -> Self {
47+
pub async fn new() -> Self {
4348
let responses: Arc<Mutex<VecDeque<ResponseFn>>> =
4449
Arc::new(Mutex::new(VecDeque::with_capacity(10)));
4550

46-
let r = Arc::clone(&responses);
47-
let make_service = make_service_fn(move |_conn| {
48-
let r = Arc::clone(&r);
49-
async move {
50-
Ok::<_, Infallible>(service_fn(move |req| {
51-
let r = Arc::clone(&r);
52-
let next = r.lock().pop_front();
53-
async move {
54-
Ok::<_, Infallible>(match next {
55-
Some(r) => r(req).await,
56-
None => Response::new(Body::from("Hello World")),
57-
})
58-
}
59-
}))
60-
}
61-
});
51+
let addr = SocketAddr::from(([127, 0, 0, 1], 0));
52+
let listener = TcpListener::bind(addr).await.unwrap();
6253

63-
let (shutdown, rx) = oneshot::channel::<()>();
64-
let server = Server::bind(&SocketAddr::from(([127, 0, 0, 1], 0))).serve(make_service);
54+
let (shutdown, mut rx) = oneshot::channel::<()>();
6555

66-
let url = format!("http://{}", server.local_addr());
56+
let url = format!("http://{}", listener.local_addr().unwrap());
6757

58+
let r = Arc::clone(&responses);
6859
let handle = tokio::spawn(async move {
69-
server
70-
.with_graceful_shutdown(async {
71-
rx.await.ok();
72-
})
73-
.await
74-
.unwrap()
60+
let mut set = JoinSet::new();
61+
62+
loop {
63+
let (stream, _) = tokio::select! {
64+
conn = listener.accept() => conn.unwrap(),
65+
_ = &mut rx => break,
66+
};
67+
68+
let r = Arc::clone(&r);
69+
set.spawn(async move {
70+
let _ = http1::Builder::new()
71+
.serve_connection(
72+
TokioIo::new(stream),
73+
service_fn(move |req| {
74+
let r = Arc::clone(&r);
75+
let next = r.lock().pop_front();
76+
async move {
77+
Ok::<_, Infallible>(match next {
78+
Some(r) => r(req).await,
79+
None => Response::new("Hello World".to_string()),
80+
})
81+
}
82+
}),
83+
)
84+
.await;
85+
});
86+
}
87+
88+
set.abort_all();
7589
});
7690

7791
Self {
@@ -88,23 +102,23 @@ impl MockServer {
88102
}
89103

90104
/// Add a response
91-
pub fn push(&self, response: Response<Body>) {
105+
pub fn push(&self, response: Response<String>) {
92106
self.push_fn(|_| response)
93107
}
94108

95109
/// Add a response function
96110
pub fn push_fn<F>(&self, f: F)
97111
where
98-
F: FnOnce(Request<Body>) -> Response<Body> + Send + 'static,
112+
F: FnOnce(Request<Incoming>) -> Response<String> + Send + 'static,
99113
{
100114
let f = Box::new(|req| async move { f(req) }.boxed());
101115
self.responses.lock().push_back(f)
102116
}
103117

104118
pub fn push_async_fn<F, Fut>(&self, f: F)
105119
where
106-
F: FnOnce(Request<Body>) -> Fut + Send + 'static,
107-
Fut: Future<Output = Response<Body>> + Send + 'static,
120+
F: FnOnce(Request<Incoming>) -> Fut + Send + 'static,
121+
Fut: Future<Output = Response<String>> + Send + 'static,
108122
{
109123
self.responses.lock().push_back(Box::new(|r| f(r).boxed()))
110124
}

0 commit comments

Comments
 (0)