This repo is a playground for experimenting with different ways of typing and deduping data with SQL, on the path to 1-stream 1-table and replacing "normalization" and dbt.
Read more about this project in airbytehq/airbyte#26028 and the PRD.
- Accepts batches of new data and types them safely while inserting the data into the final table
- Handles deduplication
- Handles per-row typing errors safely
- Handles composite primary keys
- Handles deduplication when multiple entries for a PK exist in the same batch
- Handles deduplication when multiple entries for a PK are inserted out-of-order across different batches (a minimal dedup sketch follows this list)
- Handles CDC deletes and related shenanigans.
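For intuition, the core of the dedupe step is keeping only the newest row per primary key. Here is a minimal Snowflake-flavored sketch against the USERS table defined below; it is illustrative only, not the exact SQL in this repo's .sql files:

DELETE FROM PUBLIC.USERS
WHERE "_airbyte_raw_id" IN (
    SELECT "_airbyte_raw_id"
    FROM (
        SELECT
            "_airbyte_raw_id",
            -- rank rows per PK, newest cursor first; ties broken by extraction time
            ROW_NUMBER() OVER (
                PARTITION BY "id"
                ORDER BY "updated_at" DESC, "_airbyte_extracted_at" DESC
            ) AS rn
        FROM PUBLIC.USERS
    ) AS ranked
    WHERE rn != 1
);

Composite primary keys just extend the PARTITION BY list.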
If you want to seed a big test database (Snowflake):
-- Create the tables
CREATE TABLE PUBLIC.USERS (
"id" int PRIMARY KEY, -- PK cannot be null, but after raw insert and before typing, row will be null
"first_name" text,
"age" int,
"address" variant,
"updated_at" timestamp NOT NULL,
"_airbyte_meta" variant NOT NULL, -- Airbyte column, cannot be null
"_airbyte_raw_id" VARCHAR(36) NOT NULL, -- Airbyte column, cannot be null
"_airbyte_extracted_at" timestamp NOT NULL -- Airbyte column, cannot be null
);
CREATE SCHEMA IF NOT EXISTS Z_AIRBYTE;
CREATE TABLE IF NOT EXISTS Z_AIRBYTE.USERS_RAW (
"_airbyte_raw_id" VARCHAR(36) NOT NULL PRIMARY KEY, -- Airbyte column, cannot be null
"_airbyte_data" variant NOT NULL, -- Airbyte column, cannot be null
"_airbyte_extracted_at" timestamp NOT NULL, -- Airbyte column, cannot be null
"_airbyte_loaded_at" timestamp -- Airbyte column
);
TRUNCATE TABLE PUBLIC.USERS;
TRUNCATE TABLE Z_AIRBYTE.USERS_RAW;
-- Load in the data (CSV)
COPY INTO Z_AIRBYTE.USERS_RAW
FROM 'gcs://airbyte-performance-testing-public/typing-deduping-testing/users_raw.csv'
FILE_FORMAT = (TYPE = 'CSV' FIELD_DELIMITER = '\t' SKIP_HEADER = 1 ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE)
FORCE = TRUE
PURGE = FALSE
;
-- update the _airbyte_raw_ids each time
UPDATE Z_AIRBYTE.USERS_RAW
SET "_airbyte_raw_id" = UUID_STRING()
WHERE "_airbyte_loaded_at" IS NULL
;
Loading 1M records from CSV hosted on Google Cloud took about 75 seconds.
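A quick count (assuming the table names above) confirms the load landed:

SELECT COUNT(*) FROM Z_AIRBYTE.USERS_RAW; -- expect 1,000,000 after a single load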
And if you want to insert the data again, to emulate another sync with 1M records:
-- Load in the data (CSV) w/o truncating
COPY INTO Z_AIRBYTE.USERS_RAW
FROM 'gcs://airbyte-performance-testing-public/typing-deduping-testing/users_raw.csv'
FILE_FORMAT = (TYPE = 'CSV' FIELD_DELIMITER = '\t' SKIP_HEADER = 1 ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE)
FORCE = TRUE
PURGE = FALSE
;
-- update the _airbyte_raw_ids each time
UPDATE Z_AIRBYTE.USERS_RAW
SET "_airbyte_raw_id" = UUID_STRING()
WHERE "_airbyte_loaded_at" IS NULL
;
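After the second load the raw table should hold two entries per primary key, which is exactly what the dedupe step has to resolve. A hypothetical spot-check, assuming the id is stored at _airbyte_data:id as in the seed file:

SELECT
    "_airbyte_data":"id" AS "id",
    COUNT(*) AS copies
FROM Z_AIRBYTE.USERS_RAW
GROUP BY 1
HAVING COUNT(*) > 1
LIMIT 10;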
To dump the file from Postgres (where it was generated locally):
COPY z_airbyte.users_raw to '/Users/evan/workspace/airbyte/typing-and-deduping-sql-experiments/data/users_raw.csv'
WITH (FORMAT CSV, HEADER TRUE, ENCODING 'UTF8', quote '|', delimiter E'\t');
There's also a gcs://airbyte-performance-testing-public/typing-deduping-testing/users_raw_errors.csv
file with 1M records, 10 of which contain a type error. This is useful for testing the error handling.
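For reference, the "safe" part of the typing generally comes down to TRY_CAST-style checks, which return NULL instead of failing the whole query. A hedged sketch of how one of those 10 bad rows could be detected (column names from the USERS example above; not the exact SQL in this repo's .sql files):

SELECT
    TRY_CAST("_airbyte_data":"id"::varchar AS INT) AS "id",
    TRY_CAST("_airbyte_data":"age"::varchar AS INT) AS "age",
    -- a non-null value that fails the cast is a per-row typing error
    CASE
        WHEN "_airbyte_data":"age" IS NOT NULL
         AND TRY_CAST("_airbyte_data":"age"::varchar AS INT) IS NULL
        THEN 'age: not an int'
    END AS typing_error
FROM Z_AIRBYTE.USERS_RAW
WHERE "_airbyte_loaded_at" IS NULL;

In the real procedure these errors would presumably be collected into the _airbyte_meta column on the final table rather than a standalone column.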
- Can't use JSON files to load: there's a 16MB limit for the whole file. Can't use a normal double-quoted CSV either, because double-quoting the JSON confuses Snowflake's parser. The workaround was a tab (\t) FIELD_DELIMITER, as used in the COPY INTO statements above.
- The CSV header row is ignored; the column order in the file and the column order in the table MUST match (see the position-based COPY sketch after this list).
- It takes about 45 seconds to "spin up a thread" to start the COPY INTO statement. Whether the file was 10 lines or 1M lines, the startup cost was more-or-less the same.
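If you'd rather not depend on the file's column order, Snowflake's COPY with a transformation lets you map columns by position explicitly. A sketch, assuming a hypothetical named stage my_gcs_stage over the same bucket and that the first three CSV columns are the raw id, the JSON blob, and the extraction timestamp:

COPY INTO Z_AIRBYTE.USERS_RAW ("_airbyte_raw_id", "_airbyte_data", "_airbyte_extracted_at")
FROM (
    SELECT
        t.$1,               -- assumed: raw id in column 1
        PARSE_JSON(t.$2),   -- assumed: JSON blob in column 2
        t.$3::timestamp     -- assumed: extracted_at in column 3
    FROM @my_gcs_stage/typing-deduping-testing/users_raw.csv t
)
FILE_FORMAT = (TYPE = 'CSV' FIELD_DELIMITER = '\t' SKIP_HEADER = 1)
;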
If you want to seed a big test database (BigQuery):
-- Create the tables
CREATE TABLE IF NOT EXISTS testing_evan_2052.users (
`id` INT64 OPTIONS (description = 'PK cannot be null, but after raw insert and before typing, row will be temporarily null')
, `first_name` STRING
, `age` INT64
, `address` JSON
, `updated_at` TIMESTAMP
, `_airbyte_meta` JSON NOT NULL OPTIONS (description = 'Airbyte column, cannot be null')
, `_airbyte_raw_id` STRING NOT NULL OPTIONS (description = 'Airbyte column, cannot be null')
, `_airbyte_extracted_at` TIMESTAMP NOT NULL OPTIONS (description = 'Airbyte column, cannot be null')
)
PARTITION BY (
DATE_TRUNC(_airbyte_extracted_at, DAY)
-- TODO: Learn about partition_expiration_days https://cloud.google.com/bigquery/docs/creating-partitioned-tables
) CLUSTER BY
id, _airbyte_extracted_at
OPTIONS (
description="users table"
)
;
CREATE TABLE IF NOT EXISTS testing_evan_2052.users_raw (
`_airbyte_raw_id` STRING NOT NULL OPTIONS (description = 'Airbyte column, cannot be null')
, `_airbyte_data` JSON NOT NULL OPTIONS (description = 'Airbyte column, cannot be null')
, `_airbyte_extracted_at` TIMESTAMP NOT NULL OPTIONS (description = 'Airbyte column, cannot be null')
, `_airbyte_loaded_at` TIMESTAMP
)
-- no partition, no cluster
;
TRUNCATE TABLE `testing_evan_2052`.`users_raw`;
TRUNCATE TABLE `testing_evan_2052`.`users`;
-- Load in the data (CSV)
-- NOTE: We have to use overwrite because BQ can't handle writing CSVs to non-nullable columns. Overwriting alters the table to match the CSV and removes any partitioning or non-nullable-ness, which is why we load into a temp table and INSERT from it.
DROP TABLE IF EXISTS testing_evan_2052.users_raw_tmp;
LOAD DATA OVERWRITE testing_evan_2052.users_raw_tmp
FROM FILES (
format = 'CSV',
uris = ['gs://airbyte-performance-testing-public/typing-deduping-testing/users_raw.csv'],
skip_leading_rows = 1,
field_delimiter = "\t"
);
INSERT INTO testing_evan_2052.users_raw (
_airbyte_raw_id, _airbyte_data, _airbyte_extracted_at
) SELECT
_airbyte_raw_id
, PARSE_JSON(_airbyte_data)
, CAST(_airbyte_extracted_at AS timestamp)
FROM testing_evan_2052.users_raw_tmp;
DROP TABLE IF EXISTS testing_evan_2052.users_raw_tmp;
-- update the _airbyte_raw_ids each time
UPDATE testing_evan_2052.users_raw
SET `_airbyte_raw_id` = GENERATE_UUID()
WHERE `_airbyte_loaded_at` IS NULL
;
select count(*) from testing_evan_2052.users_raw;
select count(*) from testing_evan_2052.users;
-- do it; loaded from .sql files in this repo
CALL testing_evan_2052._airbyte_type_dedupe();
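A hypothetical sanity check after the procedure runs (not part of the repo's .sql files): with deduplication, the final table should end up with one row per primary key seen in the raw data.

-- these two counts should (roughly) match once typing + dedupe has run
SELECT COUNT(DISTINCT JSON_VALUE(_airbyte_data, '$.id')) AS raw_pks FROM testing_evan_2052.users_raw;
SELECT COUNT(*) AS final_rows FROM testing_evan_2052.users;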
- Loading 1M records from CSV hosted on Google Cloud took about 10 seconds.
- Typing and deduplicating 1M rows took about 11 seconds.