flink:FlinkSink support dynamically changed schema #4190
Comments
I also ran into this problem in CDC scenarios.
To summarize: your expectation is that the Iceberg table schema should be updated dynamically based on the schema of the data stream whenever the two no longer match?
I am not sure this would be the universally desired behavior. If the data stream contains an incompatible schema change (like removing a required field), it will break the downstream consumers. There is value in automatically syncing the input data schema to the Iceberg table schema (for compatible schema evolution). Personally, I would like to keep it in the control plane, which would be more natural if there is a schema registry tracking input data schema changes. The control plane can then update the Iceberg table schema and restart the Flink job to pick up the new Iceberg table schema for the write path. Automatic schema sync is tricky to support in the data plane: there would be parallel Iceberg writers (possibly hundreds) for a single sink table, and coordinating metadata changes (like schema) across them is very tricky.
Right.
We also hit this problem in our scenarios: we modify the schema of the sink Iceberg table frequently. Right now I try to get the table schema through the Iceberg table refresh API and dynamically construct a DataStream that includes the new columns. Adding new columns to the stream is possible, but writing them through the Iceberg sink does not take effect, because the sink does not support dynamic schema updates. If the Iceberg Flink sink writer could also refresh the table, it would be much more convenient to use; as it stands, restarting Flink takes a minute or two. @stevenzwu
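For reference, a minimal sketch of the refresh call I mean, assuming a TableLoader for the sink table (the class and method names here are just for illustration):

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;

public class SchemaRefresh {
  // Load the table and re-read its metadata so columns added after the job
  // started become visible on the writer side.
  public static Schema latestSchema(TableLoader tableLoader) {
    tableLoader.open();
    Table table = tableLoader.loadTable();
    table.refresh();
    return table.schema();
  }
}
```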
I think we should take this feature seriously. In fact, with Flink CDC, Hudi already supports dynamic table schema changes without restarting jobs: it captures schema changes on the table and applies them in real time. When synchronizing data with Flink CDC, it is unacceptable to restart a job whenever the table schema changes.
We have internally implemented modifying columns, adding columns after the last column, and deleting the last column without restarting the Flink program. Our processing logic is as follows: DataStream<Map<String, String>> -> map -> DataStream<RowData> -> FlinkSink. In the map implementation, we refresh the table schema after each completed checkpoint and generate RowData against the latest schema. We have also modified the FlinkSink implementation: every time newAppender is called, we refresh the table schema and use the latest table schema to write the data file. Is anyone interested in this feature? I can contribute our modifications to FlinkSink if needed.
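A rough sketch of what such a map stage could look like (the class name is made up, all columns are assumed to be string-typed, and the schema refresh is tied to checkpoint completion as described above, since refreshing per record would overload the metadata store):

```java
import java.util.Map;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.types.Types;

// Hypothetical mapper: converts loosely typed records to RowData laid out
// against the latest Iceberg table schema, so newly added trailing columns
// get populated once the schema has been refreshed.
public class DynamicSchemaMapper extends RichMapFunction<Map<String, String>, RowData>
    implements CheckpointListener {

  private final TableLoader tableLoader;  // serializable handle to the Iceberg table
  private transient Table table;
  private transient Schema currentSchema;

  public DynamicSchemaMapper(TableLoader tableLoader) {
    this.tableLoader = tableLoader;
  }

  @Override
  public void open(Configuration parameters) {
    tableLoader.open();
    table = tableLoader.loadTable();
    currentSchema = table.schema();
  }

  @Override
  public void notifyCheckpointComplete(long checkpointId) throws Exception {
    // Re-read table metadata after each completed checkpoint so columns added
    // since the last checkpoint show up, without polling on every record.
    table.refresh();
    currentSchema = table.schema();
  }

  @Override
  public RowData map(Map<String, String> record) {
    GenericRowData row = new GenericRowData(currentSchema.columns().size());
    int pos = 0;
    for (Types.NestedField field : currentSchema.columns()) {
      String value = record.get(field.name());  // assumes string-typed columns
      row.setField(pos++, value == null ? null : StringData.fromString(value));
    }
    return row;
  }
}
```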
I personally look forward to seeing your PR.
@lintingbin2009 it might be helpful to describe the solution as a high-level design in this issue or in a doc.
This sounds expensive and may not work well at scale. If every writer task needs to poll the table for every file, it can create a lot of load on the Iceberg metadata system. Ideally, the table schema polling and change should be done by the operator coordinator.
@hililiwei @stevenzwu #5425 is my PR; I hope to get some suggestions. We are now testing in an environment with a parallelism of about 40, and the checkpoint time is the same as before the dynamic schema refresh was added.
I'm interested in this. How can I get in touch with you about the dynamic schema support?
@leichangqing You can refer to the last two commits of my branch https://github.com/lintingbin2009/iceberg/tree/flink-sink-dynamically-change. We have been running this code in our production environment for half a year, and so far there has been no problem.
It would be a good starting point if someone would like to create a design doc on how to solve this problem in a general and scalable way.
@stevenzwu This is a doc I wrote; please give your opinion and I will revise it.
Is there any news on this?
Can there be sample code to demonstrate how to use it?
I tried pulling the FlinkSink-related changes from the first commit and added a column at the end using the Java API in the map operator, but it did not work. Even though the data was inserted successfully, the column at the end was still empty.
Just commenting for visibility that this feature would be extremely useful for our use case too. It's similar to the CDC use case, but driven instead by the services emitting events.
I think it is not trivial to implement this feature, as the schema of the RowData objects that are the input of the sink is finalized when the job graph is created. To change the schema one needs to regenerate the job graph, essentially restarting the job (calling the main method). As a workaround, we created our own schema check before converting the input to RowData, and throw an exception when the schemas no longer match.
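Roughly, the guard looks like this (class, method, and message names are made up; `sameSchema` is Iceberg's schema comparison):

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;

public class SchemaGuard {
  // If the live table schema no longer matches the schema the job graph was
  // built with, fail the job so it restarts and regenerates the job graph.
  public static void checkSchemaUnchanged(Table table, Schema writerSchema) {
    table.refresh();
    if (!writerSchema.sameSchema(table.schema())) {
      throw new IllegalStateException(
          "Iceberg table schema changed since the job started; restart required");
    }
  }
}
```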
This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.
How does flink-cdc do it?
It sends the schema along with every record. I'm playing around with a somewhat similar but more performant solution, where we send only the schemaId instead of the full schema. The thing is only in its infancy atm, but... 😄
Ah, thanks! FWIW, I think schema evolution support is worth the tradeoff of extra bytes per record :)
The current tradeoff is more like doubled CPU time (we need caching and an extra serialization/deserialization step on an already well-optimized hot path). We are still looking for ways to optimize this.
How is Paimon doing it? The same as Flink CDC? How about integrating with a schema registry and using the schema version id?
The Iceberg table could be used as a schema registry. I would be reluctant to add any new requirements if possible.
How would that look? Normally we consume from Kafka or Kinesis and use the Glue schema registry or the Confluent schema registry. As of now the sink has the option of using GenericRecord. Is there any roadmap for SpecificRecord?
If schema registry support is considered, please make it pluggable! The Wikimedia Foundation uses JSONSchema and $schema URIs, and we can do that with Flink.
If the schema is changed, then the target Iceberg table needs to be updated to the new schema anyway, so we can use the Iceberg schemaId to send along with the records.
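For illustration, a hedged sketch of resolving such a schemaId against the table, which already tracks its historical schemas by id (the class name is made up):

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;

public class SchemaIdResolver {
  // Resolve a per-record schema id against the schemas the table already knows.
  public static Schema resolve(Table table, int schemaId) {
    Schema schema = table.schemas().get(schemaId);
    if (schema == null) {
      table.refresh();  // the id may belong to a newer metadata version
      schema = table.schemas().get(schemaId);
    }
    if (schema == null) {
      throw new IllegalArgumentException("Unknown Iceberg schema id: " + schemaId);
    }
    return schema;
  }
}
```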
This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.
Still very interested in this! WMF is already using Iceberg, while Paimon already supports this feature today. We'd prefer to use Iceberg directly if we can!
You might want to take a look at #12424
Currently, FlinkSink requires developers to pass a schema parameter when building the sink on a DataStream, which means that once the schema is given, the TableSchema is fixed and can never change. In practical scenarios, however, some data formats (JSON, for example) do not have a fixed schema: fields are added, deleted, or renamed over time. We would therefore like to be able to change the mapped TableSchema while the DataStream job is running.
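For context, a sketch of how the sink is typically wired up today (the column names, helper method, and its name are illustrative only):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class FixedSchemaSink {
  // The TableSchema passed to the builder is captured when the job graph is
  // created, so the sink keeps writing this layout even if the Iceberg table
  // later gains, drops, or renames columns.
  public static void appendTo(DataStream<Row> input, TableLoader tableLoader) {
    TableSchema tableSchema = TableSchema.builder()
        .field("id", DataTypes.BIGINT())
        .field("data", DataTypes.STRING())
        .build();

    FlinkSink.forRow(input, tableSchema)
        .tableLoader(tableLoader)
        .append();
  }
}
```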