Skip to content

Commit

Permalink
feat: share kafka client on meta (#19058)
Browse files Browse the repository at this point in the history
Signed-off-by: tabVersion <[email protected]>
Signed-off-by: tabversion <[email protected]>
Co-authored-by: tabversion <[email protected]>
  • Loading branch information
tabVersion and tabversion authored Nov 2, 2024
1 parent 3a8d5a8 commit e7e4a2c
Show file tree
Hide file tree
Showing 15 changed files with 145 additions and 179 deletions.
110 changes: 13 additions & 97 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 2 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,7 @@ arrow-udf-flight = "0.4"
clap = { version = "4", features = ["cargo", "derive", "env"] }
# Use a forked version which removes the dependencies on dynamo db to reduce
# compile time and binary size.
deltalake = { version = "0.20.1", features = [
"s3",
"gcs",
"datafusion",
] }
deltalake = { version = "0.20.1", features = ["s3", "gcs", "datafusion"] }
itertools = "0.13.0"
jsonbb = "0.1.4"
lru = { git = "https://github.com/risingwavelabs/lru-rs.git", rev = "2682b85" }
Expand Down Expand Up @@ -347,7 +343,7 @@ opt-level = 2

[patch.crates-io]
# Patch third-party crates for deterministic simulation.
quanta = { git = "https://github.com/madsim-rs/quanta.git", rev = "948bdc3" }
quanta = { git = "https://github.com/madsim-rs/quanta.git", rev = "ea9ba802327b1d72c4b1c7202c759b0a5243271e" }
getrandom = { git = "https://github.com/madsim-rs/getrandom.git", rev = "e79a7ae" }
# Don't patch `tokio-stream`, but only use the madsim version for **direct** dependencies.
# Imagine an unpatched dependency depends on the original `tokio` and the patched `tokio-stream`.
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/source_legacy/basic/ddl.slt
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Caused by these errors (recent errors listed first):
1: gRPC request to meta service failed: Internal error
2: failed to create source worker
3: failed to parse json
4: missing field `properties.bootstrap.server`
4: missing field `topic`


statement error
Expand Down
2 changes: 1 addition & 1 deletion src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ jni = { version = "0.21.1", features = ["invocation"] }
jsonbb = { workspace = true }
jsonwebtoken = "9.2.0"
maplit = "1.0.2"
moka = { version = "0.12.0", features = ["future"] }
moka = { version = "0.12.8", features = ["future"] }
mongodb = { version = "2.8.2", features = ["tokio-runtime"] }
mysql_async = { workspace = true }
mysql_common = { version = "0.32", default-features = false, features = [
Expand Down
35 changes: 20 additions & 15 deletions src/connector/src/connector_common/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::BTreeMap;
use std::hash::Hash;
use std::io::Write;
use std::time::Duration;

Expand Down Expand Up @@ -61,7 +62,7 @@ use aws_types::SdkConfig;
use risingwave_common::util::env_var::env_var_is_true;

/// A flatten config map for aws auth.
#[derive(Deserialize, Debug, Clone, WithOptions)]
#[derive(Deserialize, Debug, Clone, WithOptions, PartialEq)]
pub struct AwsAuthProps {
#[serde(rename = "aws.region", alias = "region")]
pub region: Option<String>,
Expand Down Expand Up @@ -161,21 +162,11 @@ impl AwsAuthProps {
}

#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct KafkaCommon {
#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq, Hash, Eq)]
pub struct KafkaConnection {
#[serde(rename = "properties.bootstrap.server", alias = "kafka.brokers")]
pub brokers: String,

#[serde(rename = "topic", alias = "kafka.topic")]
pub topic: String,

#[serde(
rename = "properties.sync.call.timeout",
deserialize_with = "deserialize_duration_from_string",
default = "default_kafka_sync_call_timeout"
)]
pub sync_call_timeout: Duration,

/// Security protocol used for RisingWave to communicate with Kafka brokers. Could be
/// PLAINTEXT, SSL, SASL_PLAINTEXT or SASL_SSL.
#[serde(rename = "properties.security.protocol")]
Expand Down Expand Up @@ -252,6 +243,20 @@ pub struct KafkaCommon {

#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct KafkaCommon {
#[serde(rename = "topic", alias = "kafka.topic")]
pub topic: String,

#[serde(
rename = "properties.sync.call.timeout",
deserialize_with = "deserialize_duration_from_string",
default = "default_kafka_sync_call_timeout"
)]
pub sync_call_timeout: Duration,
}

#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq)]
pub struct KafkaPrivateLinkCommon {
/// This is generated from `private_link_targets` and `private_link_endpoint` in frontend, instead of given by users.
#[serde(rename = "broker.rewrite.endpoints")]
Expand All @@ -269,7 +274,7 @@ pub struct RdKafkaPropertiesCommon {
/// Maximum Kafka protocol request message size. Due to differing framing overhead between
/// protocol versions the producer is unable to reliably enforce a strict max message limit at
/// produce time and may exceed the maximum size by one message in protocol ProduceRequests,
/// the broker will enforce the the topic's max.message.bytes limit
/// the broker will enforce the topic's max.message.bytes limit
#[serde(rename = "properties.message.max.bytes")]
#[serde_as(as = "Option<DisplayFromStr>")]
pub message_max_bytes: Option<usize>,
Expand Down Expand Up @@ -316,7 +321,7 @@ impl RdKafkaPropertiesCommon {
}
}

impl KafkaCommon {
impl KafkaConnection {
pub(crate) fn set_security_properties(&self, config: &mut ClientConfig) {
// AWS_MSK_IAM
if self.is_aws_msk_iam() {
Expand Down
6 changes: 3 additions & 3 deletions src/connector/src/connector_common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ pub use mqtt_common::{MqttCommon, QualityOfService as MqttQualityOfService};

mod common;
pub use common::{
AwsAuthProps, AwsPrivateLinkItem, KafkaCommon, KafkaPrivateLinkCommon, KinesisCommon,
MongodbCommon, NatsCommon, PulsarCommon, PulsarOauthCommon, RdKafkaPropertiesCommon,
PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY,
AwsAuthProps, AwsPrivateLinkItem, KafkaCommon, KafkaConnection, KafkaPrivateLinkCommon,
KinesisCommon, MongodbCommon, NatsCommon, PulsarCommon, PulsarOauthCommon,
RdKafkaPropertiesCommon, PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY,
};

mod iceberg;
Expand Down
3 changes: 3 additions & 0 deletions src/connector/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use risingwave_common::array::ArrayError;
use risingwave_common::error::def_anyhow_newtype;
use risingwave_pb::PbFieldNotFound;
Expand All @@ -29,6 +31,7 @@ def_anyhow_newtype! {

// Common errors
std::io::Error => transparent,
Arc<ConnectorError> => transparent,

// Fine-grained connector errors
AccessError => transparent,
Expand Down
Loading

0 comments on commit e7e4a2c

Please sign in to comment.