Skip to content

Commit

Permalink
refactor various no-ops into NoOpWrapper and remove runtime timeouts
Browse files Browse the repository at this point in the history
NoOpWrapper encapsulates a common pattern of dynamically disabling
validations for various connector types.

Remove timeouts from unary RPCs directly invoked against the runtime.
These timeouts are moving into the `agent`, which is the only place we
actually care to time out these requests. When running locally with
`flowctl`, for example, we never want to time out.
  • Loading branch information
jgraettinger committed Sep 5, 2024
1 parent 5dbe743 commit d7d789f
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 128 deletions.
7 changes: 5 additions & 2 deletions crates/agent/src/connector_tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,9 @@ async fn spec_materialization(
..Default::default()
};

// TODO(johnny): select a data-plane and use ProxyConnectors.
let spec = runtime
.unary_materialize(req, build::CONNECTOR_TIMEOUT)
.unary_materialize(req)
.await?
.spec
.ok_or_else(|| anyhow::anyhow!("connector didn't send expected Spec response"))?;
Expand Down Expand Up @@ -262,11 +263,13 @@ async fn spec_capture(
..Default::default()
};

// TODO(johnny): select a data-plane and use ProxyConnectors.
let spec = runtime
.unary_capture(req, build::CONNECTOR_TIMEOUT)
.unary_capture(req)
.await?
.spec
.ok_or_else(|| anyhow::anyhow!("connector didn't send expected Spec response"))?;

let capture::response::Spec {
// protocol here is the numeric version of the capture protocol
protocol: _,
Expand Down
87 changes: 12 additions & 75 deletions crates/build/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,11 @@ pub async fn validate(
None,
format!("build/{build_id:#}"),
);
let connectors = Connectors {
let connectors = validation::NoOpWrapper {
noop_captures,
noop_derivations,
noop_materializations,
runtime,
inner: RuntimeConnectors { runtime },
};

let built = validation::validate(
Expand Down Expand Up @@ -348,97 +348,35 @@ impl sources::Fetcher for Fetcher {
}
}

/// Connectors is a general-purpose implementation of validation::Connectors
/// that dispatches to its contained runtime::Runtime.
pub struct Connectors<L: runtime::LogHandler> {
noop_captures: bool,
noop_derivations: bool,
noop_materializations: bool,
/// RuntimeConnectors is a general-purpose implementation of
/// validation::Connectors that dispatches to its contained runtime::Runtime.
pub struct RuntimeConnectors<L: runtime::LogHandler> {
runtime: runtime::Runtime<L>,
}

impl<L: runtime::LogHandler> Connectors<L> {
pub fn new(runtime: runtime::Runtime<L>) -> Self {
Self {
noop_captures: false,
noop_derivations: false,
noop_materializations: false,
runtime,
}
}

pub fn with_noop_validations(self) -> Self {
Self {
noop_captures: true,
noop_derivations: true,
noop_materializations: true,
runtime: self.runtime,
}
}
}

impl<L: runtime::LogHandler> validation::Connectors for Connectors<L> {
impl<L: runtime::LogHandler> validation::Connectors for RuntimeConnectors<L> {
fn validate_capture<'a>(
&'a self,
request: capture::Request,
data_plane: &'a tables::DataPlane,
_data_plane: &'a tables::DataPlane,
) -> BoxFuture<'a, anyhow::Result<capture::Response>> {
async move {
if self.noop_captures {
validation::NoOpConnectors
.validate_capture(request, data_plane)
.await
} else {
Ok(self
.runtime
.clone()
.unary_capture(request, CONNECTOR_TIMEOUT)
.await?)
}
}
.boxed()
self.runtime.clone().unary_capture(request).boxed()
}

fn validate_derivation<'a>(
&'a self,
request: derive::Request,
data_plane: &'a tables::DataPlane,
_data_plane: &'a tables::DataPlane,
) -> BoxFuture<'a, anyhow::Result<derive::Response>> {
async move {
if self.noop_derivations {
validation::NoOpConnectors
.validate_derivation(request, data_plane)
.await
} else {
Ok(self
.runtime
.clone()
.unary_derive(request, CONNECTOR_TIMEOUT)
.await?)
}
}
.boxed()
self.runtime.clone().unary_derive(request).boxed()
}

fn validate_materialization<'a>(
&'a self,
request: materialize::Request,
data_plane: &'a tables::DataPlane,
_data_plane: &'a tables::DataPlane,
) -> BoxFuture<'a, anyhow::Result<materialize::Response>> {
async move {
if self.noop_materializations {
validation::NoOpConnectors
.validate_materialization(request, data_plane)
.await
} else {
Ok(self
.runtime
.clone()
.unary_materialize(request, CONNECTOR_TIMEOUT)
.await?)
}
}
.boxed()
self.runtime.clone().unary_materialize(request).boxed()
}
}

Expand Down Expand Up @@ -482,5 +420,4 @@ impl tables::CatalogResolver for NoOpCatalogResolver {
}

pub const FETCH_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
pub const CONNECTOR_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(300); // Five minutes.
pub const STDIN_URL: &str = "stdin://root/flow.yaml";
33 changes: 12 additions & 21 deletions crates/flowctl/src/generate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,10 @@ async fn generate_missing_capture_configs(
None,
format!("spec/{capture}"),
)
.unary_capture(
capture::Request {
spec: Some(spec),
..Default::default()
},
build::CONNECTOR_TIMEOUT,
)
.unary_capture(capture::Request {
spec: Some(spec),
..Default::default()
})
.await?
.spec
.context("connector didn't send expected Spec response")?;
Expand Down Expand Up @@ -246,13 +243,10 @@ async fn generate_missing_collection_configs(
None,
format!("spec/{collection}"),
)
.unary_derive(
derive::Request {
spec: Some(spec),
..Default::default()
},
build::CONNECTOR_TIMEOUT,
)
.unary_derive(derive::Request {
spec: Some(spec),
..Default::default()
})
.await?
.spec
.context("connector didn't send expected Spec response")?;
Expand Down Expand Up @@ -323,13 +317,10 @@ async fn generate_missing_materialization_configs(
None,
format!("spec/{materialization}"),
)
.unary_materialize(
materialize::Request {
spec: Some(spec),
..Default::default()
},
build::CONNECTOR_TIMEOUT,
)
.unary_materialize(materialize::Request {
spec: Some(spec),
..Default::default()
})
.await?
.spec
.context("connector didn't send expected Spec response")?;
Expand Down
2 changes: 1 addition & 1 deletion crates/flowctl/src/raw/discover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ pub async fn do_discover(
None,
format!("discover/{}", capture.capture),
)
.unary_capture(discover, build::CONNECTOR_TIMEOUT)
.unary_capture(discover)
.await?
.discovered
.context("connector didn't send expected Discovered response")?;
Expand Down
2 changes: 1 addition & 1 deletion crates/flowctl/src/raw/oauth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ pub async fn do_oauth(
None,
format!("spec/{}", capture.capture),
)
.unary_capture(spec_req, build::CONNECTOR_TIMEOUT)
.unary_capture(spec_req)
.await?
.spec
.context("connector didn't send expected Spec response")?;
Expand Down
2 changes: 1 addition & 1 deletion crates/flowctl/src/raw/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub async fn do_spec(
None,
format!("spec/{}", capture.capture),
)
.unary_capture(request, build::CONNECTOR_TIMEOUT)
.unary_capture(request)
.await?
.spec
.context("connector didn't send expected Spec response")?;
Expand Down
35 changes: 9 additions & 26 deletions crates/runtime/src/unary.rs
Original file line number Diff line number Diff line change
@@ -1,59 +1,42 @@
use super::{LogHandler, Runtime};
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use proto_flow::{capture, derive, materialize};
use std::time::Duration;

impl<L: LogHandler> Runtime<L> {
pub async fn unary_capture(
self,
request: capture::Request,
timeout: Duration,
) -> anyhow::Result<capture::Response> {
let response = self.serve_capture(unary_in(request));
unary_out(response, timeout).await
unary_out(response).await
}

pub async fn unary_derive(
self,
request: derive::Request,
timeout: Duration,
) -> anyhow::Result<derive::Response> {
pub async fn unary_derive(self, request: derive::Request) -> anyhow::Result<derive::Response> {
let response = self.serve_derive(unary_in(request));
unary_out(response, timeout).await
unary_out(response).await
}

pub async fn unary_materialize(
self,
request: materialize::Request,
timeout: Duration,
) -> anyhow::Result<materialize::Response> {
let response = self.serve_materialize(unary_in(request)).boxed();
unary_out(response, timeout).await
unary_out(response).await
}
}

fn unary_in<R: Send + 'static>(request: R) -> BoxStream<'static, anyhow::Result<R>> {
futures::stream::once(async move { Ok(request) }).boxed()
}

async fn unary_out<S, R>(response_rx: S, timeout: Duration) -> anyhow::Result<R>
async fn unary_out<S, R>(response_rx: S) -> anyhow::Result<R>
where
S: futures::Stream<Item = anyhow::Result<R>>,
{
let response = async move {
let mut responses: Vec<R> = response_rx.try_collect().await?;

if responses.len() != 1 {
anyhow::bail!("unary request didn't return a response");
}
Ok(responses.pop().unwrap())
};
let mut responses: Vec<R> = response_rx.try_collect().await?;

tokio::select! {
response = response => response,
_ = tokio::time::sleep(timeout) => {
Err(tonic::Status::deadline_exceeded(r#"Timeout while waiting for the connector's response.
Please verify any network configuration and retry."#))?
}
if responses.len() != 1 {
anyhow::bail!("unary request didn't return a response");
}
Ok(responses.pop().unwrap())
}
2 changes: 1 addition & 1 deletion crates/validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ mod storage_mapping;
mod test_step;

pub use errors::Error;
pub use noop::NoOpConnectors;
pub use noop::{NoOpConnectors, NoOpWrapper};

/// Connectors is a delegated trait -- provided to validate -- through which
/// connector validation RPCs are dispatched. Request and Response must always
Expand Down
45 changes: 45 additions & 0 deletions crates/validation/src/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,48 @@ impl Connectors for NoOpConnectors {
})
}
}

/// NoOpWrapper wraps another Connectors implementation to selectively
/// enable validations for specific task types.
pub struct NoOpWrapper<C> {
pub noop_captures: bool,
pub noop_derivations: bool,
pub noop_materializations: bool,
pub inner: C,
}

impl<C: Connectors> Connectors for NoOpWrapper<C> {
fn validate_capture<'a>(
&'a self,
request: capture::Request,
data_plane: &'a tables::DataPlane,
) -> BoxFuture<'a, anyhow::Result<capture::Response>> {
if self.noop_captures {
NoOpConnectors.validate_capture(request, data_plane)
} else {
self.inner.validate_capture(request, data_plane)
}
}
fn validate_derivation<'a>(
&'a self,
request: derive::Request,
data_plane: &'a tables::DataPlane,
) -> BoxFuture<'a, anyhow::Result<derive::Response>> {
if self.noop_derivations {
NoOpConnectors.validate_derivation(request, data_plane)
} else {
self.inner.validate_derivation(request, data_plane)
}
}
fn validate_materialization<'a>(
&'a self,
request: materialize::Request,
data_plane: &'a tables::DataPlane,
) -> BoxFuture<'a, anyhow::Result<materialize::Response>> {
if self.noop_materializations {
NoOpConnectors.validate_materialization(request, data_plane)
} else {
self.inner.validate_materialization(request, data_plane)
}
}
}

0 comments on commit d7d789f

Please sign in to comment.