
Commit adee561

PatrickRen authored and AHeise committed

[FLINK-19554][connector/testing-framework] KafkaSource IT and E2E case based on connector testing framework

1 parent b6bf1aa

File tree

14 files changed: +721 -141 lines changed

flink-connectors/flink-connector-kafka/pom.xml (+10 -2)

@@ -210,6 +210,12 @@ under the License.
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-testing_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>

     <build>
@@ -224,7 +230,8 @@ under the License.
                 </goals>
                 <configuration>
                     <includes>
-                        <include>**/KafkaTestEnvironmentImpl*</include>
+                        <include>**/KafkaTestEnvironment*</include>
+                        <include>**/testutils/*</include>
                         <include>META-INF/LICENSE</include>
                         <include>META-INF/NOTICE</include>
                     </includes>
@@ -247,7 +254,8 @@ under the License.
                     <addMavenDescriptor>false</addMavenDescriptor>
                 </archive>
                 <includes>
-                    <include>**/KafkaTestEnvironmentImpl*</include>
+                    <include>**/KafkaTestEnvironment*</include>
+                    <include>**/testutils/*</include>
                     <include>META-INF/LICENSE</include>
                     <include>META-INF/NOTICE</include>
                 </includes>
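The new test-scoped flink-connector-testing dependency provides the ExternalContext and SourceSplitDataWriter interfaces implemented by the new testutils classes below; the widened jar includes, presumably, ship those classes to the end-to-end tests as well.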

flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java (+180 -133)

Large diffs are not rendered by default.
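Since the rewritten ITCase is not rendered here, the following is a hedged sketch of how the framework pieces from this commit fit together. Only APIs visible in the files below (plus Testcontainers) are used; the container image tag, class name KafkaExternalContextSketch, and the test records are assumptions, not taken from the commit.

import java.util.Arrays;

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.kafka.source.testutils.KafkaMultipleTopicExternalContext;
import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

public class KafkaExternalContextSketch {
    public static void main(String[] args) throws Exception {
        // Start a throwaway Kafka broker (the image tag is an assumption).
        KafkaContainer kafka =
                new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.2"));
        kafka.start();

        // The external context manages test topics and builds sources bound to them.
        KafkaMultipleTopicExternalContext context =
                new KafkaMultipleTopicExternalContext(kafka.getBootstrapServers());

        // Each writer creates one single-partition topic and writes records into it.
        SourceSplitDataWriter<String> splitWriter = context.createSourceSplitDataWriter();
        splitWriter.writeRecords(Arrays.asList("alpha", "beta", "gamma"));

        // The source subscribes to a topic pattern covering every topic created above.
        Source<String, ?, ?> source = context.createSource(Boundedness.BOUNDED);

        // A test would now run the source and compare its output against the written records.
        context.close();
        kafka.stop();
    }
}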

flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java (+1 -1)

@@ -21,11 +21,11 @@
 import org.apache.flink.api.connector.source.ReaderInfo;
 import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
 import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
-import org.apache.flink.connector.kafka.source.KafkaSourceTestEnv;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
+import org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv;
 import org.apache.flink.mock.Whitebox;

 import org.apache.kafka.clients.admin.AdminClient;

flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java (+1 -1)

@@ -18,9 +18,9 @@

 package org.apache.flink.connector.kafka.source.enumerator.initializer;

-import org.apache.flink.connector.kafka.source.KafkaSourceTestEnv;
 import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
+import org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv;

 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;

flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java (+1 -1)

@@ -18,7 +18,7 @@

 package org.apache.flink.connector.kafka.source.enumerator.subscriber;

-import org.apache.flink.connector.kafka.source.KafkaSourceTestEnv;
+import org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv;
 import org.apache.flink.util.ExceptionUtils;

 import org.apache.kafka.clients.admin.AdminClient;

flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java (+1 -1)

@@ -22,10 +22,10 @@
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
-import org.apache.flink.connector.kafka.source.KafkaSourceTestEnv;
 import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
+import org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv;
 import org.apache.flink.connector.testutils.source.deserialization.TestingDeserializationContext;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;

flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java (+1 -1)

@@ -25,10 +25,10 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
-import org.apache.flink.connector.kafka.source.KafkaSourceTestEnv;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
+import org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv;
 import org.apache.flink.connector.testutils.source.reader.SourceReaderTestBase;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaMultipleTopicExternalContext.java (new file, +124)

@@ -0,0 +1,124 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connector.kafka.source.testutils;

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.connectors.test.common.external.ExternalContext;
import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.testcontainers.containers.KafkaContainer;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Pattern;

/**
 * Kafka external context that creates multiple topics, each with only one partition, as source
 * splits.
 */
public class KafkaMultipleTopicExternalContext extends KafkaSingleTopicExternalContext {

    private int numTopics = 0;

    private final String topicPattern;

    private final Map<String, SourceSplitDataWriter<String>> topicNameToSplitWriters =
            new HashMap<>();

    public KafkaMultipleTopicExternalContext(String bootstrapServers) {
        super(bootstrapServers);
        this.topicPattern =
                "kafka-multiple-topic-[0-9]+-"
                        + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
    }

    @Override
    public SourceSplitDataWriter<String> createSourceSplitDataWriter() {
        String topicName = getTopicName();
        createTopic(topicName, 1, (short) 1);
        final KafkaPartitionDataWriter splitWriter =
                new KafkaPartitionDataWriter(
                        getKafkaProducerProperties(numTopics), new TopicPartition(topicName, 0));
        topicNameToSplitWriters.put(topicName, splitWriter);
        numTopics++;
        return splitWriter;
    }

    @Override
    public Source<String, ?, ?> createSource(Boundedness boundedness) {
        KafkaSourceBuilder<String> builder = KafkaSource.builder();

        if (boundedness == Boundedness.BOUNDED) {
            builder = builder.setBounded(OffsetsInitializer.latest());
        }

        return builder.setGroupId("flink-kafka-multiple-topic-test")
                .setBootstrapServers(bootstrapServers)
                .setTopicPattern(Pattern.compile(topicPattern))
                .setDeserializer(
                        KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .build();
    }

    @Override
    public void close() {
        topicNameToSplitWriters.forEach(
                (topicName, splitWriter) -> {
                    try {
                        splitWriter.close();
                        deleteTopic(topicName);
                    } catch (Exception e) {
                        kafkaAdminClient.close();
                        throw new RuntimeException("Cannot close split writer", e);
                    }
                });
        topicNameToSplitWriters.clear();
        kafkaAdminClient.close();
    }

    private String getTopicName() {
        return topicPattern.replace("[0-9]+", String.valueOf(numTopics));
    }

    @Override
    public String toString() {
        return "Multiple-topics Kafka";
    }

    /** Factory of {@link KafkaMultipleTopicExternalContext}. */
    public static class Factory extends KafkaSingleTopicExternalContext.Factory {

        public Factory(KafkaContainer kafkaContainer) {
            super(kafkaContainer);
        }

        @Override
        public ExternalContext<String> createExternalContext() {
            return new KafkaMultipleTopicExternalContext(getBootstrapServer());
        }
    }
}
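Note the naming scheme: getTopicName() substitutes the current topic index for the [0-9]+ wildcard, so every concrete topic name still matches the regex the source subscribes to, and newly created topics are discovered by pattern subscription. A minimal illustration (the random suffix here is made up):

String topicPattern = "kafka-multiple-topic-[0-9]+-4711"; // suffix is normally random
String firstTopic = topicPattern.replace("[0-9]+", "0");  // "kafka-multiple-topic-0-4711"

// The concrete name matches the subscription regex, so the source picks the topic up.
boolean matches = java.util.regex.Pattern.matches(topicPattern, firstTopic); // true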
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaPartitionDataWriter.java (new file, +60)

@@ -0,0 +1,60 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connector.kafka.source.testutils;

import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Properties;

/** Source split data writer for writing test data into Kafka topic partitions. */
public class KafkaPartitionDataWriter implements SourceSplitDataWriter<String> {

    private final KafkaProducer<byte[], byte[]> kafkaProducer;
    private final TopicPartition topicPartition;

    public KafkaPartitionDataWriter(Properties producerProperties, TopicPartition topicPartition) {
        this.kafkaProducer = new KafkaProducer<>(producerProperties);
        this.topicPartition = topicPartition;
    }

    @Override
    public void writeRecords(Collection<String> records) {
        for (String record : records) {
            ProducerRecord<byte[], byte[]> producerRecord =
                    new ProducerRecord<>(
                            topicPartition.topic(),
                            topicPartition.partition(),
                            null,
                            record.getBytes(StandardCharsets.UTF_8));
            kafkaProducer.send(producerRecord);
        }
        kafkaProducer.flush();
    }

    @Override
    public void close() {
        kafkaProducer.close();
    }
}
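A hedged usage sketch for the writer: the broker address, topic name, and byte-array serializer settings below are assumptions standing in for what getKafkaProducerProperties() supplies in the external contexts above.

import java.util.Arrays;
import java.util.Properties;

import org.apache.flink.connector.kafka.source.testutils.KafkaPartitionDataWriter;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class KafkaPartitionDataWriterSketch {
    public static void main(String[] args) {
        // Assumed producer configuration; the tests obtain theirs from the external context.
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

        // Write two UTF-8 records into partition 0 of a hypothetical topic, then release the producer.
        KafkaPartitionDataWriter writer =
                new KafkaPartitionDataWriter(props, new TopicPartition("test-topic", 0));
        writer.writeRecords(Arrays.asList("record-1", "record-2"));
        writer.close();
    }
}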
