Flink: Dynamic Iceberg Sink Contribution #12424

Status: Open. mxm wants to merge 6 commits into main from the dynamic-sink-contrib branch.

Conversation

@mxm (Contributor) commented Feb 28, 2025

Description

The Flink Iceberg sink allows writing data from a continuous Flink stream to an Iceberg table. It already powers countless production jobs and has passed the test of time. But for many users, the static nature of the Flink Iceberg sink became a serious limitation. The target table, its schema and partitioning spec, and even the write parallelism are statically configured. Any changes to these require rebuilding and restarting the underlying Flink job.

We present the next generation Flink Iceberg sink, the Dynamic Flink Iceberg Sink.

From the user perspective, there are three main advantages to using the Dynamic Iceberg sink. It supports:

  1. Writing to any number of tables (No more 1:1 sink/table relationship).
  2. Dynamically creating and updating tables based on a user-supplied routing.
  3. Dynamically updating the schema and partition spec of tables.

All of this is done from within the sink, controlled by the user via the DynamicRecord class, without any Flink job restart.

Design document: https://docs.google.com/document/d/1R3NZmi65S4lwnmNjH4gLCuXZbgvZV5GNrQKJ5NYdO9s/edit

Major credits to @pvary who came up with the design and wrote the initial implementation.

Benchmarking

We benchmarked the old Iceberg sink against the Dynamic Iceberg sink and found that the performance overhead of the Dynamic Sink is negligible, roughly 6% on average in the runs below. Of course, the real power of the Dynamic Sink unfolds when writing to multiple tables, which is not supported in the regular Iceberg sink.

TestDynamicIcebergSinkPerf.testIcebergSink

[main] INFO org.apache.iceberg.flink.sink.dynamic.TestDynamicIcebergSinkPref - TEST RESULT: Snapshot 1119211093289016971 written 5000000 records in 9405 ms
[main] INFO org.apache.iceberg.flink.sink.dynamic.TestDynamicIcebergSinkPref - TEST RESULT: Snapshot 3781801840532551473 written 5000000 records in 8948 ms
[main] INFO org.apache.iceberg.flink.sink.dynamic.TestDynamicIcebergSinkPref - TEST RESULT: Snapshot 3033420860172241073 written 5000000 records in 8555 ms
[main] INFO org.apache.iceberg.flink.sink.dynamic.TestDynamicIcebergSinkPref - TEST RESULT: Snapshot 2032850461702063007 written 5000000 records in 8781 ms
[main] INFO org.apache.iceberg.flink.sink.dynamic.TestDynamicIcebergSinkPref - TEST RESULT: Snapshot 7558061307456947485 written 5000000 records in 8872 ms
[main] INFO org.apache.iceberg.flink.sink.dynamic.TestDynamicIcebergSinkPref - TEST RESULT: Snapshot 601297722355936169 written 5000000 records in 8330 ms
[main] INFO org.apache.iceberg.flink.sink.dynamic.TestDynamicIcebergSinkPref - TEST RESULT: Snapshot 6249065641929321708 written 5000000 records in 8512 ms
[main] INFO org.apache.iceberg.flink.sink.dynamic.TestDynamicIcebergSinkPref - TEST RESULT: Snapshot 2068854608451193539 written 5000000 records in 8351 ms
[main] INFO org.apache.iceberg.flink.sink.dynamic.TestDynamicIcebergSinkPref - TEST RESULT: Snapshot 2246238791649680214 written 5000000 records in 8433 ms

TestDynamicIcebergSinkPerf.testDynamicSink

[main] INFO org.apache.iceberg.flink.sink.dynamic.TestDynamicIcebergSinkPref - TEST RESULT: Snapshot 6885587612182426678 written 5000000 records in 9862 ms
[main] INFO org.apache.iceberg.flink.sink.dynamic.TestDynamicIcebergSinkPref - TEST RESULT: Snapshot 5143658647354768578 written 5000000 records in 9611 ms
[main] INFO org.apache.iceberg.flink.sink.dynamic.TestDynamicIcebergSinkPref - TEST RESULT: Snapshot 6787994330850859040 written 5000000 records in 8935 ms
[main] INFO org.apache.iceberg.flink.sink.dynamic.TestDynamicIcebergSinkPref - TEST RESULT: Snapshot 6788803513469485538 written 5000000 records in 8923 ms
[main] INFO org.apache.iceberg.flink.sink.dynamic.TestDynamicIcebergSinkPref - TEST RESULT: Snapshot 8592131276344399796 written 5000000 records in 9335 ms
[main] INFO org.apache.iceberg.flink.sink.dynamic.TestDynamicIcebergSinkPref - TEST RESULT: Snapshot 4567624953033955114 written 5000000 records in 8973 ms
[main] INFO org.apache.iceberg.flink.sink.dynamic.TestDynamicIcebergSinkPref - TEST RESULT: Snapshot 139484213775208428 written 5000000 records in 9275 ms
[main] INFO org.apache.iceberg.flink.sink.dynamic.TestDynamicIcebergSinkPref - TEST RESULT: Snapshot 5103930001749514761 written 5000000 records in 8982 ms
[main] INFO org.apache.iceberg.flink.sink.dynamic.TestDynamicIcebergSinkPref - TEST RESULT: Snapshot 4174643942796111771 written 5000000 records in 9209 ms

Breakdown of this PR into smaller PRs:

  1. Flink: Add DynamicRecord / DynamicRecordInternal / DynamicRecordInternalSerializer #12996
  2. Flink: Dynamic Iceberg Sink: Add table update code for schema comparison and evolution  #13032
  3. Flink: Dynamic Iceberg Sink: Add dynamic writer and committer #13080
  4. Table update operator (outstanding)
  5. Putting it all together (outstanding)

mxm and others added 2 commits February 28, 2025 12:41
@mxm mxm force-pushed the dynamic-sink-contrib branch from eb5ba7e to 11c9082 Compare February 28, 2025 11:42
@manuzhang (Collaborator) commented:

@mxm Thanks for the nice work. It will be much easier to review and revise only Flink 1.20 parts in this PR, and back-port to Flink 1.19 afterwards.

@mxm (Contributor, Author) commented Mar 3, 2025

Hey @manuzhang! Sure, I've reverted the 1.19 changes. Let me know what you think about the Dynamic Sink.

CC @pvary @stevenzwu

if (userSpec.isUnpartitioned()) {
return userSpec;
}
UnboundPartitionSpec.Builder builder =
@stevenzwu (Contributor) commented Mar 3, 2025:

what is the purpose of the adjustment? there is no change in PartitionSpec. just rebind it with new table schema?

Can we use PartitionSpec#toUnbound() here and avoid changing api module?

I see the change is mainly to support this type of change. It is to support the case that two schemas (old and new) used the same field name but have different field ids.

      String sourceFieldName = userSpec.schema().idToName().get(field.sourceId());
      int adjustedSourceId = tableSchema.findField(sourceFieldName).fieldId();

I guess it is like this scenario

  • rename field "foo" to "bar" (with id = 9)
  • create a new field called "foo" (with id = 15)

while still wanting to maintain the partition spec on the field name "foo" after the schema change, but bound to the new field 15. Wondering if this is a behavior we want to support. I think this result can be supported with deletion of the old partition field and addition of a new partition field. That would be a clearer intention than silently changing the binding of an existing partition field to a new source field based on its name.

@mxm (Contributor, Author) replied:

A little bit of background: the Dynamic Sink matches fields based on their names, not their ids. The reason for this is that (a) the Iceberg Java API solely works based on names and (b) we found that it's difficult for users to use the correct ids based on the current table schema / spec. The ids are usually dictated by the input schema, e.g. Avro, which might use different ids, e.g. starting from 0 instead of 1.

The adjustment here ensures that the user-provided spec alongside with its schema can be mapped to one of the existing schema / spec. This is to avoid creating unnecessary table schema / spec when the schema / spec is otherwise identical.

For example:

  • Input schema {"id", 0, int} Input spec: {bucket[0][10]}
  • Table schema: {"id", 1, int} Table spec: {bucket[1][10]}

The logic would rewrite the input spec to match the table spec, because field id 0 ("id") in the input schema matches field id 1 ("id") in the table schema. This is then later used to resolve the table spec from the cache.

The rewrite isn't strictly necessary though, we can achieve the same by simply comparing. I've revised the logic in the latest commit. It's now integrated directly with cache lookup / update logic.
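To make the rebinding concrete, here is a minimal, self-contained sketch of the idea. Plain maps stand in for the actual Iceberg Schema / PartitionSpec types, and all names are illustrative, not the real sink classes:

```java
import java.util.HashMap;
import java.util.Map;

public class SpecRebinding {

  // Rewrite the source ids of a user-provided spec (field name -> source id) so
  // that they reference the table schema's ids for the same field names.
  static Map<String, Integer> rebind(
      Map<String, Integer> userSpecSourceIds, Map<String, Integer> tableSchemaIdsByName) {
    Map<String, Integer> rebound = new HashMap<>();
    for (Map.Entry<String, Integer> field : userSpecSourceIds.entrySet()) {
      Integer tableId = tableSchemaIdsByName.get(field.getKey());
      if (tableId == null) {
        throw new IllegalArgumentException("Unknown field: " + field.getKey());
      }
      rebound.put(field.getKey(), tableId);
    }
    return rebound;
  }

  public static void main(String[] args) {
    // Input schema assigns "id" the id 0; the table schema assigns it 1.
    Map<String, Integer> userSpec = Map.of("id", 0);
    Map<String, Integer> tableSchema = Map.of("id", 1);
    Map<String, Integer> rebound = rebind(userSpec, tableSchema);
    if (rebound.get("id") != 1) {
      throw new AssertionError("expected rebinding to table field id 1");
    }
    System.out.println("rebound spec: " + rebound);
  }
}
```

After rebinding, the input spec from the example above becomes identical to the table spec, so the cached table spec can be reused instead of creating a new one.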

@stevenzwu (Contributor) replied:

For example:
Input schema {"id", 0, int} Input spec: {bucket[0][10]}
Table schema: {"id", 1, int} Table spec: {bucket[1][10]}

I don't fully get the example. In PartitionSpecEvolution, the src/input spec is Iceberg's PartitionSpec. Why would the input spec differ from the table spec in terms of field ids? How is the input spec constructed (especially regarding source field name and id)? If you have a unit test to demonstrate the scenario, please point it here.

@mxm (Contributor, Author) replied:

The input spec is what the user provides in DynamicRecord. The input schema is built by the user, e.g. from an Avro schema, and can be different from the actual Iceberg schema, e.g. the ids used don't necessarily have to match the ids in the Iceberg schema, new fields may be added, etc. The same goes for the PartitionSpec: it is bound to the input schema and references source ids which don't necessarily match the Iceberg schema.

Given these constraints, to still be able to meaningfully evolve the schema, the logic around schema / spec evolution uses field names, not field ids. This is a decision we took after running PoCs with multiple customers. In the future, we might add a "strict" mode, which uses field ids, but so far customers consistently wanted name-based schema / spec evolution.

The goal is to translate the input schema / spec such that they either match with one of the existing schema / spec (check for schema / check for spec), or update the table schema / spec with the minimal amount of changes required.

If we didn't adapt the input schema / spec to table schema or the other existing schemas, we would go through a schema / spec change for every record. Not good for table metadata or catalog access / performance. We would also "lose" fields as soon as their ids changed.


if you have a unit test to demonstrate the scenario, please point it here.

Here is an example where we test compatibility of spec1 (the input spec) with spec2 (the table spec). No spec update will be performed in this case:

Here is the same test, but we change the spec because its buckets changed:

private PartitionSpecEvolver() {}

/**
* Checks whether two PartitionSpecs are compatible with each other. Less strict than {@code
@stevenzwu (Contributor) commented:

Can you share why a different/relaxed compatibility check is needed? what are the scenarios?

@mxm (Contributor, Author) replied Mar 4, 2025:

The default PartitionSpec.equals() compares the specs based on the transform name and source ids. Since we don't enforce source ids to match in the Dynamic Sink, but check the actual field names, we need the custom logic.

Also see #12424 (comment).

}

DataStream<DynamicRecordInternal> distributeDataStream(DataStream<DynamicRecordInternal> input) {
return input.keyBy(DynamicRecordInternal::writerKey);
@stevenzwu (Contributor) commented Mar 19, 2025:

keyBy may result in unbalanced traffic distribution. Let's assume a scenario where the dynamic sink starts with a single table, or a single table has the dominant amount of traffic, and the distribution mode is none. If the cardinality of writerKey is high (e.g. 100x) compared to the write parallelism and each writerKey slice is small enough to not cause significant skew, then keyBy can work reasonably well.

But if the cardinality of writerKey is relatively small, we may see significant traffic skew using keyBy. The author of this closed PR described this problem using keyBy. This is why we added the more generally applicable range distribution to the Flink Iceberg sink.

@mxm (Contributor, Author) replied:

You are absolutely right that a normal keyBy would result in unbalanced traffic distribution, at least when the key-to-value ratio is uneven. But it is important (and not straightforward) to understand how the writerKey is generated.

First of all, writeParallelism dictates how many subtasks will receive data. WriterKeys will then be chosen to match a key group for each of these subtasks. This is done by TargetLimitedKeySelector which wraps around all other key selectors and ensures that the data will hit a set of subtasks. Specifically, this is done in this loop which probes for the right key to hit a key group in the right subtask. The default with DistributionMode NONE is to do round-robin. For HASH, we make sure to bound the keyspace to the set of subtasks according to the table writeParallelism.

Note that, principally, this method does not contradict with RANGE distribution, which can be implemented just as well on the Dynamic Sink. For range distribution, we would select a subtask depending on the data distribution.

Essentially, what we are doing here is a custom partitioner, with the added advantage of being able to use keyed state. We currently don't use state, but that is a great advantage. Naturally, this way of distributing data could be replaced with a Flink (custom) Partitioner, but using Flink state would then not be an option anymore.
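The probing idea can be sketched with Flink's key-group formula, where keyGroup * parallelism / maxParallelism selects the target subtask. This is a simplified, self-contained illustration using a plain integer hash instead of Flink's murmur hash, so the concrete keys differ from what TargetLimitedKeySelector actually produces:

```java
public class KeyProbe {

  // Simplified version of Flink's key-to-subtask routing:
  // keyGroup = hash(key) % maxParallelism, subtask = keyGroup * parallelism / maxParallelism.
  static int subtaskFor(int key, int maxParallelism, int parallelism) {
    int keyGroup = Math.floorMod(Integer.hashCode(key), maxParallelism);
    return keyGroup * parallelism / maxParallelism;
  }

  // Probe increasing candidate keys until one routes to the desired subtask.
  static int probeKey(int targetSubtask, int maxParallelism, int parallelism) {
    for (int candidate = 0; ; candidate++) {
      if (subtaskFor(candidate, maxParallelism, parallelism) == targetSubtask) {
        return candidate;
      }
    }
  }

  public static void main(String[] args) {
    int maxParallelism = 128;
    int parallelism = 4;
    // Find one key per subtask, so round-robin over these keys hits every subtask.
    for (int subtask = 0; subtask < parallelism; subtask++) {
      int key = probeKey(subtask, maxParallelism, parallelism);
      if (subtaskFor(key, maxParallelism, parallelism) != subtask) {
        throw new AssertionError();
      }
      System.out.println("subtask " + subtask + " <- key " + key);
    }
  }
}
```

Once a key has been probed for each target subtask, round-robin or hash assignment over that key set lands records on exactly the intended subtasks while still going through a regular keyBy.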

@mxm (Contributor, Author) commented Apr 7, 2025

Quick recap on a discussion Steven, Peter, and I had in a separate channel, which is only partially captured in the PR comments:

Iceberg Schema

Dynamic Sink users currently need to provide an Iceberg Schema, which also requires them to define field ids. However, correctly defining field ids is cumbersome and error-prone. That's why the sink is designed to ignore field ids and instead compare fields by their names. This also works for nested fields. We enforce field name uniqueness on every level of the schema, just like Iceberg.

Iceberg internally uses ids because they allow a field to be uniquely identified, e.g. if a field has been removed and a new field is added with the same name, the new field will not contain data of the old field. However, the Dynamic Sink doesn't allow field deletions, as this operation would not be safe with the schema migration logic of the sink. We wouldn't want to remove a field, only to re-add it later on, e.g. in case of out-of-order data. This would only work reliably if the user supplied accurate field ids, which we found to be unrealistic.

Consequently, we never use the user-supplied schema directly. Instead, we compare the user-supplied schema with the existing table schemas. If they are identical, we write with the matching table schema. Otherwise, we migrate the current table schema by deriving a set of migration steps, e.g. add field, widen type, make field optional, change position of field. In the process, we also rewrite the user-provided PartitionSpec (if any), which refers to a field id in the user-provided schema, to refer to the field id with the same name in the table schema.
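A minimal sketch of the name-based comparison and migration-step derivation described above. This is illustrative only: types are plain strings, only int-to-long widening is modeled, and it is far simpler than the real CompareSchemasVisitor / EvolveSchemaVisitor. Fields present in the table but absent from the input are kept, since the sink never deletes fields:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SchemaEvolutionSketch {

  // Derive the migration steps needed to make tableSchema accept inputSchema,
  // matching fields by name. An empty result means the schemas already match.
  static List<String> migrationSteps(
      Map<String, String> tableSchema, Map<String, String> inputSchema) {
    List<String> steps = new ArrayList<>();
    for (Map.Entry<String, String> field : inputSchema.entrySet()) {
      String tableType = tableSchema.get(field.getKey());
      if (tableType == null) {
        steps.add("ADD " + field.getKey() + " " + field.getValue());
      } else if (tableType.equals("int") && field.getValue().equals("long")) {
        steps.add("WIDEN " + field.getKey() + " int -> long");
      }
      // Note: fields missing from inputSchema produce no step -- no deletions.
    }
    return steps;
  }

  public static void main(String[] args) {
    Map<String, String> table = new LinkedHashMap<>();
    table.put("id", "int");
    table.put("name", "string");
    Map<String, String> input = new LinkedHashMap<>();
    input.put("id", "long");
    input.put("email", "string");
    System.out.println(migrationSteps(table, input));
    // -> [WIDEN id int -> long, ADD email string]
  }
}
```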

Alternatives to Iceberg Schema

Steven wondered whether Iceberg's Schema is the best schema input format for the user. He suggested Flink's RowType instead. Using RowType would not require generating field ids. On the other hand, the advantage of using Iceberg Schema directly is that there are no ambiguities when it comes to types, since tables can be created and their schema modified directly by the Dynamic Sink. Users would also be able to embed comments for fields. Additionally, PartitionSpec currently requires a schema to be bound to.

Since Avro is a common input schema type, we could think about supporting it as well. Directly supporting different schema types could be useful for users. Internally we will still have to use Iceberg Schema, so maybe we can give users the option to supply different schema types in addition to Iceberg Schema.

Using keyBy (hash distribution) for distributing records

The Dynamic Sink currently uses keyBy for distribution modes NONE and HASH. The idea is that the Dynamic Sink works with a large number of tables which all have an individually configured write parallelism. In the case of NONE, we don't just hash-distribute the data across the subtasks; instead, we probe a keyGroup for each of N subtasks, where N is the write parallelism. We then round-robin the data across these tasks. The whole process is offset by a salt derived from the table name. This ensures that the distribution mode and write parallelism can be changed dynamically while retaining a balanced amount of work across all subtasks. For HASH, the process works similarly, except that we do a hash-based assignment across the deterministically chosen N subtasks.

Using a Flink custom partitioner, we could achieve a similar mechanism, but it would require us to implement keyBy on top of it which might not be optimal. We would also lose the ability to use Flink's keyed state.
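The salted selection of N target subtasks described above could look roughly like this. It is a self-contained sketch with illustrative names, not the actual sink code; the real implementation additionally probes keys so a keyBy routes to exactly these subtasks:

```java
import java.util.ArrayList;
import java.util.List;

public class SaltedSubtaskSelection {

  // Pick writeParallelism target subtasks out of totalSubtasks, starting at a
  // table-name-derived offset so different tables land on different subtask sets.
  static List<Integer> targetSubtasks(String tableName, int writeParallelism, int totalSubtasks) {
    int salt = Math.floorMod(tableName.hashCode(), totalSubtasks);
    List<Integer> targets = new ArrayList<>();
    for (int i = 0; i < writeParallelism; i++) {
      targets.add((salt + i) % totalSubtasks);
    }
    return targets;
  }

  public static void main(String[] args) {
    // Two tables with writeParallelism 2 on a sink with 8 subtasks: each gets a
    // stable set of 2 subtasks; round-robin then cycles within that set.
    List<Integer> t1 = targetSubtasks("db.orders", 2, 8);
    List<Integer> t2 = targetSubtasks("db.users", 2, 8);
    System.out.println("db.orders -> " + t1);
    System.out.println("db.users  -> " + t2);
    if (t1.size() != 2 || t2.size() != 2) {
      throw new AssertionError();
    }
  }
}
```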

Catalog Access

The Dynamic Sink needs to access the Iceberg catalog to update the table metadata. The Dynamic sink supports two update modes when an unknown table schema is detected:

  1. Immediate update at every subtask that discovers an unknown table schema
  2. Rerouting records to a single subtask which performs the schema update (default)

Schemas and other table data are cached in TableDataCache, which allows refreshing the table on a regular interval; the default is once per second. We agreed that the load of option (1) on the catalog would be quite high if the number of table updates and the write parallelism of those tables are high. Option (2) will scale well even with many updates and high write parallelism.
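The refresh-interval gating can be sketched as follows. This is a hypothetical simplification of what TableDataCache does; the class and method names here are made up for illustration:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class RefreshGatedCache {
  private final long refreshIntervalMs;
  private final Function<String, String> catalogLoader;
  private final Map<String, String> cache = new HashMap<>();
  private final Map<String, Long> lastFetch = new HashMap<>();
  int catalogCalls = 0; // exposed for the demo below

  RefreshGatedCache(long refreshIntervalMs, Function<String, String> catalogLoader) {
    this.refreshIntervalMs = refreshIntervalMs;
    this.catalogLoader = catalogLoader;
  }

  // Re-fetch from the catalog only if at least refreshIntervalMs has passed
  // since the last fetch, bounding catalog load to ~1 lookup per table per interval.
  String get(String table, long nowMs) {
    Long fetched = lastFetch.get(table);
    if (fetched == null || nowMs - fetched >= refreshIntervalMs) {
      cache.put(table, catalogLoader.apply(table));
      lastFetch.put(table, nowMs);
      catalogCalls++;
    }
    return cache.get(table);
  }

  public static void main(String[] args) {
    RefreshGatedCache cache = new RefreshGatedCache(1000, t -> "schema-of-" + t);
    cache.get("db.t1", 0);     // miss: hits the catalog
    cache.get("db.t1", 500);   // within the interval: served from cache
    cache.get("db.t1", 1500);  // interval elapsed: refreshed
    if (cache.catalogCalls != 2) {
      throw new AssertionError("expected 2 catalog calls, got " + cache.catalogCalls);
    }
    System.out.println("catalog calls: " + cache.catalogCalls);
  }
}
```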

@ottomata commented:
Yes!!! I'm excited about this feature!

Steven wondered whether Iceberg's Schema is the best schema input format for the user. He suggested Flink's RowType instead.

This would be nice for Wikimedia Foundation. We use JSONSchema and have tooling to automatically convert to Flink TypeInformation (and also Table API DataTypes).

We do a similar thing to what you are all describing here when creating and evolving Hive Parquet tables with Spark. We provide conversions from JSONSchema to Spark StructType (analogous to Flink's RowType), and then use the StructType to determine how to create or evolve the table via DDL statements.

This allows us to automate ingestion into various downstream systems from JSON streams in Kafka by using our managed event JSONSchema registry as the canonical data schema source.

Avro is a common input schema type

I might be ignorant here, but I think if you support RowType, you will also get Avro support? I haven't used it, but Flink has a built-in schema converter for Avro to TypeInformation?

mxm added a commit to mxm/iceberg that referenced this pull request May 7, 2025
…nalSerializer

This adds the user-facing type DynamicRecord, alongside with its internal
representation DynamicRecordInternal and its type information and serializer.

Broken out of github.com/apache/pull/12424.
mxm added a commit to mxm/iceberg that referenced this pull request May 12, 2025
This adds the classes around schema / spec comparison and evolution. A breakdown
of the classes follows:

 # CompareSchemasVisitor

Compares the user-provided schema against the current table schema.

 # EvolveSchemaVisitor

Computes the changes required to the table schema to be compatible with the
user-provided schema.

 # PartitionSpecEvolution

Code for checking compatibility with the user-provided PartitionSpec and
computing a set of changes to rewrite the PartitionSpec.

 # TableDataCache

Cache which holds all relevant metadata of a table like its name, branch,
schema, partition spec. Also holds a cache of past comparison results for a
given table's schema and the user-provided input schema.

 # Table Updater

Core logic to compare and create/update a table given a user-provided input
schema.

Broken out of apache#12424, depends on apache#12996.
@pvary mentioned this pull request May 14, 2025
mxm added a commit to mxm/iceberg that referenced this pull request May 16, 2025
This adds the dynamic version of the writer and committer for the Flink Dynamic
Iceberg Sink. Conceptually, they work similarly to the IcebergSink, but they
support writing to multiple tables. Write results from each table are aggregated
from the DynamicWriter in the DynamicWriteResultAggregator, from where they are
sent to the DynamicCommitter.

Broken out of apache#12424, depends on apache#13032.
@mxm (Contributor, Author) commented May 16, 2025

I decided to break this down into several PRs. See the description for the list: #12424 (comment)

int salt,
int writeParallelism,
int maxWriteParallelism) {
if (writeParallelism > maxWriteParallelism) {
A reviewer commented:
Will this work with flink autoscaling?

@mxm (Contributor, Author) replied May 19, 2025:

Great question! In some way, yes. The table writeParallelisms will be capped at the current parallelism of the sink (maxWriteParallelism) which will be determined by Flink Autoscaling. This allows the sink to be scaled down, effectively reducing the write parallelism. On the other hand, the write parallelism of a table will never exceed the maxWriteParallelism, so we can't scale beyond the table writeParallelism.

Could you elaborate on what you would expect to happen with autoscaling enabled?

@mxm (Contributor, Author) added:

You probably want each table's writeParallelism to change dynamically, similarly to how autoscaling works with multiple Iceberg sinks. That would require us to move away from the fixed writeParallelism config. I'm thinking that the solution would be two-fold:

  1. Flink Autoscaling would continue to scale the Dynamic Sink just like any operator
  2. Dynamic Sink would additionally measure and correct table congestion

The reviewer replied:

You probably want each table's writeParallelism to change dynamically, similarly to how autoscaling works with multiple Iceberg sinks

Indeed this is the part I am thinking about in my use-case. I'm quite new to flink in general, so forgive me if I don't make sense! So from my observations:

  1. Definitely holds. I see the autoscaling.
  2. I guess we don't have this functionality yet. I'd be interested to see if it's possible though!

Even still, I'm thinking there might be a bit of a snag with auto scaling and the PartitionKeySelector.

Say there are N streams of data, potentially from N kafka-topic partitions, but each topic partition has data that will eventually be written to a single iceberg partition A.

The writing tends to be slower than the reading in this case, so we should get backpressure, and as I understand it, flink should autoscale the writing subtask and increase the number of subtasks. The problem is that I think with the PartitionKeySelector all data will be hashed onto the same writing subtask, because they are all getting written to partition A, meaning we scale up the number of subtasks, but only one will be doing any work.

Potentially the round robin selector makes more sense in this scenario, however I do still wonder if it's possible to set the max parallelism there to match the auto scaling.

@mxm (Contributor, Author) replied:

Say there are N streams of data, potentially from N kafka-topic partitions, but each topic partition has data that will eventually be written to a single iceberg partition A.

If you mean Iceberg table partition, we can still write multiple partition files for an Iceberg partition in NONE distribution mode. Every subtask will have a data file for Iceberg partition A. The writes will be distributed.

In general, Flink partition skew is an issue which needs to be tackled at the job level. Autoscaling cannot do anything about that. But it has ways to detect ineffective scaling decisions and avoid those. Generally, partition skew in Iceberg can be avoided by using none distribution mode (default) or range distribution mode. Hash-based partitioning is obviously prone to the partition skew problem you described.

Potentially the round robin selector makes more sense in this scenario, however I do still wonder if it's possible to set the max parallelism there to match the auto scaling.

Round-robin is the default. In a scenario with hash partitioning, where all data goes to one subtask / Iceberg partition, we won't benefit from setting the writeParallelism, but for other scenarios we will.

The reviewer replied:

Yep that makes sense! The None distribution (Round Robin Selection) works best here.
