Skip to content

Commit

Permalink
Made it retriable
Browse files Browse the repository at this point in the history
  • Loading branch information
amigin committed Nov 22, 2024
1 parent ac84113 commit 0629c74
Show file tree
Hide file tree
Showing 11 changed files with 591 additions and 306 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ hyper-util = { version = "*", features = ["tokio"] }

http-body-util = { version = "*" }

my-http-client = { tag = "0.1.1", git = "https://github.com/MyJetTools/my-http-client.git" }
my-http-client = { tag = "0.1.2", git = "https://github.com/MyJetTools/my-http-client.git" }

lazy_static = "*"
async-trait = "*"
Expand Down
216 changes: 120 additions & 96 deletions src/fl_url.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
use hyper::Method;

use hyper::Version;
use my_http_client::http1::MyHttpClient;
use my_http_client::http1::MyHttpRequest;
use my_http_client::MyHttpClientError;
use my_http_client::http1::*;
use my_http_client::MyHttpClientConnector;
use my_tls::tokio_rustls::client::TlsStream;

use rust_extensions::StrOrString;

use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpStream;

use super::FlUrlResponse;
use crate::HttpClientsCache;
use crate::http_connectors::*;

use crate::http_clients_cache::*;

use crate::HttpClientResolver;

use crate::FlUrlError;

use crate::FlUrlHeaders;

use crate::UrlBuilder;

pub struct FlUrl {
Expand Down Expand Up @@ -229,63 +236,54 @@ impl FlUrl {
self
}

async fn execute(mut self, request: MyHttpRequest) -> Result<FlUrlResponse, FlUrlError> {
async fn execute(self, request: MyHttpRequest) -> Result<FlUrlResponse, FlUrlError> {
#[cfg(feature = "with-ssh")]
{
if let Some(ssh_credentials) = self.ssh_credentials.take() {
return self.execute_with_ssh(request, ssh_credentials).await;
if self.ssh_credentials.is_some() {
return self.execute_ssh(request).await;
}
}

let response = match self.url.get_scheme() {
crate::Scheme::Http => {
if self.do_not_reuse_connection {
let remote_endpoint = self.url.get_remote_endpoint();
let http_connector =
crate::http_connectors::HttpConnector::new(remote_endpoint.to_owned());
let client = MyHttpClient::new(http_connector);
let response = client.do_request(request, self.request_timeout).await?;
FlUrlResponse::from_http1_response(self.url, response)
self.execute_with_retry::<TcpStream, HttpConnector, _>(
&request,
&http::HttpClientCreator,
#[cfg(feature = "with-ssh")]
None,
)
.await?
} else {
let reused_connection = self
.get_clients_cache()
.get_http_and_reuse(&self.url)
.await?;

let response = reused_connection
.do_request(request, self.request_timeout)
.await;

let response = self.handle_http_response(response).await?;
FlUrlResponse::from_http1_response(self.url, response)
let clients_cache = self.get_clients_cache();
self.execute_with_retry::<TcpStream, HttpConnector, _>(
&request,
clients_cache.as_ref(),
#[cfg(feature = "with-ssh")]
None,
)
.await?
}
}
crate::Scheme::Https => {
if self.do_not_reuse_connection {
let http_connector = crate::http_connectors::HttpsConnector::new(
self.url.get_remote_endpoint().to_owned(),
self.tls_server_name.take(),
self.client_cert.take(),
);
let client = MyHttpClient::new(http_connector);
let response = client.do_request(request, self.request_timeout).await?;
FlUrlResponse::from_http1_response(self.url, response)
self.execute_with_retry::<TlsStream<TcpStream>, HttpsConnector, _>(
&request,
&https::HttpsClientCreator,
#[cfg(feature = "with-ssh")]
None,
)
.await?
} else {
let reused_connection = self
.get_clients_cache()
.get_https_and_reuse(
&self.url,
self.tls_server_name.take(),
self.client_cert.take(),
)
.await?;

let response = reused_connection
.do_request(request, self.request_timeout)
.await;
let response = self.handle_http_response(response).await?;

FlUrlResponse::from_http1_response(self.url, response)
let clients_cache = self.get_clients_cache();

self.execute_with_retry::<TlsStream<TcpStream>, HttpsConnector, _>(
&request,
clients_cache.as_ref(),
#[cfg(feature = "with-ssh")]
None,
)
.await?
}
}
#[cfg(not(feature = "unix-socket"))]
Expand All @@ -296,26 +294,23 @@ impl FlUrl {
#[cfg(feature = "unix-socket")]
crate::Scheme::UnixSocket => {
if self.do_not_reuse_connection {
let remote_endpoint = self.url.get_remote_endpoint();
let http_connector = crate::http_connectors::UnixSocketConnector::new(
remote_endpoint.to_owned(),
);
let client = MyHttpClient::new(http_connector);
let response = client.do_request(request, self.request_timeout).await?;
FlUrlResponse::from_http1_response(self.url, response)
self.execute_with_retry::<UnixSocketStream, UnixSocketConnector, _>(
&request,
&unix_socket::UnixSocketHttpClientCreator,
#[cfg(feature = "with-ssh")]
None,
)
.await?
} else {
let reused_connection = self
.get_clients_cache()
.get_unix_socket_and_reuse(&self.url)
.await?;

let response = reused_connection
.do_request(request, self.request_timeout)
.await;

let response = self.handle_http_response(response).await?;

FlUrlResponse::from_http1_response(self.url, response)
let clients_cache = self.get_clients_cache();

self.execute_with_retry::<UnixSocketStream, UnixSocketConnector, _>(
&request,
clients_cache.as_ref(),
#[cfg(feature = "with-ssh")]
None,
)
.await?
}
}
};
Expand All @@ -324,26 +319,17 @@ impl FlUrl {
}

#[cfg(feature = "with-ssh")]
async fn execute_with_ssh(
self,
request: MyHttpRequest,
ssh_credentials: my_ssh::SshCredentials,
) -> Result<FlUrlResponse, FlUrlError> {
let reused_connection = self
.get_clients_cache()
.get_ssh_and_reuse(&self.url, &Arc::new(ssh_credentials))
.await?;

let response = reused_connection
.do_request(request, self.request_timeout)
.await;
let response = self.handle_http_response(response).await?;

let result = FlUrlResponse::from_http1_response(self.url, response);

Ok(result)
async fn execute_ssh(mut self, request: MyHttpRequest) -> Result<FlUrlResponse, FlUrlError> {
let ssh_credentials = self.ssh_credentials.take().unwrap();

let clients_cache = self.get_clients_cache();
self.execute_with_retry::<my_ssh::SshAsyncChannel, SshHttpConnector, _>(
&request,
clients_cache.as_ref(),
Some(Arc::new(ssh_credentials)),
)
.await
}

pub(crate) fn get_clients_cache(&self) -> Arc<HttpClientsCache> {
match self.clients_cache.as_ref() {
Some(cache) => cache.clone(),
Expand Down Expand Up @@ -437,17 +423,55 @@ impl FlUrl {
self.execute(request).await
}

async fn handle_http_response<TResult>(
&self,
result: Result<TResult, MyHttpClientError>,
) -> Result<TResult, MyHttpClientError> {
match result {
Ok(result) => {
return Ok(result);
}
Err(err) => {
self.get_clients_cache().remove(&self.url).await;
Err(err)
async fn execute_with_retry<
TStream: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Sync + 'static,
TConnector: MyHttpClientConnector<TStream> + Send + Sync + 'static,
THttpClientResolver: HttpClientResolver<TStream, TConnector>,
>(
mut self,
request: &MyHttpRequest,
http_client_resolver: &THttpClientResolver,
#[cfg(feature = "with-ssh")] ssh_credentials: Option<Arc<my_ssh::SshCredentials>>,
) -> Result<FlUrlResponse, FlUrlError> {
let mut attempt_no = 0;
let domain_override = self.tls_server_name.take();
let client_cert = self.client_cert.take();

loop {
let reused_connection = http_client_resolver
.get_http_client(
&self.url,
domain_override.as_ref(),
client_cert.as_ref(),
#[cfg(feature = "with-ssh")]
ssh_credentials.as_ref(),
)
.await;

let response = reused_connection
.do_request(request, self.request_timeout)
.await;

match response {
Ok(response) => {
let response = FlUrlResponse::from_http1_response(self.url, response);
return Ok(response);
}
Err(err) => {
http_client_resolver
.drop_http_client(
&self.url,
#[cfg(feature = "with-ssh")]
ssh_credentials.as_ref(),
)
.await;

if self.max_retries >= attempt_no {
return Err(FlUrlError::MyHttpClientError(err));
}

attempt_no += 1;
}
}
}
}
Expand Down
Loading

0 comments on commit 0629c74

Please sign in to comment.