Skip to content

Commit

Permalink
refactor: Fix/skip variety of new-streaming tests, cont (#18928)
Browse files Browse the repository at this point in the history
  • Loading branch information
orlp authored Sep 26, 2024
1 parent bef75b9 commit 68b6f0e
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 6 deletions.
11 changes: 9 additions & 2 deletions crates/polars-expr/src/reduce/min_max.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ struct MinReduceState {

impl MinReduceState {
fn update_with_value(&mut self, other: &AnyValue<'static>) {
// AnyValue uses total ordering, so NaN is greater than any value.
// This means other < self.value.value() already ignores incoming NaNs.
// We still must check if self is NaN and if so replace.
if self.value.is_null()
|| !other.is_null() && (other < self.value.value() || self.value.is_nan())
{
Expand Down Expand Up @@ -80,8 +83,12 @@ struct MaxReduceState {

impl MaxReduceState {
fn update_with_value(&mut self, other: &AnyValue<'static>) {
// AnyValue uses total ordering, so NaN is greater than any value.
// This means other > self.value.value() might have false positives.
// We also must check if self is NaN and if so replace.
if self.value.is_null()
|| !other.is_null() && (other > self.value.value() || self.value.is_nan())
|| !other.is_null()
&& (other > self.value.value() && !other.is_nan() || self.value.is_nan())
{
self.value.update(other.clone());
}
Expand All @@ -90,7 +97,7 @@ impl MaxReduceState {

impl ReductionState for MaxReduceState {
fn update(&mut self, batch: &Series) -> PolarsResult<()> {
let sc = batch.min_reduce()?;
let sc = batch.max_reduce()?;
self.update_with_value(sc.value());
Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions crates/polars-python/src/series/buffers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ where
T: PolarsNumericType,
{
let ca: &ChunkedArray<T> = s.as_ref().as_ref();
let ca = ca.rechunk();
let arr = ca.downcast_iter().next().unwrap();
arr.values().clone()
}
Expand Down
8 changes: 7 additions & 1 deletion crates/polars-stream/src/physical_plan/lower_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use polars_core::prelude::{InitHashMaps, PlHashMap, PlIndexMap};
use polars_core::schema::Schema;
use polars_error::PolarsResult;
use polars_plan::plans::expr_ir::{ExprIR, OutputName};
use polars_plan::plans::{AExpr, IR};
use polars_plan::plans::{AExpr, FunctionIR, IR};
use polars_plan::prelude::SinkType;
use polars_utils::arena::{Arena, Node};
use polars_utils::itertools::Itertools;
Expand Down Expand Up @@ -238,6 +238,12 @@ pub fn lower_ir(
},

IR::MapFunction { input, function } => {
// MergeSorted uses a rechunk hack incompatible with the
// streaming engine.
if let FunctionIR::MergeSorted { .. } = function {
todo!()
}

let function = function.clone();
let phys_input = lower_ir(
*input,
Expand Down
1 change: 1 addition & 0 deletions py-polars/tests/unit/operations/test_ewm.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ def test_ewm_param_validation() -> None:


# https://github.com/pola-rs/polars/issues/4951
@pytest.mark.may_fail_auto_streaming
def test_ewm_with_multiple_chunks() -> None:
df0 = pl.DataFrame(
data=[
Expand Down
1 change: 1 addition & 0 deletions py-polars/tests/unit/operations/test_join_asof.py
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,7 @@ def test_asof_join_nearest_by_date() -> None:
assert_frame_equal(out, expected)


@pytest.mark.may_fail_auto_streaming # See #18927.
def test_asof_join_string() -> None:
left = pl.DataFrame({"x": [None, "a", "b", "c", None, "d", None]}).set_sorted("x")
right = pl.DataFrame({"x": ["apple", None, "chutney"], "y": [0, 1, 2]}).set_sorted(
Expand Down
1 change: 1 addition & 0 deletions py-polars/tests/unit/operations/test_sort.py
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,7 @@ def test_sort_string_nulls() -> None:
]


@pytest.mark.may_fail_auto_streaming
def test_sort_by_unequal_lengths_7207() -> None:
df = pl.DataFrame({"a": [0, 1, 1, 0], "b": [3, 2, 3, 2]})
with pytest.raises(pl.exceptions.ShapeError):
Expand Down
6 changes: 3 additions & 3 deletions py-polars/tests/unit/operations/test_statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ def test_median_quantile_duration() -> None:
def test_correlation_cast_supertype() -> None:
df = pl.DataFrame({"a": [1, 8, 3], "b": [4.0, 5.0, 2.0]})
df = df.with_columns(pl.col("b"))
assert df.select(pl.corr("a", "b")).to_dict(as_series=False) == {
"a": [0.5447047794019219]
}
assert_frame_equal(
df.select(pl.corr("a", "b")), pl.DataFrame({"a": [0.5447047794019219]})
)


def test_cov_corr_f32_type() -> None:
Expand Down
1 change: 1 addition & 0 deletions py-polars/tests/unit/test_projections.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ def test_hconcat_projection_pushdown_length_maintained() -> None:
assert_frame_equal(out, expected)


@pytest.mark.may_fail_auto_streaming
def test_unnest_columns_available() -> None:
df = pl.DataFrame(
{
Expand Down

0 comments on commit 68b6f0e

Please sign in to comment.