From 661939a3491fe40807dbb84593e6ef3157b74b91 Mon Sep 17 00:00:00 2001 From: Boudewijn van Groos Date: Tue, 29 Oct 2024 07:14:52 +0100 Subject: [PATCH] feat(connector): Add topic to mqtt additional columns (#19017) --- e2e_test/sink/mqtt_sink.slt | 55 +++++++++++++++---- integration_tests/mqtt/create_source.sql | 2 +- .../src/parser/additional_columns.rs | 5 +- .../src/source/mqtt/enumerator/mod.rs | 22 ++++++-- .../src/source/mqtt/source/reader.rs | 3 +- 5 files changed, 65 insertions(+), 22 deletions(-) diff --git a/e2e_test/sink/mqtt_sink.slt b/e2e_test/sink/mqtt_sink.slt index 2602d2ddc6198..cdf0f1d5233f4 100644 --- a/e2e_test/sink/mqtt_sink.slt +++ b/e2e_test/sink/mqtt_sink.slt @@ -47,6 +47,34 @@ WITH force_append_only='true', ); +# First the (retained) topics are primed, so that they will be listened +# to when the mqtt source initializes. Otherwise it would take 30 seconds +# for the next enumerator tick + +statement ok +INSERT INTO mqtt (device_id, temperature) +VALUES ( '12', 56.0 ); + +statement ok +FLUSH; + +statement ok +INSERT INTO mqtt (device_id, temperature) +VALUES ( '13', 20.0 ); + +statement ok +FLUSH; + +statement ok +INSERT INTO mqtt_nested (info, temperature) +VALUES( ROW('12', '/nested/12'), 56.0 ); + +statement ok +FLUSH; + +statement ok +INSERT INTO mqtt_nested (info, temperature) +VALUES( ROW('13', null), 22.0 ); statement ok CREATE TABLE mqtt_source @@ -54,10 +82,11 @@ CREATE TABLE mqtt_source device_id varchar, temperature double ) +INCLUDE partition AS mqtt_topic WITH ( - connector='mqtt', - url='tcp://mqtt-server', - topic= '/device/+', + connector ='mqtt', + url ='tcp://mqtt-server', + topic = '/device/+', qos = 'at_least_once', ) FORMAT PLAIN ENCODE JSON; @@ -75,29 +104,23 @@ WITH ( ) FORMAT PLAIN ENCODE JSON; -statement ok -INSERT INTO mqtt (device_id, temperature) -VALUES ( '12', 56.0 ); - statement ok INSERT INTO mqtt (device_id, temperature) VALUES ( '12', 59.0 ); statement ok -INSERT INTO mqtt (device_id, temperature) -VALUES ( '13', 20.0 ); +FLUSH; statement ok INSERT INTO mqtt (device_id, temperature) VALUES ( '13', 22.0 ); statement ok -INSERT INTO mqtt_nested (info, temperature) -VALUES( ROW('12', '/nested/12'), 56.0 ); +FLUSH; statement ok INSERT INTO mqtt_nested (info, temperature) -VALUES( ROW('13', null), 22.0 ); +VALUES( ROW('12', '/nested/12'), 56.0 ); statement ok FLUSH; @@ -112,6 +135,14 @@ SELECT device_id, temperature FROM mqtt ORDER BY device_id, temperature; 13 20 13 22 +query ITT rowsort +SELECT device_id, temperature, mqtt_topic FROM mqtt_source ORDER BY device_id, temperature; +---- +12 56 /device/12 +12 59 /device/12 +13 20 /device/13 +13 22 /device/13 + query IT rowsort SELECT (info).device_id device_id, temperature from mqtt_nested_source ORDER BY device_id, temperature ; ---- diff --git a/integration_tests/mqtt/create_source.sql b/integration_tests/mqtt/create_source.sql index 7ebceaa706bcc..bf43ba2d75083 100644 --- a/integration_tests/mqtt/create_source.sql +++ b/integration_tests/mqtt/create_source.sql @@ -4,7 +4,7 @@ CREATE TABLE CREATE TABLE mqtt_source_table ( id integer, - name varchar, + name varchar ) WITH ( connector='mqtt', diff --git a/src/connector/src/parser/additional_columns.rs b/src/connector/src/parser/additional_columns.rs index 5c91294afa0a7..e5712d95066cb 100644 --- a/src/connector/src/parser/additional_columns.rs +++ b/src/connector/src/parser/additional_columns.rs @@ -29,8 +29,8 @@ use risingwave_pb::plan_common::{ use crate::error::ConnectorResult; use crate::source::cdc::MONGODB_CDC_CONNECTOR; use crate::source::{ - AZBLOB_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, NATS_CONNECTOR, - OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, + AZBLOB_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, MQTT_CONNECTOR, + NATS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, }; // Hidden additional columns connectors which do not support `include` syntax. @@ -87,6 +87,7 @@ pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock ConnectorResult> { if !self.connected.load(std::sync::atomic::Ordering::Relaxed) { - return Err(ConnectorError::from(MqttError(format!( - "Failed to connect to MQTT broker for topic {}", - self.topic - )))); + let start = std::time::Instant::now(); + loop { + if self.connected.load(std::sync::atomic::Ordering::Relaxed) { + break; + } + + if start.elapsed().as_secs() > 10 { + bail!("Failed to connect to mqtt broker"); + } + + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + } } let topics = self.topics.read().await; diff --git a/src/connector/src/source/mqtt/source/reader.rs b/src/connector/src/source/mqtt/source/reader.rs index 2c57c8b9966b1..f4ab79bfffed8 100644 --- a/src/connector/src/source/mqtt/source/reader.rs +++ b/src/connector/src/source/mqtt/source/reader.rs @@ -52,7 +52,8 @@ impl SplitReader for MqttSplitReader { ) -> Result { let (client, eventloop) = properties .common - .build_client(source_ctx.actor_id, source_ctx.fragment_id as u64)?; + .build_client(source_ctx.actor_id, source_ctx.fragment_id as u64) + .inspect_err(|e| tracing::error!("Failed to build mqtt client: {}", e.as_report()))?; let qos = properties.common.qos();