Fix primary key constraint for nullable keys
ryannedolan committed Jul 8, 2024
1 parent c3fa16a commit 14ed43a
Showing 3 changed files with 6 additions and 4 deletions.
Makefile: 2 changes (1 addition, 1 deletion)
@@ -26,7 +26,7 @@ deploy-dev-environment:
 	kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"
 	kubectl create namespace kafka || echo "skipping"
 	kubectl create namespace mysql || echo "skipping"
-	helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.6.1/
+	helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.9.0/
 	helm upgrade --install --atomic --set webhook.create=false flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
 	kubectl apply -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka
 	kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io
@@ -170,7 +170,7 @@ public SqlNode visit(SqlCall call) {
  * Implements a CREATE TABLE...WITH... DDL statement.
  *
  * N.B. the following magic:
- * - field 'KEY' is treated as a PRIMARY KEY
+ * - non-nullable field 'KEY' is treated as a PRIMARY KEY
  */
 class ConnectorImplementor implements ScriptImplementor {
   private final String database;
@@ -192,7 +192,8 @@ public void implement(SqlWriter w) {
     (new CompoundIdentifierImplementor(database, name)).implement(w);
     SqlWriter.Frame frame1 = w.startList("(", ")");
     (new RowTypeSpecImplementor(rowType)).implement(w);
-    if (rowType.getField("KEY", true, false) != null) {
+    RelDataTypeField key = rowType.getField("KEY", true, false);
+    if (key != null && key.getType().isNullable() == false) {
       w.sep(",");
       w.literal("PRIMARY KEY (KEY) NOT ENFORCED");
     }
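For context, the patched check uses Calcite's row-type API: getField("KEY", true, false) looks up the column case-sensitively, and the PRIMARY KEY (KEY) NOT ENFORCED clause is now emitted only when that field's type is non-nullable. The standalone sketch below is not part of the commit; the class name and the VALUE column are made up for illustration. It runs the same condition against a hand-built Calcite row type:

    import org.apache.calcite.rel.type.RelDataType;
    import org.apache.calcite.rel.type.RelDataTypeField;
    import org.apache.calcite.rel.type.RelDataTypeSystem;
    import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
    import org.apache.calcite.sql.type.SqlTypeName;

    // Hypothetical demo class, not part of the commit.
    public class PrimaryKeyCheckSketch {
      public static void main(String[] args) {
        SqlTypeFactoryImpl typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);

        // A row with a non-nullable KEY column and a nullable VALUE column.
        RelDataType rowType = typeFactory.builder()
            .add("KEY", typeFactory.createTypeWithNullability(
                typeFactory.createSqlType(SqlTypeName.VARCHAR), false))
            .add("VALUE", typeFactory.createTypeWithNullability(
                typeFactory.createSqlType(SqlTypeName.VARCHAR), true))
            .build();

        // Same condition as the patched ConnectorImplementor: a primary key
        // clause only when KEY exists and is non-nullable.
        RelDataTypeField key = rowType.getField("KEY", true, false);
        if (key != null && !key.getType().isNullable()) {
          System.out.println("PRIMARY KEY (KEY) NOT ENFORCED");
        } else {
          System.out.println("no primary key clause");
        }
      }
    }

With a nullable KEY column, or no KEY column at all, the clause is skipped, which is the behavior this commit introduces.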
@@ -31,8 +31,9 @@ public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> o
         .with("KEY", DataType.VARCHAR);
     ConfigProvider connectorConfigProvider = ConfigProvider.from(clientConfig)
         .withPrefix("properties.")
-        .with("connector", "upsert-kafka")
+        .with("connector", "kafka")
         .with("key.format", "csv")
+        .with("key.fields", "KEY")
         .with("value.format", "csv")
         .with("value.fields-include", "EXCEPT_KEY")
         .with("topic", x -> x);
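A note on the connector swap: Flink's upsert-kafka connector derives the Kafka message key from the table's PRIMARY KEY, while the plain kafka connector reads it from the key.fields option, hence the new line. As a rough illustration only (not part of the commit; the class name and the "my-topic" value are made up, and the real topic is templated via x -> x), the resolved connector options look something like:

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Hypothetical demo class, not part of the commit.
    public class ConnectorOptionsSketch {
      public static void main(String[] args) {
        // Roughly the options the updated ConfigProvider chain resolves to,
        // assuming a topic named "my-topic".
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "kafka");             // previously "upsert-kafka"
        options.put("key.format", "csv");
        options.put("key.fields", "KEY");              // now stated explicitly
        options.put("value.format", "csv");
        options.put("value.fields-include", "EXCEPT_KEY");
        options.put("topic", "my-topic");
        options.forEach((k, v) -> System.out.println("'" + k + "' = '" + v + "'"));
      }
    }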
