Initial PR for stream support for Postgres in Rds source #5310

Merged: 10 commits, Jan 27, 2025

Conversation

oeyh (Collaborator) commented Jan 6, 2025

Description

Initial PR for stream support for Postgres in the RDS source. The implementation follows the existing structure for MySQL stream support. The correspondence is as follows:

| For MySQL | For Postgres | Notes |
| --- | --- | --- |
| BinaryLogClient/BinlogClientWrapper | LogicalReplicationClient | Client for connecting to the database and reading the CDC stream |
| MySqlConnectionManager (used to be ConnectionManager) | PostgresConnectionManager | Handles connections to the database |
| MySqlSchemaManager (used to be SchemaManager) | PostgresSchemaManager | Gathers table schemas and other metadata about the database |

Major changes:

  • Adds classes specific to Postgres stream (logical replication) support
  • Renames classes specific to MySQL (binlog) stream support to be clearer
  • Refactors some common classes to accommodate both MySQL and Postgres
  • Adds/updates unit tests

This is an initial PR; follow-up changes will add e2e acknowledgment, checkpointing, resync, data type mapping, etc. for Postgres.

Testing

Tested Data Prepper locally against Aurora Postgres DB.

Issues Resolved

Contributes to #5309

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@@ -121,21 +133,23 @@ public void start(Buffer<Record<Event>> buffer) {
}

if (sourceConfig.isStreamEnabled()) {
BinlogClientFactory binaryLogClientFactory = new BinlogClientFactory(sourceConfig, rdsClient, dbMetadata);
ReplicationLogClientFactory replicationLogClientFactory = new ReplicationLogClientFactory(sourceConfig, rdsClient, dbMetadata);

Member:

This class needs some refactoring. We should follow the single responsibility principle here: this class depends on both the MySQL and Postgres clients, and those dependencies should be moved into separate classes.

Collaborator Author:

I did some refactoring and extracted the creation of the schema managers into separate classes. The ReplicationLogClientFactory already hides the creation of the MySQL/Postgres clients.
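
For readers following along, here is a minimal sketch of the factory idea being discussed, with stub client classes standing in for the real binlog/logical replication clients; names, constructor shapes, and structure are assumptions, not the actual Data Prepper code:

```java
// A minimal sketch of the factory idea discussed in this thread; names and structure
// are assumptions, not the actual Data Prepper code.
public class ReplicationLogClientFactory {

    public enum EngineType { MYSQL, POSTGRES }

    public interface ReplicationLogClient {
        void connect();
        void disconnect();
    }

    // Stub implementations standing in for the real binlog / logical replication clients.
    static class BinlogClientWrapper implements ReplicationLogClient {
        public void connect() { /* connect a BinaryLogClient to the MySQL binlog */ }
        public void disconnect() { }
    }

    static class LogicalReplicationClient implements ReplicationLogClient {
        public void connect() { /* open a Postgres logical replication stream */ }
        public void disconnect() { }
    }

    private final EngineType engineType;

    public ReplicationLogClientFactory(final EngineType engineType) {
        this.engineType = engineType;
    }

    // Callers depend only on the ReplicationLogClient interface; which concrete
    // client gets created stays hidden inside the factory.
    public ReplicationLogClient create() {
        switch (engineType) {
            case MYSQL:
                return new BinlogClientWrapper();
            case POSTGRES:
                return new LogicalReplicationClient();
            default:
                throw new IllegalStateException("Unsupported engine: " + engineType);
        }
    }
}
```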

@@ -16,6 +16,12 @@ public class StreamProgressState {
@JsonProperty("currentPosition")
private BinlogCoordinate currentPosition;

@JsonProperty("currentLsn")

Member:

Consider creating a new class to separate the MySQL vs. Postgres properties.

Collaborator Author:

This model is used in serialization/deserialization by the source coordinator, so it's a bit more involved to separate it into two classes. I added a TODO in the code and will address it in follow-up PRs.
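
For illustration, a rough sketch of what the suggested split could look like (field types are simplified to String here; the PR itself keeps both engines' fields on one class with a TODO):

```java
import com.fasterxml.jackson.annotation.JsonProperty;

// Hypothetical sketch of the suggested split (field types simplified to String);
// the current PR keeps both engines' fields on one class with a TODO instead.
public class StreamProgressState {

    @JsonProperty("mySqlStreamState")
    private MySqlStreamState mySqlStreamState;

    @JsonProperty("postgresStreamState")
    private PostgresStreamState postgresStreamState;

    public static class MySqlStreamState {
        // MySQL streaming progress is a binlog coordinate (file name + position).
        @JsonProperty("currentPosition")
        private String currentPosition;
    }

    public static class PostgresStreamState {
        // Postgres streaming progress is a log sequence number (LSN).
        @JsonProperty("currentLsn")
        private String currentLsn;
    }
}
```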

@@ -10,7 +10,7 @@
import java.sql.SQLException;
import java.util.Properties;

public class ConnectionManager {
public class MySqlConnectionManager {

Member:

Consider creating a ConnectionManager interface with MySQL/Postgres implementations.

Collaborator Author:

Created the interface.
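
A minimal sketch of such an interface/implementation split, assuming plain JDBC connections; constructor parameters and connection properties here are illustrative, not the PR's exact code:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

// A minimal sketch of the interface/implementation split; constructor parameters
// and connection properties are illustrative, not the PR's exact code.
public interface ConnectionManager {
    Connection getConnection() throws SQLException;
}

class MySqlConnectionManager implements ConnectionManager {
    private final String jdbcUrl;      // e.g. jdbc:mysql://<host>:3306
    private final Properties props;    // user, password, SSL settings, etc.

    MySqlConnectionManager(final String jdbcUrl, final Properties props) {
        this.jdbcUrl = jdbcUrl;
        this.props = props;
    }

    @Override
    public Connection getConnection() throws SQLException {
        return DriverManager.getConnection(jdbcUrl, props);
    }
}

class PostgresConnectionManager implements ConnectionManager {
    private final String jdbcUrl;      // e.g. jdbc:postgresql://<host>:5432/<db>
    private final Properties props;    // also carries replication-related properties

    PostgresConnectionManager(final String jdbcUrl, final Properties props) {
        this.jdbcUrl = jdbcUrl;
        this.props = props;
    }

    @Override
    public Connection getConnection() throws SQLException {
        return DriverManager.getConnection(jdbcUrl, props);
    }
}
```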

// If it's INSERT/UPDATE/DELETE, prepare events
// If it's a COMMIT, convert all prepared events and send to buffer
char messageType = (char) msg.get();
if (messageType == 'B') {

Member:

Consider using enums instead of char for message types, for type safety, readability, and maintainability.

Collaborator Author:

Done.
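
A sketch of the enum approach: the tags are the single-byte message types defined by the pgoutput logical replication protocol, while the enum and method names here are assumptions rather than the PR's actual names:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the enum approach; the tags are the single-byte message types defined by
// the pgoutput logical replication protocol, the enum/method names are assumptions.
public enum MessageType {
    BEGIN('B'),
    COMMIT('C'),
    RELATION('R'),
    INSERT('I'),
    UPDATE('U'),
    DELETE('D');

    private static final Map<Character, MessageType> TYPE_MAP = new HashMap<>();

    static {
        for (MessageType type : values()) {
            TYPE_MAP.put(type.value, type);
        }
    }

    private final char value;

    MessageType(final char value) {
        this.value = value;
    }

    public char getValue() {
        return value;
    }

    public static MessageType from(final char value) {
        final MessageType type = TYPE_MAP.get(value);
        if (type == null) {
            throw new IllegalArgumentException("Unknown replication message type: " + value);
        }
        return type;
    }
}
```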

pipelineEvents = new ArrayList<>();
}

public void process(ByteBuffer msg) {

Member:

Should we implement the missing functionality for UPDATE/DELETE message types 'K' and 'O'?

https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html

Member:

Can the ByteBuffer msg be null? Is there any validation to be done on the msg ByteBuffer?

Collaborator Author:

> Should we implement the missing functionality for UPDATE/DELETE message types 'K' and 'O'?

Done. I updated the UPDATE message processor to include 'K' (primary keys were updated) and 'O' (old row data is included). For DELETE it shouldn't matter, as long as the row data includes the primary keys.

Collaborator Author:

> Can the ByteBuffer msg be null? Is there any validation to be done on the msg ByteBuffer?

There's a null check for msg in the main while loop in LogicalReplicationClient.connect().
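
For context, a rough sketch of the 'K'/'O'/'N' tuple handling in an Update message, following the pgoutput layout linked above; it assumes the buffer is positioned at the start of the message, and the helper method is a placeholder rather than the PR's code:

```java
import java.nio.ByteBuffer;

// Rough sketch of the 'K'/'O'/'N' tuple handling in an Update message, following the
// pgoutput layout; assumes the buffer is positioned at the start of the message, and
// the helper method is a placeholder rather than the PR's code.
class UpdateMessageSketch {

    void processUpdateMessage(final ByteBuffer msg) {
        msg.get();                            // 'U' message type byte
        final int relationOid = msg.getInt(); // identifies the table the row belongs to

        char tupleTag = (char) msg.get();
        if (tupleTag == 'K') {
            // Old key columns follow: the update changed REPLICA IDENTITY (key) columns.
            readTupleData(msg);
            tupleTag = (char) msg.get();
        } else if (tupleTag == 'O') {
            // Full old row follows: the table uses REPLICA IDENTITY FULL.
            readTupleData(msg);
            tupleTag = (char) msg.get();
        }

        if (tupleTag == 'N') {
            readTupleData(msg);               // new row columns
        }
    }

    // Placeholder for parsing a TupleData section (column count + per-column values).
    private void readTupleData(final ByteBuffer msg) { /* ... */ }
}
```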

long epochMicro = msg.getLong();

if (currentLsn != commitLsn) {
// This shouldn't happen

Member:

Should we consider throwing an exception instead of just logging?

Collaborator Author:

Updated to throw an exception.

@oeyh oeyh requested a review from san81 as a code owner January 16, 2025 06:52
@oeyh oeyh force-pushed the rds-postgres-stream branch from c61da05 to 81c5883 Compare January 16, 2025 18:05
Signed-off-by: Hai Yan <[email protected]>
import java.util.Map;

public enum ColumnType {
BOOLEAN(16, "boolean"),

Collaborator:

Are those typeIds from Postgres? Is there a way we can use their SDK variables instead of the hardcoded numbers?

Collaborator Author:

Yes, they are from Postgres, but I don't think they have those in their libraries. Postgres has a system table, pg_type, that contains this type information, so an alternative would be to query the pg_type table for the type IDs and other type information.
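
For illustration, the pg_type alternative could look roughly like this over JDBC; this is a sketch of the idea mentioned above, not what the PR does, and the class/method names are assumptions:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the pg_type alternative: load OID -> type name mappings from
// the server instead of hardcoding them. Not what the PR does; names are assumptions.
class PgTypeLoader {

    static Map<Long, String> loadTypeNames(final Connection connection) throws SQLException {
        final Map<Long, String> typeNamesByOid = new HashMap<>();
        final String sql = "SELECT oid, typname FROM pg_catalog.pg_type";
        try (PreparedStatement statement = connection.prepareStatement(sql);
             ResultSet resultSet = statement.executeQuery()) {
            while (resultSet.next()) {
                typeNamesByOid.put(resultSet.getLong("oid"), resultSet.getString("typname"));
            }
        }
        return typeNamesByOid;
    }
}
```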

chenqi0805 previously approved these changes Jan 17, 2025
throw new RuntimeException("Commit LSN does not match current LSN, skipping");
}

writeToBuffer(bufferAccumulator);

Collaborator (@chenqi0805, Jan 17, 2025):

Question: will we flush uncommitted messages if numberOfRecordsToAccumulate or bufferTimeout is reached? If so, is that something we want to avoid?

Collaborator Author:

This writeToBuffer() is called when processing the COMMIT message, so we only write records to the buffer once the changes are committed.

Collaborator (@chenqi0805, Jan 27, 2025):

Thanks for clarifying. One follow-up question: could uncommitted operations stay in the bufferAccumulator forever if the user never commits them (i.e. aborted operations)?

Collaborator Author:

The source only writes to the bufferAccumulator when a COMMIT message is received. Before that, the records stay in a temporary list (pipelineEvents).
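
A simplified illustration of that flow, with a placeholder event type rather than the actual Data Prepper classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified illustration: row events are staged in pipelineEvents and only handed to
// the buffer accumulator once the COMMIT message for their transaction arrives.
class TransactionalEventStager<E> {

    private final List<E> pipelineEvents = new ArrayList<>();

    void onRowEvent(final E event) {
        pipelineEvents.add(event);                  // INSERT/UPDATE/DELETE messages
    }

    void onCommit(final Consumer<E> bufferAccumulator) {
        pipelineEvents.forEach(bufferAccumulator);  // only committed changes reach the buffer
        pipelineEvents.clear();
    }
}
```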

}

public static ColumnType getByTypeId(int typeId) {
return TYPE_ID_MAP.get(typeId);

Member:

Add exception handling for typeId

Collaborator Author:

Added.
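
A sketch of the lookup with that exception handling, trimmed to a few entries; the OIDs shown are the standard pg_type OIDs, but the surrounding structure is an assumption:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the lookup with exception handling for unknown type ids; the OIDs are
// standard pg_type OIDs, but the enum is trimmed to a few entries for brevity.
public enum ColumnType {
    BOOLEAN(16, "boolean"),
    BIGINT(20, "bigint"),
    SMALLINT(21, "smallint"),
    INTEGER(23, "integer"),
    TEXT(25, "text");

    private static final Map<Integer, ColumnType> TYPE_ID_MAP = new HashMap<>();

    static {
        for (ColumnType type : values()) {
            TYPE_ID_MAP.put(type.typeId, type);
        }
    }

    private final int typeId;
    private final String typeName;

    ColumnType(final int typeId, final String typeName) {
        this.typeId = typeId;
        this.typeName = typeName;
    }

    public String getTypeName() {
        return typeName;
    }

    public static ColumnType getByTypeId(final int typeId) {
        final ColumnType columnType = TYPE_ID_MAP.get(typeId);
        if (columnType == null) {
            throw new IllegalArgumentException("Unsupported column type id: " + typeId);
        }
        return columnType;
    }
}
```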

// Create replication slot
PGReplicationConnection replicationConnection = pgConnection.getReplicationAPI();
try {
replicationConnection.createReplicationSlot()

Member:

Check if the replication slot exists and reuse it if it does.

Collaborator Author:

Done.

Member:

Are we reusing the replication slot if it already exists?
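
A sketch of the check-then-reuse idea, assuming the PostgreSQL JDBC driver's replication API and a connection opened with the replication connection properties; names and error handling are illustrative only:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.postgresql.PGConnection;

// Sketch of the check-then-reuse idea, assuming the PostgreSQL JDBC driver's replication
// API and a connection opened with the replication connection properties.
class ReplicationSlotHelper {

    static void ensureReplicationSlot(final Connection connection, final String slotName)
            throws SQLException {
        final String checkSql = "SELECT 1 FROM pg_replication_slots WHERE slot_name = ?";
        try (PreparedStatement statement = connection.prepareStatement(checkSql)) {
            statement.setString(1, slotName);
            try (ResultSet resultSet = statement.executeQuery()) {
                if (resultSet.next()) {
                    return;   // slot already exists; reuse it instead of recreating
                }
            }
        }

        final PGConnection pgConnection = connection.unwrap(PGConnection.class);
        pgConnection.getReplicationAPI()
                .createReplicationSlot()
                .logical()
                .withSlotName(slotName)
                .withOutputPlugin("pgoutput")
                .make();
    }
}
```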

retry++;
}
LOG.warn("Failed to get primary keys for table {}", table);
return List.of();

Member:

Add exception handling to distinguish a missing primary key from internal DB exceptions/errors.

Collaborator Author:

Added.
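
A sketch of that distinction using JDBC metadata: an empty result means the table simply has no primary key, while database errors are retried and then surfaced; names and the retry policy are illustrative, not the PR's exact code:

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Sketch: an empty result means the table has no primary key, while database errors
// are retried and then surfaced. Names and retry policy are illustrative.
class PrimaryKeyLookup {

    private static final int NUM_OF_RETRIES = 3;

    static List<String> getPrimaryKeys(final Connection connection,
                                       final String schema,
                                       final String table) {
        int retry = 0;
        while (retry <= NUM_OF_RETRIES) {
            try (ResultSet resultSet =
                         connection.getMetaData().getPrimaryKeys(null, schema, table)) {
                final List<String> primaryKeys = new ArrayList<>();
                while (resultSet.next()) {
                    primaryKeys.add(resultSet.getString("COLUMN_NAME"));
                }
                // An empty list is a valid answer (table without a primary key), not an error.
                return primaryKeys;
            } catch (SQLException e) {
                retry++;
            }
        }
        throw new RuntimeException("Failed to get primary keys for table " + table);
    }
}
```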

Comment on lines 52 to 53
.withSlotOption("proto_version", "1")
.withSlotOption("publication_names", "my_publication");

Member:

  1. Move the constants to static variables.
  2. Use the created publication names.

Collaborator Author:

Done.
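
A sketch of the suggested change, with the option keys/values pulled into constants and the created publication name passed in; it assumes the PostgreSQL JDBC replication API, and the class and method names are illustrative:

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;

import org.postgresql.PGConnection;
import org.postgresql.replication.PGReplicationStream;

// Sketch: option keys/values as constants, publication name passed in. Assumes the
// PostgreSQL JDBC replication API; class and method names are illustrative.
class LogicalStreamStarter {

    private static final String PROTO_VERSION_KEY = "proto_version";
    private static final String PROTO_VERSION_VALUE = "1";
    private static final String PUBLICATION_NAMES_KEY = "publication_names";

    static PGReplicationStream startStream(final Connection connection,
                                           final String slotName,
                                           final String publicationName) throws SQLException {
        final PGConnection pgConnection = connection.unwrap(PGConnection.class);
        return pgConnection.getReplicationAPI()
                .replicationStream()
                .logical()
                .withSlotName(slotName)
                .withSlotOption(PROTO_VERSION_KEY, PROTO_VERSION_VALUE)
                .withSlotOption(PUBLICATION_NAMES_KEY, publicationName)
                .withStatusInterval(10, TimeUnit.SECONDS)
                .start();
    }
}
```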

stream.setFlushedLSN(lsn);
stream.setAppliedLSN(lsn);
} catch (Exception e) {
LOG.error("Exception while processing Postgres replication stream. ", e);

Member:

Add metrics in this class for exception, error and success cases.

Collaborator Author:

Will follow up on this in the next PR.

final long eventTimestampMillis = currentEventTimestamp;

char typeId = (char) msg.get();
if (typeId == 'N') {

Member:

Move the typeId to an enum.

Collaborator Author:

Done.

Signed-off-by: Hai Yan <[email protected]>
@oeyh oeyh force-pushed the rds-postgres-stream branch from f4e1683 to 561d938 Compare January 24, 2025 16:23
Comment on lines +193 to +197
return "data_prepper_publication_" + UUID.randomUUID().toString().substring(0, 8);
}

private String generateReplicationSlotName() {
return "data_prepper_slot_" + UUID.randomUUID().toString().substring(0, 8);

Member:

Should this include a pipeline identifier? Can clients view these on the server?

Collaborator Author:

> Should this include a pipeline identifier?

Sure. I will try to address this in the next PR.

> Can clients view these on the server?

Which client? If you mean users, yes, they can view the replication slot names if they query for them against the server.

}

public ConnectionManager getConnectionManager() {
if (sourceConfig.getEngine() == EngineType.MYSQL) {

Member:

Is sourceConfig.getEngine() a mandatory field? We could throw an exception if it is not set.

Collaborator Author:

Yes, it's MYSQL by default.

}

private QueryManager getQueryManager(final RdsSourceConfig sourceConfig, final DbMetadata dbMetadata) {
final String readerEndpoint = dbMetadata.getReaderEndpoint() != null ? dbMetadata.getReaderEndpoint() : dbMetadata.getEndpoint();
final int readerPort = dbMetadata.getReaderPort() == 0 ? dbMetadata.getPort() : dbMetadata.getReaderPort();
final ConnectionManager readerConnectionManager = new ConnectionManager(
final MySqlConnectionManager readerConnectionManager = new MySqlConnectionManager(

Member:

Do we need a connection manager for Postgres?

Collaborator Author:

This QueryManager is currently specific to MySQL, due to the need to resync changes associated with foreign key cascades. I'm keeping those as is for now. If we ever need a QueryManager for Postgres, I will refactor these.

@@ -203,13 +213,11 @@ private String getS3PathPrefix() {
return s3PathPrefix;
}

private Map<String, Map<String, String>> getColumnDataTypeMap(final SchemaManager schemaManager) {
private Map<String, Map<String, String>> getColumnDataTypeMap(final MySqlSchemaManager schemaManager) {

Member:

Can this still be the SchemaManager interface?

Collaborator Author:

Yes, it should be possible. Let me make that change after the Postgres data type mapping work. It's currently only needed for MySQL.

dinujoh previously approved these changes Jan 27, 2025
chenqi0805 previously approved these changes Jan 27, 2025
Signed-off-by: Hai Yan <[email protected]>
@oeyh oeyh dismissed stale reviews from chenqi0805 and dinujoh via 5108fa7 January 27, 2025 21:29
oeyh (Collaborator Author) commented Jan 27, 2025

Had to remove mocking of ByteBuffer to fix a Java 21 build failure due to org.mockito.exceptions.base.MockitoException: Unsupported settings with this type 'java.nio.ByteBuffer' (https://github.com/opensearch-project/data-prepper/actions/runs/12953484201/job/36133256794).

Build is passing now (except known flaky OTEL tests with Java 11).
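
For reference, a test can construct a real ByteBuffer instead of mocking it; a sketch with made-up Begin-message contents (the layout follows the pgoutput protocol, the values and class name are hypothetical):

```java
import java.nio.ByteBuffer;

// Illustration of the workaround: build a real ByteBuffer in the test instead of mocking
// it. The Begin-message layout follows the pgoutput protocol; the values are made up.
class ByteBufferTestData {

    static ByteBuffer beginMessage(final long finalLsn, final long commitEpochMicro, final int xid) {
        final ByteBuffer msg = ByteBuffer.allocate(1 + 8 + 8 + 4);
        msg.put((byte) 'B');           // Begin message tag
        msg.putLong(finalLsn);         // final LSN of the transaction
        msg.putLong(commitEpochMicro); // commit timestamp, microseconds since the Postgres epoch
        msg.putInt(xid);               // transaction id
        msg.flip();                    // ready to be read by the message processor
        return msg;
    }
}
```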

@oeyh oeyh merged commit f217b24 into opensearch-project:main Jan 27, 2025
45 of 47 checks passed
@oeyh oeyh mentioned this pull request Jan 28, 2025