Skip to content

Commit

Permalink
Add check to avro and kafka compatibility tests (#32698)
Browse files Browse the repository at this point in the history
* Also fix Kafka integration test version resolution
  • Loading branch information
Abacn authored Oct 10, 2024
1 parent 2049e6b commit 4cf99ea
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ class KafkaTestUtilities {

@Inject
KafkaBatchIT(String delimited, String undelimited, Boolean sdfCompatible, ConfigurationContainer configurations, Project runningProject){
def kafkaioProject = runningProject.findProject(":sdks:java:io:kafka")
group = "Verification"
description = "Runs KafkaIO IT tests with Kafka clients API $delimited"
outputs.upToDateWhen { false }
testClassesDirs = runningProject.findProject(":sdks:java:io:kafka").sourceSets.test.output.classesDirs
classpath = configurations."kafkaVersion$undelimited" + runningProject.sourceSets.test.runtimeClasspath + runningProject.findProject(":sdks:java:io:kafka").sourceSets.test.runtimeClasspath
classpath = runningProject.sourceSets.test.runtimeClasspath + kafkaioProject.configurations."kafkaVersion$undelimited" + kafkaioProject.sourceSets.test.runtimeClasspath
systemProperty "beam.target.kafka.version", delimited

def pipelineOptions = [
'--sourceOptions={' +
Expand Down
1 change: 1 addition & 0 deletions sdks/java/extensions/avro/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ avroVersions.each { k, v ->
description = "Runs Avro extension tests with Avro version $v"
outputs.upToDateWhen { false }
classpath = sourceSets."avro$k".runtimeClasspath
systemProperty "beam.target.avro.version", v

include '**/*.class'
exclude '**/AvroIOTest$NeedsRunnerTests$*.class'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.avro;

import static org.junit.Assert.assertEquals;

import org.apache.avro.Schema;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class AvroVersionVerificationTest {
@Test
public void testAvroVersion() {
@Nullable String targetVer = System.getProperty("beam.target.avro.version");
Assume.assumeTrue(!Strings.isNullOrEmpty(targetVer));
String actualVer = Schema.class.getPackage().getImplementationVersion();
assertEquals(targetVer, actualVer);
}
}
2 changes: 2 additions & 0 deletions sdks/java/io/kafka/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ kafkaVersions.each {kv ->
outputs.upToDateWhen { false }
testClassesDirs = sourceSets.test.output.classesDirs
classpath = configurations."kafkaVersion${kv.key}" + sourceSets.test.runtimeClasspath
systemProperty "beam.target.kafka.version", kv.value

include '**/KafkaIOTest.class'
}
}
Expand Down
6 changes: 2 additions & 4 deletions sdks/java/io/kafka/kafka-integration-test.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ provideIntegrationTestingDependencies()
enableJavaPerformanceTesting()

dependencies {
implementation "org.apache.kafka:kafka-clients:$delimited"
permitUnusedDeclared "org.apache.kafka:kafka-clients:$delimited"
implementation project(":sdks:java:io:kafka")
permitUnusedDeclared project(":sdks:java:io:kafka")
// Do not set kafka-client dependency here otherwise the version will be overwritten by BeamModulePlugin
// instead, rely on io/kafka/build.gradle's custom configurations with forced kafka-client resolutionStrategy
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -632,13 +632,15 @@ public AverageRecordSize load(TopicPartition topicPartition) throws Exception {

@Teardown
public void teardown() throws Exception {
final Deserializer<K> keyDeserializerInstance =
Preconditions.checkStateNotNull(this.keyDeserializerInstance);
final Deserializer<V> valueDeserializerInstance =
Preconditions.checkStateNotNull(this.valueDeserializerInstance);
try {
Closeables.close(keyDeserializerInstance, true);
Closeables.close(valueDeserializerInstance, true);
if (valueDeserializerInstance != null) {
Closeables.close(valueDeserializerInstance, true);
valueDeserializerInstance = null;
}
if (keyDeserializerInstance != null) {
Closeables.close(keyDeserializerInstance, true);
keyDeserializerInstance = null;
}
} catch (Exception anyException) {
LOG.warn("Fail to close resource during finishing bundle.", anyException);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assume.assumeFalse;

import java.io.IOException;
import java.time.Instant;
Expand Down Expand Up @@ -86,6 +87,7 @@
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
Expand All @@ -99,6 +101,7 @@
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.junit.AfterClass;
Expand Down Expand Up @@ -169,6 +172,13 @@ public class KafkaIOIT {

@BeforeClass
public static void setup() throws IOException {
// check kafka version first
@Nullable String targetVer = System.getProperty("beam.target.kafka.version");
if (!Strings.isNullOrEmpty(targetVer)) {
String actualVer = AppInfoParser.getVersion();
assertEquals(targetVer, actualVer);
}

options = IOITHelper.readIOTestPipelineOptions(Options.class);
sourceOptions = fromJsonString(options.getSourceOptions(), SyntheticSourceOptions.class);
if (options.isWithTestcontainers()) {
Expand Down Expand Up @@ -360,6 +370,10 @@ public void processElement(@Element String element, OutputReceiver<String> outpu
// This test verifies that bad data from Kafka is properly sent to the error handler
@Test
public void testKafkaIOSDFReadWithErrorHandler() throws IOException {
// TODO(https://github.com/apache/beam/issues/32704) re-enable when fixed, or remove the support
// for these old kafka-client versions
String actualVer = AppInfoParser.getVersion();
assumeFalse(actualVer.compareTo("2.0.0") >= 0 && actualVer.compareTo("2.3.0") < 0);
writePipeline
.apply(Create.of(KV.of("key", "val")))
.apply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
Expand Down Expand Up @@ -146,12 +147,14 @@
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.hamcrest.collection.IsIterableWithSize;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assume;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -515,6 +518,15 @@ public void testReadAvroGenericRecordsWithConfluentSchemaRegistry() {
p.run();
}

@Test
public void testKafkaVersion() {
// KafkaIO compatibility tests run unit tests in KafkaIOTest
@Nullable String targetVer = System.getProperty("beam.target.kafka.version");
Assume.assumeTrue(!Strings.isNullOrEmpty(targetVer));
String actualVer = AppInfoParser.getVersion();
assertEquals(targetVer, actualVer);
}

@Test
public void testReadAvroSpecificRecordsWithConfluentSchemaRegistry() {
int numElements = 100;
Expand Down Expand Up @@ -1582,6 +1594,11 @@ public byte[] serialize(String topic, Long data) {
public void configure(Map<String, ?> configs, boolean isKey) {
// intentionally left blank for compatibility with older kafka versions
}

@Override
public void close() {
// intentionally left blank for compatibility with kafka-client v2.2 or older
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ public static class FailingDeserializer implements Deserializer<String> {

public FailingDeserializer() {}

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// intentionally left blank for compatibility with older kafka versions
}

@Override
public String deserialize(String topic, byte[] data) {
throw new SerializationException("Intentional serialization exception");
Expand Down

0 comments on commit 4cf99ea

Please sign in to comment.