diff --git a/crates/polars-pipe/src/executors/sinks/joins/cross.rs b/crates/polars-pipe/src/executors/sinks/joins/cross.rs index 9a0bb1fbab94..f48778c9f5cb 100644 --- a/crates/polars-pipe/src/executors/sinks/joins/cross.rs +++ b/crates/polars-pipe/src/executors/sinks/joins/cross.rs @@ -118,11 +118,13 @@ impl Operator for CrossJoinProbe { let iter_right = self.in_process_right.as_mut().unwrap(); let offset = iter_right.next().unwrap(); let right_df = chunk.data.slice(offset as i64, size); - let df = self.in_process_left_df.cross_join( + let mut df = self.in_process_left_df.cross_join( &right_df, Some(self.suffix.as_ref()), None, )?; + // Cross joins can produce multiple chunks. + df.as_single_chunk_par(); Ok(OperatorResult::HaveMoreOutPut(chunk.with_data(df))) }, } @@ -135,7 +137,7 @@ impl Operator for CrossJoinProbe { // we use the first join to determine the output names // this we can amortize the name allocations. - let df = match &self.output_names { + let mut df = match &self.output_names { None => { let df = self.in_process_left_df.cross_join( &right_df, @@ -149,6 +151,8 @@ impl Operator for CrossJoinProbe { .in_process_left_df ._cross_join_with_names(&right_df, names)?, }; + // Cross joins can produce multiple chunks. + df.as_single_chunk_par(); Ok(OperatorResult::HaveMoreOutPut(chunk.with_data(df))) }, diff --git a/crates/polars-pipe/src/executors/sinks/sort/sink_multiple.rs b/crates/polars-pipe/src/executors/sinks/sort/sink_multiple.rs index 4e7155c8b0de..e552ba576994 100644 --- a/crates/polars-pipe/src/executors/sinks/sort/sink_multiple.rs +++ b/crates/polars-pipe/src/executors/sinks/sort/sink_multiple.rs @@ -227,6 +227,7 @@ impl SortSinkMultiple { ) }; + debug_assert_eq!(column.chunks().len(), 1); // Safety: length is correct unsafe { chunk.data.with_column_unchecked(column) }; Ok(()) diff --git a/crates/polars-pipe/src/operators/chunks.rs b/crates/polars-pipe/src/operators/chunks.rs index 18cbe16a1fc1..4789eae6281f 100644 --- a/crates/polars-pipe/src/operators/chunks.rs +++ b/crates/polars-pipe/src/operators/chunks.rs @@ -10,13 +10,17 @@ pub struct DataChunk { impl DataChunk { pub(crate) fn new(chunk_index: IdxSize, data: DataFrame) -> Self { + // Check the invariant that all columns have a single chunk. + #[cfg(debug_assertions)] + { + for c in data.get_columns() { + assert_eq!(c.chunks().len(), 1); + } + } Self { chunk_index, data } } pub(crate) fn with_data(&self, data: DataFrame) -> Self { - DataChunk { - chunk_index: self.chunk_index, - data, - } + Self::new(self.chunk_index, data) } pub(crate) fn is_empty(&self) -> bool { self.data.height() == 0 diff --git a/py-polars/tests/unit/streaming/test_streaming_join.py b/py-polars/tests/unit/streaming/test_streaming_join.py index 47760aaaefa8..68719711d546 100644 --- a/py-polars/tests/unit/streaming/test_streaming_join.py +++ b/py-polars/tests/unit/streaming/test_streaming_join.py @@ -95,3 +95,16 @@ def test_streaming_cross_join_empty() -> None: ).collect(streaming=True) assert out.shape == (0, 2) assert out.columns == ["col1", "col1_right"] + + +def test_streaming_join_rechunk_12498() -> None: + rows = pl.int_range(0, 2) + + a = pl.select(A=rows).lazy() + b = pl.select(B=rows).lazy() + + q = a.join(b, how="cross") + assert q.collect(streaming=True).to_dict(as_series=False) == { + "A": [0, 1, 0, 1], + "B": [0, 0, 1, 1], + }