Skip to content

Commit

Permalink
[cdc] Kafka database sync new table shouldn't require primary keys (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin authored Sep 18, 2024
1 parent cc573ea commit 102e5a9
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ protected EventParser.Factory<RichCdcMultiplexRecord> buildEventParserFactory()
allowUpperCase,
partitionKeys,
primaryKeys,
requirePrimaryKeys(),
partitionKeyMultiple,
metadataConverters);
Pattern includingPattern = Pattern.compile(includingTables);
Expand All @@ -170,6 +171,8 @@ protected EventParser.Factory<RichCdcMultiplexRecord> buildEventParserFactory()
createdTables);
}

protected abstract boolean requirePrimaryKeys();

@Override
protected void buildSink(
DataStream<RichCdcMultiplexRecord> input,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,9 @@ public KafkaSyncDatabaseAction(
protected CdcTimestampExtractor createCdcTimestampExtractor() {
return new MessageQueueCdcTimestampExtractor();
}

@Override
protected boolean requirePrimaryKeys() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,9 @@ protected MongoDBSource<CdcSourceRecord> buildSource() {
includingTables,
Collections.emptyList()));
}

@Override
protected boolean requirePrimaryKeys() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -259,4 +259,9 @@ public List<Identifier> monitoredTables() {
public List<Identifier> excludedTables() {
return excludedTables;
}

@Override
protected boolean requirePrimaryKeys() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,9 @@ public PulsarSyncDatabaseAction(
protected CdcTimestampExtractor createCdcTimestampExtractor() {
return new MessageQueueCdcTimestampExtractor();
}

@Override
protected boolean requirePrimaryKeys() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -38,21 +37,24 @@ public class NewTableSchemaBuilder implements Serializable {
private final boolean caseSensitive;
private final List<String> partitionKeys;
private final List<String> primaryKeys;
private final boolean requirePrimaryKeys;
private final CdcMetadataConverter[] metadataConverters;
protected Map<String, List<String>> partitionKeyMultiple = new HashMap<>();
private final Map<String, List<String>> partitionKeyMultiple;

public NewTableSchemaBuilder(
Map<String, String> tableConfig,
boolean caseSensitive,
List<String> partitionKeys,
List<String> primaryKeys,
boolean requirePrimaryKeys,
Map<String, List<String>> partitionKeyMultiple,
CdcMetadataConverter[] metadataConverters) {
this.tableConfig = tableConfig;
this.caseSensitive = caseSensitive;
this.metadataConverters = metadataConverters;
this.partitionKeys = partitionKeys;
this.primaryKeys = primaryKeys;
this.requirePrimaryKeys = requirePrimaryKeys;
this.partitionKeyMultiple = partitionKeyMultiple;
}

Expand Down Expand Up @@ -84,6 +86,6 @@ public Optional<Schema> build(RichCdcMultiplexRecord record) {
metadataConverters,
caseSensitive,
false,
true));
requirePrimaryKeys));
}
}

0 comments on commit 102e5a9

Please sign in to comment.