diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml
index 8a2f09d..e10ecde 100644
--- a/.github/workflows/integration-tests.yml
+++ b/.github/workflows/integration-tests.yml
@@ -59,11 +59,16 @@ jobs:
           kubectl describe kafkas -n kafka
           kubectl describe flinkdeployments
           kubectl describe subscriptions
 
+      - name: Capture Flink Job Logs
+        if: always()
+        run: kubectl logs $(kubectl get pods -l component=jobmanager -o name) $(kubectl get pods -l component=taskmanager -o name) --since=0s
       - name: Capture Hoptimator Operator Logs
        if: always()
         run: kubectl logs $(kubectl get pods -l app=hoptimator-operator -o name)
       - name: Capture Flink Operator Logs
         if: always()
         run: kubectl logs $(kubectl get pods -l app.kubernetes.io/name=flink-kubernetes-operator -o name)
+      - name: Capture Flink Job Logs
+        if: always()
+        run: kubectl logs $(kubectl get pods -l app.kubernetes.io/name=flink-kubernetes-operator -o name)
-
diff --git a/Makefile b/Makefile
index 2a50c07..00996f0 100644
--- a/Makefile
+++ b/Makefile
@@ -26,7 +26,7 @@ deploy-dev-environment:
 	kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"
 	kubectl create namespace kafka || echo "skipping"
 	kubectl create namespace mysql || echo "skipping"
-	helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.6.1/
+	helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.9.0/
 	helm upgrade --install --atomic --set webhook.create=false flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
 	kubectl apply -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka
 	kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io
diff --git a/deploy/samples/subscriptions.yaml b/deploy/samples/subscriptions.yaml
index b0b307e..6e897c3 100644
--- a/deploy/samples/subscriptions.yaml
+++ b/deploy/samples/subscriptions.yaml
@@ -2,9 +2,9 @@
 apiVersion: hoptimator.linkedin.com/v1alpha1
 kind: Subscription
 metadata:
-  name: products
+  name: names
 spec:
-  sql: SELECT "quantity", "product_id" AS KEY FROM INVENTORY."products_on_hand"
+  sql: SELECT NAME, NAME AS KEY FROM DATAGEN.PERSON
   database: RAWKAFKA
diff --git a/etc/integration-tests.sql b/etc/integration-tests.sql
index b6844c0..a521e81 100644
--- a/etc/integration-tests.sql
+++ b/etc/integration-tests.sql
@@ -15,12 +15,6 @@
 SELECT * FROM DATAGEN.COMPANY;
 
 !insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON
 SELECT * FROM RAWKAFKA."test-sink" LIMIT 5;
--- MySQL CDC tables
-SELECT * FROM INVENTORY."products_on_hand" LIMIT 1;
-
--- Test check command
-!check not empty SELECT * FROM INVENTORY."products_on_hand";
-
--- MySQL CDC -> Kafka (via sample subscription "products")
-SELECT * FROM RAWKAFKA."products" LIMIT 1;
+-- read from sample subscription "names"
+SELECT * FROM RAWKAFKA."names" LIMIT 1;
diff --git a/etc/readiness-probe.sql b/etc/readiness-probe.sql
index dedeea8..30f0add 100644
--- a/etc/readiness-probe.sql
+++ b/etc/readiness-probe.sql
@@ -3,5 +3,4 @@
 SELECT * FROM DATAGEN.PERSON;
 SELECT * FROM DATAGEN.COMPANY;
 
-SELECT * FROM INVENTORY."products_on_hand" LIMIT 1;
 SELECT * FROM RAWKAFKA."test-sink" LIMIT 0;
diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java
index e9368ef..86a6133 100644
--- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java
+++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java
@@ -168,9 +168,6 @@ public SqlNode visit(SqlCall call) {
 
   /**
    * Implements a CREATE TABLE...WITH... DDL statement.
-   *
-   * N.B. the following magic:
-   *  - field 'KEY' is treated as a PRIMARY KEY
    */
   class ConnectorImplementor implements ScriptImplementor {
     private final String database;
@@ -192,11 +189,8 @@ public void implement(SqlWriter w) {
       (new CompoundIdentifierImplementor(database, name)).implement(w);
       SqlWriter.Frame frame1 = w.startList("(", ")");
       (new RowTypeSpecImplementor(rowType)).implement(w);
-      if (rowType.getField("KEY", true, false) != null) {
-        w.sep(",");
-        w.literal("PRIMARY KEY (KEY) NOT ENFORCED");
-      }
       w.endList(frame1);
+      // TODO support PRIMARY KEY for Tables that support it
       // TODO support PARTITIONED BY for Tables that support it
       w.keyword("WITH");
       SqlWriter.Frame frame2 = w.startList("(", ")");
diff --git a/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java b/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java
index b151d0e..bc8fa2b 100644
--- a/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java
+++ b/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java
@@ -31,8 +31,9 @@ public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> o
       .with("KEY", DataType.VARCHAR);
     ConfigProvider connectorConfigProvider = ConfigProvider.from(clientConfig)
       .withPrefix("properties.")
-      .with("connector", "upsert-kafka")
+      .with("connector", "kafka")
       .with("key.format", "csv")
+      .with("key.fields", "KEY")
       .with("value.format", "csv")
       .with("value.fields-include", "EXCEPT_KEY")
       .with("topic", x -> x);
diff --git a/test-model.yaml b/test-model.yaml
index df6c158..6c20686 100644
--- a/test-model.yaml
+++ b/test-model.yaml
@@ -14,17 +14,3 @@ schemas:
       bootstrap.servers: one-kafka-bootstrap.kafka.svc:9092
       group.id: hoptimator-test
       auto.offset.reset: earliest
-
-- name: INVENTORY
-  type: custom
-  factory: com.linkedin.hoptimator.catalog.mysql.MySqlCdcSchemaFactory
-  operand:
-    username: root
-    password: debezium
-    hostname: mysql.mysql.svc.cluster.local
-    port: 3306
-    database: inventory
-    urlSuffix: "?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
-    connectorConfig:
-      scan.incremental.snapshot.enabled: false
-