-
Notifications
You must be signed in to change notification settings - Fork 35
/
Copy pathflink-tables.sql
154 lines (144 loc) · 3.98 KB
/
flink-tables.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '500 ms';
SET 'table.exec.mini-batch.size' = '1000';
CREATE FUNCTION ARRAY_AGGR AS 'co.decodable.demos.arrayagg.ArrayAggr';
CREATE TABLE purchase_orders (
id INT,
order_date DATE,
purchaser_id INT,
PRIMARY KEY (id) NOT ENFORCED
)
WITH (
'connector' = 'postgres-cdc',
'hostname' = 'postgres',
'port' = '5432',
'username' = 'postgres',
'password' = 'postgres',
'database-name' = 'postgres',
'schema-name' = 'inventory',
'table-name' = 'purchase_orders',
'slot.name' = 'purchase_orders_slot'
);
CREATE TABLE order_lines (
id INT,
order_id INT,
product_id INT,
quantity INT,
price DOUBLE,
PRIMARY KEY (id) NOT ENFORCED
)
WITH (
'connector' = 'postgres-cdc',
'hostname' = 'postgres',
'port' = '5432',
'username' = 'postgres',
'password' = 'postgres',
'database-name' = 'postgres',
'schema-name' = 'inventory',
'table-name' = 'order_lines',
'slot.name' = 'order_lines_slot'
);
CREATE TABLE products (
id INT,
name VARCHAR(255),
description VARCHAR(512),
weight DOUBLE,
PRIMARY KEY (id) NOT ENFORCED
)
WITH (
'connector' = 'postgres-cdc',
'hostname' = 'postgres',
'port' = '5432',
'username' = 'postgres',
'password' = 'postgres',
'database-name' = 'postgres',
'schema-name' = 'inventory',
'table-name' = 'products',
'slot.name' = 'products_slot'
);
CREATE TABLE customers (
id INT,
first_name STRING,
last_name STRING,
email STRING,
PRIMARY KEY (id) NOT ENFORCED
)
WITH (
'connector' = 'postgres-cdc',
'hostname' = 'postgres',
'port' = '5432',
'username' = 'postgres',
'password' = 'postgres',
'database-name' = 'postgres',
'schema-name' = 'inventory',
'table-name' = 'customers',
'slot.name' = 'customers_slot'
);
CREATE TABLE customer_phone_numbers (
id INT,
customer_id INT,
type VARCHAR(20),
`value` VARCHAR(100),
PRIMARY KEY (id) NOT ENFORCED
)
WITH (
'connector' = 'postgres-cdc',
'hostname' = 'postgres',
'port' = '5432',
'username' = 'postgres',
'password' = 'postgres',
'database-name' = 'postgres',
'schema-name' = 'inventory',
'table-name' = 'customer_phone_numbers',
'slot.name' = 'customer_phone_numbers_slot'
);
CREATE TABLE orders_with_lines_kafka (
order_id INT,
order_date DATE,
purchaser_id INT,
lines ARRAY<ROW<id INT, product_id INT, quantity INT, price DOUBLE>>,
PRIMARY KEY (order_id) NOT ENFORCED
)
WITH (
'connector' = 'upsert-kafka',
'topic' = 'orders_with_lines',
'properties.bootstrap.servers' = 'redpanda:29092',
'key.format' = 'json', 'value.format' = 'json'
);
CREATE TABLE orders_with_lines_and_customer_kafka (
order_id INT,
order_date DATE,
purchaser ROW<id INT, first_name VARCHAR, last_name VARCHAR, email VARCHAR, phone_numbers ARRAY<ROW<id INT, type VARCHAR, `value` VARCHAR>>>,
lines ARRAY<ROW<id INT, product_id INT, quantity INT, price DOUBLE>>,
PRIMARY KEY (order_id) NOT ENFORCED
)
WITH (
'connector' = 'upsert-kafka',
'topic' = 'orders_with_lines_and_customer',
'properties.bootstrap.servers' = 'redpanda:29092',
'key.format' = 'json', 'value.format' = 'json'
);
CREATE TABLE orders_with_lines_es (
order_id INT,
order_date DATE,
purchaser_id INT,
lines ARRAY<ROW<id INT, product_id INT, quantity INT, price DOUBLE>>,
PRIMARY KEY (order_id) NOT ENFORCED
)
WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://elasticsearch:9200',
'index' = 'orders_with_lines'
);
CREATE TABLE orders_with_lines_and_customer_es (
order_id INT,
order_date DATE,
purchaser ROW<id INT, first_name VARCHAR, last_name VARCHAR, email VARCHAR, phone_numbers ARRAY<ROW<id INT, type VARCHAR, `value` VARCHAR>>>,
lines ARRAY<ROW<id INT, product_id INT, quantity INT, price DOUBLE>>,
PRIMARY KEY (order_id) NOT ENFORCED
)
WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://elasticsearch:9200',
'index' = 'orders_with_lines_and_customer'
);