crates/agent: add and use ProxyConnectors
ProxyConnectors are used for all Validate RPCs and Discover.

It's not (yet) used for connector spec requests.

Issue #1602
jgraettinger committed Sep 5, 2024
1 parent d7d789f commit 6acee5b
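
In outline, Discover no longer shells out to flowctl-go: the handler builds a typed capture::Request and proxies it through the task's data-plane. A condensed sketch of the new path, lifted from the discovers.rs diff below (data-plane resolution, error handling, and log-level setup elided):

// Build a unary Discover request for the connector image.
let request = capture::Request {
    discover: Some(capture::request::Discover {
        connector_type: capture_spec::ConnectorType::Image as i32,
        config_json: serde_json::to_string(&models::ConnectorConfig {
            image: image_composed,
            config: row.endpoint_config.0.clone().into(),
        })
        .unwrap(),
    }),
    ..Default::default()
};

// Identify the capture task on whose behalf the RPC runs.
let task = ops::ShardRef {
    name: row.capture_name.clone(),
    kind: ops::TaskType::Capture as i32,
    ..Default::default()
};

// ProxyConnectors dispatches the RPC within the resolved data-plane,
// forwarding connector logs to the per-job ops log handler.
let response = crate::ProxyConnectors::new(log_handler)
    .unary_capture(&data_plane, task, request)
    .await?;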
Showing 14 changed files with 462 additions and 210 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 3 additions & 3 deletions crates/agent-sql/src/data_plane.rs
@@ -30,10 +30,10 @@ pub async fn fetch_ops_journal_template(
 )
 }
 
-pub async fn fetch_data_planes(
-    pool: &sqlx::PgPool,
+pub async fn fetch_data_planes<'a, 'b>(
+    pool: impl sqlx::PgExecutor<'a>,
     mut data_plane_ids: Vec<models::Id>,
-    default_data_plane_name: &str,
+    default_data_plane_name: &'b str,
     user_id: Uuid,
 ) -> sqlx::Result<tables::DataPlanes> {
     data_plane_ids.sort();
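Generalizing the executor parameter means fetch_data_planes now accepts anything implementing sqlx::PgExecutor: a &PgPool as before, or a mutable reborrow of an open transaction. A minimal sketch of the new call pattern, mirroring how the discover handler below invokes it mid-transaction:

// Inside an open sqlx::Transaction, pass a reborrow of the transaction
// itself as the executor (previously only &sqlx::PgPool was accepted).
let mut data_planes: tables::DataPlanes = agent_sql::data_plane::fetch_data_planes(
    &mut *txn,            // executor: the surrounding transaction
    Vec::new(),           // no explicit data-plane IDs to fetch
    &row.data_plane_name, // resolve this default data-plane name
    row.user_id,          // scope results to this user's authorization
)
.await?;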
3 changes: 3 additions & 0 deletions crates/agent/Cargo.toml
@@ -21,8 +21,10 @@ gazette = { path = "../gazette" }
 json = { path = "../json" }
 labels = { path = "../labels" }
 models = { path = "../models" }
+ops = { path = "../ops" }
 proto-flow = { path = "../proto-flow" }
 proto-gazette = { path = "../proto-gazette" }
+proto-grpc = { path = "../proto-grpc", features = ["runtime_client"] }
 runtime = { path = "../runtime" }
 sources = { path = "../sources" }
 tables = { path = "../tables", features = ["persist"] }
@@ -54,6 +56,7 @@ tempfile = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true }
 tokio-util = { workspace = true }
+tonic = { workspace = true }
 tower-http = { workspace = true }
 tracing = { workspace = true }
 tracing-subscriber = { workspace = true }
2 changes: 1 addition & 1 deletion crates/agent/src/controlplane.rs
@@ -207,7 +207,7 @@ impl PGControlPlane {
             std::time::Duration::from_secs(60),
             &data_plane.hmac_keys,
             broker::LabelSelector::default(),
-            "",
+            "agent",
         )
         .context("failed to sign claims for data-plane")?;

189 changes: 94 additions & 95 deletions crates/agent/src/discovers.rs
@@ -1,9 +1,8 @@
-use super::{
-    connector_tags::LOCAL_IMAGE_TAG, draft, jobs, logs, CatalogType, HandleResult, Handler, Id,
-};
+use super::{draft, logs, CatalogType, HandleResult, Handler, Id};
 use agent_sql::discovers::Row;
 use anyhow::Context;
 use models::CaptureEndpoint;
+use proto_flow::{capture, flow::capture_spec};
 use serde::{Deserialize, Serialize};
 use sqlx::types::Uuid;

@@ -35,24 +34,13 @@ pub enum JobStatus {

 /// A DiscoverHandler is a Handler which performs discovery operations.
 pub struct DiscoverHandler {
-    connector_network: String,
-    bindir: String,
     logs_tx: logs::Tx,
-    allow_local: bool,
 }
 
 impl DiscoverHandler {
-    pub fn new(
-        connector_network: &str,
-        bindir: &str,
-        logs_tx: &logs::Tx,
-        allow_local: bool,
-    ) -> Self {
+    pub fn new(logs_tx: &logs::Tx) -> Self {
         Self {
-            connector_network: connector_network.to_string(),
-            bindir: bindir.to_string(),
             logs_tx: logs_tx.clone(),
-            allow_local,
         }
     }
 }
@@ -94,21 +82,27 @@ impl DiscoverHandler {
         txn: &mut sqlx::Transaction<'_, sqlx::Postgres>,
     ) -> anyhow::Result<(Id, JobStatus)> {
         tracing::info!(
+            %row.background,
             %row.capture_name,
             %row.connector_tag_id,
             %row.connector_tag_job_success,
             %row.created_at,
+            %row.data_plane_name,
             %row.draft_id,
             %row.image_name,
             %row.image_tag,
             %row.logs_token,
             %row.protocol,
             %row.updated_at,
-            %row.background,
+            %row.user_id,
             "processing discover",
         );
 
+        // Remove draft errors from a previous attempt.
+        agent_sql::drafts::delete_errors(row.draft_id, txn)
+            .await
+            .context("clearing old errors")?;
+
         // Various pre-flight checks.
         if !row.connector_tag_job_success {
             return Ok((row.id, JobStatus::TagFailed));
@@ -122,74 +116,72 @@

         let image_composed = format!("{}{}", row.image_name, row.image_tag);
 
-        if row.image_tag != LOCAL_IMAGE_TAG {
-            // Pull the image.
-            let pull = jobs::run(
-                "pull",
-                &self.logs_tx,
-                row.logs_token,
-                async_process::Command::new("docker")
-                    .arg("pull")
-                    .arg("--quiet")
-                    .arg(&image_composed),
-            )
-            .await?;
-
-            if !pull.success() {
-                return Ok((row.id, JobStatus::PullFailed));
-            }
-        }
-
-        // Remove draft errors from a previous attempt.
-        agent_sql::drafts::delete_errors(row.draft_id, txn)
-            .await
-            .context("clearing old errors")?;
-
-        let mut cmd = async_process::Command::new(format!("{}/flowctl-go", &self.bindir));
-        cmd.arg("api")
-            .arg("discover")
-            .arg("--config=/dev/stdin")
-            .arg("--image")
-            .arg(&image_composed)
-            .arg("--network")
-            .arg(&self.connector_network)
-            .arg("--output=json")
-            .arg("--log.level=warn")
-            .arg("--log.format=color");
-        if self.allow_local {
-            cmd.arg("--allow-local");
-        }
-        let (discover, discover_output) = jobs::run_with_input_output(
-            "discover",
-            &self.logs_tx,
-            row.logs_token,
-            row.endpoint_config.0.get().as_bytes(),
-            &mut cmd,
+        // Resolve the data-plane to which this discover should be applied.
+        let mut data_planes: tables::DataPlanes = agent_sql::data_plane::fetch_data_planes(
+            &mut *txn,
+            Vec::new(),
+            &row.data_plane_name,
+            row.user_id,
         )
         .await?;
 
-        if !discover.success() {
-            let detail = if discover_output.is_empty() {
-                format!(
-                    "connector exited with status code {:?} and did not write any output",
-                    discover.code()
-                )
-            } else {
-                String::from_utf8(discover_output).context("discover error output is not UTF-8")?
-            };
+        let Some(data_plane) = data_planes.pop().filter(|d| d.is_default) else {
             let error = draft::Error {
                 catalog_name: row.capture_name,
                 scope: None,
-                detail,
+                detail: format!("data-plane {} could not be resolved. It may not exist or you may not be authorized", &row.data_plane_name),
             };
             draft::insert_errors(row.draft_id, vec![error], txn).await?;
 
             return Ok((row.id, JobStatus::DiscoverFailed));
-        }
+        };
 
+        let request = capture::Request {
+            discover: Some(capture::request::Discover {
+                connector_type: capture_spec::ConnectorType::Image as i32,
+                config_json: serde_json::to_string(&models::ConnectorConfig {
+                    image: image_composed,
+                    config: row.endpoint_config.0.clone().into(),
+                })
+                .unwrap(),
+            }),
+            ..Default::default()
+        }
+        .with_internal(|internal| {
+            // TODO(johnny): This can be dynamically passed in.
+            // Using INFO for now because these are not shown in the UI,
+            // so if we're looking then there's already a problem.
+            internal.set_log_level(ops::LogLevel::Info);
+        });
+
+        let task = ops::ShardRef {
+            name: row.capture_name.clone(),
+            kind: ops::TaskType::Capture as i32,
+            ..Default::default()
+        };
+
+        let log_handler =
+            logs::ops_handler(self.logs_tx.clone(), "discover".to_string(), row.logs_token);
+
+        let result = crate::ProxyConnectors::new(log_handler)
+            .unary_capture(&data_plane, task, request)
+            .await;
+
+        let response = match result {
+            Ok(response) => response,
+            Err(err) => {
+                let error = draft::Error {
+                    catalog_name: row.capture_name,
+                    scope: None,
+                    detail: format!("{err:#}"),
+                };
+                draft::insert_errors(row.draft_id, vec![error], txn).await?;
+                return Ok((row.id, JobStatus::DiscoverFailed));
+            }
+        };
+
         let result = Self::build_merged_catalog(
             &row.capture_name,
-            &discover_output,
+            response,
             row.draft_id,
             &row.endpoint_config.0,
             &row.image_name,
@@ -270,7 +262,7 @@ impl DiscoverHandler {

     async fn build_merged_catalog(
         capture_name: &str,
-        discover_output: &[u8],
+        response: capture::Response,
         draft_id: Id,
         endpoint_config: &serde_json::value::RawValue,
         image_name: &str,
@@ -281,7 +273,7 @@ impl DiscoverHandler {
         txn: &mut sqlx::Transaction<'_, sqlx::Postgres>,
     ) -> anyhow::Result<Result<models::Catalog, Vec<draft::Error>>> {
         let (endpoint, discovered_bindings) =
-            specs::parse_response(endpoint_config, image_name, image_tag, discover_output)
+            specs::parse_response(endpoint_config, image_name, image_tag, response)
                 .context("converting discovery response into specs")?;
 
         if discovered_bindings
@@ -430,6 +422,7 @@ fn is_endpoint_changed(a: &CaptureEndpoint, b: &CaptureEndpoint) -> bool {
 mod test {
 
     use super::{Id, Uuid};
+    use proto_flow::capture;
     use serde_json::json;
     use sqlx::Connection;
     use std::str::FromStr;
@@ -488,20 +481,22 @@ mod test {
         .await
         .unwrap();
 
-        let discover_output = json!({
-            "bindings": [
-                {"documentSchema": {"const": "write!"}, "key": ["/foo"], "recommendedName": "foo", "resourceConfig": {"table": "foo"}},
-                {"documentSchema": {"const": "bar"}, "key": ["/bar"], "recommendedName": "bar", "resourceConfig": {"table": "bar"}},
-                {"documentSchema": {"const": "quz"}, "key": ["/quz"], "recommendedName": "quz", "resourceConfig": {"table": "quz"}},
-            ],
-        }).to_string();
+        let response: capture::Response = serde_json::from_value(json!({
+            "discovered": {
+                "bindings": [
+                    {"documentSchema": {"const": "write!"}, "key": ["/foo"], "recommendedName": "foo", "resourceConfig": {"table": "foo"}},
+                    {"documentSchema": {"const": "bar"}, "key": ["/bar"], "recommendedName": "bar", "resourceConfig": {"table": "bar"}},
+                    {"documentSchema": {"const": "quz"}, "key": ["/quz"], "recommendedName": "quz", "resourceConfig": {"table": "quz"}},
+                ],
+            }
+        })).unwrap();
 
         let endpoint_config =
             serde_json::value::to_raw_value(&json!({"some": "endpoint-config"})).unwrap();
 
         let result = super::DiscoverHandler::build_merged_catalog(
             "aliceCo/dir/source-thingy",
-            discover_output.as_bytes(),
+            response,
             Id::from_hex("dddddddddddddddd").unwrap(),
             &endpoint_config,
             "ghcr.io/estuary/source-thingy",
@@ -551,19 +546,21 @@ mod test {
         .await
         .unwrap();
 
-        let discover_output = json!({
-            "bindings": [
-                {"documentSchema": {"const": "write!"}, "key": ["/foo"], "recommendedName": "foo", "resourceConfig": {"table": "foo"}},
-                {"documentSchema": {"const": "bar"}, "key": ["/bar"], "recommendedName": "bar", "resourceConfig": {"table": "bar"}},
-                {"documentSchema": {"const": "quz"}, "key": ["/quz"], "recommendedName": "quz", "resourceConfig": {"table": "quz"}},
-            ],
-        }).to_string();
+        let response: capture::Response = serde_json::from_value(json!({
+            "discovered": {
+                "bindings": [
+                    {"documentSchema": {"const": "write!"}, "key": ["/foo"], "recommendedName": "foo", "resourceConfig": {"table": "foo"}},
+                    {"documentSchema": {"const": "bar"}, "key": ["/bar"], "recommendedName": "bar", "resourceConfig": {"table": "bar"}},
+                    {"documentSchema": {"const": "quz"}, "key": ["/quz"], "recommendedName": "quz", "resourceConfig": {"table": "quz"}},
+                ],
+            }
+        })).unwrap();
 
         let endpoint_config = serde_json::value::to_raw_value(&json!({"a": "newA"})).unwrap();
 
         let result = super::DiscoverHandler::build_merged_catalog(
             "aliceCo/dir/source-thingy",
-            discover_output.as_bytes(),
+            response,
             Id::from_hex("eeeeeeeeeeeeeeee").unwrap(),
             &endpoint_config,
             "ghcr.io/estuary/source-thingy",
@@ -610,15 +607,17 @@ mod test {
         .await
         .unwrap();
 
-        let discover_output = json!({
-            "bindings": [
-                {"documentSchema": {"const": 42}, "key": ["/key"], "recommendedName": "bad", "resourceConfig": {"table": "bad"}},
-            ],
-        }).to_string();
+        let response: capture::Response = serde_json::from_value(json!({
+            "discovered": {
+                "bindings": [
+                    {"documentSchema": {"const": 42}, "key": ["/key"], "recommendedName": "bad", "resourceConfig": {"table": "bad"}},
+                ],
+            }
+        })).unwrap();
 
         let result = super::DiscoverHandler::build_merged_catalog(
             "aliceCo/source-thingy",
-            discover_output.as_bytes(),
+            response,
             Id::from_hex("dddddddddddddddd").unwrap(),
             &serde_json::value::to_raw_value(&json!({"some": "endpoint-config"})).unwrap(),
             "ghcr.io/estuary/source-thingy",
