From eb4884e31ab17fd4cf55cd7cc7b85e13bf8e908c Mon Sep 17 00:00:00 2001
From: Johnny Graettinger
Date: Fri, 24 Jan 2025 10:12:25 -0600
Subject: [PATCH] gazette: refactor retry, introduce backoffs, and fix routing

Gazette routing was previously broken in some subtle ways:

* The process ID of a prior response was sent in the next request,
  causing "ProcessID doesn't match our own" errors in production.
* Endpoints present in response headers were improperly assumed to be
  reachable from the client. That assumption must be conditioned on the
  `do_not_proxy` flag.
* The `append` routine was not leveraging a prior response header to
  route its next attempt.

Retry backoffs are also introduced to all client routines, rather than
being left as a responsibility of the caller. This standardizes handling
and prevents accidental thundering herds of retries when a caller forgets
to add a backoff. Callers now only decide whether to re-poll (retry),
drop (cancel), log, or take some other action.

We additionally surface how many operation _attempts_ there have been
since the last success. This is important both for logging (attempt
counts have proven useful in the Go implementation) and for clients
deciding when to give up.

Update simd_doc::Parser::chunk to recover if a caller provides
discontinuous partial documents. This was a missing re-entrancy case when
retrying a ReadJsonLines stream.

Reserve a simd_doc::Parser::transcode_many() buffer based on the input
size, to prevent frequent re-allocations in the common usage of
ReadJsonLines.
---
 crates/dekaf/src/read.rs                      | 39 ++++----
 crates/flowctl/src/collection/read/mod.rs     | 49 +++++-----
 crates/flowctl/src/preview/journal_reader.rs  | 11 ++-
 crates/gazette/src/journal/append.rs          | 90 +++++++++++--------
 crates/gazette/src/journal/list.rs            | 52 ++++++-----
 crates/gazette/src/journal/mod.rs             | 13 ++-
 crates/gazette/src/journal/read.rs            | 60 ++++++++-----
 crates/gazette/src/journal/read_json_lines.rs | 24 +++--
 crates/gazette/src/lib.rs                     | 32 +++++++
 crates/gazette/src/router.rs                  | 55 ++++++++++--
 crates/gazette/src/shard/mod.rs               | 19 +++-
 crates/proto-gazette/build.rs                 |  2 +-
 crates/proto-gazette/src/protocol.rs          |  4 +-
 crates/simd-doc/src/lib.rs                    | 32 ++++---
 14 files changed, 321 insertions(+), 161 deletions(-)

diff --git a/crates/dekaf/src/read.rs b/crates/dekaf/src/read.rs
index 1363b10f64..539f2b1819 100644
--- a/crates/dekaf/src/read.rs
+++ b/crates/dekaf/src/read.rs
@@ -128,9 +128,6 @@ impl Read {
         let mut tmp = Vec::new();
         let mut buf = bytes::BytesMut::new();
 
-        let mut has_had_parsing_error = false;
-        let mut transient_errors = 0;
-
         let timeout = tokio::time::sleep_until(timeout.into());
         let timeout = futures::future::maybe_done(timeout);
         tokio::pin!(timeout);
@@ -167,30 +164,26 @@ impl Read {
                         continue;
                     }
                 },
-                Err(err) if err.is_transient() && transient_errors < 5 => {
-                    use rand::Rng;
-
-                    transient_errors = transient_errors + 1;
-
-                    tracing::warn!(error = ?err, "Retrying transient read error");
-                    let delay = std::time::Duration::from_millis(
-                        rand::thread_rng().gen_range(300..2000),
-                    );
-                    tokio::time::sleep(delay).await;
+                Err(gazette::RetryError { attempt, inner })
+                    if inner.is_transient() && attempt < 5 =>
+                {
+                    tracing::warn!(error = ?inner, "Retrying transient read error");
                     // We can retry transient errors just by continuing to poll the stream
                     continue;
                 }
-                Err(err @ gazette::Error::Parsing { ..
}) if !has_had_parsing_error => { - tracing::debug!(%err, "Ignoring first parse error to skip past partial document"); - has_had_parsing_error = true; - - continue; - } - Err(err @ gazette::Error::Parsing { .. }) => { - tracing::warn!(%err, "Got a second parse error, something is wrong"); - Err(err) + Err(gazette::RetryError { + attempt, + inner: err @ gazette::Error::Parsing { .. }, + }) => { + if attempt == 0 { + tracing::debug!(%err, "Ignoring first parse error to skip past partial document"); + continue; + } else { + tracing::warn!(%err, "Got a second parse error, something is wrong"); + Err(err) + } } - Err(e) => Err(e), + Err(gazette::RetryError { inner, .. }) => Err(inner), }?, }; diff --git a/crates/flowctl/src/collection/read/mod.rs b/crates/flowctl/src/collection/read/mod.rs index 75eefac41f..cbf19335df 100644 --- a/crates/flowctl/src/collection/read/mod.rs +++ b/crates/flowctl/src/collection/read/mod.rs @@ -3,7 +3,6 @@ use anyhow::Context; use futures::StreamExt; use gazette::journal::ReadJsonLine; use proto_gazette::broker; -use rand::Rng; use std::io::Write; use time::OffsetDateTime; @@ -116,6 +115,7 @@ pub async fn read_collection_journal( offset: 0, block: bounds.follow, begin_mod_time, + // TODO(johnny): Set `do_not_proxy: true` once cronut is migrated. ..Default::default() }, 1, @@ -127,7 +127,13 @@ pub async fn read_collection_journal( while let Some(line) = lines.next().await { match line { - Ok(ReadJsonLine::Meta(_)) => (), + Ok(ReadJsonLine::Meta(broker::ReadResponse { + fragment, + write_head, + .. + })) => { + tracing::debug!(?fragment, %write_head, "journal metadata"); + } Ok(ReadJsonLine::Doc { root, next_offset: _, @@ -136,26 +142,25 @@ pub async fn read_collection_journal( v.push(b'\n'); () = stdout.write_all(&v)?; } - Err(gazette::Error::BrokerStatus(broker::Status::Suspended)) if bounds.follow => { - // The journal is fully suspended, so we use a pretty long delay - // here because it's unlikely to be resumed quickly and also - // unlikely that anyone will care about the little bit of extra - // latency in this case. - let delay_secs = rand::thread_rng().gen_range(30..=60); - tracing::info!(delay_secs, "journal suspended, will retry"); - tokio::time::sleep(std::time::Duration::from_secs(delay_secs)).await; - } - Err(gazette::Error::BrokerStatus( - status @ broker::Status::OffsetNotYetAvailable | status @ broker::Status::Suspended, - )) => { - tracing::debug!(?status, "stopping read at end of journal content"); - break; // Graceful EOF of non-blocking read. - } - Err(err) if err.is_transient() => { - tracing::warn!(?err, "error reading collection (will retry)"); - tokio::time::sleep(std::time::Duration::from_secs(5)).await; - } - Err(err) => anyhow::bail!(err), + Err(gazette::RetryError { + inner: err, + attempt, + }) => match err { + err if err.is_transient() => { + tracing::warn!(?err, %attempt, "error reading collection (will retry)"); + } + gazette::Error::BrokerStatus(broker::Status::Suspended) if bounds.follow => { + tracing::debug!(?err, %attempt, "journal is suspended (will retry)"); + } + gazette::Error::BrokerStatus( + status @ broker::Status::OffsetNotYetAvailable + | status @ broker::Status::Suspended, + ) => { + tracing::debug!(?status, "stopping read at end of journal content"); + break; // Graceful EOF of non-blocking read. 
+                }
+                err => anyhow::bail!(err),
+            },
         }
     }
 
diff --git a/crates/flowctl/src/preview/journal_reader.rs b/crates/flowctl/src/preview/journal_reader.rs
index 11fbd0190c..b377e74d77 100644
--- a/crates/flowctl/src/preview/journal_reader.rs
+++ b/crates/flowctl/src/preview/journal_reader.rs
@@ -196,6 +196,7 @@ impl Reader {
                 offset,
                 block: true,
                 begin_mod_time,
+                // TODO(johnny): Set `do_not_proxy: true` once cronut is migrated.
                 ..Default::default()
             },
             1,
@@ -211,12 +212,14 @@ impl Reader {
                     offset = meta.offset;
                     continue;
                 }
-                Err(err) if err.is_transient() => {
-                    tracing::warn!(%err, %journal, %binding, "transient error reading journal (will retry)");
-                    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
+                Err(gazette::RetryError {
+                    attempt,
+                    inner: err,
+                }) if err.is_transient() => {
+                    tracing::warn!(?err, %attempt, %journal, %binding, "transient error reading journal (will retry)");
                     continue;
                 }
-                Err(err) => return Err(err),
+                Err(gazette::RetryError { inner, .. }) => return Err(inner),
             };
 
             // TODO(johnny): plumb through OwnedArchivedNode end-to-end.
diff --git a/crates/gazette/src/journal/append.rs b/crates/gazette/src/journal/append.rs
index cf41b70158..122b34f6cf 100644
--- a/crates/gazette/src/journal/append.rs
+++ b/crates/gazette/src/journal/append.rs
@@ -1,8 +1,7 @@
 use super::Client;
-use crate::{journal::check_ok, Error};
+use crate::{router, Error};
 use futures::{FutureExt, Stream, StreamExt};
 use proto_gazette::broker::{self, AppendResponse};
-use std::sync::Arc;
 
 impl Client {
     /// Append the contents of a byte stream to the specified journal.
@@ -13,59 +12,81 @@ impl Client {
     /// retries the request from the beginning.
     pub fn append<'a, S>(
         &'a self,
-        request: broker::AppendRequest,
+        mut req: broker::AppendRequest,
         source: impl Fn() -> S + Send + Sync + 'a,
-    ) -> impl Stream<Item = crate::Result<AppendResponse>> + '_
+    ) -> impl Stream<Item = crate::RetryResult<AppendResponse>> + '_
     where
         S: Stream<Item = std::io::Result<bytes::Bytes>> + Send + 'static,
     {
         coroutines::coroutine(move |mut co| async move {
-            loop {
-                let input_stream = source();
+            let mut attempt = 0;
 
-                match self.try_append(request.clone(), input_stream).await {
+            loop {
+                let err = match self.try_append(&mut req, source()).await {
                     Ok(resp) => {
                         () = co.yield_(Ok(resp)).await;
                         return;
                     }
-                    Err(err) => {
-                        () = co.yield_(Err(err)).await;
-                        // Polling after an error indicates the caller would like to retry,
-                        // so continue the loop to re-generate the input stream and try again.
-                    }
+                    Err(err) => err,
+                };
+
+                if matches!(err, Error::BrokerStatus(broker::Status::NotJournalPrimaryBroker) if req.do_not_proxy)
+                {
+                    // This is an expected error which drives dynamic route discovery.
+                    // Route topology in `req.header` has been updated, and we restart the request.
+                    continue;
                 }
+
+                // Surface error to the caller, who can either drop to cancel or poll to retry.
+                () = co.yield_(Err(err.with_attempt(attempt))).await;
+                () = tokio::time::sleep(crate::backoff(attempt)).await;
+                attempt += 1;
+
+                // Restart route discovery.
+                req.header = None;
             }
         })
     }
 
     async fn try_append<S>(
         &self,
-        request: broker::AppendRequest,
+        req: &mut broker::AppendRequest,
         source: S,
     ) -> crate::Result<AppendResponse>
     where
         S: Stream<Item = std::io::Result<bytes::Bytes>> + Send + 'static,
     {
-        let (input_err_tx, input_err_rx) = tokio::sync::oneshot::channel();
+        let mut client = self.into_sub(self.router.route(
+            req.header.as_mut(),
+            if req.do_not_proxy {
+                router::Mode::Primary
+            } else {
+                router::Mode::Default
+            },
+            &self.default,
+        )?);
+        let req_clone = req.clone();
+
+        let (source_err_tx, source_err_rx) = tokio::sync::oneshot::channel();
 
         // `JournalClient::append()` wants a stream of `AppendRequest`s, so let's compose one starting with
         // the initial metadata request containing the journal name and any other request metadata, then
         // "data" requests that contain chunks of data to write, then the final EOF indicating completion.
-        let request_stream = futures::stream::once(async move { Ok(request) })
+        let source = futures::stream::once(async move { Ok(req_clone) })
             .chain(source.filter_map(|input| {
                 futures::future::ready(match input {
                     // It's technically possible to get an empty set of bytes when reading
                     // from the input stream. Filter these out as otherwise they would look
                     // like EOFs to the append RPC and cause confusion.
-                    Ok(input_bytes) if input_bytes.len() == 0 => None,
-                    Ok(input_bytes) => Some(Ok(broker::AppendRequest {
-                        content: input_bytes.to_vec(),
+                    Ok(content) if content.len() == 0 => None,
+                    Ok(content) => Some(Ok(broker::AppendRequest {
+                        content,
                         ..Default::default()
                     })),
                     Err(err) => Some(Err(err)),
                 })
             }))
-            // Final empty chunk / EOF to signal we're done
+            // Final empty chunk signals the broker to commit (rather than roll back).
            .chain(futures::stream::once(async {
                 Ok(broker::AppendRequest {
                     ..Default::default()
@@ -74,11 +95,11 @@ impl Client {
             // Since it's possible to error when reading input data, we handle an error by stopping
             // the stream and storing the error. Later, we first check if we have hit an input error
             // and if so we bubble it up, otherwise proceeding with handling the output of the RPC
-            .scan(Some(input_err_tx), |input_err_tx, input_res| {
-                futures::future::ready(match input_res {
-                    Ok(input) => Some(input),
+            .scan(Some(source_err_tx), |err_tx, result| {
+                futures::future::ready(match result {
+                    Ok(request) => Some(request),
                     Err(err) => {
-                        input_err_tx
+                        err_tx
                             .take()
                             .expect("we should reach this point at most once")
                             .send(err)
@@ -87,21 +108,20 @@ impl Client {
                 }
             })
         });
+        let result = client.append(source).await;
 
-        let mut client = self.into_sub(self.router.route(None, false, &self.default)?);
-
-        let resp = client.append(request_stream).await;
+        // An error reading `source` has precedence,
+        // as it's likely causal if the broker *also* errored.
+        if let Ok(err) = source_err_rx.now_or_never().expect("tx has been dropped") {
+            return Err(Error::AppendRead(err));
+        }
+        let mut resp = result?.into_inner();
 
-        if let Some(Ok(input_err)) = input_err_rx.now_or_never() {
-            return Err(Error::AppendRead(input_err));
+        if resp.status() == broker::Status::Ok {
+            Ok(resp)
         } else {
-            match resp {
-                Ok(resp) => {
-                    let resp = resp.into_inner();
-                    check_ok(resp.status(), resp)
-                }
-                Err(err) => Err(err.into()),
-            }
+            req.header = resp.header.take();
+            Err(Error::BrokerStatus(resp.status()))
         }
     }
 }
diff --git a/crates/gazette/src/journal/list.rs b/crates/gazette/src/journal/list.rs
index dbfdc43b18..02aa6bd363 100644
--- a/crates/gazette/src/journal/list.rs
+++ b/crates/gazette/src/journal/list.rs
@@ -1,5 +1,5 @@
 use super::{check_ok, Client};
-use crate::Error;
+use crate::{router, Error};
 use futures::TryStreamExt;
 use proto_gazette::broker;
 
@@ -16,32 +16,42 @@ impl Client {
     pub fn list_watch(
         self,
         mut req: broker::ListRequest,
-    ) -> impl futures::Stream<Item = crate::Result<broker::ListResponse>> + 'static {
+    ) -> impl futures::Stream<Item = crate::RetryResult<broker::ListResponse>> + 'static {
         assert!(req.watch, "list_watch() requires ListRequest.watch is set");
 
         coroutines::coroutine(move |mut co| async move {
+            let mut attempt = 0;
+            let mut maybe_stream = None;
+
             loop {
-                let mut stream = match self.start_list(&self.router, &req).await {
-                    Ok(stream) => stream,
-                    Err(err) => {
-                        () = co.yield_(Err(err)).await;
-                        continue;
-                    }
+                let err = match maybe_stream.take() {
+                    Some(mut stream) => match recv_snapshot(&mut req, &mut stream).await {
+                        Ok(resp) => {
+                            () = co.yield_(Ok(resp)).await;
+                            attempt = 0;
+                            maybe_stream = Some(stream);
+                            continue;
+                        }
+                        Err(err) => err,
+                    },
+                    None => match self.start_list(&self.router, &req).await {
+                        Ok(stream) => {
+                            maybe_stream = Some(stream);
+                            continue;
+                        }
+                        Err(err) => err,
+                    },
                 };
 
-                loop {
-                    match recv_snapshot(&mut req, &mut stream).await {
-                        Ok(resp) => co.yield_(Ok(resp)).await,
-                        Err(err) => {
-                            if matches!(err, Error::UnexpectedEof if req.watch_resume.is_some()) {
-                                // Server stopped an ongoing watch. Expected and not an error.
-                            } else {
-                                co.yield_(Err(err)).await;
-                            }
-                            break; // Start new stream on next poll.
-                        }
-                    }
+                if matches!(err, Error::UnexpectedEof if req.watch_resume.is_some()) {
+                    // Server stopped an ongoing watch. Expected and not an error.
+                    continue;
                 }
+
+                // Surface error to the caller, who can either drop to cancel or poll to retry.
+                () = co.yield_(Err(err.with_attempt(attempt))).await;
+                () = tokio::time::sleep(crate::backoff(attempt)).await;
+                attempt += 1;
             }
         })
     }
@@ -51,7 +61,7 @@ impl Client {
         router: &crate::Router,
         req: &broker::ListRequest,
     ) -> crate::Result<tonic::Streaming<broker::ListResponse>> {
-        let mut client = self.into_sub(router.route(None, false, &self.default)?);
+        let mut client = self.into_sub(router.route(None, router::Mode::Default, &self.default)?);
 
         Ok(client.list(req.clone()).await?.into_inner())
     }
 }
diff --git a/crates/gazette/src/journal/mod.rs b/crates/gazette/src/journal/mod.rs
index cc3ba45b35..727ca96012 100644
--- a/crates/gazette/src/journal/mod.rs
+++ b/crates/gazette/src/journal/mod.rs
@@ -1,3 +1,4 @@
+use crate::router;
 use proto_gazette::broker;
 use tonic::transport::Channel;
 
@@ -51,7 +52,11 @@ impl Client {
     /// Invoke the Gazette journal Apply API.
     pub async fn apply(&self, req: broker::ApplyRequest) -> crate::Result<broker::ApplyResponse> {
-        let mut client = self.into_sub(self.router.route(None, false, &self.default)?);
+        let mut client = self.into_sub(self.router.route(
+            None,
+            router::Mode::Default,
+            &self.default,
+        )?);
 
         let resp = client
             .apply(req)
@@ -67,7 +72,11 @@ impl Client {
         &self,
         req: broker::FragmentsRequest,
     ) -> crate::Result<broker::FragmentsResponse> {
-        let mut client = self.into_sub(self.router.route(None, false, &self.default)?);
+        let mut client = self.into_sub(self.router.route(
+            None,
+            router::Mode::Default,
+            &self.default,
+        )?);
 
         let resp = client
             .list_fragments(req)
diff --git a/crates/gazette/src/journal/read.rs b/crates/gazette/src/journal/read.rs
index 8c9d4e2bf5..d7a128eb13 100644
--- a/crates/gazette/src/journal/read.rs
+++ b/crates/gazette/src/journal/read.rs
@@ -1,5 +1,5 @@
 use super::Client;
-use crate::Error;
+use crate::{router, Error};
 use futures::TryStreamExt;
 use proto_gazette::broker;
 
@@ -10,9 +10,10 @@ impl Client {
     pub fn read(
         self,
         mut req: broker::ReadRequest,
-    ) -> impl futures::Stream<Item = crate::Result<broker::ReadResponse>> + 'static {
+    ) -> impl futures::Stream<Item = crate::RetryResult<broker::ReadResponse>> + 'static {
         coroutines::coroutine(move |mut co| async move {
             let mut write_head = i64::MAX;
+            let mut attempt = 0;
 
             loop {
                 // Have we read through requested `end_offset`?
@@ -24,40 +25,53 @@ impl Client {
                     return;
                 }
 
-                match self.read_some(&mut co, &mut req, &mut write_head).await {
-                    Ok(()) => (),
-                    Err(Error::BrokerStatus(broker::Status::NotJournalBroker))
-                        if req.do_not_proxy =>
-                    {
-                        // Expected error which drives dynamic route discovery.
-                        // `req.header` has updated route topology and we restart the request.
-                    }
-                    Err(err) => {
-                        // Surface error to the caller, which can either drop us
-                        // or poll us again to retry.
-                        () = co.yield_(Err(err)).await;
-                        // Restart route discovery.
-                        req.header = None;
+                let err = match self.read_some(&mut co, &mut req, &mut write_head).await {
+                    Ok(()) => {
+                        attempt = 0;
+                        continue;
                     }
+                    Err(err) => err,
+                };
+
+                if matches!(err, Error::BrokerStatus(broker::Status::NotJournalBroker) if req.do_not_proxy)
+                {
+                    // This is an expected error which drives dynamic route discovery.
+                    // `req.header` has updated route topology and we restart the request.
+                    continue;
                 }
+
+                // Surface error to the caller, who can either drop to cancel or poll to retry.
+                () = co.yield_(Err(err.with_attempt(attempt))).await;
+                () = tokio::time::sleep(crate::backoff(attempt)).await;
+                attempt += 1;
+
+                // Restart route discovery.
+                req.header = None;
             }
         })
     }
 
     async fn read_some(
         &self,
-        co: &mut coroutines::Suspend<crate::Result<broker::ReadResponse>, ()>,
+        co: &mut coroutines::Suspend<crate::RetryResult<broker::ReadResponse>, ()>,
         req: &mut broker::ReadRequest,
         write_head: &mut i64,
     ) -> crate::Result<()> {
-        let route = req.header.as_ref().and_then(|hdr| hdr.route.as_ref());
-        let mut client = self.into_sub(self.router.route(route, false, &self.default)?);
+        let mut client = self.into_sub(self.router.route(
+            req.header.as_mut(),
+            if req.do_not_proxy {
+                router::Mode::Replica
+            } else {
+                router::Mode::Default
+            },
+            &self.default,
+        )?);
 
         // Fetch metadata first before we start the actual read.
         req.metadata_only = true;
 
         let mut stream = client.read(req.clone()).await?.into_inner();
-        let metadata = stream.try_next().await?.ok_or(Error::UnexpectedEof)?;
+        let mut metadata = stream.try_next().await?.ok_or(Error::UnexpectedEof)?;
         let _eof = stream.try_next().await?; // Broker sends EOF.
         std::mem::drop(stream);
 
@@ -73,6 +87,8 @@ impl Client {
             tracing::info!(req.journal, req.offset, metadata.offset, "offset jump");
             req.offset = metadata.offset;
         }
+        req.header = metadata.header.take();
+        *write_head = metadata.write_head;
 
         let (fragment, fragment_url) = (fragment.clone(), metadata.fragment_url.clone());
         () = co.yield_(Ok(metadata)).await;
@@ -118,7 +134,7 @@ impl Client {
 }
 
 async fn read_fragment_url(
-    co: &mut coroutines::Suspend<crate::Result<broker::ReadResponse>, ()>,
+    co: &mut coroutines::Suspend<crate::RetryResult<broker::ReadResponse>, ()>,
     fragment: broker::Fragment,
     fragment_url: String,
     http: &reqwest::Client,
@@ -174,7 +190,7 @@ async fn read_fragment_url(
 }
 
 async fn read_fragment_url_body(
-    co: &mut coroutines::Suspend<crate::Result<broker::ReadResponse>, ()>,
+    co: &mut coroutines::Suspend<crate::RetryResult<broker::ReadResponse>, ()>,
     fragment: broker::Fragment,
     r: impl futures::io::AsyncRead,
     req: &mut broker::ReadRequest,
diff --git a/crates/gazette/src/journal/read_json_lines.rs b/crates/gazette/src/journal/read_json_lines.rs
index 84f63fd9c7..83c63971e5 100644
--- a/crates/gazette/src/journal/read_json_lines.rs
+++ b/crates/gazette/src/journal/read_json_lines.rs
@@ -47,23 +47,25 @@ impl Client {
         };
 
         ReadJsonLines {
+            inner,
+            attempts: 0,
             parsed: simd_doc::transcoded::OwnedIterOut::empty(),
             parser: simd_doc::Parser::new(),
-            inner,
         }
     }
 }
 
 pin_project_lite::pin_project! {
     pub struct ReadJsonLines {
-        inner: BoxStream<'static, crate::Result<ReadJsonLine>>,
+        inner: BoxStream<'static, crate::RetryResult<ReadJsonLine>>,
+        attempts: usize,
         parsed: simd_doc::transcoded::OwnedIterOut,
         parser: simd_doc::Parser,
     }
 }
 
 impl futures::Stream for ReadJsonLines {
-    type Item = crate::Result<ReadJsonLine>;
+    type Item = crate::RetryResult<ReadJsonLine>;
 
     fn poll_next(
         self: std::pin::Pin<&mut Self>,
@@ -80,10 +82,13 @@ impl futures::Stream for ReadJsonLines {
             match me.parser.transcode_many(Default::default()) {
                 Ok(out) if !out.is_empty() => {
                     *me.parsed = out.into_iter();
+                    *me.attempts = 0;
                     continue;
                 }
                 Err((err, location)) => {
-                    return Poll::Ready(Some(Err(Error::Parsing { location, err })))
+                    let err = Error::Parsing { location, err }.with_attempt(*me.attempts);
+                    *me.attempts += 1;
+                    return Poll::Ready(Some(Err(err)));
                 }
                 Ok(_out) => {} // Requires more chunks.
             }
@@ -101,12 +106,15 @@ impl futures::Stream for ReadJsonLines {
                         return Poll::Ready(Some(Ok(ReadJsonLine::Meta(response))));
                     }
 
-                    me.parser
-                        .chunk(&response.content, response.offset)
-                        .map_err(|err| Error::Parsing {
+                    if let Err(err) = me.parser.chunk(&response.content, response.offset) {
+                        let err = Error::Parsing {
                             location: response.offset..response.offset,
                             err,
-                        })?;
+                        }
+                        .with_attempt(*me.attempts);
+                        *me.attempts += 1;
+                        return Poll::Ready(Some(Err(err)));
+                    }
                 }
                 std::task::Poll::Ready(None) => return std::task::Poll::Ready(None),
                 std::task::Poll::Pending => return std::task::Poll::Pending,
diff --git a/crates/gazette/src/lib.rs b/crates/gazette/src/lib.rs
index 33e53bd808..981f65acf9 100644
--- a/crates/gazette/src/lib.rs
+++ b/crates/gazette/src/lib.rs
@@ -45,7 +45,22 @@ pub enum Error {
     JWT(#[from] jsonwebtoken::errors::Error),
 }
 
+/// RetryError is an Error encountered during a retry-able operation.
+pub struct RetryError {
+    /// Number of operation attempts since the last success.
+    pub attempt: usize,
+    /// Error encountered with this attempt.
+    pub inner: Error,
+}
+
 impl Error {
+    pub fn with_attempt(self, attempt: usize) -> RetryError {
+        RetryError {
+            attempt,
+            inner: self,
+        }
+    }
+
     pub fn is_transient(&self) -> bool {
         match self {
             // These errors are generally failure of a transport, and can be retried.
@@ -80,6 +95,9 @@ impl Error {
 
 pub type Result<T> = std::result::Result<T, Error>;
 
+/// RetryResult is a single Result of a retry-able operation.
+pub type RetryResult<T> = std::result::Result<T, RetryError>;
+
 /// Lazily dial a gRPC endpoint with opinionated defaults and
 /// support for TLS and Unix Domain Sockets.
 pub fn dial_channel(endpoint: &str) -> Result<Channel> {
@@ -136,3 +154,17 @@ async fn connect_unix(
         tokio::net::UnixStream::connect(path).await?,
     ))
 }
+
+fn backoff(attempt: usize) -> std::time::Duration {
+    // The choices of backoff duration reflect that we're usually waiting for
+    // the cluster to converge on a shared understanding of ownership, and that
+    // involves a couple of Nagle-like read delays (~30ms) as Etcd watch
+    // updates are applied by participants.
+    match attempt {
+        0 => std::time::Duration::ZERO,
+        1 => std::time::Duration::from_millis(50),
+        2 | 3 => std::time::Duration::from_millis(100),
+        4 | 5 => std::time::Duration::from_secs(1),
+        _ => std::time::Duration::from_secs(5),
+    }
+}
diff --git a/crates/gazette/src/router.rs b/crates/gazette/src/router.rs
index 3125491a59..55b6223ac5 100644
--- a/crates/gazette/src/router.rs
+++ b/crates/gazette/src/router.rs
@@ -5,6 +5,18 @@ use std::collections::HashMap;
 use std::sync::Arc;
 use tonic::transport::Channel;
 
+/// Mode controls how Router maps a current request to a member Channel.
+pub enum Mode {
+    /// Prefer the primary of the current topology.
+    Primary,
+    /// Prefer the closest replica of the current topology.
+    Replica,
+    /// Use the default service address, ignoring the current topology.
+    /// This is appropriate for un-routed RPCs or when running
+    /// behind a proxy.
+    Default,
+}
+
 /// Router facilitates dispatching requests to designated members of
 /// a dynamic serving topology, by maintaining ready Channels to
 /// member endpoints which may be dynamically discovered over time.
@@ -31,24 +43,51 @@ impl Router {
         }
     }
 
-    /// Map an optional broker::Route and indication of whether the "primary"
-    /// member is required into a ready Channel for use in the dispatch of an RPC,
-    /// and a boolean which is set if and only if the Channel is in our local zone.
+    /// Map an Option<&mut Header>, Mode, and `default` service address into a
+    /// Channel for use in the dispatch of an RPC, and a boolean which is set
+    /// if and only if the Channel is in our local zone.
+    ///
+    /// `default.suffix` must be the dial-able endpoint of the service,
+    /// while `default.zone` should be its zone (if known).
+    ///
+    /// route() dials Channels as required, and users MUST call sweep()
+    /// to periodically clean up Channels which are no longer in use.
     ///
-    /// route() dial new Channels as required by the `route` and `primary` requirement.
-    /// Use sweep() to periodically clean up Channels which are no longer in use.
+    /// route() mutates `header` by clearing its `process_id` if set.
+    /// This facilitates passing forward the Header of an RPC response into
+    /// a next RPC request, in order to leverage route topology and Etcd
+    /// metadata of that response. `process_id` must be cleared because it
+    /// represents the handling server in a response context, but in a
+    /// request context it denotes the server to which the request is directed,
+    /// which is not our intention here. Rather, we wish to use a prior response
+    /// Header to pick a *better* member to which we'll route the next request.
     pub fn route(
         &self,
-        route: Option<&broker::Route>,
-        primary: bool,
+        header: Option<&mut broker::Header>,
+        mode: Mode,
         default: &MemberId,
     ) -> Result<(Channel, bool), Error> {
+        let (route, primary) = match header {
+            Some(header) => {
+                header.process_id = None;
+
+                match mode {
+                    Mode::Primary => (header.route.as_ref(), true),
+                    Mode::Replica => (header.route.as_ref(), false),
+                    Mode::Default => (None, false),
+                }
+            }
+            None => (None, false),
+        };
         let index = pick(route, primary, &self.inner.zone);
 
         let id = match index {
             Some(index) => &route.unwrap().members[index],
             None => default,
         };
+        let local = id.zone == self.inner.zone;
+        tracing::debug!(?id, %local, "picked member");
+
         let mut states = self.inner.states.lock().unwrap();
 
         // Is the channel already started?
@@ -64,7 +103,7 @@ impl Router {
             })?;
         states.insert(id.clone(), (channel.clone(), true));
 
-        Ok((channel, id.zone == self.inner.zone))
+        Ok((channel, local))
     }
 
     // Identify Channels which have not been used since the preceding sweep, and close them.
diff --git a/crates/gazette/src/shard/mod.rs b/crates/gazette/src/shard/mod.rs
index 1cb1df43f8..9ea05b20c1 100644
--- a/crates/gazette/src/shard/mod.rs
+++ b/crates/gazette/src/shard/mod.rs
@@ -1,3 +1,4 @@
+use crate::router;
 use proto_gazette::{broker, consumer};
 use tonic::transport::Channel;
 
@@ -44,7 +45,11 @@ impl Client {
         &self,
         req: consumer::ListRequest,
     ) -> Result<consumer::ListResponse, Error> {
-        let mut client = self.into_sub(self.router.route(None, false, &self.default)?);
+        let mut client = self.into_sub(self.router.route(
+            None,
+            router::Mode::Default,
+            &self.default,
+        )?);
 
         let resp = client
             .list(req)
@@ -60,7 +65,11 @@ impl Client {
         &self,
         req: consumer::ApplyRequest,
     ) -> Result<consumer::ApplyResponse, Error> {
-        let mut client = self.into_sub(self.router.route(None, false, &self.default)?);
+        let mut client = self.into_sub(self.router.route(
+            None,
+            router::Mode::Default,
+            &self.default,
+        )?);
 
         let resp = client
             .apply(req)
@@ -76,7 +85,11 @@ impl Client {
         &self,
         req: consumer::UnassignRequest,
     ) -> Result<consumer::UnassignResponse, Error> {
-        let mut client = self.into_sub(self.router.route(None, false, &self.default)?);
+        let mut client = self.into_sub(self.router.route(
+            None,
+            router::Mode::Default,
+            &self.default,
+        )?);
 
         let resp = client
             .unassign(req)
diff --git a/crates/proto-gazette/build.rs b/crates/proto-gazette/build.rs
index 52a7e14a38..657aa13f30 100644
--- a/crates/proto-gazette/build.rs
+++ b/crates/proto-gazette/build.rs
@@ -5,7 +5,7 @@ fn main() {
 
     prost_build::Config::new()
         .out_dir(&b.src_dir)
-        .bytes(&["ReadResponse.content"])
+        .bytes(&["AppendRequest.content", "ReadResponse.content"])
         .file_descriptor_set_path(&b.descriptor_path)
         .compile_well_known_types()
         .extern_path(".google.protobuf", "::pbjson_types")
diff --git a/crates/proto-gazette/src/protocol.rs b/crates/proto-gazette/src/protocol.rs
index 540a60ac1f..7cee1f010f 100644
--- a/crates/proto-gazette/src/protocol.rs
+++ b/crates/proto-gazette/src/protocol.rs
@@ -524,8 +524,8 @@ pub struct AppendRequest {
     /// the client must send an empty chunk (eg, zero-valued AppendRequest) to
     /// indicate the Append should be committed. Absence of this empty chunk
     /// prior to EOF is interpreted by the broker as a rollback of the Append.
-    #[prost(bytes = "vec", tag = "4")]
-    pub content: ::prost::alloc::vec::Vec<u8>,
+    #[prost(bytes = "bytes", tag = "4")]
+    pub content: ::prost::bytes::Bytes,
 }
 /// Nested message and enum types in `AppendRequest`.
pub mod append_request {
diff --git a/crates/simd-doc/src/lib.rs b/crates/simd-doc/src/lib.rs
index 19f50bdf65..3b42683937 100644
--- a/crates/simd-doc/src/lib.rs
+++ b/crates/simd-doc/src/lib.rs
@@ -110,22 +110,32 @@ impl Parser {
     pub fn chunk(&mut self, chunk: &[u8], chunk_offset: i64) -> Result<(), std::io::Error> {
         let enqueued = self.whole.len() + self.partial.len();
 
-        if enqueued == 0 {
+        let result = if enqueued == 0 {
             self.offset = chunk_offset; // We're empty. Allow the offset to jump.
-        } else if chunk_offset != self.offset + enqueued as i64 {
-            return Err(std::io::Error::new(
+            Ok(())
+        } else if chunk_offset == self.offset + enqueued as i64 {
+            Ok(()) // Chunk is contiguous.
+        } else {
+            let err = std::io::Error::new(
                 std::io::ErrorKind::InvalidData,
                 format!(
                     "parser has {enqueued} bytes of document prefix starting at offset {}, but got {}-byte chunk at unexpected input offset {chunk_offset}",
                     self.offset, chunk.len(),
                 ),
-            ));
-        }
+            );
+
+            // Clear previous state to allow best-effort continuation.
+            self.whole.clear();
+            self.partial.clear();
+            self.offset = chunk_offset;
+
+            Err(err)
+        };
 
         let Some(last_newline) = memchr::memrchr(b'\n', &chunk) else {
             // If `chunk` doesn't contain a newline, it cannot complete a document.
             self.partial.extend_from_slice(chunk);
-            return Ok(());
+            return result;
         };
 
         if self.whole.is_empty() {
@@ -140,11 +150,11 @@ impl Parser {
             self.partial.extend_from_slice(&chunk[last_newline + 1..]);
         }
 
-        Ok(())
+        result
     }
 
     /// Transcode newline-delimited JSON documents into equivalent
-    /// doc::ArchivedNode representations. `pre_allocated` is a potentially
+    /// doc::ArchivedNode representations. `buffer` is a potentially
     /// pre-allocated buffer which is cleared and used within the returned
     /// Transcoded instance.
     ///
@@ -154,10 +164,10 @@ impl Parser {
     /// documents and errors.
     pub fn transcode_many(
         &mut self,
-        pre_allocated: rkyv::AlignedVec,
+        buffer: rkyv::AlignedVec,
     ) -> Result<Transcoded, (std::io::Error, std::ops::Range<i64>)> {
         let mut output = Transcoded {
-            v: pre_allocated,
+            v: buffer,
             offset: self.offset,
         };
         output.v.clear();
@@ -165,6 +175,8 @@ impl Parser {
         if self.whole.is_empty() {
             return Ok(output);
         }
+        // Reserve 2x because transcodings use more bytes than raw JSON.
+        output.v.reserve(2 * self.whole.len());
 
         let (consumed, maybe_err) =
             match transcode_simd(&mut self.whole, &mut output, &mut self.ffi) {
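
Taken together, these changes make retry handling uniform across client routines.
A minimal sketch of how a caller consumes the new retry-aware read stream,
mirroring the dekaf and flowctl call sites above; the `client` and `req` values
and the five-attempt cutoff are illustrative assumptions, not part of this patch:

    use futures::StreamExt;
    use gazette::{journal, RetryError};
    use proto_gazette::broker;

    // Hypothetical caller: the client routine already slept through its
    // backoff before yielding, so we only decide to re-poll or give up.
    async fn read_all(client: journal::Client, req: broker::ReadRequest) -> gazette::Result<()> {
        let mut stream = std::pin::pin!(client.read(req));

        while let Some(result) = stream.next().await {
            match result {
                Ok(resp) => {
                    // Process `resp.content` here.
                }
                Err(RetryError { attempt, inner }) if inner.is_transient() && attempt < 5 => {
                    tracing::warn!(?inner, %attempt, "transient read error (will retry)");
                }
                Err(RetryError { inner, .. }) => return Err(inner),
            }
        }
        Ok(())
    }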
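
Similarly, a sketch of the closure-based append() API: `source` is invoked
afresh for each internal retry, so the underlying bytes must be re-playable.
The journal name, payload, and three-attempt cutoff are invented for the example:

    use futures::StreamExt;
    use gazette::{journal, RetryError};
    use proto_gazette::broker;

    // Hypothetical caller: append a re-playable payload until it commits.
    async fn append_payload(client: &journal::Client, payload: bytes::Bytes) -> gazette::Result<()> {
        let req = broker::AppendRequest {
            journal: "example/journal".to_string(),
            ..Default::default()
        };
        // `source` is re-invoked from the start on every internal retry.
        let source = move || futures::stream::iter([std::io::Result::Ok(payload.clone())]);

        let mut stream = std::pin::pin!(client.append(req, source));
        while let Some(result) = stream.next().await {
            match result {
                Ok(_resp) => return Ok(()), // Committed.
                Err(RetryError { attempt, inner }) if attempt < 3 => {
                    tracing::warn!(?inner, %attempt, "append failed (will retry)");
                }
                Err(RetryError { inner, .. }) => return Err(inner),
            }
        }
        Ok(())
    }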
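
Finally, a sketch of the Parser::chunk() recovery behavior introduced here,
using only the signatures shown in this patch; the offsets and document bytes
are invented:

    // Hypothetical illustration: a retried read re-enters the parser at a new offset.
    fn parser_recovery() -> Result<(), std::io::Error> {
        let mut parser = simd_doc::Parser::new();

        // A partial document is enqueued at offset 100...
        parser.chunk(b"{\"a\":", 100)?;

        // ...but the retried stream resumes at offset 500. chunk() surfaces
        // the discontinuity as an error, yet clears its buffered prefix so
        // parsing continues from the new offset on a best-effort basis.
        assert!(parser.chunk(b"{\"b\":2}\n", 500).is_err());

        // The complete document of the second chunk still transcodes.
        let transcoded = parser
            .transcode_many(Default::default())
            .map_err(|(err, _location)| err)?;
        assert!(!transcoded.is_empty());
        Ok(())
    }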