Skip to content

Commit

Permalink
fix: streaming cross join if swapped is hit (pola-rs#13656)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored and r-brink committed Jan 24, 2024
1 parent 19adb7e commit 133ba88
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 33 deletions.
2 changes: 1 addition & 1 deletion crates/polars-parquet/src/arrow/write/boolean/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub(super) fn encode_plain(
buffer: &mut Vec<u8>,
) -> PolarsResult<()> {
if is_optional {
let iter = array.iter().flatten().take(
let iter = array.non_null_values_iter().take(
array
.validity()
.as_ref()
Expand Down
18 changes: 7 additions & 11 deletions crates/polars-parquet/src/arrow/write/primitive/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,10 @@ where
if is_optional {
buffer.reserve(std::mem::size_of::<P>() * (array.len() - array.null_count()));
// append the non-null values
array.iter().for_each(|x| {
if let Some(x) = x {
let parquet_native: P = x.as_();
buffer.extend_from_slice(parquet_native.to_le_bytes().as_ref())
}
});
for x in array.non_null_values_iter() {
let parquet_native: P = x.as_();
buffer.extend_from_slice(parquet_native.to_le_bytes().as_ref())
}
} else {
buffer.reserve(std::mem::size_of::<P>() * array.len());
// append all values
Expand All @@ -56,7 +54,7 @@ where
{
if is_optional {
// append the non-null values
let iterator = array.iter().flatten().map(|x| {
let iterator = array.non_null_values_iter().map(|x| {
let parquet_native: P = x.as_();
let integer: i64 = parquet_native.as_();
integer
Expand Down Expand Up @@ -175,16 +173,14 @@ where
null_count: Some(array.null_count() as i64),
distinct_count: None,
max_value: array
.iter()
.flatten()
.non_null_values_iter()
.map(|x| {
let x: P = x.as_();
x
})
.max_by(|x, y| x.ord(y)),
min_value: array
.iter()
.flatten()
.non_null_values_iter()
.map(|x| {
let x: P = x.as_();
x
Expand Down
41 changes: 25 additions & 16 deletions crates/polars-pipe/src/executors/sinks/joins/cross.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ use crate::operators::{
/// Sink that buffers the build-side chunks of a streaming cross join.
pub struct CrossJoin {
    // All data chunks collected from the sink input; consumed by the probe phase.
    chunks: Vec<DataChunk>,
    // Suffix appended to right-hand column names on name collisions.
    suffix: SmartString,
    // Whether the optimizer swapped the join inputs; the probe phase uses this
    // to restore the original left/right column order in the output.
    swapped: bool,
}

impl CrossJoin {
pub(crate) fn new(suffix: SmartString) -> Self {
pub(crate) fn new(suffix: SmartString, swapped: bool) -> Self {
CrossJoin {
chunks: vec![],
suffix,
swapped,
}
}
}
Expand All @@ -44,6 +46,7 @@ impl Sink for CrossJoin {
/// Creates a sink instance for another pipeline thread.
///
/// Only the configuration (`suffix`, `swapped`) is carried over; the
/// remaining fields (e.g. `chunks`) start from their `Default` values, so
/// each thread-local sink accumulates its own data independently.
fn split(&self, _thread_no: usize) -> Box<dyn Sink> {
    Box::new(Self {
        suffix: self.suffix.clone(),
        // Propagate the swap flag so every per-thread sink agrees on the
        // final left/right output order.
        swapped: self.swapped,
        ..Default::default()
    })
}
Expand All @@ -57,6 +60,7 @@ impl Sink for CrossJoin {
in_process_right: None,
in_process_left_df: Default::default(),
output_names: None,
swapped: self.swapped,
})))
}

Expand All @@ -77,6 +81,7 @@ pub struct CrossJoinProbe {
in_process_right: Option<StepBy<Range<usize>>>,
in_process_left_df: DataFrame,
output_names: Option<Vec<SmartString>>,
swapped: bool,
}

impl Operator for CrossJoinProbe {
Expand Down Expand Up @@ -118,13 +123,17 @@ impl Operator for CrossJoinProbe {
let iter_right = self.in_process_right.as_mut().unwrap();
let offset = iter_right.next().unwrap();
let right_df = chunk.data.slice(offset as i64, size);
let mut df = self.in_process_left_df.cross_join(
&right_df,
Some(self.suffix.as_ref()),
None,
)?;

let (a, b) = if self.swapped {
(&right_df, &self.in_process_left_df)
} else {
(&self.in_process_left_df, &right_df)
};

let mut df = a.cross_join(b, Some(self.suffix.as_ref()), None)?;
// Cross joins can produce multiple chunks.
df.as_single_chunk_par();
// No parallelize in operators
df.as_single_chunk();
Ok(OperatorResult::HaveMoreOutPut(chunk.with_data(df)))
},
}
Expand All @@ -135,24 +144,24 @@ impl Operator for CrossJoinProbe {

let right_df = chunk.data.slice(offset as i64, size);

let (a, b) = if self.swapped {
(&right_df, &self.in_process_left_df)
} else {
(&self.in_process_left_df, &right_df)
};

// we use the first join to determine the output names;
// this way we can amortize the name allocations.
let mut df = match &self.output_names {
None => {
let df = self.in_process_left_df.cross_join(
&right_df,
Some(self.suffix.as_ref()),
None,
)?;
let df = a.cross_join(b, Some(self.suffix.as_ref()), None)?;
self.output_names = Some(df.get_column_names_owned());
df
},
Some(names) => self
.in_process_left_df
._cross_join_with_names(&right_df, names)?,
Some(names) => a._cross_join_with_names(b, names)?,
};
// Cross joins can produce multiple chunks.
df.as_single_chunk_par();
df.as_single_chunk();

Ok(OperatorResult::HaveMoreOutPut(chunk.with_data(df)))
},
Expand Down
8 changes: 3 additions & 5 deletions crates/polars-pipe/src/pipeline/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,12 +243,12 @@ where
} => {
// slice pushdown optimization should not set this one in a streaming query.
assert!(options.args.slice.is_none());
let swapped = swap_join_order(options);

match &options.args.how {
#[cfg(feature = "cross_join")]
JoinType::Cross => {
Box::new(CrossJoin::new(options.args.suffix().into())) as Box<dyn SinkTrait>
},
JoinType::Cross => Box::new(CrossJoin::new(options.args.suffix().into(), swapped))
as Box<dyn SinkTrait>,
join_type @ JoinType::Inner | join_type @ JoinType::Left => {
let input_schema_left = lp_arena.get(*input_left).schema(lp_arena);
let join_columns_left = Arc::new(exprs_to_physical(
Expand All @@ -265,8 +265,6 @@ where
Some(input_schema_right.as_ref()),
)?);

let swapped = swap_join_order(options);

let (join_columns_left, join_columns_right) = if swapped {
(join_columns_right, join_columns_left)
} else {
Expand Down
10 changes: 10 additions & 0 deletions py-polars/tests/unit/streaming/test_streaming_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,13 @@ def test_scan_empty_csv_10818(io_files_path: Path) -> None:
empty_file_path = io_files_path / "empty.csv"
df = pl.scan_csv(empty_file_path, raise_if_empty=False).collect(streaming=True)
assert df.is_empty()


@pytest.mark.write_disk()
def test_streaming_cross_join_schema(tmp_path: Path) -> None:
    """Regression test for pola-rs#13656: a streaming cross join sunk to
    parquet must produce the expected columns and row pairing even when the
    optimizer swaps the join inputs.
    """
    file_path = tmp_path / "temp.parquet"
    a = pl.DataFrame({"a": [1, 2]}).lazy()
    b = pl.DataFrame({"b": ["b"]}).lazy()
    # Stream the cross-join result straight to disk (exercises the sink path).
    a.join(b, how="cross").sink_parquet(file_path)
    # parallel="none" — presumably to keep the read order deterministic for
    # the exact-value comparison below; TODO confirm.
    read = pl.read_parquet(file_path, parallel="none")
    assert read.to_dict(as_series=False) == {"a": [1, 2], "b": ["b", "b"]}

0 comments on commit 133ba88

Please sign in to comment.