Skip to content

Commit

Permalink
Upgrade to Kafka 3.5 (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 authored Jan 17, 2024
1 parent d89d539 commit 87746cc
Show file tree
Hide file tree
Showing 13 changed files with 228 additions and 135 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/build-and-publish.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ jobs:
build-and-publish:
name: Java Gradle
uses: bakdata/ci-templates/.github/workflows/[email protected]
with:
java-version: 17
secrets:
sonar-token: ${{ secrets.SONARCLOUD_TOKEN }}
sonar-organization: ${{ secrets.SONARCLOUD_ORGANIZATION }}
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ jobs:
name: Java Gradle
uses: bakdata/ci-templates/.github/workflows/[email protected]
with:
java-version: 17
release-type: "${{ inputs.release-type }}"
secrets:
github-email: "${{ secrets.GH_EMAIL }}"
Expand Down
11 changes: 5 additions & 6 deletions brute-force-connect/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,18 @@ dependencies {
testImplementation(group = "io.confluent", name = "kafka-streams-protobuf-serde", version = confluentVersion)
testImplementation(group = "io.confluent", name = "kafka-streams-json-schema-serde", version = confluentVersion)

testImplementation(group = "com.adobe.testing", name = "s3mock-junit5", version = "2.1.8") {
exclude(group = "ch.qos.logback")
exclude(group = "org.apache.logging.log4j", module = "log4j-to-slf4j")
}
val fluentKafkaVersion = "2.5.1"
val testContainersVersion: String by project
testImplementation(group = "org.testcontainers", name = "junit-jupiter", version = testContainersVersion)
testImplementation(group = "org.testcontainers", name = "localstack", version = testContainersVersion)
val fluentKafkaVersion = "2.11.3"
testImplementation(
group = "com.bakdata.fluent-kafka-streams-tests",
name = "schema-registry-mock-junit5",
version = fluentKafkaVersion
)

testImplementation(group = "com.bakdata.kafka", name = "large-message-serde", version = largeMessageVersion)
testImplementation(group = "net.mguenther.kafka", name = "kafka-junit", version = kafkaVersion) {
testImplementation(group = "net.mguenther.kafka", name = "kafka-junit", version = "3.5.0") {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
}
testImplementation(group = "org.apache.kafka", name = "connect-file", version = kafkaVersion)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2022 bakdata
* Copyright (c) 2024 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand All @@ -28,11 +28,11 @@
import static net.mguenther.kafka.junit.Wait.delay;
import static org.assertj.core.api.Assertions.assertThat;

import com.adobe.testing.s3mock.junit5.S3MockExtension;
import com.bakdata.schemaregistrymock.junit5.SchemaRegistryMockExtension;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
Expand All @@ -55,11 +55,47 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.containers.localstack.LocalStackContainer.Service;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;

@Testcontainers
class BruteForceConverterIntegrationTest {
@RegisterExtension
static final S3MockExtension S3_MOCK = S3MockExtension.builder().silent()
.withSecureConnection(false).build();

private static final DockerImageName LOCAL_STACK_IMAGE = DockerImageName.parse("localstack/localstack")
.withTag("1.3.1");
@Container
private static final LocalStackContainer LOCAL_STACK_CONTAINER = new LocalStackContainer(LOCAL_STACK_IMAGE)
.withServices(Service.S3);

static S3Client getS3Client() {
return S3Client.builder()
.endpointOverride(getEndpointOverride())
.credentialsProvider(StaticCredentialsProvider.create(getCredentials()))
.region(getRegion())
.build();
}

private static Region getRegion() {
return Region.of(LOCAL_STACK_CONTAINER.getRegion());
}

private static AwsBasicCredentials getCredentials() {
return AwsBasicCredentials.create(
LOCAL_STACK_CONTAINER.getAccessKey(), LOCAL_STACK_CONTAINER.getSecretKey()
);
}

private static URI getEndpointOverride() {
return LOCAL_STACK_CONTAINER.getEndpointOverride(Service.S3);
}
private static final String BUCKET_NAME = "testbucket";
private static final String TOPIC = "input";
@RegisterExtension
Expand All @@ -69,12 +105,13 @@ class BruteForceConverterIntegrationTest {

private static Properties createS3BackedProperties() {
final Properties properties = new Properties();
properties.put(AbstractLargeMessageConfig.S3_ENDPOINT_CONFIG, "http://localhost:" + S3_MOCK.getHttpPort());
properties.put(AbstractLargeMessageConfig.S3_REGION_CONFIG, "us-east-1");
properties.put(AbstractLargeMessageConfig.S3_ACCESS_KEY_CONFIG, "foo");
properties.put(AbstractLargeMessageConfig.S3_SECRET_KEY_CONFIG, "bar");
properties.put(AbstractLargeMessageConfig.S3_ENABLE_PATH_STYLE_ACCESS_CONFIG, true);
properties.put(AbstractLargeMessageConfig.BASE_PATH_CONFIG, String.format("s3://%s/", BUCKET_NAME));
final AwsBasicCredentials credentials = getCredentials();
properties.setProperty(AbstractLargeMessageConfig.S3_ENDPOINT_CONFIG,
getEndpointOverride().toString());
properties.setProperty(AbstractLargeMessageConfig.S3_REGION_CONFIG, getRegion().id());
properties.setProperty(AbstractLargeMessageConfig.S3_ACCESS_KEY_CONFIG, credentials.accessKeyId());
properties.setProperty(AbstractLargeMessageConfig.S3_SECRET_KEY_CONFIG, credentials.secretAccessKey());
properties.setProperty(AbstractLargeMessageConfig.BASE_PATH_CONFIG, String.format("s3://%s/", BUCKET_NAME));
return properties;
}

Expand All @@ -85,7 +122,9 @@ private static String withValuePrefix(final Object config) {
@BeforeEach
void setUp() throws IOException {
this.outputFile = Files.createTempFile("test", "temp");
S3_MOCK.createS3Client().createBucket(BUCKET_NAME);
getS3Client().createBucket(CreateBucketRequest.builder()
.bucket(BUCKET_NAME)
.build());
this.kafkaCluster = this.createCluster();
this.kafkaCluster.start();
}
Expand Down Expand Up @@ -130,13 +169,13 @@ private EmbeddedKafkaCluster createCluster() {

private Properties config() {
final Properties properties = new Properties();
properties.put(ConnectorConfig.NAME_CONFIG, "test");
properties.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "FileStreamSink");
properties.put(SinkConnector.TOPICS_CONFIG, TOPIC);
properties.put(FileStreamSinkConnector.FILE_CONFIG, this.outputFile.toString());
properties.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
properties.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, BruteForceConverter.class.getName());
properties.put(withValuePrefix(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG),
properties.setProperty(ConnectorConfig.NAME_CONFIG, "test");
properties.setProperty(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "FileStreamSink");
properties.setProperty(SinkConnector.TOPICS_CONFIG, TOPIC);
properties.setProperty(FileStreamSinkConnector.FILE_CONFIG, this.outputFile.toString());
properties.setProperty(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
properties.setProperty(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, BruteForceConverter.class.getName());
properties.setProperty(withValuePrefix(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG),
this.schemaRegistry.getUrl());
createS3BackedProperties().forEach((key, value) -> properties.put(withValuePrefix(key), value));
return properties;
Expand All @@ -154,7 +193,7 @@ private Properties createBackedStringProducerProperties(final boolean shouldBack
private Properties createBaseProducerProperties() {
final Properties properties = new Properties();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaCluster.getBrokerList());
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaCluster.getBrokerList());
return properties;
}

Expand All @@ -167,7 +206,7 @@ private Properties createStringProducerProperties() {
private Properties createAvroProducerProperties() {
final Properties properties = this.createBaseProducerProperties();
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GenericAvroSerializer.class);
properties.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistry.getUrl());
properties.setProperty(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistry.getUrl());
return properties;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2022 bakdata
* Copyright (c) 2024 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -30,7 +30,6 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;

import com.adobe.testing.s3mock.junit5.S3MockExtension;
import com.bakdata.schemaregistrymock.SchemaRegistryMock;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.DescriptorValidationException;
Expand All @@ -49,6 +48,7 @@
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer;
import io.confluent.kafka.streams.serdes.json.KafkaJsonSchemaSerde;
import io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -76,21 +76,57 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.containers.localstack.LocalStackContainer.Service;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;

@Testcontainers
class BruteForceConverterTest {
@RegisterExtension
static final S3MockExtension S3_MOCK = S3MockExtension.builder().silent().withSecureConnection(false).build();
private static final String TOPIC = "topic";
final SchemaRegistryMock schemaRegistry = new SchemaRegistryMock(List.of(

private static final DockerImageName LOCAL_STACK_IMAGE = DockerImageName.parse("localstack/localstack")
.withTag("1.3.1");
@Container
private static final LocalStackContainer LOCAL_STACK_CONTAINER = new LocalStackContainer(LOCAL_STACK_IMAGE)
.withServices(Service.S3);
private final SchemaRegistryMock schemaRegistry = new SchemaRegistryMock(List.of(
new AvroSchemaProvider(),
new JsonSchemaProvider(),
new ProtobufSchemaProvider()
));

static S3Client getS3Client() {
return S3Client.builder()
.endpointOverride(getEndpointOverride())
.credentialsProvider(StaticCredentialsProvider.create(getCredentials()))
.region(getRegion())
.build();
}

private static Region getRegion() {
return Region.of(LOCAL_STACK_CONTAINER.getRegion());
}

private static AwsBasicCredentials getCredentials() {
return AwsBasicCredentials.create(
LOCAL_STACK_CONTAINER.getAccessKey(), LOCAL_STACK_CONTAINER.getSecretKey()
);
}
private static final String TOPIC = "topic";

private static URI getEndpointOverride() {
return LOCAL_STACK_CONTAINER.getEndpointOverride(Service.S3);
}

static Stream<Arguments> generateGenericAvroSerializers() {
return generateSerializers(new GenericAvroSerde());
}
Expand Down Expand Up @@ -125,7 +161,7 @@ private static DynamicMessage generateDynamicMessage() throws DescriptorValidati
final DynamicSchema dynamicSchema = DynamicSchema.newBuilder()
.setName("file")
.addMessageDefinition(MessageDefinition.newBuilder("Test")
.addField("", "string", "testId", 1, null, null, null)
.addField(null, "string", "testId", 1, null, null)
.build())
.build();
final Descriptor test = dynamicSchema.getMessageDescriptor("Test");
Expand Down Expand Up @@ -154,12 +190,12 @@ private static <T> SerializerFactory<T> createLargeMessageSerializer(final Serde
}

private static Map<String, Object> getS3EndpointConfig() {
final AwsBasicCredentials credentials = getCredentials();
return Map.of(
AbstractLargeMessageConfig.S3_ENDPOINT_CONFIG, "http://localhost:" + S3_MOCK.getHttpPort(),
AbstractLargeMessageConfig.S3_REGION_CONFIG, "us-east-1",
AbstractLargeMessageConfig.S3_ACCESS_KEY_CONFIG, "foo",
AbstractLargeMessageConfig.S3_SECRET_KEY_CONFIG, "bar",
AbstractLargeMessageConfig.S3_ENABLE_PATH_STYLE_ACCESS_CONFIG, true
AbstractLargeMessageConfig.S3_ENDPOINT_CONFIG, getEndpointOverride().toString(),
AbstractLargeMessageConfig.S3_REGION_CONFIG, getRegion().id(),
AbstractLargeMessageConfig.S3_ACCESS_KEY_CONFIG, credentials.accessKeyId(),
AbstractLargeMessageConfig.S3_SECRET_KEY_CONFIG, credentials.secretAccessKey()
);
}

Expand Down Expand Up @@ -287,14 +323,14 @@ void shouldConvertByteKeys(final SerializerFactory<byte[]> factory) {
}

@ParameterizedTest
@MethodSource("generateProtobufSerializers")
void shouldConvertJsonKeys(final SerializerFactory<DynamicMessage> factory) throws DescriptorValidationException {
final DynamicMessage value = generateDynamicMessage();
@MethodSource("generateJsonSerializers")
void shouldConvertJsonKeys(final SerializerFactory<JsonTestRecord> factory) {
final JsonTestRecord value = new JsonTestRecord("test");
final Map<String, Object> config = Map.of(
BruteForceConverterConfig.CONVERTER_CONFIG,
List.of(AvroConverter.class.getName(), ProtobufConverter.class.getName())
List.of(AvroConverter.class.getName(), JsonSchemaConverter.class.getName())
);
this.testValueConversion(factory, new KafkaProtobufSerializer<>(), value, config, new ProtobufConverter());
this.testKeyConversion(factory, new KafkaJsonSchemaSerializer<>(), value, config, new JsonSchemaConverter());
}

@ParameterizedTest
Expand Down Expand Up @@ -359,7 +395,9 @@ private <T> void testConversion(final SerializerFactory<T> factory, final Serial
final T value, final Map<String, Object> originals, final Converter expectedConverter,
final boolean isKey) {
final String bucket = "bucket";
S3_MOCK.createS3Client().createBucket(bucket);
getS3Client().createBucket(CreateBucketRequest.builder()
.bucket(bucket)
.build());
final Map<String, Object> config = new HashMap<>(originals);
config.put(SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistry.getUrl());
config.put(AbstractLargeMessageConfig.BASE_PATH_CONFIG, "s3://" + bucket + "/");
Expand Down
11 changes: 5 additions & 6 deletions brute-force-serde/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.google.protobuf.gradle.protoc
description = "Kafka SerDe that deserializes messages of an unknown serialization format"

plugins {
id("com.github.davidmc24.gradle.plugin.avro") version "1.2.1"
id("com.github.davidmc24.gradle.plugin.avro") version "1.9.1"
id("com.google.protobuf") version "0.8.18"
}

Expand All @@ -26,13 +26,12 @@ dependencies {
testImplementation(group = "io.confluent", name = "kafka-streams-protobuf-serde", version = confluentVersion)
testImplementation(group = "io.confluent", name = "kafka-streams-json-schema-serde", version = confluentVersion)

testImplementation(group = "com.adobe.testing", name = "s3mock-junit5", version = "2.1.8") {
exclude(group = "ch.qos.logback")
exclude(group = "org.apache.logging.log4j", module = "log4j-to-slf4j")
}
val testContainersVersion: String by project
testImplementation(group = "org.testcontainers", name = "junit-jupiter", version = testContainersVersion)
testImplementation(group = "org.testcontainers", name = "localstack", version = testContainersVersion)
testImplementation(group = "org.jooq", name = "jool", version = "0.9.14")

val fluentKafkaVersion = "2.5.1"
val fluentKafkaVersion = "2.11.3"
testImplementation(
group = "com.bakdata.fluent-kafka-streams-tests",
name = "fluent-kafka-streams-tests-junit5",
Expand Down
Loading

0 comments on commit 87746cc

Please sign in to comment.