From 8e6eef07b1a86e122e637d4c925c7a2b43af3a7b Mon Sep 17 00:00:00 2001 From: Phil Date: Tue, 30 Jul 2024 17:05:22 -0400 Subject: [PATCH] agent: remove bundled write schemas from collections Updates the collection controller to remove a bundled write schema from the `readSchema` of collections that use schema inference. We used to bundle the write schema as part of the `$defs`, just like we do for the inferred schema. But this was confusing for users so we stopped doing this with the previous commit. This causes the collection controller to remove a previously bundled write schema, which will happen on the next scheduled run for each collection. The code added by this commit is intended to be removed once all the bundled write schemas have been removed. --- crates/agent/src/controllers/collection.rs | 144 +++++++++++++++++- .../src/integration_tests/schema_evolution.rs | 36 +++++ crates/models/src/schemas.rs | 94 +++++++++++- 3 files changed, 268 insertions(+), 6 deletions(-) diff --git a/crates/agent/src/controllers/collection.rs b/crates/agent/src/controllers/collection.rs index 2ae8a7c9b9..5588792ebf 100644 --- a/crates/agent/src/controllers/collection.rs +++ b/crates/agent/src/controllers/collection.rs @@ -66,8 +66,6 @@ impl CollectionStatus { }; if pending_pub.has_pending() { - // If the publication fails, then it's quite unlikely to succeed if - // we were to retry it. So consider this a terminal error. let _result = pending_pub .finish(state, &mut self.publications, control_plane) .await?; @@ -159,6 +157,34 @@ impl InferredSchemaStatus { .await .context("fetching inferred schema")?; + // If the read schema includes a bundled write schema, remove it. + // TODO: remove this code once all production collections have been updated. + let must_remove_write_schema = read_schema_bundles_write_schema(collection_def); + if must_remove_write_schema { + let draft = pending_pub.update_pending_draft("removing bundled write schema"); + let draft_row = draft.collections.get_or_insert_with(&collection_name, || { + tables::DraftCollection { + collection: collection_name.clone(), + scope: tables::synthetic_scope( + models::CatalogType::Collection, + &collection_name, + ), + expect_pub_id: Some(state.last_pub_id), + model: Some(collection_def.clone()), + } + }); + let (removed, new_schema) = collection_def + .read_schema + .as_ref() + .unwrap() + .remove_bundled_write_schema(); + if removed { + draft_row.model.as_mut().unwrap().read_schema = Some(new_schema); + tracing::info!("removing bundled write schema"); + } else { + tracing::warn!("bundled write schema was not removed"); + } + } if let Some(inferred_schema) = maybe_inferred_schema { let tables::InferredSchema { collection_name, @@ -207,6 +233,20 @@ impl InferredSchemaStatus { } } +fn read_schema_bundles_write_schema(model: &models::CollectionDef) -> bool { + let Some(read_schema) = &model.read_schema else { + return false; + }; + // This is a little hacky, but works to identify schemas that bundle the write schema + // without needing to actually parse the entire schema. The three expected occurrences + // of the url are: the key in `$defs`, the `$id` of the bundled schema, and the `$ref`. + read_schema + .get() + .matches(models::Schema::REF_WRITE_SCHEMA_URL) + .count() + >= 3 +} + fn update_inferred_schema( collection: &mut tables::DraftCollection, inferred_schema: &models::Schema, @@ -232,3 +272,103 @@ pub fn uses_inferred_schema(collection: &models::CollectionDef) -> bool { .map(models::Schema::references_inferred_schema) .unwrap_or(false) } + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_read_schema_bundles_write_schema() { + let collection_json = r##"{ + "writeSchema": { + "properties": { + "id": { + "type": "string" + } + }, + "type": "object", + "x-infer-schema": true + }, + "readSchema": { + "$defs": { + "flow://inferred-schema": { + "$id": "flow://inferred-schema", + "$schema": "https://json-schema.org/draft/2019-09/schema", + "additionalProperties": false, + "properties": { + "id": { "type": "string" }, + "a": { "type": "string" }, + "hello": { "type": "string" } + }, + "required": [ + "aa", + "hello", + "id" + ], + "type": "object" + }, + "flow://write-schema": { + "$id": "flow://write-schema", + "properties": { + "id": { "type": "string" } + }, + "required": [ + "id" + ], + "type": "object", + "x-infer-schema": true + } + }, + "allOf": [ + { + "$ref": "flow://write-schema" + }, + { + "$ref": "flow://inferred-schema" + } + ] + }, + "key": [ + "/id" + ] + }"##; + let mut collection: models::CollectionDef = serde_json::from_str(collection_json).unwrap(); + assert!(read_schema_bundles_write_schema(&collection)); + + collection.read_schema = Some(models::Schema::new( + models::RawValue::from_str( + r##"{ + "$defs": { + "flow://inferred-schema": { + "$id": "flow://inferred-schema", + "$schema": "https://json-schema.org/draft/2019-09/schema", + "additionalProperties": false, + "properties": { + "id": { "type": "string" }, + "a": { "type": "string" }, + "hello": { "type": "string" } + }, + "required": [ + "aa", + "hello", + "id" + ], + "type": "object" + } + }, + "allOf": [ + { + "$ref": "flow://write-schema" + }, + { + "$ref": "flow://inferred-schema" + } + ] + }"##, + ) + .unwrap(), + )); + + assert!(!read_schema_bundles_write_schema(&collection)); + } +} diff --git a/crates/agent/src/integration_tests/schema_evolution.rs b/crates/agent/src/integration_tests/schema_evolution.rs index 4313413211..836adf0991 100644 --- a/crates/agent/src/integration_tests/schema_evolution.rs +++ b/crates/agent/src/integration_tests/schema_evolution.rs @@ -44,6 +44,14 @@ async fn test_schema_evolution() { "properties": { "pre-existing": { "type": "integer" } } + }, + // Include write schema to start, and expect that this is removed + "flow://write-schema": { + "$id": "flow://write-schema", + "type": "object", + "properties": { + "id": { "type": "string" } + } } }, "allOf": [ @@ -150,6 +158,34 @@ async fn test_schema_evolution() { assert!(totes_spec .read_schema_json .contains("inferredSchemaIsNotAvailable")); + // Assert that the bundled write schema has been removed. We expect one reference to + // the write schema url, down from 3 originally. + // TODO: we can remove these assertions (and the bundled write schema in the setup) once + // all the collections have been updated. + let totes_model = totes_state + .live_spec + .as_ref() + .unwrap() + .as_collection() + .unwrap(); + assert_eq!( + 1, + totes_model + .read_schema + .as_ref() + .unwrap() + .get() + .matches(models::Schema::REF_WRITE_SCHEMA_URL) + .count() + ); + // Assert that the schema in the built spec _does_ contain the bundled write schema + assert_eq!( + 3, + totes_spec + .read_schema_json + .matches(models::Schema::REF_WRITE_SCHEMA_URL) + .count() + ); assert!(totes_state.next_run.is_some()); harness.control_plane().reset_activations(); diff --git a/crates/models/src/schemas.rs b/crates/models/src/schemas.rs index 3c1d677852..e6f7e4d977 100644 --- a/crates/models/src/schemas.rs +++ b/crates/models/src/schemas.rs @@ -33,6 +33,10 @@ impl std::ops::DerefMut for Schema { } } +type Skim = BTreeMap; +const KEYWORD_DEF: &str = "$defs"; +const KEYWORD_ID: &str = "$id"; + impl Schema { pub fn new(v: RawValue) -> Self { Self(v) @@ -96,11 +100,7 @@ impl Schema { write_bundle: Option<&Self>, inferred_bundle: Option<&Self>, ) -> Self { - const KEYWORD_DEF: &str = "$defs"; - const KEYWORD_ID: &str = "$id"; - use serde_json::{value::to_raw_value, Value}; - type Skim = BTreeMap; let mut read_schema: Skim = serde_json::from_str(read_bundle.get()).unwrap(); let mut read_defs: Skim = read_schema @@ -180,6 +180,32 @@ impl Schema { ); Self(to_raw_value(&read_schema).unwrap().into()) } + + /// Removes the bundled write schema from the `$defs` of `self`, returning + /// a new schema with the value removed, and a boolean indicating whether the write + /// schema def was actually present. We used to bundle the write schema as part of the + /// read schema, just like the inferred schema. We're no longer doing that because it's + /// confusing to users, so this function removes the bundled write schema. This function + /// should only be needed for long enough to update all the inferred schemas, and can then + /// be safely removed. + pub fn remove_bundled_write_schema(&self) -> (bool, Self) { + use serde_json::value::to_raw_value; + + let mut read_schema: Skim = serde_json::from_str(self.0.get()).unwrap(); + let mut read_defs: Skim = read_schema + .get(KEYWORD_DEF) + .map(|d| serde_json::from_str(d.get()).unwrap()) + .unwrap_or_default(); + let had_write_schema = read_defs.remove(Schema::REF_WRITE_SCHEMA_URL).is_some(); + read_schema.insert( + KEYWORD_DEF.to_string(), + to_raw_value(&read_defs).unwrap().into(), + ); + ( + had_write_schema, + Self(to_raw_value(&read_schema).unwrap().into()), + ) + } } // These patterns let us cheaply detect if a collection schema references the @@ -362,4 +388,64 @@ mod test { } "###); } + + #[test] + fn test_removing_bundled_write_schema() { + let read_schema = Schema::new(RawValue::from_value(&json!({ + "$defs": { + "existing://def": {"type": "array"}, + }, + "maxProperties": 10, + "allOf": [ + {"$ref": "flow://inferred-schema"}, + {"$ref": "flow://write-schema"}, + ] + }))); + let write_schema = Schema::new(RawValue::from_value(&json!({ + "$id": "old://value", + "required": ["a_key"], + }))); + let inferred_schema = Schema::new(RawValue::from_value(&json!({ + "$id": "old://value", + "minProperties": 5, + }))); + + let bundle = + Schema::extend_read_bundle(&read_schema, Some(&write_schema), Some(&inferred_schema)); + assert_eq!( + 3, + bundle.get().matches(Schema::REF_WRITE_SCHEMA_URL).count(), + "schema should contain 'flow://write-schema' 3 times, for $ref, $defs key, and $id" + ); + let (was_removed, new_bundle) = bundle.remove_bundled_write_schema(); + assert!(was_removed); + insta::assert_json_snapshot!(new_bundle.to_value(), @r###" + { + "$defs": { + "existing://def": { + "type": "array" + }, + "flow://inferred-schema": { + "$id": "flow://inferred-schema", + "minProperties": 5 + } + }, + "allOf": [ + { + "$ref": "flow://inferred-schema" + }, + { + "$ref": "flow://write-schema" + } + ], + "maxProperties": 10 + } + "###); + + let (was_removed, _) = new_bundle.remove_bundled_write_schema(); + assert!( + !was_removed, + "expected write schema to have already been removed" + ); + } }