diff --git a/Cargo.toml b/Cargo.toml index 386bc1c..2164a2a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,15 @@ keywords = ["wal", "grpc", "s2", "log", "stream"] repository = "https://github.com/s2-streamstore/s2-sdk-rust" homepage = "https://github.com/s2-streamstore/s2-sdk-rust" +[package.metadata.docs.rs] +features = ["connector"] +cargo-args = ["-Zunstable-options", "-Zrustdoc-scrape-examples"] + +[[example]] +# `doc-scrape-examples` requires *any* one example to specify the option. +name = "create_basin" +doc-scrape-examples = true + [dependencies] async-stream = "0.3.6" backon = "1.2.0" diff --git a/examples/basic.rs b/examples/basic.rs deleted file mode 100644 index a9a17e8..0000000 --- a/examples/basic.rs +++ /dev/null @@ -1,168 +0,0 @@ -use futures::StreamExt; -use streamstore::{ - batching::AppendRecordsBatchingStream, - client::{Client, ClientConfig, ClientError, S2Endpoints}, - types::{ - AppendInput, AppendRecord, AppendRecordBatch, BasinName, CreateBasinRequest, - CreateStreamRequest, DeleteBasinRequest, DeleteStreamRequest, ListBasinsRequest, - ListStreamsRequest, ReadSessionRequest, - }, -}; - -#[tokio::main] -async fn main() { - let token = std::env::var("S2_AUTH_TOKEN").unwrap(); - - let config = ClientConfig::new(token).with_endpoints(S2Endpoints::from_env().unwrap()); - - println!("Connecting with {config:#?}"); - - let client = Client::new(config); - - let basin: BasinName = "s2-sdk-example-basin".parse().unwrap(); - - let create_basin_req = CreateBasinRequest::new(basin.clone()); - - match client.create_basin(create_basin_req).await { - Ok(created_basin) => { - println!("Basin created: {created_basin:#?}"); - } - Err(ClientError::Service(status)) => { - if status.code() == tonic::Code::AlreadyExists { - println!("WARN: {}", status.message()); - } - } - Err(other) => exit_with_err(other), - }; - - let list_basins_req = ListBasinsRequest::new(); - - match client.list_basins(list_basins_req).await { - Ok(basins_list) => { - println!( - "List of basins: {:#?}{}", - basins_list.basins, - if basins_list.has_more { - " ... and more ..." - } else { - "" - } - ) - } - Err(err) => exit_with_err(err), - }; - - match client.get_basin_config(basin.clone()).await { - Ok(config) => { - println!("Basin config: {config:#?}"); - } - Err(err) => exit_with_err(err), - }; - - let stream = "s2-sdk-example-stream"; - - let create_stream_req = CreateStreamRequest::new(stream); - - let basin_client = client.basin_client(basin.clone()); - - match basin_client.create_stream(create_stream_req).await { - Ok(info) => { - println!("Stream created: {info:?}"); - } - Err(ClientError::Service(status)) => { - if status.code() == tonic::Code::AlreadyExists { - println!("WARN: {}", status.message()); - } - } - Err(other) => exit_with_err(other), - }; - - let list_streams_req = ListStreamsRequest::new(); - - match basin_client.list_streams(list_streams_req).await { - Ok(streams_list) => { - println!( - "List of streams: {:#?}{}", - streams_list.streams, - if streams_list.has_more { - " ... and more ..." - } else { - "" - } - ) - } - Err(err) => exit_with_err(err), - } - - match basin_client.get_stream_config(stream).await { - Ok(config) => { - println!("Stream config: {config:#?}"); - } - Err(err) => exit_with_err(err), - }; - - let stream_client = basin_client.stream_client(stream); - - let records = [ - AppendRecord::new("hello world").unwrap(), - AppendRecord::new("bye world").unwrap(), - ]; - - let append_input = AppendInput::new(AppendRecordBatch::try_from_iter(records.clone()).unwrap()); - - match stream_client.append(append_input.clone()).await { - Ok(resp) => { - println!("Appended: {resp:#?}"); - } - Err(err) => exit_with_err(err), - }; - - let append_session_req = - AppendRecordsBatchingStream::new(futures::stream::iter(records), Default::default()); - - match stream_client.append_session(append_session_req).await { - Ok(mut stream) => { - println!("Appended in session: {:#?}", stream.next().await); - } - Err(err) => exit_with_err(err), - }; - - match stream_client.check_tail().await { - Ok(next_seq_num) => { - println!("Next seq num: {next_seq_num:#?}"); - } - Err(err) => exit_with_err(err), - }; - - let read_session_req = ReadSessionRequest::default(); - - match stream_client.read_session(read_session_req).await { - Ok(mut stream) => { - println!("Read session: {:#?}", stream.next().await); - } - Err(err) => exit_with_err(err), - }; - - let delete_stream_req = DeleteStreamRequest::new(stream).with_if_exists(true); - - match basin_client.delete_stream(delete_stream_req).await { - Ok(()) => { - println!("Stream deleted!") - } - Err(err) => exit_with_err(err), - }; - - let delete_basin_req = DeleteBasinRequest::new(basin).with_if_exists(false); - - match client.delete_basin(delete_basin_req).await { - Ok(()) => { - println!("Basin deleted!") - } - Err(err) => exit_with_err(err), - }; -} - -fn exit_with_err(err: E) { - println!("Error: {err}"); - std::process::exit(1); -} diff --git a/examples/consumer.rs b/examples/consumer.rs new file mode 100644 index 0000000..9e99cb2 --- /dev/null +++ b/examples/consumer.rs @@ -0,0 +1,32 @@ +use futures::StreamExt; +use streamstore::{ + client::{ClientConfig, StreamClient}, + types::{BasinName, ReadSessionRequest}, +}; +use tokio::select; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let token = std::env::var("S2_AUTH_TOKEN")?; + let config = ClientConfig::new(token); + let basin: BasinName = "my-basin".parse()?; + let stream = "my-stream"; + let stream_client = StreamClient::new(config, basin, stream); + + let start_seq_num = 0; + let read_session_request = ReadSessionRequest::new(start_seq_num); + let mut read_stream = stream_client.read_session(read_session_request).await?; + + loop { + select! { + next_batch = read_stream.next() => { + let Some(next_batch) = next_batch else { break }; + let next_batch = next_batch?; + println!("{next_batch:?}"); + } + _ = tokio::signal::ctrl_c() => break, + } + } + + Ok(()) +} diff --git a/examples/create_basin.rs b/examples/create_basin.rs new file mode 100644 index 0000000..abe5047 --- /dev/null +++ b/examples/create_basin.rs @@ -0,0 +1,34 @@ +use std::time::Duration; + +use streamstore::{ + client::{Client, ClientConfig}, + types::{BasinConfig, BasinName, CreateBasinRequest, RetentionPolicy, StreamConfig}, +}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let token = std::env::var("S2_AUTH_TOKEN")?; + let config = ClientConfig::new(token); + let client = Client::new(config); + + let basin: BasinName = "my-basin".parse()?; + + let default_stream_config = StreamConfig::new().with_retention_policy(RetentionPolicy::Age( + // Set the default retention age to 10 days. + Duration::from_secs(10 * 24 * 60 * 60), + )); + + let basin_config = BasinConfig { + default_stream_config: Some(default_stream_config), + }; + + let create_basin_request = CreateBasinRequest::new(basin.clone()).with_config(basin_config); + + let created_basin = client.create_basin(create_basin_request).await?; + println!("{created_basin:#?}"); + + let basin_config = client.get_basin_config(basin).await?; + println!("{basin_config:#?}"); + + Ok(()) +} diff --git a/examples/create_stream.rs b/examples/create_stream.rs new file mode 100644 index 0000000..c840010 --- /dev/null +++ b/examples/create_stream.rs @@ -0,0 +1,28 @@ +use streamstore::{ + client::{Client, ClientConfig}, + types::{BasinName, CreateStreamRequest, StorageClass, StreamConfig}, +}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let token = std::env::var("S2_AUTH_TOKEN")?; + let config = ClientConfig::new(token); + let client = Client::new(config); + + let basin: BasinName = "my-basin".parse()?; + let basin_client = client.basin_client(basin); + + let stream = "my-stream"; + + let stream_config = StreamConfig::new().with_storage_class(StorageClass::Express); + + let create_stream_request = CreateStreamRequest::new(stream).with_config(stream_config); + + let created_stream = basin_client.create_stream(create_stream_request).await?; + println!("{created_stream:#?}"); + + let stream_config = basin_client.get_stream_config(stream).await?; + println!("{stream_config:#?}"); + + Ok(()) +} diff --git a/examples/delete_basin.rs b/examples/delete_basin.rs new file mode 100644 index 0000000..a2a9c47 --- /dev/null +++ b/examples/delete_basin.rs @@ -0,0 +1,21 @@ +use streamstore::{ + client::{Client, ClientConfig}, + types::{BasinName, DeleteBasinRequest}, +}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let token = std::env::var("S2_AUTH_TOKEN")?; + let config = ClientConfig::new(token); + let client = Client::new(config); + + let basin: BasinName = "my-basin".parse()?; + + let delete_basin_request = DeleteBasinRequest::new(basin) + // Don't error if the basin doesn't exist. + .with_if_exists(true); + + client.delete_basin(delete_basin_request).await?; + + Ok(()) +} diff --git a/examples/delete_stream.rs b/examples/delete_stream.rs new file mode 100644 index 0000000..151354f --- /dev/null +++ b/examples/delete_stream.rs @@ -0,0 +1,20 @@ +use streamstore::{ + client::{BasinClient, ClientConfig}, + types::{BasinName, DeleteStreamRequest}, +}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let token = std::env::var("S2_AUTH_TOKEN")?; + let config = ClientConfig::new(token); + let basin: BasinName = "my-basin".parse()?; + let basin_client = BasinClient::new(config, basin); + + let stream = "my-stream"; + + let delete_stream_request = DeleteStreamRequest::new(stream); + + basin_client.delete_stream(delete_stream_request).await?; + + Ok(()) +} diff --git a/examples/explicit_trim.rs b/examples/explicit_trim.rs new file mode 100644 index 0000000..58fa936 --- /dev/null +++ b/examples/explicit_trim.rs @@ -0,0 +1,31 @@ +use streamstore::{ + client::{ClientConfig, StreamClient}, + types::{AppendInput, AppendRecordBatch, BasinName, CommandRecord}, +}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let token = std::env::var("S2_AUTH_TOKEN")?; + let config = ClientConfig::new(token); + let basin: BasinName = "my-basin".parse()?; + let stream = "my-stream"; + let stream_client = StreamClient::new(config, basin, stream); + + let tail = stream_client.check_tail().await?; + if tail == 0 { + println!("Empty stream"); + return Ok(()); + } + + let latest_seq_num = tail - 1; + let trim_request = CommandRecord::trim(latest_seq_num); + + let append_record_batch = AppendRecordBatch::try_from_iter([trim_request]) + .expect("valid batch with 1 command record"); + let append_input = AppendInput::new(append_record_batch); + let _ = stream_client.append(append_input).await?; + + println!("Trim requested"); + + Ok(()) +} diff --git a/examples/get_latest_record.rs b/examples/get_latest_record.rs new file mode 100644 index 0000000..1099bfb --- /dev/null +++ b/examples/get_latest_record.rs @@ -0,0 +1,29 @@ +use streamstore::{ + client::{ClientConfig, StreamClient}, + types::{BasinName, ReadLimit, ReadRequest}, +}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let token = std::env::var("S2_AUTH_TOKEN")?; + let config = ClientConfig::new(token); + let basin: BasinName = "my-basin".parse()?; + let stream = "my-stream"; + let stream_client = StreamClient::new(config, basin, stream); + + let tail = stream_client.check_tail().await?; + if tail == 0 { + println!("Empty stream"); + return Ok(()); + } + + let latest_seq_num = tail - 1; + + let read_limit = ReadLimit { count: 1, bytes: 0 }; + let read_request = ReadRequest::new(latest_seq_num).with_limit(read_limit); + let latest_record = stream_client.read(read_request).await?; + + println!("{latest_record:#?}"); + + Ok(()) +} diff --git a/examples/list_all_basins.rs b/examples/list_all_basins.rs new file mode 100644 index 0000000..0be001a --- /dev/null +++ b/examples/list_all_basins.rs @@ -0,0 +1,34 @@ +use streamstore::{ + client::{Client, ClientConfig}, + types::ListBasinsRequest, +}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let token = std::env::var("S2_AUTH_TOKEN")?; + let config = ClientConfig::new(token); + let client = Client::new(config); + + let mut all_basins = Vec::new(); + + let mut has_more = true; + let mut start_after: Option = None; + + while has_more { + let mut list_basins_request = ListBasinsRequest::new(); + if let Some(start_after) = start_after.take() { + list_basins_request = list_basins_request.with_start_after(start_after); + } + + let list_basins_response = client.list_basins(list_basins_request).await?; + + all_basins.extend(list_basins_response.basins); + + start_after = all_basins.last().map(|b| b.name.clone()); + has_more = list_basins_response.has_more; + } + + println!("{all_basins:#?}"); + + Ok(()) +} diff --git a/examples/list_streams.rs b/examples/list_streams.rs new file mode 100644 index 0000000..af73aa4 --- /dev/null +++ b/examples/list_streams.rs @@ -0,0 +1,21 @@ +use streamstore::{ + client::{BasinClient, ClientConfig}, + types::{BasinName, ListStreamsRequest}, +}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let token = std::env::var("S2_AUTH_TOKEN")?; + let config = ClientConfig::new(token); + let basin: BasinName = "my-basin".parse()?; + let basin_client = BasinClient::new(config, basin); + + let prefix = "my-"; + let list_streams_request = ListStreamsRequest::new().with_prefix(prefix); + + let list_streams_response = basin_client.list_streams(list_streams_request).await?; + + println!("{list_streams_response:#?}"); + + Ok(()) +} diff --git a/examples/producer.rs b/examples/producer.rs new file mode 100644 index 0000000..2ced5aa --- /dev/null +++ b/examples/producer.rs @@ -0,0 +1,48 @@ +use futures::StreamExt; +use streamstore::{ + batching::{AppendRecordsBatchingOpts, AppendRecordsBatchingStream}, + client::{ClientConfig, StreamClient}, + types::{AppendInput, AppendRecord, AppendRecordBatch, BasinName, CommandRecord, FencingToken}, +}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let token = std::env::var("S2_AUTH_TOKEN")?; + let config = ClientConfig::new(token); + let basin: BasinName = "my-basin".parse()?; + let stream = "my-stream"; + let stream_client = StreamClient::new(config, basin, stream); + + let fencing_token = FencingToken::generate(16).expect("valid fencing token with 16 bytes"); + + // Set the fencing token. + let fencing_token_record: AppendRecord = CommandRecord::fence(fencing_token.clone()).into(); + let fencing_token_batch = AppendRecordBatch::try_from_iter([fencing_token_record]) + .expect("valid batch with 1 append record"); + let fencing_token_append_input = AppendInput::new(fencing_token_batch); + let set_fencing_token = stream_client.append(fencing_token_append_input).await?; + + let match_seq_num = set_fencing_token.next_seq_num; // Tail + + // Stream of records + let append_stream = futures::stream::iter([ + AppendRecord::new("record_1")?, + AppendRecord::new("record_2")?, + ]); + + let append_records_batching_opts = AppendRecordsBatchingOpts::new() + .with_fencing_token(Some(fencing_token)) + .with_match_seq_num(Some(match_seq_num)); + + let append_session_request = + AppendRecordsBatchingStream::new(append_stream, append_records_batching_opts); + + let mut append_session_stream = stream_client.append_session(append_session_request).await?; + + while let Some(next) = append_session_stream.next().await { + let next = next?; + println!("{next:#?}"); + } + + Ok(()) +} diff --git a/examples/reconfigure_basin.rs b/examples/reconfigure_basin.rs new file mode 100644 index 0000000..069c99a --- /dev/null +++ b/examples/reconfigure_basin.rs @@ -0,0 +1,30 @@ +use streamstore::{ + client::{Client, ClientConfig}, + types::{BasinConfig, BasinName, ReconfigureBasinRequest, StorageClass, StreamConfig}, +}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let token = std::env::var("S2_AUTH_TOKEN")?; + let config = ClientConfig::new(token); + let client = Client::new(config); + + let basin: BasinName = "my-basin".parse()?; + + let default_stream_config_updates = + StreamConfig::new().with_storage_class(StorageClass::Standard); + let basin_config_updates = BasinConfig { + default_stream_config: Some(default_stream_config_updates), + }; + + let reconfigure_basin_request = ReconfigureBasinRequest::new(basin) + .with_config(basin_config_updates) + // Field mask specifies which fields to update. + .with_mask(vec!["default_stream_config.retention_policy".to_string()]); + + let updated_basin_config = client.reconfigure_basin(reconfigure_basin_request).await?; + + println!("{updated_basin_config:#?}"); + + Ok(()) +} diff --git a/examples/reconfigure_stream.rs b/examples/reconfigure_stream.rs new file mode 100644 index 0000000..9faf3a3 --- /dev/null +++ b/examples/reconfigure_stream.rs @@ -0,0 +1,34 @@ +use std::time::Duration; + +use streamstore::{ + client::{BasinClient, ClientConfig}, + types::{BasinName, ReconfigureStreamRequest, RetentionPolicy, StreamConfig}, +}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let token = std::env::var("S2_AUTH_TOKEN")?; + let config = ClientConfig::new(token); + let basin: BasinName = "my-basin".parse()?; + let basin_client = BasinClient::new(config, basin); + + let stream = "my-stream"; + + let stream_config_updates = StreamConfig::new().with_retention_policy(RetentionPolicy::Age( + // Change to retention policy to 1 day + Duration::from_secs(24 * 60 * 60), + )); + + let reconfigure_stream_request = ReconfigureStreamRequest::new(stream) + .with_config(stream_config_updates) + // Field mask specifies which fields to update. + .with_mask(vec!["retention_policy".to_string()]); + + let updated_stream_config = basin_client + .reconfigure_stream(reconfigure_stream_request) + .await?; + + println!("{updated_stream_config:#?}"); + + Ok(()) +} diff --git a/src/types.rs b/src/types.rs index 8fa9e29..f7b4857 100644 --- a/src/types.rs +++ b/src/types.rs @@ -101,20 +101,6 @@ pub struct BasinConfig { pub default_stream_config: Option, } -impl BasinConfig { - /// Create a new request. - pub fn new() -> Self { - Self::default() - } - - /// Overwrite default stream configuration. - pub fn with_default_stream_config(default_stream_config: StreamConfig) -> Self { - Self { - default_stream_config: Some(default_stream_config), - } - } -} - impl From for api::BasinConfig { fn from(value: BasinConfig) -> Self { let BasinConfig {