diff --git a/Cargo.toml b/Cargo.toml index 1c9b6d1..831190e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "flurl" -version = "0.5.3" +version = "0.6.0" authors = ["Andrey "] edition = "2021" @@ -24,6 +24,8 @@ hyper-util = { version = "*", features = ["tokio"] } http-body-util = { version = "*" } +my-http-client = { tag = "0.1.0", git = "https://github.com/MyJetTools/my-http-client.git" } + lazy_static = "*" async-trait = "*" bytes = "*" @@ -36,5 +38,4 @@ my-ssh = { tag = "0.1.1", git = "https://github.com/MyJetTools/my-ssh.git", opti webpki-roots = "*" webpki = "*" -tower-service = "*" pem = "*" diff --git a/src/errors/fl_url_error.rs b/src/errors/fl_url_error.rs index 9c42ec5..52dc4aa 100644 --- a/src/errors/fl_url_error.rs +++ b/src/errors/fl_url_error.rs @@ -1,3 +1,5 @@ +use my_http_client::MyHttpClientError; + #[derive(Debug)] pub enum FlUrlError { HyperError(hyper::Error), @@ -11,9 +13,10 @@ pub enum FlUrlError { CanNotEstablishConnection(String), RustTlsError(my_tls::tokio_rustls::rustls::Error), CanNotConvertToUtf8(std::str::Utf8Error), - + MyHttpClientError(my_http_client::MyHttpClientError), #[cfg(feature = "with-ssh")] SshSessionError(my_ssh::SshSessionError), + ReadingHyperBodyError(String), } impl FlUrlError { @@ -31,6 +34,12 @@ impl From for FlUrlError { } } +impl From for FlUrlError { + fn from(value: MyHttpClientError) -> Self { + Self::MyHttpClientError(value) + } +} + #[cfg(feature = "with-ssh")] impl FlUrlError { pub fn is_ssh_session_error(&self) -> bool { diff --git a/src/fl_response.rs b/src/fl_response.rs index 1e228bd..8b5d2eb 100644 --- a/src/fl_response.rs +++ b/src/fl_response.rs @@ -1,35 +1,40 @@ use std::{collections::HashMap, fmt::Debug}; -use hyper::{body::Incoming, Response}; +use hyper::StatusCode; use serde::de::DeserializeOwned; -use crate::{FlUrlError, FlUrlReadingHeaderError, ResponseBody, UrlBuilderOwned}; +use crate::{FlUrlError, FlUrlReadingHeaderError, ResponseBody, UrlBuilder}; pub struct FlUrlResponse { - pub url: UrlBuilderOwned, - status_code: u16, + pub url: UrlBuilder, + status_code: StatusCode, response: ResponseBody, } impl Debug for FlUrlResponse { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("FlUrlResponse") - .field("url", &self.url) + .field("url", &self.url.as_str()) .field("status_code", &self.status_code) .finish() } } impl FlUrlResponse { - pub fn new(url: UrlBuilderOwned, response: Response) -> Self { + pub fn from_http1_response< + TStream: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Sync + 'static, + >( + url: UrlBuilder, + response: my_http_client::http1::MyHttpResponse, + ) -> Self { Self { - status_code: response.status().as_u16(), - response: ResponseBody::Incoming(response.into()), + status_code: response.status(), + response: ResponseBody::Hyper(Some(response.into_response())), url, } } - pub fn into_hyper_response(self) -> Response { + pub fn into_hyper_response(self) -> my_http_client::HyperResponse { self.response.into_hyper_response() } @@ -76,6 +81,6 @@ impl FlUrlResponse { } pub fn get_status_code(&self) -> u16 { - self.status_code + self.status_code.as_u16() } } diff --git a/src/fl_url.rs b/src/fl_url.rs index eaf8cdb..0318547 100644 --- a/src/fl_url.rs +++ b/src/fl_url.rs @@ -1,19 +1,19 @@ use hyper::Method; -use rust_extensions::ShortString; +use hyper::Version; +use my_http_client::http1::MyHttpClient; +use my_http_client::http1::MyHttpRequest; use rust_extensions::StrOrString; use std::sync::Arc; use std::time::Duration; use super::FlUrlResponse; -use crate::DropConnectionScenario; use crate::HttpClientsCache; use crate::FlUrlError; use crate::FlUrlHeaders; -use crate::HttpClient; use crate::UrlBuilder; pub struct FlUrl { @@ -24,12 +24,13 @@ pub struct FlUrl { pub execute_timeout: Duration, // If we are trying to reuse connection, but it was not used for this time, we will drop it pub not_used_connection_timeout: Duration, + pub request_timeout: Duration, pub do_not_reuse_connection: bool, pub clients_cache: Option>, + pub tls_server_name: Option, #[cfg(feature = "with-ssh")] - ssh_target: crate::ssh::SshTarget, + ssh_credentials: Option, - pub drop_connection_scenario: Box, max_retries: usize, retry_delay: Duration, } @@ -38,7 +39,42 @@ impl FlUrl { pub fn new<'s>(url: impl Into>) -> Self { let url: StrOrString<'s> = url.into(); - let url = UrlBuilder::new(ShortString::from_str(url.as_str()).unwrap()); + #[cfg(feature = "with-ssh")] + let (url, credentials) = { + let endpoint = + rust_extensions::remote_endpoint::RemoteEndpointHostString::try_parse(url.as_str()) + .unwrap(); + + match endpoint { + rust_extensions::remote_endpoint::RemoteEndpointHostString::Direct( + _remote_endpoint, + ) => (UrlBuilder::new(url.as_str()), None), + rust_extensions::remote_endpoint::RemoteEndpointHostString::ViaSsh { + ssh_remote_host, + remote_host_behind_ssh, + } => ( + UrlBuilder::new(remote_host_behind_ssh.as_str()), + Some(crate::ssh::to_ssh_credentials(&ssh_remote_host)), + ), + } + }; + + #[cfg(not(feature = "with-ssh"))] + let url = { + let endpoint = + rust_extensions::remote_endpoint::RemoteEndpointHostString::try_parse(url.as_str()) + .unwrap(); + + match endpoint { + rust_extensions::remote_endpoint::RemoteEndpointHostString::Direct( + _remote_endpoint, + ) => UrlBuilder::new(url.as_str()), + rust_extensions::remote_endpoint::RemoteEndpointHostString::ViaSsh { + ssh_remote_host: _, + remote_host_behind_ssh: _, + } => panic!("To use ssh you need to enable with-ssh feature"), + } + }; Self { headers: FlUrlHeaders::new(), @@ -47,38 +83,15 @@ impl FlUrl { url, accept_invalid_certificate: false, do_not_reuse_connection: false, - drop_connection_scenario: Box::new(crate::DefaultDropConnectionScenario), clients_cache: None, not_used_connection_timeout: Duration::from_secs(30), max_retries: 0, retry_delay: Duration::from_secs(3), + request_timeout: Duration::from_secs(10), + tls_server_name: None, #[cfg(feature = "with-ssh")] - ssh_target: crate::ssh::SshTarget { - credentials: None, - sessions_pool: None, - http_buffer_size: 512 * 1024, - }, - } - } - - /// Url can be: "http://localhost:8080" or "ssh://user:password@host:port->http://localhost:8080" - #[cfg(feature = "with-ssh")] - pub async fn new_with_maybe_ssh<'s>( - url: impl Into>, - ssh_credentials: Option< - &std::collections::HashMap, - >, - ) -> Self { - let url = url.into(); - let over_ssh_config = - my_ssh::OverSshConnectionSettings::parse(url.as_str(), ssh_credentials).await; - - if over_ssh_config.ssh_credentials.is_none() { - return Self::new(url); + ssh_credentials: credentials, } - - Self::new(over_ssh_config.remote_resource_string) - .set_ssh_credentials(Arc::new(over_ssh_config.ssh_credentials.unwrap())) } pub fn set_not_used_connection_timeout(mut self, timeout: Duration) -> Self { @@ -97,21 +110,53 @@ impl FlUrl { self } - #[cfg(feature = "with-ssh")] - pub fn set_ssh_credentials(mut self, ssh_credentials: Arc) -> Self { - self.ssh_target.credentials = Some(ssh_credentials); + pub fn set_tls_server_name(mut self, domain: String) -> Self { + self.tls_server_name = Some(domain); self } #[cfg(feature = "with-ssh")] - pub fn set_ssh_sessions_pool(mut self, ssh_credentials: Arc) -> Self { - self.ssh_target.sessions_pool = Some(ssh_credentials); + pub fn set_ssh_password<'s>(mut self, password: impl Into>) -> Self { + let ssh_credentials = self.ssh_credentials.take(); + if ssh_credentials.is_none() { + panic!("To specify ssh password you need to use ssh://user:password@host:port->http://localhost:8080 connection line"); + } + let ssh_credentials = ssh_credentials.unwrap(); + + let (host, port) = ssh_credentials.get_host_port(); + + let password = password.into(); + + self.ssh_credentials = Some(my_ssh::SshCredentials::UserNameAndPassword { + ssh_remote_host: host.to_string(), + ssh_remote_port: port, + ssh_user_name: ssh_credentials.get_user_name().to_string(), + password: password.to_string(), + }); self } #[cfg(feature = "with-ssh")] - pub fn set_http_buffer_size(mut self, buffer_size: usize) -> Self { - self.ssh_target.http_buffer_size = buffer_size; + pub fn set_ssh_private_key<'s>( + mut self, + private_key: String, + passphrase: Option, + ) -> Self { + let ssh_credentials = self.ssh_credentials.take(); + if ssh_credentials.is_none() { + panic!("To specify ssh password you need to use ssh://user:password@host:port->http://localhost:8080 connection line"); + } + let ssh_credentials = ssh_credentials.unwrap(); + + let (host, port) = ssh_credentials.get_host_port(); + + self.ssh_credentials = Some(my_ssh::SshCredentials::PrivateKey { + ssh_remote_host: host.to_string(), + ssh_remote_port: port, + ssh_user_name: ssh_credentials.get_user_name().to_string(), + private_key, + passphrase, + }); self } @@ -119,19 +164,6 @@ impl FlUrl { self.execute_timeout = timeout; self } - pub fn set_tls_domain(mut self, domain: impl Into>) -> Self { - let domain = domain.into(); - self.url.tls_domain = Some(domain.to_string()); - self - } - - pub fn override_drop_connection_scenario( - mut self, - drop_connection_scenario: impl DropConnectionScenario + Send + Sync + 'static, - ) -> Self { - self.drop_connection_scenario = Box::new(drop_connection_scenario); - self - } pub fn do_not_reuse_connection(mut self) -> Self { self.do_not_reuse_connection = true; @@ -165,226 +197,144 @@ impl FlUrl { param_name: impl Into>, value: Option>>, ) -> Self { - let param_name = param_name.into().to_string(); + let param_name = param_name.into(); - let value: Option = if let Some(value) = value { - Some(value.into().to_string()) + if let Some(value) = value { + let value = value.into(); + self.url + .append_query_param(param_name.as_str(), Some(value.as_str())); } else { - None + self.url.append_query_param(param_name.as_str(), None); }; - self.url.append_query_param(param_name, value); self } - pub fn with_header<'v>( + pub fn with_header<'n, 'v>( mut self, - name: impl Into>, + name: impl Into>, value: impl Into>, ) -> Self { - let name: StrOrString<'static> = name.into(); - let value: StrOrString<'v> = value.into(); + let name: StrOrString<'_> = name.into(); + let value: StrOrString<'_> = value.into(); - self.headers.add(name, value.to_string()); + self.headers.add(name.as_str(), value.as_str()); self } pub fn append_raw_ending_to_url<'r>(mut self, raw: impl Into>) -> Self { let raw: StrOrString<'r> = raw.into(); - self.url.append_raw_ending(raw.to_string()); + self.url.append_raw_ending(raw.as_str()); self } - async fn execute_json( - self, - method: Method, - json: &impl serde::Serialize, - ) -> Result { - let body = serde_json::to_vec(json).unwrap(); - self.with_header("Content-Type", "application/json") - .execute(method, Some(body)) - .await - } - - async fn execute_json_with_retries( - self, - method: Method, - json: &impl serde::Serialize, - ) -> Result { - let body = serde_json::to_vec(json).unwrap(); - - if self.max_retries > 0 { - return self - .with_header("Content-Type", "application/json") - .execute_with_retires(method, Some(body)) - .await; - } - self.with_header("Content-Type", "application/json") - .execute(method, Some(body)) - .await - } - - async fn execute_with_retires( - &self, - method: Method, - body: Option>, - ) -> Result { - let mut no = 0; - loop { - match self.execute(method.clone(), body.clone()).await { - Ok(response) => return Ok(response), - Err(e) => { - if no >= self.max_retries { - return Err(e); - } - tokio::time::sleep(self.retry_delay).await; - } - } - - no += 1; - } - } - - async fn execute( - &self, - method: Method, - body: Option>, - ) -> Result { + async fn execute(mut self, request: MyHttpRequest) -> Result { #[cfg(feature = "with-ssh")] { - if let Some(ssh_credentials) = &self.ssh_target.credentials { - return self.execute_with_ssh(method, body, ssh_credentials).await; + if let Some(ssh_credentials) = self.ssh_credentials.take() { + return self.execute_with_ssh(request, ssh_credentials).await; } } - let scheme_and_host = self.url.get_scheme_and_host(); - - if self.do_not_reuse_connection { - let client = - HttpClient::new(&self.url, &self.client_cert, self.execute_timeout).await?; - return client - .execute_request(&self.url, method, &self.headers, body, self.execute_timeout) - .await; - } - - return self - .execute_with_cancel_retry(scheme_and_host, method, body) - .await; - - /* - let clients_cache = self.get_clients_cache(); - - let client = clients_cache - .get_and_reuse( - &self.url, - self.execute_timeout, - &self.client_cert, - self.not_used_connection_timeout, - ) - .await?; - - let result = client - .execute_request(&self.url, method, &self.headers, body, self.execute_timeout) - .await; - - match result { - Ok(result) => { - if self.drop_connection_scenario.should_we_drop_it(&result) { - clients_cache.remove(scheme_and_host.as_str()).await; + let response = match self.url.get_scheme() { + crate::Scheme::Http => { + if self.do_not_reuse_connection { + let remote_endpoint = self.url.get_remote_endpoint(); + let http_connector = + crate::http_connectors::HttpConnector::new(remote_endpoint.to_owned()); + let client = MyHttpClient::new(http_connector); + let response = client.do_request(request, self.request_timeout).await?; + FlUrlResponse::from_http1_response(self.url, response) + } else { + let reused_connection = self + .get_clients_cache() + .get_http_and_reuse(&self.url) + .await?; + + let response = reused_connection + .do_request(request, self.request_timeout) + .await?; + FlUrlResponse::from_http1_response(self.url, response) } - return Ok(result); } - Err(err) => { - clients_cache.remove(scheme_and_host.as_str()).await; - return Err(err); - } - } - */ - } - - async fn execute_with_cancel_retry( - &self, - scheme_and_host: ShortString, - method: Method, - body: Option>, - ) -> Result { - let clients_cache = self.get_clients_cache(); - - let mut had_retry = false; - - loop { - let client = clients_cache - .get_and_reuse( - &self.url, - self.execute_timeout, - &self.client_cert, - self.not_used_connection_timeout, - ) - .await?; - - let result = client - .execute_request( - &self.url, - method.clone(), - &self.headers, - body.clone(), - self.execute_timeout, - ) - .await; - - match result { - Ok(result) => { - if self.drop_connection_scenario.should_we_drop_it(&result) { - clients_cache.remove(scheme_and_host.as_str()).await; - } - return Ok(result); + crate::Scheme::Https => { + if self.do_not_reuse_connection { + let http_connector = crate::http_connectors::HttpsConnector::new( + self.url.get_remote_endpoint().to_owned(), + self.tls_server_name.take(), + self.client_cert.take(), + ); + let client = MyHttpClient::new(http_connector); + let response = client.do_request(request, self.request_timeout).await?; + FlUrlResponse::from_http1_response(self.url, response) + } else { + let reused_connection = self + .get_clients_cache() + .get_https_and_reuse( + &self.url, + self.tls_server_name.take(), + self.client_cert.take(), + ) + .await?; + + let response = reused_connection + .do_request(request, self.request_timeout) + .await?; + + FlUrlResponse::from_http1_response(self.url, response) } - Err(err) => { - clients_cache.remove(scheme_and_host.as_str()).await; - - if !err.is_hyper_canceled() { - return Err(err); - } + } + #[cfg(not(feature = "unix-socket"))] + crate::Scheme::UnixSocket => { + panic!("To use unix socket you need to enable unix-socket feature") + } - if had_retry { - return Err(err); - } + #[cfg(feature = "unix-socket")] + crate::Scheme::UnixSocket => { + if self.do_not_reuse_connection { + let remote_endpoint = self.url.get_remote_endpoint(); + let http_connector = crate::http_connectors::UnixSocketConnector::new( + remote_endpoint.to_owned(), + ); + let client = MyHttpClient::new(http_connector); + let response = client.do_request(request, self.request_timeout).await?; + FlUrlResponse::from_http1_response(self.url, response) + } else { + let reused_connection = self + .get_clients_cache() + .get_unix_socket_and_reuse(&self.url) + .await?; + + let response = reused_connection + .do_request(request, self.request_timeout) + .await?; + + FlUrlResponse::from_http1_response(self.url, response) } } + }; - had_retry = true; - } + Ok(response) } #[cfg(feature = "with-ssh")] async fn execute_with_ssh( - &self, - method: Method, - body: Option>, - ssh_credentials: &Arc, + self, + request: MyHttpRequest, + ssh_credentials: my_ssh::SshCredentials, ) -> Result { - let http_client = HttpClient::new_ssh( - &self.url, - self.execute_timeout, - ssh_credentials, - self.ssh_target.sessions_pool.as_ref(), - self.ssh_target.http_buffer_size, - ) - .await?; + let reused_connection = self + .get_clients_cache() + .get_ssh_and_reuse(&self.url, &Arc::new(ssh_credentials)) + .await?; - let result = http_client - .execute_request(&self.url, method, &self.headers, body, self.execute_timeout) - .await; + let response = reused_connection + .do_request(request, self.request_timeout) + .await?; - if result.is_err() { - println!("Http through ssh failed. Removing session from cache"); - if let Some(session_cache) = &self.ssh_target.sessions_pool { - if let Some(ssh_credentials) = &self.ssh_target.credentials { - session_cache.remove(ssh_credentials).await; - } - } - } - return result; + let result = FlUrlResponse::from_http1_response(self.url, response); + + Ok(result) } pub(crate) fn get_clients_cache(&self) -> Arc { @@ -394,72 +344,89 @@ impl FlUrl { } } - pub async fn get(self) -> Result { - if self.max_retries > 0 { - return self.execute_with_retires(Method::GET, None).await; + fn compile_request(&mut self, method: Method, body: Option>) -> MyHttpRequest { + if !self.headers.has_host_header { + if !self.url.host_is_ip() { + self.headers + .add(hyper::header::HOST.as_str(), self.url.get_host()); + } } - self.execute(Method::GET, None).await - } - pub async fn head(self) -> Result { - if self.max_retries > 0 { - return self.execute_with_retires(Method::HEAD, None).await; + if !self.headers.has_connection_header { + if !self.do_not_reuse_connection { + self.headers + .add(hyper::header::CONNECTION.as_str(), "keep-alive"); + } } - self.execute(Method::HEAD, None).await + + MyHttpRequest::new( + method, + self.url.get_path_and_query(), + Version::HTTP_11, + self.headers.get_builder(), + body.unwrap_or_default().into(), + ) } - pub async fn post(self, body: Option>) -> Result { - if self.max_retries > 0 { - return self.execute_with_retires(Method::POST, body).await; - } - self.execute(Method::POST, body).await + pub async fn get(mut self) -> Result { + let request = self.compile_request(Method::GET, None); + self.execute(request).await + } + + pub async fn head(mut self) -> Result { + let request = self.compile_request(Method::HEAD, None); + self.execute(request).await + } + + pub async fn post(mut self, body: Option>) -> Result { + let request = self.compile_request(Method::POST, body); + self.execute(request).await } pub async fn post_json( - self, + mut self, json: &impl serde::Serialize, ) -> Result { - if self.max_retries > 0 { - return self.execute_json_with_retries(Method::POST, json).await; - } - self.execute_json(Method::POST, json).await + let body = serde_json::to_vec(json).unwrap(); + self.headers.add_json_content_type(); + let request = self.compile_request(Method::POST, body.into()); + + self.execute(request).await } - pub async fn patch(self, body: Option>) -> Result { - if self.max_retries > 0 { - return self.execute_with_retires(Method::PATCH, body).await; - } - self.execute(Method::PATCH, body).await + pub async fn patch(mut self, body: Option>) -> Result { + let request = self.compile_request(Method::PATCH, body); + self.execute(request).await } pub async fn patch_json( - self, + mut self, json: &impl serde::Serialize, ) -> Result { - if self.max_retries > 0 { - return self.execute_json_with_retries(Method::PATCH, json).await; - } - self.execute_json(Method::PATCH, json).await + let body = serde_json::to_vec(json).unwrap(); + self.headers.add_json_content_type(); + let request = self.compile_request(Method::PATCH, body.into()); + + self.execute(request).await } - pub async fn put(self, body: Option>) -> Result { - if self.max_retries > 0 { - return self.execute_with_retires(Method::PUT, body).await; - } - self.execute(Method::PUT, body).await + pub async fn put(mut self, body: Option>) -> Result { + let request = self.compile_request(Method::PUT, body); + self.execute(request).await } - pub async fn put_json(self, json: &impl serde::Serialize) -> Result { - if self.max_retries > 0 { - return self.execute_json_with_retries(Method::PUT, json).await; - } - self.execute_json(Method::PUT, json).await + pub async fn put_json( + mut self, + json: &impl serde::Serialize, + ) -> Result { + let body = serde_json::to_vec(json).unwrap(); + self.headers.add_json_content_type(); + let request = self.compile_request(Method::PUT, body.into()); + self.execute(request).await } - pub async fn delete(self) -> Result { - if self.max_retries > 0 { - return self.execute_with_retires(Method::DELETE, None).await; - } - self.execute(Method::DELETE, None).await + pub async fn delete(mut self) -> Result { + let request = self.compile_request(Method::GET, None); + self.execute(request).await } } diff --git a/src/fl_url_headers.rs b/src/fl_url_headers.rs index b6732c4..d077faa 100644 --- a/src/fl_url_headers.rs +++ b/src/fl_url_headers.rs @@ -1,47 +1,50 @@ -use rust_extensions::StrOrString; - -pub struct FlUrlHeader { - pub name: StrOrString<'static>, - pub value: String, -} +use hyper::header::CONTENT_TYPE; +use my_http_client::MyHttpClientHeadersBuilder; pub struct FlUrlHeaders { - headers: Vec, + headers: MyHttpClientHeadersBuilder, pub has_host_header: bool, + pub has_connection_header: bool, + pub len: usize, } impl FlUrlHeaders { pub fn new() -> Self { Self { - headers: Vec::new(), + headers: MyHttpClientHeadersBuilder::new(), has_host_header: false, + has_connection_header: false, + len: 0, } } - fn find_index(&self, name: &str) -> Option { - self.headers.iter().position(|header| { - rust_extensions::str_utils::compare_strings_case_insensitive(header.name.as_str(), name) - }) + pub fn add_json_content_type(&mut self) { + self.headers + .add_header(CONTENT_TYPE.as_str(), "application/json"); } - pub fn add(&mut self, name: StrOrString<'static>, value: String) { - if rust_extensions::str_utils::compare_strings_case_insensitive(name.as_str(), "host") { + pub fn add(&mut self, name: &str, value: &str) { + if rust_extensions::str_utils::compare_strings_case_insensitive(name, "host") { self.has_host_header = true; } - match self.find_index(name.as_str()) { - Some(index) => self.headers[index].value = value, - None => { - self.headers.push(FlUrlHeader { name, value }); - } + if rust_extensions::str_utils::compare_strings_case_insensitive(name, "connection") { + self.has_connection_header = true; } + + self.headers.add_header(name, value); + self.len += 1; } pub fn len(&self) -> usize { - self.headers.len() + self.len } - pub fn iter(&self) -> std::slice::Iter { + pub fn iter<'s>(&'s self) -> impl Iterator { self.headers.iter() } + + pub fn get_builder(&self) -> &MyHttpClientHeadersBuilder { + &self.headers + } } diff --git a/src/http_client/connect_to_http_endpoint.rs b/src/http_client/connect_to_http_endpoint.rs deleted file mode 100644 index 35999f8..0000000 --- a/src/http_client/connect_to_http_endpoint.rs +++ /dev/null @@ -1,44 +0,0 @@ -use bytes::Bytes; -use http_body_util::Full; -use hyper::client::conn::http1::SendRequest; -use hyper_util::rt::TokioIo; -use tokio::net::TcpStream; - -use crate::FlUrlError; - -pub async fn connect_to_http_endpoint( - host_port: &str, -) -> Result>, FlUrlError> { - let connect_result = TcpStream::connect(host_port).await; - - match connect_result { - Ok(tcp_stream) => { - let io = TokioIo::new(tcp_stream); - let handshake_result = hyper::client::conn::http1::handshake(io).await; - match handshake_result { - Ok((mut sender, conn)) => { - let host_port = host_port.to_owned(); - tokio::task::spawn(async move { - if let Err(err) = conn.await { - println!( - "Http Connection to http://{} is failed: {:?}", - host_port, err - ); - } - - //Here - }); - - sender.ready().await?; - return Ok(sender); - } - Err(err) => { - return Err(FlUrlError::InvalidHttp1HandShake(format!("{}", err))); - } - } - } - Err(err) => { - return Err(FlUrlError::CanNotEstablishConnection(format!("{}", err))); - } - } -} diff --git a/src/http_client/connect_to_unix_socket.rs b/src/http_client/connect_to_unix_socket.rs deleted file mode 100644 index 48dbc80..0000000 --- a/src/http_client/connect_to_unix_socket.rs +++ /dev/null @@ -1,44 +0,0 @@ -use http_body_util::Full; -use hyper::{body::Bytes, client::conn::http1::SendRequest}; -use hyper_util::rt::TokioIo; -use tokio::net::UnixSocket; - -use crate::FlUrlError; - -pub async fn connect_to_http_unix_socket_endpoint( - unix_socket_path: &str, -) -> Result>, FlUrlError> { - let unix_socket = UnixSocket::new_stream()?; - let connect_result = unix_socket.connect(unix_socket_path).await; - - match connect_result { - Ok(tcp_stream) => { - let io = TokioIo::new(tcp_stream); - let handshake_result = hyper::client::conn::http1::handshake(io).await; - match handshake_result { - Ok((mut sender, conn)) => { - let unix_socket_path = unix_socket_path.to_string(); - tokio::task::spawn(async move { - if let Err(err) = conn.with_upgrades().await { - println!( - "Http Connection to {} is failed: {:?}", - unix_socket_path, err - ); - } - - //Here - }); - - sender.ready().await?; - return Ok(sender); - } - Err(err) => { - return Err(FlUrlError::InvalidHttp1HandShake(format!("{}", err))); - } - } - } - Err(err) => { - return Err(FlUrlError::CanNotEstablishConnection(format!("{}", err))); - } - } -} diff --git a/src/http_client/http_client.rs b/src/http_client/http_client.rs deleted file mode 100644 index 4f2a51a..0000000 --- a/src/http_client/http_client.rs +++ /dev/null @@ -1,401 +0,0 @@ -use std::{sync::atomic::AtomicBool, time::Duration}; - -use bytes::Bytes; -use http_body_util::Full; -use hyper::{client::conn::http1::SendRequest, Method, Request, Uri}; - -use rust_extensions::{ - date_time::{AtomicDateTimeAsMicroseconds, DateTimeAsMicroseconds}, - MaybeShortString, StrOrString, -}; -use tokio::sync::Mutex; - -use crate::{FlUrlError, FlUrlHeaders, FlUrlResponse, UrlBuilder, UrlBuilderOwned}; - -use my_tls::ClientCertificate; - -const DEAD_CONNECTION_DURATION: Duration = Duration::from_secs(20); - -pub struct HttpClient { - connection: Mutex>>>, - pub created: DateTimeAsMicroseconds, - disconnected: AtomicBool, - pub last_accessed: AtomicDateTimeAsMicroseconds, - - #[cfg(feature = "with-ssh")] - _ssh_session: Option>, -} - -impl HttpClient { - pub fn connection_can_be_disposed(&self) -> bool { - let now = DateTimeAsMicroseconds::now(); - now.duration_since(self.created).as_positive_or_zero() > DEAD_CONNECTION_DURATION - } - - #[cfg(feature = "with-ssh")] - pub fn from_ssh_session( - connection: SendRequest>, - ssh_session: std::sync::Arc, - ) -> Self { - Self { - connection: Mutex::new(Some(connection)), - created: DateTimeAsMicroseconds::now(), - disconnected: AtomicBool::new(false), - _ssh_session: Some(ssh_session), - last_accessed: AtomicDateTimeAsMicroseconds::now(), - } - } - - #[cfg(feature = "with-ssh")] - pub async fn new_ssh( - src: &UrlBuilder, - request_timeout: Duration, - ssh_credentials: &std::sync::Arc, - ssh_sessions_pool: Option<&std::sync::Arc>, - http_buffer_size: usize, - ) -> Result { - let host_port = src.get_host_port(); - - let (host, port) = match host_port.find(':') { - Some(index) => { - let host = &host_port[0..index]; - let port = &host_port[index + 1..]; - - (host, port.parse::().unwrap()) - } - None => (host_port, 80), - }; - - let (ssh_session, connection) = super::connect_to_http_over_ssh::connect_to_http_over_ssh( - ssh_credentials, - ssh_sessions_pool, - host, - port, - request_timeout, - http_buffer_size, - ) - .await?; - - let result = Self { - connection: Mutex::new(Some(connection)), - created: DateTimeAsMicroseconds::now(), - disconnected: AtomicBool::new(false), - last_accessed: AtomicDateTimeAsMicroseconds::now(), - #[cfg(feature = "with-ssh")] - _ssh_session: Some(ssh_session), - }; - - return Ok(result); - } - - pub async fn new( - src: &UrlBuilder, - client_certificate: &Option, - request_timeout: Duration, - ) -> Result { - #[cfg(feature = "unix-socket")] - if src.scheme.is_unix_socket() { - let host_port = src.get_host_port(); - let connection_future = super::connect_to_http_unix_socket_endpoint(host_port); - let result = tokio::time::timeout(request_timeout, connection_future).await; - - match result { - Ok(result) => { - let send_request = result?; - - return Ok(Self { - connection: Mutex::new(Some(send_request)), - created: DateTimeAsMicroseconds::now(), - disconnected: AtomicBool::new(false), - last_accessed: AtomicDateTimeAsMicroseconds::now(), - #[cfg(feature = "with-ssh")] - _ssh_session: None, - }); - } - Err(_) => { - return Err(FlUrlError::Timeout); - } - } - } - - let host_port = src.get_host_port(); - - let domain = src.get_domain(); - - let is_https = src.scheme.is_https(); - - let host_port: StrOrString = if host_port.contains(":") { - host_port.into() - } else { - if is_https { - format!("{}:443", host_port).into() - } else { - format!("{}:80", host_port).into() - } - }; - - let connection = if is_https { - let connection_future = - super::connect_to_tls_endpoint(host_port.as_str(), domain, client_certificate); - let result = tokio::time::timeout(request_timeout, connection_future).await; - - if result.is_err() { - return Err(FlUrlError::Timeout); - } - - result.unwrap()? - } else { - let connection_future = super::connect_to_http_endpoint(host_port.as_str()); - let result = tokio::time::timeout(request_timeout, connection_future).await; - - if result.is_err() { - return Err(FlUrlError::Timeout); - } - - result.unwrap()? - }; - let result = Self { - connection: Mutex::new(Some(connection)), - created: DateTimeAsMicroseconds::now(), - disconnected: AtomicBool::new(false), - last_accessed: AtomicDateTimeAsMicroseconds::now(), - #[cfg(feature = "with-ssh")] - _ssh_session: None, - }; - - Ok(result) - } - - pub async fn execute_request( - &self, - url_builder: &UrlBuilder, - method: Method, - headers: &FlUrlHeaders, - body: Option>, - request_timeout: Duration, - ) -> Result { - let mut attempt_no = 0; - let url_builder_owned = url_builder.into_builder_owned(); - loop { - let result = self - .execute_int( - &url_builder_owned, - &method, - &headers, - body.clone(), - request_timeout, - ) - .await; - - if result.is_ok() { - return result; - } - - if let Err(FlUrlError::HyperError(err)) = &result { - if err.is_canceled() { - if self.connection_can_be_disposed() { - self.disconnect().await; - return result; - } - - tokio::time::sleep(Duration::from_millis(50)).await; - attempt_no += 1; - - if attempt_no > 100 { - return result; - } - - continue; - } - } - - return result; - } - } - - async fn execute_int( - &self, - url_builder: &UrlBuilderOwned, - method: &Method, - headers: &FlUrlHeaders, - body: Option>, - request_timeout: Duration, - ) -> Result { - let body = if let Some(body) = body { - http_body_util::Full::new(body.into()) - } else { - http_body_util::Full::new(hyper::body::Bytes::from(vec![])) - }; - - let uri: Uri = url_builder.as_str().parse().unwrap(); - - let authority = MaybeShortString::from_str(uri.authority().unwrap().as_str()); - - let mut request = Request::builder().uri(uri).method(method); - - { - if !headers.has_host_header { - request.headers_mut().unwrap().insert( - hyper::http::header::HOST, - hyper::http::HeaderValue::from_str(authority.as_str()).unwrap(), - ); - } - - if headers.len() > 0 { - for header in headers.iter() { - request = request.header(header.name.as_str(), header.value.to_string()); - } - }; - } - - #[cfg(feature = "debug-request")] - { - println!("Request: {:?}", request); - } - - let request = request.body(body)?; - - let request_future = { - let mut access = self.connection.lock().await; - - if access.is_none() { - return Err(FlUrlError::ConnectionIsDead); - } - - let connection = access.as_mut().unwrap(); - - connection.send_request(request) - }; - - let result = tokio::time::timeout(request_timeout, request_future).await; - - if result.is_err() { - self.disconnect().await; - return Err(FlUrlError::Timeout); - } - - let result = result.unwrap()?; - - Ok(FlUrlResponse::new(url_builder.clone(), result)) - } - - async fn disconnect(&self) { - self.disconnected - .store(true, std::sync::atomic::Ordering::Relaxed); - let mut access = self.connection.lock().await; - *access = None; - } - - pub fn is_disconnected(&self) -> bool { - self.disconnected.load(std::sync::atomic::Ordering::Relaxed) - } -} - -#[cfg(test)] -mod tests { - use std::time::Duration; - - use hyper::Method; - use rust_extensions::StopWatch; - - use super::HttpClient; - use crate::{FlUrlHeaders, UrlBuilder}; - - static REQUEST_TIMEOUT: Duration = Duration::from_secs(5); - - #[tokio::test] - async fn test_http_request() { - let url_builder = UrlBuilder::new("http://google.com/".into()); - - let fl_url_client = HttpClient::new(&url_builder, &None, REQUEST_TIMEOUT) - .await - .unwrap(); - - let mut sw: StopWatch = StopWatch::new(); - - sw.start(); - - let mut response = fl_url_client - .execute_request( - &url_builder, - Method::GET, - &FlUrlHeaders::new(), - None, - REQUEST_TIMEOUT, - ) - .await - .unwrap(); - println!("StatusCode: {}", response.get_status_code()); - println!("Body: {}", response.body_as_str().await.unwrap()); - - sw.pause(); - println!("Duration: {:?}", sw.duration()); - - let mut sw: StopWatch = StopWatch::new(); - sw.start(); - - let mut response = fl_url_client - .execute_request( - &url_builder, - Method::GET, - &FlUrlHeaders::new(), - None, - REQUEST_TIMEOUT, - ) - .await - .unwrap(); - println!("StatusCode: {}", response.get_status_code()); - println!("Body: {}", response.body_as_str().await.unwrap()); - - sw.pause(); - println!("Duration: {:?}", sw.duration()); - } - - #[tokio::test] - async fn test_https_request() { - let url_builder = UrlBuilder::new("https://trade-demo.yourfin.tech".into()); - - let fl_url_client = HttpClient::new(&url_builder, &None, REQUEST_TIMEOUT) - .await - .unwrap(); - - let mut sw: StopWatch = StopWatch::new(); - - sw.start(); - - let mut response = fl_url_client - .execute_request( - &url_builder, - Method::GET, - &FlUrlHeaders::new(), - None, - REQUEST_TIMEOUT, - ) - .await - .unwrap(); - println!("StatusCode: {}", response.get_status_code()); - println!("Body: {}", response.body_as_str().await.unwrap()); - - sw.pause(); - println!("Duration: {:?}", sw.duration()); - - let mut sw: StopWatch = StopWatch::new(); - sw.start(); - - let mut response = fl_url_client - .execute_request( - &url_builder, - Method::GET, - &FlUrlHeaders::new(), - None, - REQUEST_TIMEOUT, - ) - .await - .unwrap(); - println!("StatusCode: {}", response.get_status_code()); - println!("Body: {}", response.body_as_str().await.unwrap()); - - sw.pause(); - println!("Duration: {:?}", sw.duration()); - } -} diff --git a/src/http_client/mod.rs b/src/http_client/mod.rs deleted file mode 100644 index b838275..0000000 --- a/src/http_client/mod.rs +++ /dev/null @@ -1,13 +0,0 @@ -mod http_client; - -mod connect_to_tls_endpoint; -pub use http_client::*; -mod connect_to_http_endpoint; -use connect_to_http_endpoint::*; -use connect_to_tls_endpoint::*; -#[cfg(feature = "with-ssh")] -pub mod connect_to_http_over_ssh; -#[cfg(feature = "unix-socket")] -mod connect_to_unix_socket; -#[cfg(feature = "unix-socket")] -pub use connect_to_unix_socket::*; diff --git a/src/http_clients_cache.rs b/src/http_clients_cache.rs index f4366bb..6dc915d 100644 --- a/src/http_clients_cache.rs +++ b/src/http_clients_cache.rs @@ -1,85 +1,190 @@ -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{collections::HashMap, sync::Arc}; -use rust_extensions::date_time::DateTimeAsMicroseconds; -use tokio::sync::RwLock; +use my_http_client::http1::MyHttpClient; -use crate::{FlUrlError, HttpClient, UrlBuilder}; -use my_tls::ClientCertificate; +use rust_extensions::{remote_endpoint::RemoteEndpoint, ShortString}; + +use tokio::{net::TcpStream, sync::RwLock}; + +use crate::{FlUrlError, UrlBuilder}; +use my_tls::{tokio_rustls::client::TlsStream, ClientCertificate}; + +use crate::http_connectors::*; + +#[derive(Default)] +pub struct HttpClientsCacheInner { + pub http: HashMap>>, + pub https: HashMap, HttpsConnector>>>, + #[cfg(feature = "unix-socket")] + pub unix_socket: HashMap>>, + #[cfg(feature = "with-ssh")] + pub ssh: HashMap>>, +} pub struct HttpClientsCache { - pub clients: RwLock>>, + pub inner: RwLock, } impl HttpClientsCache { pub fn new() -> Self { Self { - clients: RwLock::new(HashMap::new()), + inner: RwLock::new(HttpClientsCacheInner::default()), } } - pub async fn get_and_reuse( + pub async fn get_http_and_reuse( &self, url_builder: &UrlBuilder, - request_timeout: Duration, - client_certificate: &Option, - not_used_timeout: Duration, - ) -> Result, FlUrlError> { - let schema_and_domain = url_builder.get_scheme_and_host(); - - let mut write_access = self.clients.write().await; - - if let Some(existing_connection) = - get_existing_connection(&mut write_access, schema_and_domain.as_str()) - { - let now = DateTimeAsMicroseconds::now(); - - if existing_connection - .last_accessed - .as_date_time() - .duration_since(now) - .as_positive_or_zero() - < not_used_timeout - { - existing_connection.last_accessed.update(now); - return Ok(existing_connection); - } - write_access.remove(schema_and_domain.as_str()); + ) -> Result>, FlUrlError> { + let remote_endpoint = url_builder.get_remote_endpoint(); + + let mut write_access = self.inner.write().await; + + let hash_map_key = get_http_key(remote_endpoint); + + if let Some(existing_connection) = write_access.http.get(hash_map_key.as_str()) { + return Ok(existing_connection.clone()); } - let new_one = HttpClient::new(url_builder, client_certificate, request_timeout).await?; + let connector = HttpConnector::new(remote_endpoint.to_owned()); + + let new_one = MyHttpClient::new(connector); + let new_one = Arc::new(new_one); - write_access.insert(schema_and_domain.to_string(), new_one.clone()); + write_access + .http + .insert(hash_map_key.to_string(), new_one.clone()); - Ok(write_access - .get(schema_and_domain.as_str()) - .cloned() - .unwrap()) + Ok(new_one) } - pub async fn remove(&self, schema_domain: &str) { - let mut write_access = self.clients.write().await; - write_access.remove(schema_domain); + pub async fn get_https_and_reuse( + &self, + url_builder: &UrlBuilder, + domain_override: Option, + client_certificate: Option, + ) -> Result, HttpsConnector>>, FlUrlError> { + let remote_endpoint = url_builder.get_remote_endpoint(); + + let mut write_access = self.inner.write().await; + + let hash_map_key = get_https_key(remote_endpoint); + + if let Some(existing_connection) = write_access.https.get(hash_map_key.as_str()) { + return Ok(existing_connection.clone()); + } + + let connector = HttpsConnector::new( + remote_endpoint.to_owned(), + domain_override, + client_certificate, + ); + let new_one = MyHttpClient::new(connector); + + let new_one = Arc::new(new_one); + + write_access + .https + .insert(hash_map_key.to_string(), new_one.clone()); + + Ok(new_one) } -} -fn get_existing_connection( - connections: &mut HashMap>, - schema_and_domain: &str, -) -> Option> { - let mut has_connection_disconnected = false; - - if let Some(connection) = connections.get(schema_and_domain) { - if connection.is_disconnected() { - has_connection_disconnected = true; - } else { - return Some(connection.clone()); + #[cfg(feature = "unix-socket")] + pub async fn get_unix_socket_and_reuse( + &self, + url_builder: &UrlBuilder, + ) -> Result>, FlUrlError> { + let remote_endpoint = url_builder.get_remote_endpoint(); + + let mut write_access = self.inner.write().await; + + let hash_map_key = get_unix_socket_key(remote_endpoint); + + if let Some(existing_connection) = write_access.unix_socket.get(hash_map_key.as_str()) { + return Ok(existing_connection.clone()); } + + let connector = UnixSocketConnector::new(remote_endpoint.to_owned()); + let new_one = MyHttpClient::new(connector); + + let new_one = Arc::new(new_one); + + write_access + .unix_socket + .insert(hash_map_key.to_string(), new_one.clone()); + + Ok(new_one) } - if has_connection_disconnected { - connections.remove(schema_and_domain); + #[cfg(feature = "with-ssh")] + pub async fn get_ssh_and_reuse( + &self, + url_builder: &UrlBuilder, + ssh_credentials: &Arc, + ) -> Result>, FlUrlError> { + let remote_endpoint = url_builder.get_remote_endpoint(); + + let mut write_access = self.inner.write().await; + + let hash_map_key = get_ssh_key(ssh_credentials, remote_endpoint); + + if let Some(existing_connection) = write_access.ssh.get(hash_map_key.as_str()) { + return Ok(existing_connection.clone()); + } + + let ssh_session = my_ssh::SSH_SESSIONS_POOL + .get_or_create(ssh_credentials) + .await; + + let connector = SshHttpConnector { + ssh_session, + remote_host: remote_endpoint.to_owned(), + }; + let new_one = MyHttpClient::new(connector); + + let new_one = Arc::new(new_one); + + write_access + .ssh + .insert(hash_map_key.to_string(), new_one.clone()); + + Ok(new_one) } +} + +fn get_http_key(remote_endpoint: RemoteEndpoint) -> ShortString { + remote_endpoint.get_host_port(Some(80)) +} + +fn get_https_key(remote_endpoint: RemoteEndpoint) -> ShortString { + remote_endpoint.get_host_port(Some(443)) +} + +#[cfg(feature = "unix-socket")] +fn get_unix_socket_key(remote_endpoint: RemoteEndpoint) -> ShortString { + ShortString::from_str(remote_endpoint.get_host()).unwrap() +} +#[cfg(feature = "with-ssh")] +fn get_ssh_key( + ssh_credentials: &my_ssh::SshCredentials, + remote_endpoint: RemoteEndpoint, +) -> ShortString { + let mut result = ShortString::new_empty(); + + result.push_str(ssh_credentials.get_user_name()); + result.push('@'); + + let (host, port) = ssh_credentials.get_host_port(); + + result.push_str(host); + result.push(':'); + + result.push_str(port.to_string().as_str()); + + result.push_str("->"); + result.push_str(remote_endpoint.get_host_port(None).as_str()); - None + result } diff --git a/src/http_client/connect_to_http_over_ssh.rs b/src/http_connectors/connect_to_http_over_ssh.rs similarity index 100% rename from src/http_client/connect_to_http_over_ssh.rs rename to src/http_connectors/connect_to_http_over_ssh.rs diff --git a/src/http_client/connect_to_tls_endpoint.rs b/src/http_connectors/connect_to_tls_endpoint.rs similarity index 100% rename from src/http_client/connect_to_tls_endpoint.rs rename to src/http_connectors/connect_to_tls_endpoint.rs diff --git a/src/http_connectors/http_connector.rs b/src/http_connectors/http_connector.rs new file mode 100644 index 0000000..3f39f21 --- /dev/null +++ b/src/http_connectors/http_connector.rs @@ -0,0 +1,42 @@ +use my_http_client::{MyHttpClientConnector, MyHttpClientError}; +use rust_extensions::{remote_endpoint::RemoteEndpointOwned, StrOrString}; +use tokio::net::TcpStream; + +pub struct HttpConnector { + pub remote_host: RemoteEndpointOwned, +} + +impl HttpConnector { + pub fn new(remote_host: RemoteEndpointOwned) -> Self { + Self { remote_host } + } +} + +#[async_trait::async_trait] +impl MyHttpClientConnector for HttpConnector { + async fn connect(&self) -> Result { + let host_port = self.remote_host.get_host_port(Some(80)); + match TcpStream::connect(host_port.as_str()).await { + Ok(tcp_stream) => Ok(tcp_stream), + Err(err) => Err( + my_http_client::MyHttpClientError::CanNotConnectToRemoteHost(format!( + "{}. Err:{}", + host_port, err + )), + ), + } + } + fn get_remote_host(&self) -> StrOrString { + self.remote_host.as_str().into() + } + fn is_debug(&self) -> bool { + false + } + + fn reunite( + _read: tokio::io::ReadHalf, + _write: tokio::io::WriteHalf, + ) -> TcpStream { + panic!("Would implement this if upgrade fl-url to support WebSockets") + } +} diff --git a/src/http_connectors/https_connector.rs b/src/http_connectors/https_connector.rs new file mode 100644 index 0000000..553d5e0 --- /dev/null +++ b/src/http_connectors/https_connector.rs @@ -0,0 +1,97 @@ +use std::sync::Arc; + +use my_http_client::{MyHttpClientConnector, MyHttpClientError}; +use my_tls::{ + tokio_rustls::{client::TlsStream, TlsConnector}, + ClientCertificate, +}; +use rust_extensions::{remote_endpoint::RemoteEndpointOwned, StrOrString}; +use tokio::net::TcpStream; + +pub struct HttpsConnector { + pub remote_host: RemoteEndpointOwned, + pub domain: Option, + pub client_certificate: Option, +} + +impl HttpsConnector { + pub fn new( + remote_host: RemoteEndpointOwned, + domain: Option, + client_certificate: Option, + ) -> Self { + Self { + remote_host, + domain, + client_certificate, + } + } +} + +#[async_trait::async_trait] +impl MyHttpClientConnector> for HttpsConnector { + async fn connect(&self) -> Result, MyHttpClientError> { + let host_port = self.remote_host.get_host_port(Some(443)); + let connect_result = TcpStream::connect(host_port.as_str()).await; + + if let Err(err) = &connect_result { + return Err( + my_http_client::MyHttpClientError::CanNotConnectToRemoteHost(format!( + "{}. Err:{}", + host_port, err + )), + ); + } + + let tcp_stream = connect_result.unwrap(); + + let client_config = my_tls::create_tls_client_config(&self.client_certificate); + + if let Err(err) = client_config { + return Err( + my_http_client::MyHttpClientError::CanNotConnectToRemoteHost(format!( + "{}. Err:{}", + host_port, err + )), + ); + } + + let client_config = client_config.unwrap(); + + let connector = TlsConnector::from(Arc::new(client_config)); + + let domain = if let Some(domain) = self.domain.as_ref() { + my_tls::tokio_rustls::rustls::pki_types::ServerName::try_from(domain.to_string()) + .unwrap() + } else { + my_tls::tokio_rustls::rustls::pki_types::ServerName::try_from( + self.remote_host.get_host().to_string(), + ) + .unwrap() + }; + + match connector.connect(domain, tcp_stream).await { + Ok(tls_stream) => Ok(tls_stream), + Err(err) => Err( + my_http_client::MyHttpClientError::CanNotConnectToRemoteHost(format!( + "{}. Err:{}", + host_port, err + )), + ), + } + } + + fn get_remote_host(&self) -> StrOrString { + self.remote_host.as_str().into() + } + fn is_debug(&self) -> bool { + false + } + + fn reunite( + _read: tokio::io::ReadHalf>, + _write: tokio::io::WriteHalf>, + ) -> TlsStream { + panic!("Would implement this if upgrade fl-url to support WebSockets") + } +} diff --git a/src/http_connectors/mod.rs b/src/http_connectors/mod.rs new file mode 100644 index 0000000..bd14b34 --- /dev/null +++ b/src/http_connectors/mod.rs @@ -0,0 +1,14 @@ +mod http_connector; +pub use http_connector::*; + +#[cfg(feature = "with-ssh")] +mod ssh_connector; + +#[cfg(feature = "with-ssh")] +pub use ssh_connector::*; +mod https_connector; +pub use https_connector::*; +#[cfg(feature = "unix-socket")] +mod unix_socket_connector; +#[cfg(feature = "unix-socket")] +pub use unix_socket_connector::*; diff --git a/src/http_connectors/ssh_connector.rs b/src/http_connectors/ssh_connector.rs new file mode 100644 index 0000000..aef7aa3 --- /dev/null +++ b/src/http_connectors/ssh_connector.rs @@ -0,0 +1,64 @@ +use std::{sync::Arc, time::Duration}; + +use my_http_client::{MyHttpClientConnector, MyHttpClientError}; +use my_ssh::{SshAsyncChannel, SshSession}; +use rust_extensions::{remote_endpoint::RemoteEndpointOwned, StrOrString}; + +pub struct SshHttpConnector { + pub ssh_session: Arc, + pub remote_host: RemoteEndpointOwned, +} + +#[async_trait::async_trait] +impl MyHttpClientConnector for SshHttpConnector { + async fn connect(&self) -> Result { + let port = self.remote_host.get_port(); + if port.is_none() { + panic!( + "Port is not defined in the remote endpoint {}", + self.remote_host.get_host_port(None) + ); + } + + let ssh_channel = self + .ssh_session + .connect_to_remote_host( + self.remote_host.get_host(), + port.unwrap(), + Duration::from_secs(30), + ) + .await; + + match ssh_channel { + Ok(ssh_channel) => Ok(ssh_channel), + Err(err) => { + let ssh_credentials = self.ssh_session.get_ssh_credentials(); + + let (ssh_host, ssh_port) = ssh_credentials.get_host_port(); + Err( + my_http_client::MyHttpClientError::CanNotConnectToRemoteHost(format!( + "Can not connect to remote endpoint ssh:{}@{}:{}->{}. Err:{:?}", + ssh_credentials.get_user_name(), + ssh_host, + ssh_port, + self.remote_host.get_host_port(None).as_str(), + err + )), + ) + } + } + } + fn get_remote_host(&self) -> StrOrString { + self.remote_host.as_str().into() + } + fn is_debug(&self) -> bool { + false + } + + fn reunite( + _read: tokio::io::ReadHalf, + _write: tokio::io::WriteHalf, + ) -> SshAsyncChannel { + panic!("Would implement this if upgrade fl-url to support WebSockets") + } +} diff --git a/src/http_connectors/unix_socket_connector.rs b/src/http_connectors/unix_socket_connector.rs new file mode 100644 index 0000000..9c0dd81 --- /dev/null +++ b/src/http_connectors/unix_socket_connector.rs @@ -0,0 +1,56 @@ +use my_http_client::{MyHttpClientConnector, MyHttpClientError}; +use rust_extensions::{remote_endpoint::RemoteEndpointOwned, StrOrString}; +use tokio::net::{UnixSocket, UnixStream}; + +pub type UnixSocketStream = tokio::net::UnixStream; + +pub struct UnixSocketConnector { + pub remote_host: RemoteEndpointOwned, +} + +impl UnixSocketConnector { + pub fn new(remote_host: RemoteEndpointOwned) -> Self { + Self { remote_host } + } +} + +#[async_trait::async_trait] +impl MyHttpClientConnector for UnixSocketConnector { + async fn connect(&self) -> Result { + let unix_socket = match UnixSocket::new_stream() { + Ok(result) => result, + Err(err) => { + return Err(MyHttpClientError::CanNotConnectToRemoteHost(format!( + "Can not create UnixSocket to connection to {}. Err: {}", + self.remote_host.as_str(), + err + ))) + } + }; + + let connect_result = unix_socket.connect(self.remote_host.get_host()).await; + match connect_result { + Ok(stream) => Ok(stream), + Err(err) => Err( + my_http_client::MyHttpClientError::CanNotConnectToRemoteHost(format!( + "{}. Err:{}", + self.remote_host.as_str(), + err + )), + ), + } + } + fn get_remote_host(&self) -> StrOrString { + self.remote_host.as_str().into() + } + fn is_debug(&self) -> bool { + false + } + + fn reunite( + _read: tokio::io::ReadHalf, + _write: tokio::io::WriteHalf, + ) -> UnixStream { + panic!("Would implement this if upgrade fl-url to support WebSockets") + } +} diff --git a/src/into_fl_url.rs b/src/into_fl_url.rs index 8ebd35d..7d86fda 100644 --- a/src/into_fl_url.rs +++ b/src/into_fl_url.rs @@ -11,10 +11,10 @@ pub trait IntoFlUrl { value: Option>>, ) -> FlUrl; - fn with_header<'a>( + fn with_header<'n, 'v>( self, - name: impl Into>, - value: impl Into>, + name: impl Into>, + value: impl Into>, ) -> FlUrl; fn append_raw_ending_to_url<'s>(self, raw: impl Into>) -> FlUrl; @@ -44,10 +44,10 @@ impl<'g> IntoFlUrl for &'g str { FlUrl::new(self).append_query_param(name, value) } - fn with_header<'a>( + fn with_header<'n, 'v>( self, - name: impl Into>, - value: impl Into>, + name: impl Into>, + value: impl Into>, ) -> FlUrl { FlUrl::new(self).with_header(name, value) } @@ -98,10 +98,10 @@ impl<'g> IntoFlUrl for &'g String { FlUrl::new(self).append_query_param(name, value) } - fn with_header<'a>( + fn with_header<'n, 'v>( self, - name: impl Into>, - value: impl Into>, + name: impl Into>, + value: impl Into>, ) -> FlUrl { FlUrl::new(self).with_header(name, value) } @@ -152,9 +152,9 @@ impl IntoFlUrl for String { FlUrl::new(self).append_query_param(name, value) } - fn with_header<'v>( + fn with_header<'n, 'v>( self, - name: impl Into>, + name: impl Into>, value: impl Into>, ) -> FlUrl { FlUrl::new(self).with_header(name, value) diff --git a/src/lib.rs b/src/lib.rs index c21e980..8532f18 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,14 +18,14 @@ pub use fl_url::FlUrl; pub use http_clients_cache::*; pub use into_fl_url::*; pub use url_builder::*; -mod url_builder_owned; -pub use url_builder_owned::*; +//mod url_builder_owned; +//pub use url_builder_owned::*; pub extern crate hyper; mod response_body; pub use response_body::*; -mod http_client; -pub use http_client::*; +mod http_connectors; + mod errors; pub use errors::*; diff --git a/src/response_body.rs b/src/response_body.rs index 8d5121c..5702b86 100644 --- a/src/response_body.rs +++ b/src/response_body.rs @@ -1,11 +1,12 @@ use std::collections::HashMap; -use hyper::{body::Incoming, HeaderMap, Response}; +use http_body_util::BodyExt; +use hyper::HeaderMap; use crate::{FlUrlError, FlUrlReadingHeaderError}; pub enum ResponseBody { - Incoming(Option>), + Hyper(Option), Body { headers: HeaderMap, body: Option>, @@ -13,18 +14,18 @@ pub enum ResponseBody { } impl ResponseBody { - pub fn as_incoming(&self) -> &Response { + pub fn as_hyper_response(&self) -> &my_http_client::HyperResponse { match &self { - Self::Incoming(response) => response.as_ref().unwrap(), + Self::Hyper(response) => response.as_ref().unwrap(), Self::Body { .. } => { panic!("Body is already disposed"); } } } - pub fn into_hyper_response(self) -> Response { + pub fn into_hyper_response(self) -> my_http_client::HyperResponse { match self { - Self::Incoming(response) => { + Self::Hyper(response) => { let response = response.unwrap(); response } @@ -36,7 +37,7 @@ impl ResponseBody { pub fn get_header(&self, header: &str) -> Result, FlUrlReadingHeaderError> { match self { - Self::Incoming(response) => { + Self::Hyper(response) => { let result = response.as_ref().unwrap().headers().get(header); if result.is_none() { @@ -58,7 +59,7 @@ impl ResponseBody { header: &str, ) -> Result, FlUrlReadingHeaderError> { match self { - Self::Incoming(response) => { + Self::Hyper(response) => { for (name, value) in response.as_ref().unwrap().headers().iter() { if rust_extensions::str_utils::compare_strings_case_insensitive( name.as_str(), @@ -82,7 +83,7 @@ impl ResponseBody { hash_map: &mut HashMap<&'s str, Option<&'s str>>, ) { match self { - ResponseBody::Incoming(incoming) => { + ResponseBody::Hyper(incoming) => { if let Some(incoming) = incoming { for (key, value) in incoming.headers() { if let Ok(value) = value.to_str() { @@ -106,7 +107,7 @@ impl ResponseBody { hash_map: &mut HashMap>, ) { match self { - ResponseBody::Incoming(incoming) => { + ResponseBody::Hyper(incoming) => { if let Some(incoming) = incoming { for (key, value) in incoming.headers() { hash_map.insert( @@ -137,12 +138,12 @@ impl ResponseBody { async fn convert_to_slice_if_needed(&mut self) -> Result<(), FlUrlError> { match self { - Self::Incoming(response) => { + Self::Hyper(response) => { let response = response.take().unwrap(); let (parts, incoming) = response.into_parts(); - let body = read_bytes(incoming).await?; + let body = body_to_vec(incoming).await?; *self = Self::Body { headers: parts.headers, body: Some(body), @@ -158,7 +159,7 @@ impl ResponseBody { self.convert_to_slice_if_needed().await?; match self { - ResponseBody::Incoming(_) => { + ResponseBody::Hyper(_) => { panic!("Should not be here") } ResponseBody::Body { body, .. } => match body { @@ -172,7 +173,7 @@ impl ResponseBody { self.convert_to_slice_if_needed().await?; match self { - ResponseBody::Incoming(_) => { + ResponseBody::Hyper(_) => { panic!("Should not be here") } ResponseBody::Body { body, .. } => match body.take() { @@ -183,12 +184,19 @@ impl ResponseBody { } } -async fn read_bytes( - incoming: impl hyper::body::Body, +async fn body_to_vec( + body: http_body_util::combinators::BoxBody, ) -> Result, FlUrlError> { - use http_body_util::BodyExt; + let collected = body.collect().await; - let collected = incoming.collect().await?; - let bytes = collected.to_bytes(); - Ok(bytes.into()) + match collected { + Ok(bytes) => { + let bytes = bytes.to_bytes(); + Ok(bytes.into()) + } + Err(err) => { + let err = FlUrlError::ReadingHyperBodyError(err); + Err(err) + } + } } diff --git a/src/scheme.rs b/src/scheme.rs index 1a3eaf1..cc025c6 100644 --- a/src/scheme.rs +++ b/src/scheme.rs @@ -2,7 +2,6 @@ pub enum Scheme { Http, Https, - #[cfg(feature = "unix-socket")] UnixSocket, } @@ -26,14 +25,9 @@ impl Scheme { return (Scheme::Https, Some(index)); } - #[cfg(feature = "unix-socket")] if rust_extensions::str_utils::compare_strings_case_insensitive("http+unix", scheme) { return (Scheme::UnixSocket, Some(index)); } - #[cfg(not(feature = "unix-socket"))] - if rust_extensions::str_utils::compare_strings_case_insensitive("http+unix", scheme) { - panic!("Please enable feature unix-socket to use http+unix scheme"); - } panic!("Unknown scheme: {}", scheme); } @@ -56,7 +50,6 @@ impl Scheme { } } - #[cfg(feature = "unix-socket")] pub fn is_unix_socket(&self) -> bool { match self { Scheme::UnixSocket => true, @@ -67,7 +60,6 @@ impl Scheme { match self { Scheme::Http => "http://", Scheme::Https => "https://", - #[cfg(feature = "unix-socket")] Scheme::UnixSocket => "http+unix:/", } } diff --git a/src/ssh.rs b/src/ssh.rs index ebec3d6..4d123a8 100644 --- a/src/ssh.rs +++ b/src/ssh.rs @@ -1,9 +1,19 @@ use std::sync::Arc; use my_ssh::*; +use rust_extensions::remote_endpoint::SshRemoteEndpoint; pub struct SshTarget { pub credentials: Option>, pub sessions_pool: Option>, pub http_buffer_size: usize, } + +pub fn to_ssh_credentials(ssh_remote_endpoint: &SshRemoteEndpoint) -> SshCredentials { + let (host, port) = ssh_remote_endpoint.get_host_port(); + my_ssh::SshCredentials::SshAgent { + ssh_remote_host: host.to_string(), + ssh_remote_port: port, + ssh_user_name: ssh_remote_endpoint.get_user().to_string(), + } +} diff --git a/src/telemetry_flow.rs b/src/telemetry_flow.rs deleted file mode 100644 index bfb26ab..0000000 --- a/src/telemetry_flow.rs +++ /dev/null @@ -1,92 +0,0 @@ -use hyper::Error; -use my_telemetry::{MyTelemetryContext, TelemetryEvent}; -use rust_extensions::date_time::DateTimeAsMicroseconds; - -use crate::FlUrlResponse; - -pub struct TelemetryData { - pub method: hyper::Method, - pub url: String, -} - -impl TelemetryData { - pub fn as_string(&self) -> String { - format!("[{}]{}", self.method, self.url) - } -} - -pub struct TelemetryFlow { - started: DateTimeAsMicroseconds, - telemetry_context: MyTelemetryContext, - pub data: Option, -} - -impl TelemetryFlow { - pub fn new(telemetry_context: Option) -> Option { - let telemetry_context = telemetry_context?; - Self { - started: DateTimeAsMicroseconds::now(), - telemetry_context, - data: None, - } - .into() - } - - - pub async fn write_telemetry(&mut self, result: &Result) { - if !my_telemetry::TELEMETRY_INTERFACE.is_telemetry_set_up() { - return; - } - - let data = self.data.take(); - - if data.is_none() { - return; - } - - let data = data.unwrap(); - - let telemetry_event = match &result { - Ok(result) => { - let status_code = result.get_status_code(); - if status_code < 300 { - TelemetryEvent { - process_id: self.telemetry_context.process_id, - started: self.started.unix_microseconds, - finished: DateTimeAsMicroseconds::now().unix_microseconds, - data: data.as_string(), - success: format!("Status Code: {}", status_code).into(), - fail: None, - ip: None, - } - } else { - TelemetryEvent { - process_id: self.telemetry_context.process_id, - started: self.started.unix_microseconds, - finished: DateTimeAsMicroseconds::now().unix_microseconds, - data: data.as_string(), - success: None, - fail: format!("Status Code: {}", status_code).into(), - ip: None, - } - } - } - Err(err) => TelemetryEvent { - process_id: self.telemetry_context.process_id, - started: self.started.unix_microseconds, - finished: DateTimeAsMicroseconds::now().unix_microseconds, - data: data.as_string(), - success: None, - fail: format!("Err: {}", err).into(), - ip: None, - }, - }; - - let mut write_access = my_telemetry::TELEMETRY_INTERFACE - .telemetry_collector - .lock() - .await; - - write_access.write(telemetry_event) - } -} diff --git a/src/url_builder.rs b/src/url_builder.rs index 1a7089b..cc3ccdd 100644 --- a/src/url_builder.rs +++ b/src/url_builder.rs @@ -1,216 +1,219 @@ -use rust_extensions::ShortString; +use core::str; -use crate::{url_utils, Scheme, UrlBuilderOwned}; +use rust_extensions::remote_endpoint::RemoteEndpoint; + +use crate::{url_utils, Scheme}; pub struct UrlBuilder { - path_segments: String, - scheme_index: Option, - pub query: Vec<(String, Option)>, - pub scheme: Scheme, - raw_ending: Option, - pub host_port: ShortString, - has_last_slash: bool, - pub tls_domain: Option, + value: String, + host_index: usize, + port_index: usize, + path_index: usize, + query_index: usize, } impl UrlBuilder { - pub fn new(mut host_port: ShortString) -> Self { - let has_last_slash = remove_last_symbol_if_exists(&mut host_port, '/'); + pub fn new(host_port: &str) -> Self { + let mut value = String::new(); - let (scheme, scheme_index) = Scheme::from_url(host_port.as_str()); + let mut domain_index = host_port.find("://"); - Self { - query: Vec::new(), - path_segments: String::new(), - scheme, - scheme_index, - raw_ending: None, - host_port, - has_last_slash, - tls_domain: None, + if domain_index.is_none() { + domain_index = host_port.find(":/~"); } - } - pub fn append_raw_ending(&mut self, raw_ending: String) { - self.raw_ending = Some(raw_ending); - } + let host_index = if let Some(domain_index) = domain_index { + domain_index + 3 + } else { + value.push_str("http://"); + 7 + }; + value.push_str(host_port); + + let mut port_index = 0; + let mut path_index = 0; + let mut query_index = 0; + + let mut pos = 0; + for c in value.chars() { + if pos <= host_index { + pos += 1; + continue; + } - pub fn append_path_segment(&mut self, path: &str) { - if self.path_segments.len() > 0 { - self.path_segments.push('/'); + match c { + ':' => { + if path_index == 0 { + port_index = pos; + } + } + '/' => { + if path_index == 0 { + path_index = pos; + } + } + '?' => { + if path_index == 0 { + path_index = pos; + } + if query_index == 0 { + query_index = pos; + break; + } + } + _ => {} + } + + pos += 1; } - self.path_segments.push_str(path); - } - pub fn append_query_param(&mut self, param: String, value: Option) { - self.query.push((param.to_string(), value)); + Self { + value, + host_index, + path_index, + port_index, + query_index, + } } - pub fn get_scheme(&self) -> Scheme { - self.scheme.clone() + pub fn get_remote_endpoint(&self) -> RemoteEndpoint { + if self.path_index == 0 { + RemoteEndpoint::try_parse(&self.value[self.host_index..]).unwrap() + } else { + RemoteEndpoint::try_parse(&self.value[self.host_index..self.path_index]).unwrap() + } } - pub fn get_host_port(&self) -> &str { - #[cfg(feature = "unix-socket")] - if self.scheme.is_unix_socket() { - match self.scheme_index { - Some(index) => return &self.host_port.as_str()[index + 2..], - None => return self.host_port.as_str(), - } + pub fn append_path_segment(&mut self, path: &str) { + if !self.value.ends_with('/') { + self.value.push('/'); + } + if self.path_index == 0 { + self.path_index = self.value.len() - 1; } - match self.scheme_index { - Some(index) => &self.host_port.as_str()[index + 3..] - .split('/') - .next() - .unwrap(), - None => self.host_port.as_str().split('/').next().unwrap(), + if path.starts_with('/') { + self.value.push_str(&path[1..]); + } else { + self.value.push_str(path); } } - pub fn get_domain(&self) -> &str { - if let Some(tls_domain) = self.tls_domain.as_ref() { - return tls_domain.as_str(); + pub fn append_query_param(&mut self, param: &str, value: Option<&str>) { + if self.query_index == 0 { + self.value.push('?'); + self.query_index = self.value.len() - 1; + } else { + self.value.push('&'); } - - let host_port = self.get_host_port(); - if let Some(index) = host_port.find(":") { - return &host_port[0..index]; + url_utils::encode_to_url_string_and_copy(&mut self.value, param); + if let Some(value) = value { + self.value.push('='); + url_utils::encode_to_url_string_and_copy(&mut self.value, value); } - - host_port.split('/').next().unwrap() } - fn fill_schema_and_host(&self, result: &mut String) { - result.push_str(self.scheme.scheme_as_str()); - - if let Some(index) = self.scheme_index { - #[cfg(feature = "unix-socket")] - if self.scheme.is_unix_socket() { - result.push_str(&self.host_port.as_str()[index + 2..]); - } else { - result.push_str(&self.host_port.as_str()[index + 3..]); - } - - result.push_str(&self.host_port.as_str()[index + 3..]); - } else { - result.push_str(self.host_port.as_str()); - } + pub fn append_raw_ending(&mut self, raw_ending: &str) { + self.value.push_str(raw_ending); } - fn fill_schema_and_host_to_short_string(&self, result: &mut ShortString) { - result.push_str(self.scheme.scheme_as_str()); + pub fn get_scheme(&self) -> Scheme { + Scheme::from_url(&self.value).0 + } - if let Some(index) = self.scheme_index { - result.push_str(&self.host_port.as_str()[index + 3..]); - } else { - result.push_str(self.host_port.as_str()); + pub fn get_host(&self) -> &str { + if self.port_index > 0 { + return &self.value[self.host_index..self.port_index]; } - } - pub fn get_scheme_and_host(&self) -> ShortString { - #[cfg(feature = "unix-socket")] - if self.scheme.is_unix_socket() { - return self.host_port.clone(); + if self.path_index > 0 { + return &self.value[self.host_index..self.path_index]; } - if self.scheme_index.is_some() { - return self.host_port.as_str().into(); + if self.query_index > 0 { + return &self.value[self.host_index..self.query_index]; } - let mut result = ShortString::new_empty(); - self.fill_schema_and_host_to_short_string(&mut result); - result.into() + &self.value[self.host_index..] } - pub fn get_path_and_query(self) -> String { - let mut result = String::new(); + pub fn get_host_port(&self) -> &str { + if self.get_scheme().is_unix_socket() { + if self.query_index > 0 { + return &self.value[self.host_index - 1..self.query_index]; + } else { + return &self.value[self.host_index - 1..]; + } + } - fill_with_path(&mut result, &self.path_segments); + if self.path_index > 0 { + return &self.value[self.host_index..self.path_index]; + } - if self.query.len() > 0 { - fill_with_query(&mut result, &self.query) + if self.query_index > 0 { + return &self.value[self.host_index..self.query_index]; } - result + &self.value[self.host_index..] } - pub fn get_path(&self) -> String { - if self.path_segments.len() == 0 { - return "/".to_string(); + pub fn get_scheme_and_host(&self) -> &str { + if self.get_scheme().is_unix_socket() { + if self.query_index > 0 { + return &self.value[..self.query_index]; + } else { + return &self.value; + } } - let mut result = String::new(); + if self.path_index > 0 { + return &self.value[..self.path_index]; + } - fill_with_path(&mut result, &self.path_segments); + if self.query_index > 0 { + return &self.value[..self.query_index]; + } - result + &self.value } - pub fn to_string(&self) -> String { - let mut result: String = String::new(); - - self.fill_schema_and_host(&mut result); - - if self.path_segments.len() > 0 { - fill_with_path(&mut result, &self.path_segments); - } else { - if self.has_last_slash && self.raw_ending.is_none() { - result.push('/'); - } + pub fn get_path_and_query(&self) -> &str { + if self.get_scheme().is_unix_socket() { + return &self.value[self.host_index - 1..]; } - if self.query.len() > 0 { - fill_with_query(&mut result, &self.query) + if self.path_index == 0 && self.query_index == 0 { + return "/"; } - if let Some(raw_ending) = &self.raw_ending { - result.push_str(raw_ending) + if self.path_index > 0 { + return &self.value[self.path_index..]; } - result + return &self.value[self.query_index..]; } - - pub fn into_builder_owned(&self) -> UrlBuilderOwned { - UrlBuilderOwned::new(self.to_string()) - } -} - -fn fill_with_path<'s>(res: &mut String, path: &str) { - res.push('/'); - if path.len() == 0 { - return; + pub fn host_is_ip(&self) -> bool { + let host = self.get_host(); + host.chars().all(|c| c.is_numeric() || c == '.') } - res.push_str(path) -} + pub fn get_path(&self) -> &str { + if self.path_index == 0 { + return "/"; + } + if self.query_index == 0 { + return &self.value[self.path_index..]; + } -fn remove_last_symbol_if_exists<'s>(src: &mut ShortString, last_symbol: char) -> bool { - let last_char = last_symbol as u8; - let src_as_bytes = src.as_str().as_bytes(); - if src_as_bytes[src_as_bytes.len() - 1] == last_char { - src.set_len(src_as_bytes.len() as u8 - 1); - return true; + &self.value[self.path_index..self.query_index] } - false -} - -fn fill_with_query(res: &mut String, src: &Vec<(String, Option)>) { - let mut first = true; - for (key, value) in src { - if first { - res.push('?'); - first = false; - } else { - res.push('&'); - } - url_utils::encode_to_url_string_and_copy(res, key); + pub fn as_str(&self) -> &str { + &self.value + } - if let Some(value) = value { - res.push('='); - url_utils::encode_to_url_string_and_copy(res, value); - } + pub fn to_string(&self) -> String { + self.value.to_string() } } @@ -223,12 +226,14 @@ mod tests { pub fn test_with_default_scheme() { let uri_builder = UrlBuilder::new("google.com".into()); - assert_eq!("http://google.com", uri_builder.to_string()); - assert_eq!( - "http://google.com", - uri_builder.get_scheme_and_host().as_str() - ); - assert_eq!("google.com", uri_builder.get_domain()); + assert_eq!(uri_builder.host_index, 7); + assert_eq!(uri_builder.port_index, 0); + assert_eq!(uri_builder.path_index, 0); + assert_eq!(uri_builder.query_index, 0); + + assert_eq!("http://google.com", uri_builder.as_str()); + assert_eq!("http://google.com", uri_builder.get_scheme_and_host()); + assert_eq!("google.com", uri_builder.get_host()); assert_eq!(true, uri_builder.get_scheme().is_http()); assert_eq!("google.com", uri_builder.get_host_port()); @@ -241,11 +246,13 @@ mod tests { pub fn test_with_http_scheme() { let uri_builder = UrlBuilder::new("http://google.com".into()); + assert_eq!(uri_builder.host_index, 7); + assert_eq!(uri_builder.port_index, 0); + assert_eq!(uri_builder.path_index, 0); + assert_eq!(uri_builder.query_index, 0); + assert_eq!("http://google.com", uri_builder.to_string()); - assert_eq!( - "http://google.com", - uri_builder.get_scheme_and_host().as_str() - ); + assert_eq!("http://google.com", uri_builder.get_scheme_and_host()); assert_eq!(true, uri_builder.get_scheme().is_http()); assert_eq!("google.com", uri_builder.get_host_port()); assert_eq!("/", uri_builder.get_path()); @@ -256,11 +263,13 @@ mod tests { pub fn test_with_http_scheme_and_last_slash() { let uri_builder = UrlBuilder::new("http://google.com/".into()); + assert_eq!(uri_builder.host_index, 7); + assert_eq!(uri_builder.port_index, 0); + assert_eq!(uri_builder.path_index, 17); + assert_eq!(uri_builder.query_index, 0); + assert_eq!("http://google.com/", uri_builder.to_string()); - assert_eq!( - "http://google.com", - uri_builder.get_scheme_and_host().as_str() - ); + assert_eq!("http://google.com", uri_builder.get_scheme_and_host()); assert_eq!(true, uri_builder.get_scheme().is_http()); assert_eq!("google.com", uri_builder.get_host_port()); assert_eq!("/", uri_builder.get_path()); @@ -271,11 +280,13 @@ mod tests { pub fn test_with_https_scheme() { let uri_builder = UrlBuilder::new("https://google.com".into()); + assert_eq!(uri_builder.host_index, 8); + assert_eq!(uri_builder.port_index, 0); + assert_eq!(uri_builder.path_index, 0); + assert_eq!(uri_builder.query_index, 0); + assert_eq!("https://google.com", uri_builder.to_string()); - assert_eq!( - "https://google.com", - uri_builder.get_scheme_and_host().as_str() - ); + assert_eq!("https://google.com", uri_builder.get_scheme_and_host()); assert_eq!(true, uri_builder.get_scheme().is_https()); assert_eq!("google.com", uri_builder.get_host_port()); @@ -286,14 +297,17 @@ mod tests { #[test] pub fn test_path_segments() { let mut uri_builder = UrlBuilder::new("https://google.com".into()); + assert_eq!(uri_builder.host_index, 8); + assert_eq!(uri_builder.port_index, 0); + assert_eq!(uri_builder.path_index, 0); + assert_eq!(uri_builder.query_index, 0); + uri_builder.append_path_segment("first"); + assert_eq!(uri_builder.path_index, 18); uri_builder.append_path_segment("second"); - assert_eq!("https://google.com/first/second", uri_builder.to_string()); - assert_eq!( - "https://google.com", - uri_builder.get_scheme_and_host().as_str() - ); + assert_eq!("https://google.com/first/second", uri_builder.as_str()); + assert_eq!("https://google.com", uri_builder.get_scheme_and_host()); assert_eq!(true, uri_builder.get_scheme().is_https()); assert_eq!("google.com", uri_builder.get_host_port()); @@ -304,14 +318,15 @@ mod tests { #[test] pub fn test_path_segments_with_slug_at_the_end() { let mut uri_builder = UrlBuilder::new("https://google.com/".into()); + assert_eq!(uri_builder.host_index, 8); + assert_eq!(uri_builder.port_index, 0); + assert_eq!(uri_builder.path_index, 18); + assert_eq!(uri_builder.query_index, 0); uri_builder.append_path_segment("first"); uri_builder.append_path_segment("second"); assert_eq!("https://google.com/first/second", uri_builder.to_string()); - assert_eq!( - "https://google.com", - uri_builder.get_scheme_and_host().as_str() - ); + assert_eq!("https://google.com", uri_builder.get_scheme_and_host()); assert_eq!(true, uri_builder.get_scheme().is_https()); assert_eq!("google.com", uri_builder.get_host_port()); @@ -322,23 +337,25 @@ mod tests { #[test] pub fn test_query_with_no_path() { let mut uri_builder = UrlBuilder::new("https://google.com".into()); - uri_builder.append_query_param("first".to_string(), Some("first_value".to_string())); - uri_builder.append_query_param("second".to_string(), Some("second_value".to_string())); + uri_builder.append_query_param("first", Some("first_value")); + uri_builder.append_query_param("second", Some("second_value")); + + assert_eq!(uri_builder.host_index, 8); + assert_eq!(uri_builder.port_index, 0); + assert_eq!(uri_builder.path_index, 0); + assert_eq!(uri_builder.query_index, 18); assert_eq!( "https://google.com?first=first_value&second=second_value", uri_builder.to_string() ); - assert_eq!( - "https://google.com", - uri_builder.get_scheme_and_host().as_str() - ); + assert_eq!("https://google.com", uri_builder.get_scheme_and_host()); assert_eq!(true, uri_builder.get_scheme().is_https()); assert_eq!("google.com", uri_builder.get_host_port()); - assert_eq!("/", uri_builder.get_path()); + assert_eq!(uri_builder.get_path(), "/",); assert_eq!( - "/?first=first_value&second=second_value", + "?first=first_value&second=second_value", uri_builder.get_path_and_query() ); } @@ -348,17 +365,17 @@ mod tests { let uri_builder = UrlBuilder::new("https://my-domain:5123".into()); assert_eq!("my-domain:5123", uri_builder.get_host_port()); - assert_eq!("my-domain", uri_builder.get_domain()); + assert_eq!("my-domain", uri_builder.get_host()); let uri_builder = UrlBuilder::new("https://my-domain:5123/my-path".into()); assert_eq!("my-domain:5123", uri_builder.get_host_port()); - assert_eq!("my-domain", uri_builder.get_domain()); + assert_eq!("my-domain", uri_builder.get_host()); let uri_builder = UrlBuilder::new("https://my-domain/my-path".into()); assert_eq!("my-domain", uri_builder.get_host_port()); - assert_eq!("my-domain", uri_builder.get_domain()); + assert_eq!("my-domain", uri_builder.get_host()); } #[test] @@ -367,17 +384,14 @@ mod tests { uri_builder.append_path_segment("first"); uri_builder.append_path_segment("second"); - uri_builder.append_query_param("first".to_string(), Some("first_value".to_string())); - uri_builder.append_query_param("second".to_string(), Some("second_value".to_string())); + uri_builder.append_query_param("first", Some("first_value")); + uri_builder.append_query_param("second", Some("second_value")); assert_eq!( "https://google.com/first/second?first=first_value&second=second_value", uri_builder.to_string() ); - assert_eq!( - "https://google.com", - uri_builder.get_scheme_and_host().as_str() - ); + assert_eq!("https://google.com", uri_builder.get_scheme_and_host()); assert_eq!(true, uri_builder.get_scheme().is_https()); assert_eq!("google.com", uri_builder.get_host_port()); @@ -389,61 +403,54 @@ mod tests { } #[test] - #[cfg(feature = "unix-socket")] + pub fn test_unix_path_and_query() { let mut uri_builder = UrlBuilder::new("http+unix://var/run/test".into()); - uri_builder.append_path_segment("first"); - uri_builder.append_path_segment("second"); - uri_builder.append_query_param("first".to_string(), Some("first_value".to_string())); - uri_builder.append_query_param("second".to_string(), Some("second_value".to_string())); + uri_builder.append_query_param("first", Some("first_value")); + uri_builder.append_query_param("second", Some("second_value")); assert_eq!(true, uri_builder.get_scheme().is_unix_socket()); assert_eq!( - "http+unix://var/run/test/first/second?first=first_value&second=second_value", + "http+unix://var/run/test?first=first_value&second=second_value", uri_builder.to_string() ); assert_eq!( "http+unix://var/run/test", - uri_builder.get_scheme_and_host().as_str() + uri_builder.get_scheme_and_host() ); assert_eq!("/var/run/test", uri_builder.get_host_port()); - assert_eq!("/first/second", uri_builder.get_path()); assert_eq!( - "/first/second?first=first_value&second=second_value", + "/var/run/test?first=first_value&second=second_value", uri_builder.get_path_and_query() ); } #[test] - #[cfg(feature = "unix-socket")] pub fn test_unix_from_home_path() { let mut uri_builder = UrlBuilder::new("http+unix:/~/var/run/test".into()); - uri_builder.append_path_segment("first"); - uri_builder.append_path_segment("second"); - uri_builder.append_query_param("first".to_string(), Some("first_value".to_string())); - uri_builder.append_query_param("second".to_string(), Some("second_value".to_string())); + uri_builder.append_query_param("first", Some("first_value")); + uri_builder.append_query_param("second", Some("second_value")); assert_eq!(true, uri_builder.get_scheme().is_unix_socket()); assert_eq!( - "http+unix:/~/var/run/test/first/second?first=first_value&second=second_value", + "http+unix:/~/var/run/test?first=first_value&second=second_value", uri_builder.to_string() ); assert_eq!( "http+unix:/~/var/run/test", - uri_builder.get_scheme_and_host().as_str() + uri_builder.get_scheme_and_host() ); assert_eq!("~/var/run/test", uri_builder.get_host_port()); - assert_eq!("/first/second", uri_builder.get_path()); assert_eq!( - "/first/second?first=first_value&second=second_value", + "~/var/run/test?first=first_value&second=second_value", uri_builder.get_path_and_query() ); }