Skip to content

Commit

Permalink
remove max-concurrent
Browse files Browse the repository at this point in the history
  • Loading branch information
ic4y committed Aug 10, 2023
1 parent 67a603c commit e4d747e
Show file tree
Hide file tree
Showing 14 changed files with 0 additions and 47 deletions.
1 change: 0 additions & 1 deletion config/seatunnel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ seatunnel:
checkpoint:
interval: 10000
timeout: 60000
max-concurrent: 1
storage:
type: hdfs
max-retained: 3
Expand Down
4 changes: 0 additions & 4 deletions docs/en/seatunnel-engine/checkpoint-storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ seatunnel:
checkpoint:
interval: 6000
timeout: 7000
max-concurrent: 1
storage:
type: hdfs
max-retained: 3
Expand Down Expand Up @@ -93,7 +92,6 @@ seatunnel:
checkpoint:
interval: 6000
timeout: 7000
max-concurrent: 1
storage:
type: hdfs
max-retained: 3
Expand All @@ -117,7 +115,6 @@ seatunnel:
checkpoint:
interval: 6000
timeout: 7000
max-concurrent: 1
storage:
type: hdfs
max-retained: 3
Expand Down Expand Up @@ -157,7 +154,6 @@ seatunnel:
checkpoint:
interval: 6000
timeout: 7000
max-concurrent: 1
storage:
type: hdfs
max-retained: 3
Expand Down
5 changes: 0 additions & 5 deletions docs/en/seatunnel-engine/deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,6 @@ The interval between two checkpoints, unit is milliseconds. If the `checkpoint.i

The timeout of a checkpoint. If a checkpoint cannot be completed within the timeout period, a checkpoint failure will be triggered. Therefore, Job will be restored.

**max-concurrent**

How many checkpoints can be performed simultaneously at most.

Example

```
Expand All @@ -91,7 +87,6 @@ seatunnel:
checkpoint:
interval: 300000
timeout: 10000
max-concurrent: 1
```

**checkpoint storage**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ seatunnel:
checkpoint:
interval: 300000
timeout: 10000
max-concurrent: 1
storage:
type: localfile
max-retained: 3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ seatunnel:
checkpoint:
interval: 6000
timeout: 7000
max-concurrent: 1
storage:
type: hdfs
max-retained: 3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,6 @@ private CheckpointConfig parseCheckpointConfig(Node checkpointNode) {
getIntegerValue(
ServerConfigOptions.SCHEMA_CHANGE_CHECKPOINT_TIMEOUT.key(),
getTextContent(node)));
} else if (ServerConfigOptions.CHECKPOINT_MAX_CONCURRENT.key().equals(name)) {
checkpointConfig.setMaxConcurrentCheckpoints(
getIntegerValue(
ServerConfigOptions.CHECKPOINT_MAX_CONCURRENT.key(),
getTextContent(node)));
} else if (ServerConfigOptions.CHECKPOINT_STORAGE.key().equals(name)) {
checkpointConfig.setStorage(parseCheckpointStorageConfig(node));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ public class CheckpointConfig implements Serializable {
private long checkpointTimeout = ServerConfigOptions.CHECKPOINT_TIMEOUT.defaultValue();
private long schemaChangeCheckpointTimeout =
ServerConfigOptions.SCHEMA_CHANGE_CHECKPOINT_TIMEOUT.defaultValue();
private int maxConcurrentCheckpoints =
ServerConfigOptions.CHECKPOINT_MAX_CONCURRENT.defaultValue();

private CheckpointStorageConfig storage = ServerConfigOptions.CHECKPOINT_STORAGE.defaultValue();

Expand All @@ -58,11 +56,4 @@ public void setSchemaChangeCheckpointTimeout(long checkpointTimeout) {
"The minimum checkpoint timeout is 10 ms.");
this.schemaChangeCheckpointTimeout = checkpointTimeout;
}

public void setMaxConcurrentCheckpoints(int maxConcurrentCheckpoints) {
checkArgument(
maxConcurrentCheckpoints >= 1,
"The minimum number of concurrent checkpoints is 1.");
this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,6 @@ public class ServerConfigOptions {
.withDescription(
"The timeout (in milliseconds) for a schema change checkpoint.");

public static final Option<Integer> CHECKPOINT_MAX_CONCURRENT =
Options.key("max-concurrent")
.intType()
.defaultValue(1)
.withDescription("The maximum number of concurrent checkpoints.");

public static final Option<String> CHECKPOINT_STORAGE_TYPE =
Options.key("type")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ seatunnel:
checkpoint:
interval: 300000
timeout: 10000
max-concurrent: 1
storage:
type: hdfs
max-retained: 3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@ public void testSeaTunnelConfig() {
Assertions.assertEquals(
7000, config.getEngineConfig().getCheckpointConfig().getCheckpointTimeout());

Assertions.assertEquals(
1, config.getEngineConfig().getCheckpointConfig().getMaxConcurrentCheckpoints());

Assertions.assertEquals(
"hdfs", config.getEngineConfig().getCheckpointConfig().getStorage().getStorage());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ seatunnel:
checkpoint:
interval: 6000
timeout: 7000
max-concurrent: 1
storage:
type: hdfs
max-retained: 3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,6 @@ protected void tryTriggerPendingCheckpoint(CheckpointType checkpointType) {
if (checkpointType.notFinalCheckpoint() && checkpointType.notSchemaChangeCheckpoint()) {
if (currentTimestamp - latestTriggerTimestamp.get()
< coordinatorConfig.getCheckpointInterval()
|| pendingCounter.get() >= coordinatorConfig.getMaxConcurrentCheckpoints()
|| !isAllTaskReady) {
return;
}
Expand Down Expand Up @@ -737,12 +736,6 @@ public synchronized void completePendingCheckpoint(CompletedCheckpoint completed
notifyCompleted(completedCheckpoint);
pendingCheckpoints.remove(checkpointId);
pendingCounter.decrementAndGet();
if (pendingCheckpoints.size() + 1 == coordinatorConfig.getMaxConcurrentCheckpoints()) {
// latest checkpoint completed time > checkpoint interval
if (completedCheckpoint.getCheckpointType().notFinalCheckpoint()) {
scheduleTriggerPendingCheckpoint(0L);
}
}
if (isCompleted()) {
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_COMPLETED);
if (latestCompletedCheckpoint.getCheckpointType().isSavepoint()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,6 @@ private CheckpointConfig createJobCheckpointConfig(
CheckpointConfig jobCheckpointConfig = new CheckpointConfig();
jobCheckpointConfig.setCheckpointTimeout(defaultCheckpointConfig.getCheckpointTimeout());
jobCheckpointConfig.setCheckpointInterval(defaultCheckpointConfig.getCheckpointInterval());
jobCheckpointConfig.setMaxConcurrentCheckpoints(
defaultCheckpointConfig.getMaxConcurrentCheckpoints());

CheckpointStorageConfig jobCheckpointStorageConfig = new CheckpointStorageConfig();
jobCheckpointStorageConfig.setStorage(defaultCheckpointConfig.getStorage().getStorage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ seatunnel:
checkpoint:
interval: 6000
timeout: 7000
max-concurrent: 1
storage:
type: hdfs
max-retained: 3
Expand Down

0 comments on commit e4d747e

Please sign in to comment.