Skip to content

Commit

Permalink
MINOR: Fix SubscriptionResponseWrapperSerializer (apache#17205)
Browse files Browse the repository at this point in the history
The existing check is not correct, because `byte` range is from -128...127.
This PR fixes the check to use `< 0`.

Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
mjsax authored Sep 24, 2024
1 parent 9352faa commit 2e8bae8
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ default ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], b
* @param record The record that failed to produce
* @param exception The exception that occurred during production
*/
@SuppressWarnings("deprecation")
default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
Expand Down Expand Up @@ -76,7 +75,6 @@ default ProductionExceptionHandlerResponse handleSerializationException(final Pr
* @param exception the exception that occurred during serialization
* @param origin the origin of the serialization exception
*/
@SuppressWarnings("deprecation")
default ProductionExceptionHandlerResponse handleSerializationException(final ErrorHandlerContext context,
final ProducerRecord record,
final Exception exception,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,8 @@ public void setIfUnset(final SerdeGetter getter) {
public byte[] serialize(final String topic, final SubscriptionResponseWrapper<V> data) {
//{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized data}

//7-bit (0x7F) maximum for data version.
if (Byte.compare((byte) 0x7F, data.version()) < 0) {
throw new UnsupportedVersionException("SubscriptionResponseWrapper version is larger than maximum supported 0x7F");
if (data.version() < 0) {
throw new UnsupportedVersionException("SubscriptionResponseWrapper version cannot be negative");
}

final byte[] serializedData = data.foreignValue() == null ? null : serializer.serialize(topic, data.foreignValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,10 @@ private static final class NonNullableSerde<T> implements Serde<T>, Serializer<T
}

@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {

}
public void configure(final Map<String, ?> configs, final boolean isKey) { }

@Override
public void close() {

}
public void close() { }

@Override
public Serializer<T> serializer() {
Expand All @@ -73,68 +69,95 @@ public T deserialize(final String topic, final byte[] data) {
}

@Test
@SuppressWarnings("unchecked")
public void ShouldSerdeWithNonNullsTest() {
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0x01, (byte) 0x9A, (byte) 0xFF, (byte) 0x00});
final String foreignValue = "foreignValue";
final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);
try (final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);

assertArrayEquals(hashedValue, result.originalValueHash());
assertEquals(foreignValue, result.foreignValue());
assertNull(result.primaryPartition());
assertArrayEquals(hashedValue, result.originalValueHash());
assertEquals(foreignValue, result.foreignValue());
assertNull(result.primaryPartition());
}
}

@Test
@SuppressWarnings("unchecked")
public void shouldSerdeWithNullForeignValueTest() {
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0x01, (byte) 0x9A, (byte) 0xFF, (byte) 0x00});
final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue, null, 1);
final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);
try (final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);

assertArrayEquals(hashedValue, result.originalValueHash());
assertNull(result.foreignValue());
assertNull(result.primaryPartition());
assertArrayEquals(hashedValue, result.originalValueHash());
assertNull(result.foreignValue());
assertNull(result.primaryPartition());
}
}

@Test
@SuppressWarnings("unchecked")
public void shouldSerdeWithNullHashTest() {
final long[] hashedValue = null;
final String foreignValue = "foreignValue";
final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);
try (final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);

assertArrayEquals(hashedValue, result.originalValueHash());
assertEquals(foreignValue, result.foreignValue());
assertNull(result.primaryPartition());
assertArrayEquals(hashedValue, result.originalValueHash());
assertEquals(foreignValue, result.foreignValue());
assertNull(result.primaryPartition());
}
}

@Test
@SuppressWarnings("unchecked")
public void shouldSerdeWithNullsTest() {
final long[] hashedValue = null;
final String foreignValue = null;
final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);
try (final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);

assertArrayEquals(hashedValue, result.originalValueHash());
assertEquals(foreignValue, result.foreignValue());
assertNull(result.primaryPartition());
assertArrayEquals(hashedValue, result.originalValueHash());
assertEquals(foreignValue, result.foreignValue());
assertNull(result.primaryPartition());
}
}

@Test
public void shouldThrowExceptionWithBadVersionTest() {
final long[] hashedValue = null;
assertThrows(UnsupportedVersionException.class,
() -> new SubscriptionResponseWrapper<>(hashedValue, "foreignValue", (byte) 0xFF, 1));
assertThrows(
UnsupportedVersionException.class,
() -> new SubscriptionResponseWrapper<>(hashedValue, "foreignValue", (byte) -1, 1)
);
}

@Test
public void shouldThrowExceptionOnSerializeWhenDataVersionUnknown() {
final SubscriptionResponseWrapper<String> srw = new InvalidSubscriptionResponseWrapper(null, null, 1);
try (final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde<>(null)) {
assertThrows(
UnsupportedVersionException.class,
() -> srwSerde.serializer().serialize(null, srw)
);
}
}

public static class InvalidSubscriptionResponseWrapper extends SubscriptionResponseWrapper<String> {

public InvalidSubscriptionResponseWrapper(final long[] originalValueHash,
final String foreignValue,
final Integer primaryPartition) {
super(originalValueHash, foreignValue, primaryPartition);
}

@Override
public byte version() {
return -1;
}
}
}

0 comments on commit 2e8bae8

Please sign in to comment.