From 8d6b07e466ea46451cdb7a06e7999e4dd714f62f Mon Sep 17 00:00:00 2001 From: Guangdong Liu <804167098@qq.com> Date: Mon, 14 Aug 2023 17:44:10 +0800 Subject: [PATCH] [Improve] [Connector-V2] Remove scheduler in Tablestore sink (#5272) --------- Co-authored-by: gdliu3 --- docs/en/connector-v2/sink/Tablestore.md | 1 - .../tablestore/config/TablestoreConfig.java | 5 --- .../tablestore/config/TablestoreOptions.java | 5 --- .../tablestore/sink/TablestoreSinkClient.java | 31 +------------------ .../sink/TablestoreSinkFactory.java | 3 +- .../tablestore/sink/TablestoreWriter.java | 7 +++++ 6 files changed, 9 insertions(+), 43 deletions(-) diff --git a/docs/en/connector-v2/sink/Tablestore.md b/docs/en/connector-v2/sink/Tablestore.md index ed59895c65f..8f161ad25f6 100644 --- a/docs/en/connector-v2/sink/Tablestore.md +++ b/docs/en/connector-v2/sink/Tablestore.md @@ -21,7 +21,6 @@ Write data to `Tablestore` | table | string | yes | - | | primary_keys | array | yes | - | | batch_size | string | no | 25 | -| batch_interval_ms | string | no | 1000 | | common-options | config | no | - | ### end_point [string] diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java index f64eb8473b0..3e1714c5516 100644 --- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java @@ -50,11 +50,6 @@ public class TablestoreConfig implements Serializable { .stringType() .defaultValue("25") .withDescription(" Tablestore batch_size"); - public static final Option BATCH_INTERVAL_MS = - Options.key("batch_interval_ms") - .stringType() - .defaultValue("1000") - .withDescription(" Tablestore batch_interval_ms"); public static final Option PRIMARY_KEYS = Options.key("primary_keys") .stringType() diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java index ba6c0089395..7b2aa6bae67 100644 --- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java @@ -25,7 +25,6 @@ import java.io.Serializable; import java.util.List; -import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_INTERVAL_MS; import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_SIZE; @Data @@ -45,7 +44,6 @@ public class TablestoreOptions implements Serializable { private List primaryKeys; public int batchSize = Integer.parseInt(BATCH_SIZE.defaultValue()); - public int batchIntervalMs = Integer.parseInt(BATCH_INTERVAL_MS.defaultValue()); public TablestoreOptions(Config config) { this.endpoint = config.getString(TablestoreConfig.END_POINT.key()); @@ -58,8 +56,5 @@ public TablestoreOptions(Config config) { if (config.hasPath(BATCH_SIZE.key())) { this.batchSize = config.getInt(BATCH_SIZE.key()); } - if (config.hasPath(TablestoreConfig.BATCH_INTERVAL_MS.key())) { - this.batchIntervalMs = config.getInt(TablestoreConfig.BATCH_INTERVAL_MS.key()); - } } } diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java index e3b6f2fbdf3..0637b9b038c 100644 --- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java @@ -27,22 +27,15 @@ import com.alicloud.openservices.tablestore.model.BatchWriteRowRequest; import com.alicloud.openservices.tablestore.model.BatchWriteRowResponse; import com.alicloud.openservices.tablestore.model.RowPutChange; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; @Slf4j public class TablestoreSinkClient { private final TablestoreOptions tablestoreOptions; - private ScheduledExecutorService scheduler; - private ScheduledFuture scheduledFuture; private volatile boolean initialize; private volatile Exception flushException; private SyncClient syncClient; @@ -64,24 +57,6 @@ private void tryInit() throws IOException { tablestoreOptions.getAccessKeySecret(), tablestoreOptions.getInstanceName()); - scheduler = - Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder() - .setNameFormat("Tablestore-sink-output-%s") - .build()); - scheduledFuture = - scheduler.scheduleAtFixedRate( - () -> { - try { - flush(); - } catch (IOException e) { - flushException = e; - } - }, - tablestoreOptions.getBatchIntervalMs(), - tablestoreOptions.getBatchIntervalMs(), - TimeUnit.MILLISECONDS); - initialize = true; } @@ -96,17 +71,13 @@ public void write(RowPutChange rowPutChange) throws IOException { } public void close() throws IOException { - if (scheduledFuture != null) { - scheduledFuture.cancel(false); - scheduler.shutdown(); - } if (syncClient != null) { flush(); syncClient.shutdown(); } } - synchronized void flush() throws IOException { + synchronized void flush() { checkFlushException(); if (batchList.isEmpty()) { return; diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java index efe39a08c4a..674f641ad64 100644 --- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java @@ -26,7 +26,6 @@ import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_ID; import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_SECRET; -import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_INTERVAL_MS; import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_SIZE; import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.END_POINT; import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.INSTANCE_NAME; @@ -51,7 +50,7 @@ public OptionRule optionRule() { ACCESS_KEY_SECRET, PRIMARY_KEYS, CatalogTableUtil.SCHEMA) - .optional(BATCH_INTERVAL_MS, BATCH_SIZE) + .optional(BATCH_SIZE) .build(); } } diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java index 929a421f7f5..22bfe1be27f 100644 --- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java @@ -25,6 +25,7 @@ import org.apache.seatunnel.connectors.seatunnel.tablestore.serialize.SeaTunnelRowSerializer; import java.io.IOException; +import java.util.Optional; public class TablestoreWriter extends AbstractSinkWriter { @@ -46,4 +47,10 @@ public void write(SeaTunnelRow element) throws IOException { public void close() throws IOException { tablestoreSinkClient.close(); } + + @Override + public Optional prepareCommit() { + tablestoreSinkClient.flush(); + return super.prepareCommit(); + } }