Flink: Dynamic Iceberg Sink Contribution #12424
Conversation
@mxm Thanks for the nice work. It will be much easier to review if this PR only changes the Flink 1.20 parts, with a back-port to Flink 1.19 afterwards.
This reverts commit 5a16da0.
Hey @manuzhang! Sure, I've reverted the 1.19 changes. Let me know what you think about the Dynamic Sink. CC @pvary @stevenzwu
if (userSpec.isUnpartitioned()) {
  return userSpec;
}
UnboundPartitionSpec.Builder builder =
What is the purpose of the adjustment? There is no change in the PartitionSpec, just rebinding it to the new table schema?
Can we use PartitionSpec#toUnbound() here and avoid changing the api module?
I see, the change is mainly there to support this type of change: the case where the two schemas (old and new) use the same field name but different field ids.
String sourceFieldName = userSpec.schema().idToName().get(field.sourceId());
int adjustedSourceId = tableSchema.findField(sourceFieldName).fieldId();
I guess it is like this scenario:
- rename field "foo" to "bar" (with id = 9)
- create a new field called "foo" (with id = 15)
while still wanting to maintain the partition spec on the field name "foo" after the schema change, but bound to the new field 15. I wonder if this is a behavior we want to support. I think this result can be achieved by deleting the old partition field and adding a new partition field. That would express the intention more clearly than silently rebinding an existing partition field to a new source field based on the field name.
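For illustration, roughly what that scenario looks like with Iceberg's schema-evolution API (a sketch; `table` is assumed to be an already-loaded org.apache.iceberg.Table, and the ids in the comments are illustrative):

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types;

class RenameAndReaddExample {
  // Sketch of the scenario above, assuming "table" has an int column "foo" (id = 9)
  // and a partition spec bound to "foo".
  static void renameAndReadd(Table table) {
    table
        .updateSchema()
        .renameColumn("foo", "bar") // the old field keeps its id (9), now named "bar"
        .addColumn("foo", Types.IntegerType.get()) // a new field "foo" gets a fresh id, e.g. 15
        .commit();
    // The existing partition spec still references source id 9 (now "bar").
    // Rebinding it by name to the new "foo" (id 15) is the behavior questioned above.
  }
}
```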
A little bit of background: The Dynamic Sink matches fields based on their names, not their ids. The reasons are that (a) the Iceberg Java API works solely based on names, and (b) we found it difficult for users to supply the correct ids for the current table schema / spec. The ids are usually dictated by the input schema, e.g. an Avro schema, which might use different ids, e.g. starting from 0 instead of 1.
The adjustment here ensures that the user-provided spec, alongside its schema, can be mapped to one of the existing schemas / specs. This avoids creating an unnecessary table schema / spec when the schema / spec is otherwise identical.
For example:
- Input schema: {"id", 0, int} Input spec: {bucket[0][10]}
- Table schema: {"id", 1, int} Table spec: {bucket[1][10]}
The logic rewrites the input spec to match the table spec, because field id 0 ("id") in the input schema matches field id 1 ("id") in the table schema. The result is then later used to resolve the table spec from the cache.
The rewrite isn't strictly necessary though; we can achieve the same by simply comparing the specs. I've revised the logic in the latest commit. It's now integrated directly with the cache lookup / update logic.
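To make this concrete, here is a minimal sketch of the example (field ids chosen for illustration only; not code from this PR):

```java
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

class SpecExample {
  public static void main(String[] args) {
    // Input schema as a user might build it, e.g. derived from Avro (ids starting at 0).
    Schema inputSchema = new Schema(Types.NestedField.required(0, "id", Types.IntegerType.get()));
    PartitionSpec inputSpec = PartitionSpec.builderFor(inputSchema).bucket("id", 10).build();

    // Table schema with ids assigned by Iceberg (starting at 1).
    Schema tableSchema = new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get()));
    PartitionSpec tableSpec = PartitionSpec.builderFor(tableSchema).bucket("id", 10).build();

    // The specs are not equal because the source field ids differ (0 vs 1), yet both
    // partition by bucket(10) on the column named "id". The name-based comparison
    // treats them as the same spec, so no spec evolution is needed.
    System.out.println(inputSpec.equals(tableSpec)); // false
  }
}
```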
For example:
Input schema: {"id", 0, int} Input spec: {bucket[0][10]}
Table schema: {"id", 1, int} Table spec: {bucket[1][10]}
I don't fully get the example. In PartitionSpecEvolution, the src/input spec is Iceberg's PartitionSpec. Why would the input spec differ from the table spec in terms of field ids? How is the input spec constructed (especially regarding source field name and id)? If you have a unit test that demonstrates the scenario, please point to it here.
The input spec is what the user provides in DynamicRecord. The input schema is built by the user, e.g. from an Avro schema, and can be different from the actual Iceberg schema, e.g. the ids used don't necessarily have to match the ids in the Iceberg schema, new fields may have been added, etc. The same goes for the PartitionSpec: it is bound to the input schema and references source ids which don't necessarily match the Iceberg schema.
Given these constraints, to still be able to meaningfully evolve the schema, the logic around schema / spec evolution uses field names, not field ids. This is a decision we took after running PoCs with multiple customers. In the future, we might add a "strict" mode, which uses field ids, but so far customers consistently wanted name-based schema / spec evolution.
The goal is to translate the input schema / spec such that they either match one of the existing schemas / specs (check for schema / check for spec), or update the table schema / spec with the minimal amount of changes required.
If we didn't adapt the input schema / spec to the table schema or the other existing schemas, we would go through a schema / spec change for every record. That's not good for table metadata or catalog access / performance. We would also "lose" fields as soon as their ids changed.
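As an illustration of the name-based matching, a simplified sketch for flat schemas only (not the actual CompareSchemasVisitor, which also handles nested types and type promotion):

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

class NameBasedSchemaCheck {
  /** Returns true if both schemas have the same fields by name, type, and optionality. */
  static boolean sameByName(Schema input, Schema table) {
    if (input.columns().size() != table.columns().size()) {
      return false;
    }

    return input.columns().stream()
        .allMatch(
            inputField -> {
              // Look up by name, deliberately ignoring the field ids of the input schema.
              Types.NestedField tableField = table.findField(inputField.name());
              return tableField != null
                  && tableField.type().equals(inputField.type())
                  && tableField.isOptional() == inputField.isOptional();
            });
  }
}
```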
If you have a unit test that demonstrates the scenario, please point to it here.
Here is an example where we test the compatibility of spec1 (the input spec) with spec2 (the table spec). No spec update will be performed in this case:
Line 74 in d7e65c8
void testCompatibleWithNonMatchingSourceIds() {
Here is the same test, but we change the spec because its buckets changed:
Line 96 in d7e65c8
void testPartitionSpecEvolution() {
private PartitionSpecEvolver() {}

/**
 * Checks whether two PartitionSpecs are compatible with each other. Less strict than {@code
Can you share why a different / relaxed compatibility check is needed? What are the scenarios?
The default PartitionSpec.equals() compares the specs based on the transform name and the source ids. Since we don't enforce source ids to match in the Dynamic Sink, but instead check the actual field names, we need custom logic.
Also see #12424 (comment).
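Roughly, the relaxed check compares the transform plus the source field name instead of the source id; a simplified sketch (not the actual PartitionSpecEvolver code):

```java
import java.util.List;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;

class NameBasedSpecCheck {
  static boolean compatible(PartitionSpec first, PartitionSpec second) {
    List<PartitionField> firstFields = first.fields();
    List<PartitionField> secondFields = second.fields();
    if (firstFields.size() != secondFields.size()) {
      return false;
    }

    for (int i = 0; i < firstFields.size(); i++) {
      PartitionField f = firstFields.get(i);
      PartitionField s = secondFields.get(i);
      // Compare the transform (e.g. bucket[10]) and the *name* of the source column,
      // not the source field id, which may differ between the input and the table schema.
      String firstName = first.schema().findColumnName(f.sourceId());
      String secondName = second.schema().findColumnName(s.sourceId());
      if (!f.transform().toString().equals(s.transform().toString())
          || !firstName.equals(secondName)) {
        return false;
      }
    }

    return true;
  }
}
```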
}

DataStream<DynamicRecordInternal> distributeDataStream(DataStream<DynamicRecordInternal> input) {
  return input.keyBy(DynamicRecordInternal::writerKey);
keyBy may result in unbalanced traffic distribution. Let's assume a scenario where the dynamic sink starts with a single table, or a single table has the dominant amount of traffic, and the distribution mode is none. If the cardinality of writerKey is high (e.g. 100x) compared to the write parallelism and each writerKey slice is small enough to not cause significant skew, then keyBy can work reasonably well.
But if the cardinality of writerKey is relatively small, we may see significant traffic skew using keyBy. The author of this closed PR described this problem with keyBy. This is why we added the more generally applicable range distribution to the Flink Iceberg sink.
You are absolutely right that a normal keyBy would result in unbalanced traffic distribution, at least when the key-to-value ratio is not equal. But it is important (and not straightforward) to understand how the writerKey is generated.
First of all, writeParallelism dictates how many subtasks will receive data. WriterKeys are then chosen to match a key group for each of these subtasks. This is done by TargetLimitedKeySelector, which wraps around all other key selectors and ensures that the data will hit a bounded set of subtasks. Specifically, this is done in this loop, which probes for the right key to hit a key group in the right subtask. The default with DistributionMode NONE is round-robin. For HASH, we make sure to bound the keyspace to the set of subtasks according to the table's writeParallelism.
Note that, in principle, this method does not conflict with RANGE distribution, which could be implemented just as well on the Dynamic Sink. For range distribution, we would select a subtask depending on the data distribution.
Essentially, what we are doing here is a custom partitioner, with the added advantage of being able to use keyed state. We currently don't use state, but having the option is a great advantage. Naturally, this way of distributing data could be replaced with a Flink (custom) Partitioner, but using Flink keyed state would then no longer be an option.
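To illustrate the probing idea, here is a minimal sketch based on Flink's key-group assignment utilities (not the actual TargetLimitedKeySelector; the salt-based probe strategy is an assumption):

```java
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

class SubtaskTargetingKeys {
  /**
   * Probes integer keys, starting from a table-derived salt, until one is found that
   * Flink's keyBy would route to the desired writer subtask.
   */
  static int keyForSubtask(int targetSubtask, int maxParallelism, int parallelism, int salt) {
    for (int candidate = salt; ; candidate++) {
      int assignedSubtask =
          KeyGroupRangeAssignment.assignKeyToParallelOperator(
              candidate, maxParallelism, parallelism);
      if (assignedSubtask == targetSubtask) {
        // Using this value as the keyBy key sends the record to targetSubtask.
        return candidate;
      }
    }
  }
}
```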
Quick recap on a discussion Steven, Peter, and I had in a separate channel, which is only partially captured in the PR comments:
Iceberg Schema
Dynamic Sink users currently need to provide an Iceberg Schema, which also requires them to define field ids. However, correctly defining field ids is cumbersome and error-prone. That's why the sink is designed to ignore field ids and instead compare fields by their names. This also works for nested fields. We enforce field name uniqueness on every level of the schema, just like Iceberg. Iceberg internally uses ids because they allow it to uniquely identify a field, e.g. if a field has been removed and a new field gets added with the same name, the new field will not contain data of the old field. However, in the Dynamic Sink we don't allow field deletions, as this operation would not be safe with the schema migration logic of the sink. We wouldn't want to remove a field, only to re-add it later on, e.g. in case of out-of-order data. This would only work reliably if the user supplied accurate field ids, which we found to be unrealistic. Consequently, we never use the user-supplied schema directly. Instead, we compare the user-supplied schema with the existing table schemas. If they are identical, we write with the matching table schema. Otherwise, we migrate the current table schema by deriving a set of migration steps, e.g. add field, widen type, make field optional, change position of field. In the process, we also rewrite the user-provided PartitionSpec (if any), which refers to a field id in the user-provided schema, to refer to the field with the same name in the table schema.
Alternatives to Iceberg Schema
Steven wondered whether Iceberg's Schema is the right schema type for users to provide. Since Avro is a common input schema type, we could think about supporting it as well. Directly supporting different schema types could be useful for users. Internally we will still have to use the Iceberg Schema, so maybe we can give users the option to supply different schema types in addition to the Iceberg Schema.
Using keyBy (hash distribution) for distributing records
The Dynamic Sink currently uses keyBy for distribution modes NONE and HASH. The idea is that the Dynamic Sink works with a large number of tables which all have an individually configured write parallelism. In the case of NONE, we don't just hash-distribute the data across the subtasks, but instead probe a key group for each of the N subtasks, where N is the write parallelism. We then round-robin distribute the data across these tasks. The whole process is offset by a salt which is derived from the table name. This ensures that the distribution mode and write parallelism can be changed dynamically while retaining a balanced amount of work across all subtasks. For HASH, the process works similarly, except that we do a hash-based assignment across the deterministically (but pseudo-randomly) chosen N subtasks. Using a Flink custom partitioner, we could achieve a similar mechanism, but it would require us to implement keyBy on top of it, which might not be optimal. We would also lose the ability to use Flink's keyed state.
Catalog Access
The Dynamic Sink needs to access the Iceberg catalog to update the table metadata. The Dynamic Sink supports two update modes when an unknown table schema is detected:
Schemas and other table data are cached in TableDataCache, which allows refreshing the table at a regular interval; the default allows one refresh per second. We agreed that the load of option (1) on the catalog will be quite high if the number of table updates and the write parallelism of those tables are high. Option (2) will scale quite well even with many updates and high write parallelism.
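For context, a minimal sketch of such a throttled metadata refresh (assumed behavior only; the actual TableDataCache also caches schemas, specs, and past comparison results):

```java
import java.time.Duration;
import org.apache.iceberg.Table;

class ThrottledTableRefresh {
  private final Table table;
  private final long minIntervalMs;
  private long lastRefreshMs = 0L;

  ThrottledTableRefresh(Table table, Duration minInterval) {
    this.table = table;
    this.minIntervalMs = minInterval.toMillis();
  }

  /** Re-reads table metadata from the catalog at most once per interval (e.g. once per second). */
  void maybeRefresh() {
    long now = System.currentTimeMillis();
    if (now - lastRefreshMs >= minIntervalMs) {
      table.refresh();
      lastRefreshMs = now;
    }
  }
}
```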
Yes!!! I'm excited about this feature!
This would be nice for Wikimedia Foundation. We use JSONSchema and have tooling to automatically convert to Flink TypeInformation (and also Table API DataTypes). We do a similar thing to what you are all describing here when creating and evolving Hive Parquet tables with Spark. We provide conversions from JSONSchema to Spark StructType (analogous to Flink's RowType), and then use the StructType to determine how to create or evolve the table via DDL statements. This allows us to automate ingestion into various downstream systems from JSON streams in Kafka by using our managed event JSONSchema registry as the canonical data schema source.
I might be ignorant here, but I think if you support RowType, you will also get Avro support? I haven't used it, but Flink has a built-in schema converter from Avro to TypeInformation?
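For reference, such converters already exist as library utilities; a hedged sketch (the Avro schema string is just an example):

```java
import org.apache.avro.Schema.Parser;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.iceberg.avro.AvroSchemaUtil;

class AvroConversions {
  public static void main(String[] args) {
    String avroJson =
        "{\"type\":\"record\",\"name\":\"Event\",\"fields\":"
            + "[{\"name\":\"id\",\"type\":\"int\"}]}";

    // Flink's built-in converter (flink-avro): Avro schema string -> TypeInformation.
    TypeInformation<?> typeInfo = AvroSchemaConverter.convertToTypeInfo(avroJson);

    // Iceberg's converter: Avro schema -> Iceberg Schema (assigns field ids).
    org.apache.iceberg.Schema icebergSchema =
        AvroSchemaUtil.toIceberg(new Parser().parse(avroJson));

    System.out.println(typeInfo);
    System.out.println(icebergSchema);
  }
}
```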
…ternal / DynamicRecordInternalSerializer This adds the user-facing type DynamicRecord, along with its internal representation DynamicRecordInternal and its type information and serializer. Broken out of apache#12424.
…mparison and evolution This adds the classes around schema / spec comparison and evolution. A breakdown of the classes follows:
# CompareSchemasVisitor: Compares the user-provided schema against the current table schema.
# EvolveSchemaVisitor: Computes the changes required for the table schema to be compatible with the user-provided schema.
# PartitionSpecEvolution: Code for checking compatibility with the user-provided PartitionSpec and computing a set of changes to rewrite the PartitionSpec.
# TableDataCache: Cache which holds all relevant metadata of a table, like its name, branch, schema, and partition spec. Also holds a cache of past comparison results for a given table's schema and the user-provided input schema.
# TableUpdater: Core logic to compare and create/update a table given a user-provided input schema.
Broken out of apache#12424, depends on apache#12996.
This adds the dynamic version of the writer and committer for the Flink Dynamic Iceberg Sink. Conceptually, they work similarly to the IcebergSink, but they support writing to multiple tables. Write results for each table are aggregated from the DynamicWriter in the DynamicWriteResultAggregator, from where they are sent to the DynamicCommitter. Broken out of apache#12424, depends on apache#13032.
I decided to break this down into several PRs. See the description for the list: #12424 (comment)
int salt,
int writeParallelism,
int maxWriteParallelism) {
  if (writeParallelism > maxWriteParallelism) {
Will this work with Flink autoscaling?
Great question! In some way, yes. The table writeParallelisms will be capped at the current parallelism of the sink (maxWriteParallelism), which will be determined by Flink Autoscaling. This allows the sink to be scaled down, effectively reducing the write parallelism. On the other hand, the write parallelism of a table will never exceed the maxWriteParallelism, and we can't scale a table beyond its configured writeParallelism either.
Could you elaborate on what you would expect to happen with autoscaling enabled?
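In other words, a trivial sketch of the capping described above (names assumed, not the PR code):

```java
class WriteParallelismCap {
  /** A table never writes to more subtasks than the sink currently has. */
  static int effectiveWriteParallelism(int writeParallelism, int maxWriteParallelism) {
    return Math.min(writeParallelism, maxWriteParallelism);
  }
}
```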
You probably want each table's writeParallelism to change dynamically, similarly to how autoscaling works with multiple Iceberg sinks. That would require us to move away from the fixed writeParallelism config. I'm thinking that the solution would be two-fold:
- Flink Autoscaling would continue to scale the Dynamic Sink just like any operator
- Dynamic Sink would additionally measure and correct table congestion
You probably want each table's writeParallelism to change dynamically, similarly to how autoscaling works with multiple Iceberg sinks
Indeed, this is the part I am thinking about for my use case. I'm quite new to Flink in general, so forgive me if I don't make sense! From my observations:
- Definitely holds. I see the autoscaling.
- I guess we don't have this functionality yet. I'd be interested to see if it's possible though!
Even still, I'm thinking there might be a bit of a snag with auto scaling and the PartitionKeySelector.
Say there are N streams of data, potentially from N kafka-topic partitions, but each topic partition has data that will eventually be written to a single iceberg partition A.
The writing tends to be slower than the reading in this case, so we should get backpressure, and as I understand it, Flink should autoscale the writer and increase the number of subtasks. The problem is that I think with the PartitionKeySelector all data will be hashed onto the same writing subtask, because it is all getting written to partition A, meaning we scale up the number of subtasks, but only one will be doing any work.
Potentially the round-robin selector makes more sense in this scenario; however, I do still wonder if it's possible to set the max parallelism there to match the autoscaling.
Say there are N streams of data, potentially from N kafka-topic partitions, but each topic partition has data that will eventually be written to a single iceberg partition A.
If you mean Iceberg table partition, we can still write multiple partition files for an Iceberg partition in NONE distribution mode. Every subtask will have a data file for Iceberg partition A. The writes will be distributed.
In general, Flink partition skew is an issue which needs to be tackled at the job level. Autoscaling cannot do anything about that. But it has ways to detect ineffective scaling decisions and avoid those. Generally, partition skew in Iceberg can be avoided by using none distribution mode (default) or range distribution mode. Hash-based partitioning is obviously prone to the partition skew problem you described.
Potentially the round robin selector makes more sense in this scenario, however I do still wonder if it's possible to set the max parallelism there to match the auto scaling.
Round-robin is the default. In a scenario with hash partitioning, where all data goes to one subtask / Iceberg partition, we won't benefit from setting the writeParallelism, but for other scenarios we will.
Yep, that makes sense! The NONE distribution (round-robin selection) works best here.
Description
The Flink Iceberg sink allows writing data from a continuous Flink stream to an Iceberg table. It already powers countless production jobs and has passed the test of time. But for many users, the static nature of the Flink Iceberg sink became a serious limitation. The target table, its schema and partitioning spec, and even the write parallelism are statically configured. Any changes to these require rebuilding and restarting the underlying Flink job.
We present the next generation Flink Iceberg sink, the Dynamic Flink Iceberg Sink.
From the user perspective, there are three main advantages to using the Dynamic Iceberg Sink. It supports:
- writing to multiple tables, which can be created and changed dynamically
- evolving the table schema and partition spec on the fly
- adjusting the write parallelism per table
All of this is done from within the sink, controlled by the user via the DynamicRecord class, without any Flink job restart.
Design document: https://docs.google.com/document/d/1R3NZmi65S4lwnmNjH4gLCuXZbgvZV5GNrQKJ5NYdO9s/edit
Major credits to @pvary who came up with the design and wrote the initial implementation.
Benchmarking
We have done some benchmarking to compare the old Iceberg sink with the Dynamic Iceberg Sink. We found that the performance overhead of the Dynamic Sink is negligible (see the results below). Of course, the real power of the Dynamic Sink unfolds when writing to multiple tables, which is not supported in the regular Iceberg sink.
Breakdown of this PR into smaller PRs: