/*
SQL Experiments for Typing and Normalizing AirbyteRecords in 1 table
Run me on Postgres
Schema:
{
"id": "number",
"first_name": ["string", null],
"age": ["number", null],
"address": [null, {
"street": "string",
"zip": "string"
}],
"updated_at": timestamp
}
KNOWN LIMITATIONS
* Only one error type shown per row, the first one sequentially
* There's a full table scan used for de-duplication. This can be made more efficient...
* It would be better to show the actual error message from the DB, not custom "this column is bad" strings
*/
-- Set up the Experiment
-- Assumption: We can build the table at the start of the sync based only on the schema we get from the source/configured catalog
DROP TABLE IF EXISTS public.users;
DROP TABLE IF EXISTS z_airbyte.users_raw;
CREATE TABLE public.users (
"_airbyte_raw_id" uuid NOT NULL -- Airbyte column, cannot be null
, "_airbyte_meta" json NOT NULL -- Airbyte column, cannot be null
, "_airbyte_extracted_at" timestamp NOT NULL -- Airbyte column, cannot be null
, "id" int8 -- PK cannot be null, but after raw insert and before typing, row will be null
, "first_name" text
, "age" int8
, "address" json
, "updated_at" timestamp
);
-- Indexes for columns we will use
CREATE INDEX "idx_users__airbyte_extracted_at" ON public.users USING BTREE ("_airbyte_extracted_at");
CREATE INDEX "idx_users__airbyte_raw_id" ON public.users USING BTREE ("_airbyte_raw_id");
CREATE INDEX "idx_users_pk" ON public.users USING BTREE ("id");
CREATE INDEX "idx_updated_at_pk" ON public.users USING BTREE ("updated_at");
---------------------------------------
--------- SAFE CAST METHODS -----------
---------------------------------------
-- Some DBs (BQ) have this built in, but we can do more-or-less the same thing with custom functions
CREATE OR REPLACE FUNCTION _airbyte_safe_cast_to_integer(v_input text)
RETURNS INTEGER AS $$
DECLARE v_int_value INTEGER DEFAULT NULL;
BEGIN
BEGIN
v_int_value := v_input::INTEGER;
EXCEPTION WHEN OTHERS THEN
RAISE NOTICE 'Invalid integer value: "%". Returning NULL.', v_input;
RETURN NULL;
END;
RETURN v_int_value;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION _airbyte_safe_cast_to_boolean(v_input text)
RETURNS BOOLEAN AS $$
DECLARE v_bool_value BOOLEAN DEFAULT NULL;
BEGIN
BEGIN
v_bool_value := v_input::BOOLEAN;
EXCEPTION WHEN OTHERS THEN
RAISE NOTICE 'Invalid boolean value: "%". Returning NULL.', v_input;
RETURN NULL;
END;
RETURN v_bool_value;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION _airbyte_safe_cast_to_text(v_input text)
RETURNS TEXT AS $$
DECLARE v_text_value TEXT DEFAULT NULL;
BEGIN
BEGIN
v_text_value := v_input::TEXT;
EXCEPTION WHEN OTHERS THEN
RAISE NOTICE 'Invalid text value: "%". Returning NULL.', v_input;
RETURN NULL;
END;
RETURN v_text_value;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION _airbyte_safe_cast_to_timestamp(v_input text)
RETURNS TIMESTAMP AS $$
DECLARE v_ts_value TIMESTAMP DEFAULT NULL;
BEGIN
BEGIN
v_ts_value := v_input::TIMESTAMP;
EXCEPTION WHEN OTHERS THEN
RAISE NOTICE 'Invalid timestamp value: "%". Returning NULL.', v_input;
RETURN NULL;
END;
RETURN v_ts_value;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION _airbyte_safe_cast_to_json(v_input text)
RETURNS JSON AS $$
DECLARE v_json_value JSON DEFAULT NULL;
BEGIN
BEGIN
v_json_value := v_input::JSON;
EXCEPTION WHEN OTHERS THEN
RAISE NOTICE 'Invalid json value: "%". Returning NULL.', v_input;
RETURN NULL;
END;
RETURN v_json_value;
END;
$$ LANGUAGE plpgsql;
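-- Optional sanity check of the safe-cast helpers above (a convenience, not part of the original experiment).
-- Invalid inputs should come back as NULL and emit a NOTICE rather than raising an error.
SELECT
"_airbyte_safe_cast_to_integer"('42') AS good_int
, _airbyte_safe_cast_to_integer('forty') AS bad_int
, _airbyte_safe_cast_to_timestamp('2020-01-01T00:00:00Z') AS good_ts
, _airbyte_safe_cast_to_timestamp('not-a-date') AS bad_ts
, _airbyte_safe_cast_to_json('{"zip": "94001"}') AS good_json
, _airbyte_safe_cast_to_json('{oops') AS bad_json
;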
-------------------------------------
--------- TYPE AND DEDUPE -----------
-------------------------------------
CREATE OR REPLACE FUNCTION _airbyte_prepare_raw_table()
RETURNS TEXT AS $$
BEGIN
CREATE SCHEMA IF NOT EXISTS z_airbyte;
CREATE TABLE IF NOT EXISTS z_airbyte.users_raw (
"_airbyte_raw_id" uuid NOT NULL, -- Airbyte column, cannot be null
"_airbyte_data" json NOT NULL, -- Airbyte column, cannot be null
"_airbyte_extracted_at" timestamp NOT NULL, -- Airbyte column, cannot be null
"_airbyte_loaded_at" timestamp, -- Airbyte column
PRIMARY KEY ("_airbyte_raw_id")
);
CREATE INDEX IF NOT EXISTS "idx_users_raw__airbyte_raw_id" ON z_airbyte.users_raw USING BTREE ("_airbyte_raw_id");
CREATE INDEX IF NOT EXISTS "idx_users_raw__airbyte_extracted_at" ON z_airbyte.users_raw USING BTREE ("_airbyte_extracted_at");
CREATE INDEX IF NOT EXISTS "idx_users_raw__airbyte_loaded_at" ON z_airbyte.users_raw USING BTREE ("_airbyte_loaded_at");
RETURN 'SUCCESS';
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION _airbyte_type_dedupe()
RETURNS TEXT AS $$
DECLARE missing_pk_count INTEGER DEFAULT 0;
BEGIN
-- Step 1: Validate the incoming data
-- We can't really do this properly in the pure-SQL example here, but we should throw if any row doesn't have a PK
missing_pk_count := (
SELECT COUNT(1)
FROM Z_AIRBYTE.USERS_RAW
WHERE
"_airbyte_loaded_at" IS NULL
AND _airbyte_safe_cast_to_integer(_airbyte_data ->> 'id') IS NULL
);
IF missing_pk_count > 0 THEN
RAISE EXCEPTION 'Table % has % rows missing a primary key', 'z_airbyte.users_raw', missing_pk_count;
END IF;
-- Moving the data and deduping happen in a single transaction to prevent duplicates from appearing.
-- No explicit BEGIN/COMMIT is needed (or allowed) here: a plpgsql function body already runs inside the caller's transaction.
-- Step 2: Move the new data to the typed table
WITH intermediate_data AS (
SELECT
_airbyte_safe_cast_to_integer(_airbyte_data ->> 'id') as id,
_airbyte_safe_cast_to_text(_airbyte_data ->> 'first_name') as first_name,
_airbyte_safe_cast_to_integer(_airbyte_data ->> 'age') as age,
_airbyte_safe_cast_to_json(_airbyte_data ->> 'address') as address,
_airbyte_safe_cast_to_timestamp(_airbyte_data ->> 'updated_at') as updated_at,
(
CASE WHEN (_airbyte_data ->> 'id' IS NOT NULL) AND (_airbyte_safe_cast_to_integer(_airbyte_data ->> 'id') IS NULL) THEN ARRAY['Problem with `id`'] ELSE ARRAY[]::text[] END
||
CASE WHEN (_airbyte_data ->> 'first_name' IS NOT NULL) AND (_airbyte_safe_cast_to_text(_airbyte_data ->> 'first_name') IS NULL) THEN ARRAY['Problem with `first_name`'] ELSE ARRAY[]::text[] END
||
CASE WHEN (_airbyte_data ->> 'age' IS NOT NULL) AND (_airbyte_safe_cast_to_integer(_airbyte_data ->> 'age') IS NULL) THEN ARRAY['Problem with `age`'] ELSE ARRAY[]::text[] END
||
CASE WHEN (_airbyte_data ->> 'updated_at' IS NOT NULL) AND (_airbyte_safe_cast_to_timestamp(_airbyte_data ->> 'updated_at') IS NULL) THEN ARRAY['Problem with `updated_at`'] ELSE ARRAY[]::text[] END
||
CASE WHEN (_airbyte_data ->> 'address' IS NOT NULL) AND (_airbyte_safe_cast_to_json(_airbyte_data ->> 'address') IS NULL) THEN ARRAY['Problem with `address`'] ELSE ARRAY[]::text[] END
) as _airbyte_cast_errors
, _airbyte_raw_id
, _airbyte_extracted_at
FROM z_airbyte.users_raw
WHERE
_airbyte_loaded_at IS NULL -- inserting only new/null values, we can recover from failed previous checkpoints
OR (
-- Temporarily place back an entry for any CDC-deleted record so we can order them properly by cursor. We only need the PK and cursor value
_airbyte_loaded_at IS NOT NULL
AND _airbyte_data ->> '_ab_cdc_deleted_at' IS NOT NULL -- ->> takes a key name, not a JSONPath
)
)
INSERT INTO public.users
(
id,
first_name,
age,
updated_at,
address,
_airbyte_meta,
_airbyte_raw_id,
_airbyte_extracted_at
)
SELECT
id,
first_name,
age,
updated_at,
address,
CASE
WHEN cardinality(_airbyte_cast_errors) = 0 THEN '{"errors": []}'::JSON -- array_length() returns NULL (not 0) for empty arrays, so use cardinality()
ELSE json_build_object('errors', _airbyte_cast_errors)
END AS _airbyte_meta,
_airbyte_raw_id,
_airbyte_extracted_at
FROM intermediate_data
;
-- Step 3: Dedupe and clean the typed table
-- This is a full table scan, but we need to do it this way to merge the new rows with the old to:
-- * Consider the case in which there are multiple entries for the same PK in the new insert batch
-- * Consider the case in which the data in the new batch is older than the data in the typed table, and we only want to keep the newer (pre-existing) data
-- * Order by the source's provided cursor and _airbyte_extracted_at to break any ties
WITH cte AS (
SELECT _airbyte_raw_id, row_number() OVER (
PARTITION BY id ORDER BY updated_at DESC, _airbyte_extracted_at DESC
) as row_number FROM public.users
)
DELETE FROM public.users
WHERE
-- Delete any rows which are not the most recent for a given PK
_airbyte_raw_id IN (
SELECT _airbyte_raw_id FROM cte WHERE row_number != 1
)
;
-- Step 4: Remove old entries from Raw table
DELETE FROM
z_airbyte.users_raw
WHERE
_airbyte_raw_id NOT IN (
SELECT _airbyte_raw_id FROM public.users
)
;
-- Step 5: Clean out CDC deletes from final table
-- Only run this step if _ab_cdc_deleted_at is a property of the stream
/*
DELETE FROM public.users
WHERE _ab_cdc_deleted_at IS NOT NULL
*/
-- the following will always work, even if there is no _ab_cdc_deleted_at column, but it is slower
DELETE FROM public.users
WHERE
-- Delete rows that have been CDC deleted
id IN (
SELECT
_airbyte_safe_cast_to_integer(_airbyte_data ->> 'id') as id -- based on the PK which we know from the connector catalog
FROM z_airbyte.users_raw
WHERE _airbyte_data ->> '_ab_cdc_deleted_at' IS NOT NULL
)
;
-- Step 6: Apply typed_at timestamp where needed
UPDATE z_airbyte.users_raw
SET _airbyte_loaded_at = NOW()
WHERE _airbyte_loaded_at IS NULL
;
RETURN 'SUCCESS';
END;
$$ LANGUAGE plpgsql;
----------------------------
--------- SYNC 1 -----------
----------------------------
SELECT _airbyte_prepare_raw_table();
-- Load the raw data
INSERT INTO z_airbyte.users_raw ("_airbyte_data", "_airbyte_raw_id", "_airbyte_extracted_at") VALUES ('{ "id": 1, "updated_at": "2020-01-01T00:00:00Z", "first_name": "Evan", "age": 38, "address": { "city": "San Francisco", "zip": "94001" } }', gen_random_uuid(), NOW());
INSERT INTO z_airbyte.users_raw ("_airbyte_data", "_airbyte_raw_id", "_airbyte_extracted_at") VALUES ('{ "id": 2, "updated_at": "2020-01-01T00:00:01Z", "first_name": "Brian", "age": 39, "address": { "city": "Menlo Park", "zip": "94002" } }', gen_random_uuid(), NOW());
INSERT INTO z_airbyte.users_raw ("_airbyte_data", "_airbyte_raw_id", "_airbyte_extracted_at") VALUES ('{ "id": 3, "updated_at": "2020-01-01T00:00:02Z", "first_name": "Edward", "age": 40, "address": { "city": "Sunyvale", "zip": "94003" } }', gen_random_uuid(), NOW());
INSERT INTO z_airbyte.users_raw ("_airbyte_data", "_airbyte_raw_id", "_airbyte_extracted_at") VALUES ('{ "id": 4, "updated_at": "2020-01-01T00:00:03Z", "first_name": "Joe", "address": { "city": "Seattle", "zip": "98999" } }', gen_random_uuid(), NOW()); -- Joe is missing an age, null OK
SELECT _airbyte_type_dedupe();
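-- Optional peek at the typed table after the first sync (not part of the original experiment):
-- expect 4 rows, with Joe (id=4) showing a null age and an empty errors array in _airbyte_meta.
SELECT id, first_name, age, _airbyte_meta FROM public.users ORDER BY id;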
----------------------------
--------- SYNC 2 -----------
----------------------------
SELECT _airbyte_prepare_raw_table();
-- Load the raw data
-- Age update for Evan (user 1)
-- There is an update for Brian (user 2, new address.zip)
-- There is an update for Edward (user 3, age is invalid)
-- No update for Joe (user 4)
INSERT INTO z_airbyte.users_raw ("_airbyte_data", "_airbyte_raw_id", "_airbyte_extracted_at") VALUES ('{ "id": 1, "updated_at": "2020-01-02T00:00:00Z", "first_name": "Evan", "age": 39, "address": { "city": "San Francisco", "zip": "94001" } }', gen_random_uuid(), NOW());
INSERT INTO z_airbyte.users_raw ("_airbyte_data", "_airbyte_raw_id", "_airbyte_extracted_at") VALUES ('{ "id": 2, "updated_at": "2020-01-02T00:00:01Z", "first_name": "Brian", "age": 39, "address": { "city": "Menlo Park", "zip": "99999" } }', gen_random_uuid(), NOW());
INSERT INTO z_airbyte.users_raw ("_airbyte_data", "_airbyte_raw_id", "_airbyte_extracted_at") VALUES ('{ "id": 3, "updated_at": "2020-01-02T00:00:02Z", "first_name": "Edward", "age": "forty", "address": { "city": "Sunyvale", "zip": "94003" } }', gen_random_uuid(), NOW());
SELECT _airbyte_type_dedupe();
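-- Optional check after sync 2: Edward's (id=3) age could not be cast, so his typed row should have a null age
-- and a "Problem with `age`" entry in _airbyte_meta, while Evan (id=1) should now show age=39.
SELECT id, first_name, age, _airbyte_meta FROM public.users WHERE id IN (1, 3) ORDER BY id;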
----------------------------
--------- SYNC 3 -----------
----------------------------
SELECT _airbyte_prepare_raw_table();
-- Load the raw data
-- Delete row 1 with CDC
-- Insert multiple records for a new user (with age incrementing each time)
INSERT INTO z_airbyte.users_raw ("_airbyte_data", "_airbyte_raw_id", "_airbyte_extracted_at") VALUES ('{ "id": 2, "updated_at": "2020-01-03T00:00:00Z", "first_name": "Brian", "age": 39, "address": { "city": "Menlo Park", "zip": "99999" }, "_ab_cdc_deleted_at": true}', gen_random_uuid(), NOW());
INSERT INTO z_airbyte.users_raw ("_airbyte_data", "_airbyte_raw_id", "_airbyte_extracted_at") VALUES ('{ "id": 5, "updated_at": "2020-01-03T00:00:01Z", "first_name": "Cynthia", "age": 40, "address": { "city": "Redwood City", "zip": "98765" }}', gen_random_uuid(), NOW());
INSERT INTO z_airbyte.users_raw ("_airbyte_data", "_airbyte_raw_id", "_airbyte_extracted_at") VALUES ('{ "id": 5, "updated_at": "2020-01-03T00:00:02Z", "first_name": "Cynthia", "age": 41, "address": { "city": "Redwood City", "zip": "98765" }}', gen_random_uuid(), NOW());
INSERT INTO z_airbyte.users_raw ("_airbyte_data", "_airbyte_raw_id", "_airbyte_extracted_at") VALUES ('{ "id": 5, "updated_at": "2020-01-03T00:00:03Z", "first_name": "Cynthia", "age": 42, "address": { "city": "Redwood City", "zip": "98765" }}', gen_random_uuid(), NOW());
SELECT _airbyte_type_dedupe();
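-- Optional checks after sync 3, matching the expectations in the final validation below:
-- Brian (id=2) should be gone via the CDC delete, Cynthia (id=5) should appear once with her latest entry (age=42),
-- and no primary key should appear more than once in the typed table.
SELECT id, first_name, age FROM public.users ORDER BY id;
SELECT id, COUNT(*) AS copies FROM public.users GROUP BY id HAVING COUNT(*) > 1; -- expect zero rows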
----------------------
-- FINAL VALIDATION --
----------------------
/*
You should see 5 RAW records, one for each of the 5 users
You should see 4 TYPED records, one for each user, except user #2, which was CDC deleted
You should have the latest data for each user in the typed final table:
* User #1 (Evan) has the latest data (age=39)
* User #3 (Edward) has a null age [+ error] due to that age being un-typable
* User #4 (Joe) has a null age & no errors
* User #5 (Cynthia) has one entry despite the multiple inserts, with the latest entry (age=42)
*/
SELECT NOW();
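-- Optional final tallies matching the expectations above (a convenience, not part of the original script):
SELECT COUNT(*) AS raw_count FROM z_airbyte.users_raw; -- expect 5
SELECT COUNT(*) AS typed_count FROM public.users; -- expect 4
SELECT id, first_name, age, address, updated_at, _airbyte_meta FROM public.users ORDER BY id;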