diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml
index 8a2f09d..c55d91f 100644
--- a/.github/workflows/integration-tests.yml
+++ b/.github/workflows/integration-tests.yml
@@ -59,11 +59,15 @@ jobs:
         kubectl describe kafkas -n kafka
         kubectl describe flinkdeployments
         kubectl describe subscriptions
+    - name: Capture Flink Job Logs
+      if: always()
+      run: |
+        kubectl logs $(kubectl get pods -l component=jobmanager -o name) --since=0s || echo "skipped."
+        kubectl logs $(kubectl get pods -l component=taskmanager -o name) --since=0s || echo "skipped."
     - name: Capture Hoptimator Operator Logs
       if: always()
       run: kubectl logs $(kubectl get pods -l app=hoptimator-operator -o name)
     - name: Capture Flink Operator Logs
       if: always()
       run: kubectl logs $(kubectl get pods -l app.kubernetes.io/name=flink-kubernetes-operator -o name)
-

diff --git a/Makefile b/Makefile
index 2a50c07..88d3f9f 100644
--- a/Makefile
+++ b/Makefile
@@ -25,8 +25,7 @@ quickstart: build deploy-dev-environment deploy
 deploy-dev-environment:
 	kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"
 	kubectl create namespace kafka || echo "skipping"
-	kubectl create namespace mysql || echo "skipping"
-	helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.6.1/
+	helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.9.0/
 	helm upgrade --install --atomic --set webhook.create=false flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
 	kubectl apply -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka
 	kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io

diff --git a/deploy/dev/mysql.yaml b/deploy/dev/mysql.yaml
deleted file mode 100644
index 06b7f16..0000000
--- a/deploy/dev/mysql.yaml
+++ /dev/null
@@ -1,62 +0,0 @@
-# Copyright (c) 2023, LinkedIn
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# Based on examples at:
-# https://debezium.io/documentation/reference/stable/operations/kubernetes.html
-
-
-apiVersion: v1
-kind: Service
-metadata:
-  name: mysql
-  namespace: mysql
-spec:
-  ports:
-  - port: 3306
-  selector:
-    app: mysql
-  clusterIP: None
-
----
-
-apiVersion: apps/v1
-kind: Deployment
-metadata:
-  name: mysql
-  namespace: mysql
-spec:
-  selector:
-    matchLabels:
-      app: mysql
-  strategy:
-    type: Recreate
-  template:
-    metadata:
-      labels:
-        app: mysql
-    spec:
-      containers:
-      - image: quay.io/debezium/example-mysql:2.2
-        name: mysql
-        env:
-        - name: MYSQL_ROOT_PASSWORD
-          value: debezium
-        - name: MYSQL_USER
-          value: mysqluser
-        - name: MYSQL_PASSWORD
-          value: mysqlpw
-        ports:
-        - containerPort: 3306
-          name: mysql
-
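Note on the sample subscription updated below: it now runs entirely off the built-in DATAGEN schema, so the dev environment no longer needs MySQL or the Debezium example image. A minimal sketch of the resulting flow, assuming the DATAGEN and RAWKAFKA schemas defined in test-model.yaml; both statements appear verbatim in the changes that follow:

-- The subscription's SQL (deploy/samples/subscriptions.yaml); the operator
-- materializes this as a Flink job writing to the Kafka topic "names".
SELECT NAME, NAME AS KEY FROM DATAGEN.PERSON;

-- The topic then reads back through the RAWKAFKA schema (etc/integration-tests.sql):
SELECT * FROM RAWKAFKA."names" LIMIT 1;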
diff --git a/deploy/samples/subscriptions.yaml b/deploy/samples/subscriptions.yaml
index b0b307e..6e897c3 100644
--- a/deploy/samples/subscriptions.yaml
+++ b/deploy/samples/subscriptions.yaml
@@ -2,9 +2,9 @@
 apiVersion: hoptimator.linkedin.com/v1alpha1
 kind: Subscription
 metadata:
-  name: products
+  name: names
 spec:
-  sql: SELECT "quantity", "product_id" AS KEY FROM INVENTORY."products_on_hand"
+  sql: SELECT NAME, NAME AS KEY FROM DATAGEN.PERSON
   database: RAWKAFKA

diff --git a/etc/integration-tests.sql b/etc/integration-tests.sql
index b6844c0..a521e81 100644
--- a/etc/integration-tests.sql
+++ b/etc/integration-tests.sql
@@ -15,12 +15,6 @@
 SELECT * FROM DATAGEN.COMPANY;
 
 !insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON
 
 SELECT * FROM RAWKAFKA."test-sink" LIMIT 5;
 
--- MySQL CDC tables
-SELECT * FROM INVENTORY."products_on_hand" LIMIT 1;
-
--- Test check command
-!check not empty SELECT * FROM INVENTORY."products_on_hand";
-
--- MySQL CDC -> Kafka (via sample subscription "products")
-SELECT * FROM RAWKAFKA."products" LIMIT 1;
+-- read from sample subscription "names"
+SELECT * FROM RAWKAFKA."names" LIMIT 1;

diff --git a/etc/readiness-probe.sql b/etc/readiness-probe.sql
index dedeea8..30f0add 100644
--- a/etc/readiness-probe.sql
+++ b/etc/readiness-probe.sql
@@ -3,5 +3,4 @@
 SELECT * FROM DATAGEN.PERSON;
 SELECT * FROM DATAGEN.COMPANY;
-SELECT * FROM INVENTORY."products_on_hand" LIMIT 1;
 SELECT * FROM RAWKAFKA."test-sink" LIMIT 0;
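Note on the ScriptImplementor change below: renaming the magic field means a column named KEY no longer triggers an implicit primary-key constraint in generated DDL; only a column literally named PRIMARY_KEY does. That unblocks the connector switch in the following hunk, since upsert-kafka requires a PRIMARY KEY while the plain kafka connector rejects one. An illustrative before/after sketch, with assumed table and column names:

-- Before this change, a KEY column forced a constraint into the emitted DDL:
--   CREATE TABLE `RAWKAFKA`.`names` (`KEY` VARCHAR, `PAYLOAD` VARCHAR,
--     PRIMARY KEY (KEY) NOT ENFORCED) WITH (...);
-- After it, KEY is an ordinary column and no constraint is emitted:
CREATE TABLE `RAWKAFKA`.`names` (`KEY` VARCHAR, `PAYLOAD` VARCHAR) WITH (...);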
diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java
index e9368ef..72474be 100644
--- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java
+++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java
@@ -170,7 +170,7 @@ public SqlNode visit(SqlCall call) {
    * Implements a CREATE TABLE...WITH... DDL statement.
    *
    * N.B. the following magic:
-   * - field 'KEY' is treated as a PRIMARY KEY
+   * - field 'PRIMARY_KEY' is treated as a PRIMARY KEY
    */
   class ConnectorImplementor implements ScriptImplementor {
     private final String database;
@@ -192,9 +192,9 @@ public void implement(SqlWriter w) {
       (new CompoundIdentifierImplementor(database, name)).implement(w);
       SqlWriter.Frame frame1 = w.startList("(", ")");
       (new RowTypeSpecImplementor(rowType)).implement(w);
-      if (rowType.getField("KEY", true, false) != null) {
+      if (rowType.getField("PRIMARY_KEY", true, false) != null) {
         w.sep(",");
-        w.literal("PRIMARY KEY (KEY) NOT ENFORCED");
+        w.literal("PRIMARY KEY (PRIMARY_KEY) NOT ENFORCED");
       }
       w.endList(frame1);
       // TODO support PARTITIONED BY for Tables that support it

diff --git a/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java b/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java
index b151d0e..bc8fa2b 100644
--- a/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java
+++ b/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java
@@ -31,8 +31,9 @@ public Schema create(SchemaPlus parentSchema, String name, Map o
         .with("KEY", DataType.VARCHAR);
     ConfigProvider connectorConfigProvider = ConfigProvider.from(clientConfig)
         .withPrefix("properties.")
-        .with("connector", "upsert-kafka")
+        .with("connector", "kafka")
        .with("key.format", "csv")
+        .with("key.fields", "KEY")
         .with("value.format", "csv")
         .with("value.fields-include", "EXCEPT_KEY")
         .with("topic", x -> x);

diff --git a/test-model.yaml b/test-model.yaml
index df6c158..6c20686 100644
--- a/test-model.yaml
+++ b/test-model.yaml
@@ -14,17 +14,3 @@ schemas:
       bootstrap.servers: one-kafka-bootstrap.kafka.svc:9092
       group.id: hoptimator-test
       auto.offset.reset: earliest
-
-- name: INVENTORY
-  type: custom
-  factory: com.linkedin.hoptimator.catalog.mysql.MySqlCdcSchemaFactory
-  operand:
-    username: root
-    password: debezium
-    hostname: mysql.mysql.svc.cluster.local
-    port: 3306
-    database: inventory
-    urlSuffix: "?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
-    connectorConfig:
-      scan.incremental.snapshot.enabled: false
-
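Taken together, the two adapter changes swap the Kafka sink from upsert semantics to plain appends with an explicit record key. A rough sketch of the connector DDL this configuration would now yield for the "names" topic; the column list, identifier quoting, and the assumption that ConfigProvider applies the "properties." prefix only to the wrapped client config (bootstrap servers from test-model.yaml) are guesses, not verified output:

CREATE TABLE `RAWKAFKA`.`names` (
  `KEY` VARCHAR,
  `PAYLOAD` VARCHAR
) WITH (
  'connector' = 'kafka',                  -- was 'upsert-kafka'
  'key.format' = 'csv',
  'key.fields' = 'KEY',                   -- new: routes the KEY column into the record key
  'value.format' = 'csv',
  'value.fields-include' = 'EXCEPT_KEY',  -- KEY is excluded from the CSV value
  'topic' = 'names',
  'properties.bootstrap.servers' = 'one-kafka-bootstrap.kafka.svc:9092'
);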