diff --git a/Cargo.toml b/Cargo.toml index 98f37686db325..b4c7d57a6385c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -160,7 +160,11 @@ arrow-udf-flight = "0.4" clap = { version = "4", features = ["cargo", "derive", "env"] } # Use a forked version which removes the dependencies on dynamo db to reduce # compile time and binary size. -deltalake = { version = "0.20.1", features = ["s3", "gcs", "datafusion"] } +deltalake = { version = "0.20.1", features = [ + "s3", + "gcs", + "datafusion", +] } itertools = "0.13.0" jsonbb = "0.1.4" lru = { git = "https://github.com/risingwavelabs/lru-rs.git", rev = "2682b85" } diff --git a/e2e_test/source_inline/connection/ddl.slt b/e2e_test/source_inline/connection/ddl.slt new file mode 100644 index 0000000000000..77b96978961c5 --- /dev/null +++ b/e2e_test/source_inline/connection/ddl.slt @@ -0,0 +1,107 @@ +control substitution on + +# for non-shared source +statement ok +set streaming_use_shared_source to false; + +statement ok +create secret sec_broker with (backend = 'meta') as '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}'; + +statement error unknown field `foo` +create connection conn with (type = 'kafka', properties.bootstrap.server = secret sec_broker, foo = 'bar'); + +statement error Connection type "kinesis" is not supported +create connection conn with (type = 'kinesis'); + +statement ok +create connection conn with (type = 'kafka', properties.bootstrap.server = secret sec_broker, properties.security.protocol = 'plaintext'); + +query TTT +select "name", "type_" from rw_catalog.rw_connections; +---- +conn CONNECTION_TYPE_KAFKA + +# unstable test serialization due to iteration over a hashmap +# the "connection_params" looks like: +# {"properties.bootstrap.server":"SECRET sec_broker AS TEXT","properties.security.protocol":"plaintext"} + +statement error Permission denied: PermissionDenied: secret used by 1 other objects. 
+drop secret sec_broker; + +statement error Duplicated key in both WITH clause and Connection catalog: properties.security.protocol +create table t1 (a int, b varchar) with ( + connector = 'kafka', + connection = conn, + topic = 'connection_ddl_1', + properties.security.protocol = 'plaintext') +format plain encode json; + +statement error connector kinesis and connection type Kafka are not compatible +create table t1 (a int, b varchar) with ( + connector = 'kinesis', + connection = conn, + stream = 'connection_ddl_1', + region = 'us-east-1') +format plain encode json; + +system ok +rpk topic create connection_ddl_1 -p 1 + +statement ok +create table t1 (a int, b varchar) with ( + connector = 'kafka', + connection = conn, + topic = 'connection_ddl_1') +format plain encode json; + +statement error Permission denied: PermissionDenied: connection used by 1 other objects. +drop connection conn; + +# Connection, Source, and Sink each depend on the secret independently +statement error Permission denied: PermissionDenied: secret used by 2 other objects. 
+drop secret sec_broker; + +statement ok +create table data_table (a int, b varchar); + +statement ok +insert into data_table values (1, 'a'), (2, 'b'), (3, 'c'); + +statement ok +flush; + +statement ok +create sink sink_kafka from data_table with ( + connector = 'kafka', + connection = conn, + topic = 'connection_ddl_1' +) format plain encode json ( + force_append_only='true' +); + +sleep 3s + +query IT rowsort +select a, b from t1; +---- +1 a +2 b +3 c + +statement ok +drop sink sink_kafka + +statement ok +drop table data_table; + +statement ok +drop table t1; + +statement ok +drop connection conn; + +statement ok +drop secret sec_broker; + +statement ok +set streaming_use_shared_source to true; diff --git a/proto/catalog.proto b/proto/catalog.proto index 699804d8fe188..2f4cc2968a232 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -92,6 +92,9 @@ message StreamSourceInfo { // Handle the source relies on any sceret. The key is the propertity name and the value is the secret id and type. // For format and encode options. map format_encode_secret_refs = 16; + + // ref connection for schema registry + optional uint32 connection_id = 17; } message WebhookSourceInfo { @@ -128,6 +131,8 @@ message Source { uint32 associated_table_id = 12; } string definition = 13; + + // ref connection for connector optional uint32 connection_id = 14; optional uint64 initialized_at_epoch = 15; @@ -161,6 +166,9 @@ message SinkFormatDesc { optional plan_common.EncodeType key_encode = 4; // Secret used for format encode options. map secret_refs = 5; + + // ref connection for schema registry + optional uint32 connection_id = 6; } // the catalog of the sink. There are two kind of schema here. 
The full schema is all columns @@ -183,6 +191,8 @@ message Sink { uint32 owner = 11; map properties = 12; string definition = 13; + + // ref connection for connector optional uint32 connection_id = 14; optional uint64 initialized_at_epoch = 15; optional uint64 created_at_epoch = 16; @@ -231,6 +241,19 @@ message Subscription { SubscriptionState subscription_state = 19; } +message ConnectionParams { + enum ConnectionType { + CONNECTION_TYPE_UNSPECIFIED = 0; + CONNECTION_TYPE_KAFKA = 1; + CONNECTION_TYPE_ICEBERG = 2; + CONNECTION_TYPE_SCHEMA_REGISTRY = 3; + } + + ConnectionType connection_type = 1; + map properties = 2; + map secret_refs = 3; +} + message Connection { message PrivateLinkService { enum PrivateLinkProvider { @@ -251,6 +274,7 @@ message Connection { string name = 4; oneof info { PrivateLinkService private_link_service = 5 [deprecated = true]; + ConnectionParams connection_params = 7; } uint32 owner = 6; } diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 8daa21a3085ad..3b94b4d9f2bd9 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -442,7 +442,8 @@ message CreateConnectionRequest { uint32 database_id = 2; uint32 schema_id = 3; oneof payload { - PrivateLink private_link = 4; + PrivateLink private_link = 4 [deprecated = true]; + catalog.ConnectionParams connection_params = 6; } uint32 owner_id = 5; } diff --git a/src/bench/sink_bench/main.rs b/src/bench/sink_bench/main.rs index 850c41c31460f..14f106a61b9e8 100644 --- a/src/bench/sink_bench/main.rs +++ b/src/bench/sink_bench/main.rs @@ -487,6 +487,7 @@ fn mock_from_legacy_type( options: Default::default(), secret_refs: Default::default(), key_encode: None, + connection_id: None, })) } else { SinkFormatDesc::from_legacy_type(connector, r#type) diff --git a/src/connector/src/connector_common/common.rs b/src/connector/src/connector_common/common.rs index 3eaffa93d02a4..da99819fc7537 100644 --- a/src/connector/src/connector_common/common.rs +++ 
b/src/connector/src/connector_common/common.rs @@ -163,7 +163,7 @@ impl AwsAuthProps { #[serde_as] #[derive(Debug, Clone, Deserialize, WithOptions, PartialEq, Hash, Eq)] -pub struct KafkaConnection { +pub struct KafkaConnectionProps { #[serde(rename = "properties.bootstrap.server", alias = "kafka.brokers")] pub brokers: String, @@ -244,6 +244,7 @@ pub struct KafkaConnection { #[serde_as] #[derive(Debug, Clone, Deserialize, WithOptions)] pub struct KafkaCommon { + // connection related props are moved to `KafkaConnectionProps` #[serde(rename = "topic", alias = "kafka.topic")] pub topic: String, @@ -256,7 +257,7 @@ pub struct KafkaCommon { } #[serde_as] -#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq)] +#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq, Hash, Eq)] pub struct KafkaPrivateLinkCommon { /// This is generated from `private_link_targets` and `private_link_endpoint` in frontend, instead of given by users. #[serde(rename = "broker.rewrite.endpoints")] @@ -321,7 +322,32 @@ impl RdKafkaPropertiesCommon { } } -impl KafkaConnection { +impl KafkaConnectionProps { + #[cfg(test)] + pub fn test_default() -> Self { + Self { + brokers: "localhost:9092".to_string(), + security_protocol: None, + ssl_ca_location: None, + ssl_certificate_location: None, + ssl_key_location: None, + ssl_ca_pem: None, + ssl_certificate_pem: None, + ssl_key_pem: None, + ssl_key_password: None, + ssl_endpoint_identification_algorithm: None, + sasl_mechanism: None, + sasl_username: None, + sasl_password: None, + sasl_kerberos_service_name: None, + sasl_kerberos_keytab: None, + sasl_kerberos_principal: None, + sasl_kerberos_kinit_cmd: None, + sasl_kerberos_min_time_before_relogin: None, + sasl_oathbearer_config: None, + } + } + pub(crate) fn set_security_properties(&self, config: &mut ClientConfig) { // AWS_MSK_IAM if self.is_aws_msk_iam() { diff --git a/src/connector/src/connector_common/connection.rs b/src/connector/src/connector_common/connection.rs new file mode 100644 index 
0000000000000..fa1f420544677 --- /dev/null +++ b/src/connector/src/connector_common/connection.rs @@ -0,0 +1,128 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use rdkafka::consumer::{BaseConsumer, Consumer}; +use rdkafka::ClientConfig; +use risingwave_common::secret::LocalSecretManager; +use risingwave_pb::catalog::PbConnection; +use serde_derive::Deserialize; +use serde_with::serde_as; +use tonic::async_trait; +use with_options::WithOptions; + +use crate::connector_common::{AwsAuthProps, KafkaConnectionProps, KafkaPrivateLinkCommon}; +use crate::error::ConnectorResult; +use crate::source::kafka::{KafkaContextCommon, RwConsumerContext}; +use crate::{dispatch_connection_impl, ConnectionImpl}; + +#[async_trait] +pub trait Connection { + async fn test_connection(&self) -> ConnectorResult<()>; +} + +#[serde_as] +#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq)] +#[serde(deny_unknown_fields)] +pub struct KafkaConnection { + #[serde(flatten)] + pub inner: KafkaConnectionProps, + #[serde(flatten)] + pub kafka_private_link_common: KafkaPrivateLinkCommon, + #[serde(flatten)] + pub aws_auth_props: AwsAuthProps, +} + +pub async fn validate_connection(connection: &PbConnection) -> ConnectorResult<()> { + if let Some(ref info) = connection.info { + match info { + risingwave_pb::catalog::connection::Info::ConnectionParams(cp) => { + let options = cp.properties.clone().into_iter().collect(); 
+ let secret_refs = cp.secret_refs.clone().into_iter().collect(); + let props_secret_resolved = + LocalSecretManager::global().fill_secrets(options, secret_refs)?; + let connection_impl = + ConnectionImpl::from_proto(cp.connection_type(), props_secret_resolved)?; + dispatch_connection_impl!(connection_impl, inner, inner.test_connection().await?) + } + risingwave_pb::catalog::connection::Info::PrivateLinkService(_) => unreachable!(), + } + } + Ok(()) +} + +#[async_trait] +impl Connection for KafkaConnection { + async fn test_connection(&self) -> ConnectorResult<()> { + let client = self.build_client().await?; + // describe cluster here + client.fetch_metadata(None, Duration::from_secs(10)).await?; + Ok(()) + } +} + +impl KafkaConnection { + async fn build_client(&self) -> ConnectorResult> { + let mut config = ClientConfig::new(); + let bootstrap_servers = &self.inner.brokers; + let broker_rewrite_map = self.kafka_private_link_common.broker_rewrite_map.clone(); + config.set("bootstrap.servers", bootstrap_servers); + self.inner.set_security_properties(&mut config); + + // dup with Kafka Enumerator + let ctx_common = KafkaContextCommon::new( + broker_rewrite_map, + None, + None, + self.aws_auth_props.clone(), + self.inner.is_aws_msk_iam(), + ) + .await?; + let client_ctx = RwConsumerContext::new(ctx_common); + let client: BaseConsumer = + config.create_with_context(client_ctx).await?; + if self.inner.is_aws_msk_iam() { + #[cfg(not(madsim))] + client.poll(Duration::from_secs(10)); // note: this is a blocking call + #[cfg(madsim)] + client.poll(Duration::from_secs(10)).await; + } + Ok(client) + } +} + +#[serde_as] +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)] +#[serde(deny_unknown_fields)] +pub struct IcebergConnection {} + +#[async_trait] +impl Connection for IcebergConnection { + async fn test_connection(&self) -> ConnectorResult<()> { + todo!() + } +} + +#[serde_as] +#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq, Hash, Eq)] 
+#[serde(deny_unknown_fields)] +pub struct SchemaRegistryConnection {} + +#[async_trait] +impl Connection for SchemaRegistryConnection { + async fn test_connection(&self) -> ConnectorResult<()> { + todo!() + } +} diff --git a/src/connector/src/connector_common/mod.rs b/src/connector/src/connector_common/mod.rs index a4a2c5a02e3f5..11f5946bf98e1 100644 --- a/src/connector/src/connector_common/mod.rs +++ b/src/connector/src/connector_common/mod.rs @@ -19,10 +19,14 @@ pub use mqtt_common::{MqttCommon, QualityOfService as MqttQualityOfService}; mod common; pub use common::{ - AwsAuthProps, AwsPrivateLinkItem, KafkaCommon, KafkaConnection, KafkaPrivateLinkCommon, + AwsAuthProps, AwsPrivateLinkItem, KafkaCommon, KafkaConnectionProps, KafkaPrivateLinkCommon, KinesisCommon, MongodbCommon, NatsCommon, PulsarCommon, PulsarOauthCommon, RdKafkaPropertiesCommon, PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY, }; +mod connection; +pub use connection::{ + validate_connection, Connection, IcebergConnection, KafkaConnection, SchemaRegistryConnection, +}; mod iceberg; #[cfg(not(madsim))] diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index f66b5116c110b..c6e81865c9c83 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -49,6 +49,7 @@ pub mod parser; pub mod schema; pub mod sink; pub mod source; +pub use source::ConnectionImpl; pub mod connector_common; diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs index 1dc304c651a85..e5e4140f3a2f2 100644 --- a/src/connector/src/macros.rs +++ b/src/connector/src/macros.rs @@ -50,6 +50,20 @@ macro_rules! for_all_classified_sources { }; } +#[macro_export] +macro_rules! for_all_connections { + ($macro:path $(, $extra_args:tt)*) => { + $macro! 
{ + { + { Kafka, $crate::connector_common::KafkaConnection, risingwave_pb::catalog::connection_params::PbConnectionType }, + { Iceberg, $crate::connector_common::IcebergConnection, risingwave_pb::catalog::connection_params::PbConnectionType }, + { SchemaRegistry, $crate::connector_common::SchemaRegistryConnection, risingwave_pb::catalog::connection_params::PbConnectionType } + } + $(,$extra_args)* + } + }; +} + #[macro_export] macro_rules! for_all_sources_inner { ( @@ -161,6 +175,88 @@ macro_rules! dispatch_split_impl { }}; } +#[macro_export] +macro_rules! dispatch_connection_impl { + ($impl:expr, $inner_name:ident, $body:expr) => { + $crate::dispatch_connection_enum! { $impl, $inner_name, $body } + }; +} + +#[macro_export] +macro_rules! dispatch_connection_enum { + ($impl:expr, $inner_name:ident, $body:expr) => {{ + $crate::for_all_connections! { + $crate::dispatch_connection_impl_inner, + $impl, + $inner_name, + $body + } + }}; +} + +#[macro_export] +macro_rules! dispatch_connection_impl_inner { + ( + { $({$conn_variant_name:ident, $connection:ty, $pb_variant_type:ty }),* }, + $impl:expr, + $inner_name:ident, + $body:expr + ) => {{ + match $impl { + $( + ConnectionImpl::$conn_variant_name($inner_name) => { + $body + } + ),* + } + }}; +} + +#[macro_export] +macro_rules! 
impl_connection { + ({$({ $variant_name:ident, $connection:ty, $pb_connection_path:path }),*}) => { + #[derive(Debug, Clone, EnumAsInner, PartialEq)] + pub enum ConnectionImpl { + $( + $variant_name(Box<$connection>), + )* + } + + $( + impl TryFrom for $connection { + type Error = $crate::error::ConnectorError; + + fn try_from(connection: ConnectionImpl) -> std::result::Result { + match connection { + ConnectionImpl::$variant_name(inner) => Ok(Box::into_inner(inner)), + other => risingwave_common::bail!("expect {} but get {:?}", stringify!($connection), other), + } + } + } + + impl From<$connection> for ConnectionImpl { + fn from(connection: $connection) -> ConnectionImpl { + ConnectionImpl::$variant_name(Box::new(connection)) + } + } + + )* + + impl ConnectionImpl { + pub fn from_proto(pb_connection_type: risingwave_pb::catalog::connection_params::PbConnectionType, value_secret_filled: std::collections::BTreeMap) -> $crate::error::ConnectorResult { + match pb_connection_type { + $( + <$pb_connection_path>::$variant_name => { + Ok(serde_json::from_value(json!(value_secret_filled)).map(ConnectionImpl::$variant_name).map_err($crate::error::ConnectorError::from)?) + }, + )* + risingwave_pb::catalog::connection_params::PbConnectionType::Unspecified => unreachable!(), + } + } + } + } +} + #[macro_export] macro_rules! impl_split { ({$({ $variant_name:ident, $prop_name:ty, $split:ty}),*}) => { diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index 5c9937196712e..57c9465853b19 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -124,6 +124,7 @@ pub struct SinkFormatDesc { pub options: BTreeMap, pub secret_refs: BTreeMap, pub key_encode: Option, + pub connection_id: Option, } /// TODO: consolidate with [`crate::source::SourceFormat`] and [`crate::parser::ProtocolProperties`]. 
@@ -188,6 +189,7 @@ impl SinkFormatDesc { options: Default::default(), secret_refs: Default::default(), key_encode: None, + connection_id: None, })) } @@ -223,6 +225,7 @@ impl SinkFormatDesc { options, key_encode, secret_refs: self.secret_refs.clone(), + connection_id: self.connection_id, } } @@ -235,6 +238,7 @@ impl SinkFormatDesc { options: Default::default(), secret_refs: Default::default(), key_encode: None, + connection_id: None, } } } @@ -299,6 +303,7 @@ impl TryFrom for SinkFormatDesc { options: value.options, key_encode, secret_refs: value.secret_refs, + connection_id: value.connection_id, }) } } diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 9fc8da7ef7a48..9716938d7a43f 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -35,7 +35,8 @@ use with_options::WithOptions; use super::catalog::{SinkFormat, SinkFormatDesc}; use super::{Sink, SinkError, SinkParam}; use crate::connector_common::{ - AwsAuthProps, KafkaCommon, KafkaConnection, KafkaPrivateLinkCommon, RdKafkaPropertiesCommon, + AwsAuthProps, KafkaCommon, KafkaConnectionProps, KafkaPrivateLinkCommon, + RdKafkaPropertiesCommon, }; use crate::sink::formatter::SinkFormatterImpl; use crate::sink::log_store::DeliveryFutureManagerAddFuture; @@ -215,7 +216,7 @@ pub struct KafkaConfig { pub common: KafkaCommon, #[serde(flatten)] - pub connection: KafkaConnection, + pub connection: KafkaConnectionProps, #[serde( rename = "properties.retry.max", diff --git a/src/connector/src/sink/redis.rs b/src/connector/src/sink/redis.rs index 763d7e9bba49a..473b3ef7f70dc 100644 --- a/src/connector/src/sink/redis.rs +++ b/src/connector/src/sink/redis.rs @@ -412,6 +412,7 @@ mod test { options: BTreeMap::default(), secret_refs: BTreeMap::default(), key_encode: None, + connection_id: None, }; let mut redis_sink_writer = RedisSinkWriter::mock(schema, vec![0], &format_desc) @@ -490,6 +491,7 @@ mod test { options: btree_map, secret_refs: Default::default(), 
key_encode: None, + connection_id: None, }; let mut redis_sink_writer = RedisSinkWriter::mock(schema, vec![0], &format_desc) diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index fc804ff4acdb6..06cd97580f6cd 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -32,6 +32,7 @@ use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo}; use risingwave_pb::plan_common::ExternalTableDesc; use risingwave_pb::source::ConnectorSplit; use serde::de::DeserializeOwned; +use serde_json::json; use tokio::sync::mpsc; use super::cdc::DebeziumCdcMeta; @@ -50,8 +51,9 @@ use crate::source::monitor::EnumeratorMetrics; use crate::source::SplitImpl::{CitusCdc, MongodbCdc, MysqlCdc, PostgresCdc, SqlServerCdc}; use crate::with_options::WithOptions; use crate::{ - dispatch_source_prop, dispatch_split_impl, for_all_sources, impl_connector_properties, - impl_split, match_source_name_str, WithOptionsSecResolved, + dispatch_source_prop, dispatch_split_impl, for_all_connections, for_all_sources, + impl_connection, impl_connector_properties, impl_split, match_source_name_str, + WithOptionsSecResolved, }; const SPLIT_TYPE_FIELD: &str = "split_type"; @@ -478,6 +480,7 @@ impl ConnectorProperties { } for_all_sources!(impl_split); +for_all_connections!(impl_connection); impl From<&SplitImpl> for ConnectorSplit { fn from(split: &SplitImpl) -> Self { diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs index 541c3757c27cd..f77cfac1b9bbe 100644 --- a/src/connector/src/source/kafka/enumerator/client.rs +++ b/src/connector/src/source/kafka/enumerator/client.rs @@ -31,13 +31,14 @@ use crate::error::{ConnectorError, ConnectorResult}; use crate::source::base::SplitEnumerator; use crate::source::kafka::split::KafkaSplit; use crate::source::kafka::{ - KafkaConnection, KafkaContextCommon, KafkaProperties, RwConsumerContext, KAFKA_ISOLATION_LEVEL, + KafkaConnectionProps, 
KafkaContextCommon, KafkaProperties, RwConsumerContext, + KAFKA_ISOLATION_LEVEL, }; use crate::source::SourceEnumeratorContextRef; type KafkaClientType = BaseConsumer; -pub static SHARED_KAFKA_CLIENT: LazyLock>> = +pub static SHARED_KAFKA_CLIENT: LazyLock>> = LazyLock::new(|| moka::future::Cache::builder().build()); #[derive(Debug, Copy, Clone, Eq, PartialEq)] diff --git a/src/connector/src/source/kafka/mod.rs b/src/connector/src/source/kafka/mod.rs index 030c190eb4942..e515aebf8d492 100644 --- a/src/connector/src/source/kafka/mod.rs +++ b/src/connector/src/source/kafka/mod.rs @@ -17,7 +17,7 @@ use std::collections::HashMap; use serde::Deserialize; use serde_with::{serde_as, DisplayFromStr}; -use crate::connector_common::{AwsAuthProps, KafkaConnection, KafkaPrivateLinkCommon}; +use crate::connector_common::{AwsAuthProps, KafkaConnectionProps, KafkaPrivateLinkCommon}; mod client_context; pub mod enumerator; @@ -144,7 +144,7 @@ pub struct KafkaProperties { pub common: KafkaCommon, #[serde(flatten)] - pub connection: KafkaConnection, + pub connection: KafkaConnectionProps, #[serde(flatten)] pub rdkafka_properties_common: RdKafkaPropertiesCommon, diff --git a/src/ctl/src/cmd_impl/meta/connection.rs b/src/ctl/src/cmd_impl/meta/connection.rs index 5df81d301ea01..c7bd94b58de62 100644 --- a/src/ctl/src/cmd_impl/meta/connection.rs +++ b/src/ctl/src/cmd_impl/meta/connection.rs @@ -35,6 +35,13 @@ pub async fn list_connections(context: &CtlContext) -> anyhow::Result<()> { "PrivateLink: service_name: {}, endpoint_id: {}, dns_entries: {:?}", svc.service_name, svc.endpoint_id, svc.dns_entries, ), + Some(Info::ConnectionParams(params)) => { + format!( + "CONNECTION_PARAMS_{}: {}", + params.get_connection_type().unwrap().as_str_name(), + serde_json::to_string(&params.get_properties()).unwrap() + ) + } None => "None".to_string(), } ); diff --git a/src/frontend/src/catalog/connection_catalog.rs b/src/frontend/src/catalog/connection_catalog.rs index 03b2ff4203c53..a938328c46d8f 100644 --- 
a/src/frontend/src/catalog/connection_catalog.rs +++ b/src/frontend/src/catalog/connection_catalog.rs @@ -30,12 +30,14 @@ impl ConnectionCatalog { pub fn connection_type(&self) -> &str { match &self.info { Info::PrivateLinkService(srv) => srv.get_provider().unwrap().as_str_name(), + Info::ConnectionParams(params) => params.get_connection_type().unwrap().as_str_name(), } } pub fn provider(&self) -> &str { match &self.info { Info::PrivateLinkService(_) => "PRIVATELINK", + Info::ConnectionParams(_) => panic!("ConnectionParams is not supported as provider."), } } } diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_connections.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_connections.rs index fcc7e8efc3389..7eae4c37ee418 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_connections.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_connections.rs @@ -17,6 +17,7 @@ use risingwave_frontend_macro::system_catalog; use crate::catalog::system_catalog::SysCatalogReaderImpl; use crate::error::Result; +use crate::handler::create_connection::print_connection_params; #[derive(Fields)] struct RwConnection { @@ -28,6 +29,7 @@ struct RwConnection { type_: String, provider: String, acl: Vec, + connection_params: String, } #[system_catalog(table, "rw_catalog.rw_connections")] @@ -35,16 +37,30 @@ fn read_rw_connections(reader: &SysCatalogReaderImpl) -> Result { + rw_connection.provider = conn.provider().into(); + } + risingwave_pb::catalog::connection::Info::ConnectionParams(params) => { + rw_connection.connection_params = print_connection_params(params, schema); + } + }; + + rw_connection }) }) .collect()) diff --git a/src/frontend/src/handler/create_connection.rs b/src/frontend/src/handler/create_connection.rs index d7ef3aa10b883..54ca0f3d28520 100644 --- a/src/frontend/src/handler/create_connection.rs +++ b/src/frontend/src/handler/create_connection.rs @@ -15,45 +15,73 @@ use std::collections::BTreeMap; use 
pgwire::pg_response::{PgResponse, StatementType}; -use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION; +use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR; +use risingwave_connector::source::kafka::{KAFKA_CONNECTOR, PRIVATELINK_CONNECTION}; +use risingwave_pb::catalog::connection_params::ConnectionType; +use risingwave_pb::catalog::{ConnectionParams, PbConnectionParams}; use risingwave_pb::ddl_service::create_connection_request; +use risingwave_pb::secret::SecretRef; use risingwave_sqlparser::ast::CreateConnectionStatement; use super::RwPgResponse; use crate::binder::Binder; +use crate::catalog::schema_catalog::SchemaCatalog; +use crate::catalog::SecretId; use crate::error::ErrorCode::ProtocolError; use crate::error::{ErrorCode, Result, RwError}; use crate::handler::HandlerArgs; +use crate::session::SessionImpl; +use crate::utils::{resolve_privatelink_in_with_option, resolve_secret_ref_in_with_options}; +use crate::WithOptions; pub(crate) const CONNECTION_TYPE_PROP: &str = "type"; #[inline(always)] fn get_connection_property_required( - with_properties: &BTreeMap, + with_properties: &mut BTreeMap, property: &str, ) -> Result { - with_properties - .get(property) - .map(|s| s.to_lowercase()) - .ok_or_else(|| { - RwError::from(ProtocolError(format!( - "Required property \"{property}\" is not provided" - ))) - }) + with_properties.remove(property).ok_or_else(|| { + RwError::from(ProtocolError(format!( + "Required property \"{property}\" is not provided" + ))) + }) } fn resolve_create_connection_payload( - with_properties: &BTreeMap, + with_properties: WithOptions, + session: &SessionImpl, ) -> Result { - let connection_type = get_connection_property_required(with_properties, CONNECTION_TYPE_PROP)?; - match connection_type.as_str() { - PRIVATELINK_CONNECTION => Err(RwError::from(ErrorCode::Deprecated( + if !with_properties.connection_ref().is_empty() { + return Err(RwError::from(ErrorCode::InvalidParameterValue( + "Connection reference is not 
allowed in options in CREATE CONNECTION".to_string(), + ))); + } + + let (mut props, secret_refs) = + resolve_secret_ref_in_with_options(with_properties, session)?.into_parts(); + let connection_type = get_connection_property_required(&mut props, CONNECTION_TYPE_PROP)?; + let connection_type = match connection_type.as_str() { + PRIVATELINK_CONNECTION => { + return Err(RwError::from(ErrorCode::Deprecated( "CREATE CONNECTION to Private Link".to_string(), "RisingWave Cloud Portal (Please refer to the doc https://docs.risingwave.com/cloud/create-a-connection/)".to_string(), - ))), - _ => Err(RwError::from(ProtocolError(format!( - "Connection type \"{connection_type}\" is not supported" - )))), - } + ))); + } + KAFKA_CONNECTOR => ConnectionType::Kafka, + ICEBERG_CONNECTOR => ConnectionType::Iceberg, + _ => { + return Err(RwError::from(ProtocolError(format!( + "Connection type \"{connection_type}\" is not supported" + )))); + } + }; + Ok(create_connection_request::Payload::ConnectionParams( + ConnectionParams { + connection_type: connection_type as i32, + properties: props.into_iter().collect(), + secret_refs: secret_refs.into_iter().collect(), + }, + )) } pub async fn handle_create_connection( @@ -78,9 +106,9 @@ pub async fn handle_create_connection( }; } let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?; - let with_properties = handler_args.with_options.clone().into_connector_props(); - - let create_connection_payload = resolve_create_connection_payload(&with_properties)?; + let mut with_properties = handler_args.with_options.clone().into_connector_props(); + resolve_privatelink_in_with_option(&mut with_properties)?; + let create_connection_payload = resolve_create_connection_payload(with_properties, &session)?; let catalog_writer = session.catalog_writer()?; catalog_writer @@ -95,3 +123,24 @@ pub async fn handle_create_connection( Ok(PgResponse::empty_result(StatementType::CREATE_CONNECTION)) } + +pub fn 
print_connection_params(params: &PbConnectionParams, schema: &SchemaCatalog) -> String { + let print_secret_ref = |secret_ref: &SecretRef| -> String { + let secret_name = schema + .get_secret_by_id(&SecretId::from(secret_ref.secret_id)) + .map(|s| s.name.as_str()) + .unwrap(); + format!( + "SECRET {} AS {}", + secret_name, + secret_ref.get_ref_as().unwrap().as_str_name() + ) + }; + let deref_secrets = params + .get_secret_refs() + .iter() + .map(|(k, v)| (k.clone(), print_secret_ref(v))); + let mut props = params.get_properties().clone(); + props.extend(deref_secrets); + serde_json::to_string(&props).unwrap() +} diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index cfcddc8b8bb4a..8bd9aae2fd7f7 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -18,12 +18,14 @@ use std::sync::{Arc, LazyLock}; use anyhow::Context; use either::Either; use itertools::Itertools; -use maplit::{convert_args, hashmap}; +use maplit::{convert_args, hashmap, hashset}; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::array::arrow::arrow_schema_iceberg::DataType as ArrowDataType; use risingwave_common::array::arrow::IcebergArrowConvert; use risingwave_common::bail; -use risingwave_common::catalog::{ColumnCatalog, DatabaseId, ObjectId, Schema, SchemaId, UserId}; +use risingwave_common::catalog::{ + ColumnCatalog, ConnectionId, DatabaseId, ObjectId, Schema, SchemaId, UserId, +}; use risingwave_common::secret::LocalSecretManager; use risingwave_common::types::DataType; use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc}; @@ -32,6 +34,8 @@ use risingwave_connector::sink::kafka::KAFKA_SINK; use risingwave_connector::sink::{ CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION, SINK_WITHOUT_BACKFILL, }; +use risingwave_connector::WithPropertiesExt; +use risingwave_pb::catalog::connection_params::PbConnectionType; use 
risingwave_pb::catalog::{PbSink, PbSource, Table}; use risingwave_pb::ddl_service::{ReplaceTablePlan, TableJobType}; use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody}; @@ -54,7 +58,9 @@ use crate::handler::alter_table_column::fetch_table_catalog_for_alter; use crate::handler::create_mv::parse_column_names; use crate::handler::create_table::{generate_stream_graph_for_replace_table, ColumnIdGenerator}; use crate::handler::privilege::resolve_query_privileges; -use crate::handler::util::SourceSchemaCompatExt; +use crate::handler::util::{ + check_connector_match_connection_type, ensure_connection_type_allowed, SourceSchemaCompatExt, +}; use crate::handler::HandlerArgs; use crate::optimizer::plan_node::{ generic, IcebergPartitionInfo, LogicalSource, PartitionComputeInfo, StreamProject, @@ -63,9 +69,25 @@ use crate::optimizer::{OptimizerContext, PlanRef, RelationCollectorVisitor}; use crate::scheduler::streaming_manager::CreatingStreamingJobInfo; use crate::session::SessionImpl; use crate::stream_fragmenter::build_graph; -use crate::utils::{resolve_privatelink_in_with_option, resolve_secret_ref_in_with_options}; +use crate::utils::{resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option}; use crate::{Explain, Planner, TableCatalog, WithOptions, WithOptionsSecResolved}; +static ALLOWED_CONNECTION_CONNECTOR: LazyLock> = LazyLock::new(|| { + hashset! { + PbConnectionType::Unspecified, + PbConnectionType::Kafka, + PbConnectionType::Iceberg, + } +}); + +static ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock> = + LazyLock::new(|| { + hashset! 
{ + PbConnectionType::Unspecified, + PbConnectionType::SchemaRegistry, + } + }); + // used to store result of `gen_sink_plan` pub struct SinkPlanContext { pub query: Box, @@ -90,7 +112,15 @@ pub async fn gen_sink_plan( let mut with_options = handler_args.with_options.clone(); resolve_privatelink_in_with_option(&mut with_options)?; - let mut resolved_with_options = resolve_secret_ref_in_with_options(with_options, session)?; + let (mut resolved_with_options, connection_type, connector_conn_ref) = + resolve_connection_ref_and_secret_ref(with_options, session)?; + ensure_connection_type_allowed(connection_type, &ALLOWED_CONNECTION_CONNECTOR)?; + + // if not using connection, we don't need to check connector match connection type + if !matches!(connection_type, PbConnectionType::Unspecified) { + let connector = resolved_with_options.get_connector().unwrap(); + check_connector_match_connection_type(connector.as_str(), &connection_type)?; + } let partition_info = get_partition_compute_info(&resolved_with_options).await?; @@ -271,7 +301,7 @@ pub async fn gen_sink_plan( SchemaId::new(sink_schema_id), DatabaseId::new(sink_database_id), UserId::new(session.user_id()), - None, // deprecated: private link connection id + connector_conn_ref.map(ConnectionId::from), ); if let Some(table_catalog) = &target_table_catalog { @@ -747,11 +777,13 @@ fn bind_sink_format_desc( } } - let (mut options, secret_refs) = resolve_secret_ref_in_with_options( - WithOptions::try_from(value.row_options.as_slice())?, - session, - )? 
- .into_parts(); + let (props, connection_type_flag, schema_registry_conn_ref) = + resolve_connection_ref_and_secret_ref( + WithOptions::try_from(value.row_options.as_slice())?, + session, + )?; + ensure_connection_type_allowed(connection_type_flag, &ALLOWED_CONNECTION_SCHEMA_REGISTRY)?; + let (mut options, secret_refs) = props.into_parts(); options .entry(TimestamptzHandlingMode::OPTION_KEY.to_owned()) @@ -763,6 +795,7 @@ fn bind_sink_format_desc( options, secret_refs, key_encode, + connection_id: schema_registry_conn_ref, }) } diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index f605687b3a731..6b84ee021fdad 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -12,14 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::rc::Rc; use std::sync::LazyLock; use anyhow::{anyhow, Context}; use either::Either; use itertools::Itertools; -use maplit::{convert_args, hashmap}; +use maplit::{convert_args, hashmap, hashset}; use pgwire::pg_response::{PgResponse, StatementType}; use rand::Rng; use risingwave_common::array::arrow::{arrow_schema_iceberg, IcebergArrowConvert}; @@ -61,6 +61,7 @@ use risingwave_connector::source::{ }; pub use risingwave_connector::source::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR}; use risingwave_connector::WithPropertiesExt; +use risingwave_pb::catalog::connection_params::PbConnectionType; use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo, WatermarkDesc}; use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType; use risingwave_pb::plan_common::{EncodeType, FormatType}; @@ -82,13 +83,16 @@ use crate::handler::create_table::{ bind_pk_and_row_id_on_relation, bind_sql_column_constraints, bind_sql_columns, bind_sql_pk_names, 
bind_table_constraints, ColumnIdGenerator, }; -use crate::handler::util::SourceSchemaCompatExt; +use crate::handler::util::{ + check_connector_match_connection_type, ensure_connection_type_allowed, SourceSchemaCompatExt, +}; use crate::handler::HandlerArgs; use crate::optimizer::plan_node::generic::SourceNodeKind; use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext}; use crate::session::SessionImpl; use crate::utils::{ - resolve_privatelink_in_with_option, resolve_secret_ref_in_with_options, OverwriteOptions, + resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option, + resolve_secret_ref_in_with_options, OverwriteOptions, }; use crate::{bind_data_type, build_graph, OptimizerContext, WithOptions, WithOptionsSecResolved}; @@ -310,16 +314,32 @@ pub(crate) async fn bind_columns_from_source( const NAME_STRATEGY_KEY: &str = "schema.registry.name.strategy"; let options_with_secret = match with_properties { - Either::Left(options) => resolve_secret_ref_in_with_options(options.clone(), session)?, + Either::Left(options) => { + let (sec_resolve_props, connection_type, _) = + resolve_connection_ref_and_secret_ref(options.clone(), session)?; + if !ALLOWED_CONNECTION_CONNECTOR.contains(&connection_type) { + return Err(RwError::from(ProtocolError(format!( + "connection type {:?} is not allowed, allowed types: {:?}", + connection_type, ALLOWED_CONNECTION_CONNECTOR + )))); + } + + sec_resolve_props + } Either::Right(options_with_secret) => options_with_secret.clone(), }; let is_kafka: bool = options_with_secret.is_kafka_connector(); - let (format_encode_options, format_encode_secret_refs) = resolve_secret_ref_in_with_options( - WithOptions::try_from(format_encode.row_options())?, - session, - )? 
- .into_parts(); + + // todo: need to resolve connection ref for schema registry + let (sec_resolve_props, connection_type, schema_registry_conn_ref) = + resolve_connection_ref_and_secret_ref( + WithOptions::try_from(format_encode.row_options())?, + session, + )?; + ensure_connection_type_allowed(connection_type, &ALLOWED_CONNECTION_SCHEMA_REGISTRY)?; + + let (format_encode_options, format_encode_secret_refs) = sec_resolve_props.into_parts(); // Need real secret to access the schema registry let mut format_encode_options_to_consume = LocalSecretManager::global().fill_secrets( format_encode_options.clone(), @@ -352,6 +372,7 @@ pub(crate) async fn bind_columns_from_source( row_encode: row_encode_to_prost(&format_encode.row_encode) as i32, format_encode_options, format_encode_secret_refs, + connection_id: schema_registry_conn_ref, ..Default::default() }; @@ -544,11 +565,15 @@ fn bind_columns_from_source_for_cdc( session: &SessionImpl, format_encode: &FormatEncodeOptions, ) -> Result<(Option>, StreamSourceInfo)> { - let (format_encode_options, format_encode_secret_refs) = resolve_secret_ref_in_with_options( - WithOptions::try_from(format_encode.row_options())?, - session, - )? - .into_parts(); + let with_options = WithOptions::try_from(format_encode.row_options())?; + if !with_options.connection_ref().is_empty() { + return Err(RwError::from(NotSupported( + "CDC connector does not support connection ref yet".to_string(), + "Explicitly specify the connection in WITH clause".to_string(), + ))); + } + let (format_encode_options, format_encode_secret_refs) = + resolve_secret_ref_in_with_options(with_options, session)?.into_parts(); // Need real secret to access the schema registry let mut format_encode_options_to_consume = LocalSecretManager::global().fill_secrets( @@ -1051,6 +1076,22 @@ pub(super) fn bind_source_watermark( Ok(watermark_descs) } +static ALLOWED_CONNECTION_CONNECTOR: LazyLock> = LazyLock::new(|| { + hashset! 
{ + PbConnectionType::Unspecified, + PbConnectionType::Kafka, + PbConnectionType::Iceberg, + } +}); + +static ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock> = + LazyLock::new(|| { + hashset! { + PbConnectionType::Unspecified, + PbConnectionType::SchemaRegistry, + } + }); + // TODO: Better design if we want to support ENCODE KEY where we will have 4 dimensional array static CONNECTORS_COMPATIBLE_FORMATS: LazyLock>>> = LazyLock::new(|| { @@ -1578,7 +1619,15 @@ pub async fn bind_create_source_or_table_with_connector( let mut with_properties = with_properties; resolve_privatelink_in_with_option(&mut with_properties)?; - let with_properties = resolve_secret_ref_in_with_options(with_properties, session)?; + let (with_properties, connection_type, connector_conn_ref) = + resolve_connection_ref_and_secret_ref(with_properties, session)?; + ensure_connection_type_allowed(connection_type, &ALLOWED_CONNECTION_CONNECTOR)?; + + // if not using connection, we don't need to check connector match connection type + if !matches!(connection_type, PbConnectionType::Unspecified) { + let connector = with_properties.get_connector().unwrap(); + check_connector_match_connection_type(connector.as_str(), &connection_type)?; + } let pk_names = bind_source_pk( &format_encode, @@ -1652,7 +1701,7 @@ pub async fn bind_create_source_or_table_with_connector( watermark_descs, associated_table_id, definition, - connection_id: None, // deprecated: private link connection id + connection_id: connector_conn_ref, created_at_epoch: None, initialized_at_epoch: None, version: INITIAL_SOURCE_VERSION_ID, diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 77975b1dec952..4b7d268682b6b 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -552,9 +552,9 @@ pub(crate) fn gen_create_table_plan( c.column_desc.column_id = col_id_gen.generate(c.name()) } - let (_, secret_refs) = 
context.with_options().clone().into_parts(); - if !secret_refs.is_empty() { - return Err(crate::error::ErrorCode::InvalidParameterValue("Secret reference is not allowed in options when creating table without external source".to_string()).into()); + let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts(); + if !secret_refs.is_empty() || !connection_refs.is_empty() { + return Err(crate::error::ErrorCode::InvalidParameterValue("Secret reference and Connection reference are not allowed in options when creating table without external source".to_string()).into()); } gen_create_table_plan_without_source( diff --git a/src/frontend/src/handler/create_table_as.rs b/src/frontend/src/handler/create_table_as.rs index 5bffd99a08747..b965f02825460 100644 --- a/src/frontend/src/handler/create_table_as.rs +++ b/src/frontend/src/handler/create_table_as.rs @@ -89,10 +89,10 @@ pub async fn handle_create_as( let (graph, source, table) = { let context = OptimizerContext::from_handler_args(handler_args.clone()); - let (_, secret_refs) = context.with_options().clone().into_parts(); - if !secret_refs.is_empty() { + let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts(); + if !secret_refs.is_empty() || !connection_refs.is_empty() { return Err(crate::error::ErrorCode::InvalidParameterValue( - "Secret reference is not allowed in options for CREATE TABLE AS".to_string(), + "Secret reference and Connection reference are not allowed in options for CREATE TABLE AS".to_string(), ) .into()); } diff --git a/src/frontend/src/handler/create_view.rs b/src/frontend/src/handler/create_view.rs index 5ad0e8956b967..851c3a4fa89df 100644 --- a/src/frontend/src/handler/create_view.rs +++ b/src/frontend/src/handler/create_view.rs @@ -87,10 +87,11 @@ pub async fn handle_create_view( .collect() }; - let (properties, secret_refs) = properties.into_parts(); - if !secret_refs.is_empty() { + let (properties, secret_refs, connection_refs) = 
properties.into_parts(); + if !secret_refs.is_empty() || !connection_refs.is_empty() { return Err(crate::error::ErrorCode::InvalidParameterValue( - "Secret reference is not allowed in create view options".to_string(), + "Secret reference and Connection reference are not allowed in create view options" + .to_string(), ) .into()); } diff --git a/src/frontend/src/handler/show.rs b/src/frontend/src/handler/show.rs index 248636addee3a..94cb3afc9479a 100644 --- a/src/frontend/src/handler/show.rs +++ b/src/frontend/src/handler/show.rs @@ -35,6 +35,7 @@ use super::{fields_to_descriptors, RwPgResponse, RwPgResponseBuilderExt}; use crate::binder::{Binder, Relation}; use crate::catalog::{CatalogError, IndexCatalog}; use crate::error::Result; +use crate::handler::create_connection::print_connection_params; use crate::handler::HandlerArgs; use crate::session::cursor_manager::SubscriptionCursor; use crate::session::SessionImpl; @@ -413,6 +414,9 @@ pub async fn handle_show_object( connection::Info::PrivateLinkService(_) => { PRIVATELINK_CONNECTION.to_string() }, + connection::Info::ConnectionParams(params) => { + params.get_connection_type().unwrap().as_str_name().to_string() + } }; let source_names = schema .get_source_ids_by_connection(c.id) @@ -438,6 +442,10 @@ pub async fn handle_show_object( serde_json::to_string(&sink_names).unwrap(), ) } + connection::Info::ConnectionParams(params) => { + // todo: show dep relations + print_connection_params(params, schema) + } }; ShowConnectionRow { name, diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs index 169716cd504a2..899b43f3cd844 100644 --- a/src/frontend/src/handler/util.rs +++ b/src/frontend/src/handler/util.rs @@ -34,14 +34,19 @@ use risingwave_common::types::{ }; use risingwave_common::util::epoch::Epoch; use risingwave_common::util::iter_util::ZipEqFast; +use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR; +use risingwave_connector::source::KAFKA_CONNECTOR; +use 
risingwave_pb::catalog::connection_params::PbConnectionType; use risingwave_sqlparser::ast::{ CompatibleFormatEncode, Expr, FormatEncodeOptions, Ident, ObjectName, OrderByExpr, Query, Select, SelectItem, SetExpr, TableFactor, TableWithJoins, }; use thiserror_ext::AsReport; -use crate::error::{ErrorCode, Result as RwResult}; +use crate::error::ErrorCode::ProtocolError; +use crate::error::{ErrorCode, Result as RwResult, RwError}; use crate::session::{current, SessionImpl}; +use crate::HashSet; pin_project! { /// Wrapper struct that converts a stream of DataChunk to a stream of RowSet based on formatting @@ -271,6 +276,40 @@ pub fn convert_interval_to_u64_seconds(interval: &String) -> RwResult { Ok(seconds) } +pub fn ensure_connection_type_allowed( + connection_type: PbConnectionType, + allowed_types: &HashSet, +) -> RwResult<()> { + if !allowed_types.contains(&connection_type) { + return Err(RwError::from(ProtocolError(format!( + "connection type {:?} is not allowed, allowed types: {:?}", + connection_type, allowed_types + )))); + } + Ok(()) +} + +fn connection_type_to_connector(connection_type: &PbConnectionType) -> &str { + match connection_type { + PbConnectionType::Kafka => KAFKA_CONNECTOR, + PbConnectionType::Iceberg => ICEBERG_CONNECTOR, + _ => unreachable!(), + } +} + +pub fn check_connector_match_connection_type( + connector: &str, + connection_type: &PbConnectionType, +) -> RwResult<()> { + if !connector.eq(connection_type_to_connector(connection_type)) { + return Err(RwError::from(ProtocolError(format!( + "connector {} and connection type {:?} are not compatible", + connector, connection_type + )))); + } + Ok(()) +} + #[cfg(test)] mod tests { use postgres_types::{ToSql, Type}; diff --git a/src/frontend/src/utils/with_options.rs b/src/frontend/src/utils/with_options.rs index cc8d6747aedd0..d800798a7b002 100644 --- a/src/frontend/src/utils/with_options.rs +++ b/src/frontend/src/utils/with_options.rs @@ -15,20 +15,26 @@ use std::collections::BTreeMap; use 
std::num::NonZeroU32; +use risingwave_common::catalog::ConnectionId; +use risingwave_connector::connector_common::{ + PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY, +}; use risingwave_connector::source::kafka::private_link::{ insert_privatelink_broker_rewrite_map, PRIVATELINK_ENDPOINT_KEY, }; pub use risingwave_connector::WithOptionsSecResolved; use risingwave_connector::WithPropertiesExt; +use risingwave_pb::catalog::connection::Info as ConnectionInfo; +use risingwave_pb::catalog::connection_params::PbConnectionType; use risingwave_pb::secret::secret_ref::PbRefAsType; use risingwave_pb::secret::PbSecretRef; use risingwave_sqlparser::ast::{ - CreateConnectionStatement, CreateSinkStatement, CreateSourceStatement, - CreateSubscriptionStatement, SecretRef, SecretRefAsType, SqlOption, Statement, Value, + ConnectionRefValue, CreateConnectionStatement, CreateSinkStatement, CreateSourceStatement, + CreateSubscriptionStatement, SecretRefAsType, SecretRefValue, SqlOption, SqlOptionValue, + Statement, Value, }; use super::OverwriteOptions; -use crate::catalog::ConnectionId; use crate::error::{ErrorCode, Result as RwResult, RwError}; use crate::handler::create_source::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR}; use crate::session::SessionImpl; @@ -43,7 +49,8 @@ mod options { #[derive(Default, Clone, Debug, PartialEq, Eq, Hash)] pub struct WithOptions { inner: BTreeMap, - secret_ref: BTreeMap, + secret_ref: BTreeMap, + connection_ref: BTreeMap, } impl std::ops::Deref for WithOptions { @@ -66,12 +73,21 @@ impl WithOptions { Self { inner, secret_ref: Default::default(), + connection_ref: Default::default(), } } /// Create a new [`WithOptions`] from a option [`BTreeMap`] and secret ref. 
- pub fn new(inner: BTreeMap, secret_ref: BTreeMap) -> Self { - Self { inner, secret_ref } + pub fn new( + inner: BTreeMap, + secret_ref: BTreeMap, + connection_ref: BTreeMap, + ) -> Self { + Self { + inner, + secret_ref, + connection_ref, + } } pub fn inner_mut(&mut self) -> &mut BTreeMap { @@ -79,8 +95,14 @@ impl WithOptions { } /// Take the value of the option map and secret refs. - pub fn into_parts(self) -> (BTreeMap, BTreeMap) { - (self.inner, self.secret_ref) + pub fn into_parts( + self, + ) -> ( + BTreeMap, + BTreeMap, + BTreeMap, + ) { + (self.inner, self.secret_ref, self.connection_ref) } /// Convert to connector props, remove the key-value pairs used in the top-level. @@ -96,6 +118,7 @@ impl WithOptions { Self { inner, secret_ref: self.secret_ref, + connection_ref: self.connection_ref, } } @@ -120,6 +143,7 @@ impl WithOptions { Self { inner, secret_ref: self.secret_ref.clone(), + connection_ref: self.connection_ref.clone(), } } @@ -132,23 +156,26 @@ impl WithOptions { false } - pub fn secret_ref(&self) -> &BTreeMap { + pub fn secret_ref(&self) -> &BTreeMap { &self.secret_ref } - pub fn encode_options_to_map(sql_options: &[SqlOption]) -> RwResult> { - let WithOptions { inner, secret_ref } = WithOptions::try_from(sql_options)?; - if secret_ref.is_empty() { - Ok(inner) - } else { - Err(RwError::from(ErrorCode::InvalidParameterValue( - "Secret reference is not allowed in encode options".to_string(), - ))) - } + pub fn secret_ref_mut(&mut self) -> &mut BTreeMap { + &mut self.secret_ref + } + + pub fn connection_ref(&self) -> &BTreeMap { + &self.connection_ref + } + + pub fn connection_ref_mut(&mut self) -> &mut BTreeMap { + &mut self.connection_ref } pub fn oauth_options_to_map(sql_options: &[SqlOption]) -> RwResult> { - let WithOptions { inner, secret_ref } = WithOptions::try_from(sql_options)?; + let WithOptions { + inner, secret_ref, .. 
+ } = WithOptions::try_from(sql_options)?; if secret_ref.is_empty() { Ok(inner) } else { @@ -164,12 +191,118 @@ impl WithOptions { } } +pub(crate) fn resolve_connection_ref_and_secret_ref( + with_options: WithOptions, + session: &SessionImpl, +) -> RwResult<(WithOptionsSecResolved, PbConnectionType, Option)> { + let db_name: &str = session.database(); + let (mut options, secret_refs, connection_refs) = with_options.clone().into_parts(); + + let mut connection_id = None; + let mut connection_params = None; + for connection_ref in connection_refs.values() { + // at most one connection ref in the map + connection_params = { + // get connection params from catalog + let (schema_name, connection_name) = Binder::resolve_schema_qualified_name( + db_name, + connection_ref.connection_name.clone(), + )?; + let connection_catalog = + session.get_connection_by_name(schema_name, &connection_name)?; + if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info { + connection_id = Some(connection_catalog.id); + Some(params.clone()) + } else { + return Err(RwError::from(ErrorCode::InvalidParameterValue( + "Private Link Service has been deprecated. Please create a new connection instead." 
+ .to_string(), + ))); + } + }; + } + + let mut inner_secret_refs = { + let mut resolved_secret_refs = BTreeMap::new(); + for (key, secret_ref) in secret_refs { + let (schema_name, secret_name) = + Binder::resolve_schema_qualified_name(db_name, secret_ref.secret_name.clone())?; + let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?; + let ref_as = match secret_ref.ref_as { + SecretRefAsType::Text => PbRefAsType::Text, + SecretRefAsType::File => PbRefAsType::File, + }; + let pb_secret_ref = PbSecretRef { + secret_id: secret_catalog.id.secret_id(), + ref_as: ref_as.into(), + }; + resolved_secret_refs.insert(key.clone(), pb_secret_ref); + } + resolved_secret_refs + }; + + let mut connection_type = PbConnectionType::Unspecified; + let connection_params_is_none_flag = connection_params.is_none(); + + if let Some(connection_params) = connection_params { + // Do key checks on `PRIVATE_LINK_BROKER_REWRITE_MAP_KEY`, `PRIVATE_LINK_TARGETS_KEY` and `PRIVATELINK_ENDPOINT_KEY` + // `PRIVATE_LINK_BROKER_REWRITE_MAP_KEY` is generated from `private_link_targets` and `private_link_endpoint`, instead of given by users. + // + // We resolve private link via `resolve_privatelink_in_with_option` when creating Connection, + // so here we need to check `PRIVATE_LINK_TARGETS_KEY` and `PRIVATELINK_ENDPOINT_KEY` are not given + // if `PRIVATE_LINK_BROKER_REWRITE_MAP_KEY` is in Connection catalog. 
+ + if let Some(broker_rewrite_map) = connection_params + .get_properties() + .get(PRIVATE_LINK_BROKER_REWRITE_MAP_KEY) + { + if options.contains_key(PRIVATE_LINK_TARGETS_KEY) + || options.contains_key(PRIVATELINK_ENDPOINT_KEY) + { + return Err(RwError::from(ErrorCode::InvalidParameterValue(format!( + "PrivateLink related options already defined in Connection (rewrite map: {}), please remove {} and {} from WITH clause", + broker_rewrite_map, PRIVATE_LINK_TARGETS_KEY, PRIVATELINK_ENDPOINT_KEY + )))); + } + } + + connection_type = connection_params.connection_type(); + for (k, v) in connection_params.properties { + if options.insert(k.clone(), v).is_some() { + return Err(RwError::from(ErrorCode::InvalidParameterValue(format!( + "Duplicated key in both WITH clause and Connection catalog: {}", + k + )))); + } + } + + for (k, v) in connection_params.secret_refs { + if inner_secret_refs.insert(k.clone(), v).is_some() { + return Err(RwError::from(ErrorCode::InvalidParameterValue(format!( + "Duplicated key in both WITH clause and Connection catalog: {}", + k + )))); + } + } + } + + // connection_params is None means the connection is not retrieved, so the connection type should be unspecified + if connection_params_is_none_flag { + debug_assert!(matches!(connection_type, PbConnectionType::Unspecified)); + } + Ok(( + WithOptionsSecResolved::new(options, inner_secret_refs), + connection_type, + connection_id, + )) +} + /// Get the secret id from the name. 
pub(crate) fn resolve_secret_ref_in_with_options( with_options: WithOptions, session: &SessionImpl, ) -> RwResult { - let (options, secret_refs) = with_options.into_parts(); + let (options, secret_refs, _) = with_options.into_parts(); let mut resolved_secret_refs = BTreeMap::new(); let db_name: &str = session.database(); for (key, secret_ref) in secret_refs { @@ -213,23 +346,40 @@ impl TryFrom<&[SqlOption]> for WithOptions { fn try_from(options: &[SqlOption]) -> Result { let mut inner: BTreeMap = BTreeMap::new(); - let mut secret_ref: BTreeMap = BTreeMap::new(); + let mut secret_ref: BTreeMap = BTreeMap::new(); + let mut connection_ref: BTreeMap = BTreeMap::new(); for option in options { let key = option.name.real_value(); - if let Value::Ref(r) = &option.value { - if secret_ref.insert(key.clone(), r.clone()).is_some() || inner.contains_key(&key) { - return Err(RwError::from(ErrorCode::InvalidParameterValue(format!( - "Duplicated option: {}", - key - )))); + match &option.value { + SqlOptionValue::SecretRef(r) => { + if secret_ref.insert(key.clone(), r.clone()).is_some() + || inner.contains_key(&key) + { + return Err(RwError::from(ErrorCode::InvalidParameterValue(format!( + "Duplicated option: {}", + key + )))); + } + continue; } - continue; + SqlOptionValue::ConnectionRef(r) => { + if connection_ref.insert(key.clone(), r.clone()).is_some() + || inner.contains_key(&key) + { + return Err(RwError::from(ErrorCode::InvalidParameterValue(format!( + "Duplicated option: {}", + key + )))); + } + continue; + } + _ => {} } let value: String = match option.value.clone() { - Value::CstyleEscapedString(s) => s.value, - Value::SingleQuotedString(s) => s, - Value::Number(n) => n, - Value::Boolean(b) => b.to_string(), + SqlOptionValue::Value(Value::CstyleEscapedString(s)) => s.value, + SqlOptionValue::Value(Value::SingleQuotedString(s)) => s, + SqlOptionValue::Value(Value::Number(n)) => n, + SqlOptionValue::Value(Value::Boolean(b)) => b.to_string(), _ => { return 
Err(RwError::from(ErrorCode::InvalidParameterValue( "`with options` or `with properties` only support single quoted string value and C style escaped string" @@ -245,7 +395,11 @@ impl TryFrom<&[SqlOption]> for WithOptions { } } - Ok(Self { inner, secret_ref }) + Ok(Self { + inner, + secret_ref, + connection_ref, + }) } } diff --git a/src/meta/model/migration/src/lib.rs b/src/meta/model/migration/src/lib.rs index baf4373343e12..867506365e558 100644 --- a/src/meta/model/migration/src/lib.rs +++ b/src/meta/model/migration/src/lib.rs @@ -29,6 +29,7 @@ mod m20241022_072553_node_label; mod m20241025_062548_singleton_vnode_count; mod m20241115_085007_remove_function_type; mod m20241120_182555_hummock_add_time_travel_sst_index; +mod m20241125_043732_connection_params; mod utils; pub struct Migrator; @@ -95,6 +96,7 @@ impl MigratorTrait for Migrator { Box::new(m20241120_182555_hummock_add_time_travel_sst_index::Migration), Box::new(m20241022_072553_node_label::Migration), Box::new(m20241001_013810_webhook_source::Migration), + Box::new(m20241125_043732_connection_params::Migration), ] } } diff --git a/src/meta/model/migration/src/m20241125_043732_connection_params.rs b/src/meta/model/migration/src/m20241125_043732_connection_params.rs new file mode 100644 index 0000000000000..f2e03bafd9207 --- /dev/null +++ b/src/meta/model/migration/src/m20241125_043732_connection_params.rs @@ -0,0 +1,35 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Connection::Table) + .add_column(ColumnDef::new(Connection::Params).binary().not_null()) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Connection::Table) + .drop_column(Connection::Params) + 
.to_owned(), + ) + .await + } +} + +#[derive(DeriveIden)] +enum Connection { + Table, + Params, +} diff --git a/src/meta/model/src/connection.rs b/src/meta/model/src/connection.rs index dce0daa462fc5..eb93a1d82fd38 100644 --- a/src/meta/model/src/connection.rs +++ b/src/meta/model/src/connection.rs @@ -18,7 +18,7 @@ use sea_orm::entity::prelude::*; use sea_orm::ActiveValue::Set; use serde::{Deserialize, Serialize}; -use crate::{ConnectionId, PrivateLinkService}; +use crate::{ConnectionId, ConnectionParams, PrivateLinkService}; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "connection")] @@ -27,8 +27,9 @@ pub struct Model { pub connection_id: ConnectionId, pub name: String, - // todo: Private link service has been deprecated, consider using a new field for the connection info + // Private link service has been deprecated pub info: PrivateLinkService, + pub params: ConnectionParams, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] @@ -69,14 +70,15 @@ impl ActiveModelBehavior for ActiveModel {} impl From for ActiveModel { fn from(conn: PbConnection) -> Self { - let Some(PbInfo::PrivateLinkService(private_link_srv)) = conn.info else { - unreachable!("private link not provided.") + let Some(PbInfo::ConnectionParams(connection_params)) = conn.info else { + unreachable!("private link has been deprecated.") }; Self { connection_id: Set(conn.id as _), name: Set(conn.name), - info: Set(PrivateLinkService::from(&private_link_srv)), + info: Set(PrivateLinkService::default()), + params: Set(ConnectionParams::from(&connection_params)), } } } diff --git a/src/meta/model/src/lib.rs b/src/meta/model/src/lib.rs index 9995d8482b8fb..5816a99557a9e 100644 --- a/src/meta/model/src/lib.rs +++ b/src/meta/model/src/lib.rs @@ -401,6 +401,7 @@ derive_from_blob!( PrivateLinkService, risingwave_pb::catalog::connection::PbPrivateLinkService ); +derive_from_blob!(ConnectionParams, risingwave_pb::catalog::ConnectionParams); 
derive_from_blob!(AuthInfo, risingwave_pb::user::PbAuthInfo); derive_from_blob!(ConnectorSplits, risingwave_pb::source::ConnectorSplits); diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index 553aaeb986e78..ac7cf70642058 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -25,7 +25,8 @@ use risingwave_connector::sink::catalog::SinkId; use risingwave_meta::manager::{EventLogManagerRef, MetadataManager}; use risingwave_meta::rpc::metrics::MetaMetrics; use risingwave_meta_model::ObjectId; -use risingwave_pb::catalog::{Comment, CreateType, Secret, Table}; +use risingwave_pb::catalog::connection::Info as ConnectionInfo; +use risingwave_pb::catalog::{Comment, Connection, CreateType, Secret, Table}; use risingwave_pb::common::worker_node::State; use risingwave_pb::common::WorkerType; use risingwave_pb::ddl_service::ddl_service_server::DdlService; @@ -767,7 +768,22 @@ impl DdlService for DdlServiceImpl { create_connection_request::Payload::PrivateLink(_) => { panic!("Private Link Connection has been deprecated") } - }; + create_connection_request::Payload::ConnectionParams(params) => { + let pb_connection = Connection { + id: 0, + schema_id: req.schema_id, + database_id: req.database_id, + name: req.name, + info: Some(ConnectionInfo::ConnectionParams(params)), + owner: req.owner_id, + }; + let version = self + .ddl_controller + .run_command(DdlCommand::CreateConnection(pb_connection)) + .await?; + Ok(Response::new(CreateConnectionResponse { version })) + } + } } async fn list_connections( diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 5950f0357eee5..4b66aad51c0be 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -36,6 +36,7 @@ use risingwave_meta_model::{ SourceId, StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId, TableId, UserId, ViewId, }; +use risingwave_pb::catalog::connection::Info as 
ConnectionInfo; use risingwave_pb::catalog::subscription::SubscriptionState; use risingwave_pb::catalog::table::PbTableType; use risingwave_pb::catalog::{ @@ -78,8 +79,8 @@ use crate::controller::utils::{ }; use crate::controller::ObjectModel; use crate::manager::{ - get_referred_secret_ids_from_source, MetaSrvEnv, NotificationVersion, - IGNORED_NOTIFICATION_VERSION, + get_referred_connection_ids_from_source, get_referred_secret_ids_from_source, MetaSrvEnv, + NotificationVersion, IGNORED_NOTIFICATION_VERSION, }; use crate::rpc::ddl_controller::DropMode; use crate::telemetry::MetaTelemetryJobDesc; @@ -1237,6 +1238,7 @@ impl CatalogController { // handle secret ref let secret_ids = get_referred_secret_ids_from_source(&pb_source)?; + let connection_ids = get_referred_connection_ids_from_source(&pb_source); let source_obj = Self::create_object( &txn, @@ -1252,8 +1254,9 @@ impl CatalogController { Source::insert(source).exec(&txn).await?; // add secret dependency - if !secret_ids.is_empty() { - ObjectDependency::insert_many(secret_ids.iter().map(|id| { + let dep_relation_ids = secret_ids.iter().chain(connection_ids.iter()); + if !secret_ids.is_empty() || !connection_ids.is_empty() { + ObjectDependency::insert_many(dep_relation_ids.map(|id| { object_dependency::ActiveModel { oid: Set(*id as _), used_by: Set(source_id as _), @@ -1501,6 +1504,16 @@ impl CatalogController { ensure_object_id(ObjectType::Schema, pb_connection.schema_id as _, &txn).await?; check_connection_name_duplicate(&pb_connection, &txn).await?; + let mut dep_secrets = HashSet::new(); + if let Some(ConnectionInfo::ConnectionParams(params)) = &pb_connection.info { + dep_secrets.extend( + params + .secret_refs + .values() + .map(|secret_ref| secret_ref.secret_id), + ); + } + let conn_obj = Self::create_object( &txn, ObjectType::Connection, @@ -1513,6 +1526,16 @@ impl CatalogController { let connection: connection::ActiveModel = pb_connection.clone().into(); Connection::insert(connection).exec(&txn).await?; 
+ for secret_id in dep_secrets { + ObjectDependency::insert(object_dependency::ActiveModel { + oid: Set(secret_id as _), + used_by: Set(conn_obj.oid), + ..Default::default() + }) + .exec(&txn) + .await?; + } + txn.commit().await?; let version = self diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index 831a639eac416..c2b6fccf97562 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -19,7 +19,7 @@ use risingwave_common::hash::VnodeCount; use risingwave_common::util::epoch::Epoch; use risingwave_meta_model::{ connection, database, function, index, object, schema, secret, sink, source, subscription, - table, view, + table, view, PrivateLinkService, }; use risingwave_meta_model_migration::{MigrationStatus, Migrator, MigratorTrait}; use risingwave_pb::catalog::connection::PbInfo as PbConnectionInfo; @@ -365,15 +365,18 @@ impl From> for PbView { impl From> for PbConnection { fn from(value: ObjectModel) -> Self { + let info: PbConnectionInfo = if value.0.info == PrivateLinkService::default() { + PbConnectionInfo::ConnectionParams(value.0.params.to_protobuf()) + } else { + PbConnectionInfo::PrivateLinkService(value.0.info.to_protobuf()) + }; Self { id: value.1.oid as _, schema_id: value.1.schema_id.unwrap() as _, database_id: value.1.database_id.unwrap() as _, name: value.0.name, owner: value.1.owner_id as _, - info: Some(PbConnectionInfo::PrivateLinkService( - value.0.info.to_protobuf(), - )), + info: Some(info), } } } diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 3c8c793256db9..497b4e6b4c631 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -321,6 +321,13 @@ impl CatalogController { .into_iter() .map(|secret_id| secret_id as ObjectId), ); + // collect dependent connection + dependencies.extend( + streaming_job + .dependent_connection_ids()? 
+ .into_iter() + .map(|conn_id| conn_id as ObjectId), + ); // record object dependency. if !dependencies.is_empty() { diff --git a/src/meta/src/error.rs b/src/meta/src/error.rs index cda027c2d634c..cffdf5f30cd3a 100644 --- a/src/meta/src/error.rs +++ b/src/meta/src/error.rs @@ -13,6 +13,7 @@ // limitations under the License. use risingwave_common::error::{BoxedError, NotImplemented}; +use risingwave_common::secret::SecretError; use risingwave_common::session_config::SessionConfigError; use risingwave_connector::error::ConnectorError; use risingwave_connector::sink::SinkError; @@ -135,6 +136,13 @@ pub enum MetaErrorInner { #[error(transparent)] NotImplemented(#[from] NotImplemented), + + #[error("Secret error: {0}")] + SecretError( + #[from] + #[backtrace] + SecretError, + ), } impl MetaError { diff --git a/src/meta/src/manager/diagnose.rs b/src/meta/src/manager/diagnose.rs index fd4d67c256685..33a97537ab467 100644 --- a/src/meta/src/manager/diagnose.rs +++ b/src/meta/src/manager/diagnose.rs @@ -759,12 +759,12 @@ fn redact_all_sql_options(sql: &str) -> Option { }; if let Some(options) = options.0 { for option in options { - option.value = Value::SingleQuotedString("[REDACTED]".into()); + option.value = Value::SingleQuotedString("[REDACTED]".into()).into(); } } if let Some(options) = options.1 { for option in options { - option.value = Value::SingleQuotedString("[REDACTED]".into()); + option.value = Value::SingleQuotedString("[REDACTED]".into()).into(); } } writeln!(&mut redacted, "{statement}").unwrap(); diff --git a/src/meta/src/manager/mod.rs b/src/meta/src/manager/mod.rs index b49ce350c5501..80ecda8cabaf8 100644 --- a/src/meta/src/manager/mod.rs +++ b/src/meta/src/manager/mod.rs @@ -71,6 +71,32 @@ pub fn get_referred_secret_ids_from_source(source: &PbSource) -> MetaResult HashSet { + let mut connection_ids = HashSet::new(); + if let Some(conn_id) = source.connection_id { + connection_ids.insert(conn_id); + } + if let Some(info) = &source.info + && let 
Some(conn_id) = info.connection_id + { + connection_ids.insert(conn_id); + } + connection_ids +} + +pub fn get_referred_connection_ids_from_sink(sink: &PbSink) -> HashSet { + let mut connection_ids = HashSet::new(); + if let Some(format_desc) = &sink.format_desc + && let Some(conn_id) = format_desc.connection_id + { + connection_ids.insert(conn_id); + } + if let Some(conn_id) = sink.connection_id { + connection_ids.insert(conn_id); + } + connection_ids +} + pub fn get_referred_secret_ids_from_sink(sink: &PbSink) -> HashSet { let mut secret_ids = HashSet::new(); for secret_ref in sink.get_secret_refs().values() { diff --git a/src/meta/src/manager/streaming_job.rs b/src/meta/src/manager/streaming_job.rs index 924cdb0124a9a..6d808814796c7 100644 --- a/src/meta/src/manager/streaming_job.rs +++ b/src/meta/src/manager/streaming_job.rs @@ -26,7 +26,10 @@ use sea_orm::entity::prelude::*; use sea_orm::{DatabaseTransaction, QuerySelect}; use strum::{EnumDiscriminants, EnumIs}; -use super::{get_referred_secret_ids_from_sink, get_referred_secret_ids_from_source}; +use super::{ + get_referred_connection_ids_from_sink, get_referred_connection_ids_from_source, + get_referred_secret_ids_from_sink, get_referred_secret_ids_from_source, +}; use crate::model::FragmentId; use crate::{MetaError, MetaResult}; @@ -334,6 +337,21 @@ impl StreamingJob { } } + pub fn dependent_connection_ids(&self) -> MetaResult> { + match self { + StreamingJob::Source(source) => Ok(get_referred_connection_ids_from_source(source)), + StreamingJob::Table(source, _, _) => { + if let Some(source) = source { + Ok(get_referred_connection_ids_from_source(source)) + } else { + Ok(HashSet::new()) + } + } + StreamingJob::Sink(sink, _) => Ok(get_referred_connection_ids_from_sink(sink)), + StreamingJob::MaterializedView(_) | StreamingJob::Index(_, _) => Ok(HashSet::new()), + } + } + // Get the secret ids that are referenced by this job. 
pub fn dependent_secret_ids(&self) -> MetaResult> { match self { diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index ffb46ea03a6ac..7faf8aa2eab99 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -30,6 +30,7 @@ use risingwave_common::util::stream_graph_visitor::{ visit_stream_node, visit_stream_node_cont_mut, }; use risingwave_common::{bail, hash, must_match}; +use risingwave_connector::connector_common::validate_connection; use risingwave_connector::error::ConnectorError; use risingwave_connector::source::{ ConnectorProperties, SourceEnumeratorContext, SourceProperties, SplitEnumerator, @@ -504,6 +505,7 @@ impl DdlController { } async fn create_connection(&self, connection: Connection) -> MetaResult { + validate_connection(&connection).await?; self.metadata_manager .catalog_controller .create_connection(connection) diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs index 7d3d07a9f8d5e..c7cf401221b07 100644 --- a/src/sqlparser/src/ast/ddl.rs +++ b/src/sqlparser/src/ast/ddl.rs @@ -20,10 +20,10 @@ use core::fmt; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; -use super::{FormatEncodeOptions, Value}; +use super::FormatEncodeOptions; use crate::ast::{ - display_comma_separated, display_separated, DataType, Expr, Ident, ObjectName, SecretRef, - SetVariableValue, + display_comma_separated, display_separated, DataType, Expr, Ident, ObjectName, SecretRefValue, + SetVariableValue, Value, }; use crate::tokenizer::Token; @@ -824,6 +824,6 @@ impl fmt::Display for ReferentialAction { #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct WebhookSourceInfo { - pub secret_ref: SecretRef, + pub secret_ref: SecretRefValue, pub signature_expr: Expr, } diff --git a/src/sqlparser/src/ast/legacy_source.rs b/src/sqlparser/src/ast/legacy_source.rs index 7a5abf35a2df8..592cbf79c5816 100644 --- 
a/src/sqlparser/src/ast/legacy_source.rs +++ b/src/sqlparser/src/ast/legacy_source.rs @@ -163,7 +163,7 @@ impl LegacyRowFormat { value: "message".into(), quote_style: None, }]), - value: Value::SingleQuotedString(schema.message_name.0), + value: Value::SingleQuotedString(schema.message_name.0).into(), }]; if schema.use_schema_registry { options.push(SqlOption { @@ -171,7 +171,7 @@ impl LegacyRowFormat { value: "schema.registry".into(), quote_style: None, }]), - value: Value::SingleQuotedString(schema.row_schema_location.0), + value: Value::SingleQuotedString(schema.row_schema_location.0).into(), }); } else { options.push(SqlOption { @@ -179,7 +179,7 @@ impl LegacyRowFormat { value: "schema.location".into(), quote_style: None, }]), - value: Value::SingleQuotedString(schema.row_schema_location.0), + value: Value::SingleQuotedString(schema.row_schema_location.0).into(), }) } options @@ -191,7 +191,7 @@ impl LegacyRowFormat { value: "schema.registry".into(), quote_style: None, }]), - value: Value::SingleQuotedString(schema.row_schema_location.0), + value: Value::SingleQuotedString(schema.row_schema_location.0).into(), }] } else { vec![SqlOption { @@ -199,7 +199,7 @@ impl LegacyRowFormat { value: "schema.location".into(), quote_style: None, }]), - value: Value::SingleQuotedString(schema.row_schema_location.0), + value: Value::SingleQuotedString(schema.row_schema_location.0).into(), }] } } @@ -209,7 +209,7 @@ impl LegacyRowFormat { value: "schema.registry".into(), quote_style: None, }]), - value: Value::SingleQuotedString(schema.row_schema_location.0), + value: Value::SingleQuotedString(schema.row_schema_location.0).into(), }] } LegacyRowFormat::Csv(schema) => { @@ -221,7 +221,8 @@ impl LegacyRowFormat { }]), value: Value::SingleQuotedString( String::from_utf8_lossy(&[schema.delimiter]).into(), - ), + ) + .into(), }, SqlOption { name: ObjectName(vec![Ident { @@ -232,7 +233,8 @@ impl LegacyRowFormat { "false".into() } else { "true".into() - }), + }) + .into(), }, ] } diff 
--git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index 510df40865f6f..357eb45275458 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -52,8 +52,8 @@ pub use self::query::{ }; pub use self::statement::*; pub use self::value::{ - CstyleEscapedString, DateTimeField, DollarQuotedString, JsonPredicateType, SecretRef, - SecretRefAsType, TrimWhereField, Value, + ConnectionRefValue, CstyleEscapedString, DateTimeField, DollarQuotedString, JsonPredicateType, + SecretRefAsType, SecretRefValue, TrimWhereField, Value, }; pub use crate::ast::ddl::{ AlterIndexOperation, AlterSinkOperation, AlterSourceOperation, AlterSubscriptionOperation, @@ -2757,7 +2757,7 @@ impl ParseTo for ObjectType { #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct SqlOption { pub name: ObjectName, - pub value: Value, + pub value: SqlOptionValue, } impl fmt::Display for SqlOption { @@ -2776,6 +2776,44 @@ impl fmt::Display for SqlOption { } } +#[derive(Clone, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub enum SqlOptionValue { + Value(Value), + SecretRef(SecretRefValue), + ConnectionRef(ConnectionRefValue), +} + +impl fmt::Debug for SqlOptionValue { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + SqlOptionValue::Value(value) => write!(f, "{:?}", value), + SqlOptionValue::SecretRef(secret_ref) => write!(f, "secret {:?}", secret_ref), + SqlOptionValue::ConnectionRef(connection_ref) => { + write!(f, "connection {:?}", connection_ref) + } + } + } +} + +impl fmt::Display for SqlOptionValue { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + SqlOptionValue::Value(value) => write!(f, "{}", value), + SqlOptionValue::SecretRef(secret_ref) => write!(f, "secret {}", secret_ref), + SqlOptionValue::ConnectionRef(connection_ref) => { + write!(f, "connection {}", connection_ref) + } + } + } +} + +impl From for SqlOptionValue { + fn from(value: Value) 
-> Self { + SqlOptionValue::Value(value) + } +} + #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum EmitMode { @@ -3169,7 +3207,10 @@ impl TryFrom> for CreateFunctionWithOptions { let mut always_retry_on_network_error = None; for option in with_options { if option.name.to_string().to_lowercase() == "always_retry_on_network_error" { - always_retry_on_network_error = Some(option.value == Value::Boolean(true)); + always_retry_on_network_error = Some(matches!( + option.value, + SqlOptionValue::Value(Value::Boolean(true)) + )); } else { return Err(StrError(format!("Unsupported option: {}", option.name))); } diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs index d817c8c2759ae..d1ae3c98f82a9 100644 --- a/src/sqlparser/src/ast/statement.rs +++ b/src/sqlparser/src/ast/statement.rs @@ -850,7 +850,7 @@ impl ParseTo for CreateSecretStatement { impl_parse_to!(with_properties: WithProperties, parser); let mut credential = Value::Null; if parser.parse_keyword(Keyword::AS) { - credential = parser.parse_value()?; + credential = parser.ensure_parse_value()?; } Ok(Self { if_not_exists, diff --git a/src/sqlparser/src/ast/value.rs b/src/sqlparser/src/ast/value.rs index 2bf8a6fdf3a02..7f6cdb048b4e6 100644 --- a/src/sqlparser/src/ast/value.rs +++ b/src/sqlparser/src/ast/value.rs @@ -59,8 +59,6 @@ pub enum Value { }, /// `NULL` value Null, - /// name of the reference to secret - Ref(SecretRef), } impl fmt::Display for Value { @@ -115,7 +113,6 @@ impl fmt::Display for Value { Ok(()) } Value::Null => write!(f, "NULL"), - Value::Ref(v) => write!(f, "secret {}", v), } } } @@ -240,12 +237,12 @@ impl fmt::Display for JsonPredicateType { } #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -pub struct SecretRef { +pub struct SecretRefValue { pub secret_name: ObjectName, pub ref_as: SecretRefAsType, } -impl fmt::Display for SecretRef { 
+impl fmt::Display for SecretRefValue { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self.ref_as { SecretRefAsType::Text => write!(f, "{}", self.secret_name), @@ -260,3 +257,15 @@ pub enum SecretRefAsType { Text, File, } + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct ConnectionRefValue { + pub connection_name: ObjectName, +} + +impl fmt::Display for ConnectionRefValue { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.connection_name) + } +} diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 09f6393eff19d..43c7997818a14 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -585,7 +585,7 @@ impl Parser<'_> { Token::Word(w) => match w.keyword { Keyword::TRUE | Keyword::FALSE | Keyword::NULL => { *self = checkpoint; - Ok(Expr::Value(self.parse_value()?)) + Ok(Expr::Value(self.ensure_parse_value()?)) } Keyword::CASE => self.parse_case_expr(), Keyword::CAST => self.parse_cast_expr(), @@ -708,7 +708,7 @@ impl Parser<'_> { | Token::HexStringLiteral(_) | Token::CstyleEscapesString(_) => { *self = checkpoint; - Ok(Expr::Value(self.parse_value()?)) + Ok(Expr::Value(self.ensure_parse_value()?)) } Token::Parameter(number) => self.parse_param(number), Token::Pipe => { @@ -2971,7 +2971,15 @@ impl Parser<'_> { pub fn parse_sql_option(&mut self) -> PResult { let name = self.parse_object_name()?; self.expect_token(&Token::Eq)?; - let value = self.parse_value()?; + let value = { + const CONNECTION_REF_KEY: &str = "connection"; + if name.real_value().eq_ignore_ascii_case(CONNECTION_REF_KEY) { + let connection_name = self.parse_object_name()?; + SqlOptionValue::ConnectionRef(ConnectionRefValue { connection_name }) + } else { + self.parse_value_and_obj_ref::()? 
+ } + }; Ok(SqlOption { name, value }) } @@ -3577,7 +3585,7 @@ impl Parser<'_> { let secret_name = self.parse_object_name()?; let with_options = self.parse_with_properties()?; self.expect_keyword(Keyword::AS)?; - let new_credential = self.parse_value()?; + let new_credential = self.ensure_parse_value()?; let operation = AlterSecretOperation::ChangeCredential { new_credential }; Ok(Statement::AlterSecret { name: secret_name, @@ -3637,44 +3645,61 @@ impl Parser<'_> { values } + pub fn ensure_parse_value(&mut self) -> PResult { + match self.parse_value_and_obj_ref::()? { + SqlOptionValue::Value(value) => Ok(value), + SqlOptionValue::SecretRef(_) | SqlOptionValue::ConnectionRef(_) => unreachable!(), + } + } + /// Parse a literal value (numbers, strings, date/time, booleans) - pub fn parse_value(&mut self) -> PResult { + pub fn parse_value_and_obj_ref( + &mut self, + ) -> PResult { let checkpoint = *self; let token = self.next_token(); match token.token { Token::Word(w) => match w.keyword { - Keyword::TRUE => Ok(Value::Boolean(true)), - Keyword::FALSE => Ok(Value::Boolean(false)), - Keyword::NULL => Ok(Value::Null), + Keyword::TRUE => Ok(Value::Boolean(true).into()), + Keyword::FALSE => Ok(Value::Boolean(false).into()), + Keyword::NULL => Ok(Value::Null.into()), Keyword::NoKeyword if w.quote_style.is_some() => match w.quote_style { - Some('"') => Ok(Value::DoubleQuotedString(w.value)), - Some('\'') => Ok(Value::SingleQuotedString(w.value)), + Some('"') => Ok(Value::DoubleQuotedString(w.value).into()), + Some('\'') => Ok(Value::SingleQuotedString(w.value).into()), _ => self.expected_at(checkpoint, "A value")?, }, Keyword::SECRET => { + if FORBID_OBJ_REF { + return self.expected_at( + checkpoint, + "a concrete value rather than a secret reference", + ); + } let secret = self.parse_secret_ref()?; - Ok(Value::Ref(secret)) + Ok(SqlOptionValue::SecretRef(secret)) } _ => self.expected_at(checkpoint, "a concrete value"), }, - Token::Number(ref n) => 
Ok(Value::Number(n.clone())), - Token::SingleQuotedString(ref s) => Ok(Value::SingleQuotedString(s.to_string())), - Token::DollarQuotedString(ref s) => Ok(Value::DollarQuotedString(s.clone())), - Token::CstyleEscapesString(ref s) => Ok(Value::CstyleEscapedString(s.clone())), - Token::NationalStringLiteral(ref s) => Ok(Value::NationalStringLiteral(s.to_string())), - Token::HexStringLiteral(ref s) => Ok(Value::HexStringLiteral(s.to_string())), + Token::Number(ref n) => Ok(Value::Number(n.clone()).into()), + Token::SingleQuotedString(ref s) => Ok(Value::SingleQuotedString(s.to_string()).into()), + Token::DollarQuotedString(ref s) => Ok(Value::DollarQuotedString(s.clone()).into()), + Token::CstyleEscapesString(ref s) => Ok(Value::CstyleEscapedString(s.clone()).into()), + Token::NationalStringLiteral(ref s) => { + Ok(Value::NationalStringLiteral(s.to_string()).into()) + } + Token::HexStringLiteral(ref s) => Ok(Value::HexStringLiteral(s.to_string()).into()), _ => self.expected_at(checkpoint, "a value"), } } - fn parse_secret_ref(&mut self) -> PResult { + fn parse_secret_ref(&mut self) -> PResult { let secret_name = self.parse_object_name()?; let ref_as = if self.parse_keywords(&[Keyword::AS, Keyword::FILE]) { SecretRefAsType::File } else { SecretRefAsType::Text }; - Ok(SecretRef { + Ok(SecretRefValue { secret_name, ref_as, }) @@ -3686,7 +3711,7 @@ impl Parser<'_> { separated( 1.., alt(( - Self::parse_value.map(SetVariableValueSingle::Literal), + Self::ensure_parse_value.map(SetVariableValueSingle::Literal), |parser: &mut Self| { let checkpoint = *parser; let ident = parser.parse_identifier()?; @@ -3713,7 +3738,7 @@ impl Parser<'_> { pub fn parse_number_value(&mut self) -> PResult { let checkpoint = *self; - match self.parse_value()? { + match self.ensure_parse_value()? 
{ Value::Number(v) => Ok(v), _ => self.expected_at(checkpoint, "literal number"), } @@ -4398,7 +4423,7 @@ impl Parser<'_> { })), ), Self::parse_identifier.map(SetTimeZoneValue::Ident), - Self::parse_value.map(SetTimeZoneValue::Literal), + Self::ensure_parse_value.map(SetTimeZoneValue::Literal), )) .expect("variable") .parse_next(self)?; @@ -4417,7 +4442,7 @@ impl Parser<'_> { }) } else if self.parse_keyword(Keyword::TRANSACTION) && modifier.is_none() { if self.parse_keyword(Keyword::SNAPSHOT) { - let snapshot_id = self.parse_value()?; + let snapshot_id = self.ensure_parse_value()?; return Ok(Statement::SetTransaction { modes: vec![], snapshot: Some(snapshot_id), diff --git a/src/sqlparser/tests/sqlparser_common.rs b/src/sqlparser/tests/sqlparser_common.rs index c8f6fb41d32a9..71af4bba29871 100644 --- a/src/sqlparser/tests/sqlparser_common.rs +++ b/src/sqlparser/tests/sqlparser_common.rs @@ -1543,11 +1543,11 @@ fn parse_create_table_with_options() { vec![ SqlOption { name: vec!["foo".into()].into(), - value: Value::SingleQuotedString("bar".into()) + value: Value::SingleQuotedString("bar".into()).into(), }, SqlOption { name: vec!["a".into()].into(), - value: number("123") + value: number("123").into(), }, ], with_options @@ -3145,11 +3145,11 @@ fn parse_create_view_with_options() { vec![ SqlOption { name: vec!["foo".into()].into(), - value: Value::SingleQuotedString("bar".into()) + value: Value::SingleQuotedString("bar".into()).into(), }, SqlOption { name: vec!["a".into()].into(), - value: number("123") + value: number("123").into(), }, ], with_options diff --git a/src/sqlparser/tests/sqlparser_postgres.rs b/src/sqlparser/tests/sqlparser_postgres.rs index 549920d1c7585..a2edb08edda44 100644 --- a/src/sqlparser/tests/sqlparser_postgres.rs +++ b/src/sqlparser/tests/sqlparser_postgres.rs @@ -159,15 +159,15 @@ fn parse_create_table_with_defaults() { vec![ SqlOption { name: vec!["fillfactor".into()].into(), - value: number("20") + value: number("20").into(), }, SqlOption 
{ name: vec!["user_catalog_table".into()].into(), - value: Value::Boolean(true) + value: Value::Boolean(true).into(), }, SqlOption { name: vec!["autovacuum_vacuum_threshold".into()].into(), - value: number("100") + value: number("100").into(), }, ] );