Skip to content

Commit

Permalink
reset the retry counter
Browse files Browse the repository at this point in the history
  • Loading branch information
sgbalogh committed Nov 24, 2024
1 parent ad22143 commit 9647bd6
Showing 1 changed file with 19 additions and 3 deletions.
22 changes: 19 additions & 3 deletions src/append_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ where
inflight: VecDeque<InflightBatch>,
inflight_size: u64,
request_stream: S,
last_acked_seqnum: Option<u64>,
}

async fn recover(
Expand All @@ -73,6 +74,7 @@ async fn recover(
inflight_size: &mut u64,
channel_input_tx: mpsc::Sender<types::AppendInput>,
channel_ack_stream: &mut ServiceStreamingResponse<AppendSessionStreamingResponse>,
last_acked_seqnum: &mut Option<u64>,
output_tx: mpsc::Sender<Result<types::AppendOutput, ClientError>>,
) -> Result<(), ClientError> {
trace!(
Expand Down Expand Up @@ -116,6 +118,7 @@ async fn recover(
recovery_batch.inner.records.len(),
"number of acknowledged records should equal amount in first recovery batch"
);
*last_acked_seqnum = Some(ack.end_seq_num);
output_tx
.send(Ok(ack))
.await
Expand Down Expand Up @@ -150,9 +153,10 @@ where
{
let mut lock = state.lock().await;
let AppendState {
ref mut inflight,
ref mut inflight_size,
ref mut request_stream,
inflight,
inflight_size,
request_stream,
last_acked_seqnum,
} = lock.deref_mut();

assert!(inflight.len() <= stream_client.inner.config.max_append_batches_inflight);
Expand All @@ -166,6 +170,7 @@ where
inflight_size,
input_tx.clone(),
&mut ack_stream,
last_acked_seqnum,
output_tx.clone(),
)
.await?;
Expand Down Expand Up @@ -223,6 +228,7 @@ where
corresponding_batch.inner.records.len(),
"number of acknowledged records should equal amount in first inflight batch"
);
*last_acked_seqnum = Some(ack.end_seq_num);
output_tx.send(Ok(ack)).await.map_err(|_| ClientError::SdkClientDisconnected)?;

if inflight.is_empty() {
Expand Down Expand Up @@ -264,6 +270,7 @@ pub(crate) async fn manage_session<S>(
inflight: Default::default(),
inflight_size: Default::default(),
request_stream: input,
last_acked_seqnum: None,
}));

let frame_signal = FrameSignal::new();
Expand All @@ -276,7 +283,15 @@ pub(crate) async fn manage_session<S>(
)
.await;

let mut last_acked_seqnum: Option<u64> = None;
while let Err(e) = resp {
let new_last_acked_seqnum = state.lock().await.last_acked_seqnum;
if last_acked_seqnum != new_last_acked_seqnum {
// Progress has been made during the last attempt, so reset the retry counter.
last_acked_seqnum = new_last_acked_seqnum;
attempts = 1;
}

let now = Instant::now();
let remaining_attempts = attempts < stream_client.inner.config.max_attempts;
let enough_time = {
Expand Down Expand Up @@ -320,6 +335,7 @@ pub(crate) async fn manage_session<S>(
if remaining_attempts && enough_time && retryable_error && policy_compliant {
tokio::time::sleep(stream_client.inner.config.retry_backoff_duration).await;
attempts += 1;
trace!(attempts, ?e, "retrying");
resp = session_inner(
state.clone(),
frame_signal.clone(),
Expand Down

0 comments on commit 9647bd6

Please sign in to comment.