gazette: refactor retry handling, introduce backoffs, and fix routing

Previously, gazette routing was broken in some subtle ways:

* The process ID of a prior response was sent in the next request,
  causing "ProcessID doesn't match our own" errors in production.

* Endpoints present in response headers were improperly assumed to be
  reachable from the client. That assumption must be conditioned on the
  `do_not_proxy` flag (see the sketch after this list).

* The `append` routine was not leveraging a prior response header to
  route its next attempt.
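
A standalone sketch of that routing decision follows. `Mode` and `route_mode` are illustrative stand-ins, not the real `gazette::router` API; the actual call sites are in the `crates/gazette/src/journal` hunks below.

```rust
/// Illustrative only: mirrors the per-request routing decision made in the
/// append/list hunks below, without depending on the gazette crate itself.
#[derive(Debug, PartialEq)]
enum Mode {
    /// Any dialable broker may proxy the request on our behalf.
    Default,
    /// Dial the journal's primary broker directly; assumes its advertised
    /// endpoint is reachable from this client.
    Primary,
}

fn route_mode(do_not_proxy: bool) -> Mode {
    // Endpoints advertised in response headers are only assumed reachable
    // when the caller has explicitly opted out of proxying.
    if do_not_proxy {
        Mode::Primary
    } else {
        Mode::Default
    }
}

fn main() {
    assert_eq!(route_mode(true), Mode::Primary);
    assert_eq!(route_mode(false), Mode::Default);
}
```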

Retry backoffs (sleeps between attempts) are also built into all client
routines, rather than being a responsibility of the caller. This
standardizes handling and prevents accidental thundering herds of retries
when a caller forgets to add a backoff. Callers no longer need to worry
about it: they only decide whether to re-poll, drop to cancel, log, or
take some other action.
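
A minimal sketch of the resulting caller contract, mirroring the dekaf and flowctl hunks below. It assumes `gazette::RetryResult<T>` is `Result<T, gazette::RetryError>` with the `RetryError { attempt, inner }` fields used throughout this diff; the handler body is illustrative.

```rust
use futures::StreamExt;

// Sketch only: drive a retrying gazette stream. The client already sleeps
// with backoff before yielding each error, so "retry" is simply polling the
// stream again; dropping the stream cancels the operation.
async fn drain<S, T>(mut stream: std::pin::Pin<&mut S>) -> Result<(), gazette::Error>
where
    S: futures::Stream<Item = gazette::RetryResult<T>>,
{
    while let Some(result) = stream.next().await {
        match result {
            Ok(_item) => { /* process the item */ }
            Err(gazette::RetryError { attempt, inner }) if inner.is_transient() && attempt < 5 => {
                // Log and keep polling to retry.
                tracing::warn!(?inner, attempt, "transient error (will retry)");
            }
            // Give up: return the inner error to the caller.
            Err(gazette::RetryError { inner, .. }) => return Err(inner),
        }
    }
    Ok(())
}
```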

We additionally surface how many operation _attempts_ have been made
since the last success. This is important both for logging (the Go
implementation logs attempt counts, and they're useful) and for letting
callers decide when to give up.
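
The shape implied by this diff looks roughly like the following. The real definitions live in the gazette crate outside the hunks shown here, so treat this as inferred rather than authoritative:

```rust
// Inferred from usage in this diff (e.g. `err.with_attempt(attempt)` and the
// `RetryError { attempt, inner }` matches); not the actual definition.
pub struct RetryError {
    /// Attempts since the last success: useful for logging and for callers
    /// deciding when to give up.
    pub attempt: u32,
    pub inner: Error,
}

pub type RetryResult<T> = std::result::Result<T, RetryError>;

impl Error {
    pub fn with_attempt(self, attempt: u32) -> RetryError {
        RetryError {
            attempt,
            inner: self,
        }
    }
}
```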

Update simd_doc::Parser::chunk to recover when a caller provides
discontinuous partial documents. This was a missing re-entrancy case,
hit when retrying a ReadJsonLines stream.
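
The following is a conceptual illustration of that re-entrancy behavior, not the actual `simd_doc::Parser` API (its changes aren't part of the hunks shown here):

```rust
// Conceptual sketch: track where the next chunk is expected to begin, and
// drop buffered partial input when the caller restarts at a different offset.
struct Chunker {
    partial: Vec<u8>,
    next_offset: i64,
}

impl Chunker {
    /// Accept `data` beginning at journal `offset`, tolerating discontinuity.
    fn chunk(&mut self, data: &[u8], offset: i64) {
        if offset != self.next_offset {
            // Discontinuous input (e.g. a retried read that restarted at a new
            // offset): discard any buffered partial document and re-synchronize,
            // rather than splicing unrelated bytes together.
            self.partial.clear();
        }
        self.partial.extend_from_slice(data);
        self.next_offset = offset + data.len() as i64;
        // ... parse complete documents out of `self.partial` here ...
    }
}
```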

Reserve a simd_doc::Parser::transcode_many() buffer based on the input
size, to prevent frequent re-allocations in the common usage of
ReadJsonLines.
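
As a rough illustration of the reservation (the real `transcode_many()` isn't shown in these hunks):

```rust
// Illustrative only: size the output buffer from the input up front, rather
// than letting it grow through repeated reallocations while transcoding.
fn transcode(input: &[u8]) -> Vec<u8> {
    // Reserve once, based on the input size, instead of growing incrementally.
    let mut out = Vec::with_capacity(input.len());
    out.extend_from_slice(input); // placeholder for the actual transcoding step
    out
}
```
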
jgraettinger committed Jan 24, 2025
1 parent 6622e0e commit eb4884e
Showing 14 changed files with 321 additions and 161 deletions.
39 changes: 16 additions & 23 deletions crates/dekaf/src/read.rs
@@ -128,9 +128,6 @@ impl Read {
let mut tmp = Vec::new();
let mut buf = bytes::BytesMut::new();

let mut has_had_parsing_error = false;
let mut transient_errors = 0;

let timeout = tokio::time::sleep_until(timeout.into());
let timeout = futures::future::maybe_done(timeout);
tokio::pin!(timeout);
@@ -167,30 +164,26 @@
continue;
}
},
Err(err) if err.is_transient() && transient_errors < 5 => {
use rand::Rng;

transient_errors = transient_errors + 1;

tracing::warn!(error = ?err, "Retrying transient read error");
let delay = std::time::Duration::from_millis(
rand::thread_rng().gen_range(300..2000),
);
tokio::time::sleep(delay).await;
Err(gazette::RetryError { attempt, inner })
if inner.is_transient() && attempt < 5 =>
{
tracing::warn!(error = ?inner, "Retrying transient read error");
// We can retry transient errors just by continuing to poll the stream
continue;
}
Err(err @ gazette::Error::Parsing { .. }) if !has_had_parsing_error => {
tracing::debug!(%err, "Ignoring first parse error to skip past partial document");
has_had_parsing_error = true;

continue;
}
Err(err @ gazette::Error::Parsing { .. }) => {
tracing::warn!(%err, "Got a second parse error, something is wrong");
Err(err)
Err(gazette::RetryError {
attempt,
inner: err @ gazette::Error::Parsing { .. },
}) => {
if attempt == 0 {
tracing::debug!(%err, "Ignoring first parse error to skip past partial document");
continue;
} else {
tracing::warn!(%err, "Got a second parse error, something is wrong");
Err(err)
}
}
Err(e) => Err(e),
Err(gazette::RetryError { inner, .. }) => Err(inner),
}?,
};

49 changes: 27 additions & 22 deletions crates/flowctl/src/collection/read/mod.rs
@@ -3,7 +3,6 @@ use anyhow::Context;
use futures::StreamExt;
use gazette::journal::ReadJsonLine;
use proto_gazette::broker;
use rand::Rng;
use std::io::Write;
use time::OffsetDateTime;

@@ -116,6 +115,7 @@ pub async fn read_collection_journal(
offset: 0,
block: bounds.follow,
begin_mod_time,
// TODO(johnny): Set `do_not_proxy: true` once cronut is migrated.
..Default::default()
},
1,
@@ -127,7 +127,13 @@

while let Some(line) = lines.next().await {
match line {
Ok(ReadJsonLine::Meta(_)) => (),
Ok(ReadJsonLine::Meta(broker::ReadResponse {
fragment,
write_head,
..
})) => {
tracing::debug!(?fragment, %write_head, "journal metadata");
}
Ok(ReadJsonLine::Doc {
root,
next_offset: _,
@@ -136,26 +142,25 @@
v.push(b'\n');
() = stdout.write_all(&v)?;
}
Err(gazette::Error::BrokerStatus(broker::Status::Suspended)) if bounds.follow => {
// The journal is fully suspended, so we use a pretty long delay
// here because it's unlikely to be resumed quickly and also
// unlikely that anyone will care about the little bit of extra
// latency in this case.
let delay_secs = rand::thread_rng().gen_range(30..=60);
tracing::info!(delay_secs, "journal suspended, will retry");
tokio::time::sleep(std::time::Duration::from_secs(delay_secs)).await;
}
Err(gazette::Error::BrokerStatus(
status @ broker::Status::OffsetNotYetAvailable | status @ broker::Status::Suspended,
)) => {
tracing::debug!(?status, "stopping read at end of journal content");
break; // Graceful EOF of non-blocking read.
}
Err(err) if err.is_transient() => {
tracing::warn!(?err, "error reading collection (will retry)");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
Err(err) => anyhow::bail!(err),
Err(gazette::RetryError {
inner: err,
attempt,
}) => match err {
err if err.is_transient() => {
tracing::warn!(?err, %attempt, "error reading collection (will retry)");
}
gazette::Error::BrokerStatus(broker::Status::Suspended) if bounds.follow => {
tracing::debug!(?err, %attempt, "journal is suspended (will retry)");
}
gazette::Error::BrokerStatus(
status @ broker::Status::OffsetNotYetAvailable
| status @ broker::Status::Suspended,
) => {
tracing::debug!(?status, "stopping read at end of journal content");
break; // Graceful EOF of non-blocking read.
}
err => anyhow::bail!(err),
},
}
}

11 changes: 7 additions & 4 deletions crates/flowctl/src/preview/journal_reader.rs
@@ -196,6 +196,7 @@ impl Reader {
offset,
block: true,
begin_mod_time,
// TODO(johnny): Set `do_not_proxy: true` once cronut is migrated.
..Default::default()
},
1,
@@ -211,12 +212,14 @@ impl Reader {
offset = meta.offset;
continue;
}
Err(err) if err.is_transient() => {
tracing::warn!(%err, %journal, %binding, "transient error reading journal (will retry)");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
Err(gazette::RetryError {
attempt,
inner: err,
}) if err.is_transient() => {
tracing::warn!(?err, %attempt, %journal, %binding, "transient error reading journal (will retry)");
continue;
}
Err(err) => return Err(err),
Err(gazette::RetryError { inner, .. }) => return Err(inner),
};

// TODO(johnny): plumb through OwnedArchivedNode end-to-end.
90 changes: 55 additions & 35 deletions crates/gazette/src/journal/append.rs
@@ -1,8 +1,7 @@
use super::Client;
use crate::{journal::check_ok, Error};
use crate::{router, Error};
use futures::{FutureExt, Stream, StreamExt};
use proto_gazette::broker::{self, AppendResponse};
use std::sync::Arc;

impl Client {
/// Append the contents of a byte stream to the specified journal.
@@ -13,59 +12,81 @@ impl Client {
/// retries the request from the beginning.
pub fn append<'a, S>(
&'a self,
request: broker::AppendRequest,
mut req: broker::AppendRequest,
source: impl Fn() -> S + Send + Sync + 'a,
) -> impl Stream<Item = crate::Result<broker::AppendResponse>> + '_
) -> impl Stream<Item = crate::RetryResult<broker::AppendResponse>> + '_
where
S: Stream<Item = std::io::Result<bytes::Bytes>> + Send + 'static,
{
coroutines::coroutine(move |mut co| async move {
loop {
let input_stream = source();
let mut attempt = 0;

match self.try_append(request.clone(), input_stream).await {
loop {
let err = match self.try_append(&mut req, source()).await {
Ok(resp) => {
() = co.yield_(Ok(resp)).await;
return;
}
Err(err) => {
() = co.yield_(Err(err)).await;
// Polling after an error indicates the caller would like to retry,
// so continue the loop to re-generate the input stream and try again.
}
Err(err) => err,
};

if matches!(err, Error::BrokerStatus(broker::Status::NotJournalPrimaryBroker) if req.do_not_proxy)
{
// This is an expected error which drives dynamic route discovery.
// Route topology in `req.header` has been updated, and we restart the request.
continue;
}

// Surface error to the caller, who can either drop to cancel or poll to retry.
() = co.yield_(Err(err.with_attempt(attempt))).await;
() = tokio::time::sleep(crate::backoff(attempt)).await;
attempt += 1;

// Restart route discovery.
req.header = None;
}
})
}

async fn try_append<S>(
&self,
request: broker::AppendRequest,
req: &mut broker::AppendRequest,
source: S,
) -> crate::Result<AppendResponse>
where
S: Stream<Item = std::io::Result<bytes::Bytes>> + Send + 'static,
{
let (input_err_tx, input_err_rx) = tokio::sync::oneshot::channel();
let mut client = self.into_sub(self.router.route(
req.header.as_mut(),
if req.do_not_proxy {
router::Mode::Primary
} else {
router::Mode::Default
},
&self.default,
)?);
let req_clone = req.clone();

let (source_err_tx, source_err_rx) = tokio::sync::oneshot::channel();

// `JournalClient::append()` wants a stream of `AppendRequest`s, so let's compose one starting with
// the initial metadata request containing the journal name and any other request metadata, then
// "data" requests that contain chunks of data to write, then the final EOF indicating completion.
let request_stream = futures::stream::once(async move { Ok(request) })
let source = futures::stream::once(async move { Ok(req_clone) })
.chain(source.filter_map(|input| {
futures::future::ready(match input {
// It's technically possible to get an empty set of bytes when reading
// from the input stream. Filter these out as otherwise they would look
// like EOFs to the append RPC and cause confusion.
Ok(input_bytes) if input_bytes.len() == 0 => None,
Ok(input_bytes) => Some(Ok(broker::AppendRequest {
content: input_bytes.to_vec(),
Ok(content) if content.len() == 0 => None,
Ok(content) => Some(Ok(broker::AppendRequest {
content,
..Default::default()
})),
Err(err) => Some(Err(err)),
})
}))
// Final empty chunk / EOF to signal we're done
// Final empty chunk signals the broker to commit (rather than rollback).
.chain(futures::stream::once(async {
Ok(broker::AppendRequest {
..Default::default()
@@ -74,11 +95,11 @@ impl Client {
// Since it's possible to error when reading input data, we handle an error by stopping
// the stream and storing the error. Later, we first check if we have hit an input error
// and if so we bubble it up, otherwise proceeding with handling the output of the RPC
.scan(Some(input_err_tx), |input_err_tx, input_res| {
futures::future::ready(match input_res {
Ok(input) => Some(input),
.scan(Some(source_err_tx), |err_tx, result| {
futures::future::ready(match result {
Ok(request) => Some(request),
Err(err) => {
input_err_tx
err_tx
.take()
.expect("we should reach this point at most once")
.send(err)
@@ -87,21 +108,20 @@
}
})
});
let result = client.append(source).await;

let mut client = self.into_sub(self.router.route(None, false, &self.default)?);

let resp = client.append(request_stream).await;
// An error reading `source` has precedence,
// as it's likely causal if the broker *also* errored.
if let Ok(err) = source_err_rx.now_or_never().expect("tx has been dropped") {
return Err(Error::AppendRead(err));
}
let mut resp = result?.into_inner();

if let Some(Ok(input_err)) = input_err_rx.now_or_never() {
return Err(Error::AppendRead(input_err));
if resp.status() == broker::Status::Ok {
Ok(resp)
} else {
match resp {
Ok(resp) => {
let resp = resp.into_inner();
check_ok(resp.status(), resp)
}
Err(err) => Err(err.into()),
}
req.header = resp.header.take();
Err(Error::BrokerStatus(resp.status()))
}
}
}
52 changes: 31 additions & 21 deletions crates/gazette/src/journal/list.rs
@@ -1,5 +1,5 @@
use super::{check_ok, Client};
use crate::Error;
use crate::{router, Error};
use futures::TryStreamExt;
use proto_gazette::broker;

@@ -16,32 +16,42 @@ impl Client {
pub fn list_watch(
self,
mut req: broker::ListRequest,
) -> impl futures::Stream<Item = crate::Result<broker::ListResponse>> + 'static {
) -> impl futures::Stream<Item = crate::RetryResult<broker::ListResponse>> + 'static {
assert!(req.watch, "list_watch() requires ListRequest.watch is set");

coroutines::coroutine(move |mut co| async move {
let mut attempt = 0;
let mut maybe_stream = None;

loop {
let mut stream = match self.start_list(&self.router, &req).await {
Ok(stream) => stream,
Err(err) => {
() = co.yield_(Err(err)).await;
continue;
}
let err = match maybe_stream.take() {
Some(mut stream) => match recv_snapshot(&mut req, &mut stream).await {
Ok(resp) => {
() = co.yield_(Ok(resp)).await;
attempt = 0;
maybe_stream = Some(stream);
continue;
}
Err(err) => err,
},
None => match self.start_list(&self.router, &req).await {
Ok(stream) => {
maybe_stream = Some(stream);
continue;
}
Err(err) => err,
},
};

loop {
match recv_snapshot(&mut req, &mut stream).await {
Ok(resp) => co.yield_(Ok(resp)).await,
Err(err) => {
if matches!(err, Error::UnexpectedEof if req.watch_resume.is_some()) {
// Server stopped an ongoing watch. Expected and not an error.
} else {
co.yield_(Err(err)).await;
}
break; // Start new stream on next poll.
}
}
if matches!(err, Error::UnexpectedEof if req.watch_resume.is_some()) {
// Server stopped an ongoing watch. Expected and not an error.
continue;
}

// Surface error to the caller, who can either drop to cancel or poll to retry.
() = co.yield_(Err(err.with_attempt(attempt))).await;
() = tokio::time::sleep(crate::backoff(attempt)).await;
attempt += 1;
}
})
}
Expand All @@ -51,7 +61,7 @@ impl Client {
router: &crate::Router,
req: &broker::ListRequest,
) -> crate::Result<tonic::Streaming<broker::ListResponse>> {
let mut client = self.into_sub(router.route(None, false, &self.default)?);
let mut client = self.into_sub(router.route(None, router::Mode::Default, &self.default)?);
Ok(client.list(req.clone()).await?.into_inner())
}
}
(Remaining changed files not shown.)