Skip to content

Commit

Permalink
[Improve] [Connector-V2] Remove scheduler in Tablestore sink (#5272)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: gdliu3 <[email protected]>
  • Loading branch information
liugddx and gdliu3 authored Aug 14, 2023
1 parent de9a324 commit 8d6b07e
Show file tree
Hide file tree
Showing 6 changed files with 9 additions and 43 deletions.
1 change: 0 additions & 1 deletion docs/en/connector-v2/sink/Tablestore.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ Write data to `Tablestore`
| table | string | yes | - |
| primary_keys | array | yes | - |
| batch_size | string | no | 25 |
| batch_interval_ms | string | no | 1000 |
| common-options | config | no | - |

### end_point [string]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,6 @@ public class TablestoreConfig implements Serializable {
.stringType()
.defaultValue("25")
.withDescription(" Tablestore batch_size");
public static final Option<String> BATCH_INTERVAL_MS =
Options.key("batch_interval_ms")
.stringType()
.defaultValue("1000")
.withDescription(" Tablestore batch_interval_ms");
public static final Option<String> PRIMARY_KEYS =
Options.key("primary_keys")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.io.Serializable;
import java.util.List;

import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_INTERVAL_MS;
import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_SIZE;

@Data
Expand All @@ -45,7 +44,6 @@ public class TablestoreOptions implements Serializable {
private List<String> primaryKeys;

public int batchSize = Integer.parseInt(BATCH_SIZE.defaultValue());
public int batchIntervalMs = Integer.parseInt(BATCH_INTERVAL_MS.defaultValue());

public TablestoreOptions(Config config) {
this.endpoint = config.getString(TablestoreConfig.END_POINT.key());
Expand All @@ -58,8 +56,5 @@ public TablestoreOptions(Config config) {
if (config.hasPath(BATCH_SIZE.key())) {
this.batchSize = config.getInt(BATCH_SIZE.key());
}
if (config.hasPath(TablestoreConfig.BATCH_INTERVAL_MS.key())) {
this.batchIntervalMs = config.getInt(TablestoreConfig.BATCH_INTERVAL_MS.key());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,15 @@
import com.alicloud.openservices.tablestore.model.BatchWriteRowRequest;
import com.alicloud.openservices.tablestore.model.BatchWriteRowResponse;
import com.alicloud.openservices.tablestore.model.RowPutChange;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

@Slf4j
public class TablestoreSinkClient {
private final TablestoreOptions tablestoreOptions;
private ScheduledExecutorService scheduler;
private ScheduledFuture<?> scheduledFuture;
private volatile boolean initialize;
private volatile Exception flushException;
private SyncClient syncClient;
Expand All @@ -64,24 +57,6 @@ private void tryInit() throws IOException {
tablestoreOptions.getAccessKeySecret(),
tablestoreOptions.getInstanceName());

scheduler =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat("Tablestore-sink-output-%s")
.build());
scheduledFuture =
scheduler.scheduleAtFixedRate(
() -> {
try {
flush();
} catch (IOException e) {
flushException = e;
}
},
tablestoreOptions.getBatchIntervalMs(),
tablestoreOptions.getBatchIntervalMs(),
TimeUnit.MILLISECONDS);

initialize = true;
}

Expand All @@ -96,17 +71,13 @@ public void write(RowPutChange rowPutChange) throws IOException {
}

public void close() throws IOException {
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
scheduler.shutdown();
}
if (syncClient != null) {
flush();
syncClient.shutdown();
}
}

synchronized void flush() throws IOException {
synchronized void flush() {
checkFlushException();
if (batchList.isEmpty()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_ID;
import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_SECRET;
import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_INTERVAL_MS;
import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_SIZE;
import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.END_POINT;
import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.INSTANCE_NAME;
Expand All @@ -51,7 +50,7 @@ public OptionRule optionRule() {
ACCESS_KEY_SECRET,
PRIMARY_KEYS,
CatalogTableUtil.SCHEMA)
.optional(BATCH_INTERVAL_MS, BATCH_SIZE)
.optional(BATCH_SIZE)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.seatunnel.connectors.seatunnel.tablestore.serialize.SeaTunnelRowSerializer;

import java.io.IOException;
import java.util.Optional;

public class TablestoreWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {

Expand All @@ -46,4 +47,10 @@ public void write(SeaTunnelRow element) throws IOException {
public void close() throws IOException {
tablestoreSinkClient.close();
}

@Override
public Optional<Void> prepareCommit() {
tablestoreSinkClient.flush();
return super.prepareCommit();
}
}

0 comments on commit 8d6b07e

Please sign in to comment.