Commit b49c17b
chore: [comet-parquet-exec] Unit test fixes, default scan impl to native_comet (#1265)
* fix: fix tests failing in native_recordbatch but not in native_full
* fix: use session timestamp in native scans
* Revert "fix: use session timestamp in native scans". This reverts commit e601deb472037338a36300992434a987bdb026e8.
* Revert Change to native record batch timezone
* Change stability plans to match original scan.
* fix after rebase
* Update plans; generate distinct plans for full native scan
* generate plans for native_recordbatch
* In struct tests, check Comet operator only for scan types that support complex types
* Revert "Revert Change to native record batch timezone". This reverts commit 4a147f3.
* Reapply "fix: use session timestamp in native scans". This reverts commit 370f901.
* Fix previous commit
* Rename configs and default scan impl to 'native_comet'
* add missing change
* fix build
* update plans for spark 3.5
* Add new plans for spark 3.5
* Update plans for Spark 4.0
* Plans updated from Spark 4
1 parent 27b4e81 commit b49c17b

1,838 files changed: +292,828 / -34,164 lines


common/src/main/java/org/apache/comet/parquet/Native.java

Lines changed: 6 additions & 1 deletion
@@ -249,7 +249,12 @@ public static native void setPageV2(
    * @return a handle to the record batch reader, used in subsequent calls.
    */
   public static native long initRecordBatchReader(
-      String filePath, long fileSize, long start, long length, byte[] requiredSchema);
+      String filePath,
+      long fileSize,
+      long start,
+      long length,
+      byte[] requiredSchema,
+      String sessionTimezone);

   // arrow native version of read batch
   /**

common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java

Lines changed: 1 addition & 1 deletion
@@ -346,7 +346,7 @@ public void init() throws URISyntaxException, IOException {

     this.handle =
         Native.initRecordBatchReader(
-            filePath, fileSize, start, length, serializedRequestedArrowSchema);
+            filePath, fileSize, start, length, serializedRequestedArrowSchema, timeZoneId);
     isInitialized = true;
   }

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 14 additions & 12 deletions
@@ -77,24 +77,26 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithDefault(false)

-  val SCAN_NATIVE = "native"
-  val SCAN_NATIVE_FULL = "native_full"
-  val SCAN_NATIVE_RECORDBATCH = "native_recordbatch"
+  val SCAN_NATIVE_COMET = "native_comet"
+  val SCAN_NATIVE_DATAFUSION = "native_datafusion"
+  val SCAN_NATIVE_ICEBERG_COMPAT = "native_iceberg_compat"

   val COMET_NATIVE_SCAN_IMPL: ConfigEntry[String] = conf("spark.comet.scan.impl")
     .doc(
-      "The implementation of Comet Native Scan to use. Available modes are 'native'," +
-        "'native_full', and 'native_recordbatch'. " +
-        "'native' is for the original Comet native scan which uses a jvm based parquet file " +
-        "reader and native column decoding. Supports simple types only " +
-        "'native_full' is a fully native implementation of scan based on DataFusion" +
-        "'native_recordbatch' is a native implementation that exposes apis to read parquet " +
-        "columns natively.")
+      s"The implementation of Comet Native Scan to use. Available modes are '$SCAN_NATIVE_COMET'," +
+        s"'$SCAN_NATIVE_DATAFUSION', and '$SCAN_NATIVE_ICEBERG_COMPAT'. " +
+        s"'$SCAN_NATIVE_COMET' is for the original Comet native scan which uses a jvm based " +
+        "parquet file reader and native column decoding. Supports simple types only " +
+        s"'$SCAN_NATIVE_DATAFUSION' is a fully native implementation of scan based on DataFusion" +
+        s"'$SCAN_NATIVE_ICEBERG_COMPAT' is a native implementation that exposes apis to read " +
+        "parquet columns natively.")
     .internal()
     .stringConf
     .transform(_.toLowerCase(Locale.ROOT))
-    .checkValues(Set(SCAN_NATIVE, SCAN_NATIVE_FULL, SCAN_NATIVE_RECORDBATCH))
-    .createWithDefault(sys.env.getOrElse("NATIVE_SCAN_IMPL", SCAN_NATIVE_FULL))
+    .checkValues(Set(SCAN_NATIVE_COMET, SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT))
+    .createWithDefault(sys.env
+      .getOrElse("COMET_PARQUET_SCAN_IMPL", SCAN_NATIVE_COMET)
+      .toLowerCase(Locale.ROOT))

   val COMET_PARQUET_PARALLEL_IO_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.parquet.read.parallel.io.enabled")
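
With the rename, selecting a scan implementation from a Spark session would look roughly like the sketch below. This is illustrative only: spark.comet.scan.impl is marked internal above, and when unset the default now resolves to 'native_comet' (or to the COMET_PARQUET_SCAN_IMPL environment variable read when the config default is created).

    // Illustrative sketch: pick the fully native, DataFusion-based scan.
    // Leaving the config unset now falls back to "native_comet".
    spark.conf.set("spark.comet.scan.impl", "native_datafusion")

    // Equivalent at submit time:
    //   spark-submit --conf spark.comet.scan.impl=native_iceberg_compat ...
    // Or via the environment variable consulted for the default:
    //   export COMET_PARQUET_SCAN_IMPL=native_comet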

native/Cargo.lock

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default.

native/Cargo.toml

Lines changed: 0 additions & 2 deletions
@@ -37,9 +37,7 @@ arrow = { version = "53.2.0", features = ["prettyprint", "ffi", "chrono-tz"] }
 arrow-array = { version = "53.2.0" }
 arrow-buffer = { version = "53.2.0" }
 arrow-data = { version = "53.2.0" }
-arrow-ipc = { version = "53.2.0" }
 arrow-schema = { version = "53.2.0" }
-flatbuffers = { version = "24.3.25" }
 parquet = { version = "53.2.0", default-features = false, features = ["experimental"] }
 datafusion = { version = "43.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "parquet"] }
 datafusion-common = { version = "43.0.0" }

native/core/Cargo.toml

Lines changed: 0 additions & 2 deletions
@@ -40,8 +40,6 @@ arrow-array = { workspace = true }
 arrow-buffer = { workspace = true }
 arrow-data = { workspace = true }
 arrow-schema = { workspace = true }
-arrow-ipc = { workspace = true }
-flatbuffers = { workspace = true }
 parquet = { workspace = true, default-features = false, features = ["experimental"] }
 futures = { workspace = true }
 mimalloc = { version = "*", default-features = false, optional = true }

native/core/src/execution/planner.rs

Lines changed: 5 additions & 2 deletions
@@ -1156,8 +1156,11 @@ impl PhysicalPlanner {
         table_parquet_options.global.pushdown_filters = true;
         table_parquet_options.global.reorder_filters = true;

-        let mut spark_parquet_options =
-            SparkParquetOptions::new(EvalMode::Legacy, "UTC", false);
+        let mut spark_parquet_options = SparkParquetOptions::new(
+            EvalMode::Legacy,
+            scan.session_timezone.as_str(),
+            false,
+        );
         spark_parquet_options.allow_cast_unsigned_ints = true;

         let mut builder = ParquetExecBuilder::new(file_scan_config)

native/core/src/parquet/mod.rs

Lines changed: 8 additions & 7 deletions
@@ -46,6 +46,7 @@ use self::util::jni::TypePromotionInfo;
 use crate::execution::operators::ExecutionError;
 use crate::execution::utils::SparkArrowConvert;
 use crate::parquet::data_type::AsBytes;
+use crate::parquet::parquet_support::SparkParquetOptions;
 use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
 use arrow::buffer::{Buffer, MutableBuffer};
 use arrow_array::{Array, RecordBatch};

@@ -59,8 +60,6 @@ use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use futures::{poll, StreamExt};
 use jni::objects::{JBooleanArray, JByteArray, JLongArray, JPrimitiveArray, JString, ReleaseMode};
 use jni::sys::jstring;
-use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
-use parquet_support::SparkParquetOptions;
 use read::ColumnReader;
 use util::jni::{convert_column_descriptor, convert_encoding, deserialize_schema, get_file_path};

@@ -608,7 +607,6 @@ enum ParquetReaderState {
 struct BatchContext {
     runtime: tokio::runtime::Runtime,
     batch_stream: Option<SendableRecordBatchStream>,
-    batch_reader: Option<ParquetRecordBatchReader>,
     current_batch: Option<RecordBatch>,
     reader_state: ParquetReaderState,
 }

@@ -640,14 +638,14 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
     start: jlong,
     length: jlong,
     required_schema: jbyteArray,
+    session_timezone: jstring,
 ) -> jlong {
     try_unwrap_or_throw(&e, |mut env| unsafe {
         let path: String = env
             .get_string(&JString::from_raw(file_path))
             .unwrap()
             .into();
         let batch_stream: Option<SendableRecordBatchStream>;
-        let batch_reader: Option<ParquetRecordBatchReader> = None;
         // TODO: (ARROW NATIVE) Use the common global runtime
         let runtime = tokio::runtime::Builder::new_multi_thread()
             .enable_all()

@@ -681,8 +679,13 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
         // TODO: Maybe these are configs?
         table_parquet_options.global.pushdown_filters = true;
         table_parquet_options.global.reorder_filters = true;
+        let session_timezone: String = env
+            .get_string(&JString::from_raw(session_timezone))
+            .unwrap()
+            .into();

-        let mut spark_parquet_options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false);
+        let mut spark_parquet_options =
+            SparkParquetOptions::new(EvalMode::Legacy, session_timezone.as_str(), false);
         spark_parquet_options.allow_cast_unsigned_ints = true;

         let builder2 = ParquetExecBuilder::new(file_scan_config)

@@ -704,7 +707,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
         let ctx = BatchContext {
             runtime,
             batch_stream,
-            batch_reader,
             current_batch: None,
             reader_state: ParquetReaderState::Init,
         };

@@ -725,7 +727,6 @@ pub extern "system" fn Java_org_apache_comet_parquet_Native_readNextRecordBatch(
         let batch_stream = context.batch_stream.as_mut().unwrap();
         let runtime = &context.runtime;

-        // let mut stream = batch_stream.as_mut();
         loop {
             let next_item = batch_stream.next();
             let poll_batch: Poll<Option<datafusion_common::Result<RecordBatch>>> =

native/core/src/parquet/parquet_support.rs

Lines changed: 1 addition & 1 deletion
@@ -818,7 +818,7 @@ fn cast_struct_to_struct(
             Ok(Arc::new(StructArray::new(
                 to_fields.clone(),
                 cast_fields,
-                array.nulls().map(|nulls| nulls.clone()),
+                array.nulls().cloned(),
             )))
         }
         _ => unreachable!(),

native/proto/src/proto/operator.proto

Lines changed: 1 addition & 0 deletions
@@ -90,6 +90,7 @@ message NativeScan {
   repeated spark.spark_expression.Expr data_filters = 6;
   repeated SparkFilePartition file_partitions = 7;
   repeated int64 projection_vector = 8;
+  string session_timezone = 9;
 }

 message Projection {

native/spark-expr/src/cast.rs

Lines changed: 1 addition & 1 deletion
@@ -838,7 +838,7 @@ fn cast_struct_to_struct(
             Ok(Arc::new(StructArray::new(
                 to_fields.clone(),
                 cast_fields,
-                array.nulls().map(|nulls| nulls.clone()),
+                array.nulls().cloned(),
             )))
         }
         _ => unreachable!(),

spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala

Lines changed: 2 additions & 2 deletions
@@ -206,7 +206,7 @@ class CometSparkSessionExtensions
           // here and then it gets replaced with `CometNativeScanExec` in `CometExecRule`
           // but that only happens if `COMET_EXEC_ENABLED` is enabled
           && COMET_EXEC_ENABLED.get()
-          && COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_FULL =>
+          && COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_DATAFUSION =>
         logInfo("Comet extension enabled for v1 full native Scan")
         CometScanExec(scanExec, session)

@@ -377,7 +377,7 @@ class CometSparkSessionExtensions
     plan.transformUp {
       // Fully native scan for V1
       case scan: CometScanExec
-          if COMET_NATIVE_SCAN_IMPL.get.equals(CometConf.SCAN_NATIVE_FULL) =>
+          if COMET_NATIVE_SCAN_IMPL.get().equals(CometConf.SCAN_NATIVE_DATAFUSION) =>
         val nativeOp = QueryPlanSerde.operator2Proto(scan).get
         CometNativeScanExec(nativeOp, scan.wrapped, scan.session)
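
Given the rewrite rule above, one quick way to see which scan Comet planned is to inspect the executed plan. This is a hedged sketch: the DataFrame and path are made up, and the expected operator names simply follow from the case shown above.

    // Hypothetical check: with native_datafusion the V1 scan should be rewritten
    // to CometNativeScanExec (when Comet exec is enabled); otherwise it stays a
    // CometScanExec.
    spark.conf.set("spark.comet.scan.impl", "native_datafusion")
    val df = spark.read.parquet("/tmp/example.parquet") // illustrative path
    df.queryExecution.executedPlan.foreach(op => println(op.getClass.getSimpleName))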

spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala

Lines changed: 1 addition & 1 deletion
@@ -101,7 +101,7 @@ class CometParquetFileFormat extends ParquetFileFormat with MetricsSupport with
     // Comet specific configurations
     val capacity = CometConf.COMET_BATCH_SIZE.get(sqlConf)
     val nativeRecordBatchReaderEnabled =
-      CometConf.COMET_NATIVE_SCAN_IMPL.get(sqlConf).equals(CometConf.SCAN_NATIVE_RECORDBATCH)
+      CometConf.COMET_NATIVE_SCAN_IMPL.get(sqlConf).equals(CometConf.SCAN_NATIVE_ICEBERG_COMPAT)

     (file: PartitionedFile) => {
       val sharedConf = broadcastedHadoopConf.value.value

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 2 additions & 1 deletion
@@ -2517,7 +2517,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim

     // Fully native scan for V1
     case scan: CometScanExec
-        if CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) == CometConf.SCAN_NATIVE_FULL =>
+        if CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) == CometConf.SCAN_NATIVE_DATAFUSION =>
       val nativeScanBuilder = OperatorOuterClass.NativeScan.newBuilder()
       nativeScanBuilder.setSource(op.simpleStringWithNodeId())

@@ -2578,6 +2578,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
       nativeScanBuilder.addAllDataSchema(dataSchema.toIterable.asJava)
       nativeScanBuilder.addAllRequiredSchema(requiredSchema.toIterable.asJava)
       nativeScanBuilder.addAllPartitionSchema(partitionSchema.toIterable.asJava)
+      nativeScanBuilder.setSessionTimezone(conf.getConfString("spark.sql.session.timeZone"))

       Some(result.setNativeScan(nativeScanBuilder).build())
spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -474,7 +474,7 @@ case class CometScanExec(
474474
object CometScanExec extends DataTypeSupport {
475475

476476
override def isAdditionallySupported(dt: DataType): Boolean = {
477-
if (CometConf.COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_RECORDBATCH) {
477+
if (CometConf.COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_ICEBERG_COMPAT) {
478478
// TODO add array and map
479479
dt match {
480480
case s: StructType => s.fields.map(_.dataType).forall(isTypeSupported)
