From 14ed43aa65d08ec726f831132afcde8b8164b997 Mon Sep 17 00:00:00 2001 From: Ryanne Dolan Date: Fri, 21 Jun 2024 00:10:43 -0500 Subject: [PATCH] Fix primary key constraint for nullable keys --- Makefile | 2 +- .../com/linkedin/hoptimator/catalog/ScriptImplementor.java | 5 +++-- .../hoptimator/catalog/kafka/RawKafkaSchemaFactory.java | 3 ++- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/Makefile b/Makefile index 2a50c07..00996f0 100644 --- a/Makefile +++ b/Makefile @@ -26,7 +26,7 @@ deploy-dev-environment: kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping" kubectl create namespace kafka || echo "skipping" kubectl create namespace mysql || echo "skipping" - helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.6.1/ + helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.9.0/ helm upgrade --install --atomic --set webhook.create=false flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator kubectl apply -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java index e9368ef..6a5e6e8 100644 --- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java +++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java @@ -170,7 +170,7 @@ public SqlNode visit(SqlCall call) { * Implements a CREATE TABLE...WITH... DDL statement. * * N.B. the following magic: - * - field 'KEY' is treated as a PRIMARY KEY + * - non-nullable field 'KEY' is treated as a PRIMARY KEY */ class ConnectorImplementor implements ScriptImplementor { private final String database; @@ -192,7 +192,8 @@ public void implement(SqlWriter w) { (new CompoundIdentifierImplementor(database, name)).implement(w); SqlWriter.Frame frame1 = w.startList("(", ")"); (new RowTypeSpecImplementor(rowType)).implement(w); - if (rowType.getField("KEY", true, false) != null) { + RelDataTypeField key = rowType.getField("KEY", true, false); + if (key != null && key.getType().isNullable() == false) { w.sep(","); w.literal("PRIMARY KEY (KEY) NOT ENFORCED"); } diff --git a/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java b/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java index b151d0e..bc8fa2b 100644 --- a/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java +++ b/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java @@ -31,8 +31,9 @@ public Schema create(SchemaPlus parentSchema, String name, Map o .with("KEY", DataType.VARCHAR); ConfigProvider connectorConfigProvider = ConfigProvider.from(clientConfig) .withPrefix("properties.") - .with("connector", "upsert-kafka") + .with("connector", "kafka") .with("key.format", "csv") + .with("key.fields", "KEY") .with("value.format", "csv") .with("value.fields-include", "EXCEPT_KEY") .with("topic", x -> x);