From d7d789f1923aad7d4ead5cc14b17a1c3fc06469c Mon Sep 17 00:00:00 2001 From: Johnny Graettinger Date: Mon, 2 Sep 2024 11:52:37 -0500 Subject: [PATCH] refactor various no-ops into NoOpWrapper and remove runtime timeouts NoOpWrapper encapsulates a common pattern of dynamically disabling validations for various connector types. Remove timeouts from unary RPCs directly invoked against the runtime. These timeouts are moving into the `agent`, which is the only place we actually care to time out these requests. When running locally with `flowctl`, for example, we never want to time out. --- crates/agent/src/connector_tags.rs | 7 ++- crates/build/src/lib.rs | 87 +++++------------------------- crates/flowctl/src/generate/mod.rs | 33 +++++------- crates/flowctl/src/raw/discover.rs | 2 +- crates/flowctl/src/raw/oauth.rs | 2 +- crates/flowctl/src/raw/spec.rs | 2 +- crates/runtime/src/unary.rs | 35 ++++-------- crates/validation/src/lib.rs | 2 +- crates/validation/src/noop.rs | 45 ++++++++++++++++ 9 files changed, 87 insertions(+), 128 deletions(-) diff --git a/crates/agent/src/connector_tags.rs b/crates/agent/src/connector_tags.rs index 820a9ba54d..cdb13b37b4 100644 --- a/crates/agent/src/connector_tags.rs +++ b/crates/agent/src/connector_tags.rs @@ -216,8 +216,9 @@ async fn spec_materialization( ..Default::default() }; + // TODO(johnny): select a data-plane and use ProxyConnectors. let spec = runtime - .unary_materialize(req, build::CONNECTOR_TIMEOUT) + .unary_materialize(req) .await? .spec .ok_or_else(|| anyhow::anyhow!("connector didn't send expected Spec response"))?; @@ -262,11 +263,13 @@ async fn spec_capture( ..Default::default() }; + // TODO(johnny): select a data-plane and use ProxyConnectors. let spec = runtime - .unary_capture(req, build::CONNECTOR_TIMEOUT) + .unary_capture(req) .await? .spec .ok_or_else(|| anyhow::anyhow!("connector didn't send expected Spec response"))?; + let capture::response::Spec { // protocol here is the numeric version of the capture protocol protocol: _, diff --git a/crates/build/src/lib.rs b/crates/build/src/lib.rs index 9cfea92c13..292e6b9886 100644 --- a/crates/build/src/lib.rs +++ b/crates/build/src/lib.rs @@ -115,11 +115,11 @@ pub async fn validate( None, format!("build/{build_id:#}"), ); - let connectors = Connectors { + let connectors = validation::NoOpWrapper { noop_captures, noop_derivations, noop_materializations, - runtime, + inner: RuntimeConnectors { runtime }, }; let built = validation::validate( @@ -348,97 +348,35 @@ impl sources::Fetcher for Fetcher { } } -/// Connectors is a general-purpose implementation of validation::Connectors -/// that dispatches to its contained runtime::Runtime. -pub struct Connectors { - noop_captures: bool, - noop_derivations: bool, - noop_materializations: bool, +/// RuntimeConnectors is a general-purpose implementation of +/// validation::Connectors that dispatches to its contained runtime::Runtime. +pub struct RuntimeConnectors { runtime: runtime::Runtime, } -impl Connectors { - pub fn new(runtime: runtime::Runtime) -> Self { - Self { - noop_captures: false, - noop_derivations: false, - noop_materializations: false, - runtime, - } - } - - pub fn with_noop_validations(self) -> Self { - Self { - noop_captures: true, - noop_derivations: true, - noop_materializations: true, - runtime: self.runtime, - } - } -} - -impl validation::Connectors for Connectors { +impl validation::Connectors for RuntimeConnectors { fn validate_capture<'a>( &'a self, request: capture::Request, - data_plane: &'a tables::DataPlane, + _data_plane: &'a tables::DataPlane, ) -> BoxFuture<'a, anyhow::Result> { - async move { - if self.noop_captures { - validation::NoOpConnectors - .validate_capture(request, data_plane) - .await - } else { - Ok(self - .runtime - .clone() - .unary_capture(request, CONNECTOR_TIMEOUT) - .await?) - } - } - .boxed() + self.runtime.clone().unary_capture(request).boxed() } fn validate_derivation<'a>( &'a self, request: derive::Request, - data_plane: &'a tables::DataPlane, + _data_plane: &'a tables::DataPlane, ) -> BoxFuture<'a, anyhow::Result> { - async move { - if self.noop_derivations { - validation::NoOpConnectors - .validate_derivation(request, data_plane) - .await - } else { - Ok(self - .runtime - .clone() - .unary_derive(request, CONNECTOR_TIMEOUT) - .await?) - } - } - .boxed() + self.runtime.clone().unary_derive(request).boxed() } fn validate_materialization<'a>( &'a self, request: materialize::Request, - data_plane: &'a tables::DataPlane, + _data_plane: &'a tables::DataPlane, ) -> BoxFuture<'a, anyhow::Result> { - async move { - if self.noop_materializations { - validation::NoOpConnectors - .validate_materialization(request, data_plane) - .await - } else { - Ok(self - .runtime - .clone() - .unary_materialize(request, CONNECTOR_TIMEOUT) - .await?) - } - } - .boxed() + self.runtime.clone().unary_materialize(request).boxed() } } @@ -482,5 +420,4 @@ impl tables::CatalogResolver for NoOpCatalogResolver { } pub const FETCH_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30); -pub const CONNECTOR_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(300); // Five minutes. pub const STDIN_URL: &str = "stdin://root/flow.yaml"; diff --git a/crates/flowctl/src/generate/mod.rs b/crates/flowctl/src/generate/mod.rs index f14d71db1a..bdf4ea9d59 100644 --- a/crates/flowctl/src/generate/mod.rs +++ b/crates/flowctl/src/generate/mod.rs @@ -163,13 +163,10 @@ async fn generate_missing_capture_configs( None, format!("spec/{capture}"), ) - .unary_capture( - capture::Request { - spec: Some(spec), - ..Default::default() - }, - build::CONNECTOR_TIMEOUT, - ) + .unary_capture(capture::Request { + spec: Some(spec), + ..Default::default() + }) .await? .spec .context("connector didn't send expected Spec response")?; @@ -246,13 +243,10 @@ async fn generate_missing_collection_configs( None, format!("spec/{collection}"), ) - .unary_derive( - derive::Request { - spec: Some(spec), - ..Default::default() - }, - build::CONNECTOR_TIMEOUT, - ) + .unary_derive(derive::Request { + spec: Some(spec), + ..Default::default() + }) .await? .spec .context("connector didn't send expected Spec response")?; @@ -323,13 +317,10 @@ async fn generate_missing_materialization_configs( None, format!("spec/{materialization}"), ) - .unary_materialize( - materialize::Request { - spec: Some(spec), - ..Default::default() - }, - build::CONNECTOR_TIMEOUT, - ) + .unary_materialize(materialize::Request { + spec: Some(spec), + ..Default::default() + }) .await? .spec .context("connector didn't send expected Spec response")?; diff --git a/crates/flowctl/src/raw/discover.rs b/crates/flowctl/src/raw/discover.rs index bf4f1fc18a..eaf940aad1 100644 --- a/crates/flowctl/src/raw/discover.rs +++ b/crates/flowctl/src/raw/discover.rs @@ -94,7 +94,7 @@ pub async fn do_discover( None, format!("discover/{}", capture.capture), ) - .unary_capture(discover, build::CONNECTOR_TIMEOUT) + .unary_capture(discover) .await? .discovered .context("connector didn't send expected Discovered response")?; diff --git a/crates/flowctl/src/raw/oauth.rs b/crates/flowctl/src/raw/oauth.rs index 3b863ed178..112e262e91 100644 --- a/crates/flowctl/src/raw/oauth.rs +++ b/crates/flowctl/src/raw/oauth.rs @@ -111,7 +111,7 @@ pub async fn do_oauth( None, format!("spec/{}", capture.capture), ) - .unary_capture(spec_req, build::CONNECTOR_TIMEOUT) + .unary_capture(spec_req) .await? .spec .context("connector didn't send expected Spec response")?; diff --git a/crates/flowctl/src/raw/spec.rs b/crates/flowctl/src/raw/spec.rs index 2db09cd3be..69cc00eb4c 100644 --- a/crates/flowctl/src/raw/spec.rs +++ b/crates/flowctl/src/raw/spec.rs @@ -75,7 +75,7 @@ pub async fn do_spec( None, format!("spec/{}", capture.capture), ) - .unary_capture(request, build::CONNECTOR_TIMEOUT) + .unary_capture(request) .await? .spec .context("connector didn't send expected Spec response")?; diff --git a/crates/runtime/src/unary.rs b/crates/runtime/src/unary.rs index 05b4438151..7ad7e28b82 100644 --- a/crates/runtime/src/unary.rs +++ b/crates/runtime/src/unary.rs @@ -1,34 +1,27 @@ use super::{LogHandler, Runtime}; use futures::{stream::BoxStream, StreamExt, TryStreamExt}; use proto_flow::{capture, derive, materialize}; -use std::time::Duration; impl Runtime { pub async fn unary_capture( self, request: capture::Request, - timeout: Duration, ) -> anyhow::Result { let response = self.serve_capture(unary_in(request)); - unary_out(response, timeout).await + unary_out(response).await } - pub async fn unary_derive( - self, - request: derive::Request, - timeout: Duration, - ) -> anyhow::Result { + pub async fn unary_derive(self, request: derive::Request) -> anyhow::Result { let response = self.serve_derive(unary_in(request)); - unary_out(response, timeout).await + unary_out(response).await } pub async fn unary_materialize( self, request: materialize::Request, - timeout: Duration, ) -> anyhow::Result { let response = self.serve_materialize(unary_in(request)).boxed(); - unary_out(response, timeout).await + unary_out(response).await } } @@ -36,24 +29,14 @@ fn unary_in(request: R) -> BoxStream<'static, anyhow::Result< futures::stream::once(async move { Ok(request) }).boxed() } -async fn unary_out(response_rx: S, timeout: Duration) -> anyhow::Result +async fn unary_out(response_rx: S) -> anyhow::Result where S: futures::Stream>, { - let response = async move { - let mut responses: Vec = response_rx.try_collect().await?; - - if responses.len() != 1 { - anyhow::bail!("unary request didn't return a response"); - } - Ok(responses.pop().unwrap()) - }; + let mut responses: Vec = response_rx.try_collect().await?; - tokio::select! { - response = response => response, - _ = tokio::time::sleep(timeout) => { - Err(tonic::Status::deadline_exceeded(r#"Timeout while waiting for the connector's response. - Please verify any network configuration and retry."#))? - } + if responses.len() != 1 { + anyhow::bail!("unary request didn't return a response"); } + Ok(responses.pop().unwrap()) } diff --git a/crates/validation/src/lib.rs b/crates/validation/src/lib.rs index 14f048e26f..051c3aa036 100644 --- a/crates/validation/src/lib.rs +++ b/crates/validation/src/lib.rs @@ -15,7 +15,7 @@ mod storage_mapping; mod test_step; pub use errors::Error; -pub use noop::NoOpConnectors; +pub use noop::{NoOpConnectors, NoOpWrapper}; /// Connectors is a delegated trait -- provided to validate -- through which /// connector validation RPCs are dispatched. Request and Response must always diff --git a/crates/validation/src/noop.rs b/crates/validation/src/noop.rs index 0c1af79253..f29c223b5d 100644 --- a/crates/validation/src/noop.rs +++ b/crates/validation/src/noop.rs @@ -120,3 +120,48 @@ impl Connectors for NoOpConnectors { }) } } + +/// NoOpWrapper wraps another Connectors implementation to selectively +/// enable validations for specific task types. +pub struct NoOpWrapper { + pub noop_captures: bool, + pub noop_derivations: bool, + pub noop_materializations: bool, + pub inner: C, +} + +impl Connectors for NoOpWrapper { + fn validate_capture<'a>( + &'a self, + request: capture::Request, + data_plane: &'a tables::DataPlane, + ) -> BoxFuture<'a, anyhow::Result> { + if self.noop_captures { + NoOpConnectors.validate_capture(request, data_plane) + } else { + self.inner.validate_capture(request, data_plane) + } + } + fn validate_derivation<'a>( + &'a self, + request: derive::Request, + data_plane: &'a tables::DataPlane, + ) -> BoxFuture<'a, anyhow::Result> { + if self.noop_derivations { + NoOpConnectors.validate_derivation(request, data_plane) + } else { + self.inner.validate_derivation(request, data_plane) + } + } + fn validate_materialization<'a>( + &'a self, + request: materialize::Request, + data_plane: &'a tables::DataPlane, + ) -> BoxFuture<'a, anyhow::Result> { + if self.noop_materializations { + NoOpConnectors.validate_materialization(request, data_plane) + } else { + self.inner.validate_materialization(request, data_plane) + } + } +}