From cef03f66737b4a9eecde3c450c4ec7b4dedf835b Mon Sep 17 00:00:00 2001 From: happyboy1024 <137260654+happyboy1024@users.noreply.github.com> Date: Tue, 12 Sep 2023 15:20:26 +0800 Subject: [PATCH] [Bugfix][Clickhouse] Fix clickhouse sink flush bug (#5448) * [Bug][connector-cdc-mysql] mysql connections and memory of jvm increased abnormally (#5008) * [bugfix][connector-cdc-mysql] reset the listener of binaryLogClient before fetch task start (#5008) * [Bugfix][Clickhouse] fix when the checkpoint triggers flush, the connection is closed, causing subsequent data writing to fail --------- Co-authored-by: dengjunjie <296442618@qq.com> --- .../clickhouse/sink/client/ClickhouseSinkWriter.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java index de29c6cf8b4..6220e4b8071 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java @@ -90,7 +90,14 @@ public void write(SeaTunnelRow element) throws IOException { @Override public Optional prepareCommit() throws IOException { - flush(); + for (ClickhouseBatchStatement batchStatement : statementMap.values()) { + JdbcBatchStatementExecutor statement = batchStatement.getJdbcBatchStatementExecutor(); + IntHolder intHolder = batchStatement.getIntHolder(); + if (intHolder.getValue() > 0) { + flush(statement); + intHolder.setValue(0); + } + } return Optional.empty(); }