From a9897ffbe249e34b25f7157b42e4fb10e2776dcd Mon Sep 17 00:00:00 2001 From: shikhar Date: Wed, 20 Nov 2024 11:35:17 -0500 Subject: [PATCH 1/2] s --- examples/basic.rs | 6 ++-- src/client.rs | 78 +++++++++++++++++++++-------------------------- 2 files changed, 37 insertions(+), 47 deletions(-) diff --git a/examples/basic.rs b/examples/basic.rs index e4c3db7..a52d66c 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -3,7 +3,7 @@ use std::time::Duration; use futures::StreamExt; use streamstore::{ batching::AppendRecordsBatchingStream, - client::{Client, ClientConfig, ClientError, HostEndpoints}, + client::{Client, ClientConfig, ClientError, S2Endpoints}, types::{ AppendInput, AppendRecord, BasinName, CreateBasinRequest, CreateStreamRequest, DeleteBasinRequest, DeleteStreamRequest, ListBasinsRequest, ListStreamsRequest, @@ -15,10 +15,10 @@ use streamstore::{ async fn main() { let token = std::env::var("S2_AUTH_TOKEN").unwrap(); - let host_endpoints = HostEndpoints::from_env().unwrap(); + let host_endpoints = S2Endpoints::from_env().unwrap(); let config = ClientConfig::new(token) - .with_host_endpoints(host_endpoints) + .with_endpoints(host_endpoints) .with_request_timeout(Duration::from_secs(10)); println!("Connecting with {config:#?}"); diff --git a/src/client.rs b/src/client.rs index 98ae454..ef01416 100644 --- a/src/client.rs +++ b/src/client.rs @@ -35,10 +35,10 @@ use crate::{ const DEFAULT_HTTP_CONNECTOR: Option = None; -/// Cloud deployment to be used to connect the client with. +/// S2 cloud environment to connect with. #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] pub enum HostCloud { - /// S2 hosted on AWS. + /// S2 running on AWS. #[default] Aws, } @@ -163,26 +163,26 @@ impl ParseError { } } -/// Endpoints for the hosted S2 environment. +/// Endpoints for the S2 environment. #[derive(Debug, Clone)] -pub struct HostEndpoints { +pub struct S2Endpoints { cell: Authority, basin_zone: Option, } -impl From for HostEndpoints { +impl From for S2Endpoints { fn from(cloud: HostCloud) -> Self { - HostEndpoints::for_cloud(cloud) + S2Endpoints::for_cloud(cloud) } } -impl Default for HostEndpoints { +impl Default for S2Endpoints { fn default() -> Self { Self::for_cloud(HostCloud::default()) } } -impl HostEndpoints { +impl S2Endpoints { pub fn for_cloud(cloud: HostCloud) -> Self { Self::from_parts(cloud, HostEnv::default(), None, None) } @@ -244,35 +244,26 @@ impl HostEndpoints { } } -/// Client configuration to be used to connect with the host. +/// Client configuration. #[derive(Debug, Clone)] pub struct ClientConfig { - /// Auth token for the client. - pub token: SecretString, - /// Host URI to connect with. - pub host_endpoints: HostEndpoints, - /// Timeout for connecting/reconnecting. - pub connection_timeout: Duration, - /// Timeout for a particular request. - pub request_timeout: Duration, - /// User agent to be used for the client. - pub user_agent: HeaderValue, - /// URI scheme to use to connect. + pub(crate) token: SecretString, + pub(crate) host_endpoints: S2Endpoints, + pub(crate) connection_timeout: Duration, + pub(crate) request_timeout: Duration, + pub(crate) user_agent: HeaderValue, #[cfg(feature = "connector")] - pub uri_scheme: http::uri::Scheme, - /// Backoff duration for retries. - pub retry_backoff_duration: Duration, - /// Maximum number of retries. - pub max_attempts: usize, + pub(crate) uri_scheme: http::uri::Scheme, + pub(crate) retry_backoff_duration: Duration, + pub(crate) max_attempts: usize, } impl ClientConfig { - /// Construct a new client configuration with given auth token and other - /// defaults. + /// Initialize a default client configuration with the specified authentication token. pub fn new(token: impl Into) -> Self { Self { token: token.into().into(), - host_endpoints: HostEndpoints::default(), + host_endpoints: S2Endpoints::default(), connection_timeout: Duration::from_secs(3), request_timeout: Duration::from_secs(5), user_agent: "s2-sdk-rust".parse().expect("valid user agent"), @@ -283,16 +274,15 @@ impl ClientConfig { } } - /// Construct from an existing configuration with the new host URIs. - pub fn with_host_endpoints(self, host_endpoints: impl Into) -> Self { + /// S2 endpoints to connect to. + pub fn with_endpoints(self, host_endpoints: impl Into) -> Self { Self { host_endpoints: host_endpoints.into(), ..self } } - /// Construct from an existing configuration with the new connection - /// timeout. + /// Timeout for connecting and transparently reconnecting. Defaults to 3s. pub fn with_connection_timeout(self, connection_timeout: impl Into) -> Self { Self { connection_timeout: connection_timeout.into(), @@ -300,7 +290,7 @@ impl ClientConfig { } } - /// Construct from an existing configuration with the new request timeout. + /// Timeout for a particular request. Defaults to 5s. pub fn with_request_timeout(self, request_timeout: impl Into) -> Self { Self { request_timeout: request_timeout.into(), @@ -308,7 +298,7 @@ impl ClientConfig { } } - /// Construct from an existing configuration with the new user agent. + /// User agent. Defaults to `s2-sdk-rust`. Feel free to say hi. pub fn with_user_agent(self, user_agent: impl Into) -> Self { Self { user_agent: user_agent.into(), @@ -316,7 +306,7 @@ impl ClientConfig { } } - /// Construct from an existing configuration with the new URI scheme. + /// URI scheme to use when connecting with a custom connector. Defaults to `https`. #[cfg(feature = "connector")] pub fn with_uri_scheme(self, uri_scheme: impl Into) -> Self { Self { @@ -325,7 +315,9 @@ impl ClientConfig { } } - /// Construct from an existing configuration with the retry backoff duration. + /// Backoff duration when retrying. + /// Defaults to 100ms. + /// A jitter is always applied. pub fn with_retry_backoff_duration(self, retry_backoff_duration: impl Into) -> Self { Self { retry_backoff_duration: retry_backoff_duration.into(), @@ -333,7 +325,9 @@ impl ClientConfig { } } - /// Construct from an existing configuration with maximum number of retries. + /// Maximum number of attempts per request. + /// Setting it to 1 disables retrying. + /// The default is to make 3 attempts. pub fn max_attempts(self, max_attempts: usize) -> Self { assert!(max_attempts > 0, "max attempts must be greater than 0"); Self { @@ -351,14 +345,13 @@ pub enum ClientError { Service(#[from] tonic::Status), } -/// The S2 client to interact with the API. +/// Client for account-level operations. #[derive(Debug, Clone)] pub struct Client { inner: ClientInner, } impl Client { - /// Create the client to connect with the S2 API. pub fn new(config: ClientConfig) -> Self { Self { inner: ClientInner::new_cell(config, DEFAULT_HTTP_CONNECTOR), @@ -378,7 +371,6 @@ impl Client { } } - /// Get the client to interact with the S2 basin service API. pub fn basin_client(&self, basin: types::BasinName) -> BasinClient { BasinClient { inner: self.inner.new_basin(basin), @@ -448,14 +440,13 @@ impl Client { } } -/// Client to interact with the S2 basin service API. +/// Client for basin-level operations. #[derive(Debug, Clone)] pub struct BasinClient { inner: ClientInner, } impl BasinClient { - /// Create the client to connect with the S2 basin service API. pub fn new(config: ClientConfig, basin: types::BasinName) -> Self { Client::new(config).basin_client(basin) } @@ -543,7 +534,7 @@ impl BasinClient { } } -/// Client to interact with the S2 stream service API. +/// Client for stream-level operations. #[derive(Debug, Clone)] pub struct StreamClient { inner: ClientInner, @@ -551,7 +542,6 @@ pub struct StreamClient { } impl StreamClient { - /// Create the client to connect with the S2 stream service API. pub fn new(config: ClientConfig, basin: types::BasinName, stream: impl Into) -> Self { BasinClient::new(config, basin).stream_client(stream) } From 1cc1c71dcae403e02f91b1827c358e9c9fb976fb Mon Sep 17 00:00:00 2001 From: shikhar Date: Wed, 20 Nov 2024 11:36:32 -0500 Subject: [PATCH 2/2] endpoints --- examples/basic.rs | 4 ++-- src/client.rs | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/examples/basic.rs b/examples/basic.rs index a52d66c..8e88f92 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -15,10 +15,10 @@ use streamstore::{ async fn main() { let token = std::env::var("S2_AUTH_TOKEN").unwrap(); - let host_endpoints = S2Endpoints::from_env().unwrap(); + let endpoints = S2Endpoints::from_env().unwrap(); let config = ClientConfig::new(token) - .with_endpoints(host_endpoints) + .with_endpoints(endpoints) .with_request_timeout(Duration::from_secs(10)); println!("Connecting with {config:#?}"); diff --git a/src/client.rs b/src/client.rs index ef01416..007999b 100644 --- a/src/client.rs +++ b/src/client.rs @@ -248,7 +248,7 @@ impl S2Endpoints { #[derive(Debug, Clone)] pub struct ClientConfig { pub(crate) token: SecretString, - pub(crate) host_endpoints: S2Endpoints, + pub(crate) endpoints: S2Endpoints, pub(crate) connection_timeout: Duration, pub(crate) request_timeout: Duration, pub(crate) user_agent: HeaderValue, @@ -263,7 +263,7 @@ impl ClientConfig { pub fn new(token: impl Into) -> Self { Self { token: token.into().into(), - host_endpoints: S2Endpoints::default(), + endpoints: S2Endpoints::default(), connection_timeout: Duration::from_secs(3), request_timeout: Duration::from_secs(5), user_agent: "s2-sdk-rust".parse().expect("valid user agent"), @@ -277,7 +277,7 @@ impl ClientConfig { /// S2 endpoints to connect to. pub fn with_endpoints(self, host_endpoints: impl Into) -> Self { Self { - host_endpoints: host_endpoints.into(), + endpoints: host_endpoints.into(), ..self } } @@ -650,12 +650,12 @@ impl ClientInner { C::Future: Send, C::Error: std::error::Error + Send + Sync + 'static, { - let cell_endpoint = config.host_endpoints.cell.clone(); + let cell_endpoint = config.endpoints.cell.clone(); Self::new(config, cell_endpoint, connector) } fn new_basin(&self, basin: types::BasinName) -> Self { - match self.config.host_endpoints.basin_zone.clone() { + match self.config.endpoints.basin_zone.clone() { Some(endpoint) => { let basin_endpoint: Authority = format!("{basin}.{endpoint}") .parse() @@ -698,7 +698,7 @@ impl ClientInner { let channel = if let Some(connector) = connector { assert!( - config.host_endpoints.basin_zone.is_none(), + config.endpoints.basin_zone.is_none(), "cannot connect with connector if basin zone is provided" ); endpoint.connect_with_connector_lazy(connector)