materialize-azure-fabric-warehouse: new materialization connector #2300

Merged
williamhbaker merged 10 commits into main from wb/fabric-warehouse on Feb 4, 2025

Conversation

@williamhbaker (Member) commented on Jan 27, 2025

Description:

New connector to materialize to Microsoft Azure Fabric Warehouse.

Most of the code is new and specific to the materialize-azure-fabric-warehouse connector itself, and is more thoroughly described in the commit messages.

There are some changes to common packages/other connectors as well:

  • Generally supporting integer type widening for SQL materializations, since I was doing it for the Fabric materialization and it was relatively easy to implement across the board
  • Changes to the MergeBoundsBuilder to support not building bounds on binary columns. For now we won't bother trying to establish a minimum/maximum for fields that may be materialized as actual binary values, which Fabric Warehouse supports (a minimal sketch of this follows the list).
  • Differentiation between empty strings and absent strings in the common streaming CSV encoder, which Fabric uses. This is accomplished by implementing a custom CSV writer and using it instead of the Go stdlib version, which cannot make such a distinction. This may make CSV encoding a more viable option for other materializations as well; for example, DuckDB has optimized CSV reading capabilities, so reading CSV files may be faster than reading JSON files.
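
To illustrate the MergeBoundsBuilder change, here's a minimal sketch of the skip-binary-columns idea; the names below are hypothetical and this is not the actual MergeBoundsBuilder API:

```go
package main

import "fmt"

// columnBound tracks the observed minimum and maximum of a single key column,
// for use as merge/load query bounds. Illustrative only.
type columnBound struct {
	name     string
	isBinary bool
	min, max any
}

// observe folds a newly stored key value into the bound. Binary columns are
// skipped entirely, so no minimum/maximum is ever established for them.
func (b *columnBound) observe(v any, less func(a, b any) bool) {
	if b.isBinary {
		return
	}
	if b.min == nil || less(v, b.min) {
		b.min = v
	}
	if b.max == nil || less(b.max, v) {
		b.max = v
	}
}

func main() {
	intLess := func(a, b any) bool { return a.(int) < b.(int) }

	id := columnBound{name: "id"}
	payload := columnBound{name: "payload", isBinary: true}
	for _, v := range []int{5, 2, 9} {
		id.observe(v, intLess)
		payload.observe(v, intLess)
	}
	fmt.Println(id.min, id.max)           // 2 9
	fmt.Println(payload.min, payload.max) // <nil> <nil>
}
```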

Closes #2249
Closes #2301

Workflow steps:

(How does one use this feature, and how has it changed)

Documentation links affected:

Notes for reviewers:

(anything that might help someone review this PR)


Creates a component that can be used for the common pattern of writing out
serialized data to a staging location for later committing to the destination,
or using it in some other way. We've repeated very similar code for many existing
SQL warehouse materializations and it felt like it was time to consolidate the
common behaviors.

The immediate use will be in the new Azure Fabric Warehouse materialization.
It could be retroactively incorporated into existing materializations in order
to simplify them, but I'm not going to do that right now, at least until it has
been proven with the brand new one.
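
As a rough sketch of the shape this component takes (the interface and method names below are illustrative, not the actual implementation):

```go
package staging

import "context"

// StagedFiles is an illustrative sketch of the consolidated component's shape:
// rows are encoded into files at a staging location, the resulting file URIs
// are referenced when committing data to the destination (e.g. via COPY INTO),
// and the files are cleaned up afterward.
type StagedFiles interface {
	// EncodeRow appends a row of converted values for the given binding,
	// starting a new staged file for that binding if one isn't already open.
	EncodeRow(ctx context.Context, binding int, row []any) error

	// Flush closes any open file streams and returns the URIs of the staged
	// files that were written during this transaction.
	Flush(ctx context.Context) ([]string, error)

	// CleanupCurrentTransactionFiles deletes the staged files once they are
	// no longer needed.
	CleanupCurrentTransactionFiles(ctx context.Context) error
}
```
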
The integration tests for materialize-azure-fabric-warehouse. Adding these as a
separate commit for ease of review. The actual connector code that can run them
will be in a subsequent commit.
Basic working version of the Azure Fabric Warehouse materialization.

Fabric Warehouse is a relatively new product, and perhaps because of that it is
missing some features typical of other SQL warehouse materializations:

* No merge query support: This means we must delete rows from the target table
  that already exist, before inserting the updated rows. I don't think this is
  all that much different from what a true merge query would do, but it's a
  little more cumbersome to build queries for (a rough sketch of the full store
  sequence follows this list).
* No ability to read from staged files directly; instead we must load them into
  "temporary" tables using a COPY INTO query.
* Fabric Warehouse recently added support for temporary tables, but
  unfortunately COPY INTO does not work with them. An actual table must be
  created, COPY INTO'd, and then later dropped when it is not needed. Fabric
  Warehouse does at least support creating and dropping tables within a
  transaction, and this is utilized to ensure we don't leave tables hanging
  around.
* No idempotent COPY INTO, which makes a post-commit apply strategy with
  exactly-once standard and delta updates impossible. Transactions are supported
  though, and there don't appear to be any strange limitations on them like
  number of tables in a single transaction, so the authoritative endpoint
  pattern works well.
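
Putting the first three points together, the store flow amounts to something like the following sequence, shown here as illustrative SQL in a Go constant; the table names and staging URL are hypothetical, and the real connector renders these statements from templates:

```go
package main

import "fmt"

// Illustrative only: everything runs in one transaction, the "temporary" load
// table is an actual table that is created, COPY INTO'd from staged files, and
// dropped, and the lack of MERGE is worked around with DELETE then INSERT.
const storeSequence = `
BEGIN TRANSACTION;

CREATE TABLE load_tmp_0 (key BIGINT, doc VARCHAR(MAX));

COPY INTO load_tmp_0
FROM 'https://<storage-account>.dfs.core.windows.net/staging/file1.csv'
WITH (FILE_TYPE = 'CSV');

DELETE FROM target_table
WHERE target_table.key IN (SELECT key FROM load_tmp_0);

INSERT INTO target_table (key, doc)
SELECT key, doc FROM load_tmp_0;

DROP TABLE load_tmp_0;

COMMIT TRANSACTION;
`

func main() { fmt.Print(storeSequence) }
```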

Other than these main limitations, a decent range of data types is supported,
and the materialization seems to work pretty well.

Later commits will add support for additional features like column migrations,
and some additional optimizations.
Column migrations can't be done with the typical column renaming strategy since
Fabric Warehouse doesn't support dropping columns from a table, but it does
support creating, dropping, and renaming tables within a transaction.

It seems that the accepted workaround for migrating column types is to create a
new table, copying the unchanged column values and casting the changed ones as
appropriate, and then swap that table for the old one by dropping the old one
and renaming the new one.
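
Something along these lines, as an illustrative sketch only; the identifiers are hypothetical and the rename mechanism shown is an assumption for the purposes of the sketch:

```go
package main

import "fmt"

// Illustrative only: migrate the type of column "amount" by creating a new
// table with the casted values and swapping it in for the old one, all within
// a transaction.
const migrateColumnType = `
BEGIN TRANSACTION;

CREATE TABLE target_table_migrated (key BIGINT, amount DECIMAL(38, 0), doc VARCHAR(MAX));

INSERT INTO target_table_migrated (key, amount, doc)
SELECT key, CAST(amount AS DECIMAL(38, 0)), doc FROM target_table;

DROP TABLE target_table;

EXEC sp_rename 'target_table_migrated', 'target_table';

COMMIT TRANSACTION;
`

func main() { fmt.Print(migrateColumnType) }
```
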
The previous commit for Fabric Warehouse modified the column migration tests to
show how integer column widening works: a schema that was previously compatible
with a 64-bit integer column is now known to produce values too large for that,
but not so large that they can't fit into some other exact integer
representation column.

This adds support for integer widening migrations to all of the other SQL
materializations, and updates their test snapshots.
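
As a rough illustration of what "widening" means for the generated column types (the helper below is hypothetical, not the shared type-mapping code):

```go
package main

import "fmt"

// widenedIntegerType sketches the idea behind integer widening: an integer
// field gets a BIGINT column only while the schema guarantees its values fit
// in 64 bits; otherwise it is mapped to a wider exact numeric type. The
// DECIMAL(38, 0) choice is illustrative of one such representation.
func widenedIntegerType(fitsInInt64 bool) string {
	if fitsInInt64 {
		return "BIGINT"
	}
	return "DECIMAL(38, 0)"
}

func main() {
	fmt.Println(widenedIntegerType(true))  // BIGINT
	fmt.Println(widenedIntegerType(false)) // DECIMAL(38, 0)
}
```
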
Binary columns can be created as VARBINARY(MAX), and binary data can be loaded
into them using `BASE64_DECODE` on staged base64-encoded strings.

There's a bit of extra complexity for templating out where this decoding is
necessary, and dealing with binary keys and their required comparison
operations.

This seems to get the job done, although it makes the query templates harder to
comprehend.
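
For example, the decoding step could look roughly like this (hypothetical table and column names; `BASE64_DECODE` is the function mentioned above):

```go
package main

import "fmt"

// Illustrative only: binary values are staged as base64-encoded strings in
// the CSV files, and decoded into VARBINARY(MAX) columns when rows are moved
// out of the load table.
const insertWithBinary = `
INSERT INTO target_table (key, payload, doc)
SELECT BASE64_DECODE(key), BASE64_DECODE(payload), doc
FROM load_tmp_0;
`

func main() { fmt.Print(insertWithBinary) }
```
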
…zations

Track the range of observed keys for load and merge queries, and provide query
hints to hopefully limit the amount of data that must be scanned when running
these queries.

We have done similar merge query optimizations for materialize-bigquery and
materialize-snowflake. This is the first example of also doing it for load
queries.
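
A rough sketch of the idea (hypothetical names and query shape, not the actual templates):

```go
package main

import "fmt"

// renderBoundsHint sketches how observed key bounds could be turned into a
// predicate that is ANDed onto load and merge queries, so the warehouse can
// skip scanning data outside the observed key range.
func renderBoundsHint(column string, min, max int64) string {
	return fmt.Sprintf("%s >= %d AND %s <= %d", column, min, column, max)
}

func main() {
	hint := renderBoundsHint("t.key", 2, 9)
	fmt.Printf("SELECT t.key, t.doc FROM target_table t JOIN load_tmp_0 l ON t.key = l.key AND %s;\n", hint)
}
```
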
… files

Each file needs to be enclosed in single quotes, rather than the entire list of
files.
A string field that is present in a document with a 0-length value, AKA the
string "", is different from a string field that is absent.

The stdlib Go CSV writer does not allow for such a distinction to be made,
since all fields must be strings and there is no way to indicate an "absent"
string.

This commit implements a custom CSV writer that can tell the difference between
an absent and 0-length string. An absent string gets "skipped", with no value at
all placed between the commas of a row. A 0-length string gets quoted to just be
"". Other strings get quoted if they have special characters requiring quotes;
otherwise they are written as-is.

This new CSV writer is inspired by the Go stdlib CSV writer, with the required
additions for absent vs. empty strings. Some of the extra configuration that we
generally don't need has been stripped out, like specifying a special NULL
string or using a custom "comma" value; nobody has ever used these options in
our filesink materializations, and it seems reasonable not to support them
unless somebody requests them down the line. This custom CSV writer also does
not use an additional internal buffer, since that would be redundant with the
buffer(s) inevitably used by its outputs.
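
A condensed sketch of the absent-vs-empty behavior described above; it's simplified relative to the actual writer, but shows the three cases:

```go
package main

import (
	"fmt"
	"strings"
)

// field is an illustrative stand-in for a CSV field that may be absent (nil),
// empty (pointer to ""), or a normal string value.
type field *string

func strp(s string) field { return &s }

// writeRow shows the core distinction: an absent field writes nothing between
// its commas, an empty string writes "", and other values are quoted only
// when they contain characters that require it.
func writeRow(sb *strings.Builder, fields []field) {
	for i, f := range fields {
		if i > 0 {
			sb.WriteByte(',')
		}
		switch {
		case f == nil:
			// absent: emit nothing at all
		case *f == "":
			sb.WriteString(`""`) // empty string, explicitly quoted
		case strings.ContainsAny(*f, ",\"\r\n"):
			sb.WriteString(`"` + strings.ReplaceAll(*f, `"`, `""`) + `"`)
		default:
			sb.WriteString(*f)
		}
	}
	sb.WriteByte('\n')
}

func main() {
	var sb strings.Builder
	writeRow(&sb, []field{strp("a"), nil, strp(""), strp(`say "hi"`)})
	fmt.Print(sb.String()) // a,,"","say ""hi"""
}
```
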
@@ -41,7 +41,7 @@ Migratable Changes Before Apply Schema:

Migratable Changes Before Apply Data:
key (STRING), _meta_flow_truncated (BOOLEAN), boolWidenedToJson (BOOLEAN), dateValue (DATE), datetimeValue (TIMESTAMP), flow_published_at (TIMESTAMP), int64 (INTEGER), intWidenedToJson (INTEGER), multiple (JSON), nonScalarValue (STRING), numericString (BIGNUMERIC), optional (STRING), requiredNumeric (BIGNUMERIC), scalarValue (STRING), stringWidenedToJson (STRING), timeValue (STRING), flow_document (STRING)
1, false, true, 2024-01-01, 2024-01-01 01:01:01.111111 +0000 UTC, 2024-09-13 01:01:01 +0000 UTC, 1, 999, <nil>, <nil>, 123/1, <nil>, 456/1, test, hello, 01:01:01, {}
1, false, true, 2024-01-01, 2024-01-01 01:01:01.111111 +0000 UTC, 2024-09-13 01:01:01 +0000 UTC, 1, 999, <nil>, <nil>, 12300000000000000000000000000000000000000/100000000000000000000000000000000000000, <nil>, 45600000000000000000000000000000000000000/100000000000000000000000000000000000000, test, hello, 01:01:01, {}
@williamhbaker (Member Author):

The inclusion of all the extra 0's here seems to not be directly related to anything in this PR, but is rather an inconsequential change in how the bigquery client retrieves values, which must have arisen from some other change that upgraded its version.

@@ -85,6 +85,6 @@ Migratable Changes Applied Schema:


Migratable Changes Applied Data:
key (STRING), _meta/flow_truncated (BOOLEAN), boolWidenedToJson (BOOLEAN), flow_published_at (TIMESTAMP), int64 (BIGINT), multiple (STRING), nonScalarValue (STRING), optional (STRING), scalarValue (STRING), stringWidenedToJson (STRING), timeValue (STRING), flow_document (STRING), dateValue (STRING), datetimeValue (STRING), intWidenedToJson (STRING), numericString (STRING), requiredNumeric (STRING)
1, false, true, 2024-09-13T01:01:01Z, 1, <nil>, <nil>, <nil>, test, hello, 01:01:01, {}, 2024-01-01, 2024-01-01T01:01:01.111111000Z, 999, 123, 456
key (STRING), _meta/flow_truncated (BOOLEAN), boolWidenedToJson (BOOLEAN), flow_published_at (TIMESTAMP), multiple (STRING), nonScalarValue (STRING), optional (STRING), scalarValue (STRING), stringWidenedToJson (STRING), timeValue (STRING), flow_document (STRING), dateValue (STRING), datetimeValue (STRING), int64 (DECIMAL), intWidenedToJson (STRING), numericString (STRING), requiredNumeric (STRING)
@williamhbaker (Member Author):

For this and some of the other snapshots, the ordering of columns appears to have changed and I'm not 100% sure why, but it shouldn't make any difference in how the connector operates.

@williamhbaker williamhbaker requested a review from a team January 27, 2025 20:17
williamhbaker added a commit to estuary/flow that referenced this pull request Jan 27, 2025
Some baseline docs for the new fabric warehouse materialization.

See estuary/connectors#2300
@williamhbaker williamhbaker requested review from Alex-Bair and removed request for a team February 4, 2025 16:49
@Alex-Bair (Member) left a comment:

LGTM based on my experience with materializations so far.

Comment on lines +59 to +65
// flushOnNextBinding can be set to flush the file stream whenever a new binding
// (by index) has a row encoded for it. Flushing the file stream will result in
// the current streaming encoder being closed and flushed, which concludes the
// current file being written. Any further writes to that same binding will
// start a new file, so this should usually only be enabled for encoding rows
// received from Store requests, where the documents are always in monotonic
// order with respect to their binding index.
@Alex-Bair (Member):

Just for my understanding, what's the benefit of setting flushOnNextBinding to true? The frequency of written files would increase, so is the purpose to decrease latency?

@williamhbaker (Member Author):

Streaming writes to object stores (S3, GCS, etc.) will actually buffer a little bit (or more than a little bit) of data so that it is uploaded in chunks, or even as parts of a multipart upload. If a materialization has lots of bindings, we end up with dangling chunks of buffered data that aren't released from memory until we actually flush them, so cycling through lots of bindings in a single transaction can cause the connector to use excessive amounts of memory on chunks that are waiting to be flushed.

Flushing when the binding changes over for stores helps with this, since Store requests are guaranteed to be grouped by their binding index, in increasing order, so we'll always see stores for bindings in orders like 1, 1, 1, 3, 3, 3, 4, etc., but never 1, 1, 1, 3, 3, 1. So once we have observed the change from binding 1 -> 3, we know that we can flush the file for binding 1.
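
In other words, something along these lines (a simplified sketch, not the actual encoder code):

```go
package main

import "fmt"

// stream stands in for a per-binding staged file stream that buffers data
// until it is flushed.
type stream struct{ binding int }

func (s *stream) flush() { fmt.Printf("flushed file for binding %d\n", s.binding) }

// storeEncoder sketches the flushOnNextBinding behavior: because Store
// requests arrive grouped by binding index in increasing order, seeing a new
// binding means the previous binding's file can be flushed and its buffered
// chunks released from memory.
type storeEncoder struct {
	current *stream
}

func (e *storeEncoder) encodeRow(binding int, row []any) {
	if e.current != nil && e.current.binding != binding {
		e.current.flush()
		e.current = nil
	}
	if e.current == nil {
		e.current = &stream{binding: binding}
	}
	_ = row // ... encode the row into the stream's buffered writer ...
}

func main() {
	e := &storeEncoder{}
	for _, b := range []int{1, 1, 1, 3, 3, 4} {
		e.encodeRow(b, nil)
	}
	e.current.flush() // final flush when the transaction's stores are done
}
```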

@Alex-Bair (Member):

Got it, that makes sense. Thanks for the explanation!

williamhbaker added a commit to estuary/flow that referenced this pull request Feb 4, 2025
Some baseline docs for the new fabric warehouse materialization.

See estuary/connectors#2300
@williamhbaker williamhbaker merged commit aacc2a5 into main Feb 4, 2025
53 of 55 checks passed
@williamhbaker williamhbaker deleted the wb/fabric-warehouse branch February 4, 2025 20:49