
Commit 79fa6f9

Make the EnforceSorting rule handle fetchable operators, and add an option to repartition based on row count estimates (apache#11875)
* Tmp
* Minor changes
* Minor changes
* Minor changes
* Implement top down recursion with delete check
* Minor changes
* Minor changes
* Address reviews
* Update comments
* Minor changes
* Make test deterministic
* Add fetch info to the statistics
* Enforce distribution use inexact count estimate also.
* Minor changes
* Minor changes
* Minor changes
* Do not add unnecessary hash partitioning
* Minor changes
* Add config option to use inexact row number estimates during planning
* Update config
* Minor changes
* Minor changes
* Final review
* Address reviews
* Add handling for sort removal with fetch
* Fix linter errors
* Minor changes
* Update config
* Cleanup stats under fetch
* Update SLT comment

Co-authored-by: Mehmet Ozan Kabak <[email protected]>
1 parent 12aa82c commit 79fa6f9

21 files changed: +643 -264 lines

datafusion/common/src/config.rs

+8
@@ -333,6 +333,14 @@ config_namespace! {
         /// Number of input rows partial aggregation partition should process, before
         /// aggregation ratio check and trying to switch to skipping aggregation mode
         pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000
+
+        /// Should DataFusion use row number estimates at the input to decide
+        /// whether increasing parallelism is beneficial or not. By default,
+        /// only exact row numbers (not estimates) are used for this decision.
+        /// Setting this flag to `true` will likely produce better plans.
+        /// if the source of statistics is accurate.
+        /// We plan to make this the default in the future.
+        pub use_row_number_estimates_to_optimize_partitioning: bool, default = false
     }
 }
 

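Editor's note: the new flag defaults to false, so planning keeps using only exact row counts unless it is switched on. The snippet below is a rough usage sketch, not part of this commit; it assumes the full config key lives under the "datafusion.execution." namespace, so check the generated configuration docs for the authoritative path.

use datafusion::prelude::*;

fn main() {
    // Assumed key path: "datafusion.execution." + the option name added above.
    let config = SessionConfig::new().set_bool(
        "datafusion.execution.use_row_number_estimates_to_optimize_partitioning",
        true,
    );
    // Queries planned through this context may now use inexact row-count
    // estimates when deciding whether increasing parallelism is beneficial.
    let _ctx = SessionContext::new_with_config(config);
}

The same setting should also be reachable via a SQL SET statement using that key, again assuming the execution namespace.
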
datafusion/common/src/stats.rs

+104 -18
@@ -19,9 +19,9 @@
 
 use std::fmt::{self, Debug, Display};
 
-use crate::ScalarValue;
+use crate::{Result, ScalarValue};
 
-use arrow_schema::Schema;
+use arrow_schema::{Schema, SchemaRef};
 
 /// Represents a value with a degree of certainty. `Precision` is used to
 /// propagate information the precision of statistical values.
@@ -247,21 +247,96 @@ impl Statistics {
 
     /// If the exactness of a [`Statistics`] instance is lost, this function relaxes
     /// the exactness of all information by converting them [`Precision::Inexact`].
-    pub fn into_inexact(self) -> Self {
-        Statistics {
-            num_rows: self.num_rows.to_inexact(),
-            total_byte_size: self.total_byte_size.to_inexact(),
-            column_statistics: self
-                .column_statistics
-                .into_iter()
-                .map(|cs| ColumnStatistics {
-                    null_count: cs.null_count.to_inexact(),
-                    max_value: cs.max_value.to_inexact(),
-                    min_value: cs.min_value.to_inexact(),
-                    distinct_count: cs.distinct_count.to_inexact(),
-                })
-                .collect::<Vec<_>>(),
+    pub fn to_inexact(mut self) -> Self {
+        self.num_rows = self.num_rows.to_inexact();
+        self.total_byte_size = self.total_byte_size.to_inexact();
+        self.column_statistics = self
+            .column_statistics
+            .into_iter()
+            .map(|s| s.to_inexact())
+            .collect();
+        self
+    }
+
+    /// Calculates the statistics after `fetch` and `skip` operations apply.
+    /// Here, `self` denotes per-partition statistics. Use the `n_partitions`
+    /// parameter to compute global statistics in a multi-partition setting.
+    pub fn with_fetch(
+        mut self,
+        schema: SchemaRef,
+        fetch: Option<usize>,
+        skip: usize,
+        n_partitions: usize,
+    ) -> Result<Self> {
+        let fetch_val = fetch.unwrap_or(usize::MAX);
+
+        self.num_rows = match self {
+            Statistics {
+                num_rows: Precision::Exact(nr),
+                ..
+            }
+            | Statistics {
+                num_rows: Precision::Inexact(nr),
+                ..
+            } => {
+                // Here, the inexact case gives us an upper bound on the number of rows.
+                if nr <= skip {
+                    // All input data will be skipped:
+                    Precision::Exact(0)
+                } else if nr <= fetch_val && skip == 0 {
+                    // If the input does not reach the `fetch` globally, and `skip`
+                    // is zero (meaning the input and output are identical), return
+                    // input stats as is.
+                    // TODO: Can input stats still be used, but adjusted, when `skip`
+                    // is non-zero?
+                    return Ok(self);
+                } else if nr - skip <= fetch_val {
+                    // After `skip` input rows are skipped, the remaining rows are
+                    // less than or equal to the `fetch` values, so `num_rows` must
+                    // equal the remaining rows.
+                    check_num_rows(
+                        (nr - skip).checked_mul(n_partitions),
+                        // We know that we have an estimate for the number of rows:
+                        self.num_rows.is_exact().unwrap(),
+                    )
+                } else {
+                    // At this point we know that we were given a `fetch` value
+                    // as the `None` case would go into the branch above. Since
+                    // the input has more rows than `fetch + skip`, the number
+                    // of rows will be the `fetch`, but we won't be able to
+                    // predict the other statistics.
+                    check_num_rows(
+                        fetch_val.checked_mul(n_partitions),
+                        // We know that we have an estimate for the number of rows:
+                        self.num_rows.is_exact().unwrap(),
+                    )
+                }
+            }
+            Statistics {
+                num_rows: Precision::Absent,
+                ..
+            } => check_num_rows(fetch.and_then(|v| v.checked_mul(n_partitions)), false),
+        };
+        self.column_statistics = Statistics::unknown_column(&schema);
+        self.total_byte_size = Precision::Absent;
+        Ok(self)
+    }
+}
+
+/// Creates an estimate of the number of rows in the output using the given
+/// optional value and exactness flag.
+fn check_num_rows(value: Option<usize>, is_exact: bool) -> Precision<usize> {
+    if let Some(value) = value {
+        if is_exact {
+            Precision::Exact(value)
+        } else {
+            // If the input stats are inexact, so are the output stats.
+            Precision::Inexact(value)
         }
+    } else {
+        // If the estimate is not available (e.g. due to an overflow), we can
+        // not produce a reliable estimate.
+        Precision::Absent
     }
 }
 
@@ -336,14 +411,25 @@ impl ColumnStatistics {
     }
 
     /// Returns a [`ColumnStatistics`] instance having all [`Precision::Absent`] parameters.
-    pub fn new_unknown() -> ColumnStatistics {
-        ColumnStatistics {
+    pub fn new_unknown() -> Self {
+        Self {
             null_count: Precision::Absent,
             max_value: Precision::Absent,
             min_value: Precision::Absent,
             distinct_count: Precision::Absent,
         }
     }
+
+    /// If the exactness of a [`ColumnStatistics`] instance is lost, this
+    /// function relaxes the exactness of all information by converting them
+    /// [`Precision::Inexact`].
+    pub fn to_inexact(mut self) -> Self {
+        self.null_count = self.null_count.to_inexact();
+        self.max_value = self.max_value.to_inexact();
+        self.min_value = self.min_value.to_inexact();
+        self.distinct_count = self.distinct_count.to_inexact();
+        self
+    }
 }
 
 #[cfg(test)]

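Editor's note: to make the num_rows arithmetic in with_fetch concrete, here is a small sketch, not part of the commit, exercising the branch where the per-partition input has more rows than fetch + skip. The row counts and partition count are invented, and the sketch assumes the with_fetch signature shown in the diff above.

use std::sync::Arc;

use arrow_schema::Schema;
use datafusion_common::{stats::Precision, Result, Statistics};

fn main() -> Result<()> {
    let schema = Arc::new(Schema::empty());

    // Per-partition statistics with an exact row count of 1000.
    let per_partition = Statistics {
        num_rows: Precision::Exact(1000),
        total_byte_size: Precision::Absent,
        column_statistics: Statistics::unknown_column(&schema),
    };

    // fetch = 10, skip = 5, 4 partitions: the input exceeds `fetch + skip`,
    // so the global estimate becomes `fetch * n_partitions` = 40 rows, and it
    // stays exact because the input row count was exact.
    let global = per_partition.with_fetch(Arc::clone(&schema), Some(10), 5, 4)?;
    assert_eq!(global.num_rows, Precision::Exact(40));

    // Column statistics and total byte size are reset in this branch, since a
    // fetch/skip makes them unpredictable.
    assert_eq!(global.total_byte_size, Precision::Absent);
    Ok(())
}
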
datafusion/core/src/dataframe/mod.rs

+6 -6
@@ -3000,13 +3000,13 @@ mod tests {
             .await?
             .select_columns(&["c1", "c2", "c3"])?
             .filter(col("c2").eq(lit(3)).and(col("c1").eq(lit("a"))))?
-            .limit(0, Some(1))?
             .sort(vec![
                 // make the test deterministic
                 col("c1").sort(true, true),
                 col("c2").sort(true, true),
                 col("c3").sort(true, true),
             ])?
+            .limit(0, Some(1))?
             .with_column("sum", col("c2") + col("c3"))?;
 
         let df_sum_renamed = df
@@ -3022,11 +3022,11 @@
 
         assert_batches_sorted_eq!(
             [
-                "+-----+-----+----+-------+",
-                "| one | two | c3 | total |",
-                "+-----+-----+----+-------+",
-                "| a   | 3   | 13 | 16    |",
-                "+-----+-----+----+-------+"
+                "+-----+-----+-----+-------+",
+                "| one | two | c3  | total |",
+                "+-----+-----+-----+-------+",
+                "| a   | 3   | -72 | -69   |",
+                "+-----+-----+-----+-------+",
             ],
             &df_sum_renamed
         );

datafusion/core/src/datasource/statistics.rs

+1 -1
@@ -138,7 +138,7 @@ pub async fn get_statistics_with_limit(
         // If we still have files in the stream, it means that the limit kicked
         // in, and the statistic could have been different had we processed the
         // files in a different order.
-        statistics = statistics.into_inexact()
+        statistics = statistics.to_inexact()
     }
 
     Ok((result_files, statistics))
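
Editor's note: the renamed to_inexact helper simply downgrades every known value to an estimate while leaving absent values absent, which is what this call site relies on when the limit cuts file processing short. A minimal sketch, not from this commit, with invented values:

use datafusion_common::{stats::Precision, ColumnStatistics, Statistics};

fn main() {
    // Exact statistics gathered before the limit kicked in.
    let exact = Statistics {
        num_rows: Precision::Exact(100),
        total_byte_size: Precision::Exact(2_048),
        column_statistics: vec![ColumnStatistics::new_unknown()],
    };

    // After `to_inexact`, known values are kept but marked as estimates;
    // `Absent` column statistics stay absent.
    let relaxed = exact.to_inexact();
    assert_eq!(relaxed.num_rows, Precision::Inexact(100));
    assert_eq!(relaxed.total_byte_size, Precision::Inexact(2_048));
}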
