From ad221437e6ef363410cf037d8b53643a800be6bb Mon Sep 17 00:00:00 2001
From: Stephen Balogh
Date: Sat, 23 Nov 2024 11:13:37 -0800
Subject: [PATCH] working

---
 src/append_session.rs | 101 ++++++++++++++++++------------------------
 src/client.rs         |  35 +++++++++++----
 src/service.rs        |   7 +--
 src/service/stream.rs |  33 +++++++-------
 4 files changed, 88 insertions(+), 88 deletions(-)

diff --git a/src/append_session.rs b/src/append_session.rs
index 0c5043d..8cfa278 100644
--- a/src/append_session.rs
+++ b/src/append_session.rs
@@ -1,8 +1,8 @@
 use crate::client::{AppendRetryPolicy, ClientError, StreamClient};
 use crate::service::stream::{AppendSessionServiceRequest, AppendSessionStreamingResponse};
 use crate::service::ServiceStreamingResponse;
-use crate::types::MeteredSize;
 use crate::types;
+use crate::types::MeteredSize;
 use bytesize::ByteSize;
 use enum_ordinalize::Ordinalize;
 use futures::StreamExt;
@@ -15,7 +15,7 @@ use tokio::time::Instant;
 use tokio_muxt::{CoalesceMode, MuxTimer};
 use tokio_stream::wrappers::ReceiverStream;
 use tonic_side_effect::FrameSignal;
-use tracing::{trace, warn};
+use tracing::trace;

 async fn connect(
     stream_client: &StreamClient,
@@ -27,7 +27,6 @@ async fn connect(
     ),
     ClientError,
 > {
-    // Signal can be reset as we are creating a new connection anyway.
     frame_signal.reset();
     let (input_tx, input_rx) = mpsc::channel(10);
     let service_req = AppendSessionServiceRequest::new(
@@ -83,7 +82,6 @@ async fn recover(
     );
     let mut recovery_index = 0;
     let mut recovery_tx_finished = false;
-
     let timer = MuxTimer::<{ TimerEvent::VARIANT_COUNT }>::default();
     tokio::pin!(timer);

@@ -116,12 +114,12 @@ async fn recover(
                 assert_eq!(
                     n_acked_records as usize,
                     recovery_batch.inner.records.len(),
-                    "number of acknowledged should equal amount in recovery batch"
+                    "number of acknowledged records should equal amount in first recovery batch"
                 );
                 output_tx
                     .send(Ok(ack))
                     .await
-                    .map_err(|_| ClientError::LostUser)?;
+                    .map_err(|_| ClientError::SdkClientDisconnected)?;

                 // Adjust next timer.
                 match inflight.front() {
@@ -158,11 +156,9 @@ where
     } = lock.deref_mut();
     assert!(inflight.len() <= stream_client.inner.config.max_append_batches_inflight);
-
     let (input_tx, mut ack_stream) = connect(&stream_client, frame_signal.clone()).await?;
     let batch_ack_deadline = stream_client.inner.config.request_timeout;
-
     // Recovery.
     if !inflight.is_empty() {
         recover(
             batch_ack_deadline,
@@ -173,95 +169,84 @@ where
             output_tx.clone(),
         )
         .await?;
-        frame_signal.reset();
         assert_eq!(inflight.len(), 0);
         assert_eq!(*inflight_size, 0);
+        frame_signal.reset();
+        trace!("recovery finished");
     }
-
     let timer = MuxTimer::<{ TimerEvent::VARIANT_COUNT }>::default();
     tokio::pin!(timer);
-
     let mut input_terminated = false;
+
     while !input_terminated {
         tokio::select! {
-            (event_ord, deadline) = &mut timer,
+            (event_ord, _deadline) = &mut timer,
                 if timer.is_armed() => {
                 match TimerEvent::from_ordinal(event_ord as i8).expect("valid event ordinal") {
-                    TimerEvent::MetricUpdate => {
-                        //TODO
-                    }
-                    TimerEvent::BatchDeadline => {
-                        let first_batch_start = inflight.front().map(|s| s.start);
-                        warn!(?deadline, ?first_batch_start, "hitting batch deadline!");
-                    }
+                    // TODO
+                    TimerEvent::MetricUpdate => {}
+                    TimerEvent::BatchDeadline => Err(ClientError::LocalDeadline("deadline for append acknowledgement hit".to_string()))?
                 }
             }
             client_input = request_stream.next(),
                 if !input_terminated && inflight.len() < stream_client.inner.config.max_append_batches_inflight => {
                 match client_input {
-                    None => input_terminated = true,
                     Some(append_input) => {
                         let metered_size = append_input.metered_size();
                         *inflight_size += metered_size.0;
-                        let enqueue_time = Instant::now();
+                        let start = Instant::now();
                         inflight.push_back(InflightBatch {
-                            start: enqueue_time,
+                            start,
                             metered_size,
                             inner: append_input.clone()
                         });
-                        timer.as_mut().fire_at(TimerEvent::BatchDeadline, enqueue_time + batch_ack_deadline, CoalesceMode::Earliest);
+                        timer.as_mut().fire_at(TimerEvent::BatchDeadline, start + batch_ack_deadline, CoalesceMode::Earliest);
                         input_tx.send(append_input)
                             .await
                             .map_err(|_| ClientError::Service(tonic::Status::unavailable("frontend input_tx disconnected")))?;
                     }
+                    None => input_terminated = true,
                 }
             },
-            ack = ack_stream.next() => {
-                match ack {
-                    Some(ack) => {
-                        let ack = ack?;
-                        let n_acked_records = ack.end_seq_num - ack.start_seq_num;
-                        let corresponding_batch = inflight.pop_front()
-                            .expect("inflight should not be empty");
-                        assert_eq!(
-                            n_acked_records as usize,
-                            corresponding_batch.inner.records.len(),
-                            "number of acknowledged should equal amount in recovery batch"
-                        );
-                        output_tx.send(Ok(ack)).await.map_err(|_| ClientError::LostUser)?;
+            Some(ack) = ack_stream.next() => {
+                let ack = ack?;
+                let n_acked_records = ack.end_seq_num - ack.start_seq_num;
+                let corresponding_batch = inflight.pop_front()
+                    .expect("inflight should not be empty");
+                assert_eq!(
+                    n_acked_records as usize,
+                    corresponding_batch.inner.records.len(),
+                    "number of acknowledged records should equal amount in first inflight batch"
+                );
+                output_tx.send(Ok(ack)).await.map_err(|_| ClientError::SdkClientDisconnected)?;

-                        if inflight.is_empty() {
-                            frame_signal.reset()
-                        }
+                if inflight.is_empty() {
+                    frame_signal.reset()
+                }

-                        // Adjust next timer.
-                        match inflight.front() {
-                            Some(batch) => timer.as_mut().fire_at(
-                                TimerEvent::BatchDeadline,
-                                batch.start + batch_ack_deadline,
-                                CoalesceMode::Latest
-                            ),
-                            None => timer.as_mut().cancel(TimerEvent::BatchDeadline),
-                        };
+                // Adjust next timer.
+                match inflight.front() {
+                    Some(batch) => timer.as_mut().fire_at(
+                        TimerEvent::BatchDeadline,
+                        batch.start + batch_ack_deadline,
+                        CoalesceMode::Latest
+                    ),
+                    None => timer.as_mut().cancel(TimerEvent::BatchDeadline),
+                };

-                        *inflight_size -= corresponding_batch.metered_size.0;
-                    }
-                    None => break,
-                }
+                *inflight_size -= corresponding_batch.metered_size.0;
             },
-            else => {
-                break;
-            }
+
+            else => break,
         }
     }
+    assert!(input_terminated);
     assert_eq!(inflight.len(), 0);
     assert_eq!(*inflight_size, 0);
@@ -318,7 +303,7 @@ pub(crate) async fn manage_session(
                 }
                 ClientError::Conversion(_)
                 | ClientError::LocalDeadline(_)
-                | ClientError::LostUser => false,
+                | ClientError::SdkClientDisconnected => false,
             }
         };
         let policy_compliant = {
diff --git a/src/client.rs b/src/client.rs
index e186524..05c4be7 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -5,8 +5,8 @@ use futures::StreamExt;
 use http::{uri::Authority, HeaderValue};
 use hyper_util::client::legacy::connect::HttpConnector;
 use secrecy::SecretString;
-use tokio::sync::mpsc;
 use sync_docs::sync_docs;
+use tokio::sync::mpsc;
 use tokio::time::sleep;
 use tokio_stream::wrappers::ReceiverStream;
 use tonic::{
@@ -41,7 +41,6 @@ use crate::{
 };

 const DEFAULT_CONNECTOR: Option<HttpConnector> = None;
-pub const NO_FRAMES_TAG: &str = "s2-sdk-no-request-frames";

 /// S2 cloud environment to connect with.
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -101,9 +100,18 @@ pub struct S2Endpoints {
 #[derive(Debug, Clone)]
 pub enum AppendRetryPolicy {
-    /// Retry all eligible failures. "At least once" semantics; duplicates are possible.
+    /// Retry all eligible failures encountered during an append.
+    ///
+    /// This corresponds to "at least once" record durability semantics,
+    /// as duplicates are possible.
     All,
-    /// Retry only failures with no side effects. "At most once" semantics.
+
+    /// Retry only failures with no side effects.
+    ///
+    /// Will not attempt to retry failures that could have resulted in an append becoming durable,
+    /// in order to prevent duplicates.
+    ///
+    /// A successful append signifies "exactly once" record durability.
     NoSideEffects,
 }
@@ -249,8 +257,8 @@ impl ClientConfig {
     /// Retry policy for appends.
     /// Only relevant if `max_retries > 1`.
-    /// Determines if appends follow "at most once" or "at least once" semantics.
-    /// Defaults to retries of all failures ("at least once").
+    ///
+    /// Defaults to retries of all failures ("at least once" semantics).
     pub fn with_append_retry_policy(
         self,
         append_retry_policy: impl Into<AppendRetryPolicy>,
@@ -263,6 +271,8 @@ impl ClientConfig {

     /// Number of append batches, regardless of their size, that can be
     /// inflight (pending acknowledgment) within an append session.
+    ///
+    /// Defaults to 1000.
     pub fn with_max_append_batches_inflight(self, max_append_batches_inflight: usize) -> Self {
         Self {
             max_append_batches_inflight,
@@ -309,8 +319,8 @@ pub enum ClientError {
     Service(#[from] tonic::Status),
     #[error("Deadline expired: {0}")]
     LocalDeadline(String),
-    #[error("Client user lost")]
-    LostUser,
+    #[error("SDK client disconnected")]
+    SdkClientDisconnected,
 }

 /// Client for account-level operations.
@@ -584,6 +594,7 @@ impl StreamClient {
             .send_retryable(AppendServiceRequest::new(
                 self.inner
                     .frame_monitoring_stream_service_client(frame_signal.clone()),
+                self.inner.config.append_retry_policy.clone(),
                 frame_signal,
                 &self.stream,
                 req,
@@ -601,7 +612,11 @@ impl StreamClient {
         S: 'static + Send + Unpin + futures::Stream,
     {
         let (response_tx, response_rx) = mpsc::channel(10);
-        _ = tokio::spawn(append_session::manage_session(self.clone(), req, response_tx));
+        _ = tokio::spawn(append_session::manage_session(
+            self.clone(),
+            req,
+            response_tx,
+        ));

         Ok(Box::pin(ReceiverStream::new(response_rx)))
     }
@@ -661,6 +676,8 @@ impl ClientInner {
                     .assume_http2(true),
             )
             .expect("valid TLS config")
+            .http2_adaptive_window(true)
+            // TODO tcp and http2 keepalive settings
             .connect_timeout(config.connection_timeout)
             .timeout(config.request_timeout);

diff --git a/src/service.rs b/src/service.rs
index 34635c5..a5b4204 100644
--- a/src/service.rs
+++ b/src/service.rs
@@ -12,7 +12,6 @@ use prost_types::method_options::IdempotencyLevel;
 use secrecy::{ExposeSecret, SecretString};
 use tonic::metadata::{AsciiMetadataKey, AsciiMetadataValue, MetadataMap};

-use crate::client::NO_FRAMES_TAG;
 use crate::{client::ClientError, types};

 pub async fn send_request(
@@ -76,11 +75,7 @@ pub trait ServiceRequest {
     /// Return true if the request should be retried based on the error returned.
     fn should_retry(&self, err: &ClientError) -> bool {
         if Self::IDEMPOTENCY_LEVEL == IdempotencyLevel::IdempotencyUnknown {
-            return if let ClientError::Service(status) = err {
-                status.metadata().contains_key(NO_FRAMES_TAG)
-            } else {
-                false
-            };
+            return false;
         };

         // The request is definitely idempotent.
diff --git a/src/service/stream.rs b/src/service/stream.rs
index ecaaa85..8112449 100644
--- a/src/service/stream.rs
+++ b/src/service/stream.rs
@@ -8,6 +8,7 @@ use super::{
     StreamingRequest, StreamingResponse,
 };

+use crate::client::AppendRetryPolicy;
 use crate::{
     api::{self, stream_service_client::StreamServiceClient},
     types,
@@ -174,6 +175,7 @@ impl StreamingResponse for ReadSessionStreamingResponse {
 #[derive(Debug, Clone)]
 pub struct AppendServiceRequest {
     client: StreamServiceClient,
+    append_retry_policy: AppendRetryPolicy,
     frame_signal: FrameSignal,
     stream: String,
     req: types::AppendInput,
@@ -182,12 +184,14 @@ pub struct AppendServiceRequest {
 impl AppendServiceRequest {
     pub fn new(
         client: StreamServiceClient,
+        append_retry_policy: AppendRetryPolicy,
         frame_signal: FrameSignal,
         stream: impl Into<String>,
         req: types::AppendInput,
     ) -> Self {
         Self {
             client,
+            append_retry_policy,
             frame_signal,
             stream: stream.into(),
             req,
@@ -222,8 +226,20 @@ impl ServiceRequest for AppendServiceRequest {
         resp.into_inner().try_into().map_err(Into::into)
     }

-    fn should_retry(&self, _err: &ClientError) -> bool {
-        !self.frame_signal.is_signalled()
+    fn should_retry(&self, err: &ClientError) -> bool {
+        if let ClientError::Service(status) = err {
+            let retryable_error = matches!(
+                status.code(),
+                tonic::Code::Unavailable | tonic::Code::DeadlineExceeded | tonic::Code::Unknown
+            );
+            let policy_compliant = match self.append_retry_policy {
+                AppendRetryPolicy::All => true,
+                AppendRetryPolicy::NoSideEffects => !self.frame_signal.is_signalled(),
+            };
+            retryable_error && policy_compliant
+        } else {
+            false
+        }
     }
 }

@@ -236,19 +252,6 @@ where
     req: Option,
 }

-impl Clone for AppendSessionServiceRequest
-where
-    S: Send + futures::Stream + Unpin + Clone,
-{
-    fn clone(&self) -> Self {
-        Self {
-            client: self.client.clone(),
-            stream: self.stream.clone(),
-            req: self.req.clone(),
-        }
-    }
-}
-
 impl AppendSessionServiceRequest
 where
     S: Send + futures::Stream + Unpin,
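
Usage sketch: the policy that this patch threads into AppendServiceRequest::should_retry is
chosen on the client configuration. Only with_append_retry_policy and
with_max_append_batches_inflight appear in the diff above; ClientConfig::new and its token
argument are assumed names, shown for illustration only.

    use crate::client::{AppendRetryPolicy, ClientConfig};

    fn example_config(token: &str) -> ClientConfig {
        // ClientConfig::new is an assumed constructor; the two builders below are the
        // ones added/documented in this patch.
        ClientConfig::new(token)
            // "At least once": retry any eligible failure; duplicate records are possible.
            .with_append_retry_policy(AppendRetryPolicy::All)
            // Alternative, to avoid duplicates by never retrying a potentially durable append:
            // .with_append_retry_policy(AppendRetryPolicy::NoSideEffects)
            // Cap on unacknowledged batches per append session (defaults to 1000).
            .with_max_append_batches_inflight(1000)
    }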
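
For reference, the retry decision added to src/service/stream.rs, restated in isolation; the
free function retry_append and its parameters are illustrative only, not SDK API.

    use crate::client::AppendRetryPolicy;

    /// An append is retried only when the gRPC status is transient AND the configured policy
    /// allows it; NoSideEffects additionally requires that no request frame was ever put on
    /// the wire (as tracked by the connection's FrameSignal).
    fn retry_append(code: tonic::Code, policy: &AppendRetryPolicy, frame_signalled: bool) -> bool {
        let retryable_error = matches!(
            code,
            tonic::Code::Unavailable | tonic::Code::DeadlineExceeded | tonic::Code::Unknown
        );
        let policy_compliant = match policy {
            AppendRetryPolicy::All => true,
            AppendRetryPolicy::NoSideEffects => !frame_signalled,
        };
        retryable_error && policy_compliant
    }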