Skip to content

Commit e4e9e57

Browse files
committed
Should fail VAR(DISTINCT) but doesn't
1 parent: 2b6ccd3 · commit: e4e9e57

File tree

5 files changed

+15
-85
lines changed

5 files changed

+15
-85
lines changed

datafusion/functions-aggregate/src/variance.rs

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -28,7 +28,6 @@ use arrow::{
2828
use datafusion_common::{downcast_value, plan_err, DataFusionError, Result, ScalarValue};
2929
use datafusion_expr::{
3030
function::{AccumulatorArgs, StateFieldsArgs},
31-
type_coercion::aggregates::NUMERICS,
3231
utils::format_state_name,
3332
Accumulator, AggregateUDFImpl, Signature, Volatility,
3433
};
@@ -65,8 +64,8 @@ impl Default for VarianceSample {
6564
impl VarianceSample {
6665
pub fn new() -> Self {
6766
Self {
68-
aliases: vec![String::from("var")],
69-
signature: Signature::uniform(2, NUMERICS.to_vec(), Volatility::Immutable),
67+
aliases: vec![String::from("var_sample")],
68+
signature: Signature::numeric(1, Volatility::Immutable),
7069
}
7170
}
7271
}
@@ -251,4 +250,8 @@ impl Accumulator for VarianceAccumulator {
251250
fn size(&self) -> usize {
252251
std::mem::size_of_val(self)
253252
}
253+
254+
fn supports_retract_batch(&self) -> bool {
255+
true
256+
}
254257
}

datafusion/physical-expr/src/aggregate/build_in.rs

Lines changed: 0 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -742,8 +742,6 @@ mod tests {
742742
Ok(())
743743
}
744744

745-
// TODO (yyin): Add back test
746-
747745
#[test]
748746
fn test_var_pop_expr() -> Result<()> {
749747
let funcs = vec![AggregateFunction::VariancePop];
@@ -1012,8 +1010,6 @@ mod tests {
10121010
assert!(observed.is_err());
10131011
}
10141012

1015-
// TODO (yyin): Add back tests to sqllogictest
1016-
10171013
#[test]
10181014
fn test_stddev_return_type() -> Result<()> {
10191015
let observed = AggregateFunction::Stddev.return_type(&[DataType::Float32])?;

datafusion/physical-expr/src/aggregate/variance.rs

Lines changed: 0 additions & 75 deletions
Original file line number | Diff line number | Diff line change
@@ -35,88 +35,13 @@ use datafusion_common::downcast_value;
3535
use datafusion_common::{DataFusionError, Result, ScalarValue};
3636
use datafusion_expr::Accumulator;
3737

38-
/// VAR and VAR_SAMP aggregate expression
39-
#[derive(Debug)]
40-
pub struct Variance {
41-
name: String,
42-
expr: Arc<dyn PhysicalExpr>,
43-
}
44-
4538
/// VAR_POP aggregate expression
4639
#[derive(Debug)]
4740
pub struct VariancePop {
4841
name: String,
4942
expr: Arc<dyn PhysicalExpr>,
5043
}
5144

52-
impl Variance {
53-
/// Create a new VARIANCE aggregate function
54-
pub fn new(
55-
expr: Arc<dyn PhysicalExpr>,
56-
name: impl Into<String>,
57-
data_type: DataType,
58-
) -> Self {
59-
// the result of variance just support FLOAT64 data type.
60-
assert!(matches!(data_type, DataType::Float64));
61-
Self {
62-
name: name.into(),
63-
expr,
64-
}
65-
}
66-
}
67-
68-
impl AggregateExpr for Variance {
69-
/// Return a reference to Any that can be used for downcasting
70-
fn as_any(&self) -> &dyn Any {
71-
self
72-
}
73-
74-
fn field(&self) -> Result<Field> {
75-
Ok(Field::new(&self.name, DataType::Float64, true))
76-
}
77-
78-
fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
79-
Ok(Box::new(VarianceAccumulator::try_new(StatsType::Sample)?))
80-
}
81-
82-
fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
83-
Ok(Box::new(VarianceAccumulator::try_new(StatsType::Sample)?))
84-
}
85-
86-
fn state_fields(&self) -> Result<Vec<Field>> {
87-
Ok(vec![
88-
Field::new(
89-
format_state_name(&self.name, "count"),
90-
DataType::UInt64,
91-
true,
92-
),
93-
Field::new(
94-
format_state_name(&self.name, "mean"),
95-
DataType::Float64,
96-
true,
97-
),
98-
Field::new(format_state_name(&self.name, "m2"), DataType::Float64, true),
99-
])
100-
}
101-
102-
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
103-
vec![self.expr.clone()]
104-
}
105-
106-
fn name(&self) -> &str {
107-
&self.name
108-
}
109-
}
110-
111-
impl PartialEq<dyn Any> for Variance {
112-
fn eq(&self, other: &dyn Any) -> bool {
113-
down_cast_any_ref(other)
114-
.downcast_ref::<Self>()
115-
.map(|x| self.name == x.name && self.expr.eq(&x.expr))
116-
.unwrap_or(false)
117-
}
118-
}
119-
12045
impl VariancePop {
12146
/// Create a new VAR_POP aggregate function
12247
pub fn new(

datafusion/physical-expr/src/expressions/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -60,7 +60,7 @@ pub use crate::aggregate::stddev::{Stddev, StddevPop};
6060
pub use crate::aggregate::string_agg::StringAgg;
6161
pub use crate::aggregate::sum::Sum;
6262
pub use crate::aggregate::sum_distinct::DistinctSum;
63-
pub use crate::aggregate::variance::{Variance, VariancePop};
63+
pub use crate::aggregate::variance::VariancePop;
6464
pub use crate::window::cume_dist::{cume_dist, CumeDist};
6565
pub use crate::window::lead_lag::{lag, lead, WindowShift};
6666
pub use crate::window::nth_value::NthValue;

datafusion/sqllogictest/Cargo.toml

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -40,7 +40,7 @@ bigdecimal = { workspace = true }
4040
bytes = { workspace = true, optional = true }
4141
chrono = { workspace = true, optional = true }
4242
clap = { version = "4.4.8", features = ["derive", "env"] }
43-
datafusion = { workspace = true, default-features = true }
43+
datafusion = { workspace = true, default-features = true, features = ["avro"] }
4444
datafusion-common = { workspace = true, default-features = true }
4545
datafusion-common-runtime = { workspace = true, default-features = true }
4646
futures = { workspace = true }
@@ -60,7 +60,13 @@ tokio-postgres = { version = "0.7.7", optional = true }
6060

6161
[features]
6262
avro = ["datafusion/avro"]
63-
postgres = ["bytes", "chrono", "tokio-postgres", "postgres-types", "postgres-protocol"]
63+
postgres = [
64+
"bytes",
65+
"chrono",
66+
"tokio-postgres",
67+
"postgres-types",
68+
"postgres-protocol",
69+
]
6470

6571
[dev-dependencies]
6672
env_logger = { workspace = true }

0 commit comments

Comments
 (0)