Initial PR for stream support for Postgres in Rds source (#5310)
* First working version

Signed-off-by: Hai Yan <[email protected]>

* More progress and update existing unit tests

Signed-off-by: Hai Yan <[email protected]>

* Add unit tests

Signed-off-by: Hai Yan <[email protected]>

* Remove and rename classes

Signed-off-by: Hai Yan <[email protected]>

* Remove test code

Signed-off-by: Hai Yan <[email protected]>

* Address review comments

Signed-off-by: Hai Yan <[email protected]>

* Address minor issues

Signed-off-by: Hai Yan <[email protected]>

* Group MySQL and Postgres stream states

Signed-off-by: Hai Yan <[email protected]>

* Address more comments

Signed-off-by: Hai Yan <[email protected]>

* Fix Java21 build

Signed-off-by: Hai Yan <[email protected]>

---------

Signed-off-by: Hai Yan <[email protected]>
oeyh authored Jan 27, 2025
1 parent e9792dc commit f217b24
Showing 48 changed files with 2,299 additions and 464 deletions.
1 change: 1 addition & 0 deletions data-prepper-plugins/rds-source/build.gradle
@@ -23,6 +23,7 @@ dependencies {

implementation 'com.zendesk:mysql-binlog-connector-java:0.29.2'
implementation 'com.mysql:mysql-connector-j:8.4.0'
implementation 'org.postgresql:postgresql:42.7.4'

compileOnly 'org.projectlombok:lombok:1.18.20'
annotationProcessor 'org.projectlombok:lombok:1.18.20'
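
The new org.postgresql:postgresql dependency brings in the PostgreSQL JDBC driver, presumably used by the new Postgres connection handling in this source. As a rough, standalone sketch of what the driver provides (the endpoint, database, and credentials below are placeholders, not values from this change):

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;

public class PostgresJdbcSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder endpoint, database, and credentials for illustration only.
        final String url = "jdbc:postgresql://my-rds-endpoint.rds.amazonaws.com:5432/mydb";
        final Properties props = new Properties();
        props.setProperty("user", "postgres");
        props.setProperty("password", "secret");
        // Enforce TLS, analogous to the source's TLS-enabled behavior.
        props.setProperty("ssl", "true");
        props.setProperty("sslmode", "require");

        try (Connection connection = DriverManager.getConnection(url, props)) {
            System.out.println("Connected to " + connection.getMetaData().getDatabaseProductVersion());
        }
    }
}
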
data-prepper-plugins/rds-source/.../RdsService.java
@@ -14,6 +14,7 @@
import org.opensearch.dataprepper.model.plugin.PluginConfigObservable;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.rds.configuration.EngineType;
import org.opensearch.dataprepper.plugins.source.rds.export.DataFileScheduler;
import org.opensearch.dataprepper.plugins.source.rds.export.ExportScheduler;
import org.opensearch.dataprepper.plugins.source.rds.export.ExportTaskManager;
@@ -26,9 +27,13 @@
import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.resync.ResyncScheduler;
import org.opensearch.dataprepper.plugins.source.rds.schema.ConnectionManager;
import org.opensearch.dataprepper.plugins.source.rds.schema.ConnectionManagerFactory;
import org.opensearch.dataprepper.plugins.source.rds.schema.MySqlConnectionManager;
import org.opensearch.dataprepper.plugins.source.rds.schema.MySqlSchemaManager;
import org.opensearch.dataprepper.plugins.source.rds.schema.QueryManager;
import org.opensearch.dataprepper.plugins.source.rds.schema.SchemaManager;
import org.opensearch.dataprepper.plugins.source.rds.stream.BinlogClientFactory;
import org.opensearch.dataprepper.plugins.source.rds.schema.SchemaManagerFactory;
import org.opensearch.dataprepper.plugins.source.rds.stream.ReplicationLogClientFactory;
import org.opensearch.dataprepper.plugins.source.rds.stream.StreamScheduler;
import org.opensearch.dataprepper.plugins.source.rds.utils.IdentifierShortener;
import org.slf4j.Logger;
@@ -37,6 +42,7 @@
import software.amazon.awssdk.services.s3.S3Client;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -101,9 +107,16 @@ public void start(Buffer<Record<Event>> buffer) {
new ClusterApiStrategy(rdsClient) : new InstanceApiStrategy(rdsClient);
final DbMetadata dbMetadata = rdsApiStrategy.describeDb(sourceConfig.getDbIdentifier());
final String s3PathPrefix = getS3PathPrefix();

final SchemaManager schemaManager = getSchemaManager(sourceConfig, dbMetadata);
final Map<String, Map<String, String>> tableColumnDataTypeMap = getColumnDataTypeMap(schemaManager);
final DbTableMetadata dbTableMetadata = new DbTableMetadata(dbMetadata, tableColumnDataTypeMap);
DbTableMetadata dbTableMetadata;
if (sourceConfig.getEngine() == EngineType.MYSQL) {
final Map<String, Map<String, String>> tableColumnDataTypeMap = getColumnDataTypeMap(
(MySqlSchemaManager) schemaManager);
dbTableMetadata = new DbTableMetadata(dbMetadata, tableColumnDataTypeMap);
} else {
dbTableMetadata = new DbTableMetadata(dbMetadata, Collections.emptyMap());
}

leaderScheduler = new LeaderScheduler(
sourceCoordinator, sourceConfig, s3PathPrefix, schemaManager, dbTableMetadata);
@@ -121,21 +134,23 @@ public void start(Buffer<Record<Event>> buffer) {
}

if (sourceConfig.isStreamEnabled()) {
BinlogClientFactory binaryLogClientFactory = new BinlogClientFactory(sourceConfig, rdsClient, dbMetadata);
ReplicationLogClientFactory replicationLogClientFactory = new ReplicationLogClientFactory(sourceConfig, rdsClient, dbMetadata);

if (sourceConfig.isTlsEnabled()) {
binaryLogClientFactory.setSSLMode(SSLMode.REQUIRED);
replicationLogClientFactory.setSSLMode(SSLMode.REQUIRED);
} else {
binaryLogClientFactory.setSSLMode(SSLMode.DISABLED);
replicationLogClientFactory.setSSLMode(SSLMode.DISABLED);
}

streamScheduler = new StreamScheduler(
sourceCoordinator, sourceConfig, s3PathPrefix, binaryLogClientFactory, buffer, pluginMetrics, acknowledgementSetManager, pluginConfigObservable);
sourceCoordinator, sourceConfig, s3PathPrefix, replicationLogClientFactory, buffer, pluginMetrics, acknowledgementSetManager, pluginConfigObservable);
runnableList.add(streamScheduler);

resyncScheduler = new ResyncScheduler(
sourceCoordinator, sourceConfig, getQueryManager(sourceConfig, dbMetadata), s3PathPrefix, buffer, pluginMetrics, acknowledgementSetManager);
runnableList.add(resyncScheduler);
if (sourceConfig.getEngine() == EngineType.MYSQL) {
resyncScheduler = new ResyncScheduler(
sourceCoordinator, sourceConfig, getQueryManager(sourceConfig, dbMetadata), s3PathPrefix, buffer, pluginMetrics, acknowledgementSetManager);
runnableList.add(resyncScheduler);
}
}

executor = Executors.newFixedThreadPool(runnableList.size());
@@ -164,19 +179,14 @@ public void shutdown() {
}

private SchemaManager getSchemaManager(final RdsSourceConfig sourceConfig, final DbMetadata dbMetadata) {
final ConnectionManager connectionManager = new ConnectionManager(
dbMetadata.getEndpoint(),
dbMetadata.getPort(),
sourceConfig.getAuthenticationConfig().getUsername(),
sourceConfig.getAuthenticationConfig().getPassword(),
sourceConfig.isTlsEnabled());
return new SchemaManager(connectionManager);
final ConnectionManager connectionManager = new ConnectionManagerFactory(sourceConfig, dbMetadata).getConnectionManager();
return new SchemaManagerFactory(connectionManager).getSchemaManager();
}

private QueryManager getQueryManager(final RdsSourceConfig sourceConfig, final DbMetadata dbMetadata) {
final String readerEndpoint = dbMetadata.getReaderEndpoint() != null ? dbMetadata.getReaderEndpoint() : dbMetadata.getEndpoint();
final int readerPort = dbMetadata.getReaderPort() == 0 ? dbMetadata.getPort() : dbMetadata.getReaderPort();
final ConnectionManager readerConnectionManager = new ConnectionManager(
final MySqlConnectionManager readerConnectionManager = new MySqlConnectionManager(
readerEndpoint,
readerPort,
sourceConfig.getAuthenticationConfig().getUsername(),
@@ -203,13 +213,11 @@ private String getS3PathPrefix() {
return s3PathPrefix;
}

private Map<String, Map<String, String>> getColumnDataTypeMap(final SchemaManager schemaManager) {
private Map<String, Map<String, String>> getColumnDataTypeMap(final MySqlSchemaManager schemaManager) {
return sourceConfig.getTableNames().stream()
.collect(Collectors.toMap(
fullTableName -> fullTableName,
fullTableName -> schemaManager.getColumnDataTypes(fullTableName.split("\\.")[0], fullTableName.split("\\.")[1])
));
}


}
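
The ReplicationLogClientFactory introduced here replaces the MySQL-only BinlogClientFactory, and the schedulers now branch on EngineType. The factory itself is not shown in this excerpt; a minimal sketch of the dispatch-on-engine idea, with illustrative class and method names that are assumptions rather than the plugin's actual API, could look like this:

// Sketch only -- names below are illustrative, not the plugin's actual classes.
interface ReplicationLogClient {
    void connect() throws Exception;
    void disconnect() throws Exception;
}

class BinlogClientSketch implements ReplicationLogClient {
    @Override public void connect() { /* MySQL: connect to the binlog stream */ }
    @Override public void disconnect() { /* close the binlog connection */ }
}

class LogicalReplicationClientSketch implements ReplicationLogClient {
    @Override public void connect() { /* Postgres: start streaming from a logical replication slot */ }
    @Override public void disconnect() { /* stop streaming from the slot */ }
}

class ReplicationLogClientFactorySketch {
    enum Engine { MYSQL, POSTGRES } // stand-in for the plugin's EngineType

    private final Engine engine;

    ReplicationLogClientFactorySketch(Engine engine) {
        this.engine = engine;
    }

    ReplicationLogClient create() {
        // Same idea as the EngineType checks added to RdsService above.
        return engine == Engine.MYSQL ? new BinlogClientSketch() : new LogicalReplicationClientSketch();
    }
}
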
data-prepper-plugins/rds-source/.../configuration/EngineType.java
@@ -13,7 +13,8 @@

public enum EngineType {

MYSQL("mysql");
MYSQL("mysql"),
POSTGRES("postgres");

private static final Map<String, EngineType> ENGINE_TYPE_MAP = Arrays.stream(EngineType.values())
.collect(Collectors.toMap(
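
The enum's name-to-value map continues below the fold; its construction presumably keys on the engine string, along the lines of this self-contained sketch (the field and lookup-method names are assumptions, since that part of the file is not shown here):

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

enum EngineTypeSketch {
    MYSQL("mysql"),
    POSTGRES("postgres");

    private final String engine;

    // Name-to-enum map in the same style as ENGINE_TYPE_MAP above.
    private static final Map<String, EngineTypeSketch> ENGINE_TYPE_MAP = Arrays.stream(values())
            .collect(Collectors.toMap(e -> e.engine, e -> e));

    EngineTypeSketch(String engine) {
        this.engine = engine;
    }

    static EngineTypeSketch fromOptionValue(String option) {
        return ENGINE_TYPE_MAP.get(option); // e.g. "postgres" -> POSTGRES
    }
}
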
data-prepper-plugins/rds-source/.../coordination/state/MySqlStreamState.java (new file)
@@ -0,0 +1,41 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugins.source.rds.coordination.state;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate;
import org.opensearch.dataprepper.plugins.source.rds.model.ForeignKeyRelation;

import java.util.List;

public class MySqlStreamState {

@JsonProperty("currentPosition")
private BinlogCoordinate currentPosition;

@JsonProperty("foreignKeyRelations")
private List<ForeignKeyRelation> foreignKeyRelations;

public BinlogCoordinate getCurrentPosition() {
return currentPosition;
}

public void setCurrentPosition(BinlogCoordinate currentPosition) {
this.currentPosition = currentPosition;
}

public List<ForeignKeyRelation> getForeignKeyRelations() {
return foreignKeyRelations;
}

public void setForeignKeyRelations(List<ForeignKeyRelation> foreignKeyRelations) {
this.foreignKeyRelations = foreignKeyRelations;
}
}
data-prepper-plugins/rds-source/.../coordination/state/PostgresStreamState.java (new file)
@@ -0,0 +1,48 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugins.source.rds.coordination.state;

import com.fasterxml.jackson.annotation.JsonProperty;

public class PostgresStreamState {

@JsonProperty("currentLsn")
private String currentLsn;

@JsonProperty("publicationName")
private String publicationName;

@JsonProperty("replicationSlotName")
private String replicationSlotName;

public String getCurrentLsn() {
return currentLsn;
}

public void setCurrentLsn(String currentLsn) {
this.currentLsn = currentLsn;
}

public String getPublicationName() {
return publicationName;
}

public void setPublicationName(String publicationName) {
this.publicationName = publicationName;
}

public String getReplicationSlotName() {
return replicationSlotName;
}

public void setReplicationSlotName(String replicationSlotName) {
this.replicationSlotName = replicationSlotName;
}
}
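
Both stream-state classes are plain Jackson-annotated POJOs, so they round-trip through the source coordinator's JSON progress state. A quick illustration, assuming the class is on the classpath (the LSN, publication, and slot values are placeholders):

import com.fasterxml.jackson.databind.ObjectMapper;

public class PostgresStreamStateExample {
    public static void main(String[] args) throws Exception {
        // Placeholder values for illustration only.
        final PostgresStreamState state = new PostgresStreamState();
        state.setCurrentLsn("0/1573178");
        state.setPublicationName("my_publication");
        state.setReplicationSlotName("my_slot");

        // Produces something like:
        // {"currentLsn":"0/1573178","publicationName":"my_publication","replicationSlotName":"my_slot"}
        System.out.println(new ObjectMapper().writeValueAsString(state));
    }
}
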
data-prepper-plugins/rds-source/.../coordination/state/StreamProgressState.java
@@ -6,28 +6,44 @@
package org.opensearch.dataprepper.plugins.source.rds.coordination.state;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate;
import org.opensearch.dataprepper.plugins.source.rds.model.ForeignKeyRelation;

import java.util.List;
import java.util.Map;

public class StreamProgressState {

@JsonProperty("currentPosition")
private BinlogCoordinate currentPosition;
@JsonProperty("engineType")
private String engineType;

@JsonProperty("waitForExport")
private boolean waitForExport = false;

@JsonProperty("foreignKeyRelations")
private List<ForeignKeyRelation> foreignKeyRelations;
/**
* Map of table name to primary keys
*/
@JsonProperty("primaryKeyMap")
private Map<String, List<String>> primaryKeyMap;

public BinlogCoordinate getCurrentPosition() {
return currentPosition;
@JsonProperty("mySqlStreamState")
private MySqlStreamState mySqlStreamState;

@JsonProperty("postgresStreamState")
private PostgresStreamState postgresStreamState;

public String getEngineType() {
return engineType;
}

public void setEngineType(String engineType) {
this.engineType = engineType;
}

public Map<String, List<String>> getPrimaryKeyMap() {
return primaryKeyMap;
}

public void setCurrentPosition(BinlogCoordinate currentPosition) {
this.currentPosition = currentPosition;
public void setPrimaryKeyMap(Map<String, List<String>> primaryKeyMap) {
this.primaryKeyMap = primaryKeyMap;
}

public boolean shouldWaitForExport() {
@@ -38,11 +54,19 @@ public void setWaitForExport(boolean waitForExport) {
this.waitForExport = waitForExport;
}

public List<ForeignKeyRelation> getForeignKeyRelations() {
return foreignKeyRelations;
public MySqlStreamState getMySqlStreamState() {
return mySqlStreamState;
}

public void setMySqlStreamState(MySqlStreamState mySqlStreamState) {
this.mySqlStreamState = mySqlStreamState;
}

public PostgresStreamState getPostgresStreamState() {
return postgresStreamState;
}

public void setForeignKeyRelations(List<ForeignKeyRelation> foreignKeyRelations) {
this.foreignKeyRelations = foreignKeyRelations;
public void setPostgresStreamState(PostgresStreamState postgresStreamState) {
this.postgresStreamState = postgresStreamState;
}
}
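
With the engine-specific details moved into MySqlStreamState and PostgresStreamState, StreamProgressState now carries only the shared fields plus whichever sub-state applies. A hedged sketch of how a stream partition's state might be populated (the actual initialization lives in the schedulers and is not shown in this excerpt; the names below are placeholders):

public class StreamProgressStateUsageSketch {
    public static void main(String[] args) {
        final StreamProgressState progressState = new StreamProgressState();
        progressState.setEngineType("postgres"); // or "mysql"
        progressState.setWaitForExport(true);

        // Postgres partitions carry publication/slot/LSN details...
        final PostgresStreamState postgresStreamState = new PostgresStreamState();
        postgresStreamState.setPublicationName("my_publication");   // placeholder
        postgresStreamState.setReplicationSlotName("my_slot");      // placeholder
        progressState.setPostgresStreamState(postgresStreamState);

        // ...while MySQL partitions would instead populate MySqlStreamState with a
        // BinlogCoordinate and the foreign-key relations.
    }
}
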
data-prepper-plugins/rds-source/.../datatype/postgres/ColumnType.java (new file)
@@ -0,0 +1,68 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.rds.datatype.postgres;

import java.util.HashMap;
import java.util.Map;

public enum ColumnType {
BOOLEAN(16, "boolean"),
SMALLINT(21, "smallint"),
INTEGER(23, "integer"),
BIGINT(20, "bigint"),
REAL(700, "real"),
DOUBLE_PRECISION(701, "double precision"),
NUMERIC(1700, "numeric"),
TEXT(25, "text"),
VARCHAR(1043, "varchar"),
DATE(1082, "date"),
TIME(1083, "time"),
TIMESTAMP(1114, "timestamp"),
TIMESTAMPTZ(1184, "timestamptz"),
UUID(2950, "uuid"),
JSON(114, "json"),
JSONB(3802, "jsonb");

private final int typeId;
private final String typeName;

private static final Map<Integer, ColumnType> TYPE_ID_MAP = new HashMap<>();

static {
for (ColumnType type : values()) {
TYPE_ID_MAP.put(type.typeId, type);
}
}

ColumnType(int typeId, String typeName) {
this.typeId = typeId;
this.typeName = typeName;
}

public int getTypeId() {
return typeId;
}

public String getTypeName() {
return typeName;
}

public static ColumnType getByTypeId(int typeId) {
if (!TYPE_ID_MAP.containsKey(typeId)) {
throw new IllegalArgumentException("Unsupported column type id: " + typeId);
}
return TYPE_ID_MAP.get(typeId);
}

public static String getTypeNameByEnum(ColumnType columnType) {
return columnType.getTypeName();
}
}
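
ColumnType maps PostgreSQL type OIDs to readable names, presumably for use when decoding logical-replication messages. Usage follows directly from the enum above:

public class ColumnTypeExample {
    public static void main(String[] args) {
        // OIDs taken from the enum constants above.
        System.out.println(ColumnType.getByTypeId(1043).getTypeName()); // varchar
        System.out.println(ColumnType.getByTypeId(3802).getTypeName()); // jsonb

        // Unsupported OIDs fail fast instead of being passed through silently.
        try {
            ColumnType.getByTypeId(600); // the "point" OID, which is not in the enum
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Unsupported column type id: 600
        }
    }
}
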
(Diff truncated; the remaining changed files are not shown here.)
