From fba10d3e5736010068e9411565cbee8ef04fbe3c Mon Sep 17 00:00:00 2001 From: "zhangchaoming.zcm" Date: Thu, 23 Jan 2025 14:33:13 +0800 Subject: [PATCH] [FLINK-37204] Add missing StarRocks connector options for 1.2.10. --- .../starrocks/sink/StarRocksDataSinkFactory.java | 6 ++++++ .../starrocks/sink/StarRocksDataSinkOptions.java | 8 ++++++++ 2 files changed, 14 insertions(+) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java index f9fe58034fa..fbb34cd518e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java @@ -81,6 +81,11 @@ private StarRocksSinkOptions buildSinkConnectorOptions(Configuration cdcConfig) .ifPresent( config -> sinkConfig.set(StarRocksSinkOptions.SINK_CONNECT_TIMEOUT, config)); + + cdcConfig + .getOptional(StarRocksDataSinkOptions.SINK_SOCKET_TIMEOUT) + .ifPresent( + config -> sinkConfig.set(StarRocksSinkOptions.SINK_SOCKET_TIMEOUT, config)); cdcConfig .getOptional(StarRocksDataSinkOptions.SINK_WAIT_FOR_CONTINUE_TIMEOUT) .ifPresent( @@ -167,6 +172,7 @@ public Set> optionalOptions() { Set> optionalOptions = new HashSet<>(); optionalOptions.add(StarRocksDataSinkOptions.SINK_LABEL_PREFIX); optionalOptions.add(StarRocksDataSinkOptions.SINK_CONNECT_TIMEOUT); + optionalOptions.add(StarRocksDataSinkOptions.SINK_SOCKET_TIMEOUT); optionalOptions.add(StarRocksDataSinkOptions.SINK_WAIT_FOR_CONTINUE_TIMEOUT); optionalOptions.add(StarRocksDataSinkOptions.SINK_BATCH_MAX_SIZE); optionalOptions.add(StarRocksDataSinkOptions.SINK_BATCH_FLUSH_INTERVAL); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkOptions.java index d9cb611fb59..d79050febe4 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkOptions.java @@ -73,6 +73,14 @@ public class StarRocksDataSinkOptions { .defaultValue(30000) .withDescription("Timeout in millisecond for connecting to the `load-url`."); + public static final ConfigOption SINK_SOCKET_TIMEOUT = + ConfigOptions.key("sink.socket.timeout-ms") + .intType() + .defaultValue(-1) + .withDescription( + "Supported since 1.2.10. The time duration for which the HTTP client waits for data." + + " Unit: ms. The default value -1 means there is no timeout."); + public static final ConfigOption SINK_WAIT_FOR_CONTINUE_TIMEOUT = ConfigOptions.key("sink.wait-for-continue.timeout-ms") .intType()