From 5b9bfa23895dbf235a34a4a11521a1aba0ebe1c8 Mon Sep 17 00:00:00 2001 From: Stephen Balogh Date: Thu, 21 Nov 2024 14:53:10 -0800 Subject: [PATCH] hackk --- Cargo.toml | 5 +- src/append_session.rs | 373 +++++++++++++++++++++++++++++++++++++---- src/client.rs | 100 +++++++---- src/service/account.rs | 34 ++-- src/service/basin.rs | 34 ++-- src/service/stream.rs | 38 +++-- 6 files changed, 464 insertions(+), 120 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index db3ba46..227f6b0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ license = "Apache-2.0" async-stream = "0.3.6" backon = "1.2.0" bytesize = "1.3.0" +enum-ordinalize = "4.3.0" futures = "0.3.31" http = "1.1.0" hyper = "1.5.0" @@ -25,10 +26,12 @@ serde = { version = "1.0.214", optional = true, features = ["derive"] } sync_docs = { path = "sync_docs" } thiserror = "1.0.67" tokio = { version = "1.41.1", features = ["time"] } +tokio-muxt = "0.5.0" tokio-stream = "0.1.16" tonic = { version = "0.12.3", features = ["tls", "tls-webpki-roots"] } -tonic-side-effect = { git = "https://github.com/s2-streamstore/tonic-side-effect.git", rev = "15eb454dd596517820dd31d8f3fef1cb2d9f1e78" } +tonic-side-effect = { git = "https://github.com/s2-streamstore/tonic-side-effect.git", rev = "4b298633211fccba5d8142930f635781114e5874" } tower-service = "0.3.3" +tracing = "0.1.40" [build-dependencies] tonic-build = { version = "0.12.3", features = ["prost"] } diff --git a/src/append_session.rs b/src/append_session.rs index d8a796d..0c5043d 100644 --- a/src/append_session.rs +++ b/src/append_session.rs @@ -1,46 +1,357 @@ -use crate::client::{ClientError, StreamClient}; -use crate::service::stream::AppendSessionServiceRequest; -use crate::{types, Streaming}; -use tokio::sync::mpsc; +use crate::client::{AppendRetryPolicy, ClientError, StreamClient}; +use crate::service::stream::{AppendSessionServiceRequest, AppendSessionStreamingResponse}; +use crate::service::ServiceStreamingResponse; +use crate::types::MeteredSize; +use crate::types; +use bytesize::ByteSize; +use enum_ordinalize::Ordinalize; +use futures::StreamExt; +use std::collections::VecDeque; +use std::ops::DerefMut; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::{mpsc, Mutex}; +use tokio::time::Instant; +use tokio_muxt::{CoalesceMode, MuxTimer}; use tokio_stream::wrappers::ReceiverStream; -use tokio_stream::StreamExt; -use tonic::Status; +use tonic_side_effect::FrameSignal; +use tracing::{trace, warn}; -async fn run( +async fn connect( + stream_client: &StreamClient, + frame_signal: FrameSignal, +) -> Result< + ( + mpsc::Sender, + ServiceStreamingResponse, + ), + ClientError, +> { + // Signal can be reset as we are creating a new connection anyway. + frame_signal.reset(); + let (input_tx, input_rx) = mpsc::channel(10); + let service_req = AppendSessionServiceRequest::new( + stream_client + .inner + .frame_monitoring_stream_service_client(frame_signal.clone()), + &stream_client.stream, + ReceiverStream::new(input_rx), + ); + + Ok((input_tx, stream_client.inner.send(service_req).await?)) +} + +struct InflightBatch { + start: Instant, + metered_size: ByteSize, + inner: types::AppendInput, +} + +#[derive(Ordinalize, Debug, Clone, Copy, PartialEq, Eq)] +enum TimerEvent { + MetricUpdate, + BatchDeadline, +} + +impl From for usize { + fn from(event: TimerEvent) -> Self { + event.ordinal() as usize + } +} + +struct AppendState +where + S: 'static + Send + Unpin + futures::Stream, +{ + inflight: VecDeque, + inflight_size: u64, + request_stream: S, +} + +async fn recover( + request_timeout: Duration, + inflight: &mut VecDeque, + inflight_size: &mut u64, + channel_input_tx: mpsc::Sender, + channel_ack_stream: &mut ServiceStreamingResponse, + output_tx: mpsc::Sender>, +) -> Result<(), ClientError> { + trace!( + inflight_len = inflight.len(), + inflight_bytes = inflight_size, + "recover" + ); + let mut recovery_index = 0; + let mut recovery_tx_finished = false; + + let timer = MuxTimer::<{ TimerEvent::VARIANT_COUNT }>::default(); + tokio::pin!(timer); + + while !inflight.is_empty() { + tokio::select! { + (event_ord, _) = &mut timer, if timer.is_armed() => { + match TimerEvent::from_ordinal(event_ord as i8).expect("valid event ordinal") { + TimerEvent::BatchDeadline => { + Err(ClientError::LocalDeadline("deadline for append acknowledgement hit".to_string()))? + } + _ => unreachable!("only batch deadline timer in recovery mode") + } + } + Ok(permit) = channel_input_tx.reserve(), if !recovery_tx_finished => { + match inflight.get(recovery_index) { + Some(batch) => { + timer.as_mut().fire_at(TimerEvent::BatchDeadline, batch.start + request_timeout, CoalesceMode::Earliest); + permit.send(batch.inner.clone()); + recovery_index += 1; + }, + None => recovery_tx_finished = true + } + }, + Some(ack) = channel_ack_stream.next() => { + let ack = ack?; + let n_acked_records = ack.end_seq_num - ack.start_seq_num; + let recovery_batch = inflight + .pop_front() + .expect("inflight should not be empty"); + assert_eq!( + n_acked_records as usize, + recovery_batch.inner.records.len(), + "number of acknowledged should equal amount in recovery batch" + ); + output_tx + .send(Ok(ack)) + .await + .map_err(|_| ClientError::LostUser)?; + + // Adjust next timer. + match inflight.front() { + Some(batch) => timer.as_mut().fire_at( + TimerEvent::BatchDeadline, + batch.start + request_timeout, + CoalesceMode::Latest + ), + None => timer.as_mut().cancel(TimerEvent::BatchDeadline), + }; + + *inflight_size -= recovery_batch.metered_size.0; + recovery_index -= 1; + } + } + } + Ok(()) +} + +async fn session_inner( + state: Arc>>, + frame_signal: FrameSignal, stream_client: StreamClient, - input: S, output_tx: mpsc::Sender>, -) where +) -> Result<(), ClientError> +where S: 'static + Send + Unpin + futures::Stream, { - let mut resp = stream_client - .inner - .send(AppendSessionServiceRequest::new( - stream_client.inner.stream_service_client(), - &stream_client.stream, - input, - )) - .await - .unwrap(); - - while let Some(x) = resp.next().await { - let _ = output_tx.send(x).await; + let mut lock = state.lock().await; + let AppendState { + ref mut inflight, + ref mut inflight_size, + ref mut request_stream, + } = lock.deref_mut(); + + assert!(inflight.len() <= stream_client.inner.config.max_append_batches_inflight); + + let (input_tx, mut ack_stream) = connect(&stream_client, frame_signal.clone()).await?; + let batch_ack_deadline = stream_client.inner.config.request_timeout; + + // Recovery. + if !inflight.is_empty() { + recover( + batch_ack_deadline, + inflight, + inflight_size, + input_tx.clone(), + &mut ack_stream, + output_tx.clone(), + ) + .await?; + frame_signal.reset(); + + assert_eq!(inflight.len(), 0); + assert_eq!(*inflight_size, 0); + trace!("recovery finished"); + } + + let timer = MuxTimer::<{ TimerEvent::VARIANT_COUNT }>::default(); + tokio::pin!(timer); + + + let mut input_terminated = false; + while !input_terminated { + tokio::select! { + (event_ord, deadline) = &mut timer, + if timer.is_armed() + => { + match TimerEvent::from_ordinal(event_ord as i8).expect("valid event ordinal") { + TimerEvent::MetricUpdate => { + //TODO + } + TimerEvent::BatchDeadline => { + let first_batch_start = inflight.front().map(|s| s.start); + warn!(?deadline, ?first_batch_start, "hitting batch deadline!"); + Err(ClientError::LocalDeadline("deadline for append acknowledgement hit".to_string()))? + } + } + + } + client_input = request_stream.next(), + if !input_terminated && inflight.len() < stream_client.inner.config.max_append_batches_inflight + => { + match client_input { + None => input_terminated = true, + Some(append_input) => { + let metered_size = append_input.metered_size(); + *inflight_size += metered_size.0; + let enqueue_time = Instant::now(); + inflight.push_back(InflightBatch { + start: enqueue_time, + metered_size, + inner: append_input.clone() + }); + timer.as_mut().fire_at(TimerEvent::BatchDeadline, enqueue_time + batch_ack_deadline, CoalesceMode::Earliest); + input_tx.send(append_input) + .await + .map_err(|_| ClientError::Service(tonic::Status::unavailable("frontend input_tx disconnected")))?; + } + } + }, + ack = ack_stream.next() => { + match ack { + Some(ack) => { + let ack = ack?; + let n_acked_records = ack.end_seq_num - ack.start_seq_num; + let corresponding_batch = inflight.pop_front() + .expect("inflight should not be empty"); + assert_eq!( + n_acked_records as usize, + corresponding_batch.inner.records.len(), + "number of acknowledged should equal amount in recovery batch" + ); + output_tx.send(Ok(ack)).await.map_err(|_| ClientError::LostUser)?; + + if inflight.is_empty() { + frame_signal.reset() + } + + // Adjust next timer. + match inflight.front() { + Some(batch) => timer.as_mut().fire_at( + TimerEvent::BatchDeadline, + batch.start + batch_ack_deadline, + CoalesceMode::Latest + ), + None => timer.as_mut().cancel(TimerEvent::BatchDeadline), + }; + + *inflight_size -= corresponding_batch.metered_size.0; + } + None => break, + } + }, + else => { + break; + } + } } + + assert_eq!(inflight.len(), 0); + assert_eq!(*inflight_size, 0); + + Ok(()) } -pub async fn append_session( - stream_client: &StreamClient, - req: S, -) -> Result, ClientError> -where +pub(crate) async fn manage_session( + stream_client: StreamClient, + input: S, + output_tx: mpsc::Sender>, +) where S: 'static + Send + Unpin + futures::Stream, { - let (response_tx, response_rx) = mpsc::channel(10); - let _ = tokio::spawn(run(stream_client.clone(), req, response_tx)); + let state = Arc::new(Mutex::new(AppendState { + inflight: Default::default(), + inflight_size: Default::default(), + request_stream: input, + })); - let s = ReceiverStream::new(response_rx); + let frame_signal = FrameSignal::new(); + let mut attempts = 1; + let mut resp = session_inner( + state.clone(), + frame_signal.clone(), + stream_client.clone(), + output_tx.clone(), + ) + .await; - Ok(Box::pin(s)) + while let Err(e) = resp { + let now = Instant::now(); + let remaining_attempts = attempts < stream_client.inner.config.max_attempts; + let enough_time = { + state + .lock() + .await + .inflight + .front() + .map(|state| { + let next_deadline = state.start + stream_client.inner.config.request_timeout; + now + stream_client.inner.config.retry_backoff_duration < next_deadline + }) + .unwrap_or(true) + }; + let retryable_error = { + match &e { + ClientError::Service(status) => { + matches!( + status.code(), + tonic::Code::Unavailable + | tonic::Code::DeadlineExceeded + | tonic::Code::Unknown + ) + } + ClientError::Conversion(_) + | ClientError::LocalDeadline(_) + | ClientError::LostUser => false, + } + }; + let policy_compliant = { + match stream_client.inner.config.append_retry_policy { + AppendRetryPolicy::All => true, + AppendRetryPolicy::NoSideEffects => { + // If no request frame has been produced, we conclude that the failing append + // never left this host, so it is safe to retry. + !frame_signal.is_signalled() + } + } + }; - //Err(ClientError::Service(Status::internal(""))) + if remaining_attempts && enough_time && retryable_error && policy_compliant { + tokio::time::sleep(stream_client.inner.config.retry_backoff_duration).await; + attempts += 1; + resp = session_inner( + state.clone(), + frame_signal.clone(), + stream_client.clone(), + output_tx.clone(), + ) + .await; + } else { + trace!( + remaining_attempts, + enough_time, + retryable_error, + policy_compliant, + "not retrying" + ); + _ = output_tx.send(Err(e)).await; + return; + } + } } diff --git a/src/client.rs b/src/client.rs index e3216ee..e186524 100644 --- a/src/client.rs +++ b/src/client.rs @@ -5,13 +5,15 @@ use futures::StreamExt; use http::{uri::Authority, HeaderValue}; use hyper_util::client::legacy::connect::HttpConnector; use secrecy::SecretString; +use tokio::sync::mpsc; use sync_docs::sync_docs; use tokio::time::sleep; +use tokio_stream::wrappers::ReceiverStream; use tonic::{ metadata::AsciiMetadataValue, transport::{Channel, ClientTlsConfig, Endpoint}, }; -use tonic_side_effect::RequestFrameMonitor; +use tonic_side_effect::{FrameSignal, RequestFrameMonitor}; use crate::{ api::{ @@ -30,8 +32,8 @@ use crate::{ }, send_request, stream::{ - AppendServiceRequest, AppendSessionServiceRequest, CheckTailServiceRequest, - ReadServiceRequest, ReadSessionServiceRequest, ReadSessionStreamingResponse, + AppendServiceRequest, CheckTailServiceRequest, ReadServiceRequest, + ReadSessionServiceRequest, ReadSessionStreamingResponse, }, ServiceRequest, ServiceStreamingResponse, Streaming, }, @@ -97,6 +99,14 @@ pub struct S2Endpoints { pub basin: BasinEndpoint, } +#[derive(Debug, Clone)] +pub enum AppendRetryPolicy { + /// Retry all eligible failures. "At least once" semantics; duplicates are possible. + All, + /// Retry only failures with no side effects. "At most once" semantics. + NoSideEffects, +} + impl S2Endpoints { pub fn for_cloud(cloud: S2Cloud) -> Self { Self { @@ -179,10 +189,12 @@ pub struct ClientConfig { pub(crate) connection_timeout: Duration, pub(crate) request_timeout: Duration, pub(crate) user_agent: HeaderValue, + pub(crate) append_retry_policy: AppendRetryPolicy, #[cfg(feature = "connector")] pub(crate) uri_scheme: http::uri::Scheme, pub(crate) retry_backoff_duration: Duration, pub(crate) max_attempts: usize, + pub(crate) max_append_batches_inflight: usize, } impl ClientConfig { @@ -194,10 +206,12 @@ impl ClientConfig { connection_timeout: Duration::from_secs(3), request_timeout: Duration::from_secs(5), user_agent: "s2-sdk-rust".parse().expect("valid user agent"), + append_retry_policy: AppendRetryPolicy::All, #[cfg(feature = "connector")] uri_scheme: http::uri::Scheme::HTTPS, retry_backoff_duration: Duration::from_millis(100), max_attempts: 3, + max_append_batches_inflight: 1000, } } @@ -233,6 +247,29 @@ impl ClientConfig { } } + /// Retry policy for appends. + /// Only relevant if `max_retries > 1`. + /// Determines if appends follow "at most once" or "at least once" semantics. + /// Defaults to retries of all failures ("at least once"). + pub fn with_append_retry_policy( + self, + append_retry_policy: impl Into, + ) -> Self { + Self { + append_retry_policy: append_retry_policy.into(), + ..self + } + } + + /// Number of append batches, regardless of their size, that can be + /// inflight (pending acknowledgment) within an append session. + pub fn with_max_append_batches_inflight(self, max_append_batches_inflight: usize) -> Self { + Self { + max_append_batches_inflight, + ..self + } + } + /// URI scheme to use when connecting with a custom connector. Defaults to `https`. #[cfg(feature = "connector")] pub fn with_uri_scheme(self, uri_scheme: impl Into) -> Self { @@ -270,6 +307,10 @@ pub enum ClientError { Conversion(#[from] types::ConvertError), #[error(transparent)] Service(#[from] tonic::Status), + #[error("Deadline expired: {0}")] + LocalDeadline(String), + #[error("Client user lost")] + LostUser, } /// Client for account-level operations. @@ -538,9 +579,12 @@ impl StreamClient { &self, req: types::AppendInput, ) -> Result { + let frame_signal = FrameSignal::new(); self.inner .send_retryable(AppendServiceRequest::new( - self.inner.stream_service_client(), + self.inner + .frame_monitoring_stream_service_client(frame_signal.clone()), + frame_signal, &self.stream, req, )) @@ -548,6 +592,7 @@ impl StreamClient { } #[sync_docs] + #[allow(clippy::unused_async)] pub async fn append_session( &self, req: S, @@ -555,16 +600,10 @@ impl StreamClient { where S: 'static + Send + Unpin + futures::Stream, { - // probably want the machinery here actually - append_session::append_session(self, req).await - // self.inner - // .send(AppendSessionServiceRequest::new( - // self.inner.stream_service_client(), - // &self.stream, - // req, - // )) - // .await - // .map(|s| Box::pin(s) as _) + let (response_tx, response_rx) = mpsc::channel(10); + _ = tokio::spawn(append_session::manage_session(self.clone(), req, response_tx)); + + Ok(Box::pin(ReceiverStream::new(response_rx))) } } @@ -592,7 +631,7 @@ impl ClientKind { pub(crate) struct ClientInner { kind: ClientKind, channel: Channel, - config: ClientConfig, + pub(crate) config: ClientConfig, } impl ClientInner { @@ -682,7 +721,7 @@ impl ClientInner { .await } - async fn send_retryable( + pub(crate) async fn send_retryable( &self, service_req: T, ) -> Result { @@ -690,32 +729,29 @@ impl ClientInner { .await } - fn backoff_builder(&self) -> impl BackoffBuilder { + pub fn backoff_builder(&self) -> impl BackoffBuilder { ConstantBuilder::default() .with_delay(self.config.retry_backoff_duration) .with_max_times(self.config.max_attempts) .with_jitter() } - fn account_service_client(&self) -> AccountServiceClient { - AccountServiceClient::new(RequestFrameMonitor::new( - self.channel.clone(), - NO_FRAMES_TAG, - )) + fn account_service_client(&self) -> AccountServiceClient { + AccountServiceClient::new(self.channel.clone()) } - fn basin_service_client(&self) -> BasinServiceClient { - BasinServiceClient::new(RequestFrameMonitor::new( - self.channel.clone(), - NO_FRAMES_TAG, - )) + fn basin_service_client(&self) -> BasinServiceClient { + BasinServiceClient::new(self.channel.clone()) } - pub(crate) fn stream_service_client(&self) -> StreamServiceClient { - StreamServiceClient::new(RequestFrameMonitor::new( - self.channel.clone(), - NO_FRAMES_TAG, - )) + pub(crate) fn stream_service_client(&self) -> StreamServiceClient { + StreamServiceClient::new(self.channel.clone()) + } + pub(crate) fn frame_monitoring_stream_service_client( + &self, + frame_signal: FrameSignal, + ) -> StreamServiceClient { + StreamServiceClient::new(RequestFrameMonitor::new(self.channel.clone(), frame_signal)) } } diff --git a/src/service/account.rs b/src/service/account.rs index e794964..baf584b 100644 --- a/src/service/account.rs +++ b/src/service/account.rs @@ -1,6 +1,6 @@ use prost_types::method_options::IdempotencyLevel; +use tonic::transport::Channel; use tonic::IntoRequest; -use tonic_side_effect::RequestFrameMonitor; use super::ServiceRequest; use crate::{ @@ -10,15 +10,12 @@ use crate::{ #[derive(Debug, Clone)] pub struct CreateBasinServiceRequest { - client: AccountServiceClient, + client: AccountServiceClient, req: types::CreateBasinRequest, } impl CreateBasinServiceRequest { - pub fn new( - client: AccountServiceClient, - req: types::CreateBasinRequest, - ) -> Self { + pub fn new(client: AccountServiceClient, req: types::CreateBasinRequest) -> Self { Self { client, req } } } @@ -51,15 +48,12 @@ impl ServiceRequest for CreateBasinServiceRequest { #[derive(Debug, Clone)] pub struct ListBasinsServiceRequest { - client: AccountServiceClient, + client: AccountServiceClient, req: types::ListBasinsRequest, } impl ListBasinsServiceRequest { - pub fn new( - client: AccountServiceClient, - req: types::ListBasinsRequest, - ) -> Self { + pub fn new(client: AccountServiceClient, req: types::ListBasinsRequest) -> Self { Self { client, req } } } @@ -92,15 +86,12 @@ impl ServiceRequest for ListBasinsServiceRequest { #[derive(Debug, Clone)] pub struct DeleteBasinServiceRequest { - client: AccountServiceClient, + client: AccountServiceClient, req: types::DeleteBasinRequest, } impl DeleteBasinServiceRequest { - pub fn new( - client: AccountServiceClient, - req: types::DeleteBasinRequest, - ) -> Self { + pub fn new(client: AccountServiceClient, req: types::DeleteBasinRequest) -> Self { Self { client, req } } } @@ -133,12 +124,12 @@ impl ServiceRequest for DeleteBasinServiceRequest { #[derive(Debug, Clone)] pub struct GetBasinConfigServiceRequest { - client: AccountServiceClient, + client: AccountServiceClient, basin: types::BasinName, } impl GetBasinConfigServiceRequest { - pub fn new(client: AccountServiceClient, basin: types::BasinName) -> Self { + pub fn new(client: AccountServiceClient, basin: types::BasinName) -> Self { Self { client, basin } } } @@ -173,15 +164,12 @@ impl ServiceRequest for GetBasinConfigServiceRequest { #[derive(Debug, Clone)] pub struct ReconfigureBasinServiceRequest { - client: AccountServiceClient, + client: AccountServiceClient, req: types::ReconfigureBasinRequest, } impl ReconfigureBasinServiceRequest { - pub fn new( - client: AccountServiceClient, - req: types::ReconfigureBasinRequest, - ) -> Self { + pub fn new(client: AccountServiceClient, req: types::ReconfigureBasinRequest) -> Self { Self { client, req } } } diff --git a/src/service/basin.rs b/src/service/basin.rs index 779b9be..1746647 100644 --- a/src/service/basin.rs +++ b/src/service/basin.rs @@ -1,6 +1,6 @@ use prost_types::method_options::IdempotencyLevel; +use tonic::transport::Channel; use tonic::IntoRequest; -use tonic_side_effect::RequestFrameMonitor; use super::ServiceRequest; use crate::{ @@ -10,15 +10,12 @@ use crate::{ #[derive(Debug, Clone)] pub struct ListStreamsServiceRequest { - client: BasinServiceClient, + client: BasinServiceClient, req: types::ListStreamsRequest, } impl ListStreamsServiceRequest { - pub fn new( - client: BasinServiceClient, - req: types::ListStreamsRequest, - ) -> Self { + pub fn new(client: BasinServiceClient, req: types::ListStreamsRequest) -> Self { Self { client, req } } } @@ -51,12 +48,12 @@ impl ServiceRequest for ListStreamsServiceRequest { #[derive(Debug, Clone)] pub struct GetStreamConfigServiceRequest { - client: BasinServiceClient, + client: BasinServiceClient, stream: String, } impl GetStreamConfigServiceRequest { - pub fn new(client: BasinServiceClient, stream: impl Into) -> Self { + pub fn new(client: BasinServiceClient, stream: impl Into) -> Self { Self { client, stream: stream.into(), @@ -94,15 +91,12 @@ impl ServiceRequest for GetStreamConfigServiceRequest { #[derive(Debug, Clone)] pub struct CreateStreamServiceRequest { - client: BasinServiceClient, + client: BasinServiceClient, req: types::CreateStreamRequest, } impl CreateStreamServiceRequest { - pub fn new( - client: BasinServiceClient, - req: types::CreateStreamRequest, - ) -> Self { + pub fn new(client: BasinServiceClient, req: types::CreateStreamRequest) -> Self { Self { client, req } } } @@ -135,15 +129,12 @@ impl ServiceRequest for CreateStreamServiceRequest { #[derive(Debug, Clone)] pub struct DeleteStreamServiceRequest { - client: BasinServiceClient, + client: BasinServiceClient, req: types::DeleteStreamRequest, } impl DeleteStreamServiceRequest { - pub fn new( - client: BasinServiceClient, - req: types::DeleteStreamRequest, - ) -> Self { + pub fn new(client: BasinServiceClient, req: types::DeleteStreamRequest) -> Self { Self { client, req } } } @@ -176,15 +167,12 @@ impl ServiceRequest for DeleteStreamServiceRequest { #[derive(Debug, Clone)] pub struct ReconfigureStreamServiceRequest { - client: BasinServiceClient, + client: BasinServiceClient, req: types::ReconfigureStreamRequest, } impl ReconfigureStreamServiceRequest { - pub fn new( - client: BasinServiceClient, - req: types::ReconfigureStreamRequest, - ) -> Self { + pub fn new(client: BasinServiceClient, req: types::ReconfigureStreamRequest) -> Self { Self { client, req } } } diff --git a/src/service/stream.rs b/src/service/stream.rs index 3ed2eb2..ecaaa85 100644 --- a/src/service/stream.rs +++ b/src/service/stream.rs @@ -1,6 +1,7 @@ use prost_types::method_options::IdempotencyLevel; +use tonic::transport::Channel; use tonic::IntoRequest; -use tonic_side_effect::RequestFrameMonitor; +use tonic_side_effect::{FrameSignal, RequestFrameMonitor}; use super::{ ClientError, ServiceRequest, ServiceStreamingRequest, ServiceStreamingResponse, @@ -14,15 +15,12 @@ use crate::{ #[derive(Debug, Clone)] pub struct CheckTailServiceRequest { - client: StreamServiceClient, + client: StreamServiceClient, stream: String, } impl CheckTailServiceRequest { - pub fn new( - client: StreamServiceClient, - stream: impl Into, - ) -> Self { + pub fn new(client: StreamServiceClient, stream: impl Into) -> Self { Self { client, stream: stream.into(), @@ -60,14 +58,14 @@ impl ServiceRequest for CheckTailServiceRequest { #[derive(Debug, Clone)] pub struct ReadServiceRequest { - client: StreamServiceClient, + client: StreamServiceClient, stream: String, req: types::ReadRequest, } impl ReadServiceRequest { pub fn new( - client: StreamServiceClient, + client: StreamServiceClient, stream: impl Into, req: types::ReadRequest, ) -> Self { @@ -107,14 +105,14 @@ impl ServiceRequest for ReadServiceRequest { #[derive(Debug, Clone)] pub struct ReadSessionServiceRequest { - client: StreamServiceClient, + client: StreamServiceClient, stream: String, req: types::ReadSessionRequest, } impl ReadSessionServiceRequest { pub fn new( - client: StreamServiceClient, + client: StreamServiceClient, stream: impl Into, req: types::ReadSessionRequest, ) -> Self { @@ -176,6 +174,7 @@ impl StreamingResponse for ReadSessionStreamingResponse { #[derive(Debug, Clone)] pub struct AppendServiceRequest { client: StreamServiceClient, + frame_signal: FrameSignal, stream: String, req: types::AppendInput, } @@ -183,11 +182,13 @@ pub struct AppendServiceRequest { impl AppendServiceRequest { pub fn new( client: StreamServiceClient, + frame_signal: FrameSignal, stream: impl Into, req: types::AppendInput, ) -> Self { Self { client, + frame_signal, stream: stream.into(), req, } @@ -220,6 +221,10 @@ impl ServiceRequest for AppendServiceRequest { ) -> Result { resp.into_inner().try_into().map_err(Into::into) } + + fn should_retry(&self, _err: &ClientError) -> bool { + !self.frame_signal.is_signalled() + } } pub struct AppendSessionServiceRequest @@ -231,6 +236,19 @@ where req: Option, } +impl Clone for AppendSessionServiceRequest +where + S: Send + futures::Stream + Unpin + Clone, +{ + fn clone(&self) -> Self { + Self { + client: self.client.clone(), + stream: self.stream.clone(), + req: self.req.clone(), + } + } +} + impl AppendSessionServiceRequest where S: Send + futures::Stream + Unpin,