Update to DataFusion 33 (#900)

* Update to DataFusion 33

* Test fixes

* Fix

* Upgrade crates

* Tests

* Tests

* Use parquet file instead
Dandandan authored Nov 26, 2023
1 parent c0561ed commit e4191dc
Showing 13 changed files with 84 additions and 81 deletions.
14 changes: 7 additions & 7 deletions Cargo.toml
@@ -29,16 +29,16 @@ members = [
resolver = "2"

[workspace.dependencies]
-arrow = { version = "47.0.0" }
-arrow-flight = { version = "47.0.0", features = ["flight-sql-experimental"] }
-arrow-schema = { version = "47.0.0", default-features = false }
+arrow = { version = "48.0.0" }
+arrow-flight = { version = "48.0.0", features = ["flight-sql-experimental"] }
+arrow-schema = { version = "48.0.0", default-features = false }
configure_me = { version = "0.4.0" }
configure_me_codegen = { version = "0.4.4" }
-datafusion = "32.0.0"
-datafusion-cli = "32.0.0"
-datafusion-proto = "32.0.0"
+datafusion = "33.0.0"
+datafusion-cli = "33.0.0"
+datafusion-proto = "33.0.0"
object_store = "0.7.0"
-sqlparser = "0.38.0"
+sqlparser = "0.39.0"
tonic = { version = "0.10" }
tonic-build = { version = "0.10", default-features = false, features = [
    "transport",
8 changes: 5 additions & 3 deletions ballista/client/src/columnar_batch.rs
@@ -147,10 +147,12 @@ impl ColumnarValue {
        }
    }

-    pub fn to_arrow(&self) -> ArrayRef {
+    pub fn to_arrow(&self) -> Result<ArrayRef> {
        match self {
-            ColumnarValue::Columnar(array) => array.clone(),
-            ColumnarValue::Scalar(value, n) => value.to_array_of_size(*n),
+            ColumnarValue::Columnar(array) => Ok(array.clone()),
+            ColumnarValue::Scalar(value, n) => {
+                value.to_array_of_size(*n).map_err(|x| x.into())
+            }
        }
    }

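Context for the `to_arrow` change: arrow 48 made `ScalarValue::to_array_of_size` return a `Result`, so the wrapper now propagates that error instead of returning a bare `ArrayRef`. A minimal sketch of a call site adapting to the new signature, assuming DataFusion's `Result` alias and the `ColumnarValue` type from this file (the helper itself is hypothetical):

```rust
use datafusion::arrow::array::ArrayRef;
use datafusion::error::Result;

// Hypothetical helper: convert a slice of ColumnarValues into arrays.
// Each conversion can now fail, so we collect into Result and let the
// caller handle the first error.
fn columns_to_arrays(columns: &[ColumnarValue]) -> Result<Vec<ArrayRef>> {
    columns.iter().map(|c| c.to_arrow()).collect()
}
```
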
34 changes: 17 additions & 17 deletions ballista/client/src/context.rs
@@ -839,7 +839,7 @@ mod tests {
        let res = df.collect().await.unwrap();
        let expected = vec![
            "+-------------------+",
-            "| VARIANCE(test.id) |",
+            "| VAR(test.id)      |",
            "+-------------------+",
            "| 6.000000000000001 |",
            "+-------------------+",
@@ -852,11 +852,11 @@
            .unwrap();
        let res = df.collect().await.unwrap();
        let expected = vec![
-            "+-----------------------+",
-            "| VARIANCE_POP(test.id) |",
-            "+-----------------------+",
-            "| 5.250000000000001     |",
-            "+-----------------------+",
+            "+-------------------+",
+            "| VAR_POP(test.id)  |",
+            "+-------------------+",
+            "| 5.250000000000001 |",
+            "+-------------------+",
        ];
        assert_result_eq(expected, &res);

@@ -867,7 +867,7 @@
        let res = df.collect().await.unwrap();
        let expected = vec![
            "+-------------------+",
-            "| VARIANCE(test.id) |",
+            "| VAR(test.id)      |",
            "+-------------------+",
            "| 6.000000000000001 |",
            "+-------------------+",
@@ -908,11 +908,11 @@
            .unwrap();
        let res = df.collect().await.unwrap();
        let expected = vec![
-            "+--------------------------------------+",
-            "| COVARIANCE(test.id,test.tinyint_col) |",
-            "+--------------------------------------+",
-            "| 0.28571428571428586                  |",
-            "+--------------------------------------+",
+            "+---------------------------------+",
+            "| COVAR(test.id,test.tinyint_col) |",
+            "+---------------------------------+",
+            "| 0.28571428571428586             |",
+            "+---------------------------------+",
        ];
        assert_result_eq(expected, &res);

@@ -922,11 +922,11 @@
            .unwrap();
        let res = df.collect().await.unwrap();
        let expected = vec![
-            "+---------------------------------------+",
-            "| CORRELATION(test.id,test.tinyint_col) |",
-            "+---------------------------------------+",
-            "| 0.21821789023599245                   |",
-            "+---------------------------------------+",
+            "+--------------------------------+",
+            "| CORR(test.id,test.tinyint_col) |",
+            "+--------------------------------+",
+            "| 0.21821789023599245            |",
+            "+--------------------------------+",
        ];
        assert_result_eq(expected, &res);

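The test churn above is cosmetic: DataFusion 33 shortened the display names of several aggregate functions (VARIANCE to VAR, VARIANCE_POP to VAR_POP, COVARIANCE to COVAR, CORRELATION to CORR), so only the pretty-printed headers and border widths change while the computed values stay identical. A minimal sketch reproducing one of the new headers with a plain DataFusion 33 `SessionContext` (the `test` table and `id` column are assumptions standing in for the fixtures these tests register):

```rust
use datafusion::arrow::util::pretty::print_batches;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

async fn show_var_header(ctx: &SessionContext) -> Result<()> {
    // Assumes a table `test` with a numeric `id` column is registered.
    let df = ctx.sql("SELECT VAR(id) FROM test").await?;
    let batches = df.collect().await?;
    // The header now renders as "VAR(test.id)" rather than the
    // pre-33 "VARIANCE(test.id)".
    print_batches(&batches)?;
    Ok(())
}
```
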
2 changes: 1 addition & 1 deletion ballista/core/Cargo.toml
@@ -57,7 +57,7 @@ datafusion-proto = { workspace = true }
futures = "0.3"
hashbrown = "0.14"

-itertools = "0.11"
+itertools = "0.12"
libloading = "0.8.0"
log = "0.4"
md-5 = { version = "^0.10.0" }
4 changes: 2 additions & 2 deletions ballista/core/src/execution_plans/distributed_query.rs
@@ -210,11 +210,11 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
    }

-    fn statistics(&self) -> Statistics {
+    fn statistics(&self) -> Result<Statistics> {
        // This execution plan sends the logical plan to the scheduler without
        // performing the node by node conversion to a full physical plan.
        // This implies that we cannot infer the statistics at this stage.
-        Statistics::default()
+        Ok(Statistics::new_unknown(&self.schema()))
    }
}

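DataFusion 33 also replaces the old `Statistics::default()` idiom with the schema-aware `Statistics::new_unknown`, which records an explicit "unknown" entry per column. A sketch of what that constructor produces, reconstructed from the fields visible elsewhere in this diff rather than from the library source:

```rust
use datafusion::arrow::datatypes::Schema;
use datafusion::common::stats::Precision;
use datafusion::physical_plan::{ColumnStatistics, Statistics};

// Roughly what Statistics::new_unknown(&schema) yields: no row or byte
// counts, and one unknown ColumnStatistics entry per schema field.
fn unknown_stats(schema: &Schema) -> Statistics {
    Statistics {
        num_rows: Precision::Absent,
        total_byte_size: Precision::Absent,
        column_statistics: vec![ColumnStatistics::new_unknown(); schema.fields().len()],
    }
}
```
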
68 changes: 34 additions & 34 deletions ballista/core/src/execution_plans/shuffle_reader.rs
@@ -16,6 +16,7 @@
// under the License.

use async_trait::async_trait;
+use datafusion::common::stats::Precision;
use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
@@ -37,8 +38,8 @@ use datafusion::error::Result;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
-    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
-    SendableRecordBatchStream, Statistics,
+    ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
+    RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use futures::{Stream, StreamExt, TryStreamExt};

@@ -172,37 +173,39 @@ impl ExecutionPlan for ShuffleReaderExec {
        Some(self.metrics.clone_inner())
    }

-    fn statistics(&self) -> Statistics {
-        stats_for_partitions(
+    fn statistics(&self) -> Result<Statistics> {
+        Ok(stats_for_partitions(
+            self.schema.fields().len(),
            self.partition
                .iter()
                .flatten()
                .map(|loc| loc.partition_stats),
-        )
+        ))
    }
}

fn stats_for_partitions(
+    num_fields: usize,
    partition_stats: impl Iterator<Item = PartitionStats>,
) -> Statistics {
    // TODO stats: add column statistics to PartitionStats
-    partition_stats.fold(
-        Statistics {
-            is_exact: true,
-            num_rows: Some(0),
-            total_byte_size: Some(0),
-            column_statistics: None,
-        },
-        |mut acc, part| {
+    let (num_rows, total_byte_size) =
+        partition_stats.fold((Some(0), Some(0)), |(num_rows, total_byte_size), part| {
            // if any statistic is unknown it makes the entire statistic unknown
-            acc.num_rows = acc.num_rows.zip(part.num_rows).map(|(a, b)| a + b as usize);
-            acc.total_byte_size = acc
-                .total_byte_size
+            let num_rows = num_rows.zip(part.num_rows).map(|(a, b)| a + b as usize);
+            let total_byte_size = total_byte_size
                .zip(part.num_bytes)
                .map(|(a, b)| a + b as usize);
-            acc
-        },
-    )
+            (num_rows, total_byte_size)
+        });
+
+    Statistics {
+        num_rows: num_rows.map(Precision::Exact).unwrap_or(Precision::Absent),
+        total_byte_size: total_byte_size
+            .map(Precision::Exact)
+            .unwrap_or(Precision::Absent),
+        column_statistics: vec![ColumnStatistics::new_unknown(); num_fields],
+    }
}

struct LocalShuffleStream {
@@ -445,13 +448,12 @@ mod tests {

    #[tokio::test]
    async fn test_stats_for_partitions_empty() {
-        let result = stats_for_partitions(std::iter::empty());
+        let result = stats_for_partitions(0, std::iter::empty());

        let expected = Statistics {
-            is_exact: true,
-            num_rows: Some(0),
-            total_byte_size: Some(0),
-            column_statistics: None,
+            num_rows: Precision::Exact(0),
+            total_byte_size: Precision::Exact(0),
+            column_statistics: vec![],
        };

        assert_eq!(result, expected);
@@ -472,13 +474,12 @@
            },
        ];

-        let result = stats_for_partitions(part_stats.into_iter());
+        let result = stats_for_partitions(0, part_stats.into_iter());

        let expected = Statistics {
-            is_exact: true,
-            num_rows: Some(14),
-            total_byte_size: Some(149),
-            column_statistics: None,
+            num_rows: Precision::Exact(14),
+            total_byte_size: Precision::Exact(149),
+            column_statistics: vec![],
        };

        assert_eq!(result, expected);
@@ -499,13 +500,12 @@
            },
        ];

-        let result = stats_for_partitions(part_stats.into_iter());
+        let result = stats_for_partitions(0, part_stats.into_iter());

        let expected = Statistics {
-            is_exact: true,
-            num_rows: None,
-            total_byte_size: None,
-            column_statistics: None,
+            num_rows: Precision::Absent,
+            total_byte_size: Precision::Absent,
+            column_statistics: vec![],
        };

        assert_eq!(result, expected);
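
The rewritten `stats_for_partitions` keeps the per-partition totals as plain `Option<usize>` during the fold and only wraps them in the new `Precision` type at the end: a single partition with missing counts collapses the whole total to `Precision::Absent`, otherwise the sum is reported as `Precision::Exact`, exactly as the three tests assert. A self-contained illustration of that "any unknown poisons the total" folding, using only std `Option` (no Ballista types needed):

```rust
// Summing Option<usize> with zip: a single None turns the result into
// None, mirroring how one partition without stats yields Precision::Absent.
fn sum_known(counts: &[Option<usize>]) -> Option<usize> {
    counts
        .iter()
        .fold(Some(0), |acc, c| acc.zip(*c).map(|(a, b)| a + b))
}

fn main() {
    assert_eq!(sum_known(&[Some(2), Some(4), Some(8)]), Some(14)); // -> Exact(14)
    assert_eq!(sum_known(&[Some(2), None, Some(8)]), None); // -> Absent
    println!("ok");
}
```
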
2 changes: 1 addition & 1 deletion ballista/core/src/execution_plans/shuffle_writer.rs
@@ -418,7 +418,7 @@ impl ExecutionPlan for ShuffleWriterExec {
        Some(self.metrics.clone_inner())
    }

-    fn statistics(&self) -> Statistics {
+    fn statistics(&self) -> Result<Statistics> {
        self.plan.statistics()
    }
}
4 changes: 2 additions & 2 deletions ballista/core/src/execution_plans/unresolved_shuffle.rs
@@ -115,9 +115,9 @@ impl ExecutionPlan for UnresolvedShuffleExec {
        ))
    }

-    fn statistics(&self) -> Statistics {
+    fn statistics(&self) -> Result<Statistics> {
        // The full statistics are computed in the `ShuffleReaderExec` node
        // that replaces this one once the previous stage is completed.
-        Statistics::default()
+        Ok(Statistics::new_unknown(&self.schema()))
    }
}
2 changes: 1 addition & 1 deletion ballista/executor/src/collect.rs
@@ -108,7 +108,7 @@ impl ExecutionPlan for CollectExec {
        }))
    }

-    fn statistics(&self) -> Statistics {
+    fn statistics(&self) -> Result<Statistics> {
        self.plan.statistics()
    }
}
6 changes: 3 additions & 3 deletions ballista/executor/src/executor.rs
@@ -210,7 +210,7 @@ mod test {

    use crate::execution_engine::DefaultQueryStageExec;
    use ballista_core::serde::scheduler::PartitionId;
-    use datafusion::error::DataFusionError;
+    use datafusion::error::{DataFusionError, Result};
    use datafusion::physical_expr::PhysicalSortExpr;
    use datafusion::physical_plan::{
        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
@@ -299,8 +299,8 @@
        Ok(Box::pin(NeverendingRecordBatchStream))
    }

-    fn statistics(&self) -> Statistics {
-        Statistics::default()
+    fn statistics(&self) -> Result<Statistics> {
+        Ok(Statistics::new_unknown(&self.schema()))
    }
}

6 changes: 3 additions & 3 deletions ballista/scheduler/Cargo.toml
@@ -47,20 +47,20 @@ arrow-flight = { workspace = true }
async-recursion = "1.0.0"
async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.11.0", features = ["s3"] }
-base64 = { version = "0.13" }
+base64 = { version = "0.21" }
clap = { version = "3", features = ["derive", "cargo"] }
configure_me = { workspace = true }
dashmap = "5.4.0"
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
-etcd-client = { version = "0.11", optional = true }
+etcd-client = { version = "0.12", optional = true }
flatbuffers = { version = "23.5.26" }
futures = "0.3"
graphviz-rust = "0.6.1"
http = "0.2"
http-body = "0.4"
hyper = "0.14.4"
-itertools = "0.11.0"
+itertools = "0.12.0"
log = "0.4"
object_store = { workspace = true }
once_cell = { version = "1.16.0", optional = true }
5 changes: 3 additions & 2 deletions ballista/scheduler/src/flight_sql.rs
@@ -35,6 +35,7 @@ use arrow_flight::{
    Action, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest,
    HandshakeResponse, Location, Ticket,
};
+use base64::Engine;
use futures::Stream;
use log::{debug, error, warn};
use std::convert::TryFrom;
@@ -503,8 +504,8 @@ impl FlightSqlService for FlightSqlServiceImpl {
                "Auth type not implemented: {authorization}"
            )))?;
        }
-        let base64 = &authorization[basic.len()..];
-        let bytes = base64::decode(base64)
+        let bytes = base64::engine::general_purpose::STANDARD
+            .decode(&authorization[basic.len()..])
            .map_err(|_| Status::invalid_argument("authorization not parsable"))?;
        let str = String::from_utf8(bytes)
            .map_err(|_| Status::invalid_argument("authorization not parsable"))?;
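
The handshake change tracks base64 0.21, which removed the free `base64::decode`/`base64::encode` functions in favor of explicit engines; hence the new `use base64::Engine;` import and the `STANDARD` engine above. A minimal round-trip sketch of the 0.21 API as it applies to a Basic-auth payload (the credentials are placeholders):

```rust
use base64::engine::general_purpose::STANDARD;
use base64::Engine; // brings the encode/decode methods into scope

fn main() {
    // Encode the way a Flight SQL client builds the Basic-auth value...
    let encoded = STANDARD.encode("admin:password");
    // ...and decode it the way the scheduler now parses the header.
    let bytes = STANDARD.decode(&encoded).expect("valid base64");
    assert_eq!(bytes, b"admin:password".to_vec());
}
```
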
10 changes: 5 additions & 5 deletions examples/examples/standalone-sql.rs
@@ -17,7 +17,7 @@

use ballista::prelude::{BallistaConfig, BallistaContext, Result};
use ballista_examples::test_util;
-use datafusion::prelude::CsvReadOptions;
+use datafusion::execution::options::ParquetReadOptions;

#[tokio::main]
async fn main() -> Result<()> {
@@ -29,11 +29,11 @@ async fn main() -> Result<()> {

    let testdata = test_util::examples_test_data();

-    // register csv file with the execution context
-    ctx.register_csv(
+    // register parquet file with the execution context
+    ctx.register_parquet(
        "test",
-        &format!("{testdata}/aggregate_test_100.csv"),
-        CsvReadOptions::new(),
+        &format!("{testdata}/alltypes_plain.parquet"),
+        ParquetReadOptions::default(),
    )
    .await?;

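Per the final commit bullet ("Use parquet file instead"), the example now registers `alltypes_plain.parquet` from the same test-data directory instead of the CSV file. A hedged continuation for the body of the same `main`, showing the registered table being queried (the SQL is illustrative, not part of this diff):

```rust
// Continues inside main() after register_parquet: the parquet-backed
// table is queried through the BallistaContext just as the CSV one was.
let df = ctx.sql("SELECT count(*) AS num_rows FROM test").await?;
df.show().await?;
```
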
