Skip to content

Commit

Permalink
[cdc] paimon cdc support snapshot mode (apache#4164)
Browse files Browse the repository at this point in the history
(cherry picked from commit f9164e4)
  • Loading branch information
MOBIN-F authored and wxplovecc committed Sep 11, 2024
1 parent 8b8f43c commit 046ebd1
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class MongoDBActionUtils {
private static final String INITIAL_MODE = "initial";
private static final String LATEST_OFFSET_MODE = "latest-offset";
private static final String TIMESTAMP_MODE = "timestamp";
private static final String SNAPSHOT_MODE = "snapshot";

public static final ConfigOption<String> FIELD_NAME =
ConfigOptions.key("field.name")
Expand Down Expand Up @@ -129,8 +130,14 @@ public static MongoDBSource<CdcSourceRecord> buildMongodbSource(
StartupOptions.timestamp(
mongodbConfig.get(SourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS)));
break;
case SNAPSHOT_MODE:
sourceBuilder.startupOptions(StartupOptions.snapshot());
break;
default:
throw new IllegalArgumentException("Unsupported startup mode: " + startupMode);
throw new IllegalArgumentException(
String.format(
"Unknown scan.startup.mode='%s'. Valid scan.startup.mode for MongoDB CDC are [initial, latest-offset, timestamp, snapshot]",
startupMode));
}

Map<String, Object> customConverterConfigs = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,13 @@ public static MySqlSource<CdcSourceRecord> buildMySqlSource(
sourceBuilder.startupOptions(
StartupOptions.timestamp(
mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS)));
} else if ("snapshot".equalsIgnoreCase(startupMode)) {
sourceBuilder.startupOptions(StartupOptions.snapshot());
} else {
throw new IllegalArgumentException(
String.format(
"Unknown scan.startup.mode='%s'. Valid scan.startup.mode for MySQL CDC are [initial, earliest-offset, latest-offset, specific-offset, timestamp, snapshot]",
startupMode));
}

Properties jdbcProperties = new Properties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,13 @@ public static JdbcIncrementalSource<CdcSourceRecord> buildPostgresSource(
sourceBuilder.startupOptions(StartupOptions.initial());
} else if ("latest-offset".equalsIgnoreCase(startupMode)) {
sourceBuilder.startupOptions(StartupOptions.latest());
} else if ("snapshot".equalsIgnoreCase(startupMode)) {
sourceBuilder.startupOptions(StartupOptions.snapshot());
} else {
throw new IllegalArgumentException(
String.format(
"Unknown scan.startup.mode='%s'. Valid scan.startup.mode for Postgres CDC are [initial, latest-offset, snapshot]",
startupMode));
}

Properties debeziumProperties = new Properties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1478,4 +1478,23 @@ public void testWriteOnlyAndSchemaEvolution() throws Exception {
waitForResult(expected, table, rowType, primaryKeys);
}
}

@Test
@Timeout(60)
public void testUnknowMysqlScanStartupMode() {
String scanStartupMode = "abc";
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", DATABASE_NAME);
mySqlConfig.put("table-name", "schema_evolution_multiple");
mySqlConfig.put("scan.startup.mode", scanStartupMode);

MySqlSyncTableAction action = syncTableActionBuilder(mySqlConfig).build();
assertThatThrownBy(action::run)
.satisfies(
anyCauseMatches(
IllegalArgumentException.class,
"Unknown scan.startup.mode='"
+ scanStartupMode
+ "'. Valid scan.startup.mode for MySQL CDC are [initial, earliest-offset, latest-offset, specific-offset, timestamp, snapshot]"));
}
}

0 comments on commit 046ebd1

Please sign in to comment.