Skip to content

Commit

Permalink
fix: temp disable option check on source options (#13362)
Browse files Browse the repository at this point in the history
tabVersion authored Nov 10, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent eabfeba commit 8f15ace
Showing 3 changed files with 28 additions and 15 deletions.
9 changes: 3 additions & 6 deletions src/connector/src/schema/schema_registry/client.rs
Original file line number Diff line number Diff line change
@@ -25,6 +25,9 @@ use serde::de::DeserializeOwned;

use super::util::*;

pub const SCHEMA_REGISTRY_USERNAME: &str = "schema.registry.username";
pub const SCHEMA_REGISTRY_PASSWORD: &str = "schema.registry.password";

#[derive(Debug, Clone, Default)]
pub struct SchemaRegistryAuth {
username: Option<String>,
@@ -33,9 +36,6 @@ pub struct SchemaRegistryAuth {

impl From<&HashMap<String, String>> for SchemaRegistryAuth {
fn from(props: &HashMap<String, String>) -> Self {
const SCHEMA_REGISTRY_USERNAME: &str = "schema.registry.username";
const SCHEMA_REGISTRY_PASSWORD: &str = "schema.registry.password";

SchemaRegistryAuth {
username: props.get(SCHEMA_REGISTRY_USERNAME).cloned(),
password: props.get(SCHEMA_REGISTRY_PASSWORD).cloned(),
@@ -45,9 +45,6 @@ impl From<&HashMap<String, String>> for SchemaRegistryAuth {

impl From<&BTreeMap<String, String>> for SchemaRegistryAuth {
fn from(props: &BTreeMap<String, String>) -> Self {
const SCHEMA_REGISTRY_USERNAME: &str = "schema.registry.username";
const SCHEMA_REGISTRY_PASSWORD: &str = "schema.registry.password";

SchemaRegistryAuth {
username: props.get(SCHEMA_REGISTRY_USERNAME).cloned(),
password: props.get(SCHEMA_REGISTRY_PASSWORD).cloned(),
32 changes: 24 additions & 8 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
@@ -31,7 +31,9 @@ use risingwave_connector::parser::{
schema_to_columns, AvroParserConfig, DebeziumAvroParserConfig, ProtobufParserConfig,
SpecificParserConfig,
};
use risingwave_connector::schema::schema_registry::name_strategy_from_str;
use risingwave_connector::schema::schema_registry::{
name_strategy_from_str, SCHEMA_REGISTRY_PASSWORD, SCHEMA_REGISTRY_USERNAME,
};
use risingwave_connector::source::cdc::{
CDC_SHARING_MODE_KEY, CDC_SNAPSHOT_BACKFILL, CDC_SNAPSHOT_MODE_KEY, CITUS_CDC_CONNECTOR,
MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR,
@@ -291,6 +293,7 @@ fn get_name_strategy_or_default(name_strategy: Option<AstString>) -> Result<Opti
/// resolve the schema of the source from external schema file, return the relation's columns. see <https://www.risingwave.dev/docs/current/sql-create-source> for more information.
/// return `(columns, source info)`
pub(crate) async fn bind_columns_from_source(
session: &SessionImpl,
source_schema: &ConnectorSchema,
with_properties: &HashMap<String, String>,
create_cdc_source_job: bool,
@@ -567,17 +570,25 @@ pub(crate) async fn bind_columns_from_source(
))));
}
};

{
// fixme: remove this after correctly consuming the two options
options.remove(SCHEMA_REGISTRY_USERNAME);
options.remove(SCHEMA_REGISTRY_PASSWORD);
}

if !options.is_empty() {
return Err(RwError::from(ProtocolError(format!(
"Unknown options for {:?} {:?}: {}",
let err_string = format!(
"Get unknown options for {:?} {:?}: {}",
source_schema.format,
source_schema.row_encode,
options
.iter()
.map(|(k, v)| format!("{}:{}", k, v))
.keys()
.map(|k| k.to_string())
.collect::<Vec<String>>()
.join(","),
))));
);
session.notice_to_user(err_string);
}

Ok(res)
@@ -1135,8 +1146,13 @@ pub async fn handle_create_source(
let create_cdc_source_job =
is_cdc_connector(&with_properties) && session.config().get_cdc_backfill();

let (columns_from_resolve_source, source_info) =
bind_columns_from_source(&source_schema, &with_properties, create_cdc_source_job).await?;
let (columns_from_resolve_source, source_info) = bind_columns_from_source(
&session,
&source_schema,
&with_properties,
create_cdc_source_job,
)
.await?;
let columns_from_sql = bind_sql_columns(&stmt.columns)?;

let mut columns = bind_all_columns(
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
@@ -486,7 +486,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
let sql_pk_names = bind_sql_pk_names(&column_defs, &constraints)?;

let (columns_from_resolve_source, mut source_info) =
bind_columns_from_source(&source_schema, &properties, false).await?;
bind_columns_from_source(context.session_ctx(), &source_schema, &properties, false).await?;
let columns_from_sql = bind_sql_columns(&column_defs)?;

let mut columns = bind_all_columns(

0 comments on commit 8f15ace

Please sign in to comment.