working
sgbalogh committed Nov 23, 2024
1 parent 5b9bfa2 commit ad22143
Showing 4 changed files with 88 additions and 88 deletions.
101 changes: 43 additions & 58 deletions src/append_session.rs
@@ -1,8 +1,8 @@
use crate::client::{AppendRetryPolicy, ClientError, StreamClient};
use crate::service::stream::{AppendSessionServiceRequest, AppendSessionStreamingResponse};
use crate::service::ServiceStreamingResponse;
use crate::types::MeteredSize;
use crate::types;
use crate::types::MeteredSize;
use bytesize::ByteSize;
use enum_ordinalize::Ordinalize;
use futures::StreamExt;
@@ -15,7 +15,7 @@ use tokio::time::Instant;
use tokio_muxt::{CoalesceMode, MuxTimer};
use tokio_stream::wrappers::ReceiverStream;
use tonic_side_effect::FrameSignal;
use tracing::{trace, warn};
use tracing::trace;

async fn connect(
stream_client: &StreamClient,
@@ -27,7 +27,6 @@ async fn connect(
),
ClientError,
> {
// Signal can be reset as we are creating a new connection anyway.
frame_signal.reset();
let (input_tx, input_rx) = mpsc::channel(10);
let service_req = AppendSessionServiceRequest::new(
@@ -83,7 +82,6 @@ async fn recover(
);
let mut recovery_index = 0;
let mut recovery_tx_finished = false;

let timer = MuxTimer::<{ TimerEvent::VARIANT_COUNT }>::default();
tokio::pin!(timer);

@@ -116,12 +114,12 @@
assert_eq!(
n_acked_records as usize,
recovery_batch.inner.records.len(),
"number of acknowledged should equal amount in recovery batch"
"number of acknowledged records should equal amount in first recovery batch"
);
output_tx
.send(Ok(ack))
.await
.map_err(|_| ClientError::LostUser)?;
.map_err(|_| ClientError::SdkClientDisconnected)?;

// Adjust next timer.
match inflight.front() {
@@ -158,11 +156,9 @@ where
} = lock.deref_mut();

assert!(inflight.len() <= stream_client.inner.config.max_append_batches_inflight);

let (input_tx, mut ack_stream) = connect(&stream_client, frame_signal.clone()).await?;
let batch_ack_deadline = stream_client.inner.config.request_timeout;

// Recovery.
if !inflight.is_empty() {
recover(
batch_ack_deadline,
@@ -173,95 +169,84 @@
output_tx.clone(),
)
.await?;
frame_signal.reset();

assert_eq!(inflight.len(), 0);
assert_eq!(*inflight_size, 0);
frame_signal.reset();

trace!("recovery finished");
}

let timer = MuxTimer::<{ TimerEvent::VARIANT_COUNT }>::default();
tokio::pin!(timer);


let mut input_terminated = false;

while !input_terminated {
tokio::select! {
(event_ord, deadline) = &mut timer,
(event_ord, _deadline) = &mut timer,
if timer.is_armed()
=> {
match TimerEvent::from_ordinal(event_ord as i8).expect("valid event ordinal") {
TimerEvent::MetricUpdate => {
//TODO
}
TimerEvent::BatchDeadline => {
let first_batch_start = inflight.front().map(|s| s.start);
warn!(?deadline, ?first_batch_start, "hitting batch deadline!");
// TODO
TimerEvent::MetricUpdate => {}
TimerEvent::BatchDeadline =>
Err(ClientError::LocalDeadline("deadline for append acknowledgement hit".to_string()))?
}
}

}
client_input = request_stream.next(),
if !input_terminated && inflight.len() < stream_client.inner.config.max_append_batches_inflight
=> {
match client_input {
None => input_terminated = true,
Some(append_input) => {
let metered_size = append_input.metered_size();
*inflight_size += metered_size.0;
let enqueue_time = Instant::now();
let start = Instant::now();
inflight.push_back(InflightBatch {
start: enqueue_time,
start,
metered_size,
inner: append_input.clone()
});
timer.as_mut().fire_at(TimerEvent::BatchDeadline, enqueue_time + batch_ack_deadline, CoalesceMode::Earliest);
timer.as_mut().fire_at(TimerEvent::BatchDeadline, start + batch_ack_deadline, CoalesceMode::Earliest);
input_tx.send(append_input)
.await
.map_err(|_| ClientError::Service(tonic::Status::unavailable("frontend input_tx disconnected")))?;
}
None => input_terminated = true,
}
},
ack = ack_stream.next() => {
match ack {
Some(ack) => {
let ack = ack?;
let n_acked_records = ack.end_seq_num - ack.start_seq_num;
let corresponding_batch = inflight.pop_front()
.expect("inflight should not be empty");
assert_eq!(
n_acked_records as usize,
corresponding_batch.inner.records.len(),
"number of acknowledged should equal amount in recovery batch"
);
output_tx.send(Ok(ack)).await.map_err(|_| ClientError::LostUser)?;
Some(ack) = ack_stream.next() => {
let ack = ack?;
let n_acked_records = ack.end_seq_num - ack.start_seq_num;
let corresponding_batch = inflight.pop_front()
.expect("inflight should not be empty");
assert_eq!(
n_acked_records as usize,
corresponding_batch.inner.records.len(),
"number of acknowledged records should equal amount in first inflight batch"
);
output_tx.send(Ok(ack)).await.map_err(|_| ClientError::SdkClientDisconnected)?;

if inflight.is_empty() {
frame_signal.reset()
}
if inflight.is_empty() {
frame_signal.reset()
}

// Adjust next timer.
match inflight.front() {
Some(batch) => timer.as_mut().fire_at(
TimerEvent::BatchDeadline,
batch.start + batch_ack_deadline,
CoalesceMode::Latest
),
None => timer.as_mut().cancel(TimerEvent::BatchDeadline),
};
// Adjust next timer.
match inflight.front() {
Some(batch) => timer.as_mut().fire_at(
TimerEvent::BatchDeadline,
batch.start + batch_ack_deadline,
CoalesceMode::Latest
),
None => timer.as_mut().cancel(TimerEvent::BatchDeadline),
};

*inflight_size -= corresponding_batch.metered_size.0;
}
None => break,
}
*inflight_size -= corresponding_batch.metered_size.0;
},
else => {
break;
}

else => break,
}
}

assert!(input_terminated);
assert_eq!(inflight.len(), 0);
assert_eq!(*inflight_size, 0);

@@ -318,7 +303,7 @@ pub(crate) async fn manage_session<S>(
}
ClientError::Conversion(_)
| ClientError::LocalDeadline(_)
| ClientError::LostUser => false,
| ClientError::SdkClientDisconnected => false,
}
};
let policy_compliant = {
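Aside on the timer pattern in this file: the session multiplexes per-event deadlines over a single `MuxTimer` from `tokio_muxt`. Below is a minimal sketch of that pattern, not the session's actual code. It uses only the calls visible in this diff (`fire_at`, `cancel`, `is_armed`, and polling the pinned timer inside `select!`); the `TimerEvent` enum is a stand-in for the session's private one.

```rust
use std::time::Duration;

use enum_ordinalize::Ordinalize;
use tokio::time::Instant;
use tokio_muxt::{CoalesceMode, MuxTimer};

// Assumed stand-in for the session's private TimerEvent enum.
#[derive(Ordinalize)]
enum TimerEvent {
    MetricUpdate,
    BatchDeadline,
}

#[tokio::main]
async fn main() {
    let timer = MuxTimer::<{ TimerEvent::VARIANT_COUNT }>::default();
    tokio::pin!(timer);

    // Arm the deadline for a just-sent batch. CoalesceMode::Earliest keeps
    // whichever deadline is sooner if this event is already armed.
    timer.as_mut().fire_at(
        TimerEvent::BatchDeadline,
        Instant::now() + Duration::from_millis(100),
        CoalesceMode::Earliest,
    );

    tokio::select! {
        (event_ord, _deadline) = &mut timer, if timer.is_armed() => {
            match TimerEvent::from_ordinal(event_ord as i8).expect("valid event ordinal") {
                // In the session this path maps to ClientError::LocalDeadline.
                TimerEvent::BatchDeadline => println!("append ack deadline hit"),
                TimerEvent::MetricUpdate => println!("metrics tick"),
            }
        }
    }

    // On each ack, the session re-arms for the next inflight batch
    // (CoalesceMode::Latest) or cancels when nothing is inflight.
    timer.as_mut().cancel(TimerEvent::BatchDeadline);
}
```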
35 changes: 26 additions & 9 deletions src/client.rs
@@ -5,8 +5,8 @@ use futures::StreamExt;
use http::{uri::Authority, HeaderValue};
use hyper_util::client::legacy::connect::HttpConnector;
use secrecy::SecretString;
use tokio::sync::mpsc;
use sync_docs::sync_docs;
use tokio::sync::mpsc;
use tokio::time::sleep;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{
@@ -41,7 +41,6 @@ use crate::{
};

const DEFAULT_CONNECTOR: Option<HttpConnector> = None;
pub const NO_FRAMES_TAG: &str = "s2-sdk-no-request-frames";

/// S2 cloud environment to connect with.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -101,9 +100,18 @@ pub struct S2Endpoints {

#[derive(Debug, Clone)]
pub enum AppendRetryPolicy {
/// Retry all eligible failures. "At least once" semantics; duplicates are possible.
/// Retry all eligible failures encountered during an append.
///
/// This corresponds to "at least once" record durability semantics,
/// as duplicates are possible.
All,
/// Retry only failures with no side effects. "At most once" semantics.

/// Retry only failures with no side effects.
///
/// Will not attempt to retry failures that could have resulted in an append becoming durable,
/// in order to prevent duplicates.
///
/// A successful append signifies "exactly once" record durability.
NoSideEffects,
}

@@ -249,8 +257,8 @@

/// Retry policy for appends.
/// Only relevant if `max_retries > 1`.
/// Determines if appends follow "at most once" or "at least once" semantics.
/// Defaults to retries of all failures ("at least once").
///
/// Defaults to retries of all failures ("at least once" semantics).
pub fn with_append_retry_policy(
self,
append_retry_policy: impl Into<AppendRetryPolicy>,
@@ -263,6 +271,8 @@

/// Number of append batches, regardless of their size, that can be
/// inflight (pending acknowledgment) within an append session.
///
/// Defaults to 1000.
pub fn with_max_append_batches_inflight(self, max_append_batches_inflight: usize) -> Self {
Self {
max_append_batches_inflight,
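Taken together, the two builder methods touched in these hunks set the delivery semantics and the inflight cap for append sessions. A hypothetical configuration sketch follows; the `ClientConfig::new` constructor and the `s2` crate paths are assumptions, while `with_append_retry_policy` and `with_max_append_batches_inflight` come from this diff.

```rust
use s2::client::{AppendRetryPolicy, ClientConfig};

fn make_config() -> ClientConfig {
    // Hypothetical constructor; shown only to situate the two builder calls.
    ClientConfig::new("<auth-token>")
        // Retry only side-effect-free failures, preventing duplicate appends.
        .with_append_retry_policy(AppendRetryPolicy::NoSideEffects)
        // Cap batches pending acknowledgment below the stated default of 1000.
        .with_max_append_batches_inflight(500)
}
```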
@@ -309,8 +319,8 @@ pub enum ClientError {
Service(#[from] tonic::Status),
#[error("Deadline expired: {0}")]
LocalDeadline(String),
#[error("Client user lost")]
LostUser,
#[error("SDK client disconnected")]
SdkClientDisconnected,
}

/// Client for account-level operations.
@@ -584,6 +594,7 @@ impl StreamClient {
.send_retryable(AppendServiceRequest::new(
self.inner
.frame_monitoring_stream_service_client(frame_signal.clone()),
self.inner.config.append_retry_policy.clone(),
frame_signal,
&self.stream,
req,
@@ -601,7 +612,11 @@
S: 'static + Send + Unpin + futures::Stream<Item = types::AppendInput>,
{
let (response_tx, response_rx) = mpsc::channel(10);
_ = tokio::spawn(append_session::manage_session(self.clone(), req, response_tx));
_ = tokio::spawn(append_session::manage_session(
self.clone(),
req,
response_tx,
));

Ok(Box::pin(ReceiverStream::new(response_rx)))
}
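For context on how the spawned `manage_session` surfaces results: `append_session` hands back the receiver side as a pinned stream, one item per acknowledged batch. A hedged consumption sketch follows; apart from the `Stream` bounds, `start_seq_num`, and `end_seq_num`, which appear in this diff, the module paths and exact types are assumptions.

```rust
use futures::StreamExt;

async fn drive(
    client: s2::client::StreamClient,
    inputs: impl futures::Stream<Item = s2::types::AppendInput> + Send + Unpin + 'static,
) -> Result<(), s2::client::ClientError> {
    let mut acks = client.append_session(inputs).await?;
    while let Some(ack) = acks.next().await {
        // Each item acknowledges one inflight batch, or carries a terminal error.
        let ack = ack?;
        println!("acked records {}..{}", ack.start_seq_num, ack.end_seq_num);
    }
    Ok(())
}
```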
@@ -661,6 +676,8 @@ impl ClientInner {
.assume_http2(true),
)
.expect("valid TLS config")
.http2_adaptive_window(true)
// TODO tcp and http2 keepalive settings
.connect_timeout(config.connection_timeout)
.timeout(config.request_timeout);

7 changes: 1 addition & 6 deletions src/service.rs
@@ -12,7 +12,6 @@ use prost_types::method_options::IdempotencyLevel;
use secrecy::{ExposeSecret, SecretString};
use tonic::metadata::{AsciiMetadataKey, AsciiMetadataValue, MetadataMap};

use crate::client::NO_FRAMES_TAG;
use crate::{client::ClientError, types};

pub async fn send_request<T: ServiceRequest>(
@@ -76,11 +75,7 @@ pub trait ServiceRequest {
/// Return true if the request should be retried based on the error returned.
fn should_retry(&self, err: &ClientError) -> bool {
if Self::IDEMPOTENCY_LEVEL == IdempotencyLevel::IdempotencyUnknown {
return if let ClientError::Service(status) = err {
status.metadata().contains_key(NO_FRAMES_TAG)
} else {
false
};
return false;
};

// The request is definitely idempotent.
33 changes: 18 additions & 15 deletions src/service/stream.rs
@@ -8,6 +8,7 @@ use super::{
StreamingRequest, StreamingResponse,
};

use crate::client::AppendRetryPolicy;
use crate::{
api::{self, stream_service_client::StreamServiceClient},
types,
@@ -174,6 +175,7 @@
#[derive(Debug, Clone)]
pub struct AppendServiceRequest {
client: StreamServiceClient<RequestFrameMonitor>,
append_retry_policy: AppendRetryPolicy,
frame_signal: FrameSignal,
stream: String,
req: types::AppendInput,
@@ -182,12 +184,14 @@ pub struct AppendServiceRequest {
impl AppendServiceRequest {
pub fn new(
client: StreamServiceClient<RequestFrameMonitor>,
append_retry_policy: AppendRetryPolicy,
frame_signal: FrameSignal,
stream: impl Into<String>,
req: types::AppendInput,
) -> Self {
Self {
client,
append_retry_policy,
frame_signal,
stream: stream.into(),
req,
@@ -222,8 +226,20 @@ impl ServiceRequest for AppendServiceRequest {
resp.into_inner().try_into().map_err(Into::into)
}

fn should_retry(&self, _err: &ClientError) -> bool {
!self.frame_signal.is_signalled()
fn should_retry(&self, err: &ClientError) -> bool {
if let ClientError::Service(status) = err {
let retryable_error = matches!(
status.code(),
tonic::Code::Unavailable | tonic::Code::DeadlineExceeded | tonic::Code::Unknown
);
let policy_compliant = match self.append_retry_policy {
AppendRetryPolicy::All => true,
AppendRetryPolicy::NoSideEffects => !self.frame_signal.is_signalled(),
};
retryable_error && policy_compliant
} else {
false
}
}
}
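The new `should_retry` gates on both the error class and the configured policy. Below is an illustrative decision table with a plain `bool` standing in for `FrameSignal::is_signalled()`; the standalone function and its name are assumptions, while the logic mirrors the hunk above.

```rust
use s2::client::AppendRetryPolicy;
use tonic::Code;

fn should_retry_append(code: Code, frames_sent: bool, policy: &AppendRetryPolicy) -> bool {
    // Only transient transport-level failures are candidates for retry.
    let retryable_error = matches!(
        code,
        Code::Unavailable | Code::DeadlineExceeded | Code::Unknown
    );
    // `All` retries regardless of side effects; `NoSideEffects` refuses once
    // any request frame may have reached the server.
    let policy_compliant = match policy {
        AppendRetryPolicy::All => true,
        AppendRetryPolicy::NoSideEffects => !frames_sent,
    };
    retryable_error && policy_compliant
}

// e.g. once a frame has been emitted, NoSideEffects must not retry:
// assert!(!should_retry_append(Code::Unavailable, true, &AppendRetryPolicy::NoSideEffects));
```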

@@ -236,19 +252,6 @@
req: Option<S>,
}

impl<S> Clone for AppendSessionServiceRequest<S>
where
S: Send + futures::Stream<Item = types::AppendInput> + Unpin + Clone,
{
fn clone(&self) -> Self {
Self {
client: self.client.clone(),
stream: self.stream.clone(),
req: self.req.clone(),
}
}
}

impl<S> AppendSessionServiceRequest<S>
where
S: Send + futures::Stream<Item = types::AppendInput> + Unpin,
