Skip to content

Commit

Permalink
Imap storage supports kafka compact topic in cluster mode #4961
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Aug 4, 2023
1 parent 2917542 commit 16c1a72
Show file tree
Hide file tree
Showing 17 changed files with 1,548 additions and 2 deletions.
24 changes: 24 additions & 0 deletions docs/en/seatunnel-engine/deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,30 @@ map:
fs.oss.credentials.provider: org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider
```

if you used kafka, the kafka used must support creating a compact topic, you can config like this :

```yaml
map:
engine*:
map-store:
enabled: true
initial-mode: EAGER
factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory
properties:
type: kafka
bootstrap.servers: localhost:9092
storage.compact.topic.prefix: imap-
storage.compact.topic.replication.factor: 3
consumer.override.auto.offset.reset: earliest
producer.override.acks: all
```

+ The configuration with the prefix 'consumer.override.' is used to override the configuration of the consumer
+ The configuration with the prefix 'producer.override.' is used to override the configuration of the producer
+ The configuration with the prefix 'admin.override.' is used to override the configuration of the admin
+ The configuration with the prefix 'topic.override.' is used to override the configuration of the topic

## 6. Config SeaTunnel Engine Client

All SeaTunnel Engine Client config in `hazelcast-client.yaml`.
Expand Down
1 change: 0 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@
<checker.qual.version>3.10.0</checker.qual.version>
<awaitility.version>4.2.0</awaitility.version>
<e2e.dependency.skip>true</e2e.dependency.skip>

</properties>

<dependencyManagement>
Expand Down
3 changes: 3 additions & 0 deletions seatunnel-dist/release-docs/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ The text of each license is the standard Apache 2.0 license.
(Apache-2.0) listenablefuture (com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava https://mvnrepository.com/artifact/com.google.guava/listenablefuture/9999.0-empty-to-avoid-conflict-with-guava)
(Apache-2.0) accessors-smart (com.google.guava:accessors-smart:2.4.7 - https://mvnrepository.com/artifact/net.minidev/accessors-smart)
(Apache-2.0) json-smart (net.minidev:json-smart:2.4.7 - https://mvnrepository.com/artifact/net.minidev/json-smart)
(Apache-2.0) kafka-clients (org.apache.kafka:kafka-clients:3.4.1 - https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients)
(Apache-2.0) lz4-java (org.lz4:lz4-java:1.8.0 - https://mvnrepository.com/artifact/org.lz4/lz4-java)

========================================================================
MOZILLA PUBLIC LICENSE License
Expand All @@ -294,6 +296,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(New BSD license) Protocol Buffer Java API (com.google.protobuf:protobuf-java:2.5.0 - http://code.google.com/p/protobuf)
(BSD 3-Clause) Scala Library (org.scala-lang:scala-library:2.11.12 - http://www.scala-lang.org/)
(BSD 3-Clause) Scala Library (org.ow2.asm:asm:9.1 - https://mvnrepository.com/artifact/org.ow2.asm/asm/)
(BSD 2-Clause) zstd-jni (com.github.luben:zstd-jni:1.5.2-1 - https://mvnrepository.com/artifact/com.github.luben/zstd-jni)
========================================================================
CDDL License
========================================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>imap-storage-kafka</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
Expand Down Expand Up @@ -92,6 +99,19 @@
<version>${netty-buffer.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>${testcontainer.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.shaded.org.apache.commons.lang3.tuple.ImmutablePair;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.DockerLoggerFactory;

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.config.Config;
Expand All @@ -59,6 +63,8 @@
@Slf4j
public class ClusterFaultToleranceIT {

private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:7.0.9";

public static final String DYNAMIC_TEST_CASE_NAME = "dynamic_test_case_name";

public static final String DYNAMIC_JOB_MODE = "dynamic_job_mode";
Expand All @@ -68,6 +74,11 @@ public class ClusterFaultToleranceIT {

public static final String DYNAMIC_TEST_PARALLELISM = "dynamic_test_parallelism";

private static KafkaContainer kafkaContainer =
new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
.withLogConsumer(
new Slf4jLogConsumer(DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME)));

@SuppressWarnings("checkstyle:RegexpSingleline")
@Test
public void testBatchJobRunOkIn2Node() throws ExecutionException, InterruptedException {
Expand Down Expand Up @@ -1154,4 +1165,217 @@ public void testStreamJobRestoreFromOssInAllNodeDown()
}
}
}

@SuppressWarnings("checkstyle:RegexpSingleline")
@Test
public void testStreamJobRestoreFromKafkaInAllNodeDown()
throws ExecutionException, InterruptedException {

kafkaContainer.start();
String BOOTSTRAP_SERVERS = kafkaContainer.getBootstrapServers();
String TOPIC_PREFIX = "imap-";
Integer TOPIC_REPLICATION_FACTOR = 1;

String testCaseName = "testStreamJobRestoreFromKafkaInAllNodeDown";
String testClusterName =
"ClusterFaultToleranceIT_testStreamJobRestoreFromKafkaInAllNodeDown_"
+ System.currentTimeMillis();
int testRowNumber = 1000;
int testParallelism = 6;
HazelcastInstanceImpl node1 = null;
HazelcastInstanceImpl node2 = null;
SeaTunnelClient engineClient = null;

try {
String yaml =
"hazelcast:\n"
+ " cluster-name: seatunnel\n"
+ " network:\n"
+ " rest-api:\n"
+ " enabled: true\n"
+ " endpoint-groups:\n"
+ " CLUSTER_WRITE:\n"
+ " enabled: true\n"
+ " join:\n"
+ " tcp-ip:\n"
+ " enabled: true\n"
+ " member-list:\n"
+ " - localhost\n"
+ " port:\n"
+ " auto-increment: true\n"
+ " port-count: 100\n"
+ " port: 5801\n"
+ " map:\n"
+ " engine*:\n"
+ " map-store:\n"
+ " enabled: true\n"
+ " initial-mode: EAGER\n"
+ " factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory\n"
+ " properties:\n"
+ " type: kafka\n"
+ " bootstrap.servers: "
+ BOOTSTRAP_SERVERS
+ "\n"
+ " storage.compact.topic.prefix: "
+ TOPIC_PREFIX
+ "\n"
+ " storage.compact.topic.replication.factor: "
+ TOPIC_REPLICATION_FACTOR
+ "\n"
+ " properties:\n"
+ " hazelcast.invocation.max.retry.count: 200\n"
+ " hazelcast.tcp.join.port.try.count: 30\n"
+ " hazelcast.invocation.retry.pause.millis: 2000\n"
+ " hazelcast.slow.operation.detector.stacktrace.logging.enabled: true\n"
+ " hazelcast.logging.type: log4j2\n"
+ " hazelcast.operation.generic.thread.count: 200\n";

Config hazelcastConfig = Config.loadFromString(yaml);
hazelcastConfig.setClusterName(TestUtils.getClusterName(testClusterName));
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
seaTunnelConfig.setHazelcastConfig(hazelcastConfig);
node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

node2 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

// waiting all node added to cluster
HazelcastInstanceImpl finalNode = node1;
Awaitility.await()
.atMost(10000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
2, finalNode.getCluster().getMembers().size()));

Common.setDeployMode(DeployMode.CLIENT);
ImmutablePair<String, String> testResources =
createTestResources(
testCaseName, JobMode.STREAMING, testRowNumber, testParallelism);
JobConfig jobConfig = new JobConfig();
jobConfig.setName(testCaseName);

ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
Long jobId = clientJobProxy.getJobId();

ClientJobProxy finalClientJobProxy = clientJobProxy;
Awaitility.await()
.atMost(600000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
// Wait some tasks commit finished, and we can get rows from the
// sink target dir
Thread.sleep(2000);
System.out.println(
"\n================================="
+ FileUtils.getFileLineNumberFromDir(
testResources.getLeft())
+ "=================================\n");
Assertions.assertTrue(
JobStatus.RUNNING.equals(finalClientJobProxy.getJobStatus())
&& FileUtils.getFileLineNumberFromDir(
testResources.getLeft())
> 1);
});

Thread.sleep(5000);
// shutdown all node
node1.shutdown();
node2.shutdown();

log.info(
"==========================================All node is done========================================");
Thread.sleep(10000);

node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

node2 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

log.info(
"==========================================All node is start, begin check node size ========================================");
// waiting all node added to cluster
HazelcastInstanceImpl restoreFinalNode = node1;
Awaitility.await()
.atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
2, restoreFinalNode.getCluster().getMembers().size()));

log.info(
"==========================================All node is running========================================");
engineClient = new SeaTunnelClient(clientConfig);
ClientJobProxy newClientJobProxy = engineClient.createJobClient().getJobProxy(jobId);
CompletableFuture<JobStatus> waitForJobCompleteFuture =
CompletableFuture.supplyAsync(newClientJobProxy::waitForJobComplete);

Thread.sleep(10000);

Awaitility.await()
.atMost(100000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
// Wait job write all rows in file
Thread.sleep(2000);
System.out.println(
"\n================================="
+ FileUtils.getFileLineNumberFromDir(
testResources.getLeft())
+ "=================================\n");
JobStatus jobStatus = null;
try {
jobStatus = newClientJobProxy.getJobStatus();
} catch (Exception e) {
log.error(ExceptionUtils.getMessage(e));
}

Assertions.assertTrue(
JobStatus.RUNNING.equals(jobStatus)
&& testRowNumber * testParallelism
== FileUtils.getFileLineNumberFromDir(
testResources.getLeft()));
});

// sleep 10s and expect the job don't write more rows.
Thread.sleep(10000);
log.info(
"==========================================Cancel Job========================================");
newClientJobProxy.cancelJob();

Awaitility.await()
.atMost(600000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertTrue(
waitForJobCompleteFuture.isDone()
&& JobStatus.CANCELED.equals(
waitForJobCompleteFuture.get())));
// prove that the task was restarted
Long fileLineNumberFromDir =
FileUtils.getFileLineNumberFromDir(testResources.getLeft());
Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir);

} finally {
log.info(
"==========================================Clean test resource ========================================");
if (engineClient != null) {
engineClient.shutdown();
}

if (node1 != null) {
node1.shutdown();
}

if (node2 != null) {
node2.shutdown();
}
if (kafkaContainer != null) {
kafkaContainer.close();
}
}
}
}
5 changes: 5 additions & 0 deletions seatunnel-engine/seatunnel-engine-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@
<artifactId>imap-storage-file</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>imap-storage-kafka</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
Expand Down
Loading

0 comments on commit 16c1a72

Please sign in to comment.