Flink: Add DynamicRecord / DynamicRecordInternal / DynamicRecordInternalSerializer #12996
Conversation
Force-pushed from 9081c13 to 0e50889
private PartitionSpec spec;
private int writerKey;
private RowData rowData;
private boolean upsertMode;
Should we rename this to isUpsert, or, if it denotes an actual mode, use an enum instead?
Can do, but it's consistent with the coding style: we often omit these verbs from the getters in Iceberg.
In that case, upsert or useUpsertMode would probably be a better name.
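For illustration, a minimal sketch of the enum alternative being suggested; the WriteMode name and its values are hypothetical, not part of the PR:

// Hypothetical enum replacing the boolean flag; names are illustrative only.
public enum WriteMode {
  APPEND,
  UPSERT
}

// The field "private boolean upsertMode;" would then become
// "private WriteMode writeMode;" and call sites would check
// writeMode == WriteMode.UPSERT.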
private String tableName;
private String branch;
private Schema schema;
private PartitionSpec spec;
Should we rename this to partitionSpec in case some other kind of spec appears in the future?
I was also leaning towards this name in the beginning, but it's the Iceberg convention to use this name across the code base. We can rename it, though, if this is a concern.
// Check that the schema id can be resolved. Not strictly necessary for serialization.
Tuple3<RowDataSerializer, Schema, PartitionSpec> serializer =
    serializerCache.serializerWithSchemaAndSpec(
        toSerialize.tableName(),
        toSerialize.schema().schemaId(),
        toSerialize.spec().specId());
If it's not strictly necessary, why do we do it? What happens if this fails, and why would it fail?
This is basically a sanity check to verify that looking up the serializer by id on the remote side will work. The remote side won't have the schema available, because it is not written in this branch. If there are any issues, we will learn about them on the sender side rather than on the receiving side.
I've added a JavaDoc which should clarify things.
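A possible shape of that JavaDoc, paraphrasing the explanation above; the wording below is illustrative, not necessarily the exact comment added in the PR:

/**
 * Eagerly resolves the serializer for this record's schema id and spec id
 * before any bytes are written. Only the ids travel over the wire, not the
 * Schema or PartitionSpec objects, so an unresolvable id would otherwise
 * surface as a hard-to-trace failure during deserialization on the receiver.
 * Failing fast here pinpoints the problem on the sender side instead.
 */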
private String branch;
private Schema schema;
private RowData rowData;
private PartitionSpec spec;
Should this be called partitionSpec in case other specs are added in the future?
Same as #12996 (comment)
private PartitionSpec spec;
private DistributionMode mode;
private int writeParallelism;
private boolean upsertMode;
A boolean doesn't really describe a mode; should this be an enum, or maybe isUpsert?
I think it does. If enabled, upsert mode will be used.
See also #12996 (comment)
private Schema schema;
private RowData rowData;
private PartitionSpec spec;
private DistributionMode mode;
Should this be distribution or distributionMode? (It already clashes with upsertMode a little.)
Yes, it makes sense to rename it to distributionMode.
Thanks for the review @gyfora! I think it makes sense to rename the API-facing fields / getters / setters to avoid confusion for users.
private DistributionMode mode;
private int writeParallelism;
private boolean upsertMode;
@Nullable private List<String> equalityFields;
Only this field is nullable?
Shall we use the annotation consistently?
Correct, only this field is currently nullable / optional. We could add some defaults. I was thinking of adding a builder; what do you think?
A builder makes sense to me, as we have many parameters.
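For illustration, a minimal sketch of what such a builder could look like, using the field names visible in the diffs above; the builder API, the defaults, and the DynamicRecord constructor it calls are all hypothetical:

import java.util.List;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.TableIdentifier;

// Hypothetical builder; field names follow the diffs, defaults are assumptions.
public class DynamicRecordBuilder {
  private TableIdentifier tableIdentifier;
  private String branch = "main";                                    // assumed default
  private Schema schema;
  private PartitionSpec spec;
  private RowData rowData;
  private DistributionMode distributionMode = DistributionMode.NONE; // assumed default
  private int writeParallelism = 1;                                  // assumed default
  private boolean upsertMode = false;                                // assumed default
  private List<String> equalityFields;                               // optional, may stay null

  public DynamicRecordBuilder tableIdentifier(TableIdentifier identifier) {
    this.tableIdentifier = identifier;
    return this;
  }

  public DynamicRecordBuilder rowData(RowData data) {
    this.rowData = data;
    return this;
  }

  // ... analogous methods for branch, schema, spec, distributionMode,
  // writeParallelism, upsertMode, and equalityFields ...

  public DynamicRecord build() {
    // Hypothetical constructor; the actual DynamicRecord API may differ.
    return new DynamicRecord(
        tableIdentifier, branch, schema, spec, rowData,
        distributionMode, writeParallelism, upsertMode, equalityFields);
  }
}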
Resolved review threads:
- flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternal.java
- ...ink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializer.java
- ...k/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/RowDataSerializerCache.java (outdated)
It might be strange for new developers, but we always omit …
In general I get the idea, but my particular concern was related to …
I assume this is …
I've pushed an update to address the comments. On the name discussion: I think this is all just convention; every community has its own style. I don't think either way makes more sense. The most important consideration is consistency: all existing Flink Iceberg sinks use that name, and I don't see a strong case to deviate from it. I did rename …
Flink: Add DynamicRecord / DynamicRecordInternal / DynamicRecordInternalSerializer

This adds the user-facing type DynamicRecord, along with its internal representation DynamicRecordInternal and its type information and serializer. Broken out of github.com/apache/pull/12424.
Force-pushed from 665aa07 to ec7d036
(rebased and squashed commits)
return tableIdentifier;
}

public void setTableIdentifier(TableIdentifier tableIdentifier) {
Do we need these setters if we have a builder?
We wouldn't. I'm not sure, though, that we should remove these methods, as they allow DynamicRecord instances to be reused. If we only had a builder, that wouldn't be possible anymore.
return tableName;
}

public void setTableName(String tableName) {
Do we need these setters?
We currently use these setters here to allow for Flink's object reuse mode (line 204 in bf02ad6):

reuse.setTableName(from.tableName());
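For context, a simplified sketch of the object-reuse deserialization pattern these setters enable; the method body is illustrative, and the actual DynamicRecordInternalSerializer in the PR reads more fields and may differ:

import java.io.IOException;
import org.apache.flink.core.memory.DataInputView;

// Simplified object-reuse path of a Flink TypeSerializer; illustrative only.
@Override
public DynamicRecordInternal deserialize(DynamicRecordInternal reuse, DataInputView source)
    throws IOException {
  // Mutate the reused instance instead of allocating a new record per element,
  // avoiding per-record garbage when Flink's object reuse mode is enabled.
  reuse.setTableName(source.readUTF());
  // ... read and set branch, schema, spec, writer key, row data, etc. ...
  return reuse;
}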
This adds the classes around schema / spec comparison and evolution. A breakdown of the classes follows:

- CompareSchemasVisitor: Compares the user-provided schema against the current table schema.
- EvolveSchemaVisitor: Computes the changes required for the table schema to become compatible with the user-provided schema.
- PartitionSpecEvolution: Code for checking compatibility with the user-provided PartitionSpec and computing a set of changes to rewrite the PartitionSpec.
- TableDataCache: Cache which holds all relevant metadata of a table, like its name, branch, schema, and partition spec. Also holds a cache of past comparison results between a given table's schema and the user-provided input schema.
- TableUpdater: Core logic to compare and create/update a table given a user-provided input schema.

Broken out of apache#12424, depends on apache#12996.
This adds the user-facing type DynamicRecord, along with its internal representation DynamicRecordInternal and its type information and serializer.
Broken out of #12424.
The original PR is based on Flink 1.20. This version is based on Flink 2.0.