Skip to content

Commit

Permalink
revert schema change v2
Browse files Browse the repository at this point in the history
  • Loading branch information
xinlifoobar committed Aug 4, 2024
1 parent 5d97345 commit 1ccc740
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 19 deletions.
3 changes: 1 addition & 2 deletions datafusion/functions-aggregate/src/array_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,7 @@ impl AggregateUDFImpl for ArrayAgg {
}

fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
let data_type =
acc_args.input_exprs[0].data_type(acc_args.schema)?;
let data_type = acc_args.input_exprs[0].data_type(acc_args.schema)?;

if acc_args.is_distinct {
return Ok(Box::new(DistinctArrayAggAccumulator::try_new(&data_type)?));
Expand Down
3 changes: 1 addition & 2 deletions datafusion/functions-aggregate/src/average.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ impl AggregateUDFImpl for Avg {
}
use DataType::*;
// instantiate specialized accumulator based for the type
let input_type =
acc_args.input_exprs[0].data_type(acc_args.schema)?;
let input_type = acc_args.input_exprs[0].data_type(acc_args.schema)?;

match (&input_type, acc_args.data_type) {
(Float64, Float64) => Ok(Box::<AvgAccumulator>::default()),
Expand Down
3 changes: 1 addition & 2 deletions datafusion/functions-aggregate/src/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,7 @@ impl AggregateUDFImpl for Count {
return not_impl_err!("COUNT DISTINCT with multiple arguments");
}

let data_type =
&acc_args.input_exprs[0].data_type(acc_args.schema)?;
let data_type = &acc_args.input_exprs[0].data_type(acc_args.schema)?;
Ok(match data_type {
// try and use a specialized accumulator if possible, otherwise fall back to generic accumulator
DataType::Int8 => Box::new(
Expand Down
16 changes: 8 additions & 8 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,7 @@ mod tests {
use arrow_array::{Float32Array, Int32Array};
use datafusion_common::{
assert_batches_eq, assert_batches_sorted_eq, internal_err, DFSchema, DFSchemaRef,
DataFusionError, ScalarValue, ToDFSchema,
DataFusionError, ScalarValue,
};
use datafusion_execution::config::SessionConfig;
use datafusion_execution::memory_pool::FairSpillPool;
Expand Down Expand Up @@ -1349,7 +1349,7 @@ mod tests {
};

let aggregates = vec![AggregateExprBuilder::new(count_udaf(), vec![lit(1i8)])
.dfschema(Arc::clone(&input_schema).to_dfschema()?)
.schema(Arc::clone(&input_schema))
.name("COUNT(1)")
.logical_exprs(vec![datafusion_expr::lit(1i8)])
.build()?];
Expand Down Expand Up @@ -1494,7 +1494,7 @@ mod tests {
let aggregates: Vec<Arc<dyn AggregateExpr>> =
vec![
AggregateExprBuilder::new(avg_udaf(), vec![col("b", &input_schema)?])
.dfschema(Arc::clone(&input_schema).to_dfschema()?)
.schema(Arc::clone(&input_schema))
.name("AVG(b)")
.build()?,
];
Expand Down Expand Up @@ -1790,7 +1790,7 @@ mod tests {
// Median(a)
fn test_median_agg_expr(schema: SchemaRef) -> Result<Arc<dyn AggregateExpr>> {
AggregateExprBuilder::new(median_udaf(), vec![col("a", &schema)?])
.dfschema(schema.to_dfschema()?)
.schema(Arc::clone(&schema))
.name("MEDIAN(a)")
.build()
}
Expand Down Expand Up @@ -1821,7 +1821,7 @@ mod tests {
let aggregates_v2: Vec<Arc<dyn AggregateExpr>> =
vec![
AggregateExprBuilder::new(avg_udaf(), vec![col("b", &input_schema)?])
.dfschema(Arc::clone(&input_schema).to_dfschema()?)
.schema(Arc::clone(&input_schema))
.name("AVG(b)")
.build()?,
];
Expand Down Expand Up @@ -1881,7 +1881,7 @@ mod tests {
let aggregates: Vec<Arc<dyn AggregateExpr>> =
vec![
AggregateExprBuilder::new(avg_udaf(), vec![col("a", &schema)?])
.dfschema(Arc::clone(&schema).to_dfschema()?)
.schema(Arc::clone(&schema))
.name("AVG(a)")
.build()?,
];
Expand Down Expand Up @@ -1921,7 +1921,7 @@ mod tests {
let aggregates: Vec<Arc<dyn AggregateExpr>> =
vec![
AggregateExprBuilder::new(avg_udaf(), vec![col("b", &schema)?])
.dfschema(Arc::clone(&schema).to_dfschema()?)
.schema(Arc::clone(&schema))
.name("AVG(b)")
.build()?,
];
Expand Down Expand Up @@ -2350,7 +2350,7 @@ mod tests {

let aggregates: Vec<Arc<dyn AggregateExpr>> =
vec![AggregateExprBuilder::new(count_udaf(), vec![lit(1)])
.dfschema(Arc::clone(&schema).to_dfschema()?)
.schema(Arc::clone(&schema))
.name("1")
.build()?];

Expand Down
3 changes: 1 addition & 2 deletions datafusion/physical-plan/src/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,6 @@ impl BuiltInWindowFunctionExpr for WindowUDFExpr {
}
}

#[allow(clippy::needless_borrow)]
pub(crate) fn calc_requirements<
T: Borrow<Arc<dyn PhysicalExpr>>,
S: Borrow<PhysicalSortExpr>,
Expand All @@ -413,7 +412,7 @@ pub(crate) fn calc_requirements<
let PhysicalSortExpr { expr, options } = element.borrow();
if !sort_reqs.iter().any(|e| e.expr.eq(expr)) {
sort_reqs.push(PhysicalSortRequirement::new(
Arc::clone(&expr),
Arc::clone(expr),
Some(*options),
));
}
Expand Down
4 changes: 1 addition & 3 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,7 @@ use datafusion_common::file_options::csv_writer::CsvWriterOptions;
use datafusion_common::file_options::json_writer::JsonWriterOptions;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::stats::Precision;
use datafusion_common::{
internal_err, not_impl_err, DataFusionError, Result,
};
use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
use datafusion_expr::{
Accumulator, AccumulatorFactoryFunction, AggregateUDF, ColumnarValue, ScalarUDF,
Signature, SimpleAggregateUDF, WindowFrame, WindowFrameBound,
Expand Down

0 comments on commit 1ccc740

Please sign in to comment.