-
Notifications
You must be signed in to change notification settings - Fork 214
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Initial PR for stream support for Postgres in Rds source #5310
Conversation
Signed-off-by: Hai Yan <[email protected]>
Signed-off-by: Hai Yan <[email protected]>
Signed-off-by: Hai Yan <[email protected]>
Signed-off-by: Hai Yan <[email protected]>
Signed-off-by: Hai Yan <[email protected]>
@@ -121,21 +133,23 @@ public void start(Buffer<Record<Event>> buffer) { | |||
} | |||
|
|||
if (sourceConfig.isStreamEnabled()) { | |||
BinlogClientFactory binaryLogClientFactory = new BinlogClientFactory(sourceConfig, rdsClient, dbMetadata); | |||
ReplicationLogClientFactory replicationLogClientFactory = new ReplicationLogClientFactory(sourceConfig, rdsClient, dbMetadata); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class needs some refactor. We should follow single responsibility principle here. This class has dependency on MySQL and Postgres Client. We should move these dependencies to separate class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did some refactoring and extracted out the creation of schemaManagers in separate classes. The ReplicationLogClientFactory
already hides the creation of MySQL/Postgres clients.
@@ -16,6 +16,12 @@ public class StreamProgressState { | |||
@JsonProperty("currentPosition") | |||
private BinlogCoordinate currentPosition; | |||
|
|||
@JsonProperty("currentLsn") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider creating new class to separate MySQL vs Postgres properties.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This model is used in serialization/deserialization by the source coordinator, it's a bit more involved to separate into two classes. I added a TODO in code and will address it in followup PRs.
@@ -10,7 +10,7 @@ | |||
import java.sql.SQLException; | |||
import java.util.Properties; | |||
|
|||
public class ConnectionManager { | |||
public class MySqlConnectionManager { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider creating ConnectionManager interface and implementation for MySql/Postgres ConnectionManager
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Created the interface.
// If it's INSERT/UPDATE/DELETE, prepare events | ||
// If it's a COMMIT, convert all prepared events and send to buffer | ||
char messageType = (char) msg.get(); | ||
if (messageType == 'B') { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using enums instead of char for message types for type safety, readability and maintainability
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
pipelineEvents = new ArrayList<>(); | ||
} | ||
|
||
public void process(ByteBuffer msg) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implement the missing functionality for UPDATE/DELETE message types 'K' and 'O' ?
https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can ByteBuffer msg be null ? Is there any validation to be done on msg ByteBuffer ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implement the missing functionality for UPDATE/DELETE message types 'K' and 'O' ?
Done. I updated the UPDATE message processor to include 'K' (primary keys were updated) and 'O' (old row data is included). For DELETE, it shouldn't matter as long as the row data includes primary keys.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can ByteBuffer msg be null ? Is there any validation to be done on msg ByteBuffer ?
There's a null check for msg
in the main while loop in LogicalReplicationClient.connect()
.
long epochMicro = msg.getLong(); | ||
|
||
if (currentLsn != commitLsn) { | ||
// This shouldn't happen |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we consider throwing an exception instead of just logging ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated to throw an exception.
Signed-off-by: Hai Yan <[email protected]>
c61da05
to
81c5883
Compare
Signed-off-by: Hai Yan <[email protected]>
import java.util.Map; | ||
|
||
public enum ColumnType { | ||
BOOLEAN(16, "boolean"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are those typeId from postgres? Is there way we can use their SDK variables instead of the hardcoded number
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, they are from postgres, but I don't think they have those in their libraries. Postgres has a system table pg_type
that contains those type information. So an alternative would be to query the pg_type
table the type ids and other type information.
throw new RuntimeException("Commit LSN does not match current LSN, skipping"); | ||
} | ||
|
||
writeToBuffer(bufferAccumulator); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
QUES: will we flush uncommitted messages if numberOfRecordsToAccumulate or bufferTimeout is reached? If yes, Is it something we want to avoid?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This writeToBuffer() is called when processing the COMMIT message. So we will only write records to buffer when the changes are committed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for clarifying. One follow up question, could there be uncommitted operations stay in the bufferAccumulator forever since user has never commit them (i.e. aborted operations)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The source only writes to bufferAccumulator when commit message is received. Before that, the records stay in a temporary list (pipelineEvents
).
} | ||
|
||
public static ColumnType getByTypeId(int typeId) { | ||
return TYPE_ID_MAP.get(typeId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add exception handling for typeId
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added.
// Create replication slot | ||
PGReplicationConnection replicationConnection = pgConnection.getReplicationAPI(); | ||
try { | ||
replicationConnection.createReplicationSlot() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check if the replication slot exist and reuse if one exist.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we reusing if the replication slot exist ?
retry++; | ||
} | ||
LOG.warn("Failed to get primary keys for table {}", table); | ||
return List.of(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exception handling when PrimaryKey is not present vs Internal DB Exception/Errors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added.
.withSlotOption("proto_version", "1") | ||
.withSlotOption("publication_names", "my_publication"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Move the constant to static variable
- Use the created publication names.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
stream.setFlushedLSN(lsn); | ||
stream.setAppliedLSN(lsn); | ||
} catch (Exception e) { | ||
LOG.error("Exception while processing Postgres replication stream. ", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add metrics in this class for exception, error and success cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will follow up on this in the next PR.
final long eventTimestampMillis = currentEventTimestamp; | ||
|
||
char typeId = (char) msg.get(); | ||
if (typeId == 'N') { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move the typeId to enum
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
Signed-off-by: Hai Yan <[email protected]>
Signed-off-by: Hai Yan <[email protected]>
f4e1683
to
561d938
Compare
return "data_prepper_publication_" + UUID.randomUUID().toString().substring(0, 8); | ||
} | ||
|
||
private String generateReplicationSlotName() { | ||
return "data_prepper_slot_" + UUID.randomUUID().toString().substring(0, 8); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this include pipeline identifier ? Can client view these in server ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this include pipeline identifier ?
Sure. I will try to address this in next PR.
Can client view these in server ?
Which client? If you mean users, yes, they can view the replication slot names if they query for it against the server.
} | ||
|
||
public ConnectionManager getConnectionManager() { | ||
if (sourceConfig.getEngine() == EngineType.MYSQL) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sourceConfig.getEngine()
is this mandatory field ? We can throw exception is this is not set.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it's MYSQL by default.
} | ||
|
||
private QueryManager getQueryManager(final RdsSourceConfig sourceConfig, final DbMetadata dbMetadata) { | ||
final String readerEndpoint = dbMetadata.getReaderEndpoint() != null ? dbMetadata.getReaderEndpoint() : dbMetadata.getEndpoint(); | ||
final int readerPort = dbMetadata.getReaderPort() == 0 ? dbMetadata.getPort() : dbMetadata.getReaderPort(); | ||
final ConnectionManager readerConnectionManager = new ConnectionManager( | ||
final MySqlConnectionManager readerConnectionManager = new MySqlConnectionManager( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need connection manager for Postgres ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This QueryManager is currently specific for MySQL due to the need for Resync'ing changes associated with foreign key cascades. Keeping those as is for now. If we ever need a QueryManager for Postgres, I will refactor these.
@@ -203,13 +213,11 @@ private String getS3PathPrefix() { | |||
return s3PathPrefix; | |||
} | |||
|
|||
private Map<String, Map<String, String>> getColumnDataTypeMap(final SchemaManager schemaManager) { | |||
private Map<String, Map<String, String>> getColumnDataTypeMap(final MySqlSchemaManager schemaManager) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this still be interface SchemaManager ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it should be possible. Let me make some change after Postgres data type mapping work. It's currently only needed for MySQL.
Signed-off-by: Hai Yan <[email protected]>
Had to remove mocking of Build is passing now (except a known flaky OTEL tests with Java 11) |
Description
Initial PR for stream support for Postgres in Rds source. The implementation uses the existing structure for MySQL stream support. The correspondence is as follows:
BinaryLogClient
/BinlogClientWrapper
LogicalReplicationClient
MySqlConnectionManager
(used to beConnectionManager
)PostgresConnectionManager
MySqlSchemaManager
(used to beSchemaManager
)PostgresSchemaManager
Major changes:
This is an initial PR, will follow up with more changes to add e2e ack, checkpointing, resync, data type mapping etc. for Postgres.
Testing
Tested Data Prepper locally against Aurora Postgres DB.
Issues Resolved
Contributes to #5309
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.