
Fix primary key constraint #66

Merged · 4 commits · Jul 11, 2024
9 changes: 8 additions & 1 deletion .github/workflows/integration-tests.yml
@@ -59,11 +59,18 @@ jobs:
         kubectl describe kafkas -n kafka
         kubectl describe flinkdeployments
         kubectl describe subscriptions
+    - name: Capture Flink Job Logs
+      if: always()
+      run: |
+        kubectl logs $(kubectl get pods -l component=jobmanager -o name) --since=0s || echo "skipped."
+        kubectl logs $(kubectl get pods -l component=taskmanager -o name) --since=0s || echo "skipped."
+    - name: Capture Hoptimator Operator Logs
+      if: always()
+      run: kubectl logs $(kubectl get pods -l app=hoptimator-operator -o name)
     - name: Capture Flink Operator Logs
       if: always()
       run: kubectl logs $(kubectl get pods -l app.kubernetes.io/name=flink-kubernetes-operator -o name)
-    - name: Capture Flink Job Logs
-      if: always()
-      run: kubectl logs $(kubectl get pods -l app.kubernetes.io/name=flink-kubernetes-operator -o name)


3 changes: 1 addition & 2 deletions Makefile
@@ -25,8 +25,7 @@ quickstart: build deploy-dev-environment deploy
 deploy-dev-environment:
 	kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"
 	kubectl create namespace kafka || echo "skipping"
-	kubectl create namespace mysql || echo "skipping"
-	helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.6.1/
+	helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.9.0/
 	helm upgrade --install --atomic --set webhook.create=false flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
 	kubectl apply -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka
 	kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io
62 changes: 0 additions & 62 deletions deploy/dev/mysql.yaml

This file was deleted.

4 changes: 2 additions & 2 deletions deploy/samples/subscriptions.yaml
@@ -2,9 +2,9 @@
 apiVersion: hoptimator.linkedin.com/v1alpha1
 kind: Subscription
 metadata:
-  name: products
+  name: names
 spec:
-  sql: SELECT "quantity", "product_id" AS KEY FROM INVENTORY."products_on_hand"
+  sql: SELECT NAME, NAME AS KEY FROM DATAGEN.PERSON
   database: RAWKAFKA


10 changes: 2 additions & 8 deletions etc/integration-tests.sql
@@ -15,12 +15,6 @@ SELECT * FROM DATAGEN.COMPANY;
 !insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON
 SELECT * FROM RAWKAFKA."test-sink" LIMIT 5;
 
--- MySQL CDC tables
-SELECT * FROM INVENTORY."products_on_hand" LIMIT 1;
-
--- Test check command
-!check not empty SELECT * FROM INVENTORY."products_on_hand";
-
--- MySQL CDC -> Kafka (via sample subscription "products")
-SELECT * FROM RAWKAFKA."products" LIMIT 1;
+-- read from sample subscription "names"
+SELECT * FROM RAWKAFKA."names" LIMIT 1;

1 change: 0 additions & 1 deletion etc/readiness-probe.sql
@@ -3,5 +3,4 @@
 
 SELECT * FROM DATAGEN.PERSON;
 SELECT * FROM DATAGEN.COMPANY;
-SELECT * FROM INVENTORY."products_on_hand" LIMIT 1;
 SELECT * FROM RAWKAFKA."test-sink" LIMIT 0;
@@ -170,7 +170,7 @@ public SqlNode visit(SqlCall call) {
  * Implements a CREATE TABLE...WITH... DDL statement.
  *
  * N.B. the following magic:
- * - field 'KEY' is treated as a PRIMARY KEY
+ * - field 'PRIMARY_KEY' is treated as a PRIMARY KEY
  */
 class ConnectorImplementor implements ScriptImplementor {
   private final String database;
@@ -192,9 +192,9 @@ public void implement(SqlWriter w) {
     (new CompoundIdentifierImplementor(database, name)).implement(w);
     SqlWriter.Frame frame1 = w.startList("(", ")");
     (new RowTypeSpecImplementor(rowType)).implement(w);
-    if (rowType.getField("KEY", true, false) != null) {
+    if (rowType.getField("PRIMARY_KEY", true, false) != null) {
       w.sep(",");
-      w.literal("PRIMARY KEY (KEY) NOT ENFORCED");
+      w.literal("PRIMARY KEY (PRIMARY_KEY) NOT ENFORCED");
     }
     w.endList(frame1);
     // TODO support PARTITIONED BY for Tables that support it
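With this change, the generated DDL gets its primary key from a field literally named PRIMARY_KEY rather than KEY. A minimal sketch of the output, assuming a hypothetical table EXAMPLE with VARCHAR fields NAME and PRIMARY_KEY (identifiers and the WITH options below are illustrative, not taken from this PR):

-- Sketch only: table and field names are hypothetical.
CREATE TABLE `RAWKAFKA`.`EXAMPLE` (
  `NAME` VARCHAR,
  `PRIMARY_KEY` VARCHAR,
  -- emitted only because the row type contains a PRIMARY_KEY field:
  PRIMARY KEY (PRIMARY_KEY) NOT ENFORCED
) WITH (
  'connector' = 'kafka'  -- real options come from the table's ConfigProvider
);

Tables that expose a KEY field (like the RAWKAFKA tables below) therefore no longer receive a PRIMARY KEY constraint, which plain non-upsert connectors do not require.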
@@ -31,8 +31,9 @@ public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> o
       .with("KEY", DataType.VARCHAR);
     ConfigProvider connectorConfigProvider = ConfigProvider.from(clientConfig)
       .withPrefix("properties.")
-      .with("connector", "upsert-kafka")
+      .with("connector", "kafka")
       .with("key.format", "csv")
+      .with("key.fields", "KEY")
       .with("value.format", "csv")
       .with("value.fields-include", "EXCEPT_KEY")
       .with("topic", x -> x);
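The upsert-kafka connector derives the Kafka message key from the table's PRIMARY KEY constraint; the plain kafka connector has no such constraint, so the key columns must be named explicitly via key.fields. A hedged sketch of the Flink table this config roughly corresponds to, for a topic test-sink (the bootstrap address is taken from test-model.yaml; column names and types are assumptions):

CREATE TABLE `test-sink` (
  `PAYLOAD` VARCHAR,
  `KEY` VARCHAR
) WITH (
  'connector' = 'kafka',   -- was 'upsert-kafka', which requires a PRIMARY KEY
  'topic' = 'test-sink',
  'properties.bootstrap.servers' = 'one-kafka-bootstrap.kafka.svc:9092',
  'key.format' = 'csv',
  'key.fields' = 'KEY',    -- explicit, now that no PRIMARY KEY implies it
  'value.format' = 'csv',
  'value.fields-include' = 'EXCEPT_KEY'
);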
14 changes: 0 additions & 14 deletions test-model.yaml
@@ -14,17 +14,3 @@ schemas:
       bootstrap.servers: one-kafka-bootstrap.kafka.svc:9092
       group.id: hoptimator-test
       auto.offset.reset: earliest
-
-  - name: INVENTORY
-    type: custom
-    factory: com.linkedin.hoptimator.catalog.mysql.MySqlCdcSchemaFactory
-    operand:
-      username: root
-      password: debezium
-      hostname: mysql.mysql.svc.cluster.local
-      port: 3306
-      database: inventory
-      urlSuffix: "?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
-      connectorConfig:
-        scan.incremental.snapshot.enabled: false
-
