Skip to content

Commit

Permalink
(init) Imap storage supports kafka compact topic in cluster mode #4961
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Jul 5, 2023
1 parent b1b1f5e commit 404336e
Show file tree
Hide file tree
Showing 16 changed files with 1,460 additions and 1 deletion.
2 changes: 1 addition & 1 deletion docs/en/connector-v2/sink/StarRocks.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ The internal implementation of StarRocks sink connector is cached and imported b

| name | type | required | default value |
|-----------------------------|---------|----------|-----------------|
| nodeUrls | list | yes | - |
| nodeUrls | list | yes | - |
| base-url | string | yes | - |
| username | string | yes | - |
| password | string | yes | - |
Expand Down
23 changes: 23 additions & 0 deletions docs/en/seatunnel-engine/deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,29 @@ map:
fs.oss.credentials.provider: org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider
```

If you use Kafka, the broker must support creating compacted topics. You can configure it like this:

```yaml
map:
engine*:
map-store:
enabled: true
initial-mode: EAGER
factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory
properties:
type: kafka
bootstrap.servers: localhost:9092
storage.compact.topic.prefix: imap-
storage.compact.topic.replication.factor: 3
consumer.override.auto.offset.reset: earliest
producer.override.acks: all
```

+ Properties prefixed with `consumer.override.` override the Kafka consumer configuration.
+ Properties prefixed with `producer.override.` override the Kafka producer configuration.
+ Properties prefixed with `admin.override.` override the Kafka admin client configuration.

## 6. Config SeaTunnel Engine Client

All SeaTunnel Engine Client config in `hazelcast-client.yaml`.
Expand Down
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@
<json-smart.version>2.4.7</json-smart.version>
<hadoop-aws.version>3.1.4</hadoop-aws.version>
<netty-buffer.version>4.1.60.Final</netty-buffer.version>
<kafka.version>3.4.1</kafka.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -485,6 +486,12 @@
<version>${netty-buffer.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>

</dependencies>
</dependencyManagement>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>imap-storage-kafka</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1154,4 +1154,213 @@ public void testStreamJobRestoreFromOssInAllNodeDown()
}
}
}

/**
 * Verifies that a STREAMING job survives a full-cluster outage when IMap state is persisted
 * to Kafka (map-store {@code type: kafka}).
 *
 * <p>Scenario: start a 2-node cluster whose IMap map-store writes to Kafka compact topics,
 * submit a streaming job, wait until some rows reach the sink directory, shut down BOTH
 * nodes, start two fresh nodes with the same configuration, re-attach to the job by its
 * original id, and assert the job resumes and eventually writes exactly
 * {@code testRowNumber * testParallelism} rows before being cancelled.
 *
 * <p>{@code @Disabled} by default: it requires an external Kafka broker reachable at
 * {@code localhost:9092} that supports creating compacted topics.
 *
 * @throws ExecutionException if waiting on the job-completion future fails
 * @throws InterruptedException if the test thread is interrupted during a sleep/wait
 */
@SuppressWarnings("checkstyle:RegexpSingleline")
@Test
@Disabled
public void testStreamJobRestoreFromKafkaInAllNodeDown()
        throws ExecutionException, InterruptedException {
    // Kafka settings interpolated into the hazelcast map-store properties below.
    String BOOTSTRAP_SERVERS = "localhost:9092";
    String TOPIC_PREFIX = "imap-";
    // Replication factor 1 is sufficient for a single-broker test Kafka.
    Integer TOPIC_REPLICATION_FACTOR = 1;

    String testCaseName = "testStreamJobRestoreFromKafkaInAllNodeDown";
    String testClusterName =
            "ClusterFaultToleranceIT_testStreamJobRestoreFromKafkaInAllNodeDown_"
                    + System.currentTimeMillis();
    int testRowNumber = 1000;
    int testParallelism = 6;
    HazelcastInstanceImpl node1 = null;
    HazelcastInstanceImpl node2 = null;
    SeaTunnelClient engineClient = null;

    try {
        // Cluster config: IMap map-store persists to Kafka compact topics so job state
        // survives the loss of every node. Mirrors the example in
        // docs/en/seatunnel-engine/deployment.md.
        String yaml =
                "hazelcast:\n"
                        + "  cluster-name: seatunnel\n"
                        + "  network:\n"
                        + "    rest-api:\n"
                        + "      enabled: true\n"
                        + "      endpoint-groups:\n"
                        + "        CLUSTER_WRITE:\n"
                        + "          enabled: true\n"
                        + "    join:\n"
                        + "      tcp-ip:\n"
                        + "        enabled: true\n"
                        + "        member-list:\n"
                        + "          - localhost\n"
                        + "    port:\n"
                        + "      auto-increment: true\n"
                        + "      port-count: 100\n"
                        + "      port: 5801\n"
                        + "  map:\n"
                        + "    engine*:\n"
                        + "      map-store:\n"
                        + "        enabled: true\n"
                        + "        initial-mode: EAGER\n"
                        + "        factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory\n"
                        + "        properties:\n"
                        + "          type: kafka\n"
                        + "          bootstrap.servers: "
                        + BOOTSTRAP_SERVERS
                        + "\n"
                        + "          storage.compact.topic.prefix: "
                        + TOPIC_PREFIX
                        + "\n"
                        + "          storage.compact.topic.replication.factor: "
                        + TOPIC_REPLICATION_FACTOR
                        + "\n"
                        + "  properties:\n"
                        + "    hazelcast.invocation.max.retry.count: 200\n"
                        + "    hazelcast.tcp.join.port.try.count: 30\n"
                        + "    hazelcast.invocation.retry.pause.millis: 2000\n"
                        + "    hazelcast.slow.operation.detector.stacktrace.logging.enabled: true\n"
                        + "    hazelcast.logging.type: log4j2\n"
                        + "    hazelcast.operation.generic.thread.count: 200\n";

        Config hazelcastConfig = Config.loadFromString(yaml);
        hazelcastConfig.setClusterName(TestUtils.getClusterName(testClusterName));
        SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
        seaTunnelConfig.setHazelcastConfig(hazelcastConfig);
        node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

        node2 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

        // waiting all node added to cluster
        HazelcastInstanceImpl finalNode = node1;
        Awaitility.await()
                .atMost(10000, TimeUnit.MILLISECONDS)
                .untilAsserted(
                        () ->
                                Assertions.assertEquals(
                                        2, finalNode.getCluster().getMembers().size()));

        Common.setDeployMode(DeployMode.CLIENT);
        // testResources: left = sink output dir (line-counted below), right = job config
        // path submitted to the engine.
        ImmutablePair<String, String> testResources =
                createTestResources(
                        testCaseName, JobMode.STREAMING, testRowNumber, testParallelism);
        JobConfig jobConfig = new JobConfig();
        jobConfig.setName(testCaseName);

        ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
        clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
        engineClient = new SeaTunnelClient(clientConfig);
        JobExecutionEnvironment jobExecutionEnv =
                engineClient.createExecutionContext(testResources.getRight(), jobConfig);
        ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
        // Remember the id so the job can be re-attached after the cluster restart.
        Long jobId = clientJobProxy.getJobId();

        ClientJobProxy finalClientJobProxy = clientJobProxy;
        Awaitility.await()
                .atMost(600000, TimeUnit.MILLISECONDS)
                .untilAsserted(
                        () -> {
                            // Wait some tasks commit finished, and we can get rows from the
                            // sink target dir
                            Thread.sleep(2000);
                            System.out.println(
                                    "\n================================="
                                            + FileUtils.getFileLineNumberFromDir(
                                                    testResources.getLeft())
                                            + "=================================\n");
                            Assertions.assertTrue(
                                    JobStatus.RUNNING.equals(finalClientJobProxy.getJobStatus())
                                            && FileUtils.getFileLineNumberFromDir(
                                                            testResources.getLeft())
                                                    > 1);
                        });

        Thread.sleep(5000);
        // shutdown all node
        node1.shutdown();
        node2.shutdown();

        log.info(
                "==========================================All node is done========================================");
        Thread.sleep(10000);

        // Restart the whole cluster with the SAME config; state must be reloaded from the
        // Kafka-backed map-store.
        node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

        node2 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

        log.info(
                "==========================================All node is start, begin check node size ========================================");
        // waiting all node added to cluster
        HazelcastInstanceImpl restoreFinalNode = node1;
        Awaitility.await()
                .atMost(60000, TimeUnit.MILLISECONDS)
                .untilAsserted(
                        () ->
                                Assertions.assertEquals(
                                        2, restoreFinalNode.getCluster().getMembers().size()));

        log.info(
                "==========================================All node is running========================================");
        // Re-attach to the restored job using the pre-restart job id.
        engineClient = new SeaTunnelClient(clientConfig);
        ClientJobProxy newClientJobProxy = engineClient.createJobClient().getJobProxy(jobId);
        CompletableFuture<JobStatus> waitForJobCompleteFuture =
                CompletableFuture.supplyAsync(newClientJobProxy::waitForJobComplete);

        Thread.sleep(10000);

        Awaitility.await()
                .atMost(100000, TimeUnit.MILLISECONDS)
                .untilAsserted(
                        () -> {
                            // Wait job write all rows in file
                            Thread.sleep(2000);
                            System.out.println(
                                    "\n================================="
                                            + FileUtils.getFileLineNumberFromDir(
                                                    testResources.getLeft())
                                            + "=================================\n");
                            JobStatus jobStatus = null;
                            try {
                                jobStatus = newClientJobProxy.getJobStatus();
                            } catch (Exception e) {
                                // Status polling may transiently fail while the restored
                                // cluster stabilizes; log and let Awaitility retry.
                                log.error(ExceptionUtils.getMessage(e));
                            }

                            // Restored job is running AND has produced the full expected
                            // row count (rows-per-task * parallelism).
                            Assertions.assertTrue(
                                    JobStatus.RUNNING.equals(jobStatus)
                                            && testRowNumber * testParallelism
                                                    == FileUtils.getFileLineNumberFromDir(
                                                            testResources.getLeft()));
                        });

        // sleep 10s and expect the job don't write more rows.
        Thread.sleep(10000);
        log.info(
                "==========================================Cancel Job========================================");
        newClientJobProxy.cancelJob();

        Awaitility.await()
                .atMost(600000, TimeUnit.MILLISECONDS)
                .untilAsserted(
                        () ->
                                Assertions.assertTrue(
                                        waitForJobCompleteFuture.isDone()
                                                && JobStatus.CANCELED.equals(
                                                        waitForJobCompleteFuture.get())));
        // prove that the task was restarted
        Long fileLineNumberFromDir =
                FileUtils.getFileLineNumberFromDir(testResources.getLeft());
        Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir);

    } finally {
        log.info(
                "==========================================Clean test resource ========================================");
        if (engineClient != null) {
            engineClient.shutdown();
        }

        if (node1 != null) {
            node1.shutdown();
        }

        if (node2 != null) {
            node2.shutdown();
        }
    }
}
}
5 changes: 5 additions & 0 deletions seatunnel-engine/seatunnel-engine-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@
<artifactId>imap-storage-file</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>imap-storage-kafka</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<!-- IMap storage plugin that persists SeaTunnel Engine IMap state to Kafka compact topics. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.apache.seatunnel</groupId>
        <artifactId>imap-storage-plugins</artifactId>
        <version>${revision}</version>
    </parent>

    <groupId>org.apache.seatunnel</groupId>
    <artifactId>imap-storage-kafka</artifactId>
    <name>SeaTunnel : Engine : Storage : IMap Storage Plugins : Kafka</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.seatunnel</groupId>
            <artifactId>serializer-protobuf</artifactId>
            <version>${project.version}</version>
        </dependency>
        <!-- Version managed by the root pom's dependencyManagement (${kafka.version}). -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
        <dependency>
            <groupId>org.awaitility</groupId>
            <artifactId>awaitility</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
    </dependencies>
</project>
Loading

0 comments on commit 404336e

Please sign in to comment.