Skip to content

Commit

Permalink
Fix Java21 build
Browse files Browse the repository at this point in the history
Signed-off-by: Hai Yan <[email protected]>
  • Loading branch information
oeyh committed Jan 27, 2025
1 parent 561d938 commit 5108fa7
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

package org.opensearch.dataprepper.plugins.source.rds.model;

import java.util.Map;

public enum MessageType {
BEGIN('B'),
RELATION('R'),
Expand All @@ -20,11 +22,27 @@ public enum MessageType {

private final char value;

private static final Map<Character, MessageType> MESSAGE_TYPE_MAP = Map.of(
BEGIN.getValue(), BEGIN,
RELATION.getValue(), RELATION,
INSERT.getValue(), INSERT,
UPDATE.getValue(), UPDATE,
DELETE.getValue(), DELETE,
COMMIT.getValue(), COMMIT
);

MessageType(char value) {
this.value = value;
}

public char getValue() {
return value;
}

public static MessageType from(char value) {
if (!MESSAGE_TYPE_MAP.containsKey(value)) {
throw new IllegalArgumentException("Invalid MessageType value: " + value);
}
return MESSAGE_TYPE_MAP.get(value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,18 +96,18 @@ public void process(ByteBuffer msg) {
// If it's a RELATION, update table metadata map
// If it's INSERT/UPDATE/DELETE, prepare events
// If it's a COMMIT, convert all prepared events and send to buffer
char messageType = (char) msg.get();
if (messageType == MessageType.BEGIN.getValue()) {
MessageType messageType = MessageType.from((char) msg.get());
if (messageType == MessageType.BEGIN) {
processBeginMessage(msg);
} else if (messageType == MessageType.RELATION.getValue()) {
} else if (messageType == MessageType.RELATION) {
processRelationMessage(msg);
} else if (messageType == MessageType.INSERT.getValue()) {
} else if (messageType == MessageType.INSERT) {
processInsertMessage(msg);
} else if (messageType == MessageType.UPDATE.getValue()) {
} else if (messageType == MessageType.UPDATE) {
processUpdateMessage(msg);
} else if (messageType == MessageType.DELETE.getValue()) {
} else if (messageType == MessageType.DELETE) {
processDeleteMessage(msg);
} else if (messageType == MessageType.COMMIT.getValue()) {
} else if (messageType == MessageType.COMMIT) {
processCommitMessage(msg);
} else {
throw new IllegalArgumentException("Replication message type [" + messageType + "] is not supported. ");
Expand Down Expand Up @@ -158,7 +158,7 @@ void processRelationMessage(ByteBuffer msg) {
}

void processCommitMessage(ByteBuffer msg) {
int flag = msg.get();
int flag = msg.getInt();
long commitLsn = msg.getLong();
long endLsn = msg.getLong();
long epochMicro = msg.getLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ void test_connect() throws SQLException, InterruptedException {
final PGConnection pgConnection = mock(PGConnection.class, RETURNS_DEEP_STUBS);
final ChainedLogicalStreamBuilder logicalStreamBuilder = mock(ChainedLogicalStreamBuilder.class);
final PGReplicationStream stream = mock(PGReplicationStream.class);
final ByteBuffer message = mock(ByteBuffer.class);
final ByteBuffer message = ByteBuffer.allocate(0);
final LogSequenceNumber lsn = mock(LogSequenceNumber.class);

when(connectionManager.getConnection()).thenReturn(connection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,16 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.StreamProgressState;
import org.opensearch.dataprepper.plugins.source.rds.model.MessageType;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class LogicalReplicationEventProcessorTest {
Expand All @@ -48,23 +44,26 @@ class LogicalReplicationEventProcessorTest {
@Mock
private Buffer<Record<Event>> buffer;

@Mock
private ByteBuffer message;

private String s3Prefix;

private LogicalReplicationEventProcessor objectUnderTest;

private Random random;

@BeforeEach
void setUp() {
s3Prefix = UUID.randomUUID().toString();
random = new Random();

objectUnderTest = spy(createObjectUnderTest());
}

@Test
void test_correct_process_method_invoked_for_begin_message() {
when(message.get()).thenReturn((byte) 'B');
setMessageType(MessageType.BEGIN);
doNothing().when(objectUnderTest).processBeginMessage(message);

objectUnderTest.process(message);

Expand All @@ -73,11 +72,8 @@ void test_correct_process_method_invoked_for_begin_message() {

@Test
void test_correct_process_method_invoked_for_relation_message() {
when(message.get()).thenReturn((byte) 'R');
final StreamProgressState progressState = mock(StreamProgressState.class);
when(streamPartition.getProgressState()).thenReturn(Optional.of(progressState));
when(sourceConfig.getTableNames()).thenReturn(List.of("database.schema.table1"));
when(progressState.getPrimaryKeyMap()).thenReturn(Map.of("database.schema.table1", List.of("key1", "key2")));
setMessageType(MessageType.RELATION);
doNothing().when(objectUnderTest).processRelationMessage(message);

objectUnderTest.process(message);

Expand All @@ -86,7 +82,8 @@ void test_correct_process_method_invoked_for_relation_message() {

@Test
void test_correct_process_method_invoked_for_commit_message() {
when(message.get()).thenReturn((byte) 'C');
setMessageType(MessageType.COMMIT);
doNothing().when(objectUnderTest).processCommitMessage(message);

objectUnderTest.process(message);

Expand All @@ -95,7 +92,7 @@ void test_correct_process_method_invoked_for_commit_message() {

@Test
void test_correct_process_method_invoked_for_insert_message() {
when(message.get()).thenReturn((byte) 'I');
setMessageType(MessageType.INSERT);
doNothing().when(objectUnderTest).processInsertMessage(message);

objectUnderTest.process(message);
Expand All @@ -105,7 +102,7 @@ void test_correct_process_method_invoked_for_insert_message() {

@Test
void test_correct_process_method_invoked_for_update_message() {
when(message.get()).thenReturn((byte) 'U');
setMessageType(MessageType.UPDATE);
doNothing().when(objectUnderTest).processUpdateMessage(message);

objectUnderTest.process(message);
Expand All @@ -115,7 +112,7 @@ void test_correct_process_method_invoked_for_update_message() {

@Test
void test_correct_process_method_invoked_for_delete_message() {
when(message.get()).thenReturn((byte) 'D');
setMessageType(MessageType.DELETE);
doNothing().when(objectUnderTest).processDeleteMessage(message);

objectUnderTest.process(message);
Expand All @@ -125,12 +122,20 @@ void test_correct_process_method_invoked_for_delete_message() {

@Test
void test_unsupported_message_type_throws_exception() {
when(message.get()).thenReturn((byte) 'A');
message = ByteBuffer.allocate(1);
message.put((byte) 'A');
message.flip();

assertThrows(IllegalArgumentException.class, () -> objectUnderTest.process(message));
}

private LogicalReplicationEventProcessor createObjectUnderTest() {
return new LogicalReplicationEventProcessor(streamPartition, sourceConfig, buffer, s3Prefix);
}

private void setMessageType(MessageType messageType) {
message = ByteBuffer.allocate(1);
message.put((byte) messageType.getValue());
message.flip();
}
}

0 comments on commit 5108fa7

Please sign in to comment.