Skip to content

Commit

Permalink
fix: Ensure must_flush flag is not reset (#19046)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Oct 1, 2024
1 parent 26a364c commit e8e5b93
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 1 deletion.
8 changes: 7 additions & 1 deletion crates/polars-pipe/src/pipeline/dispatcher/drive_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,13 @@ pub(super) fn push_operators_single_thread(
let op = op.get_mut();
match op.execute(ec, &chunk)? {
OperatorResult::Finished(chunk) => {
must_flush.store(op.must_flush(), Ordering::Relaxed);
let flag = op.must_flush();
let _ = must_flush.compare_exchange(
false,
flag,
Ordering::Relaxed,
Ordering::Relaxed,
);
in_process.push((op_i + 1, chunk))
},
OperatorResult::HaveMoreOutPut(output_chunk) => {
Expand Down
21 changes: 21 additions & 0 deletions py-polars/tests/unit/streaming/test_streaming_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,3 +287,24 @@ def test_streaming_outer_join_partial_flush(tmp_path: Path) -> None:
],
"value": [0, 1, 2, 3, 4, 5],
}


def test_flush_join_and_operation_19040() -> None:
    """Streaming check: two chained full joins followed by another operation.

    Builds three lazy frames keyed on the boolean column "K", full-joins them
    in sequence with coalescing, applies a `with_columns` step after the joins,
    sorts on "K", and compares the streaming-collected result with the
    expected rows. The test name ties this to issue 19040.
    """
    left = pl.LazyFrame({"K": [True, False], "A": [1, 1]})
    middle = pl.LazyFrame({"K": [True], "B": [1]})
    right = pl.LazyFrame({"K": [True], "C": [1]})

    # Two consecutive full joins on the same key.
    joined_once = left.join(middle, how="full", on=["K"], coalesce=True)
    joined_twice = joined_once.join(right, how="full", on=["K"], coalesce=True)

    # An operator placed after the joins, then a sort so row order is fixed.
    query = joined_twice.with_columns(B=pl.col("B")).sort("K")

    # K=False only exists in the first frame, so "B" and "C" are null there.
    expected = {
        "K": [False, True],
        "A": [1, 1],
        "B": [None, 1],
        "C": [None, 1],
    }
    result = query.collect(streaming=True).to_dict(as_series=False)
    assert result == expected

0 comments on commit e8e5b93

Please sign in to comment.