materialize-azure-fabric-warehouse: new materialization connector #2300

Merged · 10 commits · Feb 4, 2025
2 changes: 2 additions & 0 deletions .github/workflows/ci.yaml
@@ -95,6 +95,7 @@ jobs:
 - source-sqlserver
 - source-test
 - source-azure-blob-storage
+- materialize-azure-fabric-warehouse
 - materialize-bigquery
 - materialize-databricks
 - materialize-dynamodb
@@ -277,6 +278,7 @@ jobs:
"materialize-motherduck",
"materialize-snowflake",
"materialize-databricks",
"materialize-azure-fabric-warehouse",
"materialize-bigquery",
"materialize-redshift",
"materialize-s3-iceberg"
14 changes: 1 addition & 13 deletions filesink/format.go
@@ -45,28 +45,16 @@ func NewParquetStreamEncoder(cfg ParquetConfig, b *pf.MaterializationSpec_Binding
 }
 
 type CsvConfig struct {
-	Delimiter string `json:"delimiter,omitempty" jsonschema:"title=Delimiter,description=Character to separate columns within a row. Defaults to a comma if blank. Must be a single character with a byte length of 1." jsonschema_extras:"order=0"`
-	NullString string `json:"nullString,omitempty" jsonschema:"title=Null String,description=String to use to represent NULL values. Defaults to an empty string if blank." jsonschema_extras:"order=1"`
-	SkipHeaders bool `json:"skipHeaders,omitempty" jsonschema:"title=Skip Headers,description=Do not write headers to files." jsonschema_extras:"order=2"`
+	SkipHeaders bool `json:"skipHeaders,omitempty" jsonschema:"title=Skip Headers,description=Do not write headers to files." jsonschema_extras:"order=2"`
 }
 
 func (c CsvConfig) Validate() error {
-	if r := []rune(c.Delimiter); len(r) > 1 {
-		return fmt.Errorf("delimiter %q must be a single rune (byte length of 1): got byte length of %d", c.Delimiter, len(r))
-	}
-
 	return nil
 }
 
 func NewCsvStreamEncoder(cfg CsvConfig, b *pf.MaterializationSpec_Binding, w io.WriteCloser) StreamEncoder {
 	var opts []enc.CsvOption
 
-	if cfg.Delimiter != "" {
-		opts = append(opts, enc.WithCsvDelimiter([]rune(cfg.Delimiter)[0])) // already validated to be 1 byte in length
-	}
-	if cfg.NullString != "" {
-		opts = append(opts, enc.WithCsvNullString(cfg.NullString))
-	}
 	if cfg.SkipHeaders {
 		opts = append(opts, enc.WithCsvSkipHeaders())
 	}
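After these deletions, the CSV configuration surface is just the header toggle. For reference, a sketch of the end state of the touched declarations, reconstructed from the hunk above (surrounding code unchanged):

type CsvConfig struct {
	SkipHeaders bool `json:"skipHeaders,omitempty" jsonschema:"title=Skip Headers,description=Do not write headers to files." jsonschema_extras:"order=2"`
}

// With the delimiter option gone there is nothing left to check.
func (c CsvConfig) Validate() error {
	return nil
}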
4 changes: 2 additions & 2 deletions go.mod
@@ -51,7 +51,7 @@ require (
 	github.com/klauspost/compress v1.17.9
 	github.com/marcboeker/go-duckdb v1.8.0
 	github.com/mattn/go-sqlite3 v2.0.3+incompatible
-	github.com/microsoft/go-mssqldb v0.21.0
+	github.com/microsoft/go-mssqldb v1.8.0
 	github.com/minio/highwayhash v1.0.2
 	github.com/mitchellh/mapstructure v1.5.0
 	github.com/pinecone-io/go-pinecone v1.1.1
@@ -104,7 +104,7 @@ require (
 	github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
 	github.com/99designs/keyring v1.2.2 // indirect
 	github.com/Azure/azure-sdk-for-go/sdk/internal v1.8.0 // indirect
-	github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect
+	github.com/AzureAD/microsoft-authentication-library-for-go v1.3.2 // indirect
 	github.com/DataDog/appsec-internal-go v1.6.0 // indirect
 	github.com/DataDog/datadog-agent/pkg/obfuscate v0.54.0 // indirect
 	github.com/DataDog/datadog-agent/pkg/remoteconfig/state v0.54.0 // indirect
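Of note for the new connector: a Fabric Warehouse is reached over the same TDS protocol as SQL Server, which is what the bumped github.com/microsoft/go-mssqldb driver speaks. A minimal connectivity sketch under that assumption; the host, credentials, and database below are placeholders, not values from this PR:

package main

import (
	"database/sql"
	"fmt"

	_ "github.com/microsoft/go-mssqldb" // registers the "sqlserver" driver
)

func main() {
	// Placeholder DSN; a real deployment would use its own endpoint and
	// authentication (e.g. a service principal) rather than a raw password.
	dsn := "sqlserver://user:password@example.datawarehouse.fabric.microsoft.com?database=a-warehouse"

	db, err := sql.Open("sqlserver", dsn)
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// Round-trip a trivial query to confirm the endpoint is reachable.
	var one int
	if err := db.QueryRow("SELECT 1").Scan(&one); err != nil {
		panic(err)
	}
	fmt.Println("connected:", one)
}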
47 changes: 8 additions & 39 deletions go.sum

Large diffs are not rendered by default.

224 changes: 224 additions & 0 deletions materialize-azure-fabric-warehouse/.snapshots/TestSQLGeneration
@@ -0,0 +1,224 @@
--- Begin "a-warehouse"."a-schema".key_value createTargetTable ---

CREATE TABLE "a-warehouse"."a-schema".key_value (
key1 BIGINT,
key2 BIT,
"key!binary" VARBINARY(MAX),
"array" VARCHAR(MAX),
"binary" VARBINARY(MAX),
"boolean" BIT,
flow_published_at DATETIME2(6),
"integer" BIGINT,
"integerGt64Bit" DECIMAL(38,0),
"integerWithUserDDL" DECIMAL(20),
multiple VARCHAR(MAX),
number FLOAT,
"numberCastToString" VARCHAR(MAX),
"object" VARCHAR(MAX),
string VARCHAR(MAX),
"stringInteger" DECIMAL(38,0),
"stringInteger39Chars" VARCHAR(MAX),
"stringInteger66Chars" VARCHAR(MAX),
"stringNumber" FLOAT,
flow_document VARCHAR(MAX)
);
--- End "a-warehouse"."a-schema".key_value createTargetTable ---

--- Begin "a-warehouse"."a-schema".delta_updates createTargetTable ---

CREATE TABLE "a-warehouse"."a-schema".delta_updates (
"theKey" VARCHAR(MAX),
"aValue" BIGINT,
flow_published_at DATETIME2(6)
);
--- End "a-warehouse"."a-schema".delta_updates createTargetTable ---

--- Begin alter table add columns ---

ALTER TABLE "a-warehouse"."a-schema".key_value ADD
first_new_column STRING,
second_new_column BOOL;
--- End alter table add columns ---

--- Begin Fence Update ---
UPDATE "path"."to".checkpoints
SET "checkpoint" = 'AAECAwQFBgcICQ=='
WHERE materialization = 'some/Materialization'
AND key_begin = 1122867
AND key_end = 4293844428
AND fence = 123;
--- End Fence Update ---
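-- The fence update is the connector's exclusivity check: it persists the new
-- checkpoint only while materialization, key range, and fence value all still
-- match. A zero-row update means a newer instance holds the fence and this
-- one must stop.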

--- Begin "a-warehouse"."a-schema".key_value storeCopyIntoFromStagedQuery ---
CREATE TABLE flow_temp_table_store_0 (
key1 BIGINT,
key2 BIT,
"key!binary" VARCHAR(MAX),
"array" VARCHAR(MAX),
"binary" VARCHAR(MAX),
"boolean" BIT,
flow_published_at DATETIME2(6),
"integer" BIGINT,
"integerGt64Bit" DECIMAL(38,0),
"integerWithUserDDL" DECIMAL(20),
multiple VARCHAR(MAX),
number FLOAT,
"numberCastToString" VARCHAR(MAX),
"object" VARCHAR(MAX),
string VARCHAR(MAX),
"stringInteger" DECIMAL(38,0),
"stringInteger39Chars" VARCHAR(MAX),
"stringInteger66Chars" VARCHAR(MAX),
"stringNumber" FLOAT,
flow_document VARCHAR(MAX)
);

COPY INTO flow_temp_table_store_0
(key1, key2, "key!binary", "array", "binary", "boolean", flow_published_at, "integer", "integerGt64Bit", "integerWithUserDDL", multiple, number, "numberCastToString", "object", string, "stringInteger", "stringInteger39Chars", "stringInteger66Chars", "stringNumber", flow_document)
FROM 'https://some/file1', 'https://some/file2'
WITH (
FILE_TYPE = 'CSV',
COMPRESSION = 'Gzip',
CREDENTIAL = (IDENTITY='Storage Account Key', SECRET='some-storage-account-key')
);

INSERT INTO "a-warehouse"."a-schema".key_value (key1, key2, "key!binary", "array", "binary", "boolean", flow_published_at, "integer", "integerGt64Bit", "integerWithUserDDL", multiple, number, "numberCastToString", "object", string, "stringInteger", "stringInteger39Chars", "stringInteger66Chars", "stringNumber", flow_document)
SELECT key1, key2, BASE64_DECODE("key!binary"), "array", BASE64_DECODE("binary"), "boolean", flow_published_at, "integer", "integerGt64Bit", "integerWithUserDDL", multiple, number, "numberCastToString", "object", string, "stringInteger", "stringInteger39Chars", "stringInteger66Chars", "stringNumber", flow_document
FROM flow_temp_table_store_0;

DROP TABLE flow_temp_table_store_0;
--- End "a-warehouse"."a-schema".key_value storeCopyIntoFromStagedQuery ---

--- Begin "a-warehouse"."a-schema".key_value storeCopyIntoDirectQuery ---
COPY INTO "a-warehouse"."a-schema".key_value
(key1, key2, "key!binary", "array", "binary", "boolean", flow_published_at, "integer", "integerGt64Bit", "integerWithUserDDL", multiple, number, "numberCastToString", "object", string, "stringInteger", "stringInteger39Chars", "stringInteger66Chars", "stringNumber", flow_document)
FROM 'https://some/file1', 'https://some/file2'
WITH (
FILE_TYPE = 'CSV',
COMPRESSION = 'Gzip',
CREDENTIAL = (IDENTITY='Storage Account Key', SECRET='some-storage-account-key')
);
--- End "a-warehouse"."a-schema".key_value storeCopyIntoDirectQuery ---

--- Begin "a-warehouse"."a-schema".delta_updates storeCopyIntoFromStagedQuery ---
CREATE TABLE flow_temp_table_store_1 (
"theKey" VARCHAR(MAX),
"aValue" BIGINT,
flow_published_at DATETIME2(6)
);

COPY INTO flow_temp_table_store_1
("theKey", "aValue", flow_published_at)
FROM 'https://some/file1', 'https://some/file2'
WITH (
FILE_TYPE = 'CSV',
COMPRESSION = 'Gzip',
CREDENTIAL = (IDENTITY='Storage Account Key', SECRET='some-storage-account-key')
);

INSERT INTO "a-warehouse"."a-schema".delta_updates ("theKey", "aValue", flow_published_at)
SELECT "theKey", "aValue", flow_published_at
FROM flow_temp_table_store_1;

DROP TABLE flow_temp_table_store_1;
--- End "a-warehouse"."a-schema".delta_updates storeCopyIntoFromStagedQuery ---

--- Begin "a-warehouse"."a-schema".delta_updates storeCopyIntoDirectQuery ---
COPY INTO "a-warehouse"."a-schema".delta_updates
("theKey", "aValue", flow_published_at)
FROM 'https://some/file1', 'https://some/file2'
WITH (
FILE_TYPE = 'CSV',
COMPRESSION = 'Gzip',
CREDENTIAL = (IDENTITY='Storage Account Key', SECRET='some-storage-account-key')
);
--- End "a-warehouse"."a-schema".delta_updates storeCopyIntoDirectQuery ---

--- Begin "a-warehouse"."a-schema".key_value createLoadTable ---
CREATE TABLE flow_temp_table_load_0 (
key1 BIGINT,
key2 BIT,
"key!binary" VARCHAR(MAX)
);

COPY INTO flow_temp_table_load_0
(key1, key2, "key!binary")
FROM 'https://some/file1', 'https://some/file2'
WITH (
FILE_TYPE = 'CSV',
COMPRESSION = 'Gzip',
CREDENTIAL = (IDENTITY='Storage Account Key', SECRET='some-storage-account-key')
);
--- End "a-warehouse"."a-schema".key_value createLoadTable ---

--- Begin "a-warehouse"."a-schema".key_value loadQuery ---
SELECT 0, r.flow_document
FROM flow_temp_table_load_0 AS l
JOIN "a-warehouse"."a-schema".key_value AS r
ON l.key1 = r.key1 AND r.key1 >= 10 AND r.key1 <= 100
AND l.key2 = r.key2
AND BASE64_DECODE(l."key!binary") = r."key!binary"
--- End "a-warehouse"."a-schema".key_value loadQuery ---

--- Begin "a-warehouse"."a-schema".key_value dropLoadTable ---
DROP TABLE flow_temp_table_load_0;
--- End "a-warehouse"."a-schema".key_value dropLoadTable ---

--- Begin "a-warehouse"."a-schema".key_value storeMergeQuery ---
CREATE TABLE flow_temp_table_store_0 (
key1 BIGINT,
key2 BIT,
"key!binary" VARCHAR(MAX),
"array" VARCHAR(MAX),
"binary" VARCHAR(MAX),
"boolean" BIT,
flow_published_at DATETIME2(6),
"integer" BIGINT,
"integerGt64Bit" DECIMAL(38,0),
"integerWithUserDDL" DECIMAL(20),
multiple VARCHAR(MAX),
number FLOAT,
"numberCastToString" VARCHAR(MAX),
"object" VARCHAR(MAX),
string VARCHAR(MAX),
"stringInteger" DECIMAL(38,0),
"stringInteger39Chars" VARCHAR(MAX),
"stringInteger66Chars" VARCHAR(MAX),
"stringNumber" FLOAT,
flow_document VARCHAR(MAX)
);

COPY INTO flow_temp_table_store_0
(key1, key2, "key!binary", "array", "binary", "boolean", flow_published_at, "integer", "integerGt64Bit", "integerWithUserDDL", multiple, number, "numberCastToString", "object", string, "stringInteger", "stringInteger39Chars", "stringInteger66Chars", "stringNumber", flow_document)
FROM 'https://some/file1', 'https://some/file2'
WITH (
FILE_TYPE = 'CSV',
COMPRESSION = 'Gzip',
CREDENTIAL = (IDENTITY='Storage Account Key', SECRET='some-storage-account-key')
);

DELETE r
FROM "a-warehouse"."a-schema".key_value AS r
INNER JOIN flow_temp_table_store_0 AS l
ON l.key1 = r.key1 AND r.key1 >= 10 AND r.key1 <= 100
AND l.key2 = r.key2
AND BASE64_DECODE(l."key!binary") = r."key!binary";

INSERT INTO "a-warehouse"."a-schema".key_value (key1, key2, "key!binary", "array", "binary", "boolean", flow_published_at, "integer", "integerGt64Bit", "integerWithUserDDL", multiple, number, "numberCastToString", "object", string, "stringInteger", "stringInteger39Chars", "stringInteger66Chars", "stringNumber", flow_document)
SELECT key1, key2, BASE64_DECODE("key!binary"), "array", BASE64_DECODE("binary"), "boolean", flow_published_at, "integer", "integerGt64Bit", "integerWithUserDDL", multiple, number, "numberCastToString", "object", string, "stringInteger", "stringInteger39Chars", "stringInteger66Chars", "stringNumber", flow_document
FROM flow_temp_table_store_0
WHERE flow_document <> '"delete"';

DROP TABLE flow_temp_table_store_0;
--- End "a-warehouse"."a-schema".key_value storeMergeQuery ---

--- Begin createMigrationTable ---
CREATE TABLE some_table_tmp AS SELECT
not_migrated_column,
CAST(is_migrated_column AS VARCHAR(MAX)) AS is_migrated_column,
another_not_migrated_column,
CAST(CASE WHEN migrated_boolean_column = 1 THEN 'true' WHEN migrated_boolean_column = 0 THEN 'false' ELSE NULL END AS VARCHAR(MAX)) AS migrated_boolean_column,
yet_another_not_migrated_column
FROM some_table;
--- End createMigrationTable ---
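As a usage illustration of the Fence Update snapshot above, here is a hedged Go sketch of running that statement and interpreting its result; checkFence is a hypothetical helper, not code from this PR, and the table and column names simply mirror the snapshot:

import (
	"database/sql"
	"encoding/base64"
	"fmt"
)

// checkFence runs the fence UPDATE and treats zero affected rows as having
// lost the fence to a newer instance of the same materialization.
func checkFence(db *sql.DB, checkpoint []byte, materialization string, keyBegin, keyEnd uint32, fence int64) error {
	res, err := db.Exec(
		`UPDATE "path"."to".checkpoints
		SET "checkpoint" = @p1
		WHERE materialization = @p2
		AND key_begin = @p3
		AND key_end = @p4
		AND fence = @p5`,
		base64.StdEncoding.EncodeToString(checkpoint), materialization, keyBegin, keyEnd, fence,
	)
	if err != nil {
		return fmt.Errorf("updating fence: %w", err)
	}
	if n, err := res.RowsAffected(); err != nil {
		return err
	} else if n != 1 {
		return fmt.Errorf("fence %d superseded by a newer instance", fence)
	}
	return nil
}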

