diff --git a/docs/en/connector-v2/sink/StarRocks.md b/docs/en/connector-v2/sink/StarRocks.md
index 1bf9bc8e85f9..7c6491fb591e 100644
--- a/docs/en/connector-v2/sink/StarRocks.md
+++ b/docs/en/connector-v2/sink/StarRocks.md
@@ -16,7 +16,7 @@ The internal implementation of StarRocks sink connector is cached and imported b
| name | type | required | default value |
|-----------------------------|---------|----------|-----------------|
-| nodeUrls | list | yes | - |
+| nodeUrls | list | yes | - |
| base-url | string | yes | - |
| username | string | yes | - |
| password | string | yes | - |
diff --git a/docs/en/seatunnel-engine/deployment.md b/docs/en/seatunnel-engine/deployment.md
index c07cd45d6b1a..1206feff4ae0 100644
--- a/docs/en/seatunnel-engine/deployment.md
+++ b/docs/en/seatunnel-engine/deployment.md
@@ -222,6 +222,29 @@ map:
        fs.oss.credentials.provider: org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider
```
+If you use Kafka, the Kafka cluster must support creating compacted topics. You can configure it like this:
+
+```yaml
+map:
+  engine*:
+    map-store:
+      enabled: true
+      initial-mode: EAGER
+      factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory
+      properties:
+        type: kafka
+        bootstrap.servers: localhost:9092
+        storage.compact.topic.prefix: imap-
+        storage.compact.topic.replication.factor: 3
+        consumer.override.auto.offset.reset: earliest
+        producer.override.acks: all
+```
+
++ Configuration keys with the prefix `consumer.override.` override the configuration of the Kafka consumer.
++ Configuration keys with the prefix `producer.override.` override the configuration of the Kafka producer.
++ Configuration keys with the prefix `admin.override.` override the configuration of the Kafka admin client (see the example below).
+
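+For example, to tune the admin client that manages the compact topic, you could add properties such as the following (illustrative values):
+
+```yaml
+map:
+  engine*:
+    map-store:
+      properties:
+        admin.override.retries: 3
+        admin.override.request.timeout.ms: 30000
+```
+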
## 6. Config SeaTunnel Engine Client
All SeaTunnel Engine Client config in `hazelcast-client.yaml`.
diff --git a/pom.xml b/pom.xml
index 51b03a26d5bb..bb6f7d3966a5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -146,6 +146,7 @@
2.4.7
3.1.4
        <netty-buffer.version>4.1.60.Final</netty-buffer.version>
+        <kafka.version>3.4.1</kafka.version>
@@ -485,6 +486,12 @@
                <version>${netty-buffer.version}</version>
+            <dependency>
+                <groupId>org.apache.kafka</groupId>
+                <artifactId>kafka-clients</artifactId>
+                <version>${kafka.version}</version>
+            </dependency>
+
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
index a1315565349f..5883b0ee3388 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
@@ -58,6 +58,13 @@
            <version>${project.version}</version>
            <scope>test</scope>
        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>imap-storage-kafka</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
        <dependency>
            <groupId>org.apache.seatunnel</groupId>
            <artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
index f7571968e8ff..eff1297e24fc 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
@@ -1154,4 +1154,213 @@ public void testStreamJobRestoreFromOssInAllNodeDown()
}
}
}
+
+ @SuppressWarnings("checkstyle:RegexpSingleline")
+ @Test
+ @Disabled
+ public void testStreamJobRestoreFromKafkaInAllNodeDown()
+ throws ExecutionException, InterruptedException {
+ String BOOTSTRAP_SERVERS = "localhost:9092";
+ String TOPIC_PREFIX = "imap-";
+ Integer TOPIC_REPLICATION_FACTOR = 1;
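+        // a replication factor of 1 is sufficient for this single-broker test environment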
+
+ String testCaseName = "testStreamJobRestoreFromKafkaInAllNodeDown";
+ String testClusterName =
+ "ClusterFaultToleranceIT_testStreamJobRestoreFromKafkaInAllNodeDown_"
+ + System.currentTimeMillis();
+ int testRowNumber = 1000;
+ int testParallelism = 6;
+ HazelcastInstanceImpl node1 = null;
+ HazelcastInstanceImpl node2 = null;
+ SeaTunnelClient engineClient = null;
+
+ try {
+            String yaml =
+                    "hazelcast:\n"
+                            + "  cluster-name: seatunnel\n"
+                            + "  network:\n"
+                            + "    rest-api:\n"
+                            + "      enabled: true\n"
+                            + "      endpoint-groups:\n"
+                            + "        CLUSTER_WRITE:\n"
+                            + "          enabled: true\n"
+                            + "    join:\n"
+                            + "      tcp-ip:\n"
+                            + "        enabled: true\n"
+                            + "        member-list:\n"
+                            + "          - localhost\n"
+                            + "    port:\n"
+                            + "      auto-increment: true\n"
+                            + "      port-count: 100\n"
+                            + "      port: 5801\n"
+                            + "  map:\n"
+                            + "    engine*:\n"
+                            + "      map-store:\n"
+                            + "        enabled: true\n"
+                            + "        initial-mode: EAGER\n"
+                            + "        factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory\n"
+                            + "        properties:\n"
+                            + "          type: kafka\n"
+                            + "          bootstrap.servers: "
+                            + BOOTSTRAP_SERVERS
+                            + "\n"
+                            + "          storage.compact.topic.prefix: "
+                            + TOPIC_PREFIX
+                            + "\n"
+                            + "          storage.compact.topic.replication.factor: "
+                            + TOPIC_REPLICATION_FACTOR
+                            + "\n"
+                            + "  properties:\n"
+                            + "    hazelcast.invocation.max.retry.count: 200\n"
+                            + "    hazelcast.tcp.join.port.try.count: 30\n"
+                            + "    hazelcast.invocation.retry.pause.millis: 2000\n"
+                            + "    hazelcast.slow.operation.detector.stacktrace.logging.enabled: true\n"
+                            + "    hazelcast.logging.type: log4j2\n"
+                            + "    hazelcast.operation.generic.thread.count: 200\n";
+
+ Config hazelcastConfig = Config.loadFromString(yaml);
+ hazelcastConfig.setClusterName(TestUtils.getClusterName(testClusterName));
+ SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
+ seaTunnelConfig.setHazelcastConfig(hazelcastConfig);
+ node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+
+ node2 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+
+            // wait until all nodes have joined the cluster
+ HazelcastInstanceImpl finalNode = node1;
+ Awaitility.await()
+ .atMost(10000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ 2, finalNode.getCluster().getMembers().size()));
+
+ Common.setDeployMode(DeployMode.CLIENT);
+            ImmutablePair<String, String> testResources =
+ createTestResources(
+ testCaseName, JobMode.STREAMING, testRowNumber, testParallelism);
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setName(testCaseName);
+
+ ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+ clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
+ engineClient = new SeaTunnelClient(clientConfig);
+ JobExecutionEnvironment jobExecutionEnv =
+ engineClient.createExecutionContext(testResources.getRight(), jobConfig);
+ ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+ Long jobId = clientJobProxy.getJobId();
+
+ ClientJobProxy finalClientJobProxy = clientJobProxy;
+ Awaitility.await()
+ .atMost(600000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+                                // Wait for some tasks to finish committing so we can
+                                // read rows from the sink target dir
+ Thread.sleep(2000);
+ System.out.println(
+ "\n================================="
+ + FileUtils.getFileLineNumberFromDir(
+ testResources.getLeft())
+ + "=================================\n");
+ Assertions.assertTrue(
+ JobStatus.RUNNING.equals(finalClientJobProxy.getJobStatus())
+ && FileUtils.getFileLineNumberFromDir(
+ testResources.getLeft())
+ > 1);
+ });
+
+ Thread.sleep(5000);
+            // shut down all nodes
+ node1.shutdown();
+ node2.shutdown();
+
+            log.info(
+                    "==========================================All nodes are down========================================");
+ Thread.sleep(10000);
+
+ node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+
+ node2 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+
+            log.info(
+                    "==========================================All nodes are started, begin checking cluster size========================================");
+            // wait until all nodes have joined the cluster
+ HazelcastInstanceImpl restoreFinalNode = node1;
+ Awaitility.await()
+ .atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ 2, restoreFinalNode.getCluster().getMembers().size()));
+
+            log.info(
+                    "==========================================All nodes are running========================================");
+ engineClient = new SeaTunnelClient(clientConfig);
+ ClientJobProxy newClientJobProxy = engineClient.createJobClient().getJobProxy(jobId);
+            CompletableFuture<JobStatus> waitForJobCompleteFuture =
+ CompletableFuture.supplyAsync(newClientJobProxy::waitForJobComplete);
+
+ Thread.sleep(10000);
+
+ Awaitility.await()
+ .atMost(100000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+                            // Wait for the job to write all rows to the file
+ Thread.sleep(2000);
+ System.out.println(
+ "\n================================="
+ + FileUtils.getFileLineNumberFromDir(
+ testResources.getLeft())
+ + "=================================\n");
+ JobStatus jobStatus = null;
+ try {
+ jobStatus = newClientJobProxy.getJobStatus();
+ } catch (Exception e) {
+ log.error(ExceptionUtils.getMessage(e));
+ }
+
+ Assertions.assertTrue(
+ JobStatus.RUNNING.equals(jobStatus)
+ && testRowNumber * testParallelism
+ == FileUtils.getFileLineNumberFromDir(
+ testResources.getLeft()));
+ });
+
+            // sleep 10s and expect that the job doesn't write more rows
+ Thread.sleep(10000);
+ log.info(
+ "==========================================Cancel Job========================================");
+ newClientJobProxy.cancelJob();
+
+ Awaitility.await()
+ .atMost(600000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertTrue(
+ waitForJobCompleteFuture.isDone()
+ && JobStatus.CANCELED.equals(
+ waitForJobCompleteFuture.get())));
+ // prove that the task was restarted
+ Long fileLineNumberFromDir =
+ FileUtils.getFileLineNumberFromDir(testResources.getLeft());
+ Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir);
+
+ } finally {
+ log.info(
+ "==========================================Clean test resource ========================================");
+ if (engineClient != null) {
+ engineClient.shutdown();
+ }
+
+ if (node1 != null) {
+ node1.shutdown();
+ }
+
+ if (node2 != null) {
+ node2.shutdown();
+ }
+ }
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/pom.xml b/seatunnel-engine/seatunnel-engine-server/pom.xml
index b31756674bf8..bbf68bb49b09 100644
--- a/seatunnel-engine/seatunnel-engine-server/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-server/pom.xml
@@ -43,6 +43,11 @@
            <artifactId>imap-storage-file</artifactId>
            <version>${project.version}</version>
        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>imap-storage-kafka</artifactId>
+            <version>${project.version}</version>
+        </dependency>
        <dependency>
            <groupId>com.hazelcast</groupId>
            <artifactId>hazelcast</artifactId>
diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-kafka/pom.xml b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-kafka/pom.xml
new file mode 100644
index 000000000000..640e306b46f9
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-kafka/pom.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>imap-storage-plugins</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <groupId>org.apache.seatunnel</groupId>
+    <artifactId>imap-storage-kafka</artifactId>
+    <name>SeaTunnel : Engine : Storage : IMap Storage Plugins : Kafka</name>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>serializer-protobuf</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-kafka/src/main/java/org/apache/seatunnel/engine/imap/storage/kafka/IMapKafkaStorage.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-kafka/src/main/java/org/apache/seatunnel/engine/imap/storage/kafka/IMapKafkaStorage.java
new file mode 100644
index 000000000000..e16496265658
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-kafka/src/main/java/org/apache/seatunnel/engine/imap/storage/kafka/IMapKafkaStorage.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.imap.storage.kafka;
+
+import org.apache.seatunnel.engine.imap.storage.api.IMapStorage;
+import org.apache.seatunnel.engine.imap.storage.kafka.bean.IMapDataStruct;
+import org.apache.seatunnel.engine.imap.storage.kafka.config.KafkaConfiguration;
+import org.apache.seatunnel.engine.imap.storage.kafka.config.KafkaConfigurationConstants;
+import org.apache.seatunnel.engine.imap.storage.kafka.utils.TopicAdmin;
+import org.apache.seatunnel.engine.serializer.api.Serializer;
+import org.apache.seatunnel.engine.serializer.protobuf.ProtoStuffSerializer;
+
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.utils.Utils;
+
+import com.google.common.collect.Maps;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import static org.apache.seatunnel.engine.imap.storage.kafka.config.KafkaConfigurationConstants.BUSINESS_KEY;
+
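+/**
+ * IMap storage implementation backed by a compacted Kafka topic. Each IMap entry is
+ * written as one key/value record; log compaction keeps the latest record per key, so
+ * replaying the topic from the beginning rebuilds the current IMap state on restart.
+ */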
+@Slf4j
+public class IMapKafkaStorage implements IMapStorage {
+ private static final long CREATE_TOPIC_TIMEOUT_MS = 30000;
+ private KafkaConfiguration kafkaConfiguration;
+
+    private Consumer<byte[], byte[]> consumer;
+    private Producer<byte[], byte[]> producer;
+
+    private Serializer serializer;
+
+    private List<TopicPartition> partitions = new ArrayList<>();
+
+ private String businessName;
+
+ @Override
+    public void initialize(Map<String, Object> config) {
+ String bootstrapServers =
+ config.get(KafkaConfigurationConstants.KAFKA_BOOTSTRAP_SERVERS).toString();
+ String compactTopicPrefix =
+ config.get(KafkaConfigurationConstants.KAFKA_STORAGE_COMPACT_TOPIC_PREFIX)
+ .toString();
+ this.businessName = (String) config.get(BUSINESS_KEY);
+ String compactTopic = compactTopicPrefix.concat(businessName);
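+        // replication factor defaults to 3 for durability and partition count to 1,
+        // so that replay observes all updates in a single totally ordered log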
+ Integer topicReplicationFactor =
+ Integer.parseInt(
+ config.getOrDefault(
+ KafkaConfigurationConstants
+ .KAFKA_STORAGE_COMPACT_TOPIC_REPLICATION_FACTOR,
+ 3)
+ .toString());
+ Integer topicPartition =
+ Integer.parseInt(
+ config.getOrDefault(
+ KafkaConfigurationConstants
+ .KAFKA_STORAGE_COMPACT_TOPIC_PARTITION,
+ 1)
+ .toString());
+
+ kafkaConfiguration =
+ KafkaConfiguration.builder()
+ .bootstrapServers(bootstrapServers)
+ .storageTopic(compactTopic)
+ .storageTopicPartition(topicPartition)
+ .storageTopicReplicationFactor(topicReplicationFactor)
+ .consumerConfigs(
+ KafkaConfiguration.setExtraConfiguration(
+ config,
+ KafkaConfigurationConstants.KAFKA_CONSUMER_CONFIGS_PREFIX))
+ .producerConfigs(
+ KafkaConfiguration.setExtraConfiguration(
+ config,
+ KafkaConfigurationConstants.KAFKA_PRODUCER_CONFIGS_PREFIX))
+ .adminConfigs(
+ KafkaConfiguration.setExtraConfiguration(
+ config,
+ KafkaConfigurationConstants.KAFKA_ADMIN_CONFIGS_PREFIX))
+ .build();
+
+ // Init serializer, default ProtoStuffSerializer
+ this.serializer = new ProtoStuffSerializer();
+ maybeCreateTopicAndValidateCompactConfig();
+ this.consumer = createConsumer();
+ this.producer = createProducer();
+ }
+
+ private void maybeCreateTopicAndValidateCompactConfig() {
+ // create admin client
+ TopicAdmin topicAdmin = new TopicAdmin(kafkaConfiguration);
+ try {
+            // the storage topic must use the 'compact' cleanup policy
+ topicAdmin.maybeCreateTopic(kafkaConfiguration.getStorageTopic());
+ topicAdmin.verifyTopicCleanupPolicyOnlyCompact(kafkaConfiguration.getStorageTopic());
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ topicAdmin.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+    /**
+     * Create the Kafka producer used to write IMap records.
+     *
+     * @return a producer configured for durable, ordered writes
+     */
+    private Producer<byte[], byte[]> createProducer() {
+        Map<String, Object> producerConfigs = kafkaConfiguration.getProducerConfigs();
+ producerConfigs.put(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.getBootstrapServers());
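+        // acks=all with a single in-flight request makes writes durable and preserves
+        // per-partition ordering, which the compacted-log replay relies on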
+ producerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");
+ producerConfigs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
+ producerConfigs.put(
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ producerConfigs.put(
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ return new KafkaProducer<>(producerConfigs);
+ }
+
+    /**
+     * Create the Kafka consumer used to replay IMap records.
+     *
+     * @return a consumer assigned to all partitions of the storage topic
+     */
+    private Consumer<byte[], byte[]> createConsumer() {
+        Map<String, Object> consumerConfigs = kafkaConfiguration.getConsumerConfigs();
+ consumerConfigs.put(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.getBootstrapServers());
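+        // always start from the earliest offset and never commit offsets: each
+        // (re)start replays the whole compacted topic to rebuild the IMap state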
+ consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ consumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+ consumerConfigs.put(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ ByteArrayDeserializer.class.getName());
+ consumerConfigs.put(
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ ByteArrayDeserializer.class.getName());
+        Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfigs);
+        List<PartitionInfo> partitionInfos = null;
+ long started = System.currentTimeMillis();
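+        // poll for partition metadata until it is available or the timeout elapses;
+        // the topic created by maybeCreateTopic() may still be propagating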
+ while (partitionInfos == null
+ && System.currentTimeMillis() - started < CREATE_TOPIC_TIMEOUT_MS) {
+ partitionInfos = consumer.partitionsFor(kafkaConfiguration.getStorageTopic());
+ Utils.sleep(Math.min(System.currentTimeMillis() - started, 1000));
+ }
+        if (partitionInfos == null) {
+            throw new RuntimeException(
+                    "Could not look up partition metadata for the IMap storage topic in"
+                            + " the allotted period. This could indicate a connectivity issue, unavailable topic partitions, or if"
+                            + " this is your first use of the topic it may have taken too long to create.");
+        }
+ for (PartitionInfo partition : partitionInfos) {
+ this.partitions.add(new TopicPartition(partition.topic(), partition.partition()));
+ }
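+        // use assign() instead of subscribe(): no consumer group semantics are wanted,
+        // since every node must read all partitions from the beginning on its own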
+ consumer.assign(this.partitions);
+ return consumer;
+ }
+
+ @Override
+ public boolean store(Object key, Object value) {
+ try {
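+            // synchronous send: callback.get() blocks until the broker acknowledges,
+            // so store() returns true only after the record is durable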
+            Map.Entry<byte[], byte[]> data = convertToMapEntry(key, value);
+            Future<RecordMetadata> callback =
+ producer.send(
+ new ProducerRecord<>(
+ kafkaConfiguration.getStorageTopic(),
+ data.getKey(),
+ data.getValue()));
+ return Objects.nonNull(callback.get());
+ } catch (IOException | InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Set