From 5108fa75f92d869a70724f54635c88f5c5f6d29f Mon Sep 17 00:00:00 2001 From: Hai Yan Date: Mon, 27 Jan 2025 15:29:03 -0600 Subject: [PATCH] Fix Java21 build Signed-off-by: Hai Yan --- .../plugins/source/rds/model/MessageType.java | 18 ++++++++ .../LogicalReplicationEventProcessor.java | 16 ++++---- .../stream/LogicalReplicationClientTest.java | 2 +- .../LogicalReplicationEventProcessorTest.java | 41 +++++++++++-------- 4 files changed, 50 insertions(+), 27 deletions(-) diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/MessageType.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/MessageType.java index a537835099..4bb4bcb288 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/MessageType.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/MessageType.java @@ -10,6 +10,8 @@ package org.opensearch.dataprepper.plugins.source.rds.model; +import java.util.Map; + public enum MessageType { BEGIN('B'), RELATION('R'), @@ -20,6 +22,15 @@ public enum MessageType { private final char value; + private static final Map MESSAGE_TYPE_MAP = Map.of( + BEGIN.getValue(), BEGIN, + RELATION.getValue(), RELATION, + INSERT.getValue(), INSERT, + UPDATE.getValue(), UPDATE, + DELETE.getValue(), DELETE, + COMMIT.getValue(), COMMIT + ); + MessageType(char value) { this.value = value; } @@ -27,4 +38,11 @@ public enum MessageType { public char getValue() { return value; } + + public static MessageType from(char value) { + if (!MESSAGE_TYPE_MAP.containsKey(value)) { + throw new IllegalArgumentException("Invalid MessageType value: " + value); + } + return MESSAGE_TYPE_MAP.get(value); + } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessor.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessor.java index 142eb43c31..f9881d0063 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessor.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessor.java @@ -96,18 +96,18 @@ public void process(ByteBuffer msg) { // If it's a RELATION, update table metadata map // If it's INSERT/UPDATE/DELETE, prepare events // If it's a COMMIT, convert all prepared events and send to buffer - char messageType = (char) msg.get(); - if (messageType == MessageType.BEGIN.getValue()) { + MessageType messageType = MessageType.from((char) msg.get()); + if (messageType == MessageType.BEGIN) { processBeginMessage(msg); - } else if (messageType == MessageType.RELATION.getValue()) { + } else if (messageType == MessageType.RELATION) { processRelationMessage(msg); - } else if (messageType == MessageType.INSERT.getValue()) { + } else if (messageType == MessageType.INSERT) { processInsertMessage(msg); - } else if (messageType == MessageType.UPDATE.getValue()) { + } else if (messageType == MessageType.UPDATE) { processUpdateMessage(msg); - } else if (messageType == MessageType.DELETE.getValue()) { + } else if (messageType == MessageType.DELETE) { processDeleteMessage(msg); - } else if (messageType == MessageType.COMMIT.getValue()) { + } else if (messageType == MessageType.COMMIT) { processCommitMessage(msg); } else { throw new IllegalArgumentException("Replication message type [" + messageType + "] is not supported. "); @@ -158,7 +158,7 @@ void processRelationMessage(ByteBuffer msg) { } void processCommitMessage(ByteBuffer msg) { - int flag = msg.get(); + int flag = msg.getInt(); long commitLsn = msg.getLong(); long endLsn = msg.getLong(); long epochMicro = msg.getLong(); diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClientTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClientTest.java index e157d1832a..9cd410ee44 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClientTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClientTest.java @@ -63,7 +63,7 @@ void test_connect() throws SQLException, InterruptedException { final PGConnection pgConnection = mock(PGConnection.class, RETURNS_DEEP_STUBS); final ChainedLogicalStreamBuilder logicalStreamBuilder = mock(ChainedLogicalStreamBuilder.class); final PGReplicationStream stream = mock(PGReplicationStream.class); - final ByteBuffer message = mock(ByteBuffer.class); + final ByteBuffer message = ByteBuffer.allocate(0); final LogSequenceNumber lsn = mock(LogSequenceNumber.class); when(connectionManager.getConnection()).thenReturn(connection); diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessorTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessorTest.java index 22614e4f02..31ec9618a2 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessorTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessorTest.java @@ -21,20 +21,16 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition; -import org.opensearch.dataprepper.plugins.source.rds.coordination.state.StreamProgressState; +import org.opensearch.dataprepper.plugins.source.rds.model.MessageType; import java.nio.ByteBuffer; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.Random; import java.util.UUID; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) class LogicalReplicationEventProcessorTest { @@ -48,23 +44,26 @@ class LogicalReplicationEventProcessorTest { @Mock private Buffer> buffer; - @Mock private ByteBuffer message; private String s3Prefix; private LogicalReplicationEventProcessor objectUnderTest; + private Random random; + @BeforeEach void setUp() { s3Prefix = UUID.randomUUID().toString(); + random = new Random(); objectUnderTest = spy(createObjectUnderTest()); } @Test void test_correct_process_method_invoked_for_begin_message() { - when(message.get()).thenReturn((byte) 'B'); + setMessageType(MessageType.BEGIN); + doNothing().when(objectUnderTest).processBeginMessage(message); objectUnderTest.process(message); @@ -73,11 +72,8 @@ void test_correct_process_method_invoked_for_begin_message() { @Test void test_correct_process_method_invoked_for_relation_message() { - when(message.get()).thenReturn((byte) 'R'); - final StreamProgressState progressState = mock(StreamProgressState.class); - when(streamPartition.getProgressState()).thenReturn(Optional.of(progressState)); - when(sourceConfig.getTableNames()).thenReturn(List.of("database.schema.table1")); - when(progressState.getPrimaryKeyMap()).thenReturn(Map.of("database.schema.table1", List.of("key1", "key2"))); + setMessageType(MessageType.RELATION); + doNothing().when(objectUnderTest).processRelationMessage(message); objectUnderTest.process(message); @@ -86,7 +82,8 @@ void test_correct_process_method_invoked_for_relation_message() { @Test void test_correct_process_method_invoked_for_commit_message() { - when(message.get()).thenReturn((byte) 'C'); + setMessageType(MessageType.COMMIT); + doNothing().when(objectUnderTest).processCommitMessage(message); objectUnderTest.process(message); @@ -95,7 +92,7 @@ void test_correct_process_method_invoked_for_commit_message() { @Test void test_correct_process_method_invoked_for_insert_message() { - when(message.get()).thenReturn((byte) 'I'); + setMessageType(MessageType.INSERT); doNothing().when(objectUnderTest).processInsertMessage(message); objectUnderTest.process(message); @@ -105,7 +102,7 @@ void test_correct_process_method_invoked_for_insert_message() { @Test void test_correct_process_method_invoked_for_update_message() { - when(message.get()).thenReturn((byte) 'U'); + setMessageType(MessageType.UPDATE); doNothing().when(objectUnderTest).processUpdateMessage(message); objectUnderTest.process(message); @@ -115,7 +112,7 @@ void test_correct_process_method_invoked_for_update_message() { @Test void test_correct_process_method_invoked_for_delete_message() { - when(message.get()).thenReturn((byte) 'D'); + setMessageType(MessageType.DELETE); doNothing().when(objectUnderTest).processDeleteMessage(message); objectUnderTest.process(message); @@ -125,7 +122,9 @@ void test_correct_process_method_invoked_for_delete_message() { @Test void test_unsupported_message_type_throws_exception() { - when(message.get()).thenReturn((byte) 'A'); + message = ByteBuffer.allocate(1); + message.put((byte) 'A'); + message.flip(); assertThrows(IllegalArgumentException.class, () -> objectUnderTest.process(message)); } @@ -133,4 +132,10 @@ void test_unsupported_message_type_throws_exception() { private LogicalReplicationEventProcessor createObjectUnderTest() { return new LogicalReplicationEventProcessor(streamPartition, sourceConfig, buffer, s3Prefix); } + + private void setMessageType(MessageType messageType) { + message = ByteBuffer.allocate(1); + message.put((byte) messageType.getValue()); + message.flip(); + } }