
Flink: Add DynamicRecord / DynamicRecordInternal / DynamicRecordInternalSerializer #12996


Merged: 1 commit merged into apache:main from dynamic-sink-contrib-breakdown on May 14, 2025

Conversation

@mxm (Contributor) commented May 7, 2025:

This adds the user-facing type DynamicRecord, along with its internal representation DynamicRecordInternal and its type information and serializer.

Broken out of #12424.

The original PR is based on Flink 1.20. This version is based on Flink 2.0.
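
For orientation, here is a rough sketch of the user-facing shape of DynamicRecord, pieced together from the diff excerpts quoted below (field names and types come from the review context; the merged class may differ in detail):

import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.TableIdentifier;

// Sketch only; fields mirror the review excerpts in this conversation.
public class DynamicRecord {
  private TableIdentifier tableIdentifier; // target Iceberg table
  private String branch;                   // target branch
  private Schema schema;                   // user-provided schema
  private PartitionSpec partitionSpec;     // user-provided partition spec
  private RowData rowData;                 // the Flink row to write
  private DistributionMode distributionMode;
  private int writeParallelism;
  private boolean upsertMode;
  @Nullable private List<String> equalityFields; // the only optional field
  // Accessors follow the Iceberg convention discussed below,
  // e.g. tableIdentifier() and setTableIdentifier(TableIdentifier).
}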

@mxm force-pushed the dynamic-sink-contrib-breakdown branch from 9081c13 to 0e50889 on May 7, 2025, 14:32
private PartitionSpec spec;
private int writerKey;
private RowData rowData;
private boolean upsertMode;
Contributor:

Should we rename this to isUpsert or if it denotes an actual mode use an enum instead?

Author (@mxm):

Can do, but it's consistent with the coding style; we often omit these verbs from the getters in Iceberg.

@gyfora (Contributor), May 8, 2025:

In that case, upsert or useUpsertMode would probably be a better name.

private String tableName;
private String branch;
private Schema schema;
private PartitionSpec spec;
Contributor:

Should we rename this to partitionSpec in case some other kind of spec appears in the future?

Author (@mxm):

I was also leaning towards this name in the beginning, but it's Iceberg convention to use this name across the code base. We can rename though if this is a concern.

Comment on lines +85 to +91
// Check that the schema id can be resolved. Not strictly necessary for serialization.
Tuple3<RowDataSerializer, Schema, PartitionSpec> serializer =
serializerCache.serializerWithSchemaAndSpec(
toSerialize.tableName(),
toSerialize.schema().schemaId(),
toSerialize.spec().specId());
Contributor:

If it's not strictly necessary, why do we do it? What happens if this fails, and why would it fail?

Author (@mxm):

This is basically a sanity check, verifying that looking up the serializer by id on the remote side will work. The remote side won't have the schema available, because it is not written in this branch. If there are any issues, we will know about them on the sender side, as opposed to the receiving side.

I've added a JavaDoc which should clarify things.
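
The added JavaDoc is not part of this excerpt; going by the explanation above, its gist would be roughly the following (a paraphrase, not the committed text):

/**
 * Sanity check on the sender side: resolve the serializer via table name, schema id,
 * and spec id before writing. The receiver only sees the ids (the schema itself is not
 * serialized on this path), so a failed lookup surfaces the problem at the sender
 * rather than at the receiver.
 */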

private String branch;
private Schema schema;
private RowData rowData;
private PartitionSpec spec;
Contributor:

Should this be called partitionSpec, in case other specs are added in the future?

private PartitionSpec spec;
private DistributionMode mode;
private int writeParallelism;
private boolean upsertMode;
Contributor:

A boolean doesn't really describe a mode; should this be an enum, or maybe isUpsert?

Author (@mxm), May 8, 2025:

I think it does. If enabled, upsert mode will be used.

Author (@mxm):

See also #12996 (comment)

private Schema schema;
private RowData rowData;
private PartitionSpec spec;
private DistributionMode mode;
Contributor:

Should this be distribution or distributionMode? (It already clashes with upsertMode a little.)

Author (@mxm):

Yes, it makes sense to rename to distributionMode.

@mxm (Author) commented May 8, 2025:

Thanks for the review @gyfora! I think it makes sense to rename the API-facing fields / getters / setters to avoid confusion for users.

@gyfora (Contributor) commented May 8, 2025:

> Thanks for the review @gyfora! I think it makes sense to rename the API-facing fields / getters / setters to avoid confusion for users.

I am not yet aware of all the conventions here. @pvary, maybe you could chime in on the naming, and then I will learn once and for all :D

private DistributionMode mode;
private int writeParallelism;
private boolean upsertMode;
@Nullable private List<String> equalityFields;
Contributor:

Only this field is nullable?
Shall we use the annotation consistently?

Author (@mxm):

Correct, only this field is currently nullable / optional. We could add some defaults. I was thinking of adding a builder; what do you think?

Contributor:

A builder makes sense to me, as we have many parameters
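
For illustration, construction via such a builder might look roughly like this (a hypothetical sketch; DynamicRecord.builder() and the method names are assumptions based on the fields above, not the merged API):

// Hypothetical builder usage; names mirror the fields in the diff excerpts.
DynamicRecord record =
    DynamicRecord.builder()
        .tableIdentifier(TableIdentifier.of("db", "events")) // example identifier
        .branch("main")
        .schema(schema)
        .partitionSpec(partitionSpec)
        .rowData(rowData)
        .distributionMode(DistributionMode.HASH)
        .writeParallelism(4)
        .upsertMode(false)
        // equalityFields is the only optional field and could default to null
        .build();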

@pvary (Contributor) commented May 9, 2025:

>> Thanks for the review @gyfora! I think it makes sense to rename the API-facing fields / getters / setters to avoid confusion for users.
>
> I am not yet aware of all the conventions here. @pvary, maybe you could chime in on the naming, and then I will learn once and for all :D

It might be strange for new developers, but we always omit get, set, and is from method names.
Here is the guide: https://iceberg.apache.org/contribute/#iceberg-code-contribution-guidelines
The only exceptions are overrides for external APIs.
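
Concretely, the getter style looks like this (a minimal example; the names mirror fields from this PR's diff excerpts):

// Iceberg style: accessors drop the get/is prefix.
public String tableName() {     // rather than getTableName()
  return tableName;
}

public boolean upsertMode() {   // rather than isUpsertMode()
  return upsertMode;
}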

@gyfora (Contributor) commented May 9, 2025:

>>> Thanks for the review @gyfora! I think it makes sense to rename the API-facing fields / getters / setters to avoid confusion for users.
>>
>> I am not yet aware of all the conventions here. @pvary, maybe you could chime in on the naming, and then I will learn once and for all :D
>
> It might be strange for new developers, but we always omit get, set, and is from method names. Here is the guide: https://iceberg.apache.org/contribute/#iceberg-code-contribution-guidelines The only exceptions are overrides for external APIs.

In general I get the idea, but my particular concern was related to upgradeMode: the convention clearly doesn't work well with a name like this, as it's immediately confusing when you have other xxMode fields that are enums, etc.

@pvary (Contributor) commented May 9, 2025:

>>>> Thanks for the review @gyfora! I think it makes sense to rename the API-facing fields / getters / setters to avoid confusion for users.
>>>
>>> I am not yet aware of all the conventions here. @pvary, maybe you could chime in on the naming, and then I will learn once and for all :D
>>
>> It might be strange for new developers, but we always omit get, set, and is from method names. Here is the guide: https://iceberg.apache.org/contribute/#iceberg-code-contribution-guidelines The only exceptions are overrides for external APIs.
>
> In general I get the idea, but my particular concern was related to upgradeMode: the convention clearly doesn't work well with a name like this, as it's immediately confusing when you have other xxMode fields that are enums, etc.

I assume this is upsertMode?
While I understand your concern, the IcebergSink contains upsertMode, and this convention is used throughout the Flink code, so I would stick to it.

@mxm (Author) commented May 9, 2025:

I've pushed an update to address the comments.

On the name discussion: I think this is all just convention; every community has its own style. I don't think either way inherently makes more sense. upsertMode makes perfect sense to me; isUpsert not so much, because not every record produces an upsert; but isUpsertMode makes just as much sense, even though the boolean type sits right next to the name.

The most important reason is consistency. All existing Flink Iceberg sinks use that name. I don't see a strong case to deviate from it.

I did rename mode to distributionMode and spec to partitionSpec.

Flink: Add DynamicRecord / DynamicRecordInternal / DynamicRecordInternalSerializer

This adds the user-facing type DynamicRecord, along with its internal
representation DynamicRecordInternal and its type information and serializer.

Broken out of github.com/apache/iceberg/pull/12424.
@mxm force-pushed the dynamic-sink-contrib-breakdown branch from 665aa07 to ec7d036 on May 12, 2025, 11:11
@mxm (Author) commented May 12, 2025:

(rebased and squashed commits)

return tableIdentifier;
}

public void setTableIdentifier(TableIdentifier tableIdentifier) {
Contributor:

Do we need these setters, if we have a builder?

Author (@mxm):

We wouldn't. I'm not sure, though, that we should remove these methods, as they allow DynamicRecord instances to be reused. If we only offer a builder, that won't be possible anymore.

return tableName;
}

public void setTableName(String tableName) {
Contributor:

Do we need these setters?

Author (@mxm):

We currently use these setters here to allow for Flink's object reuse mode:
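(The linked snippet is not reproduced in this excerpt. As a rough sketch of the pattern: under object reuse, Flink calls the deserialize(reuse, source) variant of TypeSerializer, which can repopulate an existing instance through these setters instead of allocating a new record. The field handling below is illustrative, not the PR's actual wire format.)

import java.io.IOException;
import org.apache.flink.core.memory.DataInputView;

// Illustrative sketch: repopulate the reused instance via its setters.
@Override
public DynamicRecordInternal deserialize(DynamicRecordInternal reuse, DataInputView source)
    throws IOException {
  reuse.setTableName(source.readUTF());
  reuse.setBranch(source.readUTF());
  // ... remaining fields (schema id, spec id, row data, etc.) read similarly ...
  return reuse;
}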

mxm added a commit to mxm/iceberg that referenced this pull request May 12, 2025
This adds the classes around schema / spec comparison and evolution. A breakdown
of the classes follows:

 # CompareSchemasVisitor

Compares the user-provided schema against the current table schema.

 # EvolveSchemaVisitor

Computes the changes required to the table schema to be compatible with the
user-provided schema.

 # PartitionSpecEvolution

Code for checking compatibility with the user-provided PartitionSpec and
computing a set of changes to rewrite the PartitionSpec.

 # TableDataCache

Cache which holds all relevant metadata of a table like its name, branch,
schema, partition spec. Also holds a cache of past comparison results for a
given table's schema and the user-provided input schema.

 # Table Updater

Core logic to compare and create/update a table given a user-provided input
schema.

Broken out of apache#12424, depends on apache#12996.
mxm added a commit to mxm/iceberg that referenced this pull request May 12, 2025
mxm added a commit to mxm/iceberg that referenced this pull request May 12, 2025
@pvary mentioned this pull request May 14, 2025
@pvary merged commit 268661a into apache:main May 14, 2025 (20 checks passed)
@pvary (Contributor) commented May 14, 2025:

Merged to main.
Thanks @mxm for the PR and @gyfora for the review!

@mxm deleted the dynamic-sink-contrib-breakdown branch May 15, 2025 08:45
@mxm (Author) commented May 15, 2025:

Thanks @pvary @gyfora for reviewing! Thanks @pvary for the merge!

mxm added a commit to mxm/iceberg that referenced this pull request May 15, 2025
mxm added a commit to mxm/iceberg that referenced this pull request May 15, 2025
mxm added a commit to mxm/iceberg that referenced this pull request May 16, 2025