Skip to content

Commit

Permalink
models: add autoDiscover properties to captures
Browse files Browse the repository at this point in the history
Introduces the autoDiscover object as a new optional property of captures.
This allows models to represent the desire to keep captured collections up to
date with respect to discovered collection specs and/or inferred schemas.
These properties are intended to primarily be used by the control plane, and
they require no validation or special handling in any other part of the build
process.
  • Loading branch information
psFried committed Jul 5, 2023
1 parent 1a3ce27 commit bcec066
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 2 deletions.
7 changes: 5 additions & 2 deletions crates/agent/src/discovers/specs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,20 @@ pub fn merge_capture(
) -> (models::CaptureDef, Vec<Binding>) {
let capture_prefix = capture_name.rsplit_once("/").unwrap().0;

let (fetched_bindings, interval, shards) = match fetched_capture {
let (fetched_bindings, interval, shards, auto_discover) = match fetched_capture {
Some(models::CaptureDef {
auto_discover,
endpoint: _,
bindings: fetched_bindings,
interval,
shards,
}) => (fetched_bindings, interval, shards),
}) => (fetched_bindings, interval, shards, auto_discover),

None => (
Vec::new(),
models::CaptureDef::default_interval(),
models::ShardTemplate::default(),
None,
),
};

Expand Down Expand Up @@ -95,6 +97,7 @@ pub fn merge_capture(

(
models::CaptureDef {
auto_discover,
endpoint,
bindings: capture_bindings,
interval,
Expand Down
2 changes: 2 additions & 0 deletions crates/flowctl/src/raw/discover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ pub async fn do_discover(
captures: BTreeMap::from([(
Capture::new(capture_name),
CaptureDef {
auto_discover: None,
endpoint: CaptureEndpoint::Connector(ConnectorConfig {
image: image.to_string(),
config,
Expand Down Expand Up @@ -181,6 +182,7 @@ pub async fn do_discover(
captures: BTreeMap::from([(
Capture::new(capture_name),
CaptureDef {
auto_discover: None,
endpoint: CaptureEndpoint::Connector(ConnectorConfig {
image: image.to_string(),
config: serde_json::from_value(config)?,
Expand Down
21 changes: 21 additions & 0 deletions crates/models/src/captures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ use std::time::Duration;
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
#[serde(deny_unknown_fields, rename_all = "camelCase")]
pub struct CaptureDef {
/// # Continuously keep the collection spec and schema up-to-date
#[serde(default, skip_serializing_if = "Option::is_none")]
pub auto_discover: Option<AutoDiscover>,
/// # Endpoint to capture from.
pub endpoint: CaptureEndpoint,
/// # Bound collections to capture from the endpoint.
Expand All @@ -37,6 +40,20 @@ pub struct CaptureDef {
pub shards: ShardTemplate,
}

/// Settings to determine how Flow should stay abreast of ongoing changes to collections and schemas.
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
#[serde(deny_unknown_fields, rename_all = "camelCase")]
pub struct AutoDiscover {
/// Automatically add new bindings discovered from the source.
#[serde(default)]
add_new_bindings: bool,
/// Whether to automatically evolve collections and/or materialization
/// bindings to handle changes to collections that would otherwise be
/// incompatible with the existing catalog.
#[serde(default)]
evolve_incompatible_collections: bool,
}

/// An endpoint from which Flow will capture.
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
#[serde(deny_unknown_fields, rename_all = "camelCase")]
Expand Down Expand Up @@ -68,6 +85,10 @@ impl CaptureDef {

pub fn example() -> Self {
Self {
auto_discover: Some(AutoDiscover {
add_new_bindings: true,
evolve_incompatible_collections: true,
}),
endpoint: CaptureEndpoint::Connector(ConnectorConfig::example()),
bindings: vec![CaptureBinding::example()],
interval: Self::default_interval(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ expression: "&schema"
"examples": [
{
"acmeCo/capture": {
"autoDiscover": {
"addNewBindings": true,
"evolveIncompatibleCollections": true
},
"bindings": [
{
"resource": {
Expand Down Expand Up @@ -163,6 +167,23 @@ expression: "&schema"
},
"additionalProperties": false,
"definitions": {
"AutoDiscover": {
"description": "Settings to determine how Flow should stay abreast of ongoing changes to collections and schemas.",
"type": "object",
"properties": {
"addNewBindings": {
"description": "Automatically add new bindings discovered from the source.",
"default": false,
"type": "boolean"
},
"evolveIncompatibleCollections": {
"description": "Whether to automatically evolve collections and/or materialization bindings to handle changes to collections that would otherwise be incompatible with the existing catalog.",
"default": false,
"type": "boolean"
}
},
"additionalProperties": false
},
"Capture": {
"description": "Capture names are paths of Unicode letters, numbers, '-', '_', or '.'. Each path component is separated by a slash '/', and a name may not begin or end in a '/'.",
"examples": [
Expand Down Expand Up @@ -213,6 +234,10 @@ expression: "&schema"
"endpoint"
],
"properties": {
"autoDiscover": {
"title": "Continuously keep the collection spec and schema up-to-date",
"$ref": "#/definitions/AutoDiscover"
},
"bindings": {
"title": "Bound collections to capture from the endpoint.",
"type": "array",
Expand Down
25 changes: 25 additions & 0 deletions flow.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
"examples": [
{
"acmeCo/capture": {
"autoDiscover": {
"addNewBindings": true,
"evolveIncompatibleCollections": true
},
"bindings": [
{
"resource": {
Expand Down Expand Up @@ -159,6 +163,23 @@
},
"additionalProperties": false,
"definitions": {
"AutoDiscover": {
"description": "Settings to determine how Flow should stay abreast of ongoing changes to collections and schemas.",
"type": "object",
"properties": {
"addNewBindings": {
"description": "Automatically add new bindings discovered from the source.",
"default": false,
"type": "boolean"
},
"evolveIncompatibleCollections": {
"description": "Whether to automatically evolve collections and/or materialization bindings to handle changes to collections that would otherwise be incompatible with the existing catalog.",
"default": false,
"type": "boolean"
}
},
"additionalProperties": false
},
"Capture": {
"description": "Capture names are paths of Unicode letters, numbers, '-', '_', or '.'. Each path component is separated by a slash '/', and a name may not begin or end in a '/'.",
"examples": [
Expand Down Expand Up @@ -209,6 +230,10 @@
"endpoint"
],
"properties": {
"autoDiscover": {
"title": "Continuously keep the collection spec and schema up-to-date",
"$ref": "#/definitions/AutoDiscover"
},
"bindings": {
"title": "Bound collections to capture from the endpoint.",
"type": "array",
Expand Down

0 comments on commit bcec066

Please sign in to comment.