Skip to content

Commit

Permalink
dekaf: Fix unary_materialize() to properly parse both outer and inn…
Browse files Browse the repository at this point in the history
…er `DekafConfig`s
  • Loading branch information
jshearer committed Nov 25, 2024
1 parent 5c051fc commit aa25030
Showing 1 changed file with 18 additions and 3 deletions.
21 changes: 18 additions & 3 deletions crates/dekaf/src/connector.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::{bail, Context};
use proto_flow::materialize;
use proto_flow::{flow::materialization_spec, materialize};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
Expand Down Expand Up @@ -38,6 +38,7 @@ pub struct DekafConfig {
/// tombstones with null values, and "Header" emits then as a kafka document
/// with empty string and `_is_deleted` header set to `1`. Setting this value
/// will also cause all other non-deletions to have an `_is_deleted` header of `0`.
#[serde(default)]
pub deletions: DeletionMode,
}

Expand Down Expand Up @@ -70,8 +71,22 @@ pub async fn unary_materialize(
) -> anyhow::Result<materialize::Response> {
use proto_flow::materialize::response::validated;
if let Some(mut validate) = request.validate {
serde_json::de::from_str::<DekafConfig>(&validate.config_json)
.context("validating endpoint config")?;
match materialization_spec::ConnectorType::try_from(validate.connector_type)? {
materialization_spec::ConnectorType::Dekaf => {}
other => bail!("invalid connector type: {}", other.as_str_name()),
};

let parsed_outer_config =
serde_json::from_str::<models::DekafConfig>(&validate.config_json)
.context("validating dekaf config")?;

let _parsed_inner_config = serde_json::from_value::<DekafConfig>(
parsed_outer_config.config.to_value(),
)
.context(format!(
"validating dekaf endpoint config for variant {}",
parsed_outer_config.variant
))?;

// Largely copied from crates/validation/src/noop.rs
let validated_bindings = std::mem::take(&mut validate.bindings)
Expand Down

0 comments on commit aa25030

Please sign in to comment.