Imap storage supports kafka compact topic in cluster mode #4961
sunxiaojian committed Jul 5, 2023
1 parent f8fefa1 commit bd395bc
Showing 17 changed files with 1,489 additions and 0 deletions.
24 changes: 24 additions & 0 deletions docs/en/seatunnel-engine/deployment.md
@@ -222,6 +222,30 @@ map:
fs.oss.credentials.provider: org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider
```

If you use Kafka, the Kafka cluster must support creating compact topics. You can configure it like this:

```yaml
map:
  engine*:
    map-store:
      enabled: true
      initial-mode: EAGER
      factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory
      properties:
        type: kafka
        bootstrap.servers: localhost:9092
        storage.compact.topic.prefix: imap-
        storage.compact.topic.replication.factor: 3
        consumer.override.auto.offset.reset: earliest
        producer.override.acks: all
```
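Compact topics are required because Kafka log compaction retains only the newest record per key, which lets the topic serve as a durable key-value snapshot of the IMap state. A toy simulation of that retention rule (not the plugin's actual code; class and key names are illustrative):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompactionSketch {

    // Replay a record log keeping only the newest value per key, which is
    // what a compacted topic retains after cleanup; a null value acts as a
    // tombstone and deletes the key.
    static Map<String, String> compact(List<SimpleEntry<String, String>> log) {
        Map<String, String> state = new LinkedHashMap<>();
        for (SimpleEntry<String, String> rec : log) {
            if (rec.getValue() == null) {
                state.remove(rec.getKey());
            } else {
                state.put(rec.getKey(), rec.getValue());
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<SimpleEntry<String, String>> log = List.of(
                new SimpleEntry<>("job-1", "RUNNING"),
                new SimpleEntry<>("job-2", "RUNNING"),
                new SimpleEntry<>("job-1", "FINISHED"),
                new SimpleEntry<>("job-2", null)); // tombstone for job-2
        System.out.println(compact(log)); // prints {job-1=FINISHED}
    }
}
```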

- Properties prefixed with `consumer.override.` override the consumer configuration.
- Properties prefixed with `producer.override.` override the producer configuration.
- Properties prefixed with `admin.override.` override the admin client configuration.
- Properties prefixed with `topic.override.` override the topic configuration.
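For example, overridden client settings sit alongside the storage properties, assuming each prefix is stripped and the remainder passed through as a standard Kafka client or topic setting; the specific values below are illustrative, not defaults:

```yaml
map:
  engine*:
    map-store:
      enabled: true
      initial-mode: EAGER
      factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory
      properties:
        type: kafka
        bootstrap.servers: localhost:9092
        storage.compact.topic.prefix: imap-
        # passed to the consumer as max.poll.records=500
        consumer.override.max.poll.records: 500
        # passed to the producer as linger.ms=10
        producer.override.linger.ms: 10
        # passed to the admin client as request.timeout.ms=30000
        admin.override.request.timeout.ms: 30000
        # applied when the compact topic is created
        topic.override.max.message.bytes: 1048576
```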

## 6. Config SeaTunnel Engine Client

All SeaTunnel Engine Client configuration is in `hazelcast-client.yaml`.
7 changes: 7 additions & 0 deletions pom.xml
@@ -146,6 +146,7 @@
<json-smart.version>2.4.7</json-smart.version>
<hadoop-aws.version>3.1.4</hadoop-aws.version>
<netty-buffer.version>4.1.60.Final</netty-buffer.version>
<kafka.version>3.4.1</kafka.version>
</properties>

<dependencyManagement>
@@ -485,6 +486,12 @@
<version>${netty-buffer.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>

</dependencies>
</dependencyManagement>

3 changes: 3 additions & 0 deletions seatunnel-dist/release-docs/LICENSE
@@ -275,6 +275,8 @@ The text of each license is the standard Apache 2.0 license.
(Apache-2.0) listenablefuture (com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava https://mvnrepository.com/artifact/com.google.guava/listenablefuture/9999.0-empty-to-avoid-conflict-with-guava)
(Apache-2.0) accessors-smart (com.google.guava:accessors-smart:2.4.7 - https://mvnrepository.com/artifact/net.minidev/accessors-smart)
(Apache-2.0) json-smart (net.minidev:json-smart:2.4.7 - https://mvnrepository.com/artifact/net.minidev/json-smart)
(Apache-2.0) kafka-clients (org.apache.kafka:kafka-clients:3.4.1 - https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients)
(Apache-2.0) lz4-java (org.lz4:lz4-java:1.8.0 - https://mvnrepository.com/artifact/org.lz4/lz4-java)

========================================================================
MOZILLA PUBLIC LICENSE License
@@ -294,6 +296,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(New BSD license) Protocol Buffer Java API (com.google.protobuf:protobuf-java:2.5.0 - http://code.google.com/p/protobuf)
(BSD 3-Clause) Scala Library (org.scala-lang:scala-library:2.11.12 - http://www.scala-lang.org/)
(BSD 3-Clause) Scala Library (org.ow2.asm:asm:9.1 - https://mvnrepository.com/artifact/org.ow2.asm/asm/)
(BSD 2-Clause) zstd-jni (com.github.luben:zstd-jni:1.5.2-1 - https://mvnrepository.com/artifact/com.github.luben/zstd-jni)
========================================================================
CDDL License
========================================================================
@@ -58,6 +58,13 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>imap-storage-kafka</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
@@ -1154,4 +1154,213 @@ public void testStreamJobRestoreFromOssInAllNodeDown()
}
}
}

@SuppressWarnings("checkstyle:RegexpSingleline")
@Test
@Disabled
public void testStreamJobRestoreFromKafkaInAllNodeDown()
throws ExecutionException, InterruptedException {
String BOOTSTRAP_SERVERS = "localhost:9092";
String TOPIC_PREFIX = "imap-";
Integer TOPIC_REPLICATION_FACTOR = 1;

String testCaseName = "testStreamJobRestoreFromKafkaInAllNodeDown";
String testClusterName =
"ClusterFaultToleranceIT_testStreamJobRestoreFromKafkaInAllNodeDown_"
+ System.currentTimeMillis();
int testRowNumber = 1000;
int testParallelism = 6;
HazelcastInstanceImpl node1 = null;
HazelcastInstanceImpl node2 = null;
SeaTunnelClient engineClient = null;

try {
String yaml =
"hazelcast:\n"
+ " cluster-name: seatunnel\n"
+ " network:\n"
+ " rest-api:\n"
+ " enabled: true\n"
+ " endpoint-groups:\n"
+ " CLUSTER_WRITE:\n"
+ " enabled: true\n"
+ " join:\n"
+ " tcp-ip:\n"
+ " enabled: true\n"
+ " member-list:\n"
+ " - localhost\n"
+ " port:\n"
+ " auto-increment: true\n"
+ " port-count: 100\n"
+ " port: 5801\n"
+ " map:\n"
+ " engine*:\n"
+ " map-store:\n"
+ " enabled: true\n"
+ " initial-mode: EAGER\n"
+ " factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory\n"
+ " properties:\n"
+ " type: kafka\n"
+ " bootstrap.servers: "
+ BOOTSTRAP_SERVERS
+ "\n"
+ " storage.compact.topic.prefix: "
+ TOPIC_PREFIX
+ "\n"
+ " storage.compact.topic.replication.factor: "
+ TOPIC_REPLICATION_FACTOR
+ "\n"
+ " properties:\n"
+ " hazelcast.invocation.max.retry.count: 200\n"
+ " hazelcast.tcp.join.port.try.count: 30\n"
+ " hazelcast.invocation.retry.pause.millis: 2000\n"
+ " hazelcast.slow.operation.detector.stacktrace.logging.enabled: true\n"
+ " hazelcast.logging.type: log4j2\n"
+ " hazelcast.operation.generic.thread.count: 200\n";

Config hazelcastConfig = Config.loadFromString(yaml);
hazelcastConfig.setClusterName(TestUtils.getClusterName(testClusterName));
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
seaTunnelConfig.setHazelcastConfig(hazelcastConfig);
node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

node2 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

// wait until all nodes have joined the cluster
HazelcastInstanceImpl finalNode = node1;
Awaitility.await()
.atMost(10000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
2, finalNode.getCluster().getMembers().size()));

Common.setDeployMode(DeployMode.CLIENT);
ImmutablePair<String, String> testResources =
createTestResources(
testCaseName, JobMode.STREAMING, testRowNumber, testParallelism);
JobConfig jobConfig = new JobConfig();
jobConfig.setName(testCaseName);

ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
Long jobId = clientJobProxy.getJobId();

ClientJobProxy finalClientJobProxy = clientJobProxy;
Awaitility.await()
.atMost(600000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
// wait until some tasks have committed and rows can be read from the
// sink target dir
Thread.sleep(2000);
System.out.println(
"\n================================="
+ FileUtils.getFileLineNumberFromDir(
testResources.getLeft())
+ "=================================\n");
Assertions.assertTrue(
JobStatus.RUNNING.equals(finalClientJobProxy.getJobStatus())
&& FileUtils.getFileLineNumberFromDir(
testResources.getLeft())
> 1);
});

Thread.sleep(5000);
// shut down all nodes
node1.shutdown();
node2.shutdown();

log.info(
"==========================================All node is done========================================");
Thread.sleep(10000);

node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

node2 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

log.info(
"==========================================All node is start, begin check node size ========================================");
// wait until all nodes have joined the cluster
HazelcastInstanceImpl restoreFinalNode = node1;
Awaitility.await()
.atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
2, restoreFinalNode.getCluster().getMembers().size()));

log.info(
"==========================================All node is running========================================");
engineClient = new SeaTunnelClient(clientConfig);
ClientJobProxy newClientJobProxy = engineClient.createJobClient().getJobProxy(jobId);
CompletableFuture<JobStatus> waitForJobCompleteFuture =
CompletableFuture.supplyAsync(newClientJobProxy::waitForJobComplete);

Thread.sleep(10000);

Awaitility.await()
.atMost(100000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
// wait until the job has written all rows to the file
Thread.sleep(2000);
System.out.println(
"\n================================="
+ FileUtils.getFileLineNumberFromDir(
testResources.getLeft())
+ "=================================\n");
JobStatus jobStatus = null;
try {
jobStatus = newClientJobProxy.getJobStatus();
} catch (Exception e) {
log.error(ExceptionUtils.getMessage(e));
}

Assertions.assertTrue(
JobStatus.RUNNING.equals(jobStatus)
&& testRowNumber * testParallelism
== FileUtils.getFileLineNumberFromDir(
testResources.getLeft()));
});

// sleep 10s and expect the job not to write more rows.
Thread.sleep(10000);
log.info(
"==========================================Cancel Job========================================");
newClientJobProxy.cancelJob();

Awaitility.await()
.atMost(600000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertTrue(
waitForJobCompleteFuture.isDone()
&& JobStatus.CANCELED.equals(
waitForJobCompleteFuture.get())));
// prove that the task was restarted
Long fileLineNumberFromDir =
FileUtils.getFileLineNumberFromDir(testResources.getLeft());
Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir);

} finally {
log.info(
"==========================================Clean test resource ========================================");
if (engineClient != null) {
engineClient.shutdown();
}

if (node1 != null) {
node1.shutdown();
}

if (node2 != null) {
node2.shutdown();
}
}
}
}
5 changes: 5 additions & 0 deletions seatunnel-engine/seatunnel-engine-server/pom.xml
@@ -43,6 +43,11 @@
<artifactId>imap-storage-file</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>imap-storage-kafka</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
@@ -0,0 +1,57 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>imap-storage-plugins</artifactId>
<version>${revision}</version>
</parent>

<groupId>org.apache.seatunnel</groupId>
<artifactId>imap-storage-kafka</artifactId>
<name>SeaTunnel : Engine : Storage : IMap Storage Plugins : Kafka</name>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>serializer-protobuf</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies>
</project>