From d1b56535e65de15533b382fabe30f9898ed942b0 Mon Sep 17 00:00:00 2001 From: Ryanne Dolan Date: Fri, 21 Jun 2024 00:10:43 -0500 Subject: [PATCH 1/3] Drop primary key constraint --- .github/workflows/integration-tests.yml | 7 ++++++- Makefile | 2 +- deploy/samples/subscriptions.yaml | 4 ++-- etc/integration-tests.sql | 10 ++-------- etc/readiness-probe.sql | 1 - .../hoptimator/catalog/ScriptImplementor.java | 8 +------- .../catalog/kafka/RawKafkaSchemaFactory.java | 3 ++- test-model.yaml | 14 -------------- 8 files changed, 14 insertions(+), 35 deletions(-) diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index 8a2f09d..e10ecde 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -59,11 +59,16 @@ jobs: kubectl describe kafkas -n kafka kubectl describe flinkdeployments kubectl describe subscriptions + - name: Capture Flink Job Logs + if: always() + run: kubectl logs $(kubectl get pods -l component=jobmanager -o name) $(kubectl get pods -l component=taskmanager -o name) --since=0s - name: Capture Hoptimator Operator Logs if: always() run: kubectl logs $(kubectl get pods -l app=hoptimator-operator -o name) - name: Capture Flink Operator Logs if: always() run: kubectl logs $(kubectl get pods -l app.kubernetes.io/name=flink-kubernetes-operator -o name) + - name: Capture Flink Job Logs + if: always() + run: kubectl logs $(kubectl get pods -l app.kubernetes.io/name=flink-kubernetes-operator -o name) - diff --git a/Makefile b/Makefile index 2a50c07..00996f0 100644 --- a/Makefile +++ b/Makefile @@ -26,7 +26,7 @@ deploy-dev-environment: kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping" kubectl create namespace kafka || echo "skipping" kubectl create namespace mysql || echo "skipping" - helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.6.1/ + helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.9.0/ helm upgrade --install --atomic --set webhook.create=false flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator kubectl apply -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io diff --git a/deploy/samples/subscriptions.yaml b/deploy/samples/subscriptions.yaml index b0b307e..6e897c3 100644 --- a/deploy/samples/subscriptions.yaml +++ b/deploy/samples/subscriptions.yaml @@ -2,9 +2,9 @@ apiVersion: hoptimator.linkedin.com/v1alpha1 kind: Subscription metadata: - name: products + name: names spec: - sql: SELECT "quantity", "product_id" AS KEY FROM INVENTORY."products_on_hand" + sql: SELECT NAME, NAME AS KEY FROM DATAGEN.PERSON database: RAWKAFKA diff --git a/etc/integration-tests.sql b/etc/integration-tests.sql index b6844c0..a521e81 100644 --- a/etc/integration-tests.sql +++ b/etc/integration-tests.sql @@ -15,12 +15,6 @@ SELECT * FROM DATAGEN.COMPANY; !insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON SELECT * FROM RAWKAFKA."test-sink" LIMIT 5; --- MySQL CDC tables -SELECT * FROM INVENTORY."products_on_hand" LIMIT 1; - --- Test check command -!check not empty SELECT * FROM INVENTORY."products_on_hand"; - --- MySQL CDC -> Kafka (via sample subscription "products") -SELECT * FROM RAWKAFKA."products" LIMIT 1; +-- read from sample subscription "names" +SELECT * FROM RAWKAFKA."names" LIMIT 1; diff --git a/etc/readiness-probe.sql b/etc/readiness-probe.sql index dedeea8..30f0add 100644 --- a/etc/readiness-probe.sql +++ b/etc/readiness-probe.sql @@ -3,5 +3,4 @@ SELECT * FROM DATAGEN.PERSON; SELECT * FROM DATAGEN.COMPANY; -SELECT * FROM INVENTORY."products_on_hand" LIMIT 1; SELECT * FROM RAWKAFKA."test-sink" LIMIT 0; diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java index e9368ef..86a6133 100644 --- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java +++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java @@ -168,9 +168,6 @@ public SqlNode visit(SqlCall call) { /** * Implements a CREATE TABLE...WITH... DDL statement. - * - * N.B. the following magic: - * - field 'KEY' is treated as a PRIMARY KEY */ class ConnectorImplementor implements ScriptImplementor { private final String database; @@ -192,11 +189,8 @@ public void implement(SqlWriter w) { (new CompoundIdentifierImplementor(database, name)).implement(w); SqlWriter.Frame frame1 = w.startList("(", ")"); (new RowTypeSpecImplementor(rowType)).implement(w); - if (rowType.getField("KEY", true, false) != null) { - w.sep(","); - w.literal("PRIMARY KEY (KEY) NOT ENFORCED"); - } w.endList(frame1); + // TODO support PRIMARY KEY for Tables that support it // TODO support PARTITIONED BY for Tables that support it w.keyword("WITH"); SqlWriter.Frame frame2 = w.startList("(", ")"); diff --git a/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java b/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java index b151d0e..bc8fa2b 100644 --- a/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java +++ b/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java @@ -31,8 +31,9 @@ public Schema create(SchemaPlus parentSchema, String name, Map o .with("KEY", DataType.VARCHAR); ConfigProvider connectorConfigProvider = ConfigProvider.from(clientConfig) .withPrefix("properties.") - .with("connector", "upsert-kafka") + .with("connector", "kafka") .with("key.format", "csv") + .with("key.fields", "KEY") .with("value.format", "csv") .with("value.fields-include", "EXCEPT_KEY") .with("topic", x -> x); diff --git a/test-model.yaml b/test-model.yaml index df6c158..6c20686 100644 --- a/test-model.yaml +++ b/test-model.yaml @@ -14,17 +14,3 @@ schemas: bootstrap.servers: one-kafka-bootstrap.kafka.svc:9092 group.id: hoptimator-test auto.offset.reset: earliest - -- name: INVENTORY - type: custom - factory: com.linkedin.hoptimator.catalog.mysql.MySqlCdcSchemaFactory - operand: - username: root - password: debezium - hostname: mysql.mysql.svc.cluster.local - port: 3306 - database: inventory - urlSuffix: "?useUnicode=true&characterEncoding=UTF-8&useSSL=false" - connectorConfig: - scan.incremental.snapshot.enabled: false - From 1c869a4419e05751a692084f063ddd269c30c3ba Mon Sep 17 00:00:00 2001 From: Ryanne Dolan Date: Wed, 10 Jul 2024 10:15:02 -0500 Subject: [PATCH 2/3] Drop MySQL from integration tests --- .github/workflows/integration-tests.yml | 4 +- Makefile | 1 - deploy/dev/mysql.yaml | 62 ------------------------- 3 files changed, 3 insertions(+), 64 deletions(-) delete mode 100644 deploy/dev/mysql.yaml diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index e10ecde..c55d91f 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -61,7 +61,9 @@ jobs: kubectl describe subscriptions - name: Capture Flink Job Logs if: always() - run: kubectl logs $(kubectl get pods -l component=jobmanager -o name) $(kubectl get pods -l component=taskmanager -o name) --since=0s + run: | + kubectl logs $(kubectl get pods -l component=jobmanager -o name) --since=0s || echo "skipped." + kubectl logs $(kubectl get pods -l component=taskmanager -o name) --since=0s || echo "skipped." - name: Capture Hoptimator Operator Logs if: always() run: kubectl logs $(kubectl get pods -l app=hoptimator-operator -o name) diff --git a/Makefile b/Makefile index 00996f0..88d3f9f 100644 --- a/Makefile +++ b/Makefile @@ -25,7 +25,6 @@ quickstart: build deploy-dev-environment deploy deploy-dev-environment: kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping" kubectl create namespace kafka || echo "skipping" - kubectl create namespace mysql || echo "skipping" helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.9.0/ helm upgrade --install --atomic --set webhook.create=false flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator kubectl apply -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka diff --git a/deploy/dev/mysql.yaml b/deploy/dev/mysql.yaml deleted file mode 100644 index 06b7f16..0000000 --- a/deploy/dev/mysql.yaml +++ /dev/null @@ -1,62 +0,0 @@ -# Copyright (c) 2023, LinkedIn -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# Based on examples at: -# https://debezium.io/documentation/reference/stable/operations/kubernetes.html - - -apiVersion: v1 -kind: Service -metadata: - name: mysql - namespace: mysql -spec: - ports: - - port: 3306 - selector: - app: mysql - clusterIP: None - ---- - -apiVersion: apps/v1 -kind: Deployment -metadata: - name: mysql - namespace: mysql -spec: - selector: - matchLabels: - app: mysql - strategy: - type: Recreate - template: - metadata: - labels: - app: mysql - spec: - containers: - - image: quay.io/debezium/example-mysql:2.2 - name: mysql - env: - - name: MYSQL_ROOT_PASSWORD - value: debezium - - name: MYSQL_USER - value: mysqluser - - name: MYSQL_PASSWORD - value: mysqlpw - ports: - - containerPort: 3306 - name: mysql - From a73cff591bd328e2ad6a8bd1fcc3d360d1c6923e Mon Sep 17 00:00:00 2001 From: Ryanne Dolan Date: Wed, 10 Jul 2024 10:45:14 -0500 Subject: [PATCH 3/3] Support PRIMARY_KEY magic field --- .../linkedin/hoptimator/catalog/ScriptImplementor.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java index 86a6133..72474be 100644 --- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java +++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java @@ -168,6 +168,9 @@ public SqlNode visit(SqlCall call) { /** * Implements a CREATE TABLE...WITH... DDL statement. + * + * N.B. the following magic: + * - field 'PRIMARY_KEY' is treated as a PRIMARY KEY */ class ConnectorImplementor implements ScriptImplementor { private final String database; @@ -189,8 +192,11 @@ public void implement(SqlWriter w) { (new CompoundIdentifierImplementor(database, name)).implement(w); SqlWriter.Frame frame1 = w.startList("(", ")"); (new RowTypeSpecImplementor(rowType)).implement(w); + if (rowType.getField("PRIMARY_KEY", true, false) != null) { + w.sep(","); + w.literal("PRIMARY KEY (PRIMARY_KEY) NOT ENFORCED"); + } w.endList(frame1); - // TODO support PRIMARY KEY for Tables that support it // TODO support PARTITIONED BY for Tables that support it w.keyword("WITH"); SqlWriter.Frame frame2 = w.startList("(", ")");