Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: make ClientConfig fields private + revise docs #73

Merged
merged 2 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::time::Duration;
use futures::StreamExt;
use streamstore::{
batching::AppendRecordsBatchingStream,
client::{Client, ClientConfig, ClientError, HostEndpoints},
client::{Client, ClientConfig, ClientError, S2Endpoints},
types::{
AppendInput, AppendRecord, BasinName, CreateBasinRequest, CreateStreamRequest,
DeleteBasinRequest, DeleteStreamRequest, ListBasinsRequest, ListStreamsRequest,
Expand All @@ -15,10 +15,10 @@ use streamstore::{
async fn main() {
let token = std::env::var("S2_AUTH_TOKEN").unwrap();

let host_endpoints = HostEndpoints::from_env().unwrap();
let endpoints = S2Endpoints::from_env().unwrap();

let config = ClientConfig::new(token)
.with_host_endpoints(host_endpoints)
.with_endpoints(endpoints)
.with_request_timeout(Duration::from_secs(10));

println!("Connecting with {config:#?}");
Expand Down
86 changes: 38 additions & 48 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ use crate::{

const DEFAULT_HTTP_CONNECTOR: Option<HttpConnector> = None;

/// Cloud deployment to be used to connect the client with.
/// S2 cloud environment to connect with.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum HostCloud {
/// S2 hosted on AWS.
/// S2 running on AWS.
#[default]
Aws,
}
Expand Down Expand Up @@ -163,26 +163,26 @@ impl ParseError {
}
}

/// Endpoints for the hosted S2 environment.
/// Endpoints for the S2 environment.
#[derive(Debug, Clone)]
pub struct HostEndpoints {
pub struct S2Endpoints {
cell: Authority,
basin_zone: Option<Authority>,
}

impl From<HostCloud> for HostEndpoints {
impl From<HostCloud> for S2Endpoints {
fn from(cloud: HostCloud) -> Self {
HostEndpoints::for_cloud(cloud)
S2Endpoints::for_cloud(cloud)
}
}

impl Default for HostEndpoints {
impl Default for S2Endpoints {
fn default() -> Self {
Self::for_cloud(HostCloud::default())
}
}

impl HostEndpoints {
impl S2Endpoints {
pub fn for_cloud(cloud: HostCloud) -> Self {
Self::from_parts(cloud, HostEnv::default(), None, None)
}
Expand Down Expand Up @@ -244,35 +244,26 @@ impl HostEndpoints {
}
}

/// Client configuration to be used to connect with the host.
/// Client configuration.
#[derive(Debug, Clone)]
pub struct ClientConfig {
/// Auth token for the client.
pub token: SecretString,
/// Host URI to connect with.
pub host_endpoints: HostEndpoints,
/// Timeout for connecting/reconnecting.
pub connection_timeout: Duration,
/// Timeout for a particular request.
pub request_timeout: Duration,
/// User agent to be used for the client.
pub user_agent: HeaderValue,
/// URI scheme to use to connect.
pub(crate) token: SecretString,
pub(crate) endpoints: S2Endpoints,
pub(crate) connection_timeout: Duration,
pub(crate) request_timeout: Duration,
pub(crate) user_agent: HeaderValue,
#[cfg(feature = "connector")]
pub uri_scheme: http::uri::Scheme,
/// Backoff duration for retries.
pub retry_backoff_duration: Duration,
/// Maximum number of retries.
pub max_attempts: usize,
pub(crate) uri_scheme: http::uri::Scheme,
pub(crate) retry_backoff_duration: Duration,
pub(crate) max_attempts: usize,
}

impl ClientConfig {
/// Construct a new client configuration with given auth token and other
/// defaults.
/// Initialize a default client configuration with the specified authentication token.
pub fn new(token: impl Into<String>) -> Self {
Self {
token: token.into().into(),
host_endpoints: HostEndpoints::default(),
endpoints: S2Endpoints::default(),
connection_timeout: Duration::from_secs(3),
request_timeout: Duration::from_secs(5),
user_agent: "s2-sdk-rust".parse().expect("valid user agent"),
Expand All @@ -283,40 +274,39 @@ impl ClientConfig {
}
}

/// Construct from an existing configuration with the new host URIs.
pub fn with_host_endpoints(self, host_endpoints: impl Into<HostEndpoints>) -> Self {
/// S2 endpoints to connect to.
pub fn with_endpoints(self, host_endpoints: impl Into<S2Endpoints>) -> Self {
Self {
host_endpoints: host_endpoints.into(),
endpoints: host_endpoints.into(),
..self
}
}

/// Construct from an existing configuration with the new connection
/// timeout.
/// Timeout for connecting and transparently reconnecting. Defaults to 3s.
pub fn with_connection_timeout(self, connection_timeout: impl Into<Duration>) -> Self {
Self {
connection_timeout: connection_timeout.into(),
..self
}
}

/// Construct from an existing configuration with the new request timeout.
/// Timeout for a particular request. Defaults to 5s.
pub fn with_request_timeout(self, request_timeout: impl Into<Duration>) -> Self {
Self {
request_timeout: request_timeout.into(),
..self
}
}

/// Construct from an existing configuration with the new user agent.
/// User agent. Defaults to `s2-sdk-rust`. Feel free to say hi.
pub fn with_user_agent(self, user_agent: impl Into<HeaderValue>) -> Self {
Self {
user_agent: user_agent.into(),
..self
}
}

/// Construct from an existing configuration with the new URI scheme.
/// URI scheme to use when connecting with a custom connector. Defaults to `https`.
#[cfg(feature = "connector")]
pub fn with_uri_scheme(self, uri_scheme: impl Into<http::uri::Scheme>) -> Self {
Self {
Expand All @@ -325,15 +315,19 @@ impl ClientConfig {
}
}

/// Construct from an existing configuration with the retry backoff duration.
/// Backoff duration when retrying.
/// Defaults to 100ms.
/// A jitter is always applied.
pub fn with_retry_backoff_duration(self, retry_backoff_duration: impl Into<Duration>) -> Self {
Self {
retry_backoff_duration: retry_backoff_duration.into(),
..self
}
}

/// Construct from an existing configuration with maximum number of retries.
/// Maximum number of attempts per request.
/// Setting it to 1 disables retrying.
/// The default is to make 3 attempts.
pub fn max_attempts(self, max_attempts: usize) -> Self {
assert!(max_attempts > 0, "max attempts must be greater than 0");
Self {
Expand All @@ -351,14 +345,13 @@ pub enum ClientError {
Service(#[from] tonic::Status),
}

/// The S2 client to interact with the API.
/// Client for account-level operations.
#[derive(Debug, Clone)]
pub struct Client {
inner: ClientInner,
}

impl Client {
/// Create the client to connect with the S2 API.
pub fn new(config: ClientConfig) -> Self {
Self {
inner: ClientInner::new_cell(config, DEFAULT_HTTP_CONNECTOR),
Expand All @@ -378,7 +371,6 @@ impl Client {
}
}

/// Get the client to interact with the S2 basin service API.
pub fn basin_client(&self, basin: types::BasinName) -> BasinClient {
BasinClient {
inner: self.inner.new_basin(basin),
Expand Down Expand Up @@ -448,14 +440,13 @@ impl Client {
}
}

/// Client to interact with the S2 basin service API.
/// Client for basin-level operations.
#[derive(Debug, Clone)]
pub struct BasinClient {
inner: ClientInner,
}

impl BasinClient {
/// Create the client to connect with the S2 basin service API.
pub fn new(config: ClientConfig, basin: types::BasinName) -> Self {
Client::new(config).basin_client(basin)
}
Expand Down Expand Up @@ -543,15 +534,14 @@ impl BasinClient {
}
}

/// Client to interact with the S2 stream service API.
/// Client for stream-level operations.
#[derive(Debug, Clone)]
pub struct StreamClient {
inner: ClientInner,
stream: String,
}

impl StreamClient {
/// Create the client to connect with the S2 stream service API.
pub fn new(config: ClientConfig, basin: types::BasinName, stream: impl Into<String>) -> Self {
BasinClient::new(config, basin).stream_client(stream)
}
Expand Down Expand Up @@ -660,12 +650,12 @@ impl ClientInner {
C::Future: Send,
C::Error: std::error::Error + Send + Sync + 'static,
{
let cell_endpoint = config.host_endpoints.cell.clone();
let cell_endpoint = config.endpoints.cell.clone();
Self::new(config, cell_endpoint, connector)
}

fn new_basin(&self, basin: types::BasinName) -> Self {
match self.config.host_endpoints.basin_zone.clone() {
match self.config.endpoints.basin_zone.clone() {
Some(endpoint) => {
let basin_endpoint: Authority = format!("{basin}.{endpoint}")
.parse()
Expand Down Expand Up @@ -708,7 +698,7 @@ impl ClientInner {

let channel = if let Some(connector) = connector {
assert!(
config.host_endpoints.basin_zone.is_none(),
config.endpoints.basin_zone.is_none(),
"cannot connect with connector if basin zone is provided"
);
endpoint.connect_with_connector_lazy(connector)
Expand Down