diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java index 939b1ecbcd682..02837b9dd80cc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java @@ -46,7 +46,6 @@ default ProductionExceptionHandlerResponse handle(final ProducerRecord record, final Exception exception) { @@ -76,7 +75,6 @@ default ProductionExceptionHandlerResponse handleSerializationException(final Pr * @param exception the exception that occurred during serialization * @param origin the origin of the serialization exception */ - @SuppressWarnings("deprecation") default ProductionExceptionHandlerResponse handleSerializationException(final ErrorHandlerContext context, final ProducerRecord record, final Exception exception, diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java index 9e143d4e29655..72a918e455f88 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java @@ -66,9 +66,8 @@ public void setIfUnset(final SerdeGetter getter) { public byte[] serialize(final String topic, final SubscriptionResponseWrapper data) { //{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized data} - //7-bit (0x7F) maximum for data version. - if (Byte.compare((byte) 0x7F, data.version()) < 0) { - throw new UnsupportedVersionException("SubscriptionResponseWrapper version is larger than maximum supported 0x7F"); + if (data.version() < 0) { + throw new UnsupportedVersionException("SubscriptionResponseWrapper version cannot be negative"); } final byte[] serializedData = data.foreignValue() == null ? null : serializer.serialize(topic, data.foreignValue()); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java index 9be5e39cc15f9..276600fd106e2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java @@ -42,14 +42,10 @@ private static final class NonNullableSerde implements Serde, Serializer configs, final boolean isKey) { - - } + public void configure(final Map configs, final boolean isKey) { } @Override - public void close() { - - } + public void close() { } @Override public Serializer serializer() { @@ -73,68 +69,95 @@ public T deserialize(final String topic, final byte[] data) { } @Test - @SuppressWarnings("unchecked") public void ShouldSerdeWithNonNullsTest() { final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0x01, (byte) 0x9A, (byte) 0xFF, (byte) 0x00}); final String foreignValue = "foreignValue"; final SubscriptionResponseWrapper srw = new SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1); - final SubscriptionResponseWrapperSerde srwSerde = new SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String())); - final byte[] serResponse = srwSerde.serializer().serialize(null, srw); - final SubscriptionResponseWrapper result = srwSerde.deserializer().deserialize(null, serResponse); + try (final SubscriptionResponseWrapperSerde srwSerde = new SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) { + final byte[] serResponse = srwSerde.serializer().serialize(null, srw); + final SubscriptionResponseWrapper result = srwSerde.deserializer().deserialize(null, serResponse); - assertArrayEquals(hashedValue, result.originalValueHash()); - assertEquals(foreignValue, result.foreignValue()); - assertNull(result.primaryPartition()); + assertArrayEquals(hashedValue, result.originalValueHash()); + assertEquals(foreignValue, result.foreignValue()); + assertNull(result.primaryPartition()); + } } @Test - @SuppressWarnings("unchecked") public void shouldSerdeWithNullForeignValueTest() { final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0x01, (byte) 0x9A, (byte) 0xFF, (byte) 0x00}); final SubscriptionResponseWrapper srw = new SubscriptionResponseWrapper<>(hashedValue, null, 1); - final SubscriptionResponseWrapperSerde srwSerde = new SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String())); - final byte[] serResponse = srwSerde.serializer().serialize(null, srw); - final SubscriptionResponseWrapper result = srwSerde.deserializer().deserialize(null, serResponse); + try (final SubscriptionResponseWrapperSerde srwSerde = new SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) { + final byte[] serResponse = srwSerde.serializer().serialize(null, srw); + final SubscriptionResponseWrapper result = srwSerde.deserializer().deserialize(null, serResponse); - assertArrayEquals(hashedValue, result.originalValueHash()); - assertNull(result.foreignValue()); - assertNull(result.primaryPartition()); + assertArrayEquals(hashedValue, result.originalValueHash()); + assertNull(result.foreignValue()); + assertNull(result.primaryPartition()); + } } @Test - @SuppressWarnings("unchecked") public void shouldSerdeWithNullHashTest() { final long[] hashedValue = null; final String foreignValue = "foreignValue"; final SubscriptionResponseWrapper srw = new SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1); - final SubscriptionResponseWrapperSerde srwSerde = new SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String())); - final byte[] serResponse = srwSerde.serializer().serialize(null, srw); - final SubscriptionResponseWrapper result = srwSerde.deserializer().deserialize(null, serResponse); + try (final SubscriptionResponseWrapperSerde srwSerde = new SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) { + final byte[] serResponse = srwSerde.serializer().serialize(null, srw); + final SubscriptionResponseWrapper result = srwSerde.deserializer().deserialize(null, serResponse); - assertArrayEquals(hashedValue, result.originalValueHash()); - assertEquals(foreignValue, result.foreignValue()); - assertNull(result.primaryPartition()); + assertArrayEquals(hashedValue, result.originalValueHash()); + assertEquals(foreignValue, result.foreignValue()); + assertNull(result.primaryPartition()); + } } @Test - @SuppressWarnings("unchecked") public void shouldSerdeWithNullsTest() { final long[] hashedValue = null; final String foreignValue = null; final SubscriptionResponseWrapper srw = new SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1); - final SubscriptionResponseWrapperSerde srwSerde = new SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String())); - final byte[] serResponse = srwSerde.serializer().serialize(null, srw); - final SubscriptionResponseWrapper result = srwSerde.deserializer().deserialize(null, serResponse); + try (final SubscriptionResponseWrapperSerde srwSerde = new SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) { + final byte[] serResponse = srwSerde.serializer().serialize(null, srw); + final SubscriptionResponseWrapper result = srwSerde.deserializer().deserialize(null, serResponse); - assertArrayEquals(hashedValue, result.originalValueHash()); - assertEquals(foreignValue, result.foreignValue()); - assertNull(result.primaryPartition()); + assertArrayEquals(hashedValue, result.originalValueHash()); + assertEquals(foreignValue, result.foreignValue()); + assertNull(result.primaryPartition()); + } } @Test public void shouldThrowExceptionWithBadVersionTest() { final long[] hashedValue = null; - assertThrows(UnsupportedVersionException.class, - () -> new SubscriptionResponseWrapper<>(hashedValue, "foreignValue", (byte) 0xFF, 1)); + assertThrows( + UnsupportedVersionException.class, + () -> new SubscriptionResponseWrapper<>(hashedValue, "foreignValue", (byte) -1, 1) + ); + } + + @Test + public void shouldThrowExceptionOnSerializeWhenDataVersionUnknown() { + final SubscriptionResponseWrapper srw = new InvalidSubscriptionResponseWrapper(null, null, 1); + try (final SubscriptionResponseWrapperSerde srwSerde = new SubscriptionResponseWrapperSerde<>(null)) { + assertThrows( + UnsupportedVersionException.class, + () -> srwSerde.serializer().serialize(null, srw) + ); + } + } + + public static class InvalidSubscriptionResponseWrapper extends SubscriptionResponseWrapper { + + public InvalidSubscriptionResponseWrapper(final long[] originalValueHash, + final String foreignValue, + final Integer primaryPartition) { + super(originalValueHash, foreignValue, primaryPartition); + } + + @Override + public byte version() { + return -1; + } } } \ No newline at end of file