Skip to content

Commit

Permalink
agent: remove bundled write schemas from collections
Browse files Browse the repository at this point in the history
Updates the collection controller to remove a bundled write schema from
the `readSchema` of collections that use schema inference. We used to
bundle the write schema as part of the `$defs`, just like we do for the
inferred schema. But this was confusing for users so we stopped doing
this with the previous commit. This causes the collection controller to
remove a previously bundled write schema, which will happen on the next
scheduled run for each collection.

The code added by this commit is intended to be removed once all the
bundled write schemas have been removed.
  • Loading branch information
psFried committed Jul 31, 2024
1 parent e1b74ec commit 8e6eef0
Show file tree
Hide file tree
Showing 3 changed files with 268 additions and 6 deletions.
144 changes: 142 additions & 2 deletions crates/agent/src/controllers/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ impl CollectionStatus {
};

if pending_pub.has_pending() {
// If the publication fails, then it's quite unlikely to succeed if
// we were to retry it. So consider this a terminal error.
let _result = pending_pub
.finish(state, &mut self.publications, control_plane)
.await?;
Expand Down Expand Up @@ -159,6 +157,34 @@ impl InferredSchemaStatus {
.await
.context("fetching inferred schema")?;

// If the read schema includes a bundled write schema, remove it.
// TODO: remove this code once all production collections have been updated.
let must_remove_write_schema = read_schema_bundles_write_schema(collection_def);
if must_remove_write_schema {
let draft = pending_pub.update_pending_draft("removing bundled write schema");
let draft_row = draft.collections.get_or_insert_with(&collection_name, || {
tables::DraftCollection {
collection: collection_name.clone(),
scope: tables::synthetic_scope(
models::CatalogType::Collection,
&collection_name,
),
expect_pub_id: Some(state.last_pub_id),
model: Some(collection_def.clone()),
}
});
let (removed, new_schema) = collection_def
.read_schema
.as_ref()
.unwrap()
.remove_bundled_write_schema();
if removed {
draft_row.model.as_mut().unwrap().read_schema = Some(new_schema);
tracing::info!("removing bundled write schema");
} else {
tracing::warn!("bundled write schema was not removed");
}
}
if let Some(inferred_schema) = maybe_inferred_schema {
let tables::InferredSchema {
collection_name,
Expand Down Expand Up @@ -207,6 +233,20 @@ impl InferredSchemaStatus {
}
}

fn read_schema_bundles_write_schema(model: &models::CollectionDef) -> bool {
let Some(read_schema) = &model.read_schema else {
return false;
};
// This is a little hacky, but works to identify schemas that bundle the write schema
// without needing to actually parse the entire schema. The three expected occurrences
// of the url are: the key in `$defs`, the `$id` of the bundled schema, and the `$ref`.
read_schema
.get()
.matches(models::Schema::REF_WRITE_SCHEMA_URL)
.count()
>= 3
}

fn update_inferred_schema(
collection: &mut tables::DraftCollection,
inferred_schema: &models::Schema,
Expand All @@ -232,3 +272,103 @@ pub fn uses_inferred_schema(collection: &models::CollectionDef) -> bool {
.map(models::Schema::references_inferred_schema)
.unwrap_or(false)
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn test_read_schema_bundles_write_schema() {
let collection_json = r##"{
"writeSchema": {
"properties": {
"id": {
"type": "string"
}
},
"type": "object",
"x-infer-schema": true
},
"readSchema": {
"$defs": {
"flow://inferred-schema": {
"$id": "flow://inferred-schema",
"$schema": "https://json-schema.org/draft/2019-09/schema",
"additionalProperties": false,
"properties": {
"id": { "type": "string" },
"a": { "type": "string" },
"hello": { "type": "string" }
},
"required": [
"aa",
"hello",
"id"
],
"type": "object"
},
"flow://write-schema": {
"$id": "flow://write-schema",
"properties": {
"id": { "type": "string" }
},
"required": [
"id"
],
"type": "object",
"x-infer-schema": true
}
},
"allOf": [
{
"$ref": "flow://write-schema"
},
{
"$ref": "flow://inferred-schema"
}
]
},
"key": [
"/id"
]
}"##;
let mut collection: models::CollectionDef = serde_json::from_str(collection_json).unwrap();
assert!(read_schema_bundles_write_schema(&collection));

collection.read_schema = Some(models::Schema::new(
models::RawValue::from_str(
r##"{
"$defs": {
"flow://inferred-schema": {
"$id": "flow://inferred-schema",
"$schema": "https://json-schema.org/draft/2019-09/schema",
"additionalProperties": false,
"properties": {
"id": { "type": "string" },
"a": { "type": "string" },
"hello": { "type": "string" }
},
"required": [
"aa",
"hello",
"id"
],
"type": "object"
}
},
"allOf": [
{
"$ref": "flow://write-schema"
},
{
"$ref": "flow://inferred-schema"
}
]
}"##,
)
.unwrap(),
));

assert!(!read_schema_bundles_write_schema(&collection));
}
}
36 changes: 36 additions & 0 deletions crates/agent/src/integration_tests/schema_evolution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ async fn test_schema_evolution() {
"properties": {
"pre-existing": { "type": "integer" }
}
},
// Include write schema to start, and expect that this is removed
"flow://write-schema": {
"$id": "flow://write-schema",
"type": "object",
"properties": {
"id": { "type": "string" }
}
}
},
"allOf": [
Expand Down Expand Up @@ -150,6 +158,34 @@ async fn test_schema_evolution() {
assert!(totes_spec
.read_schema_json
.contains("inferredSchemaIsNotAvailable"));
// Assert that the bundled write schema has been removed. We expect one reference to
// the write schema url, down from 3 originally.
// TODO: we can remove these assertions (and the bundled write schema in the setup) once
// all the collections have been updated.
let totes_model = totes_state
.live_spec
.as_ref()
.unwrap()
.as_collection()
.unwrap();
assert_eq!(
1,
totes_model
.read_schema
.as_ref()
.unwrap()
.get()
.matches(models::Schema::REF_WRITE_SCHEMA_URL)
.count()
);
// Assert that the schema in the built spec _does_ contain the bundled write schema
assert_eq!(
3,
totes_spec
.read_schema_json
.matches(models::Schema::REF_WRITE_SCHEMA_URL)
.count()
);
assert!(totes_state.next_run.is_some());

harness.control_plane().reset_activations();
Expand Down
94 changes: 90 additions & 4 deletions crates/models/src/schemas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ impl std::ops::DerefMut for Schema {
}
}

type Skim = BTreeMap<String, RawValue>;
const KEYWORD_DEF: &str = "$defs";
const KEYWORD_ID: &str = "$id";

impl Schema {
pub fn new(v: RawValue) -> Self {
Self(v)
Expand Down Expand Up @@ -96,11 +100,7 @@ impl Schema {
write_bundle: Option<&Self>,
inferred_bundle: Option<&Self>,
) -> Self {
const KEYWORD_DEF: &str = "$defs";
const KEYWORD_ID: &str = "$id";

use serde_json::{value::to_raw_value, Value};
type Skim = BTreeMap<String, RawValue>;

let mut read_schema: Skim = serde_json::from_str(read_bundle.get()).unwrap();
let mut read_defs: Skim = read_schema
Expand Down Expand Up @@ -180,6 +180,32 @@ impl Schema {
);
Self(to_raw_value(&read_schema).unwrap().into())
}

/// Removes the bundled write schema from the `$defs` of `self`, returning
/// a new schema with the value removed, and a boolean indicating whether the write
/// schema def was actually present. We used to bundle the write schema as part of the
/// read schema, just like the inferred schema. We're no longer doing that because it's
/// confusing to users, so this function removes the bundled write schema. This function
/// should only be needed for long enough to update all the inferred schemas, and can then
/// be safely removed.
pub fn remove_bundled_write_schema(&self) -> (bool, Self) {
use serde_json::value::to_raw_value;

let mut read_schema: Skim = serde_json::from_str(self.0.get()).unwrap();
let mut read_defs: Skim = read_schema
.get(KEYWORD_DEF)
.map(|d| serde_json::from_str(d.get()).unwrap())
.unwrap_or_default();
let had_write_schema = read_defs.remove(Schema::REF_WRITE_SCHEMA_URL).is_some();
read_schema.insert(
KEYWORD_DEF.to_string(),
to_raw_value(&read_defs).unwrap().into(),
);
(
had_write_schema,
Self(to_raw_value(&read_schema).unwrap().into()),
)
}
}

// These patterns let us cheaply detect if a collection schema references the
Expand Down Expand Up @@ -362,4 +388,64 @@ mod test {
}
"###);
}

#[test]
fn test_removing_bundled_write_schema() {
let read_schema = Schema::new(RawValue::from_value(&json!({
"$defs": {
"existing://def": {"type": "array"},
},
"maxProperties": 10,
"allOf": [
{"$ref": "flow://inferred-schema"},
{"$ref": "flow://write-schema"},
]
})));
let write_schema = Schema::new(RawValue::from_value(&json!({
"$id": "old://value",
"required": ["a_key"],
})));
let inferred_schema = Schema::new(RawValue::from_value(&json!({
"$id": "old://value",
"minProperties": 5,
})));

let bundle =
Schema::extend_read_bundle(&read_schema, Some(&write_schema), Some(&inferred_schema));
assert_eq!(
3,
bundle.get().matches(Schema::REF_WRITE_SCHEMA_URL).count(),
"schema should contain 'flow://write-schema' 3 times, for $ref, $defs key, and $id"
);
let (was_removed, new_bundle) = bundle.remove_bundled_write_schema();
assert!(was_removed);
insta::assert_json_snapshot!(new_bundle.to_value(), @r###"
{
"$defs": {
"existing://def": {
"type": "array"
},
"flow://inferred-schema": {
"$id": "flow://inferred-schema",
"minProperties": 5
}
},
"allOf": [
{
"$ref": "flow://inferred-schema"
},
{
"$ref": "flow://write-schema"
}
],
"maxProperties": 10
}
"###);

let (was_removed, _) = new_bundle.remove_bundled_write_schema();
assert!(
!was_removed,
"expected write schema to have already been removed"
);
}
}

0 comments on commit 8e6eef0

Please sign in to comment.