From 25bf8d717b8264b2b0171d0bf0d3f4605f0d5f50 Mon Sep 17 00:00:00 2001 From: comphead Date: Mon, 28 Apr 2025 12:05:19 -0700 Subject: [PATCH 1/8] fix: get_struct field is incorrect when struct in array --- native/core/src/execution/planner.rs | 168 ++++++++++++++++++ .../comet/exec/CometNativeReaderSuite.scala | 72 ++++++++ 2 files changed, 240 insertions(+) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index ba7bbf0c2c..f9a3f7d644 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -3081,4 +3081,172 @@ mod tests { } }); } + + #[test] + fn test_struct_field() { + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); + let planner = PhysicalPlanner::new(Arc::from(session_ctx)); + + // Mock scan operator with 3 INT32 columns + let op_scan = Operator { + plan_id: 0, + children: vec![], + op_struct: Some(OpStruct::Scan(spark_operator::Scan { + fields: vec![ + spark_expression::DataType { + type_id: 4, // Int32 + type_info: None, + }, + spark_expression::DataType { + type_id: 4, // Int32 + type_info: None, + }, + spark_expression::DataType { + type_id: 4, // Int32 + type_info: None, + }, + ], + source: "".to_string(), + })), + }; + + // Mock expression to read a INT32 column with position 0 + let col_0 = spark_expression::Expr { + expr_struct: Some(Bound(spark_expression::BoundReference { + index: 0, + datatype: Some(spark_expression::DataType { + type_id: 4, + type_info: None, + }), + })), + }; + + // Mock expression to read a INT32 column with position 1 + let col_1 = spark_expression::Expr { + expr_struct: Some(Bound(spark_expression::BoundReference { + index: 1, + datatype: Some(spark_expression::DataType { + type_id: 4, + type_info: None, + }), + })), + }; + + let array_of_struct = spark_expression::Expr { + expr_struct: Some(ExprStruct::ScalarFunc(spark_expression::ScalarFunc { + func: "make_array".to_string(), + args: vec![spark_expression::Expr { + expr_struct: Some(ExprStruct::ScalarFunc(spark_expression::ScalarFunc { + func: "struct".to_string(), + args: vec![col_0, col_1], + return_type: None, + })), + }], + return_type: None, + })) + }; + + let get_first_array_element = spark_expression::Expr { + expr_struct: Some(ExprStruct::ScalarFunc(spark_expression::ScalarFunc { + func: "array_element".to_string(), + args: vec![array_of_struct, spark_expression::Expr { + expr_struct: Some(ExprStruct::Literal(spark_expression::Literal { + value: Some(literal::Value::LongVal(1)), + datatype: Some(spark_expression::DataType { + type_id: 4, + type_info: None, + }), + is_null: false, + })) + }], + return_type: None, + })) + }; + + let get_field_of_struct = spark_expression::Expr { + expr_struct: Some(ExprStruct::ScalarFunc(spark_expression::ScalarFunc { + func: "get_field".to_string(), + args: vec![get_first_array_element, spark_expression::Expr { + expr_struct: Some(ExprStruct::Literal(spark_expression::Literal { + value: Some(literal::Value::StringVal("c1".to_string())), + datatype: Some(spark_expression::DataType { + type_id: 7, + type_info: None, + }), + is_null: false, + })) + }], + return_type: None, + })) + }; + + // Make a projection operator with struct(array_col, array_col_1) + let projection = Operator { + children: vec![op_scan], + plan_id: 0, + op_struct: Some(OpStruct::Projection(spark_operator::Projection { + project_list: vec![get_field_of_struct], + })), + }; + + // Create a physical plan + let (mut scans, datafusion_plan) = + planner.create_plan(&projection, &mut 
vec![], 1).unwrap(); + + // Start executing the plan in a separate thread + // The plan waits for incoming batches and emitting result as input comes + let mut stream = datafusion_plan.native_plan.execute(0, task_ctx).unwrap(); + + let runtime = tokio::runtime::Runtime::new().unwrap(); + // create async channel + let (tx, mut rx) = mpsc::channel(1); + + // Send data as input to the plan being executed in a separate thread + runtime.spawn(async move { + // create data batch + // 0, 1, 2 + // 3, 4, 5 + // 6, null, null + let a = Int64Array::from(vec![Some(0), Some(3), Some(6)]); + let b = Int64Array::from(vec![Some(1), Some(4), None]); + let c = Int64Array::from(vec![Some(2), Some(5), None]); + let input_batch1 = InputBatch::Batch(vec![Arc::new(a), Arc::new(b), Arc::new(c)], 3); + let input_batch2 = InputBatch::EOF; + + let batches = vec![input_batch1, input_batch2]; + + for batch in batches.into_iter() { + tx.send(batch).await.unwrap(); + } + }); + + // Wait for the plan to finish executing and assert the result + runtime.block_on(async move { + loop { + let batch = rx.recv().await.unwrap(); + scans[0].set_input_batch(batch); + match poll!(stream.next()) { + Poll::Ready(Some(batch)) => { + assert!(batch.is_ok(), "got error {}", batch.unwrap_err()); + let batch = batch.unwrap(); + let expected = [ + "+-------+", + "| col_0 |", + "+-------+", + "| 1 |", + "| 4 |", + "| |", + "+-------+", + ]; + assert_batches_eq!(expected, &[batch]); + } + Poll::Ready(None) => { + break; + } + _ => {} + } + } + }); + } } diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala index d9c71f147d..fff5435460 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala @@ -224,4 +224,76 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper |""".stripMargin, "select c0 from tbl") } + test("native reader - read a STRUCT subfield from ARRAY of STRUCTS - second field") { + testSingleLineQuery( + """ + | select array(str0, str1) c0 from + | ( + | select + | named_struct('a', 1, 'b', 'n', 'c', 'x') str0, + | named_struct('a', 2, 'b', 'w', 'c', 'y') str1 + | ) + |""".stripMargin, + "select c0[0].b col0 from tbl") + } + + test("native reader - read a STRUCT subfield - field from second") { + testSingleLineQuery( + """ + |select array(named_struct('a', 1, 'b', 'n')) c0 + |""".stripMargin, + "select c0[0].b from tbl") + } + + test("native reader - read a STRUCT subfield from ARRAY of STRUCTS - field from first") { + testSingleLineQuery( + """ + | select array(str0, str1) c0 from + | ( + | select + | named_struct('a', 1, 'b', 'n', 'c', 'x') str0, + | named_struct('a', 2, 'b', 'w', 'c', 'y') str1 + | ) + |""".stripMargin, + "select c0[0].a, c0[0].b, c0[0].c from tbl") + } + + test("native reader - read a STRUCT subfield from ARRAY of STRUCTS - reverse fields") { + testSingleLineQuery( + """ + | select array(str0, str1) c0 from + | ( + | select + | named_struct('a', 1, 'b', 'n', 'c', 'x') str0, + | named_struct('a', 2, 'b', 'w', 'c', 'y') str1 + | ) + |""".stripMargin, + "select c0[0].c, c0[0].b, c0[0].a from tbl") + } + + test("native reader - read a STRUCT subfield from ARRAY of STRUCTS - skip field") { + testSingleLineQuery( + """ + | select array(str0, str1) c0 from + | ( + | select + | named_struct('a', 1, 'b', 'n', 'c', 'x') str0, + | named_struct('a', 2, 'b', 'w', 'c', 'y') str1 + | ) + 
|""".stripMargin, + "select c0[0].a, c0[0].c from tbl") + } + + test("native reader - read a STRUCT subfield from ARRAY of STRUCTS - duplicate first") { + testSingleLineQuery( + """ + | select array(str0, str1) c0 from + | ( + | select + | named_struct('a', 1, 'b', 'n', 'c', 'x') str0, + | named_struct('a', 2, 'b', 'w', 'c', 'y') str1 + | ) + |""".stripMargin, + "select c0[0].a, c0[0].a from tbl") + } } From dd214e0db440e736f0fcc4314ad514235cc46546 Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 29 Apr 2025 14:05:33 -0700 Subject: [PATCH 2/8] comments --- native/core/src/execution/planner.rs | 61 ++++++++++++++++------------ 1 file changed, 36 insertions(+), 25 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index f9a3f7d644..e11e012002 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -2505,7 +2505,7 @@ mod tests { use futures::{poll, StreamExt}; - use arrow::array::{Array, DictionaryArray, Int32Array, StringArray}; + use arrow::array::{Array, DictionaryArray, Int32Array, Int64Array, StringArray}; use arrow::datatypes::DataType; use datafusion::logical_expr::ScalarUDF; use datafusion::{assert_batches_eq, physical_plan::common::collect, prelude::SessionContext}; @@ -3133,6 +3133,7 @@ mod tests { })), }; + // create a list of structs - make_array(struct(col_0, col_1)) let array_of_struct = spark_expression::Expr { expr_struct: Some(ExprStruct::ScalarFunc(spark_expression::ScalarFunc { func: "make_array".to_string(), @@ -3144,44 +3145,54 @@ mod tests { })), }], return_type: None, - })) + })), }; + // get first array element, array[1] - in SQL index starts with 1 let get_first_array_element = spark_expression::Expr { expr_struct: Some(ExprStruct::ScalarFunc(spark_expression::ScalarFunc { func: "array_element".to_string(), - args: vec![array_of_struct, spark_expression::Expr { - expr_struct: Some(ExprStruct::Literal(spark_expression::Literal { - value: Some(literal::Value::LongVal(1)), - datatype: Some(spark_expression::DataType { - type_id: 4, - type_info: None, - }), - is_null: false, - })) - }], + args: vec![ + array_of_struct, + spark_expression::Expr { + expr_struct: Some(ExprStruct::Literal(spark_expression::Literal { + value: Some(literal::Value::LongVal(1)), + datatype: Some(spark_expression::DataType { + type_id: 4, + type_info: None, + }), + is_null: false, + })), + }, + ], return_type: None, - })) + })), }; + // Get the field c1 of struct struct.c1 let get_field_of_struct = spark_expression::Expr { expr_struct: Some(ExprStruct::ScalarFunc(spark_expression::ScalarFunc { func: "get_field".to_string(), - args: vec![get_first_array_element, spark_expression::Expr { - expr_struct: Some(ExprStruct::Literal(spark_expression::Literal { - value: Some(literal::Value::StringVal("c1".to_string())), - datatype: Some(spark_expression::DataType { - type_id: 7, - type_info: None, - }), - is_null: false, - })) - }], + args: vec![ + get_first_array_element, + spark_expression::Expr { + expr_struct: Some(ExprStruct::Literal(spark_expression::Literal { + value: Some(literal::Value::StringVal("c1".to_string())), + datatype: Some(spark_expression::DataType { + type_id: 7, + type_info: None, + }), + is_null: false, + })), + }, + ], return_type: None, - })) + })), }; - // Make a projection operator with struct(array_col, array_col_1) + // Make a projection operator + // select make_array(struct(col_0, col_1))[1][c1] is equivalent to + // select get_field(array_element(make_array(struct(col_0, col_1)), 1), 
'c1') let projection = Operator { children: vec![op_scan], plan_id: 0, From 17266362c8af8f8c87f1c04665f0925b2c5e266c Mon Sep 17 00:00:00 2001 From: comphead Date: Wed, 14 May 2025 17:21:09 -0700 Subject: [PATCH 3/8] fix: cast list of structs and other cast fixes --- native/core/src/execution/planner.rs | 664 +++++++++++++----- native/core/src/parquet/parquet_support.rs | 118 +++- .../src/struct_funcs/get_struct_field.rs | 2 + .../apache/comet/serde/QueryPlanSerde.scala | 5 + .../comet/exec/CometNativeReaderSuite.scala | 19 +- 5 files changed, 621 insertions(+), 187 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index e11e012002..3b8c63be7e 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -69,12 +69,11 @@ use datafusion_comet_spark_expr::{create_comet_physical_fun, create_negate_expr} use crate::execution::operators::ExecutionError::GeneralError; use crate::execution::shuffle::CompressionCodec; use crate::execution::spark_plan::SparkPlan; -use crate::parquet::parquet_exec::init_datasource_exec; -use crate::parquet::parquet_support::prepare_object_store; +use crate::parquet::parquet_support::{prepare_object_store, SparkParquetOptions}; use datafusion::common::scalar::ScalarStructBuilder; use datafusion::common::{ tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter}, - JoinType as DFJoinType, ScalarValue, + ExprSchema, JoinType as DFJoinType, ScalarValue, }; use datafusion::datasource::listing::PartitionedFile; use datafusion::logical_expr::type_coercion::other::get_coerce_type_for_case_expression; @@ -86,6 +85,11 @@ use datafusion::physical_expr::expressions::{Literal, StatsType}; use datafusion::physical_expr::window::WindowExpr; use datafusion::physical_expr::LexOrdering; +use crate::parquet::parquet_exec::init_datasource_exec; +use crate::parquet::schema_adapter::SparkSchemaAdapterFactory; +use datafusion::datasource::object_store::ObjectStoreUrl; +use datafusion::datasource::physical_plan::{FileGroup, FileScanConfigBuilder, ParquetSource}; +use datafusion::datasource::source::DataSourceExec; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion::physical_plan::filter::FilterExec as DataFusionFilterExec; use datafusion_comet_proto::spark_operator::SparkFilePartition; @@ -108,12 +112,14 @@ use datafusion_comet_spark_expr::{ SparkCastOptions, StartsWith, Stddev, StringSpaceExpr, SubstringExpr, SumDecimal, TimestampTruncExpr, ToJson, UnboundColumn, Variance, }; +use futures::StreamExt; use itertools::Itertools; use jni::objects::GlobalRef; use num::{BigInt, ToPrimitive}; use object_store::path::Path; use std::cmp::max; use std::{collections::HashMap, sync::Arc}; +use tokio::runtime::Runtime; use url::Url; // For clippy error on type_complexity. 
@@ -328,6 +334,7 @@ impl PhysicalPlanner { ))); } let field = input_schema.field(idx); + //dbg!(field); Ok(Arc::new(Column::new(field.name().as_str(), idx))) } ExprStruct::Unbound(unbound) => { @@ -1091,6 +1098,8 @@ impl PhysicalPlanner { let data_schema = convert_spark_types_to_arrow_schema(scan.data_schema.as_slice()); let required_schema: SchemaRef = convert_spark_types_to_arrow_schema(scan.required_schema.as_slice()); + // dbg!(&data_schema); + // dbg!(&required_schema); let partition_schema: SchemaRef = convert_spark_types_to_arrow_schema(scan.partition_schema.as_slice()); let projection_vector: Vec = scan @@ -2501,19 +2510,26 @@ fn create_case_expr( #[cfg(test)] mod tests { - use std::{sync::Arc, task::Poll}; - use futures::{poll, StreamExt}; + use std::{sync::Arc, task::Poll}; - use arrow::array::{Array, DictionaryArray, Int32Array, Int64Array, StringArray}; - use arrow::datatypes::DataType; + use arrow::array::{Array, DictionaryArray, Int32Array, StringArray}; + use arrow::datatypes::{DataType, Field, Fields, Schema}; + use datafusion::catalog::memory::DataSourceExec; + use datafusion::datasource::listing::PartitionedFile; + use datafusion::datasource::object_store::ObjectStoreUrl; + use datafusion::datasource::physical_plan::{FileGroup, FileScanConfigBuilder, ParquetSource}; use datafusion::logical_expr::ScalarUDF; + use datafusion::physical_plan::ExecutionPlan; use datafusion::{assert_batches_eq, physical_plan::common::collect, prelude::SessionContext}; + use tokio::runtime::Runtime; use tokio::sync::mpsc; use crate::execution::{operators::InputBatch, planner::PhysicalPlanner}; use crate::execution::operators::ExecutionError; + use crate::parquet::parquet_support::SparkParquetOptions; + use crate::parquet::schema_adapter::SparkSchemaAdapterFactory; use datafusion_comet_proto::spark_expression::expr::ExprStruct; use datafusion_comet_proto::{ spark_expression::expr::ExprStruct::*, @@ -2522,6 +2538,7 @@ mod tests { spark_operator, spark_operator::{operator::OpStruct, Operator}, }; + use datafusion_comet_spark_expr::EvalMode; #[test] fn test_unpack_dictionary_primitive() { @@ -3082,182 +3099,469 @@ mod tests { }); } + // #[test] + // fn test_struct_field() { + // let session_ctx = SessionContext::new(); + // let task_ctx = session_ctx.task_ctx(); + // let planner = PhysicalPlanner::new(Arc::from(session_ctx)); + // + // // Mock scan operator with 3 INT32 columns + // let op_scan = Operator { + // plan_id: 0, + // children: vec![], + // op_struct: Some(OpStruct::Scan(spark_operator::Scan { + // fields: vec![ + // spark_expression::DataType { + // type_id: 4, // Int32 + // type_info: None, + // }, + // spark_expression::DataType { + // type_id: 4, // Int32 + // type_info: None, + // }, + // spark_expression::DataType { + // type_id: 4, // Int32 + // type_info: None, + // }, + // ], + // source: "".to_string(), + // })), + // }; + // + // // Mock expression to read a INT32 column with position 0 + // let col_0 = spark_expression::Expr { + // expr_struct: Some(Bound(spark_expression::BoundReference { + // index: 0, + // datatype: Some(spark_expression::DataType { + // type_id: 4, + // type_info: None, + // }), + // })), + // }; + // + // // Mock expression to read a INT32 column with position 1 + // let col_1 = spark_expression::Expr { + // expr_struct: Some(Bound(spark_expression::BoundReference { + // index: 1, + // datatype: Some(spark_expression::DataType { + // type_id: 4, + // type_info: None, + // }), + // })), + // }; + // + // // create a list of structs - 
make_array(struct(col_0, col_1)) + // let array_of_struct = spark_expression::Expr { + // expr_struct: Some(ExprStruct::ScalarFunc(spark_expression::ScalarFunc { + // func: "make_array".to_string(), + // args: vec![spark_expression::Expr { + // expr_struct: Some(ExprStruct::ScalarFunc(spark_expression::ScalarFunc { + // func: "struct".to_string(), + // args: vec![col_0, col_1], + // return_type: None, + // })), + // }], + // return_type: None, + // })), + // }; + // + // // get first array element, array[1] - in SQL index starts with 1 + // // let get_first_array_element = spark_expression::Expr { + // // expr_struct: Some(ExprStruct::ScalarFunc(spark_expression::ScalarFunc { + // // func: "array_element".to_string(), + // // args: vec![ + // // array_of_struct, + // // spark_expression::Expr { + // // expr_struct: Some(ExprStruct::Literal(spark_expression::Literal { + // // value: Some(literal::Value::LongVal(1)), + // // datatype: Some(spark_expression::DataType { + // // type_id: 4, + // // type_info: None, + // // }), + // // is_null: false, + // // })), + // // }, + // // ], + // // return_type: None, + // // })), + // // }; + // + // + // let get_first_array_element = spark_expression::Expr { + // expr_struct: Some(ExprStruct::ListExtract( + // Box::from(spark_expression::ListExtract { + // child: Some(Box::from(array_of_struct)), + // ordinal: Some(Box::new(spark_expression::Expr { + // expr_struct: Some(ExprStruct::Literal(spark_expression::Literal { + // value: Some(literal::Value::IntVal(0)), + // datatype: Some(spark_expression::DataType { + // type_id: 3, + // type_info: None, + // }), + // is_null: false, + // })) + // })), + // default_value: None, + // one_based: false, + // fail_on_error: false, + // }) + // )), + // }; + // + // // Get the field c1 of struct struct.c1 + // // let get_field_of_struct = spark_expression::Expr { + // // expr_struct: Some(ExprStruct::ScalarFunc(spark_expression::ScalarFunc { + // // func: "get_field".to_string(), + // // args: vec![ + // // get_first_array_element, + // // spark_expression::Expr { + // // expr_struct: Some(ExprStruct::Literal(spark_expression::Literal { + // // value: Some(literal::Value::StringVal("c1".to_string())), + // // datatype: Some(spark_expression::DataType { + // // type_id: 7, + // // type_info: None, + // // }), + // // is_null: false, + // // })), + // // }, + // // ], + // // return_type: None, + // // })), + // // }; + // + // let get_field_of_struct = spark_expression::Expr { + // expr_struct: Some(ExprStruct::GetStructField( + // Box::from(spark_expression::GetStructField { + // child: Some(Box::from(get_first_array_element)), + // ordinal: 0, + // }) + // )), + // }; + // + // // Make a projection operator + // // select make_array(struct(col_0, col_1))[1][c1] is equivalent to + // // select get_field(array_element(make_array(struct(col_0, col_1)), 1), 'c1') + // let projection = Operator { + // children: vec![op_scan], + // plan_id: 0, + // op_struct: Some(OpStruct::Projection(spark_operator::Projection { + // project_list: vec![get_field_of_struct], + // })), + // }; + // + // // Create a physical plan + // let (mut scans, datafusion_plan) = + // planner.create_plan(&projection, &mut vec![], 1).unwrap(); + // + // // Start executing the plan in a separate thread + // // The plan waits for incoming batches and emitting result as input comes + // let mut stream = datafusion_plan.native_plan.execute(0, task_ctx).unwrap(); + // + // let runtime = tokio::runtime::Runtime::new().unwrap(); + // // create async 
channel + // let (tx, mut rx) = mpsc::channel(1); + // + // // Send data as input to the plan being executed in a separate thread + // runtime.spawn(async move { + // // create data batch + // // 0, 1, 2 + // // 3, 4, 5 + // // 6, null, null + // let a = Int64Array::from(vec![Some(0), Some(3), Some(6)]); + // let b = Int64Array::from(vec![Some(1), Some(4), None]); + // let c = Int64Array::from(vec![Some(2), Some(5), None]); + // let input_batch1 = InputBatch::Batch(vec![Arc::new(a), Arc::new(b), Arc::new(c)], 3); + // let input_batch2 = InputBatch::EOF; + // + // let batches = vec![input_batch1, input_batch2]; + // + // for batch in batches.into_iter() { + // tx.send(batch).await.unwrap(); + // } + // }); + // + // // Wait for the plan to finish executing and assert the result + // runtime.block_on(async move { + // loop { + // let batch = rx.recv().await.unwrap(); + // scans[0].set_input_batch(batch); + // match poll!(stream.next()) { + // Poll::Ready(Some(batch)) => { + // assert!(batch.is_ok(), "got error {}", batch.unwrap_err()); + // let batch = batch.unwrap(); + // let expected = [ + // "+-------+", + // "| col_0 |", + // "+-------+", + // "| 1 |", + // "| 4 |", + // "| |", + // "+-------+", + // ]; + // assert_batches_eq!(expected, &[batch]); + // } + // Poll::Ready(None) => { + // break; + // } + // _ => {} + // } + // } + // }); + // } + // + // #[test] + // fn test_struct_field_1() { + // use spark_expression::*; + // use spark_expression::data_type::*; + // use spark_expression::data_type::data_type_info::*; + // use spark_operator::*; + // + // let session_ctx = SessionContext::new(); + // let task_ctx = session_ctx.task_ctx(); + // let planner = PhysicalPlanner::new(Arc::from(session_ctx)); + // + // // Mock scan operator with 3 INT32 columns + // let op_scan = Operator { + // plan_id: 0, + // children: vec![], + // op_struct: Some(OpStruct::NativeScan(NativeScan { + // fields: vec![DataType { + // type_id: i32::from(DataTypeId::List), + // type_info: Some( + // Box::from(DataTypeInfo { + // datatype_struct: Some( + // DatatypeStruct::List( + // Box::from(ListInfo { + // element_type: Some( + // Box::from(DataType { + // type_id: i32::from(DataTypeId::Struct), + // type_info: Some( + // Box::from(DataTypeInfo { + // datatype_struct: Some( + // DatatypeStruct::Struct( + // StructInfo { + // field_names: vec![ + // "a".to_string(), + // "c".to_string(), + // ], + // field_datatypes: vec![ + // DataType { + // type_id: i32::from(DataTypeId::Int32), + // type_info: None, + // }, + // DataType { + // type_id: i32::from(DataTypeId::String), + // type_info: None, + // }, + // ], + // field_nullable: vec![ + // true, + // true, + // ], + // }, + // ), + // ), + // }), + // ), + // }), + // ), + // contains_null: true, + // }), + // ), + // ), + // }), + // ), + // },], + // source: "CometScan parquet".to_string(), + // required_schema: vec![ + // SparkStructField { + // name: "c0".to_string(), + // data_type: Some( + // DataType { + // type_id: i32::from(DataTypeId::List), + // type_info: Some( + // Box::from(DataTypeInfo { + // datatype_struct: Some( + // DatatypeStruct::List( + // Box::from(ListInfo { + // element_type: Some( + // Box::from(DataType { + // type_id: i32::from(DataTypeId::Struct), + // type_info: Some( + // Box::from(DataTypeInfo { + // datatype_struct: Some( + // DatatypeStruct::Struct( + // StructInfo { + // field_names: vec![ + // "a".to_string(), + // "c".to_string(), + // ], + // field_datatypes: vec![ + // DataType { + // type_id: i32::from(DataTypeId::Int32), + 
// type_info: None, + // }, + // DataType { + // type_id: i32::from(DataTypeId::String), + // type_info: None, + // }, + // ], + // field_nullable: vec![ + // true, + // true, + // ], + // }, + // ), + // ), + // }), + // ), + // }), + // ), + // contains_null: true, + // }), + // ), + // ), + // }), + // ), + // }, + // ), + // nullable: true, + // }, + // ], + // data_schema: vec![], + // partition_schema: vec![], + // data_filters: vec![], + // file_partitions: vec![ + // SparkFilePartition { + // partitioned_file: vec![ + // SparkPartitionedFile { + // file_path: "file:///tmp/test1/part-00000-45bd1152-e622-4056-bb59-4abd50cb92c1-c000.snappy.parquet".to_string(), + // start: 0, + // length: 1169, + // file_size: 1169, + // partition_values: vec![], + // }, + // ], + // }, + // ], + // projection_vector: vec![0], + // session_timezone: "America/Los_Angeles".to_string(), + // })), + // }; + // + // // Create a physical plan + // let (mut scans, datafusion_plan) = + // planner.create_plan(&op_scan, &mut vec![], 1).unwrap(); + // + // // Start executing the plan in a separate thread + // // The plan waits for incoming batches and emitting result as input comes + // let mut stream = datafusion_plan.native_plan.execute(0, task_ctx).unwrap(); + // + // let runtime = tokio::runtime::Runtime::new().unwrap(); + // // create async channel + // let (tx, mut rx) = mpsc::channel(1); + // + // // Send data as input to the plan being executed in a separate thread + // runtime.spawn(async move { + // // create data batch + // // 0, 1, 2 + // // 3, 4, 5 + // // 6, null, null + // let a = Int32Array::from(vec![Some(0), Some(3), Some(6)]); + // let b = Int32Array::from(vec![Some(1), Some(4), None]); + // let c = Int32Array::from(vec![Some(2), Some(5), None]); + // let input_batch1 = InputBatch::Batch(vec![Arc::new(a), Arc::new(b), Arc::new(c)], 3); + // let input_batch2 = InputBatch::EOF; + // + // let batches = vec![input_batch1, input_batch2]; + // + // for batch in batches.into_iter() { + // tx.send(batch).await.unwrap(); + // } + // }); + // + // // Wait for the plan to finish executing and assert the result + // runtime.block_on(async move { + // loop { + // let batch = rx.recv().await.unwrap(); + // scans[0].set_input_batch(batch); + // match poll!(stream.next()) { + // Poll::Ready(Some(batch)) => { + // assert!(batch.is_ok(), "got error {}", batch.unwrap_err()); + // let batch = batch.unwrap(); + // let expected = [ + // "+-------+", + // "| col_0 |", + // "+-------+", + // "| 1 |", + // "| 4 |", + // "| |", + // "+-------+", + // ]; + // assert_batches_eq!(expected, &[batch]); + // } + // Poll::Ready(None) => { + // break; + // } + // _ => {} + // } + // } + // }); + // } + #[test] - fn test_struct_field() { + fn test_struct_field_2() { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); - let planner = PhysicalPlanner::new(Arc::from(session_ctx)); - - // Mock scan operator with 3 INT32 columns - let op_scan = Operator { - plan_id: 0, - children: vec![], - op_struct: Some(OpStruct::Scan(spark_operator::Scan { - fields: vec![ - spark_expression::DataType { - type_id: 4, // Int32 - type_info: None, - }, - spark_expression::DataType { - type_id: 4, // Int32 - type_info: None, - }, - spark_expression::DataType { - type_id: 4, // Int32 - type_info: None, - }, - ], - source: "".to_string(), - })), - }; - - // Mock expression to read a INT32 column with position 0 - let col_0 = spark_expression::Expr { - expr_struct: Some(Bound(spark_expression::BoundReference { - index: 0, - 
datatype: Some(spark_expression::DataType { - type_id: 4, - type_info: None, - }), - })), - }; - - // Mock expression to read a INT32 column with position 1 - let col_1 = spark_expression::Expr { - expr_struct: Some(Bound(spark_expression::BoundReference { - index: 1, - datatype: Some(spark_expression::DataType { - type_id: 4, - type_info: None, - }), - })), - }; - - // create a list of structs - make_array(struct(col_0, col_1)) - let array_of_struct = spark_expression::Expr { - expr_struct: Some(ExprStruct::ScalarFunc(spark_expression::ScalarFunc { - func: "make_array".to_string(), - args: vec![spark_expression::Expr { - expr_struct: Some(ExprStruct::ScalarFunc(spark_expression::ScalarFunc { - func: "struct".to_string(), - args: vec![col_0, col_1], - return_type: None, - })), - }], - return_type: None, - })), - }; - - // get first array element, array[1] - in SQL index starts with 1 - let get_first_array_element = spark_expression::Expr { - expr_struct: Some(ExprStruct::ScalarFunc(spark_expression::ScalarFunc { - func: "array_element".to_string(), - args: vec![ - array_of_struct, - spark_expression::Expr { - expr_struct: Some(ExprStruct::Literal(spark_expression::Literal { - value: Some(literal::Value::LongVal(1)), - datatype: Some(spark_expression::DataType { - type_id: 4, - type_info: None, - }), - is_null: false, - })), - }, - ], - return_type: None, - })), - }; - - // Get the field c1 of struct struct.c1 - let get_field_of_struct = spark_expression::Expr { - expr_struct: Some(ExprStruct::ScalarFunc(spark_expression::ScalarFunc { - func: "get_field".to_string(), - args: vec![ - get_first_array_element, - spark_expression::Expr { - expr_struct: Some(ExprStruct::Literal(spark_expression::Literal { - value: Some(literal::Value::StringVal("c1".to_string())), - datatype: Some(spark_expression::DataType { - type_id: 7, - type_info: None, - }), - is_null: false, - })), - }, - ], - return_type: None, - })), - }; - - // Make a projection operator - // select make_array(struct(col_0, col_1))[1][c1] is equivalent to - // select get_field(array_element(make_array(struct(col_0, col_1)), 1), 'c1') - let projection = Operator { - children: vec![op_scan], - plan_id: 0, - op_struct: Some(OpStruct::Projection(spark_operator::Projection { - project_list: vec![get_field_of_struct], - })), - }; - - // Create a physical plan - let (mut scans, datafusion_plan) = - planner.create_plan(&projection, &mut vec![], 1).unwrap(); - - // Start executing the plan in a separate thread - // The plan waits for incoming batches and emitting result as input comes - let mut stream = datafusion_plan.native_plan.execute(0, task_ctx).unwrap(); - - let runtime = tokio::runtime::Runtime::new().unwrap(); - // create async channel - let (tx, mut rx) = mpsc::channel(1); - - // Send data as input to the plan being executed in a separate thread - runtime.spawn(async move { - // create data batch - // 0, 1, 2 - // 3, 4, 5 - // 6, null, null - let a = Int64Array::from(vec![Some(0), Some(3), Some(6)]); - let b = Int64Array::from(vec![Some(1), Some(4), None]); - let c = Int64Array::from(vec![Some(2), Some(5), None]); - let input_batch1 = InputBatch::Batch(vec![Arc::new(a), Arc::new(b), Arc::new(c)], 3); - let input_batch2 = InputBatch::EOF; - let batches = vec![input_batch1, input_batch2]; - - for batch in batches.into_iter() { - tx.send(batch).await.unwrap(); - } - }); - - // Wait for the plan to finish executing and assert the result - runtime.block_on(async move { - loop { - let batch = rx.recv().await.unwrap(); - 
scans[0].set_input_batch(batch); - match poll!(stream.next()) { - Poll::Ready(Some(batch)) => { - assert!(batch.is_ok(), "got error {}", batch.unwrap_err()); - let batch = batch.unwrap(); - let expected = [ - "+-------+", - "| col_0 |", - "+-------+", - "| 1 |", - "| 4 |", - "| |", - "+-------+", - ]; - assert_batches_eq!(expected, &[batch]); - } - Poll::Ready(None) => { - break; - } - _ => {} - } - } - }); + let required_schema = Schema::new(Fields::from(vec![Field::new( + "c0", + DataType::List( + Field::new( + "element", + DataType::Struct(Fields::from(vec![ + Field::new("a", DataType::Int32, false).into(), + Field::new("c", DataType::Utf8, false).into(), + ] as Vec)), + false, + ) + .into(), + ) + .into(), + false, + )])); + + let object_store_url = ObjectStoreUrl::local_filesystem(); + let source = Arc::new( + ParquetSource::default().with_schema_adapter_factory(Arc::new( + SparkSchemaAdapterFactory::new(SparkParquetOptions::new(EvalMode::Ansi, "", false)), + )), + ); + + let file_groups: Vec = vec![FileGroup::new(vec![PartitionedFile::new( + "/tmp/test1/part-00000-09e61881-4140-4f4a-bfd9-65db7b2bb0ae-c000.snappy.parquet", + 1095, + )])]; + + let file_scan_config = + FileScanConfigBuilder::new(object_store_url, required_schema.into(), source) + .with_file_groups(file_groups) + .build(); + let scan = Arc::new(DataSourceExec::new(Arc::new(file_scan_config.clone()))); + + let stream = scan.execute(0, session_ctx.task_ctx()).unwrap(); + let rt = Runtime::new().unwrap(); + let result: Vec<_> = rt.block_on(stream.collect()); + + let actual = result.get(0).unwrap().as_ref().unwrap(); + + let expected = [ + "+----------------+", + "| c0 |", + "+----------------+", + "| [{a: 1, c: x}] |", + "+----------------+", + ]; + assert_batches_eq!(expected, &[actual.clone()]); } } diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 6bf0f0fe45..1dd65110dd 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -16,6 +16,9 @@ // under the License. use crate::execution::operators::ExecutionError; +use arrow::array::ListArray; +use arrow::compute::can_cast_types; +use arrow::datatypes::Field; use arrow::{ array::{ cast::AsArray, new_null_array, types::Int32Type, types::TimestampMicrosecondType, Array, @@ -157,12 +160,19 @@ fn cast_array( let from_type = array.data_type(); match (from_type, to_type) { + // If Arrow cast supports the cast, delegate the cast to Arrow + _ if can_cast_types(from_type, to_type) => { + Ok(cast_with_options(&array, to_type, &PARQUET_OPTIONS)?) 
+ } (Struct(_), Struct(_)) => Ok(cast_struct_to_struct( array.as_struct(), from_type, to_type, parquet_options, )?), + (List(_), List(_)) => { + cast_list_to_list(array.as_list(), from_type, to_type, parquet_options) + } (Timestamp(TimeUnit::Microsecond, None), Timestamp(TimeUnit::Microsecond, Some(tz))) => { Ok(Arc::new( array @@ -171,7 +181,38 @@ fn cast_array( .with_timezone(Arc::clone(tz)), )) } - _ => Ok(cast_with_options(&array, to_type, &PARQUET_OPTIONS)?), + _ => Ok(array), + } +} + +fn cast_list_to_list( + array: &ListArray, + from_type: &DataType, + to_type: &DataType, + parquet_options: &SparkParquetOptions, +) -> DataFusionResult { + match (from_type, to_type) { + (DataType::List(from_inner_type), DataType::List(to_inner_type)) => { + //dbg!(from_type); + //dbg!(to_type); + //dbg!(from_inner_type); + //dbg!(to_inner_type); + + let cast_field = cast_array( + array.values().clone(), + to_inner_type.data_type(), + parquet_options, + ) + .unwrap(); + + Ok(Arc::new(ListArray::new( + to_inner_type.clone(), + array.offsets().clone(), + cast_field, + array.nulls().cloned(), + ))) + } + _ => unreachable!(), } } @@ -277,6 +318,11 @@ pub(crate) fn prepare_object_store( #[cfg(test)] mod tests { use crate::parquet::parquet_support::prepare_object_store; + use arrow::array::{ + Array, ArrayBuilder, Int32Builder, ListBuilder, StringBuilder, StructBuilder, + }; + use arrow::compute::{can_cast_types, cast_with_options, CastOptions}; + use arrow::datatypes::{DataType, Field, Fields}; use datafusion::execution::object_store::ObjectStoreUrl; use datafusion::execution::runtime_env::RuntimeEnv; use object_store::path::Path; @@ -349,4 +395,74 @@ mod tests { assert_eq!(res.0, expected.0); assert_eq!(res.1, expected.1); } + + #[test] + fn test_cast_array() { + let fields = Fields::from([ + Arc::new(Field::new("a", DataType::Int32, true)), + Arc::new(Field::new("b", DataType::Utf8, true)), + Arc::new(Field::new("c", DataType::Utf8, true)), + ]); + + let struct_field = Field::new_list_field(DataType::Struct(fields.clone()), true); + let list_field = Field::new("list", DataType::List(Arc::new(struct_field)), true); + let a_col_builder = Int32Builder::with_capacity(1); + let b_col_builder = StringBuilder::new(); + let c_col_builder = StringBuilder::new(); + + let struct_builder = StructBuilder::new( + fields, + vec![ + Box::new(a_col_builder), + Box::new(b_col_builder), + Box::new(c_col_builder), + ], + ); + + let mut list_builder = ListBuilder::new(struct_builder); + + let values = list_builder.values(); + values + .field_builder::(0) + .unwrap() + .append_values(&[1, 2], &[true, true]); + + values + .field_builder::(1) + .unwrap() + .extend(vec![Some("n"), Some("m")]); + + values + .field_builder::(2) + .unwrap() + .extend(vec![Some("x"), Some("y")]); + + values.append(true); + values.append(true); + list_builder.append(true); + + let array = Arc::new(list_builder.finish()); + + let to_type = DataType::List( + Field::new( + "element", + DataType::Struct(Fields::from(vec![ + Field::new("a", DataType::Int32, false).into(), + Field::new("c", DataType::Utf8, false).into(), + ] as Vec)), + false, + ) + .into(), + ); + + dbg!(&array); + /* + Cast [{a: 1, b: n, c: x}] -> [{a: 1, c: x}] -> [{a: 1, c: n}] + */ + let res = cast_with_options(&array.as_ref(), &to_type, &CastOptions::default()).unwrap(); + + dbg!(can_cast_types(array.data_type(), &to_type)); + + dbg!(&res); + } } diff --git a/native/spark-expr/src/struct_funcs/get_struct_field.rs b/native/spark-expr/src/struct_funcs/get_struct_field.rs index 
6c0ef5ee33..b92e8d2c41 100644 --- a/native/spark-expr/src/struct_funcs/get_struct_field.rs +++ b/native/spark-expr/src/struct_funcs/get_struct_field.rs @@ -81,6 +81,8 @@ impl PhysicalExpr for GetStructField { fn evaluate(&self, batch: &RecordBatch) -> DataFusionResult { let child_value = self.child.evaluate(batch)?; + //dbg!(&batch); + //dbg!(&child_value); match child_value { ColumnarValue::Array(array) => { diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 562a4ba86e..ade979c585 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -1962,6 +1962,11 @@ object QueryPlanSerde extends Logging with CometExprShim { val childExpr = exprToProtoInternal(child, inputs, binding) val ordinalExpr = exprToProtoInternal(ordinal, inputs, binding) +// val childExpr = exprToProtoInternal(child, inputs, binding) +// val ordinalExpr = +// exprToProtoInternal(Add(Cast(ordinal, LongType), Literal(1L)), inputs, binding) +// scalarFunctionExprToProto("array_element", childExpr, ordinalExpr) + if (childExpr.isDefined && ordinalExpr.isDefined) { val listExtractBuilder = ExprOuterClass.ListExtract .newBuilder() diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala index fff5435460..4115ba4321 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala @@ -238,11 +238,18 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper } test("native reader - read a STRUCT subfield - field from second") { - testSingleLineQuery( - """ - |select array(named_struct('a', 1, 'b', 'n')) c0 - |""".stripMargin, - "select c0[0].b from tbl") + withSQLConf( + CometConf.COMET_EXEC_ENABLED.key -> "true", + SQLConf.USE_V1_SOURCE_LIST.key -> "parquet", + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "false", + CometConf.COMET_NATIVE_SCAN_IMPL.key -> "native_datafusion") { + testSingleLineQuery( + """ + |select 1 a, named_struct('a', 1, 'b', 'n') c0 + |""".stripMargin, + "select c0.b from tbl") + } } test("native reader - read a STRUCT subfield from ARRAY of STRUCTS - field from first") { @@ -284,7 +291,7 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper "select c0[0].a, c0[0].c from tbl") } - test("native reader - read a STRUCT subfield from ARRAY of STRUCTS - duplicate first") { + test("native reader - read a STRUCT subfield from ARRAY of STRUCTS - duplicate first field") { testSingleLineQuery( """ | select array(str0, str1) c0 from From 2f14be06ea4b9b18b7b34ce8c8f098e03d8e9d6a Mon Sep 17 00:00:00 2001 From: comphead Date: Wed, 14 May 2025 17:28:01 -0700 Subject: [PATCH 4/8] clippy --- native/core/src/execution/planner.rs | 20 ++++++-------------- native/core/src/parquet/parquet_support.rs | 20 +++++++------------- 2 files changed, 13 insertions(+), 27 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 3b8c63be7e..2a9d6d1c2f 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -69,11 +69,11 @@ use datafusion_comet_spark_expr::{create_comet_physical_fun, create_negate_expr} use crate::execution::operators::ExecutionError::GeneralError; use 
crate::execution::shuffle::CompressionCodec; use crate::execution::spark_plan::SparkPlan; -use crate::parquet::parquet_support::{prepare_object_store, SparkParquetOptions}; +use crate::parquet::parquet_support::prepare_object_store; use datafusion::common::scalar::ScalarStructBuilder; use datafusion::common::{ tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter}, - ExprSchema, JoinType as DFJoinType, ScalarValue, + JoinType as DFJoinType, ScalarValue, }; use datafusion::datasource::listing::PartitionedFile; use datafusion::logical_expr::type_coercion::other::get_coerce_type_for_case_expression; @@ -86,10 +86,6 @@ use datafusion::physical_expr::window::WindowExpr; use datafusion::physical_expr::LexOrdering; use crate::parquet::parquet_exec::init_datasource_exec; -use crate::parquet::schema_adapter::SparkSchemaAdapterFactory; -use datafusion::datasource::object_store::ObjectStoreUrl; -use datafusion::datasource::physical_plan::{FileGroup, FileScanConfigBuilder, ParquetSource}; -use datafusion::datasource::source::DataSourceExec; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion::physical_plan::filter::FilterExec as DataFusionFilterExec; use datafusion_comet_proto::spark_operator::SparkFilePartition; @@ -112,14 +108,12 @@ use datafusion_comet_spark_expr::{ SparkCastOptions, StartsWith, Stddev, StringSpaceExpr, SubstringExpr, SumDecimal, TimestampTruncExpr, ToJson, UnboundColumn, Variance, }; -use futures::StreamExt; use itertools::Itertools; use jni::objects::GlobalRef; use num::{BigInt, ToPrimitive}; use object_store::path::Path; use std::cmp::max; use std::{collections::HashMap, sync::Arc}; -use tokio::runtime::Runtime; use url::Url; // For clippy error on type_complexity. @@ -3512,7 +3506,6 @@ mod tests { #[test] fn test_struct_field_2() { let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); let required_schema = Schema::new(Fields::from(vec![Field::new( "c0", @@ -3520,14 +3513,13 @@ mod tests { Field::new( "element", DataType::Struct(Fields::from(vec![ - Field::new("a", DataType::Int32, false).into(), - Field::new("c", DataType::Utf8, false).into(), + Field::new("a", DataType::Int32, false), + Field::new("c", DataType::Utf8, false), ] as Vec)), false, ) .into(), - ) - .into(), + ), false, )])); @@ -3553,7 +3545,7 @@ mod tests { let rt = Runtime::new().unwrap(); let result: Vec<_> = rt.block_on(stream.collect()); - let actual = result.get(0).unwrap().as_ref().unwrap(); + let actual = result.first().unwrap().as_ref().unwrap(); let expected = [ "+----------------+", diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 1dd65110dd..6a26fabcc9 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -18,7 +18,6 @@ use crate::execution::operators::ExecutionError; use arrow::array::ListArray; use arrow::compute::can_cast_types; -use arrow::datatypes::Field; use arrow::{ array::{ cast::AsArray, new_null_array, types::Int32Type, types::TimestampMicrosecondType, Array, @@ -192,21 +191,20 @@ fn cast_list_to_list( parquet_options: &SparkParquetOptions, ) -> DataFusionResult { match (from_type, to_type) { - (DataType::List(from_inner_type), DataType::List(to_inner_type)) => { + (DataType::List(_), DataType::List(to_inner_type)) => { //dbg!(from_type); //dbg!(to_type); //dbg!(from_inner_type); //dbg!(to_inner_type); let cast_field = cast_array( - array.values().clone(), + Arc::clone(array.values()), 
to_inner_type.data_type(), parquet_options, - ) - .unwrap(); + )?; Ok(Arc::new(ListArray::new( - to_inner_type.clone(), + Arc::clone(to_inner_type), array.offsets().clone(), cast_field, array.nulls().cloned(), @@ -318,9 +316,7 @@ pub(crate) fn prepare_object_store( #[cfg(test)] mod tests { use crate::parquet::parquet_support::prepare_object_store; - use arrow::array::{ - Array, ArrayBuilder, Int32Builder, ListBuilder, StringBuilder, StructBuilder, - }; + use arrow::array::{Array, Int32Builder, ListBuilder, StringBuilder, StructBuilder}; use arrow::compute::{can_cast_types, cast_with_options, CastOptions}; use arrow::datatypes::{DataType, Field, Fields}; use datafusion::execution::object_store::ObjectStoreUrl; @@ -404,8 +400,6 @@ mod tests { Arc::new(Field::new("c", DataType::Utf8, true)), ]); - let struct_field = Field::new_list_field(DataType::Struct(fields.clone()), true); - let list_field = Field::new("list", DataType::List(Arc::new(struct_field)), true); let a_col_builder = Int32Builder::with_capacity(1); let b_col_builder = StringBuilder::new(); let c_col_builder = StringBuilder::new(); @@ -447,8 +441,8 @@ mod tests { Field::new( "element", DataType::Struct(Fields::from(vec![ - Field::new("a", DataType::Int32, false).into(), - Field::new("c", DataType::Utf8, false).into(), + Field::new("a", DataType::Int32, false), + Field::new("c", DataType::Utf8, false), ] as Vec)), false, ) From b94069226677b2fc80fa1517f6ff9343130a9821 Mon Sep 17 00:00:00 2001 From: comphead Date: Thu, 15 May 2025 15:21:09 -0700 Subject: [PATCH 5/8] fix: cast list of structs and other cast fixes --- native/core/src/execution/planner.rs | 478 +++------------------ native/core/src/parquet/parquet_support.rs | 71 --- 2 files changed, 52 insertions(+), 497 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 2a9d6d1c2f..6e7db2e8a3 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -2513,10 +2513,11 @@ mod tests { use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::datasource::physical_plan::{FileGroup, FileScanConfigBuilder, ParquetSource}; + use datafusion::error::DataFusionError; use datafusion::logical_expr::ScalarUDF; use datafusion::physical_plan::ExecutionPlan; use datafusion::{assert_batches_eq, physical_plan::common::collect, prelude::SessionContext}; - use tokio::runtime::Runtime; + use tempfile::TempDir; use tokio::sync::mpsc; use crate::execution::{operators::InputBatch, planner::PhysicalPlanner}; @@ -3093,457 +3094,80 @@ mod tests { }); } - // #[test] - // fn test_struct_field() { - // let session_ctx = SessionContext::new(); - // let task_ctx = session_ctx.task_ctx(); - // let planner = PhysicalPlanner::new(Arc::from(session_ctx)); - // - // // Mock scan operator with 3 INT32 columns - // let op_scan = Operator { - // plan_id: 0, - // children: vec![], - // op_struct: Some(OpStruct::Scan(spark_operator::Scan { - // fields: vec![ - // spark_expression::DataType { - // type_id: 4, // Int32 - // type_info: None, - // }, - // spark_expression::DataType { - // type_id: 4, // Int32 - // type_info: None, - // }, - // spark_expression::DataType { - // type_id: 4, // Int32 - // type_info: None, - // }, - // ], - // source: "".to_string(), - // })), - // }; - // - // // Mock expression to read a INT32 column with position 0 - // let col_0 = spark_expression::Expr { - // expr_struct: Some(Bound(spark_expression::BoundReference { - // index: 0, 
- // datatype: Some(spark_expression::DataType { - // type_id: 4, - // type_info: None, - // }), - // })), - // }; - // - // // Mock expression to read a INT32 column with position 1 - // let col_1 = spark_expression::Expr { - // expr_struct: Some(Bound(spark_expression::BoundReference { - // index: 1, - // datatype: Some(spark_expression::DataType { - // type_id: 4, - // type_info: None, - // }), - // })), - // }; - // - // // create a list of structs - make_array(struct(col_0, col_1)) - // let array_of_struct = spark_expression::Expr { - // expr_struct: Some(ExprStruct::ScalarFunc(spark_expression::ScalarFunc { - // func: "make_array".to_string(), - // args: vec![spark_expression::Expr { - // expr_struct: Some(ExprStruct::ScalarFunc(spark_expression::ScalarFunc { - // func: "struct".to_string(), - // args: vec![col_0, col_1], - // return_type: None, - // })), - // }], - // return_type: None, - // })), - // }; - // - // // get first array element, array[1] - in SQL index starts with 1 - // // let get_first_array_element = spark_expression::Expr { - // // expr_struct: Some(ExprStruct::ScalarFunc(spark_expression::ScalarFunc { - // // func: "array_element".to_string(), - // // args: vec![ - // // array_of_struct, - // // spark_expression::Expr { - // // expr_struct: Some(ExprStruct::Literal(spark_expression::Literal { - // // value: Some(literal::Value::LongVal(1)), - // // datatype: Some(spark_expression::DataType { - // // type_id: 4, - // // type_info: None, - // // }), - // // is_null: false, - // // })), - // // }, - // // ], - // // return_type: None, - // // })), - // // }; - // - // - // let get_first_array_element = spark_expression::Expr { - // expr_struct: Some(ExprStruct::ListExtract( - // Box::from(spark_expression::ListExtract { - // child: Some(Box::from(array_of_struct)), - // ordinal: Some(Box::new(spark_expression::Expr { - // expr_struct: Some(ExprStruct::Literal(spark_expression::Literal { - // value: Some(literal::Value::IntVal(0)), - // datatype: Some(spark_expression::DataType { - // type_id: 3, - // type_info: None, - // }), - // is_null: false, - // })) - // })), - // default_value: None, - // one_based: false, - // fail_on_error: false, - // }) - // )), - // }; - // - // // Get the field c1 of struct struct.c1 - // // let get_field_of_struct = spark_expression::Expr { - // // expr_struct: Some(ExprStruct::ScalarFunc(spark_expression::ScalarFunc { - // // func: "get_field".to_string(), - // // args: vec![ - // // get_first_array_element, - // // spark_expression::Expr { - // // expr_struct: Some(ExprStruct::Literal(spark_expression::Literal { - // // value: Some(literal::Value::StringVal("c1".to_string())), - // // datatype: Some(spark_expression::DataType { - // // type_id: 7, - // // type_info: None, - // // }), - // // is_null: false, - // // })), - // // }, - // // ], - // // return_type: None, - // // })), - // // }; - // - // let get_field_of_struct = spark_expression::Expr { - // expr_struct: Some(ExprStruct::GetStructField( - // Box::from(spark_expression::GetStructField { - // child: Some(Box::from(get_first_array_element)), - // ordinal: 0, - // }) - // )), - // }; - // - // // Make a projection operator - // // select make_array(struct(col_0, col_1))[1][c1] is equivalent to - // // select get_field(array_element(make_array(struct(col_0, col_1)), 1), 'c1') - // let projection = Operator { - // children: vec![op_scan], - // plan_id: 0, - // op_struct: Some(OpStruct::Projection(spark_operator::Projection { - // project_list: vec![get_field_of_struct], - // 
})),
-    //     };
-    //
-    //     // Create a physical plan
-    //     let (mut scans, datafusion_plan) =
-    //         planner.create_plan(&projection, &mut vec![], 1).unwrap();
-    //
-    //     // Start executing the plan in a separate thread
-    //     // The plan waits for incoming batches and emitting result as input comes
-    //     let mut stream = datafusion_plan.native_plan.execute(0, task_ctx).unwrap();
-    //
-    //     let runtime = tokio::runtime::Runtime::new().unwrap();
-    //     // create async channel
-    //     let (tx, mut rx) = mpsc::channel(1);
-    //
-    //     // Send data as input to the plan being executed in a separate thread
-    //     runtime.spawn(async move {
-    //         // create data batch
-    //         // 0, 1, 2
-    //         // 3, 4, 5
-    //         // 6, null, null
-    //         let a = Int64Array::from(vec![Some(0), Some(3), Some(6)]);
-    //         let b = Int64Array::from(vec![Some(1), Some(4), None]);
-    //         let c = Int64Array::from(vec![Some(2), Some(5), None]);
-    //         let input_batch1 = InputBatch::Batch(vec![Arc::new(a), Arc::new(b), Arc::new(c)], 3);
-    //         let input_batch2 = InputBatch::EOF;
-    //
-    //         let batches = vec![input_batch1, input_batch2];
-    //
-    //         for batch in batches.into_iter() {
-    //             tx.send(batch).await.unwrap();
-    //         }
-    //     });
-    //
-    //     // Wait for the plan to finish executing and assert the result
-    //     runtime.block_on(async move {
-    //         loop {
-    //             let batch = rx.recv().await.unwrap();
-    //             scans[0].set_input_batch(batch);
-    //             match poll!(stream.next()) {
-    //                 Poll::Ready(Some(batch)) => {
-    //                     assert!(batch.is_ok(), "got error {}", batch.unwrap_err());
-    //                     let batch = batch.unwrap();
-    //                     let expected = [
-    //                         "+-------+",
-    //                         "| col_0 |",
-    //                         "+-------+",
-    //                         "| 1 |",
-    //                         "| 4 |",
-    //                         "| |",
-    //                         "+-------+",
-    //                     ];
-    //                     assert_batches_eq!(expected, &[batch]);
-    //                 }
-    //                 Poll::Ready(None) => {
-    //                     break;
-    //                 }
-    //                 _ => {}
-    //             }
-    //         }
-    //     });
-    // }
-    //
-    // #[test]
-    // fn test_struct_field_1() {
-    //     use spark_expression::*;
-    //     use spark_expression::data_type::*;
-    //     use spark_expression::data_type::data_type_info::*;
-    //     use spark_operator::*;
-    //
-    //     let session_ctx = SessionContext::new();
-    //     let task_ctx = session_ctx.task_ctx();
-    //     let planner = PhysicalPlanner::new(Arc::from(session_ctx));
-    //
-    //     // Mock scan operator with 3 INT32 columns
-    //     let op_scan = Operator {
-    //         plan_id: 0,
-    //         children: vec![],
-    //         op_struct: Some(OpStruct::NativeScan(NativeScan {
-    //             fields: vec![DataType {
-    //                 type_id: i32::from(DataTypeId::List),
-    //                 type_info: Some(
-    //                     Box::from(DataTypeInfo {
-    //                         datatype_struct: Some(
-    //                             DatatypeStruct::List(
-    //                                 Box::from(ListInfo {
-    //                                     element_type: Some(
-    //                                         Box::from(DataType {
-    //                                             type_id: i32::from(DataTypeId::Struct),
-    //                                             type_info: Some(
-    //                                                 Box::from(DataTypeInfo {
-    //                                                     datatype_struct: Some(
-    //                                                         DatatypeStruct::Struct(
-    //                                                             StructInfo {
-    //                                                                 field_names: vec![
-    //                                                                     "a".to_string(),
-    //                                                                     "c".to_string(),
-    //                                                                 ],
-    //                                                                 field_datatypes: vec![
-    //                                                                     DataType {
-    //                                                                         type_id: i32::from(DataTypeId::Int32),
-    //                                                                         type_info: None,
-    //                                                                     },
-    //                                                                     DataType {
-    //                                                                         type_id: i32::from(DataTypeId::String),
-    //                                                                         type_info: None,
-    //                                                                     },
-    //                                                                 ],
-    //                                                                 field_nullable: vec![
-    //                                                                     true,
-    //                                                                     true,
-    //                                                                 ],
-    //                                                             },
-    //                                                         ),
-    //                                                     ),
-    //                                                 }),
-    //                                             ),
-    //                                         }),
-    //                                     ),
-    //                                     contains_null: true,
-    //                                 }),
-    //                             ),
-    //                         ),
-    //                     }),
-    //                 ),
-    //             },],
-    //             source: "CometScan parquet".to_string(),
-    //             required_schema: vec![
-    //                 SparkStructField {
-    //                     name: "c0".to_string(),
-    //                     data_type: Some(
-    //                         DataType {
-    //                             type_id: i32::from(DataTypeId::List),
-    //                             type_info: Some(
-    //                                 Box::from(DataTypeInfo {
-    //                                     datatype_struct: Some(
-    //                                         DatatypeStruct::List(
-    //                                             Box::from(ListInfo {
-    //                                                 element_type: Some(
-    //                                                     Box::from(DataType {
-    //                                                         type_id: i32::from(DataTypeId::Struct),
-    //                                                         type_info: Some(
-    //                                                             Box::from(DataTypeInfo {
-    //                                                                 datatype_struct: Some(
-    //                                                                     DatatypeStruct::Struct(
-    //                                                                         StructInfo {
-    //                                                                             field_names: vec![
-    //                                                                                 "a".to_string(),
-    //                                                                                 "c".to_string(),
-    //                                                                             ],
-    //                                                                             field_datatypes: vec![
-    //                                                                                 DataType {
-    //                                                                                     type_id: i32::from(DataTypeId::Int32),
-    //                                                                                     type_info: None,
-    //                                                                                 },
-    //                                                                                 DataType {
-    //                                                                                     type_id: i32::from(DataTypeId::String),
-    //                                                                                     type_info: None,
-    //                                                                                 },
-    //                                                                             ],
-    //                                                                             field_nullable: vec![
-    //                                                                                 true,
-    //                                                                                 true,
-    //                                                                             ],
-    //                                                                         },
-    //                                                                     ),
-    //                                                                 ),
-    //                                                             }),
-    //                                                         ),
-    //                                                     }),
-    //                                                 ),
-    //                                                 contains_null: true,
-    //                                             }),
-    //                                         ),
-    //                                     ),
-    //                                 }),
-    //                             ),
-    //                         },
-    //                     ),
-    //                     nullable: true,
-    //                 },
-    //             ],
-    //             data_schema: vec![],
-    //             partition_schema: vec![],
-    //             data_filters: vec![],
-    //             file_partitions: vec![
-    //                 SparkFilePartition {
-    //                     partitioned_file: vec![
-    //                         SparkPartitionedFile {
-    //                             file_path: "file:///tmp/test1/part-00000-45bd1152-e622-4056-bb59-4abd50cb92c1-c000.snappy.parquet".to_string(),
-    //                             start: 0,
-    //                             length: 1169,
-    //                             file_size: 1169,
-    //                             partition_values: vec![],
-    //                         },
-    //                     ],
-    //                 },
-    //             ],
-    //             projection_vector: vec![0],
-    //             session_timezone: "America/Los_Angeles".to_string(),
-    //         })),
-    //     };
-    //
-    //     // Create a physical plan
-    //     let (mut scans, datafusion_plan) =
-    //         planner.create_plan(&op_scan, &mut vec![], 1).unwrap();
-    //
-    //     // Start executing the plan in a separate thread
-    //     // The plan waits for incoming batches and emitting result as input comes
-    //     let mut stream = datafusion_plan.native_plan.execute(0, task_ctx).unwrap();
-    //
-    //     let runtime = tokio::runtime::Runtime::new().unwrap();
-    //     // create async channel
-    //     let (tx, mut rx) = mpsc::channel(1);
-    //
-    //     // Send data as input to the plan being executed in a separate thread
-    //     runtime.spawn(async move {
-    //         // create data batch
-    //         // 0, 1, 2
-    //         // 3, 4, 5
-    //         // 6, null, null
-    //         let a = Int32Array::from(vec![Some(0), Some(3), Some(6)]);
-    //         let b = Int32Array::from(vec![Some(1), Some(4), None]);
-    //         let c = Int32Array::from(vec![Some(2), Some(5), None]);
-    //         let input_batch1 = InputBatch::Batch(vec![Arc::new(a), Arc::new(b), Arc::new(c)], 3);
-    //         let input_batch2 = InputBatch::EOF;
-    //
-    //         let batches = vec![input_batch1, input_batch2];
-    //
-    //         for batch in batches.into_iter() {
-    //             tx.send(batch).await.unwrap();
-    //         }
-    //     });
-    //
-    //     // Wait for the plan to finish executing and assert the result
-    //     runtime.block_on(async move {
-    //         loop {
-    //             let batch = rx.recv().await.unwrap();
-    //             scans[0].set_input_batch(batch);
-    //             match poll!(stream.next()) {
-    //                 Poll::Ready(Some(batch)) => {
-    //                     assert!(batch.is_ok(), "got error {}", batch.unwrap_err());
-    //                     let batch = batch.unwrap();
-    //                     let expected = [
-    //                         "+-------+",
-    //                         "| col_0 |",
-    //                         "+-------+",
-    //                         "| 1 |",
-    //                         "| 4 |",
-    //                         "| |",
-    //                         "+-------+",
-    //                     ];
-    //                     assert_batches_eq!(expected, &[batch]);
-    //                 }
-    //                 Poll::Ready(None) => {
-    //                     break;
-    //                 }
-    //                 _ => {}
-    //             }
-    //         }
-    //     });
-    // }
+    /*
+    Testing a nested types scenario
-    #[test]
-    fn test_struct_field_2() {
+    select arr[0].a, arr[0].c from (
+      select array(named_struct('a', 1, 'b', 'n', 'c', 'x')) arr)
+    */
+    #[tokio::test]
+    async fn test_nested_types() -> Result<(), DataFusionError> {
         let session_ctx = SessionContext::new();
+        // generate test data in the temp folder
+        let test_data = "select make_array(named_struct('a', 1, 'b', 'n', 'c', 'x')) c0";
+        let tmp_dir = TempDir::new()?;
+        let test_path = tmp_dir.path().to_str().unwrap().to_string();
+
+        let plan = session_ctx
+            .sql(test_data)
+            .await?
+            .create_physical_plan()
+            .await?;
+
+        // Write parquet file into temp folder
+        session_ctx
+            .write_parquet(plan, test_path.clone(), None)
+            .await?;
+
+        // Define the schema Comet reads with
         let required_schema = Schema::new(Fields::from(vec![Field::new(
             "c0",
             DataType::List(
                 Field::new(
                     "element",
                     DataType::Struct(Fields::from(vec![
-                        Field::new("a", DataType::Int32, false),
-                        Field::new("c", DataType::Utf8, false),
+                        Field::new("a", DataType::Int32, true),
+                        Field::new("c", DataType::Utf8, true),
                     ] as Vec<Field>)),
-                    false,
+                    true,
                 )
                 .into(),
             ),
-            false,
+            true,
         )]));
-        let object_store_url = ObjectStoreUrl::local_filesystem();
+        // Register all parquet files with temp data as file groups
+        let mut file_groups: Vec<FileGroup> = vec![];
+        for entry in std::fs::read_dir(&test_path)? {
+            let entry = entry?;
+            let path = entry.path();
+
+            if path.extension().and_then(|ext| ext.to_str()) == Some("parquet") {
+                if let Some(path_str) = path.to_str() {
+                    file_groups.push(FileGroup::new(vec![PartitionedFile::from_path(
+                        path_str.into(),
+                    )?]));
+                }
+            }
+        }
+
         let source = Arc::new(
             ParquetSource::default().with_schema_adapter_factory(Arc::new(
                 SparkSchemaAdapterFactory::new(SparkParquetOptions::new(EvalMode::Ansi, "", false)),
             )),
         );
-        let file_groups: Vec<FileGroup> = vec![FileGroup::new(vec![PartitionedFile::new(
-            "/tmp/test1/part-00000-09e61881-4140-4f4a-bfd9-65db7b2bb0ae-c000.snappy.parquet",
-            1095,
-        )])];
-
+        let object_store_url = ObjectStoreUrl::local_filesystem();
         let file_scan_config =
             FileScanConfigBuilder::new(object_store_url, required_schema.into(), source)
                 .with_file_groups(file_groups)
                 .build();
-        let scan = Arc::new(DataSourceExec::new(Arc::new(file_scan_config.clone())));
-        let stream = scan.execute(0, session_ctx.task_ctx()).unwrap();
-        let rt = Runtime::new().unwrap();
-        let result: Vec<_> = rt.block_on(stream.collect());
+        // Run native read
+        let scan = Arc::new(DataSourceExec::new(Arc::new(file_scan_config.clone())));
+        let stream = scan.execute(0, session_ctx.task_ctx())?;
+        let result: Vec<_> = stream.collect().await;
 
         let actual = result.first().unwrap().as_ref().unwrap();
 
@@ -3555,5 +3179,7 @@ mod tests {
             "+----------------+",
         ];
         assert_batches_eq!(expected, &[actual.clone()]);
+
+        Ok(())
     }
 }
diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs
index 6a26fabcc9..a72b15992c 100644
--- a/native/core/src/parquet/parquet_support.rs
+++ b/native/core/src/parquet/parquet_support.rs
@@ -316,9 +316,6 @@ pub(crate) fn prepare_object_store(
 #[cfg(test)]
 mod tests {
     use crate::parquet::parquet_support::prepare_object_store;
-    use arrow::array::{Array, Int32Builder, ListBuilder, StringBuilder, StructBuilder};
-    use arrow::compute::{can_cast_types, cast_with_options, CastOptions};
-    use arrow::datatypes::{DataType, Field, Fields};
     use datafusion::execution::object_store::ObjectStoreUrl;
     use datafusion::execution::runtime_env::RuntimeEnv;
     use object_store::path::Path;
@@ -391,72 +388,4 @@ mod tests {
         assert_eq!(res.0, expected.0);
         assert_eq!(res.1, expected.1);
     }
-
-    #[test]
-    fn test_cast_array() {
-        let fields = Fields::from([
-            Arc::new(Field::new("a", DataType::Int32, true)),
-            Arc::new(Field::new("b", DataType::Utf8, true)),
-            Arc::new(Field::new("c", DataType::Utf8, true)),
-        ]);
-
-        let a_col_builder = Int32Builder::with_capacity(1);
-        let b_col_builder = StringBuilder::new();
-        let c_col_builder = StringBuilder::new();
-
-        let struct_builder = StructBuilder::new(
-            fields,
-            vec![
-                Box::new(a_col_builder),
-                Box::new(b_col_builder),
-                Box::new(c_col_builder),
-            ],
-        );
-
-        let mut list_builder = ListBuilder::new(struct_builder);
-
-        let values = list_builder.values();
-        values
-            .field_builder::<Int32Builder>(0)
-            .unwrap()
-            .append_values(&[1, 2], &[true, true]);
-
-        values
-            .field_builder::<StringBuilder>(1)
-            .unwrap()
-            .extend(vec![Some("n"), Some("m")]);
-
-        values
-            .field_builder::<StringBuilder>(2)
-            .unwrap()
-            .extend(vec![Some("x"), Some("y")]);
-
-        values.append(true);
-        values.append(true);
-        list_builder.append(true);
-
-        let array = Arc::new(list_builder.finish());
-
-        let to_type = DataType::List(
-            Field::new(
-                "element",
-                DataType::Struct(Fields::from(vec![
-                    Field::new("a", DataType::Int32, false),
-                    Field::new("c", DataType::Utf8, false),
-                ] as Vec<Field>)),
-                false,
-            )
-            .into(),
-        );
-
-        dbg!(&array);
-        /*
-        Cast [{a: 1, b: n, c: x}] -> [{a: 1, c: x}] -> [{a: 1, c: n}]
-        */
-        let res = cast_with_options(&array.as_ref(), &to_type, &CastOptions::default()).unwrap();
-
-        dbg!(can_cast_types(array.data_type(), &to_type));
-
-        dbg!(&res);
-    }
 }

From bb92d381bbde598cc122a1ab081129d4a4af148d Mon Sep 17 00:00:00 2001
From: comphead
Date: Fri, 16 May 2025 09:27:44 -0700
Subject: [PATCH 6/8] fix: cast list of structs and other cast fixes

---
 native/core/src/execution/planner.rs                |  3 ---
 native/core/src/parquet/parquet_support.rs           | 17 ++++++++---------
 .../src/struct_funcs/get_struct_field.rs             |  2 --
 .../org/apache/comet/serde/QueryPlanSerde.scala      |  5 -----
 4 files changed, 8 insertions(+), 19 deletions(-)

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 6e7db2e8a3..b9906b9a70 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -328,7 +328,6 @@ impl PhysicalPlanner {
                 )));
             }
             let field = input_schema.field(idx);
-            //dbg!(field);
             Ok(Arc::new(Column::new(field.name().as_str(), idx)))
         }
         ExprStruct::Unbound(unbound) => {
@@ -1092,8 +1091,6 @@ impl PhysicalPlanner {
             let data_schema = convert_spark_types_to_arrow_schema(scan.data_schema.as_slice());
             let required_schema: SchemaRef =
                 convert_spark_types_to_arrow_schema(scan.required_schema.as_slice());
-            // dbg!(&data_schema);
-            // dbg!(&required_schema);
             let partition_schema: SchemaRef =
                 convert_spark_types_to_arrow_schema(scan.partition_schema.as_slice());
             let projection_vector: Vec<usize> = scan
diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs
index a72b15992c..74cb499fd6 100644
--- a/native/core/src/parquet/parquet_support.rs
+++ b/native/core/src/parquet/parquet_support.rs
@@ -158,11 +158,9 @@ fn cast_array(
     };
     let from_type = array.data_type();
 
+    // Try Comet specific handlers first, then arrow-rs cast if supported,
+    // return uncasted data otherwise
     match (from_type, to_type) {
-        // If Arrow cast supports the cast, delegate the cast to Arrow
-        _ if can_cast_types(from_type, to_type) => {
-            Ok(cast_with_options(&array, to_type, &PARQUET_OPTIONS)?)
-        }
         (Struct(_), Struct(_)) => Ok(cast_struct_to_struct(
             array.as_struct(),
             from_type,
             to_type,
             parquet_options,
         )?),
@@ -180,6 +178,10 @@ fn cast_array(
                     .with_timezone(Arc::clone(tz)),
             ))
         }
+        // If Arrow cast supports the cast, delegate the cast to Arrow
+        _ if can_cast_types(from_type, to_type) => {
+            Ok(cast_with_options(&array, to_type, &PARQUET_OPTIONS)?)
+        }
         _ => Ok(array),
     }
 }
@@ -192,11 +194,6 @@ fn cast_list_to_list(
 ) -> DataFusionResult<ArrayRef> {
     match (from_type, to_type) {
         (DataType::List(_), DataType::List(to_inner_type)) => {
-            //dbg!(from_type);
-            //dbg!(to_type);
-            //dbg!(from_inner_type);
-            //dbg!(to_inner_type);
-
             let cast_field = cast_array(
                 Arc::clone(array.values()),
                 to_inner_type.data_type(),
                 parquet_options,
             )?;
@@ -316,6 +313,8 @@ pub(crate) fn prepare_object_store(
 #[cfg(test)]
 mod tests {
     use crate::parquet::parquet_support::prepare_object_store;
+    use arrow::compute::can_cast_types;
+    use arrow::datatypes::DataType;
     use datafusion::execution::object_store::ObjectStoreUrl;
     use datafusion::execution::runtime_env::RuntimeEnv;
     use object_store::path::Path;
diff --git a/native/spark-expr/src/struct_funcs/get_struct_field.rs b/native/spark-expr/src/struct_funcs/get_struct_field.rs
index b92e8d2c41..6c0ef5ee33 100644
--- a/native/spark-expr/src/struct_funcs/get_struct_field.rs
+++ b/native/spark-expr/src/struct_funcs/get_struct_field.rs
@@ -81,8 +81,6 @@ impl PhysicalExpr for GetStructField {
 
     fn evaluate(&self, batch: &RecordBatch) -> DataFusionResult<ColumnarValue> {
         let child_value = self.child.evaluate(batch)?;
-        //dbg!(&batch);
-        //dbg!(&child_value);
 
         match child_value {
             ColumnarValue::Array(array) => {
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index ade979c585..562a4ba86e 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -1962,11 +1962,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
         val childExpr = exprToProtoInternal(child, inputs, binding)
         val ordinalExpr = exprToProtoInternal(ordinal, inputs, binding)
 
-//        val childExpr = exprToProtoInternal(child, inputs, binding)
-//        val ordinalExpr =
-//          exprToProtoInternal(Add(Cast(ordinal, LongType), Literal(1L)), inputs, binding)
-//        scalarFunctionExprToProto("array_element", childExpr, ordinalExpr)
-
         if (childExpr.isDefined && ordinalExpr.isDefined) {
           val listExtractBuilder = ExprOuterClass.ListExtract
             .newBuilder()

From 14162e8b18d25ad19217ebd92a34d2dacbcdae6f Mon Sep 17 00:00:00 2001
From: comphead
Date: Fri, 16 May 2025 09:45:21 -0700
Subject: [PATCH 7/8] clippy

---
 native/core/src/parquet/parquet_support.rs | 2 --
 1 file changed, 2 deletions(-)

diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs
index 74cb499fd6..5ec3a7d3a2 100644
--- a/native/core/src/parquet/parquet_support.rs
+++ b/native/core/src/parquet/parquet_support.rs
@@ -313,8 +313,6 @@ pub(crate) fn prepare_object_store(
 #[cfg(test)]
 mod tests {
     use crate::parquet::parquet_support::prepare_object_store;
-    use arrow::compute::can_cast_types;
-    use arrow::datatypes::DataType;
     use datafusion::execution::object_store::ObjectStoreUrl;
     use datafusion::execution::runtime_env::RuntimeEnv;
     use object_store::path::Path;

From 0b75dfd79eb2be32079186d769878a169b68ab38 Mon Sep 17 00:00:00 2001
From: comphead
Date: Fri, 16 May 2025 14:48:59 -0700
Subject: [PATCH 8/8] clippy

---
 native/core/src/parquet/parquet_support.rs | 41 ++++++++--------------
 1 file changed, 14 insertions(+), 27 deletions(-)

diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs
index 5ec3a7d3a2..40fbb361b6 100644
--- a/native/core/src/parquet/parquet_support.rs
+++ b/native/core/src/parquet/parquet_support.rs
@@ -167,8 +167,20 @@ fn cast_array(
             to_type,
             parquet_options,
         )?),
-        (List(_), List(_)) => {
-            cast_list_to_list(array.as_list(), from_type, to_type, parquet_options)
+        (List(_), List(to_inner_type)) => {
+            let list_arr: &ListArray = array.as_list();
+            let cast_field = cast_array(
+                Arc::clone(list_arr.values()),
+                to_inner_type.data_type(),
+                parquet_options,
+            )?;
+
+            Ok(Arc::new(ListArray::new(
+                Arc::clone(to_inner_type),
+                list_arr.offsets().clone(),
+                cast_field,
+                list_arr.nulls().cloned(),
+            )))
         }
         (Timestamp(TimeUnit::Microsecond, None), Timestamp(TimeUnit::Microsecond, Some(tz))) => {
             Ok(Arc::new(
@@ -186,31 +198,6 @@ fn cast_array(
     }
 }
 
-fn cast_list_to_list(
-    array: &ListArray,
-    from_type: &DataType,
-    to_type: &DataType,
-    parquet_options: &SparkParquetOptions,
-) -> DataFusionResult<ArrayRef> {
-    match (from_type, to_type) {
-        (DataType::List(_), DataType::List(to_inner_type)) => {
-            let cast_field = cast_array(
-                Arc::clone(array.values()),
-                to_inner_type.data_type(),
-                parquet_options,
-            )?;
-
-            Ok(Arc::new(ListArray::new(
-                Arc::clone(to_inner_type),
-                array.offsets().clone(),
-                cast_field,
-                array.nulls().cloned(),
-            )))
-        }
-        _ => unreachable!(),
-    }
-}
-
 /// Cast between struct types based on logic in
 /// `org.apache.spark.sql.catalyst.expressions.Cast#castStruct`.
 fn cast_struct_to_struct(