materialize-azure-fabric-warehouse: new materialization connector #2300
Conversation
Creates a component that can be used for the common pattern of writing out serialized data to a staging location, for later committing to the destination or for use in some other way. We've repeated very similar code for many existing SQL warehouse materializations and it felt like it was time to consolidate the common behaviors. The immediate use will be in the new Azure Fabric Warehouse materialization. It could be retroactively incorporated into existing materializations in order to simplify them, but I'm not going to do that right now, at least not until it has been proven with the brand new one.
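For context, a staging component along these lines might expose an interface roughly like the following Go sketch. The names and signatures here are invented for illustration and are not the actual API introduced by this commit.

```go
package stagedfiles

import "context"

// StagedFiles batches encoded rows per binding into files in a staging
// location (for example an object store container) for later loading or
// copying into the destination system.
type StagedFiles interface {
	// EncodeRow appends a row for the given binding to its current file.
	EncodeRow(ctx context.Context, binding int, row []any) error

	// Flush closes any open files and returns the URIs of everything written
	// since the last flush, so they can be referenced in a COPY INTO (or
	// similar) statement.
	Flush() ([]string, error)

	// CleanupCurrentTransactionFiles deletes the staged files once they have
	// been committed to the destination or are no longer needed.
	CleanupCurrentTransactionFiles(ctx context.Context) error
}
```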
Adds the integration tests for materialize-azure-fabric-warehouse as a separate commit for ease of review. The actual connector code that can run them will be in a subsequent commit.
Basic working version of the Azure Fabric Warehouse materialization. Fabric Warehouse is a relatively new product, and perhaps because of that it is missing some features typical of other SQL warehouse materializations:
* No merge query support: this means we must delete rows from the target table that already exist before inserting the updated rows. I don't think this is all that much different from what a true merge query would do, but it's a little more cumbersome to build queries for.
* No ability to read from staged files directly; instead we must load them into "temporary" tables using a COPY INTO query.
* Fabric Warehouse recently added support for temporary tables, but unfortunately COPY INTO does not work with them. An actual table must be created, COPY INTO'd, and then dropped later when it is no longer needed. Fabric Warehouse does at least support creating and dropping tables within a transaction, and this is used to ensure we don't leave tables hanging around.
* No idempotent COPY INTO, which makes a post-commit apply strategy with exactly-once standard and delta updates impossible. Transactions are supported, though, and there don't appear to be any strange limitations on them (such as a cap on the number of tables in a single transaction), so the authoritative endpoint pattern works well.

Other than these main limitations, a decent range of data types is supported, and the materialization seems to work pretty well. Later commits will add support for additional features like column migrations, and some additional optimizations.
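To make that concrete, here is a rough sketch of the resulting store sequence, written out as a Go string constant. The table names, column names, and file URIs are hypothetical, and the real connector templates these queries per binding; treat this as an outline of the approach rather than the actual queries.

```go
package main

// storeSequence outlines the delete-then-insert flow used in place of a merge
// query, with staged files loaded into a real (non-temporary) table first.
const storeSequence = `
-- Stage the updated rows via COPY INTO. COPY INTO cannot target Fabric
-- Warehouse temporary tables, so a real table is created and dropped later.
CREATE TABLE load_tmp_0 (key BIGINT, flow_document VARCHAR(MAX));
COPY INTO load_tmp_0
FROM 'https://<storage>/stage/file1.csv', 'https://<storage>/stage/file2.csv'
WITH (FILE_TYPE = 'CSV');

BEGIN TRANSACTION;

-- No MERGE support: first delete target rows that are being updated...
DELETE FROM target
WHERE EXISTS (SELECT 1 FROM load_tmp_0 t WHERE t.key = target.key);

-- ...then insert the new versions of those rows.
INSERT INTO target (key, flow_document)
SELECT key, flow_document FROM load_tmp_0;

-- Dropping the staging table inside the transaction helps ensure it isn't
-- left hanging around.
DROP TABLE load_tmp_0;

COMMIT TRANSACTION;
`
```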
Column migrations can't be done with the typical column renaming strategy since Fabric Warehouse doesn't support dropping columns from a table, but it does support creating, dropping, and renaming tables within a transaction. It seems the accepted workaround for migrating column types is to create a new table, copy the unchanged column values while casting the changed ones as appropriate, and then swap the new table for the old one by dropping the old one and renaming the new one.
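A hedged sketch of what that table swap could look like, again as SQL in a Go constant. The table and column names are invented, and the CTAS/sp_rename statements are only illustrative of the general shape; the connector's actual rename mechanism may differ.

```go
package main

// columnMigration illustrates the swap: build a replacement table with the
// migrated column cast to its new type, then drop the original and rename the
// replacement, all inside one transaction.
const columnMigration = `
BEGIN TRANSACTION;

CREATE TABLE target_migrated AS
SELECT key, CAST(int_col AS DECIMAL(38, 0)) AS int_col, other_col, flow_document
FROM target;

DROP TABLE target;
EXEC sp_rename 'target_migrated', 'target';

COMMIT TRANSACTION;
`
```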
The previous commit for Fabric Warehouse modified the column migration tests to show how integer column widening works, where a schema that was previously compatible with a 64-bit integer column is now known to contain values too large for that, but not so large that they can't fit into some other exact integer representation. This adds support for integer widening migrations to all of the other SQL materializations, and updates their test snapshots.
Binary columns can be created as VARBINARY(MAX), and binary data can be loaded into them using `BASE64_DECODE` on staged base64-encoded strings. There's a bit of extra complexity in templating out where this decoding is necessary, and in dealing with binary keys and their required comparison operations. This seems to get the job done, although it makes the query templates harder to comprehend.
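For illustration, the decode could slot into the insert from the staged table roughly like this (hypothetical names; the staged column holds base64 text and the target column is VARBINARY(MAX), and a binary key would need the same decode anywhere it is compared in load or delete queries):

```go
package main

// insertWithBinary shows BASE64_DECODE applied only to the column that is
// materialized as actual binary data; the other columns pass through as-is.
const insertWithBinary = `
INSERT INTO target (key, bin_col, flow_document)
SELECT t.key, BASE64_DECODE(t.bin_col), t.flow_document
FROM load_tmp_0 t;
`
```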
…zations Track the range of observed keys for load and merge queries, and provide query hints to hopefully limit the amount of data that must be scanned when running these queries. We have done similar merge query optimizations for materialize-bigquery and materialize-snowflake. This is the first example of also doing it for load queries.
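As a rough illustration of the idea (not the actual MergeBoundsBuilder in this repo), tracking the observed key range and rendering it as an extra predicate might look something like:

```go
package main

import "fmt"

// keyBounds tracks the minimum and maximum key observed during a transaction.
type keyBounds struct {
	min, max int64
	set      bool
}

// observe widens the tracked range to include a key seen in a load or store.
func (b *keyBounds) observe(key int64) {
	if !b.set || key < b.min {
		b.min = key
	}
	if !b.set || key > b.max {
		b.max = key
	}
	b.set = true
}

// condition renders a predicate that can be ANDed onto a load or merge query
// so the warehouse can skip scanning data outside the observed key range.
func (b *keyBounds) condition(col string) string {
	if !b.set {
		return ""
	}
	return fmt.Sprintf("%s >= %d AND %s <= %d", col, b.min, col, b.max)
}
```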
… files Each file needs to be enclosed in single quotes, rather than the entire list of files.
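In Go terms, the fix amounts to quoting each file individually before joining, something like this illustrative helper:

```go
package main

import "strings"

// quoteFiles renders a COPY INTO file list as 'file1.csv', 'file2.csv', with
// each file enclosed in its own single quotes rather than the entire
// comma-separated list being quoted once.
func quoteFiles(files []string) string {
	quoted := make([]string, len(files))
	for i, f := range files {
		quoted[i] = "'" + f + "'"
	}
	return strings.Join(quoted, ", ")
}
```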
A string field that is present in a document with a 0-length value, AKA the string "", is different from a string field that is absent. The stdlib Go CSV writer does not allow for such a distinction to be made, since all fields must be strings and there is no way to indicate an "absent" string.

This commit implements a custom CSV writer that can tell the difference between an absent and a 0-length string. An absent string gets "skipped", with no value at all placed between the commas of a row. A 0-length string gets quoted to just be "". Other strings get quoted if they have special characters requiring quotes; otherwise they are written as-is.

This new CSV writer is inspired by the Go stdlib CSV writer, with the required additions for absent vs. empty strings. Some of the extra configuration that we generally don't need has been stripped out, like specifying a special NULL string and using a custom "comma" value - nobody has ever used these options in our filesink materializations, and it seems reasonable to not support them unless somebody requests them down the line. This custom CSV writer also does not use an additional internal buffer, since that would be redundant with the buffer(s) inevitably used by its outputs.
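A minimal sketch of the absent-vs-empty behavior described above, using *string with nil standing in for "absent" purely for illustration; the real writer's field representation and quoting rules are more involved.

```go
package main

import (
	"io"
	"strings"
)

// writeRow writes one CSV row where a nil field is "absent" (nothing between
// the commas) and a non-nil empty string is written as "" so the two can be
// distinguished. Writes go straight to w, relying on whatever buffering the
// output already has.
func writeRow(w io.Writer, fields []*string) error {
	for i, f := range fields {
		if i > 0 {
			if _, err := io.WriteString(w, ","); err != nil {
				return err
			}
		}
		switch {
		case f == nil:
			// Absent field: emit nothing between the commas.
		case *f == "" || strings.ContainsAny(*f, ",\"\r\n"):
			// Empty strings and values with special characters get quoted,
			// with embedded quotes doubled.
			if _, err := io.WriteString(w, `"`+strings.ReplaceAll(*f, `"`, `""`)+`"`); err != nil {
				return err
			}
		default:
			// Everything else is written as-is.
			if _, err := io.WriteString(w, *f); err != nil {
				return err
			}
		}
	}
	_, err := io.WriteString(w, "\n")
	return err
}
```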
@@ -41,7 +41,7 @@ Migratable Changes Before Apply Schema:

Migratable Changes Before Apply Data:
key (STRING), _meta_flow_truncated (BOOLEAN), boolWidenedToJson (BOOLEAN), dateValue (DATE), datetimeValue (TIMESTAMP), flow_published_at (TIMESTAMP), int64 (INTEGER), intWidenedToJson (INTEGER), multiple (JSON), nonScalarValue (STRING), numericString (BIGNUMERIC), optional (STRING), requiredNumeric (BIGNUMERIC), scalarValue (STRING), stringWidenedToJson (STRING), timeValue (STRING), flow_document (STRING)
1, false, true, 2024-01-01, 2024-01-01 01:01:01.111111 +0000 UTC, 2024-09-13 01:01:01 +0000 UTC, 1, 999, <nil>, <nil>, 123/1, <nil>, 456/1, test, hello, 01:01:01, {}
1, false, true, 2024-01-01, 2024-01-01 01:01:01.111111 +0000 UTC, 2024-09-13 01:01:01 +0000 UTC, 1, 999, <nil>, <nil>, 12300000000000000000000000000000000000000/100000000000000000000000000000000000000, <nil>, 45600000000000000000000000000000000000000/100000000000000000000000000000000000000, test, hello, 01:01:01, {}
The inclusion of all the extra 0's here seems to not be directly related to anything in this PR, but is rather an inconsequential change in how the bigquery client retrieves values, which must have arisen from some other change that upgraded its version.
@@ -85,6 +85,6 @@ Migratable Changes Applied Schema:

Migratable Changes Applied Data:
key (STRING), _meta/flow_truncated (BOOLEAN), boolWidenedToJson (BOOLEAN), flow_published_at (TIMESTAMP), int64 (BIGINT), multiple (STRING), nonScalarValue (STRING), optional (STRING), scalarValue (STRING), stringWidenedToJson (STRING), timeValue (STRING), flow_document (STRING), dateValue (STRING), datetimeValue (STRING), intWidenedToJson (STRING), numericString (STRING), requiredNumeric (STRING)
1, false, true, 2024-09-13T01:01:01Z, 1, <nil>, <nil>, <nil>, test, hello, 01:01:01, {}, 2024-01-01, 2024-01-01T01:01:01.111111000Z, 999, 123, 456
key (STRING), _meta/flow_truncated (BOOLEAN), boolWidenedToJson (BOOLEAN), flow_published_at (TIMESTAMP), multiple (STRING), nonScalarValue (STRING), optional (STRING), scalarValue (STRING), stringWidenedToJson (STRING), timeValue (STRING), flow_document (STRING), dateValue (STRING), datetimeValue (STRING), int64 (DECIMAL), intWidenedToJson (STRING), numericString (STRING), requiredNumeric (STRING)
For this and some of the other snapshots, the ordering of columns appears to have changed and I'm not 100% sure why that is, but it shouldn't make any difference in how the connector operates.
Some baseline docs for the new fabric warehouse materialization. See estuary/connectors#2300
LGTM based on my experience with materializations so far.
// flushOnNextBinding can be set to flush the file stream whenever a new binding
// (by index) has a row encoded for it. Flushing the file stream will result in
// the current streaming encoder being closed and flushed, which concludes the
// current file being written. Any further writes to that same binding will
// start a new file, so this should usually only be enabled for encoding rows
// received from Store requests, where the documents are always in monotonic
// order with respect to their binding index.
Just for my understanding, what's the benefit of setting flushOnNextBinding to true? The frequency of written files would increase, so is the purpose to decrease latency?
Streaming writes to object stores (S3, GCS, etc.) actually buffer a little bit (or more than a little bit) of data so that it is uploaded in chunks, or even as parts of a multipart upload. If a materialization has lots of bindings, we end up with dangling chunks of buffered data that aren't released from memory until we actually flush them, so cycling through lots of bindings in a single transaction cycle can cause the connector to use excessive amounts of memory.
Flushing when the binding changes over for stores helps with this, since Store requests are guaranteed to be grouped by their binding index, in increasing order. We'll always see stores for bindings in orders like 1, 1, 1, 3, 3, 3, 4...etc, but never 1, 1, 1, 3, 3, 1. So once we have observed the change from binding 1 -> 3, we know that we can flush the file for binding 1.
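As a sketch of that check (illustrative only, not the actual connector code):

```go
package main

// stagedFiles is a stand-in for the connector's real file-staging type; only
// the pieces needed to illustrate the flush-on-binding-change check are shown.
type stagedFiles struct {
	flushOnNextBinding bool
	lastBinding        int // -1 until the first row is seen
	encode             func(binding int, row []any) error
	flushFile          func(binding int) error
}

// encodeStoreRow encodes a row for a Store request, flushing the previous
// binding's file the first time a new binding index is seen. Store requests
// arrive grouped by binding in increasing order, so once the binding changes
// the previous binding's buffered chunks will never grow again in this
// transaction and can be released.
func (s *stagedFiles) encodeStoreRow(binding int, row []any) error {
	if s.flushOnNextBinding && s.lastBinding != -1 && binding != s.lastBinding {
		if err := s.flushFile(s.lastBinding); err != nil {
			return err
		}
	}
	s.lastBinding = binding
	return s.encode(binding, row)
}
```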
Got it, that makes sense. Thanks for the explanation!
Description:
New connector to materialize to Microsoft Azure Fabric Warehouse. Most of the code is new and specific to the materialize-azure-fabric-warehouse connector itself, and is more thoroughly described in the commit messages. There are some changes to common packages/other connectors as well:
* MergeBoundsBuilder: updated to support not building bounds on binary columns. At this point we won't bother trying to establish a minimum/maximum for fields that may be materialized as actual binary values, which is something that the Fabric Warehouse materialization has implemented.
Closes #2249
Closes #2301
Workflow steps:
(How does one use this feature, and how has it changed)
Documentation links affected:
materialize-s3-csv and materialize-gcs-csv, since they will have fewer configuration knobs. Also in docs: doc updates for materialize-azure-fabric-warehouse in flow#1901.
Notes for reviewers:
(anything that might help someone review this PR)