Skip to content

Commit

Permalink
loop
Browse files Browse the repository at this point in the history
  • Loading branch information
sgbalogh committed Nov 25, 2024
1 parent 1c9e5a6 commit 091fc3f
Showing 1 changed file with 67 additions and 69 deletions.
136 changes: 67 additions & 69 deletions src/append_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,79 +304,77 @@ pub(crate) async fn manage_session<S>(

let frame_signal = FrameSignal::new();
let mut attempts = 1;
let mut resp = session_inner(
state.clone(),
frame_signal.clone(),
stream_client.clone(),
output_tx.clone(),
)
.await;

let mut last_acked_seqnum: Option<RangeTo<u64>> = None;
while let Err(e) = resp {
let new_last_acked_seqnum = state.lock().await.last_acked_seqnum;
if last_acked_seqnum != new_last_acked_seqnum {
// Progress has been made during the last attempt, so reset the retry counter.
last_acked_seqnum = new_last_acked_seqnum;
attempts = 1;
}

let now = Instant::now();
let remaining_attempts = attempts < stream_client.inner.config.max_attempts;
let enough_time = {
state
.lock()
.await
.inflight
.front()
.map(|state| {
let next_deadline = state.start + stream_client.inner.config.request_timeout;
now + stream_client.inner.config.retry_backoff_duration < next_deadline
})
.unwrap_or(true)
};
let retryable_error = {
match &e {
ClientError::Service(status) => {
matches!(
status.code(),
tonic::Code::Unavailable
| tonic::Code::DeadlineExceeded
| tonic::Code::Unknown
)
loop {
match session_inner(
state.clone(),
frame_signal.clone(),
stream_client.clone(),
output_tx.clone(),
)
.await
{
Ok(()) => return,
Err(e) => {
let new_last_acked_seqnum = state.lock().await.last_acked_seqnum;
if last_acked_seqnum != new_last_acked_seqnum {
// Progress has been made during the last attempt, so reset the retry counter.
last_acked_seqnum = new_last_acked_seqnum;
attempts = 1;
}
ClientError::Conversion(_) => false,
}
};
let policy_compliant = {
match stream_client.inner.config.append_retry_policy {
AppendRetryPolicy::All => true,
AppendRetryPolicy::NoSideEffects => {
// If no request frame has been produced, we conclude that the failing append
// never left this host, so it is safe to retry.
!frame_signal.is_signalled()

let now = Instant::now();
let remaining_attempts = attempts < stream_client.inner.config.max_attempts;
let enough_time = {
state
.lock()
.await
.inflight
.front()
.map(|state| {
let next_deadline =
state.start + stream_client.inner.config.request_timeout;
now + stream_client.inner.config.retry_backoff_duration < next_deadline
})
.unwrap_or(true)
};
let retryable_error = {
match &e {
ClientError::Service(status) => {
matches!(
status.code(),
tonic::Code::Unavailable
| tonic::Code::DeadlineExceeded
| tonic::Code::Unknown
)
}
ClientError::Conversion(_) => false,
}
};
let policy_compliant = {
match stream_client.inner.config.append_retry_policy {
AppendRetryPolicy::All => true,
AppendRetryPolicy::NoSideEffects => {
// If no request frame has been produced, we conclude that the failing append
// never left this host, so it is safe to retry.
!frame_signal.is_signalled()
}
}
};

if remaining_attempts && enough_time && retryable_error && policy_compliant {
tokio::time::sleep(stream_client.inner.config.retry_backoff_duration).await;
attempts += 1;
debug!(attempts, ?e, "retrying");
} else {
debug!(
remaining_attempts,
enough_time, retryable_error, policy_compliant, "not retrying"
);
_ = output_tx.send(Err(e)).await;
return;
}
}
};

if remaining_attempts && enough_time && retryable_error && policy_compliant {
tokio::time::sleep(stream_client.inner.config.retry_backoff_duration).await;
attempts += 1;
debug!(attempts, ?e, "retrying");
resp = session_inner(
state.clone(),
frame_signal.clone(),
stream_client.clone(),
output_tx.clone(),
)
.await;
} else {
debug!(
remaining_attempts,
enough_time, retryable_error, policy_compliant, "not retrying"
);
_ = output_tx.send(Err(e)).await;
return;
}
}
}

0 comments on commit 091fc3f

Please sign in to comment.