Skip to content

flink:FlinkSink support dynamically changed schema #4190

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
naisongwen opened this issue Feb 22, 2022 · 32 comments
Open

flink:FlinkSink support dynamically changed schema #4190

naisongwen opened this issue Feb 22, 2022 · 32 comments

Comments

@naisongwen
Copy link

Now,FlinkSink requires developers to transfer the schema parameter to build DataStream, which means once the schema given,then the TableSchema will be determinded, canot be changed for ever ,but in practical scenarios,some data format,for example ,JSON,the tranformed schema is unfixed with field added,deleted or renamed and so on,so we want to change the mapped TableSchema while the DataStream is running.

@Shane-Yu
Copy link

Shane-Yu commented Mar 2, 2022

I also met this problem in CDC scenarios

@hililiwei
Copy link
Contributor

To summarize, your expectation is to dynamically update Iceberg table schema based on the schema of the data flow when the schema of the data flow does not match the Iceberg table?

@stevenzwu
Copy link
Contributor

stevenzwu commented Mar 25, 2022

I am not sure this would the universally desired behavior. if data stream contains incompatible schema change (like removing a required field), it will break the downstream consumers.

there is a value of automatically syncing input data schema to Iceberg table schema (for compatible schema evolution). Personally, I would like to keep it at the control plane, which would be more natural if there is a schema registry for tracking input data schema change. Control plane can then update Iceberg table schema and restart the Flink job to pick up new Iceberg table schema for write path.

It is tricky to support in automatic schema sync in the data plane. There would be parallel Iceberg writers (like hundreds) for a single sink table. Coordinating metadata (like schema) change is very tricky.

@naisongwen
Copy link
Author

right

@lintingbin
Copy link
Contributor

We also met this problem in our scenarios. We modify schema of the sink iceberg table frequently. Now I try to get the table schema through iceberg table refresh api to dynamically construct a DataStream to add new columns. It is possible to add new columns, but writing new columns to iceberg sink does not take effect, because sink does not support dynamic update of schema.If iceberg flink sink writer can add iceberg table refresh function support, it will be more convenient to use. After all, it takes a minute or two to restart flink now. @stevenzwu

@hililiwei
Copy link
Contributor

hililiwei commented Jun 21, 2022

I think we should take this feature seriously. In fact, in the Flink CDC, HUDI already supports dynamic table schema changes without restarting tasks. It captures schema changes to the table and updates it in real time.

When synchronizing data using flink CDC, it is unacceptable to restart a task if the table schema changes.

@lintingbin
Copy link
Contributor

We have internally implemented modifying columns, adding columns after the last column, and deleting the last column without restarting the flink program. Our processing logic is as follows: DataStream<Map<String, String>> -> map -> DataStream -> FlinkSink. In the implementation of map, we will refresh the table schema to generate the latest RowData after each checkpoint is done. At the same time, we have also modified the implementation of FlinkSink. Every time newAppender is called, we will refresh the table schema and use the latest table schema to write the data file. Is anyone interested in this feature? I can contribute our modifications to FlinkSink if needed.

@hililiwei
Copy link
Contributor

I personally look forward to seeing your PR.

@stevenzwu
Copy link
Contributor

stevenzwu commented Aug 3, 2022

@lintingbin2009 it might be helpful to describe the solution at high-level design in this issue or some doc.

Every time newAppender is called, we will refresh the table schema and use the latest table schema to write the data file

This sounds expensive and may not work well at scale. if every writer task needs to poll table for every file, it can create a lot of load on the Iceberg metadata system. Ideally, the table schema polling and change should done by operator coordinator.

@lintingbin
Copy link
Contributor

@hililiwei @stevenzwu #5425 This is my PR. Hope to have some suggestions. Now we test in an environment with a parallelism of about 40. The checkpoint time is the same as before the dynamic refresh schema is not added.

@leichangqing
Copy link

We also met this problem in our scenarios. We modify schema of the sink iceberg table frequently. Now I try to get the table schema through iceberg table refresh api to dynamically construct a DataStream to add new columns. It is possible to add new columns, but writing new columns to iceberg sink does not take effect, because sink does not support dynamic update of schema.If iceberg flink sink writer can add iceberg table refresh function support, it will be more convenient to use. After all, it takes a minute or two to restart flink now. @stevenzwu

I have interesting on this, how to contact with u about dynamical schema

@lintingbin
Copy link
Contributor

@leichangqing You can refer to the last two commits of my branch https://github.com/lintingbin2009/iceberg/tree/flink-sink-dynamically-change. We have put this part of the code in our own production environment and executed it for half a year, and it seems that there is no problem so far.
image
Corresponding to 1, 2, 3 in the above picture, you need to prepare:

  1. There needs to be a broadcast node that can subscribe to your schema changes.
  2. The data processing node can generate RowData according to the latest schema processing data.
  3. This is based on the above code modification so that the iceberg writer node can receive the latest schema and apply it.

@stevenzwu
Copy link
Contributor

It will be a good starting point if someone likes to create a design doc on how to solve this problem in a general and scalable way

@lintingbin
Copy link
Contributor

@stevenzwu This is a doc I wrote, you can give your opinion, and I will modify it.

@FranMorilloAWS
Copy link

Is there any news on this?

@Ruees
Copy link

Ruees commented Apr 16, 2024

@leichangqing You can refer to the last two commits of my branch https://github.com/lintingbin2009/iceberg/tree/flink-sink-dynamically-change. We have put this part of the code in our own production environment and executed it for half a year, and it seems that there is no problem so far. image Corresponding to 1, 2, 3 in the above picture, you need to prepare:

  1. There needs to be a broadcast node that can subscribe to your schema changes.
  2. The data processing node can generate RowData according to the latest schema processing data.
  3. This is based on the above code modification so that the iceberg writer node can receive the latest schema and apply it.

Can there be sample code to demonstrate how to use it?

@Ruees
Copy link

Ruees commented Apr 16, 2024

@leichangqing You can refer to the last two commits of my branch https://github.com/lintingbin2009/iceberg/tree/flink-sink-dynamically-change. We have put this part of the code in our own production environment and executed it for half a year, and it seems that there is no problem so far. image Corresponding to 1, 2, 3 in the above picture, you need to prepare:

  1. There needs to be a broadcast node that can subscribe to your schema changes.
  2. The data processing node can generate RowData according to the latest schema processing data.
  3. This is based on the above code modification so that the iceberg writer node can receive the latest schema and apply it.

I tried to pull the Flinksink related modification code for the first commit and added a column at the end using Java API in the map operator, but the result was not successful. Even after inserting the data successfully, the column at the end was still empty

@lkokhreidze
Copy link
Contributor

Just commenting for visibility that this feature would be extremely useful for our use case too. It's similar to CDC use case but instead driven by the services emitting events.
I'd also be happy to lend a hand, but at the moment, it's not clear what the state is. Is the proposed design agreed upon, or does it need re-iteration?

@pvary
Copy link
Contributor

pvary commented Apr 17, 2024

I think it is not trivial to implement this feature, as the schema of the RowData objects which are the input of the Sink is finalized when the job graph is created. To change the schema one need to regenerate the job graph, essentially restarting the job (calling the main method).
There might be some way to work around this, by changing the input to records where the schema is embedded to the records (performance loss), or getting the schema from an outside source (additional external depenency), but this would need some deeper changes in the Sink.
Also care should be taken, how to synchronize the table schema refresh throughout the tasks when the changes are detected...

As a workaround, we created our own schema check before converting the input to RowData, and throw a SuppressRestartsException when changes are detected.
We used Flink Kubernetes Operator to restart the job from failed state, using kubernetes.operator.job.restart.failed. The main method refreshes the table and the new job instance is started with the new schema.

Copy link

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

@github-actions github-actions bot added the stale label Oct 15, 2024
@ottomata
Copy link

How does flink-cdc do it?

@pvary
Copy link
Contributor

pvary commented Oct 17, 2024

It sends the schema along with every record. I'm playing around with a somewhat similar, but more performant solution, where we send only the schemaId instead of the full schema. The thing is only in infancy atm, but... 😄

@ottomata
Copy link

Ah, thanks!

FWIW, I think schema evolution support is worth the tradeoff of extra bytes per record :)

@pvary
Copy link
Contributor

pvary commented Oct 18, 2024

The current tradeoff is more like doubled CPU time (we need caching and an extra serialization/deserialization step, which is on an already well optimized hot path). We are still looking for ways to optimize this.

@FranMorilloAWS
Copy link

How is Paimon doing it? Same as Flink - CDC? How bout integrating with a schema registry and we use the schemaversionid

@github-actions github-actions bot removed the stale label Oct 19, 2024
@pvary
Copy link
Contributor

pvary commented Oct 21, 2024

The Iceberg table could be used as a schema registry. I would be reluctant to add any new requirements if possible

@FranMorilloAWS
Copy link

How would that look? So normally we consume from Kafka or Kinesis and use glue schema registry or confluent schema registry. As of now the Sink has the option of using Generic Record. Is there any roadmap for Specific?

@ottomata
Copy link

If schema registry support is considered, please make it pluggable! Wikimedia Foundation uses JSONSchema and $schema URIs. And we can do it with Flink.

@pvary
Copy link
Contributor

pvary commented Oct 22, 2024

If the schema is changed, then the target Iceberg table needs to be updated to the new schema anyways. So we can use the Iceberg schemaId to send along the records.

Copy link

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

@github-actions github-actions bot added the stale label Apr 21, 2025
@ottomata
Copy link

ottomata commented Apr 22, 2025

Still very interested in this! WMF is already using Iceberg, but Paimon can currently support this feature. We'd prefer to use Iceberg directly if we can!

@github-actions github-actions bot removed the stale label Apr 23, 2025
@pvary
Copy link
Contributor

pvary commented Apr 29, 2025

You might want to take a look at #12424

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests