Skip to content

Commit

Permalink
[Feature][Connector-V2][Clickhouse] clickhouse writes with checkpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouyao committed Jun 29, 2023
1 parent c226acc commit 01d8854
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 18 deletions.
2 changes: 1 addition & 1 deletion docs/en/connector-v2/sink/Clickhouse.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ In addition to the above mandatory parameters that must be specified by `clickho

### bulk_size [number]

The number of rows written through [Clickhouse-jdbc](https://github.com/ClickHouse/clickhouse-jdbc) each time, the `default is 20000` .
The number of rows written through [Clickhouse-jdbc](https://github.com/ClickHouse/clickhouse-jdbc) each time, the `default is 20000`, if checkpoints are enabled, writing will also occur at the times when the checkpoints are satisfied .

### split_mode [boolean]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public void write(SeaTunnelRow element) throws IOException {

@Override
public Optional<CKCommitInfo> prepareCommit() throws IOException {
flush();
return Optional.empty();
}

Expand All @@ -99,23 +100,7 @@ public void abortPrepare() {}
@Override
public void close() throws IOException {
this.proxy.close();
for (ClickhouseBatchStatement batchStatement : statementMap.values()) {
try (ClickHouseConnectionImpl needClosedConnection =
batchStatement.getClickHouseConnection();
JdbcBatchStatementExecutor needClosedStatement =
batchStatement.getJdbcBatchStatementExecutor()) {
IntHolder intHolder = batchStatement.getIntHolder();
if (intHolder.getValue() > 0) {
flush(needClosedStatement);
intHolder.setValue(0);
}
} catch (SQLException e) {
throw new ClickhouseConnectorException(
CommonErrorCode.SQL_OPERATION_FAILED,
"Failed to close prepared statement.",
e);
}
}
flush();
}

private void addIntoBatch(SeaTunnelRow row, JdbcBatchStatementExecutor clickHouseStatement) {
Expand All @@ -138,6 +123,26 @@ private void flush(JdbcBatchStatementExecutor clickHouseStatement) {
}
}

private void flush() {
for (ClickhouseBatchStatement batchStatement : statementMap.values()) {
try (ClickHouseConnectionImpl needClosedConnection =
batchStatement.getClickHouseConnection();
JdbcBatchStatementExecutor needClosedStatement =
batchStatement.getJdbcBatchStatementExecutor()) {
IntHolder intHolder = batchStatement.getIntHolder();
if (intHolder.getValue() > 0) {
flush(needClosedStatement);
intHolder.setValue(0);
}
} catch (SQLException e) {
throw new ClickhouseConnectorException(
CommonErrorCode.SQL_OPERATION_FAILED,
"Failed to close prepared statement.",
e);
}
}
}

private Map<Shard, ClickhouseBatchStatement> initStatementMap() {
Map<Shard, ClickhouseBatchStatement> result = new HashMap<>(Common.COLLECTION_SIZE);
shardRouter
Expand Down

0 comments on commit 01d8854

Please sign in to comment.