From e4191dc7b76c00b319a7f04b0fdc452d6b2886eb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Dani=C3=ABl=20Heres?=
Date: Sun, 26 Nov 2023 19:13:21 +0000
Subject: [PATCH] Update to DataFusion 33 (#900)

* Update to DataFusion 33

* Test fixes

* Fix

* Upgrade crates

* Tests

* Tests

* Use parquet file instead
---
 Cargo.toml                                    | 14 ++--
 ballista/client/src/columnar_batch.rs         |  8 ++-
 ballista/client/src/context.rs                | 34 +++++-----
 ballista/core/Cargo.toml                      |  2 +-
 .../src/execution_plans/distributed_query.rs  |  4 +-
 .../src/execution_plans/shuffle_reader.rs     | 68 +++++++++----------
 .../src/execution_plans/shuffle_writer.rs     |  2 +-
 .../src/execution_plans/unresolved_shuffle.rs |  4 +-
 ballista/executor/src/collect.rs              |  2 +-
 ballista/executor/src/executor.rs             |  6 +-
 ballista/scheduler/Cargo.toml                 |  6 +-
 ballista/scheduler/src/flight_sql.rs          |  5 +-
 examples/examples/standalone-sql.rs           | 10 +--
 13 files changed, 84 insertions(+), 81 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 6f34fa069..e4b5f324a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,16 +29,16 @@ members = [
 resolver = "2"
 
 [workspace.dependencies]
-arrow = { version = "47.0.0" }
-arrow-flight = { version = "47.0.0", features = ["flight-sql-experimental"] }
-arrow-schema = { version = "47.0.0", default-features = false }
+arrow = { version = "48.0.0" }
+arrow-flight = { version = "48.0.0", features = ["flight-sql-experimental"] }
+arrow-schema = { version = "48.0.0", default-features = false }
 configure_me = { version = "0.4.0" }
 configure_me_codegen = { version = "0.4.4" }
-datafusion = "32.0.0"
-datafusion-cli = "32.0.0"
-datafusion-proto = "32.0.0"
+datafusion = "33.0.0"
+datafusion-cli = "33.0.0"
+datafusion-proto = "33.0.0"
 object_store = "0.7.0"
-sqlparser = "0.38.0"
+sqlparser = "0.39.0"
 tonic = { version = "0.10" }
 tonic-build = { version = "0.10", default-features = false, features = [
     "transport",
diff --git a/ballista/client/src/columnar_batch.rs b/ballista/client/src/columnar_batch.rs
index 3431f5612..5e7fe89b0 100644
--- a/ballista/client/src/columnar_batch.rs
+++ b/ballista/client/src/columnar_batch.rs
@@ -147,10 +147,12 @@ impl ColumnarValue {
         }
     }
 
-    pub fn to_arrow(&self) -> ArrayRef {
+    pub fn to_arrow(&self) -> Result<ArrayRef> {
         match self {
-            ColumnarValue::Columnar(array) => array.clone(),
-            ColumnarValue::Scalar(value, n) => value.to_array_of_size(*n),
+            ColumnarValue::Columnar(array) => Ok(array.clone()),
+            ColumnarValue::Scalar(value, n) => {
+                value.to_array_of_size(*n).map_err(|x| x.into())
+            }
         }
     }
 
diff --git a/ballista/client/src/context.rs b/ballista/client/src/context.rs
index 76c8d439f..82ca17108 100644
--- a/ballista/client/src/context.rs
+++ b/ballista/client/src/context.rs
@@ -839,7 +839,7 @@ mod tests {
         let res = df.collect().await.unwrap();
         let expected = vec![
             "+-------------------+",
-            "| VARIANCE(test.id) |",
+            "| VAR(test.id)      |",
             "+-------------------+",
             "| 6.000000000000001 |",
             "+-------------------+",
@@ -852,11 +852,11 @@ mod tests {
             .unwrap();
         let res = df.collect().await.unwrap();
         let expected = vec![
-            "+-----------------------+",
-            "| VARIANCE_POP(test.id) |",
-            "+-----------------------+",
-            "| 5.250000000000001     |",
-            "+-----------------------+",
+            "+-------------------+",
+            "| VAR_POP(test.id)  |",
+            "+-------------------+",
+            "| 5.250000000000001 |",
+            "+-------------------+",
         ];
         assert_result_eq(expected, &res);
 
@@ -867,7 +867,7 @@ mod tests {
         let res = df.collect().await.unwrap();
         let expected = vec![
             "+-------------------+",
-            "| VARIANCE(test.id) |",
+            "| VAR(test.id)      |",
             "+-------------------+",
             "| 6.000000000000001 |",
             "+-------------------+",
@@ -908,11 +908,11 @@ mod tests {
             .unwrap();
         let res = df.collect().await.unwrap();
         let expected = vec![
-            "+--------------------------------------+",
-            "| COVARIANCE(test.id,test.tinyint_col) |",
-            "+--------------------------------------+",
-            "| 0.28571428571428586                  |",
-            "+--------------------------------------+",
+            "+---------------------------------+",
+            "| COVAR(test.id,test.tinyint_col) |",
+            "+---------------------------------+",
+            "| 0.28571428571428586             |",
+            "+---------------------------------+",
         ];
         assert_result_eq(expected, &res);
 
@@ -922,11 +922,11 @@ mod tests {
            .unwrap();
         let res = df.collect().await.unwrap();
         let expected = vec![
-            "+---------------------------------------+",
-            "| CORRELATION(test.id,test.tinyint_col) |",
-            "+---------------------------------------+",
-            "| 0.21821789023599245                   |",
-            "+---------------------------------------+",
+            "+--------------------------------+",
+            "| CORR(test.id,test.tinyint_col) |",
+            "+--------------------------------+",
+            "| 0.21821789023599245            |",
+            "+--------------------------------+",
         ];
         assert_result_eq(expected, &res);
 
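DataFusion 33 shortens the display names of several aggregate functions (VARIANCE becomes VAR, VARIANCE_POP becomes VAR_POP, COVARIANCE becomes COVAR, CORRELATION becomes CORR), which is what changes the expected column headers in the tests above. A test that should survive such renames can pin the output name with an explicit alias; a minimal sketch (the alias name is illustrative, not part of this patch):

    // The result column is named `id_var` regardless of how DataFusion
    // chooses to display the aggregate expression.
    let df = ctx.sql("select VAR(id) as id_var from test").await?;
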
diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index 31a5d4959..d8d4bfcc5 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -57,7 +57,7 @@ datafusion-proto = { workspace = true }
 futures = "0.3"
 hashbrown = "0.14"
-itertools = "0.11"
+itertools = "0.12"
 libloading = "0.8.0"
 log = "0.4"
 md-5 = { version = "^0.10.0" }
diff --git a/ballista/core/src/execution_plans/distributed_query.rs b/ballista/core/src/execution_plans/distributed_query.rs
index ccb26206a..13511173e 100644
--- a/ballista/core/src/execution_plans/distributed_query.rs
+++ b/ballista/core/src/execution_plans/distributed_query.rs
@@ -210,11 +210,11 @@ impl ExecutionPlan for DistributedQueryExec {
         Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
     }
 
-    fn statistics(&self) -> Statistics {
+    fn statistics(&self) -> Result<Statistics> {
         // This execution plan sends the logical plan to the scheduler without
         // performing the node by node conversion to a full physical plan.
         // This implies that we cannot infer the statistics at this stage.
-        Statistics::default()
+        Ok(Statistics::new_unknown(&self.schema()))
     }
 }
 
diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs b/ballista/core/src/execution_plans/shuffle_reader.rs
index fa3f9f691..6a77a16e2 100644
--- a/ballista/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/core/src/execution_plans/shuffle_reader.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use async_trait::async_trait;
+use datafusion::common::stats::Precision;
 use std::any::Any;
 use std::collections::HashMap;
 use std::fmt::Debug;
@@ -37,8 +38,8 @@ use datafusion::error::Result;
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
 use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use datafusion::physical_plan::{
-    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
-    SendableRecordBatchStream, Statistics,
+    ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
+    RecordBatchStream, SendableRecordBatchStream, Statistics,
 };
 use futures::{Stream, StreamExt, TryStreamExt};
 
@@ -172,37 +173,39 @@ impl ExecutionPlan for ShuffleReaderExec {
         Some(self.metrics.clone_inner())
     }
 
-    fn statistics(&self) -> Statistics {
-        stats_for_partitions(
+    fn statistics(&self) -> Result<Statistics> {
+        Ok(stats_for_partitions(
+            self.schema.fields().len(),
             self.partition
                 .iter()
                 .flatten()
                 .map(|loc| loc.partition_stats),
-        )
+        ))
     }
 }
 
 fn stats_for_partitions(
+    num_fields: usize,
     partition_stats: impl Iterator<Item = PartitionStats>,
 ) -> Statistics {
     // TODO stats: add column statistics to PartitionStats
-    partition_stats.fold(
-        Statistics {
-            is_exact: true,
-            num_rows: Some(0),
-            total_byte_size: Some(0),
-            column_statistics: None,
-        },
-        |mut acc, part| {
+    let (num_rows, total_byte_size) =
+        partition_stats.fold((Some(0), Some(0)), |(num_rows, total_byte_size), part| {
             // if any statistic is unkown it makes the entire statistic unkown
-            acc.num_rows = acc.num_rows.zip(part.num_rows).map(|(a, b)| a + b as usize);
-            acc.total_byte_size = acc
-                .total_byte_size
+            let num_rows = num_rows.zip(part.num_rows).map(|(a, b)| a + b as usize);
+            let total_byte_size = total_byte_size
                 .zip(part.num_bytes)
                 .map(|(a, b)| a + b as usize);
-            acc
-        },
-    )
+            (num_rows, total_byte_size)
+        });
+
+    Statistics {
+        num_rows: num_rows.map(Precision::Exact).unwrap_or(Precision::Absent),
+        total_byte_size: total_byte_size
+            .map(Precision::Exact)
+            .unwrap_or(Precision::Absent),
+        column_statistics: vec![ColumnStatistics::new_unknown(); num_fields],
+    }
 }
 
 struct LocalShuffleStream {
@@ -445,13 +448,12 @@ mod tests {
 
     #[tokio::test]
     async fn test_stats_for_partitions_empty() {
-        let result = stats_for_partitions(std::iter::empty());
+        let result = stats_for_partitions(0, std::iter::empty());
 
         let exptected = Statistics {
-            is_exact: true,
-            num_rows: Some(0),
-            total_byte_size: Some(0),
-            column_statistics: None,
+            num_rows: Precision::Exact(0),
+            total_byte_size: Precision::Exact(0),
+            column_statistics: vec![],
         };
 
         assert_eq!(result, exptected);
@@ -472,13 +474,12 @@ mod tests {
             },
         ];
 
-        let result = stats_for_partitions(part_stats.into_iter());
+        let result = stats_for_partitions(0, part_stats.into_iter());
 
         let exptected = Statistics {
-            is_exact: true,
-            num_rows: Some(14),
-            total_byte_size: Some(149),
-            column_statistics: None,
+            num_rows: Precision::Exact(14),
+            total_byte_size: Precision::Exact(149),
+            column_statistics: vec![],
         };
 
         assert_eq!(result, exptected);
@@ -499,13 +500,12 @@ mod tests {
             },
         ];
 
-        let result = stats_for_partitions(part_stats.into_iter());
+        let result = stats_for_partitions(0, part_stats.into_iter());
 
         let exptected = Statistics {
-            is_exact: true,
-            num_rows: None,
-            total_byte_size: None,
-            column_statistics: None,
+            num_rows: Precision::Absent,
+            total_byte_size: Precision::Absent,
+            column_statistics: vec![],
        };
 
         assert_eq!(result, exptected);
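DataFusion 33 replaces the `Option<usize>` fields on `Statistics` with `datafusion::common::stats::Precision`, and `column_statistics` is now a `Vec<ColumnStatistics>` with one entry per schema field, which is why `stats_for_partitions` gains a field count. The conversion used at the end of the fold above, shown in isolation (a minimal sketch; the helper name is illustrative and not part of the patch):

    use datafusion::common::stats::Precision;

    // A known count becomes `Precision::Exact`; an unknown one `Precision::Absent`.
    fn to_precision(count: Option<usize>) -> Precision<usize> {
        count.map(Precision::Exact).unwrap_or(Precision::Absent)
    }
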
diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs b/ballista/core/src/execution_plans/shuffle_writer.rs
index 24869b2c8..1896c2064 100644
--- a/ballista/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/core/src/execution_plans/shuffle_writer.rs
@@ -418,7 +418,7 @@ impl ExecutionPlan for ShuffleWriterExec {
         Some(self.metrics.clone_inner())
     }
 
-    fn statistics(&self) -> Statistics {
+    fn statistics(&self) -> Result<Statistics> {
         self.plan.statistics()
     }
 }
diff --git a/ballista/core/src/execution_plans/unresolved_shuffle.rs b/ballista/core/src/execution_plans/unresolved_shuffle.rs
index 0557529f3..fe3610c41 100644
--- a/ballista/core/src/execution_plans/unresolved_shuffle.rs
+++ b/ballista/core/src/execution_plans/unresolved_shuffle.rs
@@ -115,9 +115,9 @@ impl ExecutionPlan for UnresolvedShuffleExec {
         ))
     }
 
-    fn statistics(&self) -> Statistics {
+    fn statistics(&self) -> Result<Statistics> {
         // The full statistics are computed in the `ShuffleReaderExec` node
         // that replaces this one once the previous stage is completed.
-        Statistics::default()
+        Ok(Statistics::new_unknown(&self.schema()))
     }
 }
diff --git a/ballista/executor/src/collect.rs b/ballista/executor/src/collect.rs
index 8dbccc32f..22567bca0 100644
--- a/ballista/executor/src/collect.rs
+++ b/ballista/executor/src/collect.rs
@@ -108,7 +108,7 @@ impl ExecutionPlan for CollectExec {
         }))
     }
 
-    fn statistics(&self) -> Statistics {
+    fn statistics(&self) -> Result<Statistics> {
         self.plan.statistics()
     }
 }
diff --git a/ballista/executor/src/executor.rs b/ballista/executor/src/executor.rs
index 4ee57eb85..60db9f00b 100644
--- a/ballista/executor/src/executor.rs
+++ b/ballista/executor/src/executor.rs
@@ -210,7 +210,7 @@ mod test {
     use crate::execution_engine::DefaultQueryStageExec;
     use ballista_core::serde::scheduler::PartitionId;
 
-    use datafusion::error::DataFusionError;
+    use datafusion::error::{DataFusionError, Result};
     use datafusion::physical_expr::PhysicalSortExpr;
     use datafusion::physical_plan::{
         DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
@@ -299,8 +299,8 @@ mod test {
             Ok(Box::pin(NeverendingRecordBatchStream))
         }
 
-        fn statistics(&self) -> Statistics {
-            Statistics::default()
+        fn statistics(&self) -> Result<Statistics> {
+            Ok(Statistics::new_unknown(&self.schema()))
        }
     }
 
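The `statistics` changes above all follow the same DataFusion 33 trait change: `ExecutionPlan::statistics` now returns `Result<Statistics>`, and `Statistics::new_unknown` takes the schema so it can emit one unknown `ColumnStatistics` per field. For an operator that cannot estimate anything, the minimal form of the method is the one these files use:

    fn statistics(&self) -> Result<Statistics> {
        // `Statistics::new_unknown` fills `column_statistics` with one
        // `ColumnStatistics::new_unknown()` entry per schema field.
        Ok(Statistics::new_unknown(&self.schema()))
    }
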
diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index 470195ced..270c5f996 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -47,20 +47,20 @@ arrow-flight = { workspace = true }
 async-recursion = "1.0.0"
 async-trait = "0.1.41"
 ballista-core = { path = "../core", version = "0.11.0", features = ["s3"] }
-base64 = { version = "0.13" }
+base64 = { version = "0.21" }
 clap = { version = "3", features = ["derive", "cargo"] }
 configure_me = { workspace = true }
 dashmap = "5.4.0"
 datafusion = { workspace = true }
 datafusion-proto = { workspace = true }
-etcd-client = { version = "0.11", optional = true }
+etcd-client = { version = "0.12", optional = true }
 flatbuffers = { version = "23.5.26" }
 futures = "0.3"
 graphviz-rust = "0.6.1"
 http = "0.2"
 http-body = "0.4"
 hyper = "0.14.4"
-itertools = "0.11.0"
+itertools = "0.12.0"
 log = "0.4"
 object_store = { workspace = true }
 once_cell = { version = "1.16.0", optional = true }
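The base64 bump from 0.13 to 0.21 is a breaking change: the crate removed the free `encode`/`decode` functions in favor of the `Engine` trait, which is what the flight_sql.rs change below migrates to. The same migration in isolation (a sketch with made-up input):

    use base64::Engine;

    let engine = base64::engine::general_purpose::STANDARD;
    let encoded = engine.encode(b"admin:password");
    let decoded = engine.decode(&encoded).expect("valid base64");
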
diff --git a/ballista/scheduler/src/flight_sql.rs b/ballista/scheduler/src/flight_sql.rs
index 23a863e23..62c96966e 100644
--- a/ballista/scheduler/src/flight_sql.rs
+++ b/ballista/scheduler/src/flight_sql.rs
@@ -35,6 +35,7 @@ use arrow_flight::{
     Action, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest,
     HandshakeResponse, Location, Ticket,
 };
+use base64::Engine;
 use futures::Stream;
 use log::{debug, error, warn};
 use std::convert::TryFrom;
@@ -503,8 +504,8 @@ impl FlightSqlService for FlightSqlServiceImpl {
                 "Auth type not implemented: {authorization}"
             )))?;
         }
-        let base64 = &authorization[basic.len()..];
-        let bytes = base64::decode(base64)
+        let bytes = base64::engine::general_purpose::STANDARD
+            .decode(&authorization[basic.len()..])
             .map_err(|_| Status::invalid_argument("authorization not parsable"))?;
         let str = String::from_utf8(bytes)
             .map_err(|_| Status::invalid_argument("authorization not parsable"))?;
diff --git a/examples/examples/standalone-sql.rs b/examples/examples/standalone-sql.rs
index b24a63508..0427caa7d 100644
--- a/examples/examples/standalone-sql.rs
+++ b/examples/examples/standalone-sql.rs
@@ -17,7 +17,7 @@
 
 use ballista::prelude::{BallistaConfig, BallistaContext, Result};
 use ballista_examples::test_util;
-use datafusion::prelude::CsvReadOptions;
+use datafusion::execution::options::ParquetReadOptions;
 
 #[tokio::main]
 async fn main() -> Result<()> {
@@ -29,11 +29,11 @@ async fn main() -> Result<()> {
 
     let testdata = test_util::examples_test_data();
 
-    // register csv file with the execution context
-    ctx.register_csv(
+    // register parquet file with the execution context
+    ctx.register_parquet(
         "test",
-        &format!("{testdata}/aggregate_test_100.csv"),
-        CsvReadOptions::new(),
+        &format!("{testdata}/alltypes_plain.parquet"),
+        ParquetReadOptions::default(),
     )
     .await?;
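With the example switched from CSV to Parquet, the registered table is queried the same way as before; a minimal follow-up under the same assumptions (the `test` table registered above):

    let df = ctx.sql("SELECT count(*) FROM test").await?;
    df.show().await?;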