fix: Support Schema Evolution in iceberg #1723


Closed

huaxingao wants to merge 3 commits into main from allowSchemaEvolution

Conversation

huaxingao
Contributor

Which issue does this PR close?

We originally had CometConf.COMET_SCHEMA_EVOLUTION_ENABLED, which the scan rule sets to true when the scan is an Iceberg table scan. However, this doesn't work for the following case:

    sql("CREATE TABLE %s (id Int) USING iceberg", table1);
    sql("INSERT INTO %s VALUES (1), (2), (3), (4)", table1);
    sql("alter table %s alter column id type bigint", table1);
    sql("SELECT * FROM %s", table1);

In this example, when executing SELECT * FROM table, Iceberg creates a Comet ColumnReader and invokes TypeUtil.checkParquetType. This throws an exception because the scan rule hasn't been applied yet, but the column type has already changed to bigint.

        org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException: column: [id], physicalType: INT32, logicalType: bigint
            at app//org.apache.comet.parquet.TypeUtil.checkParquetType(TypeUtil.java:222)
            at app//org.apache.comet.parquet.AbstractColumnReader.<init>(AbstractColumnReader.java:93)
            at app//org.apache.comet.parquet.ColumnReader.<init>(ColumnReader.java:104)
            at app//org.apache.comet.parquet.Utils.getColumnReader(Utils.java:50)

Instead of enabling schema evolution in the scan rule, I will update Utils.getColumnReader to accept a boolean supportsSchemaEvolution parameter and pass true from the Iceberg side.
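A minimal sketch of the proposed shape (this is not the real Comet code; only the names getColumnReader, checkParquetType, and supportsSchemaEvolution come from the PR, while all signatures and type handling here are illustrative): the caller decides whether schema evolution is allowed and passes it down, instead of the reader consulting a global config that the scan rule may not have set yet.

```java
public class SchemaEvolutionSketch {

  // Stand-in for SchemaColumnConvertNotSupportedException.
  static class SchemaMismatchException extends RuntimeException {
    SchemaMismatchException(String msg) { super(msg); }
  }

  // Leaf check: an INT32 file column read as bigint is only accepted when
  // schema evolution (type widening) is allowed.
  static void checkParquetType(String column, String physicalType, String logicalType,
                               boolean supportsSchemaEvolution) {
    boolean exact = ("INT32".equals(physicalType) && "int".equals(logicalType))
        || ("INT64".equals(physicalType) && "bigint".equals(logicalType));
    boolean widening = "INT32".equals(physicalType) && "bigint".equals(logicalType);
    if (exact || (supportsSchemaEvolution && widening)) {
      return;
    }
    throw new SchemaMismatchException(
        "column: [" + column + "], physicalType: " + physicalType
            + ", logicalType: " + logicalType);
  }

  // The Iceberg side passes true here; plain Comet Parquet scans keep
  // their previous behavior by passing false.
  static void getColumnReader(String column, String physicalType, String logicalType,
                              boolean supportsSchemaEvolution) {
    checkParquetType(column, physicalType, logicalType, supportsSchemaEvolution);
  }

  public static void main(String[] args) {
    boolean threw = false;
    try {
      getColumnReader("id", "INT32", "bigint", false); // old path: throws
    } catch (SchemaMismatchException e) {
      threw = true;
    }
    System.out.println("throws without evolution: " + threw);
    getColumnReader("id", "INT32", "bigint", true); // Iceberg path: accepted
    System.out.println("accepted with evolution");
  }
}
```

This mirrors the failure mode in the stack trace above: with the flag false, the INT32/bigint mismatch throws; with the flag true, the int-to-bigint widening is accepted.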

Closes #.

Rationale for this change

What changes are included in this PR?

How are these changes tested?

I currently test the new patch in Iceberg.

  @Test
  public void test() {
    String table1 = tableName("test");

    sql("CREATE TABLE %s (id Int) USING iceberg", table1);
    sql("INSERT INTO %s VALUES (1), (2), (3), (4)", table1);
    sql("alter table %s alter column id type bigint", table1);
    List<Object[]> results = sql("SELECT * FROM %s", table1);

    sql("DROP TABLE IF EXISTS %s", table1);
  }

Without the fix, I got

        org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException: column: [id], physicalType: INT32, logicalType: bigint
            at app//org.apache.comet.parquet.TypeUtil.checkParquetType(TypeUtil.java:222)
            at app//org.apache.comet.parquet.AbstractColumnReader.<init>(AbstractColumnReader.java:93)
            at app//org.apache.comet.parquet.ColumnReader.<init>(ColumnReader.java:104)
            at app//org.apache.comet.parquet.Utils.getColumnReader(Utils.java:50)
            at app//org.apache.iceberg.spark.data.vectorized.CometColumnReader.reset(CometColumnReader.java:103)

With the fix, the problem goes away.

@@ -1217,34 +1217,6 @@ abstract class ParquetReadSuite extends CometTestBase {
}
}

test("schema evolution") {
Member

Can we update the test rather than remove it?

Contributor Author

It seems to me this only tests CometConf.COMET_SCHEMA_EVOLUTION_ENABLED. Since I removed the config, I think this test is not needed any more.
I currently run my test on the Iceberg side. I am thinking maybe we can add an iceberg-integration module to hold the Iceberg-related tests, or depend on the Iceberg CI (#1715)?

@andygrove
Member

Thanks @huaxingao. The implementation changes LGTM, but I would like to understand how this will be tested.

@@ -28,33 +28,33 @@

public class Utils {

/** This method is called from Apache Iceberg. */
Contributor

(nit) Shall we keep this comment? I think it is useful if it's still valid.

Contributor Author

We will use the same method for both Comet and Iceberg. The comment is not needed any more.

@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.internal.SQLConf
Contributor

(nit) This import can be removed.

Contributor Author

Removed. Thanks

@huaxingao huaxingao changed the title Support Schema Evolution in iceberg fix: Support Schema Evolution in iceberg May 8, 2025
@huaxingao huaxingao force-pushed the allowSchemaEvolution branch from 62e547d to 6fc8b85 Compare May 8, 2025 23:35
@codecov-commenter

codecov-commenter commented May 9, 2025

Codecov Report

Attention: Patch coverage is 60.00000% with 4 lines in your changes missing coverage. Please review.

Project coverage is 58.70%. Comparing base (f09f8af) to head (74a1058).
Report is 186 commits behind head on main.

Files with missing lines Patch % Lines
...org/apache/comet/parquet/AbstractColumnReader.java 80.00% 0 Missing and 1 partial ⚠️
...in/java/org/apache/comet/parquet/ColumnReader.java 0.00% 1 Missing ⚠️
...ava/org/apache/comet/parquet/LazyColumnReader.java 0.00% 1 Missing ⚠️
...c/main/java/org/apache/comet/parquet/TypeUtil.java 50.00% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1723      +/-   ##
============================================
+ Coverage     56.12%   58.70%   +2.58%     
- Complexity      976     1139     +163     
============================================
  Files           119      129      +10     
  Lines         11743    12707     +964     
  Branches       2251     2377     +126     
============================================
+ Hits           6591     7460     +869     
- Misses         4012     4058      +46     
- Partials       1140     1189      +49     


Contributor

@hsiang-c hsiang-c left a comment

LGTM

@parthchandra
Contributor

The config CometConf.COMET_SCHEMA_EVOLUTION_ENABLED is valid for Parquet files as well, so removing it is not correct, in my opinion.
Also, ScanRule is called at planning time while getColumnReader is called during execution. Why is the config not set correctly?

@parthchandra
Contributor

One set of Spark SQL test failures in native_iceberg_compat is exactly because this schema evolution/type promotion check is not being performed correctly (or not being performed at all, in fact). I'm currently trying to address these failures by writing a checkParquetType for complex types, which will end up calling the current checkParquetType when it finds a PrimitiveType value. I was counting on this config to provide compatible results.
What types of schema evolution does Iceberg require (i.e. support) for complex types?
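The recursive check described in the comment above could look roughly like the following sketch. The type model here is invented for illustration only; Comet's real code would walk Parquet's GroupType/PrimitiveType tree, and the leaf rules shown are a simplified subset.

```java
import java.util.List;

public class ComplexTypeCheckSketch {

  interface FileType {}
  // Leaf: what the file stores vs. what the table schema expects.
  record Primitive(String physical, String logical) implements FileType {}
  // Complex type: a struct/list/map node with child types.
  record Group(List<FileType> children) implements FileType {}

  // Leaf check: exact matches, plus the int -> bigint widening, are accepted.
  static boolean checkPrimitive(Primitive p) {
    if (p.physical().equals("INT32") && p.logical().equals("int")) return true;
    if (p.physical().equals("INT64") && p.logical().equals("bigint")) return true;
    return p.physical().equals("INT32") && p.logical().equals("bigint"); // widening
  }

  // Recursive check: descend through group nodes and delegate to the
  // primitive check when a leaf is reached.
  static boolean checkType(FileType t) {
    if (t instanceof Primitive p) {
      return checkPrimitive(p);
    }
    for (FileType child : ((Group) t).children()) {
      if (!checkType(child)) return false;
    }
    return true;
  }

  public static void main(String[] args) {
    // struct<a: int, b: struct<c: bigint, where the file stores INT32>>
    FileType schema = new Group(List.of(
        new Primitive("INT32", "int"),
        new Group(List.of(new Primitive("INT32", "bigint")))));
    System.out.println(checkType(schema));
  }
}
```

The nested INT32-to-bigint promotion inside the inner struct is accepted because the recursion applies the same leaf rules at every depth.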

@huaxingao huaxingao closed this May 13, 2025
@huaxingao huaxingao reopened this May 13, 2025
@huaxingao huaxingao closed this May 13, 2025