diff --git a/Cargo.lock b/Cargo.lock index 2ec4620e383df..633698b84ac36 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1040,7 +1040,7 @@ dependencies = [ "async-channel 2.2.1", "async-executor", "async-io", - "async-lock 3.4.0", + "async-lock", "blocking", "futures-lite", "once_cell", @@ -1053,7 +1053,7 @@ version = "2.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0d6baa8f0178795da0e71bc42c9e5d13261aac7ee549853162e66a241ba17964" dependencies = [ - "async-lock 3.4.0", + "async-lock", "cfg-if", "concurrent-queue", "futures-io", @@ -1066,15 +1066,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "async-lock" -version = "2.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" -dependencies = [ - "event-listener 2.5.3", -] - [[package]] name = "async-lock" version = "3.4.0" @@ -1141,7 +1132,7 @@ dependencies = [ "async-channel 1.9.0", "async-global-executor", "async-io", - "async-lock 3.4.0", + "async-lock", "crossbeam-utils", "futures-channel", "futures-core", @@ -2333,12 +2324,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "bytecount" -version = "0.6.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ce89b21cab1437276d2650d57e971f9d548a2d9037cc231abdc0562b97498ce" - [[package]] name = "bytemuck" version = "1.14.0" @@ -2400,15 +2385,6 @@ dependencies = [ "pkg-config", ] -[[package]] -name = "camino" -version = "1.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b96ec4966b5813e2c0507c1f86115c8c5abaadc3980879c3424042a02fd1ad3" -dependencies = [ - "serde", -] - [[package]] name = "cap-fs-ext" version = "3.0.0" @@ -2480,28 +2456,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1582e1c9e755dd6ad6b224dcffb135d199399a4568d454bd89fe515ca8425695" -[[package]] -name = "cargo-platform" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24b1f0365a6c6bb4020cd05806fd0d33c44d38046b8bd7f0e40814b9763cabfc" -dependencies = [ - "serde", -] - -[[package]] -name = "cargo_metadata" -version = "0.14.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa" -dependencies = [ - "camino", - "cargo-platform", - "semver 1.0.18", - "serde", - "serde_json", -] - [[package]] name = "cast" version = "0.3.0" @@ -4743,15 +4697,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "error-chain" -version = "0.12.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc" -dependencies = [ - "version_check", -] - [[package]] name = "escape8259" version = "0.5.2" @@ -7710,21 +7655,21 @@ dependencies = [ [[package]] name = "moka" -version = "0.12.0" +version = "0.12.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8dc65d4615c08c8a13d91fd404b5a2a4485ba35b4091e3315cf8798d280c2f29" +checksum = "32cf62eb4dd975d2dde76432fb1075c49e3ee2331cf36f1f8fd4b66550d32b6f" dependencies = [ - "async-lock 2.8.0", + "async-lock", "async-trait", "crossbeam-channel", "crossbeam-epoch", "crossbeam-utils", + "event-listener 5.3.1", "futures-util", "once_cell", "parking_lot 0.12.1", "quanta", "rustc_version 0.4.0", - "skeptic", "smallvec", "tagptr", "thiserror", @@ -9798,17 +9743,6 @@ dependencies = [ "syn 1.0.109", ] 
-[[package]] -name = "pulldown-cmark" -version = "0.9.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57206b407293d2bcd3af849ce869d52068623f19e1b5ff8e8778e3309439682b" -dependencies = [ - "bitflags 2.6.0", - "memchr", - "unicase", -] - [[package]] name = "pulsar" version = "6.3.0" @@ -9918,12 +9852,11 @@ checksum = "658fa1faf7a4cc5f057c9ee5ef560f717ad9d8dc66d975267f709624d6e1ab88" [[package]] name = "quanta" -version = "0.11.0" -source = "git+https://github.com/madsim-rs/quanta.git?rev=948bdc3#948bdc3d4cd3fcfe3d52d03dd83deee96d97d770" +version = "0.12.3" +source = "git+https://github.com/madsim-rs/quanta.git?rev=ea9ba802327b1d72c4b1c7202c759b0a5243271e#ea9ba802327b1d72c4b1c7202c759b0a5243271e" dependencies = [ "crossbeam-utils", "libc", - "mach2", "once_cell", "raw-cpuid", "wasi", @@ -10022,11 +9955,11 @@ dependencies = [ [[package]] name = "raw-cpuid" -version = "10.7.0" +version = "11.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332" +checksum = "1ab240315c661615f2ee9f0f2cd32d5a7343a84d5ebcccb99d46e6637565e7b0" dependencies = [ - "bitflags 1.3.2", + "bitflags 2.6.0", ] [[package]] @@ -11677,6 +11610,7 @@ dependencies = [ "maplit", "memcomparable", "mime_guess", + "moka", "notify", "num-integer", "num-traits", @@ -13138,9 +13072,6 @@ name = "semver" version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b0293b4b29daaf487284529cc2f5675b8e57c61f70167ba415a463651fd6a918" -dependencies = [ - "serde", -] [[package]] name = "semver-parser" @@ -13601,21 +13532,6 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fed904c7fb2856d868b92464fc8fa597fce366edea1a9cbfaa8cb5fe080bd6d" -[[package]] -name = "skeptic" -version = "0.13.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8" -dependencies = [ - "bytecount", - "cargo_metadata", - "error-chain", - "glob", - "pulldown-cmark", - "tempfile", - "walkdir", -] - [[package]] name = "slab" version = "0.4.9" diff --git a/Cargo.toml b/Cargo.toml index f68f6340d0012..2000a730297f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -153,11 +153,7 @@ arrow-udf-flight = "0.4" clap = { version = "4", features = ["cargo", "derive", "env"] } # Use a forked version which removes the dependencies on dynamo db to reduce # compile time and binary size. -deltalake = { version = "0.20.1", features = [ - "s3", - "gcs", - "datafusion", -] } +deltalake = { version = "0.20.1", features = ["s3", "gcs", "datafusion"] } itertools = "0.13.0" jsonbb = "0.1.4" lru = { git = "https://github.com/risingwavelabs/lru-rs.git", rev = "2682b85" } @@ -347,7 +343,7 @@ opt-level = 2 [patch.crates-io] # Patch third-party crates for deterministic simulation. -quanta = { git = "https://github.com/madsim-rs/quanta.git", rev = "948bdc3" } +quanta = { git = "https://github.com/madsim-rs/quanta.git", rev = "ea9ba802327b1d72c4b1c7202c759b0a5243271e" } getrandom = { git = "https://github.com/madsim-rs/getrandom.git", rev = "e79a7ae" } # Don't patch `tokio-stream`, but only use the madsim version for **direct** dependencies. # Imagine an unpatched dependency depends on the original `tokio` and the patched `tokio-stream`. 
diff --git a/e2e_test/source_legacy/basic/ddl.slt b/e2e_test/source_legacy/basic/ddl.slt
index e8b9e10b249fa..edf2ea3d9bdc0 100644
--- a/e2e_test/source_legacy/basic/ddl.slt
+++ b/e2e_test/source_legacy/basic/ddl.slt
@@ -30,7 +30,7 @@ Caused by these errors (recent errors listed first):
   1: gRPC request to meta service failed: Internal error
   2: failed to create source worker
   3: failed to parse json
-  4: missing field `properties.bootstrap.server`
+  4: missing field `topic`
 
 
 statement error
diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml
index 44fb2d7ba840f..14257251041f4 100644
--- a/src/connector/Cargo.toml
+++ b/src/connector/Cargo.toml
@@ -71,7 +71,7 @@ jni = { version = "0.21.1", features = ["invocation"] }
 jsonbb = { workspace = true }
 jsonwebtoken = "9.2.0"
 maplit = "1.0.2"
-moka = { version = "0.12.0", features = ["future"] }
+moka = { version = "0.12.8", features = ["future"] }
 mongodb = { version = "2.8.2", features = ["tokio-runtime"] }
 mysql_async = { workspace = true }
 mysql_common = { version = "0.32", default-features = false, features = [
diff --git a/src/connector/src/connector_common/common.rs b/src/connector/src/connector_common/common.rs
index 1be41b4e09caa..3eaffa93d02a4 100644
--- a/src/connector/src/connector_common/common.rs
+++ b/src/connector/src/connector_common/common.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use std::collections::BTreeMap;
+use std::hash::Hash;
 use std::io::Write;
 use std::time::Duration;
 
@@ -61,7 +62,7 @@ use aws_types::SdkConfig;
 use risingwave_common::util::env_var::env_var_is_true;
 
 /// A flatten config map for aws auth.
-#[derive(Deserialize, Debug, Clone, WithOptions)]
+#[derive(Deserialize, Debug, Clone, WithOptions, PartialEq)]
 pub struct AwsAuthProps {
     #[serde(rename = "aws.region", alias = "region")]
     pub region: Option<String>,
@@ -161,21 +162,11 @@ impl AwsAuthProps {
 }
 
 #[serde_as]
-#[derive(Debug, Clone, Deserialize, WithOptions)]
-pub struct KafkaCommon {
+#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq, Hash, Eq)]
+pub struct KafkaConnection {
     #[serde(rename = "properties.bootstrap.server", alias = "kafka.brokers")]
     pub brokers: String,
 
-    #[serde(rename = "topic", alias = "kafka.topic")]
-    pub topic: String,
-
-    #[serde(
-        rename = "properties.sync.call.timeout",
-        deserialize_with = "deserialize_duration_from_string",
-        default = "default_kafka_sync_call_timeout"
-    )]
-    pub sync_call_timeout: Duration,
-
     /// Security protocol used for RisingWave to communicate with Kafka brokers. Could be
     /// PLAINTEXT, SSL, SASL_PLAINTEXT or SASL_SSL.
     #[serde(rename = "properties.security.protocol")]
@@ -252,6 +243,20 @@ pub struct KafkaCommon {
 
 #[serde_as]
 #[derive(Debug, Clone, Deserialize, WithOptions)]
+pub struct KafkaCommon {
+    #[serde(rename = "topic", alias = "kafka.topic")]
+    pub topic: String,
+
+    #[serde(
+        rename = "properties.sync.call.timeout",
+        deserialize_with = "deserialize_duration_from_string",
+        default = "default_kafka_sync_call_timeout"
+    )]
+    pub sync_call_timeout: Duration,
+}
+
+#[serde_as]
+#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq)]
 pub struct KafkaPrivateLinkCommon {
     /// This is generated from `private_link_targets` and `private_link_endpoint` in frontend, instead of given by users.
     #[serde(rename = "broker.rewrite.endpoints")]
@@ -269,7 +274,7 @@ pub struct RdKafkaPropertiesCommon {
     /// Maximum Kafka protocol request message size. Due to differing framing overhead between
     /// protocol versions the producer is unable to reliably enforce a strict max message limit at
     /// produce time and may exceed the maximum size by one message in protocol ProduceRequests,
-    /// the broker will enforce the the topic's max.message.bytes limit
+    /// the broker will enforce the topic's max.message.bytes limit
     #[serde(rename = "properties.message.max.bytes")]
     #[serde_as(as = "Option<DisplayFromStr>")]
     pub message_max_bytes: Option<usize>,
@@ -316,7 +321,7 @@ impl RdKafkaPropertiesCommon {
     }
 }
 
-impl KafkaCommon {
+impl KafkaConnection {
     pub(crate) fn set_security_properties(&self, config: &mut ClientConfig) {
         // AWS_MSK_IAM
         if self.is_aws_msk_iam() {
diff --git a/src/connector/src/connector_common/mod.rs b/src/connector/src/connector_common/mod.rs
index 30d699198ccd4..57b614fdf548a 100644
--- a/src/connector/src/connector_common/mod.rs
+++ b/src/connector/src/connector_common/mod.rs
@@ -19,9 +19,9 @@ pub use mqtt_common::{MqttCommon, QualityOfService as MqttQualityOfService};
 
 mod common;
 pub use common::{
-    AwsAuthProps, AwsPrivateLinkItem, KafkaCommon, KafkaPrivateLinkCommon, KinesisCommon,
-    MongodbCommon, NatsCommon, PulsarCommon, PulsarOauthCommon, RdKafkaPropertiesCommon,
-    PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY,
+    AwsAuthProps, AwsPrivateLinkItem, KafkaCommon, KafkaConnection, KafkaPrivateLinkCommon,
+    KinesisCommon, MongodbCommon, NatsCommon, PulsarCommon, PulsarOauthCommon,
+    RdKafkaPropertiesCommon, PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY,
 };
 
 mod iceberg;
diff --git a/src/connector/src/error.rs b/src/connector/src/error.rs
index cfee9674d1ce4..c95d7fc97f4e3 100644
--- a/src/connector/src/error.rs
+++ b/src/connector/src/error.rs
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::sync::Arc;
+
 use risingwave_common::array::ArrayError;
 use risingwave_common::error::def_anyhow_newtype;
 use risingwave_pb::PbFieldNotFound;
@@ -29,6 +31,7 @@ def_anyhow_newtype! {
 
     // Common errors
     std::io::Error => transparent,
+    Arc<ConnectorError> => transparent,
 
     // Fine-grained connector errors
     AccessError => transparent,
diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs
index 68f20fcf7d581..9fc8da7ef7a48 100644
--- a/src/connector/src/sink/kafka.rs
+++ b/src/connector/src/sink/kafka.rs
@@ -35,7 +35,7 @@ use with_options::WithOptions;
 use super::catalog::{SinkFormat, SinkFormatDesc};
 use super::{Sink, SinkError, SinkParam};
 use crate::connector_common::{
-    AwsAuthProps, KafkaCommon, KafkaPrivateLinkCommon, RdKafkaPropertiesCommon,
+    AwsAuthProps, KafkaCommon, KafkaConnection, KafkaPrivateLinkCommon, RdKafkaPropertiesCommon,
 };
 use crate::sink::formatter::SinkFormatterImpl;
 use crate::sink::log_store::DeliveryFutureManagerAddFuture;
@@ -214,6 +214,9 @@ pub struct KafkaConfig {
     #[serde(flatten)]
     pub common: KafkaCommon,
 
+    #[serde(flatten)]
+    pub connection: KafkaConnection,
+
     #[serde(
         rename = "properties.retry.max",
         default = "_default_max_retries",
@@ -269,6 +272,7 @@ impl From<KafkaConfig> for KafkaProperties {
             time_offset: None,
             upsert: None,
             common: val.common,
+            connection: val.connection,
             rdkafka_properties_common: val.rdkafka_properties_common,
             rdkafka_properties_consumer: Default::default(),
             privatelink_common: val.privatelink_common,
@@ -368,7 +372,7 @@ impl Sink for KafkaSink {
         if !check.check_reachability().await {
             return Err(SinkError::Config(anyhow!(
                 "cannot connect to kafka broker ({})",
-                self.config.common.brokers
+                self.config.connection.brokers
             )));
         }
         Ok(())
@@ -413,11 +417,11 @@ impl KafkaSinkWriter {
         let mut c = ClientConfig::new();
 
         // KafkaConfig configuration
-        config.common.set_security_properties(&mut c);
+        config.connection.set_security_properties(&mut c);
         config.set_client(&mut c);
 
         // ClientConfig configuration
-        c.set("bootstrap.servers", &config.common.brokers);
+        c.set("bootstrap.servers", &config.connection.brokers);
 
         // Create the producer context, will be used to create the producer
         let broker_rewrite_map = config.privatelink_common.broker_rewrite_map.clone();
@@ -426,7 +430,7 @@ impl KafkaSinkWriter {
             None,
             None,
             config.aws_auth_props.clone(),
-            config.common.is_aws_msk_iam(),
+            config.connection.is_aws_msk_iam(),
         )
         .await?;
         let producer_ctx = RwProducerContext::new(ctx_common);
@@ -685,7 +689,7 @@ mod test {
             "broker.rewrite.endpoints".to_string() => "{\"broker1\": \"10.0.0.1:8001\"}".to_string(),
         };
         let config = KafkaConfig::from_btreemap(properties).unwrap();
-        assert_eq!(config.common.brokers, "localhost:9092");
+        assert_eq!(config.connection.brokers, "localhost:9092");
         assert_eq!(config.common.topic, "test");
         assert_eq!(config.max_retry_num, 20);
         assert_eq!(config.retry_interval, Duration::from_millis(500));
diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs
index a425de418ef4a..1d7525bc7a613 100644
--- a/src/connector/src/source/kafka/enumerator/client.rs
+++ b/src/connector/src/source/kafka/enumerator/client.rs
@@ -12,26 +12,34 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashMap;
+use std::collections::{BTreeMap, HashMap};
+use std::sync::{Arc, LazyLock, Weak};
 use std::time::Duration;
 
 use anyhow::{anyhow, Context};
 use async_trait::async_trait;
+use moka::future::Cache as MokaCache;
+use moka::ops::compute::Op;
 use prometheus::core::{AtomicI64, GenericGauge};
 use rdkafka::consumer::{BaseConsumer, Consumer};
 use rdkafka::error::KafkaResult;
-use rdkafka::{Offset, TopicPartitionList};
+use rdkafka::{ClientConfig, Offset, TopicPartitionList};
 use risingwave_common::bail;
 use risingwave_common::metrics::LabelGuardedMetric;
 
-use crate::error::ConnectorResult;
+use crate::error::{ConnectorError, ConnectorResult};
 use crate::source::base::SplitEnumerator;
 use crate::source::kafka::split::KafkaSplit;
 use crate::source::kafka::{
-    KafkaContextCommon, KafkaProperties, RwConsumerContext, KAFKA_ISOLATION_LEVEL,
+    KafkaConnection, KafkaContextCommon, KafkaProperties, RwConsumerContext, KAFKA_ISOLATION_LEVEL,
 };
 use crate::source::SourceEnumeratorContextRef;
 
+type KafkaClientType = BaseConsumer<RwConsumerContext>;
+
+pub static SHARED_KAFKA_CLIENT: LazyLock<MokaCache<KafkaConnection, Weak<KafkaClientType>>> =
+    LazyLock::new(|| moka::future::Cache::builder().build());
+
 #[derive(Debug, Copy, Clone, Eq, PartialEq)]
 pub enum KafkaEnumeratorOffset {
     Earliest,
@@ -44,7 +52,7 @@ pub struct KafkaSplitEnumerator {
     context: SourceEnumeratorContextRef,
     broker_address: String,
     topic: String,
-    client: BaseConsumer<RwConsumerContext>,
+    client: Arc<KafkaClientType>,
     start_offset: KafkaEnumeratorOffset,
 
     // maybe used in the future for batch processing
@@ -68,12 +76,12 @@ impl SplitEnumerator for KafkaSplitEnumerator {
         let mut config = rdkafka::ClientConfig::new();
         let common_props = &properties.common;
 
-        let broker_address = common_props.brokers.clone();
+        let broker_address = properties.connection.brokers.clone();
         let broker_rewrite_map = properties.privatelink_common.broker_rewrite_map.clone();
         let topic = common_props.topic.clone();
         config.set("bootstrap.servers", &broker_address);
         config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
-        common_props.set_security_properties(&mut config);
+        properties.connection.set_security_properties(&mut config);
         properties.set_client(&mut config);
         let mut scan_start_offset = match properties
             .scan_startup_mode
@@ -94,36 +102,64 @@ impl SplitEnumerator for KafkaSplitEnumerator {
             scan_start_offset = KafkaEnumeratorOffset::Timestamp(time_offset)
         }
 
-        // don't need kafka metrics from enumerator
-        let ctx_common = KafkaContextCommon::new(
-            broker_rewrite_map,
-            None,
-            None,
-            properties.aws_auth_props,
-            common_props.is_aws_msk_iam(),
-        )
-        .await?;
-        let client_ctx = RwConsumerContext::new(ctx_common);
-        let client: BaseConsumer<RwConsumerContext> =
-            config.create_with_context(client_ctx).await?;
-
-        // Note that before any SASL/OAUTHBEARER broker connection can succeed the application must call
-        // rd_kafka_oauthbearer_set_token() once – either directly or, more typically, by invoking either
-        // rd_kafka_poll(), rd_kafka_consumer_poll(), rd_kafka_queue_poll(), etc, in order to cause retrieval
-        // of an initial token to occur.
-        // https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a988395722598f63396d7a1bedb22adaf
-        if common_props.is_aws_msk_iam() {
-            #[cfg(not(madsim))]
-            client.poll(Duration::from_secs(10)); // note: this is a blocking call
-            #[cfg(madsim)]
-            client.poll(Duration::from_secs(10)).await;
+        async fn build_kafka_client(
+            config: &ClientConfig,
+            properties: &KafkaProperties,
+            rewrite_map: Option<BTreeMap<String, String>>,
+        ) -> ConnectorResult<KafkaClientType> {
+            let ctx_common = KafkaContextCommon::new(
+                rewrite_map,
+                None,
+                None,
+                properties.aws_auth_props.clone(),
+                properties.connection.is_aws_msk_iam(),
+            )
+            .await?;
+            let client_ctx = RwConsumerContext::new(ctx_common);
+            let client: BaseConsumer<RwConsumerContext> =
+                config.create_with_context(client_ctx).await?;
+
+            // Note that before any SASL/OAUTHBEARER broker connection can succeed the application must call
+            // rd_kafka_oauthbearer_set_token() once – either directly or, more typically, by invoking either
+            // rd_kafka_poll(), rd_kafka_consumer_poll(), rd_kafka_queue_poll(), etc, in order to cause retrieval
+            // of an initial token to occur.
+            // https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a988395722598f63396d7a1bedb22adaf
+            if properties.connection.is_aws_msk_iam() {
+                #[cfg(not(madsim))]
+                client.poll(Duration::from_secs(10)); // note: this is a blocking call
+                #[cfg(madsim)]
+                client.poll(Duration::from_secs(10)).await;
+            }
+            Ok(client)
         }
 
+        let mut client_arc: Option<Arc<KafkaClientType>> = None;
+        SHARED_KAFKA_CLIENT
+            .entry_by_ref(&properties.connection)
+            .and_try_compute_with::<_, _, ConnectorError>(|maybe_entry| async {
+                if let Some(entry) = maybe_entry {
+                    let entry_value = entry.into_value();
+                    if let Some(client) = entry_value.upgrade() {
+                        // return if the client is already built
+                        tracing::info!("reuse existing kafka client for {}", broker_address);
+                        client_arc = Some(client);
+                        return Ok(Op::Nop);
+                    }
+                }
+                let new_client_arc = Arc::new(
+                    build_kafka_client(&config, &properties, broker_rewrite_map.clone()).await?,
+                );
+                tracing::info!("build new kafka client for {}", broker_address);
+                client_arc = Some(new_client_arc.clone());
+                Ok(Op::Put(Arc::downgrade(&new_client_arc)))
+            })
+            .await?;
+
         Ok(Self {
             context,
             broker_address,
             topic,
-            client,
+            client: client_arc.unwrap(),
             start_offset: scan_start_offset,
             stop_offset: KafkaEnumeratorOffset::None,
             sync_call_timeout: properties.common.sync_call_timeout,
@@ -148,7 +184,7 @@ impl SplitEnumerator for KafkaSplitEnumerator {
             .fetch_stop_offset(topic_partitions.as_ref(), &watermarks)
             .await?;
 
-        let ret = topic_partitions
+        let ret: Vec<_> = topic_partitions
             .into_iter()
             .map(|partition| KafkaSplit {
                 topic: self.topic.clone(),
diff --git a/src/connector/src/source/kafka/mod.rs b/src/connector/src/source/kafka/mod.rs
index 125bf73a3529f..030c190eb4942 100644
--- a/src/connector/src/source/kafka/mod.rs
+++ b/src/connector/src/source/kafka/mod.rs
@@ -17,7 +17,7 @@ use std::collections::HashMap;
 use serde::Deserialize;
 use serde_with::{serde_as, DisplayFromStr};
 
-use crate::connector_common::{AwsAuthProps, KafkaPrivateLinkCommon};
+use crate::connector_common::{AwsAuthProps, KafkaConnection, KafkaPrivateLinkCommon};
 
 mod client_context;
 pub mod enumerator;
@@ -143,6 +143,9 @@ pub struct KafkaProperties {
     #[serde(flatten)]
     pub common: KafkaCommon,
 
+    #[serde(flatten)]
+    pub connection: KafkaConnection,
+
     #[serde(flatten)]
     pub rdkafka_properties_common: RdKafkaPropertiesCommon,
 
diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs
index d58f1b70dd9fc..b9523eca98b57 100644
--- a/src/connector/src/source/kafka/source/reader.rs
+++ b/src/connector/src/source/kafka/source/reader.rs
@@ -64,7 +64,7 @@ impl SplitReader for KafkaSplitReader {
     ) -> Result<Self> {
         let mut config = ClientConfig::new();
 
-        let bootstrap_servers = &properties.common.brokers;
+        let bootstrap_servers = &properties.connection.brokers;
         let broker_rewrite_map = properties.privatelink_common.broker_rewrite_map.clone();
 
         // disable partition eof
@@ -73,7 +73,7 @@ impl SplitReader for KafkaSplitReader {
         config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
         config.set("bootstrap.servers", bootstrap_servers);
 
-        properties.common.set_security_properties(&mut config);
+        properties.connection.set_security_properties(&mut config);
         properties.set_client(&mut config);
 
         let group_id_prefix = properties
@@ -95,7 +95,7 @@ impl SplitReader for KafkaSplitReader {
             // explicitly
             Some(source_ctx.metrics.rdkafka_native_metric.clone()),
             properties.aws_auth_props,
-            properties.common.is_aws_msk_iam(),
+            properties.connection.is_aws_msk_iam(),
         )
         .await?;
 
diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml
index eb03288dfbcca..dd2dd7098f99f 100644
--- a/src/connector/with_options_sink.yaml
+++ b/src/connector/with_options_sink.yaml
@@ -434,11 +434,6 @@ IcebergConfig:
       default: Default::default
 KafkaConfig:
   fields:
-    - name: properties.bootstrap.server
-      field_type: String
-      required: true
-      alias:
-        - kafka.brokers
     - name: topic
       field_type: String
      required: true
@@ -448,6 +443,11 @@ KafkaConfig:
       field_type: Duration
       required: false
       default: 'Duration :: from_secs (5)'
+    - name: properties.bootstrap.server
+      field_type: String
+      required: true
+      alias:
+        - kafka.brokers
     - name: properties.security.protocol
       field_type: String
       comments: |-
@@ -542,7 +542,7 @@ KafkaConfig:
        Maximum Kafka protocol request message size. Due to differing framing overhead between
        protocol versions the producer is unable to reliably enforce a strict max message limit at
        produce time and may exceed the maximum size by one message in protocol ProduceRequests,
-       the broker will enforce the the topic's max.message.bytes limit
+       the broker will enforce the topic's max.message.bytes limit
       required: false
     - name: properties.receive.message.max.bytes
       field_type: usize
diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml
index dc8d31d281be9..41c27a1af7eb6 100644
--- a/src/connector/with_options_source.yaml
+++ b/src/connector/with_options_source.yaml
@@ -182,11 +182,6 @@ KafkaProperties:
        combine both key and value fields of the Kafka message.
        TODO: Currently, `Option` can not be parsed here.
       required: false
-    - name: properties.bootstrap.server
-      field_type: String
-      required: true
-      alias:
-        - kafka.brokers
     - name: topic
       field_type: String
       required: true
@@ -196,6 +191,11 @@ KafkaProperties:
       field_type: Duration
       required: false
       default: 'Duration :: from_secs (5)'
+    - name: properties.bootstrap.server
+      field_type: String
+      required: true
+      alias:
+        - kafka.brokers
     - name: properties.security.protocol
       field_type: String
       comments: |-
@@ -275,7 +275,7 @@ KafkaProperties:
        Maximum Kafka protocol request message size. Due to differing framing overhead between
        protocol versions the producer is unable to reliably enforce a strict max message limit at
        produce time and may exceed the maximum size by one message in protocol ProduceRequests,
-       the broker will enforce the the topic's max.message.bytes limit
+       the broker will enforce the topic's max.message.bytes limit
       required: false
     - name: properties.receive.message.max.bytes
       field_type: usize
diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml
index 01aed8dd76b6b..16a303609699b 100644
--- a/src/meta/Cargo.toml
+++ b/src/meta/Cargo.toml
@@ -42,7 +42,10 @@ jsonbb = { workspace = true }
 maplit = "1.0.2"
 memcomparable = { version = "0.2" }
 mime_guess = "2"
-notify = { version = "7", default-features = false, features = ["macos_fsevent"] }
+moka = { version = "0.12.3", features = ["future"] }
+notify = { version = "7", default-features = false, features = [
+    "macos_fsevent",
+] }
 num-integer = "0.1"
 num-traits = "0.2"
 otlp-embedded = { workspace = true }
diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs
index 4db8fe95146d9..b13acb68ac39b 100644
--- a/src/meta/src/stream/source_manager.rs
+++ b/src/meta/src/stream/source_manager.rs
@@ -138,7 +138,6 @@ pub async fn create_source_worker_handle(
         tokio::spawn(async move { worker.run(sync_call_rx).await })
     });
-
     Ok(ConnectorSourceWorkerHandle {
         handle,
         sync_call_tx,
@@ -1052,6 +1051,7 @@ impl SourceManager {
         let source_id = source.id;
         let connector_properties = extract_prop_from_existing_source(&source)?;
+        let enable_scale_in = connector_properties.enable_split_scale_in();
 
         let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel();
         let handle = tokio::spawn(async move {