
Commit 899fe4f

Support Schema Evolution in Iceberg
1 parent: b36db64

10 files changed: +46 −70 lines

common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java

Lines changed: 12 additions & 4 deletions

@@ -27,7 +27,6 @@
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.TimestampNTZType$;
 
-import org.apache.comet.CometConf;
 import org.apache.comet.vector.CometVector;
 
 /** Base class for Comet Parquet column reader implementations. */
@@ -63,6 +62,13 @@ public abstract class AbstractColumnReader implements AutoCloseable {
   /** A pointer to the native implementation of ColumnReader. */
   protected long nativeHandle;
 
+  /**
+   * Whether to enable schema evolution in Comet. For instance, promoting a integer column to a long
+   * column, a float column to a double column, etc. This is automatically enabled when reading from
+   * Iceberg tables.
+   */
+  protected boolean supportsSchemaEvolution;
+
   public AbstractColumnReader(
       DataType type,
       Type fieldType,
@@ -80,9 +86,11 @@ public AbstractColumnReader(
       DataType type,
       ColumnDescriptor descriptor,
       boolean useDecimal128,
-      boolean useLegacyDateTimestamp) {
+      boolean useLegacyDateTimestamp,
+      boolean supportsSchemaEvolution) {
     this(type, null, descriptor, useDecimal128, useLegacyDateTimestamp);
-    TypeUtil.checkParquetType(descriptor, type);
+    this.supportsSchemaEvolution = supportsSchemaEvolution;
+    TypeUtil.checkParquetType(descriptor, type, supportsSchemaEvolution);
   }
 
   public ColumnDescriptor getDescriptor() {
@@ -120,7 +128,7 @@ public void close() {
 
   protected void initNative() {
     LOG.debug("initializing the native column reader");
-    DataType readType = (boolean) CometConf.COMET_SCHEMA_EVOLUTION_ENABLED().get() ? type : null;
+    DataType readType = supportsSchemaEvolution ? type : null;
     boolean useLegacyDateTimestampOrNTZ =
         useLegacyDateTimestamp || type == TimestampNTZType$.MODULE$;
     nativeHandle =
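
Note on the last hunk: the per-reader supportsSchemaEvolution flag replaces the old lookup of spark.comet.schemaEvolution.enabled inside initNative(), so the requested Spark type is only handed to the native reader when promotion is allowed. A minimal sketch of that decision, assuming (as the code above suggests) that a null read type means "decode the column as written in the file"; the ReadTypeSketch class is illustrative and not part of Comet:

import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;

// Illustrative sketch of the readType decision in initNative(): with promotion enabled the
// native reader is told which Spark type to produce (e.g. LongType for an int32 column);
// with promotion disabled it receives no read type at all.
class ReadTypeSketch {
  static DataType nativeReadType(DataType requestedSparkType, boolean supportsSchemaEvolution) {
    return supportsSchemaEvolution ? requestedSparkType : null;
  }

  public static void main(String[] args) {
    System.out.println(nativeReadType(DataTypes.LongType, true)); // prints the promoted type
    System.out.println(nativeReadType(DataTypes.LongType, false)); // prints null
  }
}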

common/src/main/java/org/apache/comet/parquet/BatchReader.java

Lines changed: 2 additions & 1 deletion

@@ -583,7 +583,8 @@ private boolean loadNextRowGroupIfNecessary() throws Throwable {
               capacity,
               useDecimal128,
               useLazyMaterialization,
-              useLegacyDateTimestamp);
+              useLegacyDateTimestamp,
+              false);
       reader.setPageReader(rowGroupReader.getPageReader(columns.get(i)));
       columnReaders[i] = reader;
     }

common/src/main/java/org/apache/comet/parquet/ColumnReader.java

Lines changed: 3 additions & 2 deletions

@@ -99,8 +99,9 @@ public ColumnReader(
       CometSchemaImporter importer,
       int batchSize,
       boolean useDecimal128,
-      boolean useLegacyDateTimestamp) {
-    super(type, descriptor, useDecimal128, useLegacyDateTimestamp);
+      boolean useLegacyDateTimestamp,
+      boolean supportsSchemaEvolution) {
+    super(type, descriptor, useDecimal128, useLegacyDateTimestamp, supportsSchemaEvolution);
     assert batchSize > 0 : "Batch size must be positive, found " + batchSize;
     this.batchSize = batchSize;
     this.importer = importer;

common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java

Lines changed: 10 additions & 2 deletions

@@ -49,8 +49,16 @@ public LazyColumnReader(
       CometSchemaImporter importer,
       int batchSize,
       boolean useDecimal128,
-      boolean useLegacyDateTimestamp) {
-    super(sparkReadType, descriptor, importer, batchSize, useDecimal128, useLegacyDateTimestamp);
+      boolean useLegacyDateTimestamp,
+      boolean supportsSchemaEvolution) {
+    super(
+        sparkReadType,
+        descriptor,
+        importer,
+        batchSize,
+        useDecimal128,
+        useLegacyDateTimestamp,
+        supportsSchemaEvolution);
     this.batchSize = 0; // the batch size is set later in `readBatch`
     this.vector = new CometLazyVector(sparkReadType, this, useDecimal128);
   }

common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java

Lines changed: 1 addition & 1 deletion

@@ -45,7 +45,7 @@ public class MetadataColumnReader extends AbstractColumnReader {
   public MetadataColumnReader(
       DataType type, ColumnDescriptor descriptor, boolean useDecimal128, boolean isConstant) {
     // TODO: should we handle legacy dates & timestamps for metadata columns?
-    super(type, descriptor, useDecimal128, false);
+    super(type, descriptor, useDecimal128, false, false);
 
     this.isConstant = isConstant;
   }

common/src/main/java/org/apache/comet/parquet/TypeUtil.java

Lines changed: 2 additions & 4 deletions

@@ -31,8 +31,6 @@
 import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.*;
 
-import org.apache.comet.CometConf;
-
 public class TypeUtil {
 
   /** Converts the input Spark 'field' into a Parquet column descriptor. */
@@ -116,11 +114,11 @@ public static ColumnDescriptor convertToParquet(StructField field) {
    * @param descriptor descriptor for a Parquet primitive column
    * @param sparkType Spark read type
    */
-  public static void checkParquetType(ColumnDescriptor descriptor, DataType sparkType) {
+  public static void checkParquetType(
+      ColumnDescriptor descriptor, DataType sparkType, boolean allowTypePromotion) {
     PrimitiveType.PrimitiveTypeName typeName = descriptor.getPrimitiveType().getPrimitiveTypeName();
     LogicalTypeAnnotation logicalTypeAnnotation =
         descriptor.getPrimitiveType().getLogicalTypeAnnotation();
-    boolean allowTypePromotion = (boolean) CometConf.COMET_SCHEMA_EVOLUTION_ENABLED().get();
 
     if (sparkType instanceof NullType) {
       return;
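
checkParquetType now takes the promotion policy from its caller instead of reading the global config. Conceptually, allowTypePromotion widens the set of Spark types accepted for a given Parquet physical type. The sketch below is a simplified stand-in for that idea, not the real TypeUtil logic (which covers many more physical and logical type combinations):

import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;

// Simplified illustration of the kind of check gated by allowTypePromotion.
class TypePromotionSketch {
  static boolean isAllowed(
      PrimitiveTypeName parquetType, DataType sparkType, boolean allowTypePromotion) {
    switch (parquetType) {
      case INT32:
        // int32 can always be read as IntegerType; reading it as LongType requires promotion.
        return sparkType == DataTypes.IntegerType
            || (allowTypePromotion && sparkType == DataTypes.LongType);
      case FLOAT:
        // float can always be read as FloatType; reading it as DoubleType requires promotion.
        return sparkType == DataTypes.FloatType
            || (allowTypePromotion && sparkType == DataTypes.DoubleType);
      default:
        return false;
    }
  }
}

With the flag threaded through the reader constructors, a reader built for an Iceberg scan passes true here, while BatchReader (plain Spark Parquet scans) passes false.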

common/src/main/java/org/apache/comet/parquet/Utils.java

Lines changed: 16 additions & 16 deletions

@@ -28,33 +28,33 @@
 
 public class Utils {
 
-  /** This method is called from Apache Iceberg. */
-  public static ColumnReader getColumnReader(
-      DataType type,
-      ColumnDescriptor descriptor,
-      CometSchemaImporter importer,
-      int batchSize,
-      boolean useDecimal128,
-      boolean useLazyMaterialization) {
-    // TODO: support `useLegacyDateTimestamp` for Iceberg
-    return getColumnReader(
-        type, descriptor, importer, batchSize, useDecimal128, useLazyMaterialization, true);
-  }
-
   public static ColumnReader getColumnReader(
       DataType type,
       ColumnDescriptor descriptor,
       CometSchemaImporter importer,
       int batchSize,
       boolean useDecimal128,
       boolean useLazyMaterialization,
-      boolean useLegacyDateTimestamp) {
+      boolean useLegacyDateTimestamp,
+      boolean supportsSchemaEvolution) {
     if (useLazyMaterialization && supportLazyMaterialization(type)) {
       return new LazyColumnReader(
-          type, descriptor, importer, batchSize, useDecimal128, useLegacyDateTimestamp);
+          type,
+          descriptor,
+          importer,
+          batchSize,
+          useDecimal128,
+          useLegacyDateTimestamp,
+          supportsSchemaEvolution);
     } else {
       return new ColumnReader(
-          type, descriptor, importer, batchSize, useDecimal128, useLegacyDateTimestamp);
+          type,
+          descriptor,
+          importer,
+          batchSize,
+          useDecimal128,
+          useLegacyDateTimestamp,
+          supportsSchemaEvolution);
     }
   }
 
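
The overload that Apache Iceberg called (and which passed true for useLegacyDateTimestamp, with a TODO) is removed, so an Iceberg-side integration now has to call the remaining full signature itself and opt in to schema evolution explicitly. A hypothetical caller-side sketch, assuming CometSchemaImporter lives in org.apache.arrow.c and that the caller already has the descriptor, importer, and flag values in hand; none of this class is part of the commit:

import org.apache.arrow.c.CometSchemaImporter; // assumed package for CometSchemaImporter
import org.apache.comet.parquet.ColumnReader;
import org.apache.comet.parquet.Utils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.spark.sql.types.DataType;

// Hypothetical Iceberg-side factory method mirroring the removed overload's behaviour.
class IcebergReaderSketch {
  static ColumnReader readerFor(
      DataType sparkReadType,
      ColumnDescriptor descriptor,
      CometSchemaImporter importer,
      int batchSize,
      boolean useDecimal128,
      boolean useLazyMaterialization) {
    return Utils.getColumnReader(
        sparkReadType,
        descriptor,
        importer,
        batchSize,
        useDecimal128,
        useLazyMaterialization,
        true, // useLegacyDateTimestamp: the removed overload hard-coded true (still a TODO)
        true); // supportsSchemaEvolution: Iceberg reads enable type promotion
  }
}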

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 0 additions & 10 deletions

@@ -539,16 +539,6 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithDefault(true)
 
-  val COMET_SCHEMA_EVOLUTION_ENABLED: ConfigEntry[Boolean] = conf(
-    "spark.comet.schemaEvolution.enabled")
-    .internal()
-    .doc(
-      "Whether to enable schema evolution in Comet. For instance, promoting a integer " +
-        "column to a long column, a float column to a double column, etc. This is automatically" +
-        "enabled when reading from Iceberg tables.")
-    .booleanConf
-    .createWithDefault(COMET_SCHEMA_EVOLUTION_ENABLED_DEFAULT)
-
   val COMET_SPARK_TO_ARROW_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.sparkToColumnar.enabled")
       .internal()

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 0 additions & 2 deletions

@@ -180,8 +180,6 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
       }
 
       if (s.isCometEnabled && schemaSupported) {
-        // When reading from Iceberg, we automatically enable type promotion
-        SQLConf.get.setConfString(COMET_SCHEMA_EVOLUTION_ENABLED.key, "true")
         CometBatchScanExec(
           scanExec.clone().asInstanceOf[BatchScanExec],
           runtimeFilters = scanExec.runtimeFilters)

spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala

Lines changed: 0 additions & 28 deletions

@@ -1217,34 +1217,6 @@ abstract class ParquetReadSuite extends CometTestBase {
     }
   }
 
-  test("schema evolution") {
-    Seq(true, false).foreach { enableSchemaEvolution =>
-      Seq(true, false).foreach { useDictionary =>
-        {
-          withSQLConf(
-            CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.key -> enableSchemaEvolution.toString) {
-            val data = (0 until 100).map(i => {
-              val v = if (useDictionary) i % 5 else i
-              (v, v.toFloat)
-            })
-            val readSchema =
-              StructType(
-                Seq(StructField("_1", LongType, false), StructField("_2", DoubleType, false)))
-
-            withParquetDataFrame(data, schema = Some(readSchema)) { df =>
-              // TODO: validate with Spark 3.x and 'usingDataFusionParquetExec=true'
-              if (enableSchemaEvolution || usingDataFusionParquetExec(conf)) {
-                checkAnswer(df, data.map(Row.fromTuple))
-              } else {
-                assertThrows[SparkException](df.collect())
-              }
-            }
-          }
-        }
-      }
-    }
-  }
-
   test("scan metrics") {
     // https://github.com/apache/datafusion-comet/issues/1441
     assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
