diff --git a/crates/dekaf/src/read.rs b/crates/dekaf/src/read.rs index 1363b10f64..539f2b1819 100644 --- a/crates/dekaf/src/read.rs +++ b/crates/dekaf/src/read.rs @@ -128,9 +128,6 @@ impl Read { let mut tmp = Vec::new(); let mut buf = bytes::BytesMut::new(); - let mut has_had_parsing_error = false; - let mut transient_errors = 0; - let timeout = tokio::time::sleep_until(timeout.into()); let timeout = futures::future::maybe_done(timeout); tokio::pin!(timeout); @@ -167,30 +164,26 @@ impl Read { continue; } }, - Err(err) if err.is_transient() && transient_errors < 5 => { - use rand::Rng; - - transient_errors = transient_errors + 1; - - tracing::warn!(error = ?err, "Retrying transient read error"); - let delay = std::time::Duration::from_millis( - rand::thread_rng().gen_range(300..2000), - ); - tokio::time::sleep(delay).await; + Err(gazette::RetryError { attempt, inner }) + if inner.is_transient() && attempt < 5 => + { + tracing::warn!(error = ?inner, "Retrying transient read error"); // We can retry transient errors just by continuing to poll the stream continue; } - Err(err @ gazette::Error::Parsing { .. }) if !has_had_parsing_error => { - tracing::debug!(%err, "Ignoring first parse error to skip past partial document"); - has_had_parsing_error = true; - - continue; - } - Err(err @ gazette::Error::Parsing { .. }) => { - tracing::warn!(%err, "Got a second parse error, something is wrong"); - Err(err) + Err(gazette::RetryError { + attempt, + inner: err @ gazette::Error::Parsing { .. }, + }) => { + if attempt == 0 { + tracing::debug!(%err, "Ignoring first parse error to skip past partial document"); + continue; + } else { + tracing::warn!(%err, "Got a second parse error, something is wrong"); + Err(err) + } } - Err(e) => Err(e), + Err(gazette::RetryError { inner, .. }) => Err(inner), }?, }; diff --git a/crates/flowctl/src/collection/read/mod.rs b/crates/flowctl/src/collection/read/mod.rs index 75eefac41f..cbf19335df 100644 --- a/crates/flowctl/src/collection/read/mod.rs +++ b/crates/flowctl/src/collection/read/mod.rs @@ -3,7 +3,6 @@ use anyhow::Context; use futures::StreamExt; use gazette::journal::ReadJsonLine; use proto_gazette::broker; -use rand::Rng; use std::io::Write; use time::OffsetDateTime; @@ -116,6 +115,7 @@ pub async fn read_collection_journal( offset: 0, block: bounds.follow, begin_mod_time, + // TODO(johnny): Set `do_not_proxy: true` once cronut is migrated. ..Default::default() }, 1, @@ -127,7 +127,13 @@ pub async fn read_collection_journal( while let Some(line) = lines.next().await { match line { - Ok(ReadJsonLine::Meta(_)) => (), + Ok(ReadJsonLine::Meta(broker::ReadResponse { + fragment, + write_head, + .. + })) => { + tracing::debug!(?fragment, %write_head, "journal metadata"); + } Ok(ReadJsonLine::Doc { root, next_offset: _, @@ -136,26 +142,25 @@ pub async fn read_collection_journal( v.push(b'\n'); () = stdout.write_all(&v)?; } - Err(gazette::Error::BrokerStatus(broker::Status::Suspended)) if bounds.follow => { - // The journal is fully suspended, so we use a pretty long delay - // here because it's unlikely to be resumed quickly and also - // unlikely that anyone will care about the little bit of extra - // latency in this case. 
- let delay_secs = rand::thread_rng().gen_range(30..=60); - tracing::info!(delay_secs, "journal suspended, will retry"); - tokio::time::sleep(std::time::Duration::from_secs(delay_secs)).await; - } - Err(gazette::Error::BrokerStatus( - status @ broker::Status::OffsetNotYetAvailable | status @ broker::Status::Suspended, - )) => { - tracing::debug!(?status, "stopping read at end of journal content"); - break; // Graceful EOF of non-blocking read. - } - Err(err) if err.is_transient() => { - tracing::warn!(?err, "error reading collection (will retry)"); - tokio::time::sleep(std::time::Duration::from_secs(5)).await; - } - Err(err) => anyhow::bail!(err), + Err(gazette::RetryError { + inner: err, + attempt, + }) => match err { + err if err.is_transient() => { + tracing::warn!(?err, %attempt, "error reading collection (will retry)"); + } + gazette::Error::BrokerStatus(broker::Status::Suspended) if bounds.follow => { + tracing::debug!(?err, %attempt, "journal is suspended (will retry)"); + } + gazette::Error::BrokerStatus( + status @ broker::Status::OffsetNotYetAvailable + | status @ broker::Status::Suspended, + ) => { + tracing::debug!(?status, "stopping read at end of journal content"); + break; // Graceful EOF of non-blocking read. + } + err => anyhow::bail!(err), + }, } } diff --git a/crates/flowctl/src/preview/journal_reader.rs b/crates/flowctl/src/preview/journal_reader.rs index 11fbd0190c..b377e74d77 100644 --- a/crates/flowctl/src/preview/journal_reader.rs +++ b/crates/flowctl/src/preview/journal_reader.rs @@ -196,6 +196,7 @@ impl Reader { offset, block: true, begin_mod_time, + // TODO(johnny): Set `do_not_proxy: true` once cronut is migrated. ..Default::default() }, 1, @@ -211,12 +212,14 @@ impl Reader { offset = meta.offset; continue; } - Err(err) if err.is_transient() => { - tracing::warn!(%err, %journal, %binding, "transient error reading journal (will retry)"); - tokio::time::sleep(std::time::Duration::from_secs(5)).await; + Err(gazette::RetryError { + attempt, + inner: err, + }) if err.is_transient() => { + tracing::warn!(?err, %attempt, %journal, %binding, "transient error reading journal (will retry)"); continue; } - Err(err) => return Err(err), + Err(gazette::RetryError { inner, .. }) => return Err(inner), }; // TODO(johnny): plumb through OwnedArchivedNode end-to-end. diff --git a/crates/gazette/src/journal/append.rs b/crates/gazette/src/journal/append.rs index cf41b70158..122b34f6cf 100644 --- a/crates/gazette/src/journal/append.rs +++ b/crates/gazette/src/journal/append.rs @@ -1,8 +1,7 @@ use super::Client; -use crate::{journal::check_ok, Error}; +use crate::{router, Error}; use futures::{FutureExt, Stream, StreamExt}; use proto_gazette::broker::{self, AppendResponse}; -use std::sync::Arc; impl Client { /// Append the contents of a byte stream to the specified journal. @@ -13,59 +12,81 @@ impl Client { /// retries the request from the beginning. 
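For illustration, a caller drives this polling-based retry contract roughly as follows. Sketch only: it assumes an already-constructed `gazette::journal::Client`, that the source stream's item type is `std::io::Result<bytes::Bytes>`, and an arbitrary five-attempt cap; the `append_once` helper and its arguments are hypothetical.

```rust
use futures::StreamExt;
use proto_gazette::broker;

// Hypothetical caller-side helper, for illustration only.
async fn append_once(
    client: &gazette::journal::Client,
    journal: &str,
    doc: bytes::Bytes,
) -> anyhow::Result<()> {
    let req = broker::AppendRequest {
        journal: journal.to_string(),
        ..Default::default()
    };
    // `source` is a closure so that every retry re-generates the input stream.
    let stream = client.append(req, || {
        futures::stream::once(futures::future::ready(Ok::<_, std::io::Error>(doc.clone())))
    });
    tokio::pin!(stream);

    while let Some(outcome) = stream.next().await {
        match outcome {
            Ok(_resp) => return Ok(()),
            // Polling again after a yielded error asks the client to retry;
            // dropping the stream instead would abandon the append.
            Err(gazette::RetryError { attempt, inner })
                if inner.is_transient() && attempt < 5 =>
            {
                tracing::warn!(?inner, attempt, "append failed (will retry)");
            }
            Err(gazette::RetryError { inner, .. }) => anyhow::bail!(inner),
        }
    }
    anyhow::bail!("append stream ended without a committed response")
}
```

Note that the stream already sleeps with `backoff(attempt)` between attempts, so the caller only decides when to give up.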
pub fn append<'a, S>( &'a self, - request: broker::AppendRequest, + mut req: broker::AppendRequest, source: impl Fn() -> S + Send + Sync + 'a, - ) -> impl Stream> + '_ + ) -> impl Stream> + '_ where S: Stream> + Send + 'static, { coroutines::coroutine(move |mut co| async move { - loop { - let input_stream = source(); + let mut attempt = 0; - match self.try_append(request.clone(), input_stream).await { + loop { + let err = match self.try_append(&mut req, source()).await { Ok(resp) => { () = co.yield_(Ok(resp)).await; return; } - Err(err) => { - () = co.yield_(Err(err)).await; - // Polling after an error indicates the caller would like to retry, - // so continue the loop to re-generate the input stream and try again. - } + Err(err) => err, + }; + + if matches!(err, Error::BrokerStatus(broker::Status::NotJournalPrimaryBroker) if req.do_not_proxy) + { + // This is an expected error which drives dynamic route discovery. + // Route topology in `req.header` has been updated, and we restart the request. + continue; } + + // Surface error to the caller, who can either drop to cancel or poll to retry. + () = co.yield_(Err(err.with_attempt(attempt))).await; + () = tokio::time::sleep(crate::backoff(attempt)).await; + attempt += 1; + + // Restart route discovery. + req.header = None; } }) } async fn try_append( &self, - request: broker::AppendRequest, + req: &mut broker::AppendRequest, source: S, ) -> crate::Result where S: Stream> + Send + 'static, { - let (input_err_tx, input_err_rx) = tokio::sync::oneshot::channel(); + let mut client = self.into_sub(self.router.route( + req.header.as_mut(), + if req.do_not_proxy { + router::Mode::Primary + } else { + router::Mode::Default + }, + &self.default, + )?); + let req_clone = req.clone(); + + let (source_err_tx, source_err_rx) = tokio::sync::oneshot::channel(); // `JournalClient::append()` wants a stream of `AppendRequest`s, so let's compose one starting with // the initial metadata request containing the journal name and any other request metadata, then // "data" requests that contain chunks of data to write, then the final EOF indicating completion. - let request_stream = futures::stream::once(async move { Ok(request) }) + let source = futures::stream::once(async move { Ok(req_clone) }) .chain(source.filter_map(|input| { futures::future::ready(match input { // It's technically possible to get an empty set of bytes when reading // from the input stream. Filter these out as otherwise they would look // like EOFs to the append RPC and cause confusion. - Ok(input_bytes) if input_bytes.len() == 0 => None, - Ok(input_bytes) => Some(Ok(broker::AppendRequest { - content: input_bytes.to_vec(), + Ok(content) if content.len() == 0 => None, + Ok(content) => Some(Ok(broker::AppendRequest { + content, ..Default::default() })), Err(err) => Some(Err(err)), }) })) - // Final empty chunk / EOF to signal we're done + // Final empty chunk signals the broker to commit (rather than rollback). .chain(futures::stream::once(async { Ok(broker::AppendRequest { ..Default::default() @@ -74,11 +95,11 @@ impl Client { // Since it's possible to error when reading input data, we handle an error by stopping // the stream and storing the error. 
Later, we first check if we have hit an input error // and if so we bubble it up, otherwise proceeding with handling the output of the RPC - .scan(Some(input_err_tx), |input_err_tx, input_res| { - futures::future::ready(match input_res { - Ok(input) => Some(input), + .scan(Some(source_err_tx), |err_tx, result| { + futures::future::ready(match result { + Ok(request) => Some(request), Err(err) => { - input_err_tx + err_tx .take() .expect("we should reach this point at most once") .send(err) @@ -87,21 +108,20 @@ impl Client { } }) }); + let result = client.append(source).await; - let mut client = self.into_sub(self.router.route(None, false, &self.default)?); - - let resp = client.append(request_stream).await; + // An error reading `source` has precedence, + // as it's likely causal if the broker *also* errored. + if let Ok(err) = source_err_rx.now_or_never().expect("tx has been dropped") { + return Err(Error::AppendRead(err)); + } + let mut resp = result?.into_inner(); - if let Some(Ok(input_err)) = input_err_rx.now_or_never() { - return Err(Error::AppendRead(input_err)); + if resp.status() == broker::Status::Ok { + Ok(resp) } else { - match resp { - Ok(resp) => { - let resp = resp.into_inner(); - check_ok(resp.status(), resp) - } - Err(err) => Err(err.into()), - } + req.header = resp.header.take(); + Err(Error::BrokerStatus(resp.status())) } } } diff --git a/crates/gazette/src/journal/list.rs b/crates/gazette/src/journal/list.rs index dbfdc43b18..02aa6bd363 100644 --- a/crates/gazette/src/journal/list.rs +++ b/crates/gazette/src/journal/list.rs @@ -1,5 +1,5 @@ use super::{check_ok, Client}; -use crate::Error; +use crate::{router, Error}; use futures::TryStreamExt; use proto_gazette::broker; @@ -16,32 +16,42 @@ impl Client { pub fn list_watch( self, mut req: broker::ListRequest, - ) -> impl futures::Stream> + 'static { + ) -> impl futures::Stream> + 'static { assert!(req.watch, "list_watch() requires ListRequest.watch is set"); coroutines::coroutine(move |mut co| async move { + let mut attempt = 0; + let mut maybe_stream = None; + loop { - let mut stream = match self.start_list(&self.router, &req).await { - Ok(stream) => stream, - Err(err) => { - () = co.yield_(Err(err)).await; - continue; - } + let err = match maybe_stream.take() { + Some(mut stream) => match recv_snapshot(&mut req, &mut stream).await { + Ok(resp) => { + () = co.yield_(Ok(resp)).await; + attempt = 0; + maybe_stream = Some(stream); + continue; + } + Err(err) => err, + }, + None => match self.start_list(&self.router, &req).await { + Ok(stream) => { + maybe_stream = Some(stream); + continue; + } + Err(err) => err, + }, }; - loop { - match recv_snapshot(&mut req, &mut stream).await { - Ok(resp) => co.yield_(Ok(resp)).await, - Err(err) => { - if matches!(err, Error::UnexpectedEof if req.watch_resume.is_some()) { - // Server stopped an ongoing watch. Expected and not an error. - } else { - co.yield_(Err(err)).await; - } - break; // Start new stream on next poll. - } - } + if matches!(err, Error::UnexpectedEof if req.watch_resume.is_some()) { + // Server stopped an ongoing watch. Expected and not an error. + continue; } + + // Surface error to the caller, who can either drop to cancel or poll to retry. 
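The same "drop to cancel or poll to retry" contract, seen from the consumer side of `list_watch()`. Sketch only: it assumes a constructed `gazette::journal::Client` and a caller-chosen cap of ten consecutive failures; the helper name and selector are made up.

```rust
use futures::StreamExt;
use proto_gazette::broker;

// Hypothetical consumer, for illustration only.
async fn watch_listing(
    client: gazette::journal::Client,
    selector: broker::LabelSelector,
) -> anyhow::Result<()> {
    let req = broker::ListRequest {
        selector: Some(selector),
        watch: true, // list_watch() asserts that watch is set.
        ..Default::default()
    };
    let mut snapshots = std::pin::pin!(client.list_watch(req));

    while let Some(snapshot) = snapshots.next().await {
        match snapshot {
            Ok(resp) => tracing::info!(journals = resp.journals.len(), "listing snapshot"),
            // Polling again retries; the stream has already slept via backoff().
            Err(gazette::RetryError { attempt, inner }) if attempt < 10 => {
                tracing::warn!(?inner, attempt, "list-watch error (will retry)");
            }
            // Returning drops the stream, which cancels the watch.
            Err(gazette::RetryError { inner, .. }) => anyhow::bail!(inner),
        }
    }
    Ok(())
}
```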
+ () = co.yield_(Err(err.with_attempt(attempt))).await; + () = tokio::time::sleep(crate::backoff(attempt)).await; + attempt += 1; } }) } @@ -51,7 +61,7 @@ impl Client { router: &crate::Router, req: &broker::ListRequest, ) -> crate::Result> { - let mut client = self.into_sub(router.route(None, false, &self.default)?); + let mut client = self.into_sub(router.route(None, router::Mode::Default, &self.default)?); Ok(client.list(req.clone()).await?.into_inner()) } } diff --git a/crates/gazette/src/journal/mod.rs b/crates/gazette/src/journal/mod.rs index cc3ba45b35..727ca96012 100644 --- a/crates/gazette/src/journal/mod.rs +++ b/crates/gazette/src/journal/mod.rs @@ -1,3 +1,4 @@ +use crate::router; use proto_gazette::broker; use tonic::transport::Channel; @@ -51,7 +52,11 @@ impl Client { /// Invoke the Gazette journal Apply API. pub async fn apply(&self, req: broker::ApplyRequest) -> crate::Result { - let mut client = self.into_sub(self.router.route(None, false, &self.default)?); + let mut client = self.into_sub(self.router.route( + None, + router::Mode::Default, + &self.default, + )?); let resp = client .apply(req) @@ -67,7 +72,11 @@ impl Client { &self, req: broker::FragmentsRequest, ) -> crate::Result { - let mut client = self.into_sub(self.router.route(None, false, &self.default)?); + let mut client = self.into_sub(self.router.route( + None, + router::Mode::Default, + &self.default, + )?); let resp = client .list_fragments(req) diff --git a/crates/gazette/src/journal/read.rs b/crates/gazette/src/journal/read.rs index 8c9d4e2bf5..d7a128eb13 100644 --- a/crates/gazette/src/journal/read.rs +++ b/crates/gazette/src/journal/read.rs @@ -1,5 +1,5 @@ use super::Client; -use crate::Error; +use crate::{router, Error}; use futures::TryStreamExt; use proto_gazette::broker; @@ -10,9 +10,10 @@ impl Client { pub fn read( self, mut req: broker::ReadRequest, - ) -> impl futures::Stream> + 'static { + ) -> impl futures::Stream> + 'static { coroutines::coroutine(move |mut co| async move { let mut write_head = i64::MAX; + let mut attempt = 0; loop { // Have we read through requested `end_offset`? @@ -24,40 +25,53 @@ impl Client { return; } - match self.read_some(&mut co, &mut req, &mut write_head).await { - Ok(()) => (), - Err(Error::BrokerStatus(broker::Status::NotJournalBroker)) - if req.do_not_proxy => - { - // Expected error which drives dynamic route discovery. - // `req.header` has updated route topology and we restart the request. - } - Err(err) => { - // Surface error to the caller, which can either drop us - // or poll us again to retry. - () = co.yield_(Err(err)).await; - // Restart route discovery. - req.header = None; + let err = match self.read_some(&mut co, &mut req, &mut write_head).await { + Ok(()) => { + attempt = 0; + continue; } + Err(err) => err, + }; + + if matches!(err, Error::BrokerStatus(broker::Status::NotJournalBroker) if req.do_not_proxy) + { + // This is an expected error which drives dynamic route discovery. + // `req.header` has updated route topology and we restart the request. + continue; } + + // Surface error to the caller, who can either drop to cancel or poll to retry. + () = co.yield_(Err(err.with_attempt(attempt))).await; + () = tokio::time::sleep(crate::backoff(attempt)).await; + attempt += 1; + + // Restart route discovery. 
+ req.header = None; } }) } async fn read_some( &self, - co: &mut coroutines::Suspend, ()>, + co: &mut coroutines::Suspend, ()>, req: &mut broker::ReadRequest, write_head: &mut i64, ) -> crate::Result<()> { - let route = req.header.as_ref().and_then(|hdr| hdr.route.as_ref()); - let mut client = self.into_sub(self.router.route(route, false, &self.default)?); + let mut client = self.into_sub(self.router.route( + req.header.as_mut(), + if req.do_not_proxy { + router::Mode::Replica + } else { + router::Mode::Default + }, + &self.default, + )?); // Fetch metadata first before we start the actual read. req.metadata_only = true; let mut stream = client.read(req.clone()).await?.into_inner(); - let metadata = stream.try_next().await?.ok_or(Error::UnexpectedEof)?; + let mut metadata = stream.try_next().await?.ok_or(Error::UnexpectedEof)?; let _eof = stream.try_next().await?; // Broker sends EOF. std::mem::drop(stream); @@ -73,6 +87,8 @@ impl Client { tracing::info!(req.journal, req.offset, metadata.offset, "offset jump"); req.offset = metadata.offset; } + req.header = metadata.header.take(); + *write_head = metadata.write_head; let (fragment, fragment_url) = (fragment.clone(), metadata.fragment_url.clone()); () = co.yield_(Ok(metadata)).await; @@ -118,7 +134,7 @@ impl Client { } async fn read_fragment_url( - co: &mut coroutines::Suspend, ()>, + co: &mut coroutines::Suspend, ()>, fragment: broker::Fragment, fragment_url: String, http: &reqwest::Client, @@ -174,7 +190,7 @@ async fn read_fragment_url( } async fn read_fragment_url_body( - co: &mut coroutines::Suspend, ()>, + co: &mut coroutines::Suspend, ()>, fragment: broker::Fragment, r: impl futures::io::AsyncRead, req: &mut broker::ReadRequest, diff --git a/crates/gazette/src/journal/read_json_lines.rs b/crates/gazette/src/journal/read_json_lines.rs index 84f63fd9c7..83c63971e5 100644 --- a/crates/gazette/src/journal/read_json_lines.rs +++ b/crates/gazette/src/journal/read_json_lines.rs @@ -47,23 +47,25 @@ impl Client { }; ReadJsonLines { + inner, + attempts: 0, parsed: simd_doc::transcoded::OwnedIterOut::empty(), parser: simd_doc::Parser::new(), - inner, } } } pin_project_lite::pin_project! { pub struct ReadJsonLines { - inner: BoxStream<'static, crate::Result>, + inner: BoxStream<'static, crate::RetryResult>, + attempts: usize, parsed: simd_doc::transcoded::OwnedIterOut, parser: simd_doc::Parser, } } impl futures::Stream for ReadJsonLines { - type Item = crate::Result; + type Item = crate::RetryResult; fn poll_next( self: std::pin::Pin<&mut Self>, @@ -80,10 +82,13 @@ impl futures::Stream for ReadJsonLines { match me.parser.transcode_many(Default::default()) { Ok(out) if !out.is_empty() => { *me.parsed = out.into_iter(); + *me.attempts = 0; continue; } Err((err, location)) => { - return Poll::Ready(Some(Err(Error::Parsing { location, err }))) + let err = Error::Parsing { location, err }.with_attempt(*me.attempts); + *me.attempts += 1; + return Poll::Ready(Some(Err(err))); } Ok(_out) => {} // Requires more chunks. 
} @@ -101,12 +106,15 @@ impl futures::Stream for ReadJsonLines { return Poll::Ready(Some(Ok(ReadJsonLine::Meta(response)))); } - me.parser - .chunk(&response.content, response.offset) - .map_err(|err| Error::Parsing { + if let Err(err) = me.parser.chunk(&response.content, response.offset) { + let err = Error::Parsing { location: response.offset..response.offset, err, - })?; + } + .with_attempt(*me.attempts); + *me.attempts += 1; + return Poll::Ready(Some(Err(err))); + } } std::task::Poll::Ready(None) => return std::task::Poll::Ready(None), std::task::Poll::Pending => return std::task::Poll::Pending, diff --git a/crates/gazette/src/lib.rs b/crates/gazette/src/lib.rs index 33e53bd808..981f65acf9 100644 --- a/crates/gazette/src/lib.rs +++ b/crates/gazette/src/lib.rs @@ -45,7 +45,22 @@ pub enum Error { JWT(#[from] jsonwebtoken::errors::Error), } +/// RetryError is an Error encountered during a retry-able operation. +pub struct RetryError { + /// Number of operation attempts since the last success. + pub attempt: usize, + /// Error encountered with this attempt. + pub inner: Error, +} + impl Error { + pub fn with_attempt(self, attempt: usize) -> RetryError { + RetryError { + attempt: attempt, + inner: self, + } + } + pub fn is_transient(&self) -> bool { match self { // These errors are generally failure of a transport, and can be retried. @@ -80,6 +95,9 @@ impl Error { pub type Result = std::result::Result; +/// RetryResult is a single Result of a retry-able operation. +pub type RetryResult = std::result::Result; + /// Lazily dial a gRPC endpoint with opinionated defaults and /// support for TLS and Unix Domain Sockets. pub fn dial_channel(endpoint: &str) -> Result { @@ -136,3 +154,17 @@ async fn connect_unix( tokio::net::UnixStream::connect(path).await?, )) } + +fn backoff(attempt: usize) -> std::time::Duration { + // The choices of backoff duration reflect that we're usually waiting for + // the cluster to converge on a shared understanding of ownership, and that + // involves a couple of Nagle-like read delays (~30ms) as Etcd watch + // updates are applied by participants. + match attempt { + 0 => std::time::Duration::ZERO, + 1 => std::time::Duration::from_millis(50), + 2 | 3 => std::time::Duration::from_millis(100), + 4 | 5 => std::time::Duration::from_secs(1), + _ => std::time::Duration::from_secs(5), + } +} diff --git a/crates/gazette/src/router.rs b/crates/gazette/src/router.rs index 3125491a59..55b6223ac5 100644 --- a/crates/gazette/src/router.rs +++ b/crates/gazette/src/router.rs @@ -5,6 +5,18 @@ use std::collections::HashMap; use std::sync::Arc; use tonic::transport::Channel; +/// Mode controls how Router maps a current request to an member Channel. +pub enum Mode { + /// Prefer the primary of the current topology. + Primary, + /// Prefer the closest replica of the current topology. + Replica, + /// Use the default service address, ignoring the current topology. + /// This is appropriate for un-routed RPCs or when running + /// behind a proxy. + Default, +} + /// Router facilitates dispatching requests to designated members of /// a dynamic serving topology, by maintaining ready Channels to /// member endpoints which may be dynamically discovered over time. @@ -31,24 +43,51 @@ impl Router { } } - /// Map an optional broker::Route and indication of whether the "primary" - /// member is required into a ready Channel for use in the dispatch of an RPC, - /// and a boolean which is set if and only if the Channel is in our local zone. 
+ /// Map an Option<&mut Header>, Mode, and `default` service address into a + /// Channel for use in the dispatch of an RPC, and a boolean which is set + /// if and only if the Channel is in our local zone. + /// + /// `default.suffix` must be the dial-able endpoint of the service, + /// while `default.zone` should be its zone (if known). + /// + /// route() dials Channels as required, and users MUST call sweep() + /// to periodically clean up Channels which are no longer in use. /// - /// route() dial new Channels as required by the `route` and `primary` requirement. - /// Use sweep() to periodically clean up Channels which are no longer in use. + /// route() mutates `header` by clearing its `process_id` if set. + /// This facilitates passing forward the Header of an RPC response into + /// a next RPC request, in order to leverage route topology and Etcd + /// metadata of that response. `process_id` must be cleared because it + /// represents the handling server in a response context, but in a + /// request context it denotes the server to which the request is directed, + /// which is not our intention here. Rather, we wish to use a prior response + /// Header to pick a *better* member to which we'll route the next request. pub fn route( &self, - route: Option<&broker::Route>, - primary: bool, + header: Option<&mut broker::Header>, + mode: Mode, default: &MemberId, ) -> Result<(Channel, bool), Error> { + let (route, primary) = match header { + Some(header) => { + header.process_id = None; + + match mode { + Mode::Primary => (header.route.as_ref(), true), + Mode::Replica => (header.route.as_ref(), false), + Mode::Default => (None, false), + } + } + None => (None, false), + }; let index = pick(route, primary, &self.inner.zone); let id = match index { Some(index) => &route.unwrap().members[index], None => default, }; + let local = id.zone == self.inner.zone; + tracing::debug!(?id, %local, "picked member"); + let mut states = self.inner.states.lock().unwrap(); // Is the channel already started? @@ -64,7 +103,7 @@ impl Router { })?; states.insert(id.clone(), (channel.clone(), true)); - Ok((channel, id.zone == self.inner.zone)) + Ok((channel, local)) } // Identify Channels which have not been used since the preceding sweep, and close them. 
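To summarize the routing behavior introduced above: appends require the journal primary, reads prefer the closest replica, and proxied requests ignore the topology. The mapping from `do_not_proxy` onto the new `Mode` can be pictured as the hypothetical helper below (it is not part of this change, and it assumes the `router` module is externally visible; in-crate callers use `crate::router::Mode`).

```rust
use gazette::router::Mode;

// Hypothetical helper mirroring how append() and read() above choose a Mode.
fn routing_mode(do_not_proxy: bool, needs_primary: bool) -> Mode {
    match (do_not_proxy, needs_primary) {
        (false, _) => Mode::Default,    // Proxy via the default service address.
        (true, true) => Mode::Primary,  // Appends must reach the journal primary.
        (true, false) => Mode::Replica, // Reads can use the closest replica.
    }
}
```

A caller can then feed a prior response's `header` back into `route()`, which clears its `process_id` and reuses the embedded route topology to pick a better member for the next request.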
diff --git a/crates/gazette/src/shard/mod.rs b/crates/gazette/src/shard/mod.rs index 1cb1df43f8..9ea05b20c1 100644 --- a/crates/gazette/src/shard/mod.rs +++ b/crates/gazette/src/shard/mod.rs @@ -1,3 +1,4 @@ +use crate::router; use proto_gazette::{broker, consumer}; use tonic::transport::Channel; @@ -44,7 +45,11 @@ impl Client { &self, req: consumer::ListRequest, ) -> Result { - let mut client = self.into_sub(self.router.route(None, false, &self.default)?); + let mut client = self.into_sub(self.router.route( + None, + router::Mode::Default, + &self.default, + )?); let resp = client .list(req) @@ -60,7 +65,11 @@ impl Client { &self, req: consumer::ApplyRequest, ) -> Result { - let mut client = self.into_sub(self.router.route(None, false, &self.default)?); + let mut client = self.into_sub(self.router.route( + None, + router::Mode::Default, + &self.default, + )?); let resp = client .apply(req) @@ -76,7 +85,11 @@ impl Client { &self, req: consumer::UnassignRequest, ) -> Result { - let mut client = self.into_sub(self.router.route(None, false, &self.default)?); + let mut client = self.into_sub(self.router.route( + None, + router::Mode::Default, + &self.default, + )?); let resp = client .unassign(req) diff --git a/crates/proto-gazette/build.rs b/crates/proto-gazette/build.rs index 52a7e14a38..657aa13f30 100644 --- a/crates/proto-gazette/build.rs +++ b/crates/proto-gazette/build.rs @@ -5,7 +5,7 @@ fn main() { prost_build::Config::new() .out_dir(&b.src_dir) - .bytes(&["ReadResponse.content"]) + .bytes(&["AppendRequest.content", "ReadResponse.content"]) .file_descriptor_set_path(&b.descriptor_path) .compile_well_known_types() .extern_path(".google.protobuf", "::pbjson_types") diff --git a/crates/proto-gazette/src/protocol.rs b/crates/proto-gazette/src/protocol.rs index 540a60ac1f..7cee1f010f 100644 --- a/crates/proto-gazette/src/protocol.rs +++ b/crates/proto-gazette/src/protocol.rs @@ -524,8 +524,8 @@ pub struct AppendRequest { /// the client must send an empty chunk (eg, zero-valued AppendRequest) to /// indicate the Append should be committed. Absence of this empty chunk /// prior to EOF is interpreted by the broker as a rollback of the Append. - #[prost(bytes = "vec", tag = "4")] - pub content: ::prost::alloc::vec::Vec, + #[prost(bytes = "bytes", tag = "4")] + pub content: ::prost::bytes::Bytes, } /// Nested message and enum types in `AppendRequest`. pub mod append_request { diff --git a/crates/simd-doc/src/lib.rs b/crates/simd-doc/src/lib.rs index 19f50bdf65..3b42683937 100644 --- a/crates/simd-doc/src/lib.rs +++ b/crates/simd-doc/src/lib.rs @@ -110,22 +110,32 @@ impl Parser { pub fn chunk(&mut self, chunk: &[u8], chunk_offset: i64) -> Result<(), std::io::Error> { let enqueued = self.whole.len() + self.partial.len(); - if enqueued == 0 { + let result = if enqueued == 0 { self.offset = chunk_offset; // We're empty. Allow the offset to jump. - } else if chunk_offset != self.offset + enqueued as i64 { - return Err(std::io::Error::new( + Ok(()) + } else if chunk_offset == self.offset + enqueued as i64 { + Ok(()) // Chunk is contiguous. + } else { + let err = std::io::Error::new( std::io::ErrorKind::InvalidData, format!( "parser has {enqueued} bytes of document prefix starting at offset {}, but got {}-byte chunk at unexpected input offset {chunk_offset}", self.offset, chunk.len(), ), - )); - } + ); + + // Clear previous state to allow best-effort continuation. 
+ self.whole.clear(); + self.partial.clear(); + self.offset = chunk_offset; + + Err(err) + }; let Some(last_newline) = memchr::memrchr(b'\n', &chunk) else { // If `chunk` doesn't contain a newline, it cannot complete a document. self.partial.extend_from_slice(chunk); - return Ok(()); + return result; }; if self.whole.is_empty() { @@ -140,11 +150,11 @@ impl Parser { self.partial.extend_from_slice(&chunk[last_newline + 1..]); } - Ok(()) + result } /// Transcode newline-delimited JSON documents into equivalent - /// doc::ArchivedNode representations. `pre_allocated` is a potentially + /// doc::ArchivedNode representations. `buffer` is a potentially /// pre-allocated buffer which is cleared and used within the returned /// Transcoded instance. /// @@ -154,10 +164,10 @@ impl Parser { /// documents and errors. pub fn transcode_many( &mut self, - pre_allocated: rkyv::AlignedVec, + buffer: rkyv::AlignedVec, ) -> Result)> { let mut output = Transcoded { - v: pre_allocated, + v: buffer, offset: self.offset, }; output.v.clear(); @@ -165,6 +175,8 @@ impl Parser { if self.whole.is_empty() { return Ok(output); } + // Reserve 2x because transcodings use more bytes then raw JSON. + output.v.reserve(2 * self.whole.len()); let (consumed, maybe_err) = match transcode_simd(&mut self.whole, &mut output, &mut self.ffi) {
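A minimal sketch of the amended `chunk()` contract: chunks must be byte-contiguous, and a gap now returns an error while also resetting the parser to the new offset, so parsing can continue on a best-effort basis. Offsets and documents below are made up.

```rust
use simd_doc::Parser;

// Illustration of chunk() contiguity and best-effort recovery.
fn chunk_contract() -> Result<(), std::io::Error> {
    let mut parser = Parser::new();

    parser.chunk(b"{\"a\":1}\n", 100)?; // An empty parser accepts any starting offset.
    parser.chunk(b"{\"b\":2}\n", 108)?; // Contiguous: 100 plus 8 enqueued bytes.

    // A gap is an error, but the buffered prefix is cleared and the new offset
    // adopted, so the errored chunk itself is still enqueued at offset 500.
    assert!(parser.chunk(b"{\"c\":3}\n", 500).is_err());
    parser.chunk(b"{\"d\":4}\n", 508)?; // Contiguous with the adopted offset.

    Ok(())
}
```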