Initial PR for stream support for Postgres in Rds source #5310
Changes from 5 commits
File: RdsService.java
@@ -14,6 +14,7 @@
 import org.opensearch.dataprepper.model.plugin.PluginConfigObservable;
 import org.opensearch.dataprepper.model.record.Record;
 import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
+import org.opensearch.dataprepper.plugins.source.rds.configuration.EngineType;
 import org.opensearch.dataprepper.plugins.source.rds.export.DataFileScheduler;
 import org.opensearch.dataprepper.plugins.source.rds.export.ExportScheduler;
 import org.opensearch.dataprepper.plugins.source.rds.export.ExportTaskManager;
@@ -25,10 +26,13 @@
 import org.opensearch.dataprepper.plugins.source.rds.model.DbMetadata;
 import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata;
 import org.opensearch.dataprepper.plugins.source.rds.resync.ResyncScheduler;
-import org.opensearch.dataprepper.plugins.source.rds.schema.ConnectionManager;
+import org.opensearch.dataprepper.plugins.source.rds.schema.MySqlConnectionManager;
 import org.opensearch.dataprepper.plugins.source.rds.schema.QueryManager;
+import org.opensearch.dataprepper.plugins.source.rds.schema.MySqlSchemaManager;
 import org.opensearch.dataprepper.plugins.source.rds.schema.SchemaManager;
-import org.opensearch.dataprepper.plugins.source.rds.stream.BinlogClientFactory;
+import org.opensearch.dataprepper.plugins.source.rds.schema.PostgresConnectionManager;
+import org.opensearch.dataprepper.plugins.source.rds.schema.PostgresSchemaManager;
+import org.opensearch.dataprepper.plugins.source.rds.stream.ReplicationLogClientFactory;
 import org.opensearch.dataprepper.plugins.source.rds.stream.StreamScheduler;
 import org.opensearch.dataprepper.plugins.source.rds.utils.IdentifierShortener;
 import org.slf4j.Logger;
@@ -37,6 +41,7 @@
 import software.amazon.awssdk.services.s3.S3Client;

 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -101,9 +106,16 @@ public void start(Buffer<Record<Event>> buffer) {
                 new ClusterApiStrategy(rdsClient) : new InstanceApiStrategy(rdsClient);
         final DbMetadata dbMetadata = rdsApiStrategy.describeDb(sourceConfig.getDbIdentifier());
         final String s3PathPrefix = getS3PathPrefix();

         final SchemaManager schemaManager = getSchemaManager(sourceConfig, dbMetadata);
-        final Map<String, Map<String, String>> tableColumnDataTypeMap = getColumnDataTypeMap(schemaManager);
-        final DbTableMetadata dbTableMetadata = new DbTableMetadata(dbMetadata, tableColumnDataTypeMap);
+        DbTableMetadata dbTableMetadata;
+        if (sourceConfig.getEngine() == EngineType.MYSQL) {
+            final Map<String, Map<String, String>> tableColumnDataTypeMap = getColumnDataTypeMap(
+                    (MySqlSchemaManager) schemaManager);
+            dbTableMetadata = new DbTableMetadata(dbMetadata, tableColumnDataTypeMap);
+        } else {
+            dbTableMetadata = new DbTableMetadata(dbMetadata, Collections.emptyMap());
+        }

         leaderScheduler = new LeaderScheduler(
                 sourceCoordinator, sourceConfig, s3PathPrefix, schemaManager, dbTableMetadata);
@@ -121,21 +133,23 @@ public void start(Buffer<Record<Event>> buffer) {
         }

         if (sourceConfig.isStreamEnabled()) {
-            BinlogClientFactory binaryLogClientFactory = new BinlogClientFactory(sourceConfig, rdsClient, dbMetadata);
+            ReplicationLogClientFactory replicationLogClientFactory = new ReplicationLogClientFactory(sourceConfig, rdsClient, dbMetadata);

             if (sourceConfig.isTlsEnabled()) {
-                binaryLogClientFactory.setSSLMode(SSLMode.REQUIRED);
+                replicationLogClientFactory.setSSLMode(SSLMode.REQUIRED);
             } else {
-                binaryLogClientFactory.setSSLMode(SSLMode.DISABLED);
+                replicationLogClientFactory.setSSLMode(SSLMode.DISABLED);
             }

             streamScheduler = new StreamScheduler(
-                    sourceCoordinator, sourceConfig, s3PathPrefix, binaryLogClientFactory, buffer, pluginMetrics, acknowledgementSetManager, pluginConfigObservable);
+                    sourceCoordinator, sourceConfig, s3PathPrefix, replicationLogClientFactory, buffer, pluginMetrics, acknowledgementSetManager, pluginConfigObservable);
             runnableList.add(streamScheduler);

-            resyncScheduler = new ResyncScheduler(
-                    sourceCoordinator, sourceConfig, getQueryManager(sourceConfig, dbMetadata), s3PathPrefix, buffer, pluginMetrics, acknowledgementSetManager);
-            runnableList.add(resyncScheduler);
+            if (sourceConfig.getEngine() == EngineType.MYSQL) {
+                resyncScheduler = new ResyncScheduler(
+                        sourceCoordinator, sourceConfig, getQueryManager(sourceConfig, dbMetadata), s3PathPrefix, buffer, pluginMetrics, acknowledgementSetManager);
+                runnableList.add(resyncScheduler);
+            }
         }

         executor = Executors.newFixedThreadPool(runnableList.size());
@@ -164,19 +178,35 @@ public void shutdown() {
     }

     private SchemaManager getSchemaManager(final RdsSourceConfig sourceConfig, final DbMetadata dbMetadata) {
-        final ConnectionManager connectionManager = new ConnectionManager(
+        // For MySQL
+        if (sourceConfig.getEngine() == EngineType.MYSQL) {
+            final MySqlConnectionManager connectionManager = new MySqlConnectionManager(
+                    dbMetadata.getEndpoint(),
+                    dbMetadata.getPort(),
+                    sourceConfig.getAuthenticationConfig().getUsername(),
+                    sourceConfig.getAuthenticationConfig().getPassword(),
+                    sourceConfig.isTlsEnabled());
+            return new MySqlSchemaManager(connectionManager);
+        }
+        // For Postgres
+        final PostgresConnectionManager connectionManager = new PostgresConnectionManager(
                 dbMetadata.getEndpoint(),
                 dbMetadata.getPort(),
                 sourceConfig.getAuthenticationConfig().getUsername(),
                 sourceConfig.getAuthenticationConfig().getPassword(),
-                sourceConfig.isTlsEnabled());
-        return new SchemaManager(connectionManager);
+                sourceConfig.isTlsEnabled(),
+                getDatabaseName(sourceConfig.getTableNames()));
+        return new PostgresSchemaManager(connectionManager);
     }

+    private String getDatabaseName(List<String> tableNames) {
+        return tableNames.get(0).split("\\.")[0];
+    }
+
     private QueryManager getQueryManager(final RdsSourceConfig sourceConfig, final DbMetadata dbMetadata) {
         final String readerEndpoint = dbMetadata.getReaderEndpoint() != null ? dbMetadata.getReaderEndpoint() : dbMetadata.getEndpoint();
         final int readerPort = dbMetadata.getReaderPort() == 0 ? dbMetadata.getPort() : dbMetadata.getReaderPort();
-        final ConnectionManager readerConnectionManager = new ConnectionManager(
+        final MySqlConnectionManager readerConnectionManager = new MySqlConnectionManager(
                 readerEndpoint,
                 readerPort,
                 sourceConfig.getAuthenticationConfig().getUsername(),

Reviewer: Do we need a connection manager for Postgres here?

Author: This QueryManager is currently MySQL-specific because of the need to resync changes associated with foreign key cascades. I am keeping it as is for now; if we ever need a QueryManager for Postgres, I will refactor it.
@@ -203,13 +233,11 @@ private String getS3PathPrefix() {
         return s3PathPrefix;
     }

-    private Map<String, Map<String, String>> getColumnDataTypeMap(final SchemaManager schemaManager) {
+    private Map<String, Map<String, String>> getColumnDataTypeMap(final MySqlSchemaManager schemaManager) {
         return sourceConfig.getTableNames().stream()
                 .collect(Collectors.toMap(
                         fullTableName -> fullTableName,
                         fullTableName -> schemaManager.getColumnDataTypes(fullTableName.split("\\.")[0], fullTableName.split("\\.")[1])
                 ));
     }
 }

Reviewer: Can this parameter still be the SchemaManager interface?

Author: Yes, it should be possible. Let me make that change after the Postgres data type mapping work; this is currently only needed for MySQL.
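One possible shape for the follow-up the author mentions is moving the column lookup onto the SchemaManager interface so this method no longer needs a MySqlSchemaManager parameter. A minimal sketch, assuming a default empty-map implementation is acceptable for Postgres until its data type mapping lands; this is not code from the PR:

import java.util.Collections;
import java.util.Map;

// Sketch: declaring the lookup on the interface keeps callers engine-agnostic.
// The default lets PostgresSchemaManager compile before its data type mapping
// work is done.
public interface SchemaManager {
    default Map<String, String> getColumnDataTypes(String database, String table) {
        return Collections.emptyMap();
    }
}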
File: StreamProgressState.java
@@ -16,6 +16,12 @@ public class StreamProgressState {
     @JsonProperty("currentPosition")
     private BinlogCoordinate currentPosition;

+    @JsonProperty("currentLsn")
+    private String currentLsn;
+
+    @JsonProperty("replicationSlotName")
+    private String replicationSlotName;
+
     @JsonProperty("waitForExport")
     private boolean waitForExport = false;

Reviewer: Consider creating a new class to separate the MySQL and Postgres properties.

Author: This model is serialized/deserialized by the source coordinator, so separating it into two classes is a bit more involved. I added a TODO in the code and will address it in follow-up PRs.
@@ -26,10 +32,22 @@ public BinlogCoordinate getCurrentPosition() {
         return currentPosition;
     }

+    public String getCurrentLsn() {
+        return currentLsn;
+    }
+
+    public String getReplicationSlotName() {
+        return replicationSlotName;
+    }
+
     public void setCurrentPosition(BinlogCoordinate currentPosition) {
         this.currentPosition = currentPosition;
     }

+    public void setReplicationSlotName(String replicationSlotName) {
+        this.replicationSlotName = replicationSlotName;
+    }
+
     public boolean shouldWaitForExport() {
         return waitForExport;
     }
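For reference, the split the reviewer suggests could keep a single serialized envelope so the source coordinator's JSON round-trip keeps working. A sketch under that assumption; MySqlStreamState and PostgresStreamState are hypothetical names, not classes in this PR:

import com.fasterxml.jackson.annotation.JsonProperty;

// Hypothetical grouping of engine-specific fields into nested state objects.
class MySqlStreamState { /* currentPosition and foreignKeyRelations would move here */ }
class PostgresStreamState { /* currentLsn and replicationSlotName would move here */ }

public class StreamProgressState {
    @JsonProperty("mySqlStreamState")
    private MySqlStreamState mySqlStreamState;

    @JsonProperty("postgresStreamState")
    private PostgresStreamState postgresStreamState;

    @JsonProperty("waitForExport")
    private boolean waitForExport = false;

    // getters and setters omitted from this sketch
}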
File: ColumnType.java (new file)

@@ -0,0 +1,55 @@
+package org.opensearch.dataprepper.plugins.source.rds.datatype.postgres;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public enum ColumnType {
+    BOOLEAN(16, "boolean"),
+    SMALLINT(21, "smallint"),
+    INTEGER(23, "integer"),
+    BIGINT(20, "bigint"),
+    REAL(700, "real"),
+    DOUBLE_PRECISION(701, "double precision"),
+    NUMERIC(1700, "numeric"),
+    TEXT(25, "text"),
+    VARCHAR(1043, "varchar"),
+    DATE(1082, "date"),
+    TIME(1083, "time"),
+    TIMESTAMP(1114, "timestamp"),
+    TIMESTAMPTZ(1184, "timestamptz"),
+    UUID(2950, "uuid"),
+    JSON(114, "json"),
+    JSONB(3802, "jsonb");

Reviewer: Are those typeIds from Postgres? Is there a way we can use their SDK variables instead of the hardcoded numbers?

Author: Yes, they are from Postgres, but I don't think they expose those in their libraries. Postgres has a system table, pg_type, that holds these type OIDs.
+
+    private final int typeId;
+    private final String typeName;
+
+    private static final Map<Integer, ColumnType> TYPE_ID_MAP = new HashMap<>();
+
+    static {
+        for (ColumnType type : values()) {
+            TYPE_ID_MAP.put(type.typeId, type);
+        }
+    }
+
+    ColumnType(int typeId, String typeName) {
+        this.typeId = typeId;
+        this.typeName = typeName;
+    }
+
+    public int getTypeId() {
+        return typeId;
+    }
+
+    public String getTypeName() {
+        return typeName;
+    }
+
+    public static ColumnType getByTypeId(int typeId) {
+        return TYPE_ID_MAP.get(typeId);
+    }
+
+    public static String getTypeNameByEnum(ColumnType columnType) {
+        return columnType.getTypeName();
+    }
+}

Reviewer: Add exception handling for typeId.

Author: Added.
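The IDs above match the OIDs Postgres ships in its pg_type catalog. A small JDBC check against a running server can confirm them; the connection details below are placeholders, and note that pg_type uses internal names (bool, int2, int4, int8) rather than the display names in the enum. As for the second thread, the exception handling the author added is not visible in this revision; a typical shape would be throwing IllegalArgumentException from getByTypeId when TYPE_ID_MAP has no entry, though that exact choice is an assumption.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Cross-checks a few of ColumnType's hardcoded OIDs against pg_type.
public class PgTypeOidCheck {
    public static void main(String[] args) throws SQLException {
        final String jdbcUrl = "jdbc:postgresql://localhost:5432/postgres"; // placeholder
        try (Connection conn = DriverManager.getConnection(jdbcUrl, "postgres", "postgres");
             PreparedStatement ps = conn.prepareStatement(
                     "SELECT typname, oid FROM pg_type WHERE typname = ANY (?)")) {
            ps.setArray(1, conn.createArrayOf("text",
                    new String[]{"bool", "int2", "int4", "int8", "uuid", "jsonb"}));
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    // Expected: bool=16, int2=21, int4=23, int8=20, uuid=2950, jsonb=3802
                    System.out.println(rs.getString("typname") + " -> " + rs.getLong("oid"));
                }
            }
        }
    }
}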
File: LeaderScheduler.java
@@ -8,6 +8,7 @@
 import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
 import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
 import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig;
+import org.opensearch.dataprepper.plugins.source.rds.configuration.EngineType;
 import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ExportPartition;
 import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState;
 import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.LeaderPartition;
@@ -17,14 +18,17 @@
 import org.opensearch.dataprepper.plugins.source.rds.coordination.state.StreamProgressState;
 import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate;
 import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata;
+import org.opensearch.dataprepper.plugins.source.rds.schema.MySqlSchemaManager;
 import org.opensearch.dataprepper.plugins.source.rds.schema.SchemaManager;
+import org.opensearch.dataprepper.plugins.source.rds.schema.PostgresSchemaManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.UUID;
 import java.util.stream.Collectors;

 import static org.opensearch.dataprepper.plugins.source.rds.RdsService.S3_PATH_DELIMITER;
@@ -152,22 +156,39 @@ private Map<String, List<String>> getPrimaryKeyMap() {
         return sourceConfig.getTableNames().stream()
                 .collect(Collectors.toMap(
                         fullTableName -> fullTableName,
-                        fullTableName -> schemaManager.getPrimaryKeys(fullTableName.split("\\.")[0], fullTableName.split("\\.")[1])
+                        fullTableName -> ((MySqlSchemaManager) schemaManager).getPrimaryKeys(fullTableName.split("\\.")[0], fullTableName.split("\\.")[1])
                 ));
     }

     private void createStreamPartition(RdsSourceConfig sourceConfig) {
         final StreamProgressState progressState = new StreamProgressState();
         progressState.setWaitForExport(sourceConfig.isExportEnabled());
-        getCurrentBinlogPosition().ifPresent(progressState::setCurrentPosition);
-        progressState.setForeignKeyRelations(schemaManager.getForeignKeyRelations(sourceConfig.getTableNames()));
+        if (sourceConfig.getEngine() == EngineType.MYSQL) {
+            getCurrentBinlogPosition().ifPresent(progressState::setCurrentPosition);
+            progressState.setForeignKeyRelations(((MySqlSchemaManager) schemaManager).getForeignKeyRelations(sourceConfig.getTableNames()));
+        } else {
+            // Postgres
+            // Create replication slot, which will mark the starting point for stream
+            final String publicationName = generatePublicationName();
+            final String slotName = generateReplicationSlotName();
+            ((PostgresSchemaManager) schemaManager).createLogicalReplicationSlot(sourceConfig.getTableNames(), publicationName, slotName);
+            progressState.setReplicationSlotName(slotName);
+        }
         StreamPartition streamPartition = new StreamPartition(sourceConfig.getDbIdentifier(), progressState);
         sourceCoordinator.createPartition(streamPartition);
     }

     private Optional<BinlogCoordinate> getCurrentBinlogPosition() {
-        Optional<BinlogCoordinate> binlogCoordinate = schemaManager.getCurrentBinaryLogPosition();
+        Optional<BinlogCoordinate> binlogCoordinate = ((MySqlSchemaManager) schemaManager).getCurrentBinaryLogPosition();
         LOG.debug("Current binlog position: {}", binlogCoordinate.orElse(null));
         return binlogCoordinate;
     }

+    private String generatePublicationName() {
+        return "data_prepper_publication_" + UUID.randomUUID().toString().substring(0, 8);
+    }
+
+    private String generateReplicationSlotName() {
+        return "data_prepper_slot_" + UUID.randomUUID().toString().substring(0, 8);
+    }
 }

Reviewer (on lines +193 to +197): Should these names include a pipeline identifier? Can clients view them on the server?

Author: Sure, I will try to address the naming in the next PR. As for which client: if you mean users, then yes, they can view the replication slot names if they query for them against the server.
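The Postgres branch in createStreamPartition delegates to PostgresSchemaManager.createLogicalReplicationSlot, whose body is not part of this diff. A rough sketch of the server-side effect, using plain SQL rather than whatever the PR's implementation actually does: the publication scopes which tables the pgoutput plugin will emit, and creating the slot pins the WAL position that becomes the stream's starting point. It also bears on the reviewer's question, since once created, the generated names are visible to any user who queries pg_publication and pg_replication_slots.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;

// Sketch only; the real implementation may use the pgjdbc replication API instead.
public class LogicalSlotSketch {
    static void createPublicationAndSlot(final Connection conn, final List<String> tableNames,
                                         final String publicationName, final String slotName) throws SQLException {
        try (Statement stmt = conn.createStatement()) {
            // The publication limits which tables' changes are emitted to the slot's consumers.
            stmt.execute("CREATE PUBLICATION " + publicationName
                    + " FOR TABLE " + String.join(", ", tableNames));
        }
        try (PreparedStatement ps = conn.prepareStatement(
                "SELECT pg_create_logical_replication_slot(?, 'pgoutput')")) {
            ps.setString(1, slotName);
            ps.execute(); // from here on, the server retains WAL for this slot
        }
    }
}

To inspect the names on the server afterwards: SELECT slot_name, plugin, confirmed_flush_lsn FROM pg_replication_slots; and SELECT pubname FROM pg_publication;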
File: MySqlConnectionManager.java (renamed from ConnectionManager.java)
@@ -10,7 +10,7 @@
 import java.sql.SQLException;
 import java.util.Properties;

-public class ConnectionManager {
+public class MySqlConnectionManager {
     static final String JDBC_URL_FORMAT = "jdbc:mysql://%s:%d";
     static final String USERNAME_KEY = "user";
     static final String PASSWORD_KEY = "password";

Reviewer: Consider creating a ConnectionManager interface, with implementations for the MySQL and Postgres connection managers.

Author: Created the interface.
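The extracted interface is not shown in this diff. Given how both managers are used, it plausibly reduces to a single factory method for JDBC connections; the method name is an assumption:

import java.sql.Connection;
import java.sql.SQLException;

// Assumed shape of the new ConnectionManager interface, implemented by both
// MySqlConnectionManager and PostgresConnectionManager.
public interface ConnectionManager {
    Connection getConnection() throws SQLException;
}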
@@ -25,7 +25,7 @@ public class ConnectionManager {
     private final String password;
     private final boolean requireSSL;

-    public ConnectionManager(String hostName, int port, String username, String password, boolean requireSSL) {
+    public MySqlConnectionManager(String hostName, int port, String username, String password, boolean requireSSL) {
         this.hostName = hostName;
         this.port = port;
         this.username = username;
Reviewer: This class needs some refactoring. We should follow the single responsibility principle here: the class depends on both the MySQL and Postgres clients, and those dependencies should move to separate classes.

Author: I did some refactoring and extracted the creation of the schema managers into separate classes. ReplicationLogClientFactory already hides the creation of the MySQL/Postgres clients.