
Commit a96fd26

Bump DataFusion version
1 parent f1f4f2b commit a96fd26

14 files changed (+66 / -52 lines)


rust/ballista/rust/Cargo.toml

Lines changed: 3 additions & 3 deletions
@@ -25,6 +25,6 @@ members = [
     "scheduler",
 ]

-[profile.release]
-lto = true
-codegen-units = 1
+#[profile.release]
+#lto = true
+#codegen-units = 1

rust/ballista/rust/benchmarks/tpch/Cargo.toml

Lines changed: 3 additions & 3 deletions
@@ -27,9 +27,9 @@ edition = "2018"
 [dependencies]
 ballista = { path="../../client" }

-arrow = { git = "https://github.com/apache/arrow", rev="46161d2" }
-datafusion = { git = "https://github.com/apache/arrow", rev="46161d2" }
-parquet = { git = "https://github.com/apache/arrow", rev="46161d2" }
+arrow = { git = "https://github.com/apache/arrow", rev="f1f4f2b" }
+datafusion = { git = "https://github.com/apache/arrow", rev="f1f4f2b" }
+parquet = { git = "https://github.com/apache/arrow", rev="f1f4f2b" }


 env_logger = "0.8"

rust/ballista/rust/client/Cargo.toml

Lines changed: 2 additions & 2 deletions
@@ -30,5 +30,5 @@ ballista-core = { path = "../core" }
 futures = "0.3"
 log = "0.4"
 tokio = "1.0"
-arrow = { git = "https://github.com/apache/arrow", rev="46161d2" }
-datafusion = { git = "https://github.com/apache/arrow", rev="46161d2" }
+arrow = { git = "https://github.com/apache/arrow", rev="f1f4f2b" }
+datafusion = { git = "https://github.com/apache/arrow", rev="f1f4f2b" }

rust/ballista/rust/client/src/context.rs

Lines changed: 9 additions & 5 deletions
@@ -36,6 +36,7 @@ use ballista_core::{
 };

 use arrow::datatypes::Schema;
+use datafusion::catalog::TableReference;
 use datafusion::execution::context::ExecutionContext;
 use datafusion::logical_plan::{DFSchema, Expr, LogicalPlan, Partitioning};
 use datafusion::physical_plan::csv::CsvReadOptions;
@@ -148,7 +149,10 @@ impl BallistaContext {
         for (name, plan) in &state.tables {
             let plan = ctx.optimize(plan)?;
             let execution_plan = ctx.create_physical_plan(&plan)?;
-            ctx.register_table(name, Arc::new(DFTableAdapter::new(plan, execution_plan)));
+            ctx.register_table(
+                TableReference::Bare { table: name },
+                Arc::new(DFTableAdapter::new(plan, execution_plan)),
+            )?;
         }
         let df = ctx.sql(sql)?;
         Ok(BallistaDataFrame::from(self.state.clone(), df))
@@ -267,7 +271,7 @@ impl BallistaDataFrame {
         ))
     }

-    pub fn select(&self, expr: &[Expr]) -> Result<BallistaDataFrame> {
+    pub fn select(&self, expr: Vec<Expr>) -> Result<BallistaDataFrame> {
         Ok(Self::from(
             self.state.clone(),
             self.df.select(expr).map_err(BallistaError::from)?,
@@ -283,8 +287,8 @@ impl BallistaDataFrame {

     pub fn aggregate(
         &self,
-        group_expr: &[Expr],
-        aggr_expr: &[Expr],
+        group_expr: Vec<Expr>,
+        aggr_expr: Vec<Expr>,
     ) -> Result<BallistaDataFrame> {
         Ok(Self::from(
             self.state.clone(),
@@ -301,7 +305,7 @@ impl BallistaDataFrame {
         ))
     }

-    pub fn sort(&self, expr: &[Expr]) -> Result<BallistaDataFrame> {
+    pub fn sort(&self, expr: Vec<Expr>) -> Result<BallistaDataFrame> {
         Ok(Self::from(
             self.state.clone(),
             self.df.sort(expr).map_err(BallistaError::from)?,
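
Note on the BallistaDataFrame changes above: select, aggregate, and sort now take owned Vec<Expr> instead of &[Expr], following the corresponding DataFusion DataFrame API at the pinned revision. A minimal caller-side sketch; the helper name, column names, and the col/max expression builders being in scope are illustrative assumptions, not part of this commit:

    // Sketch only: assumes `BallistaDataFrame`, ballista's `Result`, and the
    // DataFusion `col` / `max` expression builders are in scope.
    fn summarize(df: &BallistaDataFrame) -> Result<BallistaDataFrame> {
        df.select(vec![col("state"), col("salary")])?                  // was: &[col(...), ...]
            .aggregate(vec![col("state")], vec![max(col("salary"))])?  // both argument lists are owned now
            .sort(vec![col("state")])                                  // was: &[col("state")]
    }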

rust/ballista/rust/core/Cargo.toml

Lines changed: 3 additions & 3 deletions
@@ -38,9 +38,9 @@ sqlparser = "0.8"
 tokio = "1.0"
 tonic = "0.4"
 uuid = { version = "0.8", features = ["v4"] }
-arrow = { git = "https://github.com/apache/arrow", rev="46161d2" }
-arrow-flight = { git = "https://github.com/apache/arrow", rev="46161d2" }
-datafusion = { git = "https://github.com/apache/arrow", rev="46161d2" }
+arrow = { git = "https://github.com/apache/arrow", rev="f1f4f2b" }
+arrow-flight = { git = "https://github.com/apache/arrow", rev="f1f4f2b" }
+datafusion = { git = "https://github.com/apache/arrow", rev="f1f4f2b" }


 [dev-dependencies]

rust/ballista/rust/core/src/datasource.rs

Lines changed: 1 addition & 0 deletions
@@ -57,6 +57,7 @@ impl TableProvider for DFTableAdapter {
         _projection: &Option<Vec<usize>>,
         _batch_size: usize,
         _filters: &[Expr],
+        _limit: Option<usize>,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
         Ok(self.plan.clone())
     }
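
The added parameter means any TableProvider implementation compiled against this DataFusion revision must accept a limit in scan, even if, like DFTableAdapter here, it returns a pre-built plan and simply ignores it. A sketch of the post-change method for a hypothetical provider; only the parameter list shown in the hunk above comes from this commit, the rest is assumed and omitted:

    // Fragment of a hypothetical `impl TableProvider for MyProvider`; the other
    // required trait methods are unchanged by this commit and omitted here.
    fn scan(
        &self,
        _projection: &Option<Vec<usize>>,
        _batch_size: usize,
        _filters: &[Expr],
        _limit: Option<usize>, // new: optional row limit a provider may push down or ignore
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        Ok(self.plan.clone()) // same behaviour as DFTableAdapter above
    }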

rust/ballista/rust/core/src/serde/logical_plan/from_proto.rs

Lines changed: 8 additions & 9 deletions
@@ -52,14 +52,13 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
         match plan {
             LogicalPlanType::Projection(projection) => {
                 let input: LogicalPlan = convert_box_required!(projection.input)?;
+                let x: Vec<Expr> = projection
+                    .expr
+                    .iter()
+                    .map(|expr| expr.try_into())
+                    .collect::<Result<Vec<_>, _>>()?;
                 LogicalPlanBuilder::from(&input)
-                    .project(
-                        &projection
-                            .expr
-                            .iter()
-                            .map(|expr| expr.try_into())
-                            .collect::<Result<Vec<_>, _>>()?,
-                    )?
+                    .project(x)?
                     .build()
                     .map_err(|e| e.into())
             }
@@ -89,7 +88,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
                     .map(|expr| expr.try_into())
                     .collect::<Result<Vec<_>, _>>()?;
                 LogicalPlanBuilder::from(&input)
-                    .aggregate(&group_expr, &aggr_expr)?
+                    .aggregate(group_expr, aggr_expr)?
                     .build()
                     .map_err(|e| e.into())
             }
@@ -148,7 +147,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
                     .map(|expr| expr.try_into())
                     .collect::<Result<Vec<Expr>, _>>()?;
                 LogicalPlanBuilder::from(&input)
-                    .sort(&sort_expr)?
+                    .sort(sort_expr)?
                     .build()
                     .map_err(|e| e.into())
             }
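
These hunks mirror the same signature change on the DataFusion side: LogicalPlanBuilder::project, aggregate, and sort now take owned Vec<Expr> arguments. A minimal sketch of the updated builder calls, not from this commit; it assumes an existing LogicalPlan input and the col/max helpers used in the roundtrip tests below:

    // Sketch only: owned vectors replace the former &[Expr] slices.
    fn example(input: &LogicalPlan) -> datafusion::error::Result<LogicalPlan> {
        LogicalPlanBuilder::from(input)
            .project(vec![col("state"), col("salary")])?              // was .project(&[...])?
            .aggregate(vec![col("state")], vec![max(col("salary"))])? // was (&[...], &[...])
            .sort(vec![col("state")])?                                // was .sort(&[...])?
            .build()
    }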

rust/ballista/rust/core/src/serde/logical_plan/mod.rs

Lines changed: 5 additions & 5 deletions
@@ -82,7 +82,7 @@ mod roundtrip_tests {
             CsvReadOptions::new().schema(&schema).has_header(true),
             Some(vec![3, 4]),
         )
-        .and_then(|plan| plan.sort(&[col("salary")]))
+        .and_then(|plan| plan.sort(vec![col("salary")]))
         .and_then(|plan| plan.build())
         .map_err(BallistaError::DataFusionError)?,
         );
@@ -679,7 +679,7 @@ mod roundtrip_tests {
             CsvReadOptions::new().schema(&schema).has_header(true),
             Some(vec![3, 4]),
         )
-        .and_then(|plan| plan.sort(&[col("salary")]))
+        .and_then(|plan| plan.sort(vec![col("salary")]))
         .and_then(|plan| plan.explain(true))
         .and_then(|plan| plan.build())
         .map_err(BallistaError::DataFusionError)?;
@@ -689,7 +689,7 @@ mod roundtrip_tests {
             CsvReadOptions::new().schema(&schema).has_header(true),
             Some(vec![3, 4]),
         )
-        .and_then(|plan| plan.sort(&[col("salary")]))
+        .and_then(|plan| plan.sort(vec![col("salary")]))
         .and_then(|plan| plan.explain(false))
         .and_then(|plan| plan.build())
         .map_err(BallistaError::DataFusionError)?;
@@ -742,7 +742,7 @@ mod roundtrip_tests {
             CsvReadOptions::new().schema(&schema).has_header(true),
             Some(vec![3, 4]),
         )
-        .and_then(|plan| plan.sort(&[col("salary")]))
+        .and_then(|plan| plan.sort(vec![col("salary")]))
         .and_then(|plan| plan.build())
         .map_err(BallistaError::DataFusionError)?;
         roundtrip_test!(plan);
@@ -784,7 +784,7 @@ mod roundtrip_tests {
             CsvReadOptions::new().schema(&schema).has_header(true),
             Some(vec![3, 4]),
         )
-        .and_then(|plan| plan.aggregate(&[col("state")], &[max(col("salary"))]))
+        .and_then(|plan| plan.aggregate(vec![col("state")], vec![max(col("salary"))]))
         .and_then(|plan| plan.build())
         .map_err(BallistaError::DataFusionError)?;

rust/ballista/rust/core/src/serde/logical_plan/to_proto.rs

Lines changed: 2 additions & 8 deletions
@@ -939,10 +939,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
                 })
             }
             LogicalPlan::Extension { .. } => unimplemented!(),
-            // _ => Err(BallistaError::General(format!(
-            //     "logical plan to_proto {:?}",
-            //     self
-            // ))),
+            LogicalPlan::Union { .. } => unimplemented!(),
         }
     }
 }
@@ -1161,10 +1158,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
             Expr::Wildcard => Ok(protobuf::LogicalExprNode {
                 expr_type: Some(protobuf::logical_expr_node::ExprType::Wildcard(true)),
             }),
-            // _ => Err(BallistaError::General(format!(
-            //     "logical expr to_proto {:?}",
-            //     self
-            // ))),
+            Expr::TryCast { .. } => unimplemented!(),
         }
     }
 }

rust/ballista/rust/core/src/serde/physical_plan/from_proto.rs

Lines changed: 16 additions & 3 deletions
@@ -30,11 +30,15 @@ use crate::serde::{proto_error, protobuf};
 use crate::{convert_box_required, convert_required};

 use arrow::datatypes::{DataType, Schema, SchemaRef};
+use datafusion::catalog::catalog::{
+    CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider,
+};
 use datafusion::execution::context::{ExecutionConfig, ExecutionContextState};
 use datafusion::logical_plan::{DFSchema, Expr};
 use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateFunction};
 use datafusion::physical_plan::expressions::col;
 use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
+use datafusion::physical_plan::hash_join::PartitionMode;
 use datafusion::physical_plan::merge::MergeExec;
 use datafusion::physical_plan::planner::DefaultPhysicalPlanner;
 use datafusion::physical_plan::{
@@ -111,6 +115,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                     options,
                     Some(projection),
                     batch_size,
+                    None,
                 )?))
             }
             PhysicalPlanType::ParquetScan(scan) => {
@@ -123,6 +128,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                     None,
                     scan.batch_size as usize,
                     scan.num_partitions as usize,
+                    None,
                 )?))
             }
             PhysicalPlanType::CoalesceBatches(coalesce_batches) => {
@@ -215,8 +221,10 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                     .collect::<Result<Vec<_>, _>>()?;

                 let df_planner = DefaultPhysicalPlanner::default();
+                let catalog_list =
+                    Arc::new(MemoryCatalogList::new()) as Arc<dyn CatalogList>;
                 let ctx_state = ExecutionContextState {
-                    datasources: Default::default(),
+                    catalog_list,
                     scalar_functions: Default::default(),
                     var_provider: Default::default(),
                     aggregate_functions: Default::default(),
@@ -294,7 +302,11 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                     protobuf::JoinType::Right => JoinType::Right,
                 };
                 Ok(Arc::new(HashJoinExec::try_new(
-                    left, right, &on, &join_type,
+                    left,
+                    right,
+                    &on,
+                    &join_type,
+                    PartitionMode::CollectLeft,
                 )?))
             }
             PhysicalPlanType::ShuffleReader(shuffle_reader) => {
@@ -374,8 +386,9 @@ fn compile_expr(
     schema: &Schema,
 ) -> Result<Arc<dyn PhysicalExpr>, BallistaError> {
     let df_planner = DefaultPhysicalPlanner::default();
+    let catalog_list = Arc::new(MemoryCatalogList::new()) as Arc<dyn CatalogList>;
     let state = ExecutionContextState {
-        datasources: HashMap::new(),
+        catalog_list,
         scalar_functions: HashMap::new(),
         var_provider: HashMap::new(),
         aggregate_functions: HashMap::new(),
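
Two recurring changes in this file are worth noting: ExecutionContextState now carries a catalog_list (here an empty MemoryCatalogList) where it previously carried a datasources map, and HashJoinExec::try_new requires an explicit PartitionMode. A condensed sketch of the join construction follows; argument types are inferred from the hunks above and the roundtrip test below, and everything not shown in this diff should be treated as an assumption:

    // Sketch only: builds a hash join the way the join arm above does.
    fn hash_join(
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
        on: &[(String, String)],
        join_type: &JoinType,
    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(HashJoinExec::try_new(
            left,
            right,
            on,
            join_type,
            PartitionMode::CollectLeft, // new required argument; buffers the left side
        )?))
    }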

rust/ballista/rust/core/src/serde/physical_plan/mod.rs

Lines changed: 2 additions & 0 deletions
@@ -40,6 +40,7 @@ mod roundtrip_tests {

     use super::super::super::error::Result;
     use super::super::protobuf;
+    use datafusion::physical_plan::hash_join::PartitionMode;

     fn roundtrip_test(exec_plan: Arc<dyn ExecutionPlan>) -> Result<()> {
         let proto: protobuf::PhysicalPlanNode = exec_plan.clone().try_into()?;
@@ -84,6 +85,7 @@ mod roundtrip_tests {
             Arc::new(EmptyExec::new(false, Arc::new(schema_right))),
             &[("col".to_string(), "col".to_string())],
             &JoinType::Inner,
+            PartitionMode::CollectLeft,
         )?))
     }

rust/ballista/rust/executor/Cargo.toml

Lines changed: 3 additions & 3 deletions
@@ -45,9 +45,9 @@ tokio-stream = "0.1"
 tonic = "0.4"
 uuid = { version = "0.8", features = ["v4"] }

-arrow = { git = "https://github.com/apache/arrow", rev="46161d2" }
-arrow-flight = { git = "https://github.com/apache/arrow", rev="46161d2" }
-datafusion = { git = "https://github.com/apache/arrow", rev="46161d2" }
+arrow = { git = "https://github.com/apache/arrow", rev="f1f4f2b" }
+arrow-flight = { git = "https://github.com/apache/arrow", rev="f1f4f2b" }
+datafusion = { git = "https://github.com/apache/arrow", rev="f1f4f2b" }

 [dev-dependencies]

rust/ballista/rust/scheduler/Cargo.toml

Lines changed: 2 additions & 2 deletions
@@ -47,8 +47,8 @@ sled_package = { package = "sled", version = "0.34", optional = true }
 tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] }
 tonic = "0.4"

-arrow = { git = "https://github.com/apache/arrow", rev="46161d2" }
-datafusion = { git = "https://github.com/apache/arrow", rev="46161d2" }
+arrow = { git = "https://github.com/apache/arrow", rev="f1f4f2b" }
+datafusion = { git = "https://github.com/apache/arrow", rev="f1f4f2b" }

 [dev-dependencies]
 ballista-core = { path = "../core" }

rust/ballista/rust/scheduler/src/lib.rs

Lines changed: 7 additions & 6 deletions
@@ -199,12 +199,13 @@ impl SchedulerGrpc for SchedulerServer {

         match file_type {
             FileType::Parquet => {
-                let parquet_exec = ParquetExec::try_from_path(&path, None, None, 1024, 1)
-                    .map_err(|e| {
-                        let msg = format!("Error opening parquet files: {}", e);
-                        error!("{}", msg);
-                        tonic::Status::internal(msg)
-                    })?;
+                let parquet_exec =
+                    ParquetExec::try_from_path(&path, None, None, 1024, 1, None)
+                        .map_err(|e| {
+                            let msg = format!("Error opening parquet files: {}", e);
+                            error!("{}", msg);
+                            tonic::Status::internal(msg)
+                        })?;

                 //TODO include statistics and any other info needed to reconstruct ParquetExec
                 Ok(Response::new(GetFileMetadataResult {
