From 67a603c2069019259ced92c70bf7e10692c9e088 Mon Sep 17 00:00:00 2001 From: liuli Date: Thu, 10 Aug 2023 14:38:12 +0800 Subject: [PATCH 1/2] [Bugfix][zeta] Resolved the issue causing checkpoints to halt on tolerable-failure=0. --- config/seatunnel.yaml | 1 - docs/en/seatunnel-engine/checkpoint-storage.md | 4 ---- docs/en/seatunnel-engine/deployment.md | 5 ----- .../src/test/resources/seatunnel.yaml | 1 - .../src/test/resources/seatunnel.yaml | 1 - .../config/YamlSeaTunnelDomConfigProcessor.java | 5 ----- .../common/config/server/CheckpointConfig.java | 9 --------- .../common/config/server/ServerConfigOptions.java | 6 ------ .../src/main/resources/seatunnel.yaml | 1 - .../config/YamlSeaTunnelConfigParserTest.java | 3 --- .../src/test/resources/seatunnel.yaml | 1 - .../server/checkpoint/CheckpointCoordinator.java | 15 +++------------ .../seatunnel/engine/server/master/JobMaster.java | 2 -- .../src/test/resources/seatunnel.yaml | 1 - 14 files changed, 3 insertions(+), 52 deletions(-) diff --git a/config/seatunnel.yaml b/config/seatunnel.yaml index 7e496ca39ad..6c0d2eb4ec1 100644 --- a/config/seatunnel.yaml +++ b/config/seatunnel.yaml @@ -27,7 +27,6 @@ seatunnel: interval: 10000 timeout: 60000 max-concurrent: 1 - tolerable-failure: 2 storage: type: hdfs max-retained: 3 diff --git a/docs/en/seatunnel-engine/checkpoint-storage.md b/docs/en/seatunnel-engine/checkpoint-storage.md index a88f301439e..416c32de1aa 100644 --- a/docs/en/seatunnel-engine/checkpoint-storage.md +++ b/docs/en/seatunnel-engine/checkpoint-storage.md @@ -60,7 +60,6 @@ seatunnel: interval: 6000 timeout: 7000 max-concurrent: 1 - tolerable-failure: 2 storage: type: hdfs max-retained: 3 @@ -95,7 +94,6 @@ seatunnel: interval: 6000 timeout: 7000 max-concurrent: 1 - tolerable-failure: 2 storage: type: hdfs max-retained: 3 @@ -120,7 +118,6 @@ seatunnel: interval: 6000 timeout: 7000 max-concurrent: 1 - tolerable-failure: 2 storage: type: hdfs max-retained: 3 @@ -161,7 +158,6 @@ seatunnel: interval: 6000 timeout: 7000 max-concurrent: 1 - tolerable-failure: 2 storage: type: hdfs max-retained: 3 diff --git a/docs/en/seatunnel-engine/deployment.md b/docs/en/seatunnel-engine/deployment.md index c07cd45d6b1..bcc94f2c075 100644 --- a/docs/en/seatunnel-engine/deployment.md +++ b/docs/en/seatunnel-engine/deployment.md @@ -79,10 +79,6 @@ The timeout of a checkpoint. If a checkpoint cannot be completed within the time How many checkpoints can be performed simultaneously at most. -**tolerable-failure** - -Maximum number of retries after checkpoint failure. - Example ``` @@ -96,7 +92,6 @@ seatunnel: interval: 300000 timeout: 10000 max-concurrent: 1 - tolerable-failure: 2 ``` **checkpoint storage** diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml index 16b9f55c30d..2f10ef654eb 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml @@ -26,7 +26,6 @@ seatunnel: interval: 300000 timeout: 10000 max-concurrent: 1 - tolerable-failure: 2 storage: type: localfile max-retained: 3 diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/seatunnel.yaml b/seatunnel-engine/seatunnel-engine-client/src/test/resources/seatunnel.yaml index ea5b5ac2307..f3ad6359952 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/seatunnel.yaml +++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/seatunnel.yaml @@ -25,7 +25,6 @@ seatunnel: interval: 6000 timeout: 7000 max-concurrent: 1 - tolerable-failure: 2 storage: type: hdfs max-retained: 3 diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java index a901fbb5e6a..cbb77a42a7f 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java @@ -161,11 +161,6 @@ private CheckpointConfig parseCheckpointConfig(Node checkpointNode) { getIntegerValue( ServerConfigOptions.CHECKPOINT_MAX_CONCURRENT.key(), getTextContent(node))); - } else if (ServerConfigOptions.CHECKPOINT_TOLERABLE_FAILURE.key().equals(name)) { - checkpointConfig.setTolerableFailureCheckpoints( - getIntegerValue( - ServerConfigOptions.CHECKPOINT_TOLERABLE_FAILURE.key(), - getTextContent(node))); } else if (ServerConfigOptions.CHECKPOINT_STORAGE.key().equals(name)) { checkpointConfig.setStorage(parseCheckpointStorageConfig(node)); } else { diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java index 7038a65b422..203c6f2f19e 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java @@ -35,8 +35,6 @@ public class CheckpointConfig implements Serializable { ServerConfigOptions.SCHEMA_CHANGE_CHECKPOINT_TIMEOUT.defaultValue(); private int maxConcurrentCheckpoints = ServerConfigOptions.CHECKPOINT_MAX_CONCURRENT.defaultValue(); - private int tolerableFailureCheckpoints = - ServerConfigOptions.CHECKPOINT_TOLERABLE_FAILURE.defaultValue(); private CheckpointStorageConfig storage = ServerConfigOptions.CHECKPOINT_STORAGE.defaultValue(); @@ -67,11 +65,4 @@ public void setMaxConcurrentCheckpoints(int maxConcurrentCheckpoints) { "The minimum number of concurrent checkpoints is 1."); this.maxConcurrentCheckpoints = maxConcurrentCheckpoints; } - - public void setTolerableFailureCheckpoints(int tolerableFailureCheckpoints) { - checkArgument( - maxConcurrentCheckpoints >= 0, - "The number of tolerance failed checkpoints must be a natural number."); - this.tolerableFailureCheckpoints = tolerableFailureCheckpoints; - } } diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java index 2de8acad012..04dab34679a 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java @@ -98,12 +98,6 @@ public class ServerConfigOptions { .defaultValue(1) .withDescription("The maximum number of concurrent checkpoints."); - public static final Option CHECKPOINT_TOLERABLE_FAILURE = - Options.key("tolerable-failure") - .intType() - .defaultValue(0) - .withDescription("The tolerable failure number of a checkpoint."); - public static final Option CHECKPOINT_STORAGE_TYPE = Options.key("type") .stringType() diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml b/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml index e5d92281da7..cbd856fc8df 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml +++ b/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml @@ -26,7 +26,6 @@ seatunnel: interval: 300000 timeout: 10000 max-concurrent: 1 - tolerable-failure: 2 storage: type: hdfs max-retained: 3 diff --git a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java index 4c199b352ef..cc00b687fd7 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java +++ b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java @@ -56,9 +56,6 @@ public void testSeaTunnelConfig() { Assertions.assertEquals( 1, config.getEngineConfig().getCheckpointConfig().getMaxConcurrentCheckpoints()); - Assertions.assertEquals( - 2, config.getEngineConfig().getCheckpointConfig().getTolerableFailureCheckpoints()); - Assertions.assertEquals( "hdfs", config.getEngineConfig().getCheckpointConfig().getStorage().getStorage()); diff --git a/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml b/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml index 4f6ce5f4ef1..94e9ab3db0b 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml +++ b/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml @@ -26,7 +26,6 @@ seatunnel: interval: 6000 timeout: 7000 max-concurrent: 1 - tolerable-failure: 2 storage: type: hdfs max-retained: 3 diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java index 1b6bc6b6871..bd1b4a39131 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java @@ -117,7 +117,6 @@ public class CheckpointCoordinator { private final CheckpointConfig coordinatorConfig; - private int tolerableFailureCheckpoints; private transient ScheduledExecutorService scheduler; private final AtomicLong latestTriggerTimestamp = new AtomicLong(0); @@ -165,7 +164,6 @@ public CheckpointCoordinator( this.runningJobStateIMap = runningJobStateIMap; this.plan = plan; this.coordinatorConfig = checkpointConfig; - this.tolerableFailureCheckpoints = coordinatorConfig.getTolerableFailureCheckpoints(); this.pendingCheckpoints = new ConcurrentHashMap<>(); this.completedCheckpoints = new ArrayDeque<>(coordinatorConfig.getStorage().getMaxRetainedCheckpoints() + 1); @@ -531,16 +529,9 @@ private void startTriggerPendingCheckpoint( if (pendingCheckpoints.get(pendingCheckpoint.getCheckpointId()) != null && !pendingCheckpoint.isFullyAcknowledged()) { - if (tolerableFailureCheckpoints-- <= 0 - || pendingCheckpoint - .getCheckpointType() - .isSchemaChangeCheckpoint()) { - LOG.info( - "timeout checkpoint: " - + pendingCheckpoint.getInfo()); - handleCoordinatorError( - CheckpointCloseReason.CHECKPOINT_EXPIRED, null); - } + LOG.info("timeout checkpoint: " + pendingCheckpoint.getInfo()); + handleCoordinatorError( + CheckpointCloseReason.CHECKPOINT_EXPIRED, null); } }, checkpointTimeout, diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index 404956a7e71..06b95f6ce7f 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -266,8 +266,6 @@ private CheckpointConfig createJobCheckpointConfig( jobCheckpointConfig.setCheckpointInterval(defaultCheckpointConfig.getCheckpointInterval()); jobCheckpointConfig.setMaxConcurrentCheckpoints( defaultCheckpointConfig.getMaxConcurrentCheckpoints()); - jobCheckpointConfig.setTolerableFailureCheckpoints( - defaultCheckpointConfig.getTolerableFailureCheckpoints()); CheckpointStorageConfig jobCheckpointStorageConfig = new CheckpointStorageConfig(); jobCheckpointStorageConfig.setStorage(defaultCheckpointConfig.getStorage().getStorage()); diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/resources/seatunnel.yaml b/seatunnel-engine/seatunnel-engine-server/src/test/resources/seatunnel.yaml index 8f22b0613ca..e29978615bf 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/resources/seatunnel.yaml +++ b/seatunnel-engine/seatunnel-engine-server/src/test/resources/seatunnel.yaml @@ -26,7 +26,6 @@ seatunnel: interval: 6000 timeout: 7000 max-concurrent: 1 - tolerable-failure: 2 storage: type: hdfs max-retained: 3 From e4d747ec5a1c6e9ec14072228c9156acb2617c99 Mon Sep 17 00:00:00 2001 From: liuli Date: Thu, 10 Aug 2023 18:19:44 +0800 Subject: [PATCH 2/2] remove max-concurrent --- config/seatunnel.yaml | 1 - docs/en/seatunnel-engine/checkpoint-storage.md | 4 ---- docs/en/seatunnel-engine/deployment.md | 5 ----- .../src/test/resources/seatunnel.yaml | 1 - .../src/test/resources/seatunnel.yaml | 1 - .../common/config/YamlSeaTunnelDomConfigProcessor.java | 5 ----- .../engine/common/config/server/CheckpointConfig.java | 9 --------- .../engine/common/config/server/ServerConfigOptions.java | 6 ------ .../src/main/resources/seatunnel.yaml | 1 - .../common/config/YamlSeaTunnelConfigParserTest.java | 3 --- .../src/test/resources/seatunnel.yaml | 1 - .../engine/server/checkpoint/CheckpointCoordinator.java | 7 ------- .../apache/seatunnel/engine/server/master/JobMaster.java | 2 -- .../src/test/resources/seatunnel.yaml | 1 - 14 files changed, 47 deletions(-) diff --git a/config/seatunnel.yaml b/config/seatunnel.yaml index 6c0d2eb4ec1..735acc04b15 100644 --- a/config/seatunnel.yaml +++ b/config/seatunnel.yaml @@ -26,7 +26,6 @@ seatunnel: checkpoint: interval: 10000 timeout: 60000 - max-concurrent: 1 storage: type: hdfs max-retained: 3 diff --git a/docs/en/seatunnel-engine/checkpoint-storage.md b/docs/en/seatunnel-engine/checkpoint-storage.md index 416c32de1aa..afe1fa6bc1f 100644 --- a/docs/en/seatunnel-engine/checkpoint-storage.md +++ b/docs/en/seatunnel-engine/checkpoint-storage.md @@ -59,7 +59,6 @@ seatunnel: checkpoint: interval: 6000 timeout: 7000 - max-concurrent: 1 storage: type: hdfs max-retained: 3 @@ -93,7 +92,6 @@ seatunnel: checkpoint: interval: 6000 timeout: 7000 - max-concurrent: 1 storage: type: hdfs max-retained: 3 @@ -117,7 +115,6 @@ seatunnel: checkpoint: interval: 6000 timeout: 7000 - max-concurrent: 1 storage: type: hdfs max-retained: 3 @@ -157,7 +154,6 @@ seatunnel: checkpoint: interval: 6000 timeout: 7000 - max-concurrent: 1 storage: type: hdfs max-retained: 3 diff --git a/docs/en/seatunnel-engine/deployment.md b/docs/en/seatunnel-engine/deployment.md index bcc94f2c075..6cbf4c9b6b4 100644 --- a/docs/en/seatunnel-engine/deployment.md +++ b/docs/en/seatunnel-engine/deployment.md @@ -75,10 +75,6 @@ The interval between two checkpoints, unit is milliseconds. If the `checkpoint.i The timeout of a checkpoint. If a checkpoint cannot be completed within the timeout period, a checkpoint failure will be triggered. Therefore, Job will be restored. -**max-concurrent** - -How many checkpoints can be performed simultaneously at most. - Example ``` @@ -91,7 +87,6 @@ seatunnel: checkpoint: interval: 300000 timeout: 10000 - max-concurrent: 1 ``` **checkpoint storage** diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml index 2f10ef654eb..2f6f8016f12 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml @@ -25,7 +25,6 @@ seatunnel: checkpoint: interval: 300000 timeout: 10000 - max-concurrent: 1 storage: type: localfile max-retained: 3 diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/seatunnel.yaml b/seatunnel-engine/seatunnel-engine-client/src/test/resources/seatunnel.yaml index f3ad6359952..4678cfed3d5 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/seatunnel.yaml +++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/seatunnel.yaml @@ -24,7 +24,6 @@ seatunnel: checkpoint: interval: 6000 timeout: 7000 - max-concurrent: 1 storage: type: hdfs max-retained: 3 diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java index cbb77a42a7f..2d676153b8c 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java @@ -156,11 +156,6 @@ private CheckpointConfig parseCheckpointConfig(Node checkpointNode) { getIntegerValue( ServerConfigOptions.SCHEMA_CHANGE_CHECKPOINT_TIMEOUT.key(), getTextContent(node))); - } else if (ServerConfigOptions.CHECKPOINT_MAX_CONCURRENT.key().equals(name)) { - checkpointConfig.setMaxConcurrentCheckpoints( - getIntegerValue( - ServerConfigOptions.CHECKPOINT_MAX_CONCURRENT.key(), - getTextContent(node))); } else if (ServerConfigOptions.CHECKPOINT_STORAGE.key().equals(name)) { checkpointConfig.setStorage(parseCheckpointStorageConfig(node)); } else { diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java index 203c6f2f19e..8d521f2b8b8 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java @@ -33,8 +33,6 @@ public class CheckpointConfig implements Serializable { private long checkpointTimeout = ServerConfigOptions.CHECKPOINT_TIMEOUT.defaultValue(); private long schemaChangeCheckpointTimeout = ServerConfigOptions.SCHEMA_CHANGE_CHECKPOINT_TIMEOUT.defaultValue(); - private int maxConcurrentCheckpoints = - ServerConfigOptions.CHECKPOINT_MAX_CONCURRENT.defaultValue(); private CheckpointStorageConfig storage = ServerConfigOptions.CHECKPOINT_STORAGE.defaultValue(); @@ -58,11 +56,4 @@ public void setSchemaChangeCheckpointTimeout(long checkpointTimeout) { "The minimum checkpoint timeout is 10 ms."); this.schemaChangeCheckpointTimeout = checkpointTimeout; } - - public void setMaxConcurrentCheckpoints(int maxConcurrentCheckpoints) { - checkArgument( - maxConcurrentCheckpoints >= 1, - "The minimum number of concurrent checkpoints is 1."); - this.maxConcurrentCheckpoints = maxConcurrentCheckpoints; - } } diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java index 04dab34679a..79ed1a4d07a 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java @@ -92,12 +92,6 @@ public class ServerConfigOptions { .withDescription( "The timeout (in milliseconds) for a schema change checkpoint."); - public static final Option CHECKPOINT_MAX_CONCURRENT = - Options.key("max-concurrent") - .intType() - .defaultValue(1) - .withDescription("The maximum number of concurrent checkpoints."); - public static final Option CHECKPOINT_STORAGE_TYPE = Options.key("type") .stringType() diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml b/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml index cbd856fc8df..cc14d81eafa 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml +++ b/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml @@ -25,7 +25,6 @@ seatunnel: checkpoint: interval: 300000 timeout: 10000 - max-concurrent: 1 storage: type: hdfs max-retained: 3 diff --git a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java index cc00b687fd7..ed6853e39b4 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java +++ b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java @@ -53,9 +53,6 @@ public void testSeaTunnelConfig() { Assertions.assertEquals( 7000, config.getEngineConfig().getCheckpointConfig().getCheckpointTimeout()); - Assertions.assertEquals( - 1, config.getEngineConfig().getCheckpointConfig().getMaxConcurrentCheckpoints()); - Assertions.assertEquals( "hdfs", config.getEngineConfig().getCheckpointConfig().getStorage().getStorage()); diff --git a/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml b/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml index 94e9ab3db0b..8453bdeecaa 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml +++ b/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml @@ -25,7 +25,6 @@ seatunnel: checkpoint: interval: 6000 timeout: 7000 - max-concurrent: 1 storage: type: hdfs max-retained: 3 diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java index bd1b4a39131..222f60a5cb5 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java @@ -390,7 +390,6 @@ protected void tryTriggerPendingCheckpoint(CheckpointType checkpointType) { if (checkpointType.notFinalCheckpoint() && checkpointType.notSchemaChangeCheckpoint()) { if (currentTimestamp - latestTriggerTimestamp.get() < coordinatorConfig.getCheckpointInterval() - || pendingCounter.get() >= coordinatorConfig.getMaxConcurrentCheckpoints() || !isAllTaskReady) { return; } @@ -737,12 +736,6 @@ public synchronized void completePendingCheckpoint(CompletedCheckpoint completed notifyCompleted(completedCheckpoint); pendingCheckpoints.remove(checkpointId); pendingCounter.decrementAndGet(); - if (pendingCheckpoints.size() + 1 == coordinatorConfig.getMaxConcurrentCheckpoints()) { - // latest checkpoint completed time > checkpoint interval - if (completedCheckpoint.getCheckpointType().notFinalCheckpoint()) { - scheduleTriggerPendingCheckpoint(0L); - } - } if (isCompleted()) { cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_COMPLETED); if (latestCompletedCheckpoint.getCheckpointType().isSavepoint()) { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index 06b95f6ce7f..5137f23b7ba 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -264,8 +264,6 @@ private CheckpointConfig createJobCheckpointConfig( CheckpointConfig jobCheckpointConfig = new CheckpointConfig(); jobCheckpointConfig.setCheckpointTimeout(defaultCheckpointConfig.getCheckpointTimeout()); jobCheckpointConfig.setCheckpointInterval(defaultCheckpointConfig.getCheckpointInterval()); - jobCheckpointConfig.setMaxConcurrentCheckpoints( - defaultCheckpointConfig.getMaxConcurrentCheckpoints()); CheckpointStorageConfig jobCheckpointStorageConfig = new CheckpointStorageConfig(); jobCheckpointStorageConfig.setStorage(defaultCheckpointConfig.getStorage().getStorage()); diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/resources/seatunnel.yaml b/seatunnel-engine/seatunnel-engine-server/src/test/resources/seatunnel.yaml index e29978615bf..f8739cc4830 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/resources/seatunnel.yaml +++ b/seatunnel-engine/seatunnel-engine-server/src/test/resources/seatunnel.yaml @@ -25,7 +25,6 @@ seatunnel: checkpoint: interval: 6000 timeout: 7000 - max-concurrent: 1 storage: type: hdfs max-retained: 3