diff --git a/crates/agent/src/controllers/collection.rs b/crates/agent/src/controllers/collection.rs
index 2ae8a7c9b9..5588792ebf 100644
--- a/crates/agent/src/controllers/collection.rs
+++ b/crates/agent/src/controllers/collection.rs
@@ -66,8 +66,6 @@ impl CollectionStatus {
         };
 
         if pending_pub.has_pending() {
-            // If the publication fails, then it's quite unlikely to succeed if
-            // we were to retry it. So consider this a terminal error.
             let _result = pending_pub
                 .finish(state, &mut self.publications, control_plane)
                 .await?;
@@ -159,6 +157,36 @@ impl InferredSchemaStatus {
             .await
             .context("fetching inferred schema")?;
 
+        // If the read schema includes a bundled write schema, remove it.
+        // TODO: remove this code once all production collections have been updated.
+        if read_schema_bundles_write_schema(collection_def) {
+            // The check above is only a substring-count heuristic, so do the real
+            // removal first and touch the pending draft only when a `$defs` entry
+            // was actually removed. A false positive then no longer leaves an
+            // unchanged draft pending.
+            let removal = collection_def
+                .read_schema
+                .as_ref()
+                .map(models::Schema::remove_bundled_write_schema);
+            if let Some((true, new_schema)) = removal {
+                let draft = pending_pub.update_pending_draft("removing bundled write schema");
+                let draft_row = draft.collections.get_or_insert_with(&collection_name, || {
+                    tables::DraftCollection {
+                        collection: collection_name.clone(),
+                        scope: tables::synthetic_scope(
+                            models::CatalogType::Collection,
+                            &collection_name,
+                        ),
+                        expect_pub_id: Some(state.last_pub_id),
+                        model: Some(collection_def.clone()),
+                    }
+                });
+                draft_row.model.as_mut().unwrap().read_schema = Some(new_schema);
+                tracing::info!("removing bundled write schema");
+            } else {
+                tracing::warn!("bundled write schema was not removed");
+            }
+        }
         if let Some(inferred_schema) = maybe_inferred_schema {
             let tables::InferredSchema {
                 collection_name,
@@ -207,6 +235,25 @@ impl InferredSchemaStatus {
     }
 }
 
+/// Returns true if the collection's read schema appears to bundle the write schema.
+///
+/// This is a cheap textual heuristic, not a parse: it reports true when
+/// `models::Schema::REF_WRITE_SCHEMA_URL` occurs at least three times in the raw
+/// read schema, and false when there is no read schema at all.
+fn read_schema_bundles_write_schema(model: &models::CollectionDef) -> bool {
+    let Some(read_schema) = &model.read_schema else {
+        return false;
+    };
+    // This is a little hacky, but works to identify schemas that bundle the write schema
+    // without needing to actually parse the entire schema. The three expected occurrences
+    // of the url are: the key in `$defs`, the `$id` of the bundled schema, and the `$ref`.
+    read_schema
+        .get()
+        .matches(models::Schema::REF_WRITE_SCHEMA_URL)
+        .count()
+        >= 3
+}
+
 fn update_inferred_schema(
     collection: &mut tables::DraftCollection,
     inferred_schema: &models::Schema,
@@ -232,3 +279,103 @@ pub fn uses_inferred_schema(collection: &models::CollectionDef) -> bool {
         .map(models::Schema::references_inferred_schema)
         .unwrap_or(false)
 }
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[test]
+    fn test_read_schema_bundles_write_schema() {
+        let collection_json = r##"{
+            "writeSchema": {
+                "properties": {
+                    "id": {
+                        "type": "string"
+                    }
+                },
+                "type": "object",
+                "x-infer-schema": true
+            },
+            "readSchema": {
+                "$defs": {
+                    "flow://inferred-schema": {
+                        "$id": "flow://inferred-schema",
+                        "$schema": "https://json-schema.org/draft/2019-09/schema",
+                        "additionalProperties": false,
+                        "properties": {
+                            "id": { "type": "string" },
+                            "a": { "type": "string" },
+                            "hello": { "type": "string" }
+                        },
+                        "required": [
+                            "aa",
+                            "hello",
+                            "id"
+                        ],
+                        "type": "object"
+                    },
+                    "flow://write-schema": {
+                        "$id": "flow://write-schema",
+                        "properties": {
+                            "id": { "type": "string" }
+                        },
+                        "required": [
+                            "id"
+                        ],
+                        "type": "object",
+                        "x-infer-schema": true
+                    }
+                },
+                "allOf": [
+                    {
+                        "$ref": "flow://write-schema"
+                    },
+                    {
+                        "$ref": "flow://inferred-schema"
+                    }
+                ]
+            },
+            "key": [
+                "/id"
+            ]
+        }"##;
+        let mut collection: models::CollectionDef = serde_json::from_str(collection_json).unwrap();
+        assert!(read_schema_bundles_write_schema(&collection));
+
+        collection.read_schema = Some(models::Schema::new(
+            models::RawValue::from_str(
+                r##"{
+                "$defs": {
+                    "flow://inferred-schema": {
+                        "$id": "flow://inferred-schema",
+                        "$schema": "https://json-schema.org/draft/2019-09/schema",
+                        "additionalProperties": false,
+                        "properties": {
+                            "id": { "type": "string" },
+                            "a": { "type": "string" },
+                            "hello": { "type": "string" }
+                        },
+                        "required": [
+                            "aa",
+                            "hello",
+                            "id"
+                        ],
+                        "type": "object"
+                    }
+                },
+                "allOf": [
+                    {
+                        "$ref": "flow://write-schema"
+                    },
+                    {
+                        "$ref": "flow://inferred-schema"
+                    }
+                ]
+            }"##,
+            )
+            .unwrap(),
+        ));
+
+        assert!(!read_schema_bundles_write_schema(&collection));
+    }
+}
diff --git a/crates/agent/src/integration_tests/schema_evolution.rs b/crates/agent/src/integration_tests/schema_evolution.rs
index 4313413211..836adf0991 100644
--- a/crates/agent/src/integration_tests/schema_evolution.rs
+++ b/crates/agent/src/integration_tests/schema_evolution.rs
@@ -44,6 +44,14 @@ async fn test_schema_evolution() {
                         "properties": {
                             "pre-existing": { "type": "integer" }
                         }
+                    },
+                    // Include write schema to start, and expect that this is removed
+                    "flow://write-schema": {
+                        "$id": "flow://write-schema",
+                        "type": "object",
+                        "properties": {
+                            "id": { "type": "string" }
+                        }
                     }
                 },
                 "allOf": [
@@ -150,6 +158,34 @@ async fn test_schema_evolution() {
     assert!(totes_spec
         .read_schema_json
         .contains("inferredSchemaIsNotAvailable"));
+    // Assert that the bundled write schema has been removed. We expect one reference to
+    // the write schema url, down from 3 originally.
+    // TODO: we can remove these assertions (and the bundled write schema in the setup) once
+    // all the collections have been updated.
+    let totes_model = totes_state
+        .live_spec
+        .as_ref()
+        .unwrap()
+        .as_collection()
+        .unwrap();
+    assert_eq!(
+        1,
+        totes_model
+            .read_schema
+            .as_ref()
+            .unwrap()
+            .get()
+            .matches(models::Schema::REF_WRITE_SCHEMA_URL)
+            .count()
+    );
+    // Assert that the schema in the built spec _does_ contain the bundled write schema
+    assert_eq!(
+        3,
+        totes_spec
+            .read_schema_json
+            .matches(models::Schema::REF_WRITE_SCHEMA_URL)
+            .count()
+    );
     assert!(totes_state.next_run.is_some());
 
     harness.control_plane().reset_activations();
diff --git a/crates/models/src/schemas.rs b/crates/models/src/schemas.rs
index 3c1d677852..e6f7e4d977 100644
--- a/crates/models/src/schemas.rs
+++ b/crates/models/src/schemas.rs
@@ -33,6 +33,10 @@ impl std::ops::DerefMut for Schema {
     }
 }
 
+type Skim = BTreeMap<String, RawValue>;
+const KEYWORD_DEF: &str = "$defs";
+const KEYWORD_ID: &str = "$id";
+
 impl Schema {
     pub fn new(v: RawValue) -> Self {
         Self(v)
@@ -96,11 +100,7 @@ impl Schema {
         write_bundle: Option<&Self>,
         inferred_bundle: Option<&Self>,
     ) -> Self {
-        const KEYWORD_DEF: &str = "$defs";
-        const KEYWORD_ID: &str = "$id";
-
         use serde_json::{value::to_raw_value, Value};
-        type Skim = BTreeMap<String, RawValue>;
 
         let mut read_schema: Skim = serde_json::from_str(read_bundle.get()).unwrap();
         let mut read_defs: Skim = read_schema
@@ -180,6 +180,39 @@ impl Schema {
         );
         Self(to_raw_value(&read_schema).unwrap().into())
     }
+
+    /// Removes the bundled write schema from the `$defs` of `self`.
+    ///
+    /// Returns a boolean indicating whether the write schema def was actually present,
+    /// and a new schema with that def removed. The returned schema always carries a
+    /// `$defs` keyword, which is empty if `self` had none or held only the write schema.
+    ///
+    /// We used to bundle the write schema as part of the read schema, just like the
+    /// inferred schema. We're no longer doing that because it's confusing to users, so
+    /// this function removes the bundled write schema. It should only be needed for long
+    /// enough to update all the inferred schemas, and can then be safely removed.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `self` or its `$defs` value does not deserialize as a JSON object.
+    pub fn remove_bundled_write_schema(&self) -> (bool, Self) {
+        use serde_json::value::to_raw_value;
+
+        let mut read_schema: Skim = serde_json::from_str(self.0.get()).unwrap();
+        let mut read_defs: Skim = read_schema
+            .get(KEYWORD_DEF)
+            .map(|d| serde_json::from_str(d.get()).unwrap())
+            .unwrap_or_default();
+        let had_write_schema = read_defs.remove(Schema::REF_WRITE_SCHEMA_URL).is_some();
+        read_schema.insert(
+            KEYWORD_DEF.to_string(),
+            to_raw_value(&read_defs).unwrap().into(),
+        );
+        (
+            had_write_schema,
+            Self(to_raw_value(&read_schema).unwrap().into()),
+        )
+    }
 }
 
 // These patterns let us cheaply detect if a collection schema references the
@@ -362,4 +395,64 @@ mod test {
         }
         "###);
     }
+
+    #[test]
+    fn test_removing_bundled_write_schema() {
+        let read_schema = Schema::new(RawValue::from_value(&json!({
+            "$defs": {
+                "existing://def": {"type": "array"},
+            },
+            "maxProperties": 10,
+            "allOf": [
+                {"$ref": "flow://inferred-schema"},
+                {"$ref": "flow://write-schema"},
+            ]
+        })));
+        let write_schema = Schema::new(RawValue::from_value(&json!({
+            "$id": "old://value",
+            "required": ["a_key"],
+        })));
+        let inferred_schema = Schema::new(RawValue::from_value(&json!({
+            "$id": "old://value",
+            "minProperties": 5,
+        })));
+
+        let bundle =
+            Schema::extend_read_bundle(&read_schema, Some(&write_schema), Some(&inferred_schema));
+        assert_eq!(
+            3,
+            bundle.get().matches(Schema::REF_WRITE_SCHEMA_URL).count(),
+            "schema should contain 'flow://write-schema' 3 times, for $ref, $defs key, and $id"
+        );
+        let (was_removed, new_bundle) = bundle.remove_bundled_write_schema();
+        assert!(was_removed);
+        insta::assert_json_snapshot!(new_bundle.to_value(), @r###"
+        {
+          "$defs": {
+            "existing://def": {
+              "type": "array"
+            },
+            "flow://inferred-schema": {
+              "$id": "flow://inferred-schema",
+              "minProperties": 5
+            }
+          },
+          "allOf": [
+            {
+              "$ref": "flow://inferred-schema"
+            },
+            {
+              "$ref": "flow://write-schema"
+            }
+          ],
+          "maxProperties": 10
+        }
+        "###);
+
+        let (was_removed, _) = new_bundle.remove_bundled_write_schema();
+        assert!(
+            !was_removed,
+            "expected write schema to have already been removed"
+        );
+    }
 }