Perf: Optimize in memory sort #15380

Open · wants to merge 17 commits into main
6 changes: 5 additions & 1 deletion datafusion/common/src/config.rs
@@ -343,7 +343,11 @@ config_namespace! {
/// When sorting, below what size should data be concatenated
/// and sorted in a single RecordBatch rather than sorted in
/// batches and merged.
pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024
/// Note:
/// In theory we should always be able to sort in place, but some corner cases in merge
/// testing failed, so we set a large threshold to avoid them.
/// Future work: potentially remove this option and always sort in place.
pub sort_in_place_threshold_bytes: usize, default = 1000 * 1024 * 1024

/// Number of files to read in parallel when inferring schema and statistics
pub meta_fetch_concurrency: usize, default = 32
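For context, a minimal sketch of overriding the new default per session, using the `with_sort_in_place_threshold_bytes` setter this option exposes (the `SessionContext` wiring is standard DataFusion usage, not part of this diff):

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Restore the previous 1 MiB threshold so larger inputs keep taking the
    // sort-then-merge path instead of the single in-place sort.
    let config = SessionConfig::new().with_sort_in_place_threshold_bytes(1024 * 1024);
    let _ctx = SessionContext::new_with_config(config);
    // ... register tables and run queries as usual ...
}
```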
5 changes: 4 additions & 1 deletion datafusion/core/tests/memory_limit/mod.rs
@@ -458,7 +458,9 @@ async fn test_stringview_external_sort() {
.with_memory_pool(Arc::new(FairSpillPool::new(60 * 1024 * 1024)));
let runtime = builder.build_arc().unwrap();

let config = SessionConfig::new().with_sort_spill_reservation_bytes(40 * 1024 * 1024);
let config = SessionConfig::new()
.with_sort_spill_reservation_bytes(40 * 1024 * 1024)
.with_sort_in_place_threshold_bytes(1024 * 1024);

let ctx = SessionContext::new_with_config_rt(config, runtime);
ctx.register_table("t", Arc::new(table)).unwrap();
@@ -481,6 +483,7 @@
async fn test_in_mem_buffer_almost_full() {
let config = SessionConfig::new()
.with_sort_spill_reservation_bytes(3000000)
.with_sort_in_place_threshold_bytes(1024 * 1024)
.with_target_partitions(1);
let runtime = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(FairSpillPool::new(10 * 1024 * 1024)))
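Both tests pin the threshold back to 1 MiB so the external (spilling) sort path is still exercised under the much larger new default. As a sketch, here is the same wiring outside the test harness, assuming only the public APIs already used above (`RuntimeEnvBuilder`, `FairSpillPool`, `SessionContext::new_with_config_rt`):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::{SessionConfig, SessionContext};

fn spilling_context() -> Result<SessionContext> {
    // Cap total execution memory at 10 MiB so a large sort is forced to spill.
    let runtime = RuntimeEnvBuilder::new()
        .with_memory_pool(Arc::new(FairSpillPool::new(10 * 1024 * 1024)))
        .build_arc()?;
    let config = SessionConfig::new()
        .with_sort_spill_reservation_bytes(3_000_000)
        // Keep the old 1 MiB threshold so batches above it are merged rather
        // than concatenated and sorted in place.
        .with_sort_in_place_threshold_bytes(1024 * 1024)
        .with_target_partitions(1);
    Ok(SessionContext::new_with_config_rt(config, runtime))
}
```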
113 changes: 103 additions & 10 deletions datafusion/physical-plan/src/sorts/sort.rs
@@ -45,9 +45,11 @@ use crate::{
};

use arrow::array::{
Array, RecordBatch, RecordBatchOptions, StringViewArray, UInt32Array,
Array, ArrayRef, RecordBatch, RecordBatchOptions, StringViewArray, UInt32Array,
};
use arrow::compute::{
concat, interleave_record_batch, lexsort_to_indices, take_arrays, SortColumn,
};
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, SortColumn};
use arrow::datatypes::{DataType, SchemaRef};
use arrow::row::{RowConverter, Rows, SortField};
use datafusion_common::{
@@ -674,16 +676,35 @@ impl ExternalSorter {
return self.sort_batch_stream(batch, metrics, reservation);
}

// If less than sort_in_place_threshold_bytes, concatenate and sort in place
if self.reservation.size() < self.sort_in_place_threshold_bytes {
// Concatenate memory batches together and sort
let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
// If the data is smaller than sort_in_place_threshold_bytes, sort it in memory.
// Note:
// In theory we should always be able to sort in place, but some corner cases in merge
// testing failed, so we set a large threshold to avoid them.
// Also, this path is only taken for sort expressions with at most two columns, because
// testing showed that with more than two columns the in-place sort performs worse than
// sort/merge. Its performance with more columns needs further investigation.
if self.expr.len() <= 2
&& self.reservation.size() < self.sort_in_place_threshold_bytes
{
let interleave_indices = self.build_sorted_indices(
self.in_mem_batches.as_slice(),
Arc::clone(&self.expr),
)?;

let batches: Vec<&RecordBatch> = self.in_mem_batches.iter().collect();
let sorted_batch = interleave_record_batch(&batches, &interleave_indices)?;

self.in_mem_batches.clear();
self.reservation
.try_resize(get_reserved_byte_for_record_batch(&batch))
.try_resize(get_reserved_byte_for_record_batch(&sorted_batch))
.map_err(Self::err_with_oom_context)?;
let reservation = self.reservation.take();
return self.sort_batch_stream(batch, metrics, reservation);

metrics.record_output(sorted_batch.num_rows());

return Ok(Box::pin(RecordBatchStreamAdapter::new(
Arc::clone(&self.schema),
futures::stream::once(async { Ok(sorted_batch) }),
)) as SendableRecordBatchStream);
}

let streams = std::mem::take(&mut self.in_mem_batches)
@@ -711,6 +732,78 @@ impl ExternalSorter {
.build()
}

fn build_sorted_indices(
&self,
current_batches: &[RecordBatch],
expr: Arc<[PhysicalSortExpr]>,
) -> Result<Vec<(usize, usize)>> {
// ===== Phase 1: Build global sort columns for each sort expression =====
// For each sort expression, evaluate and collect the corresponding sort column from each in-memory batch
// Here, `self.expr` is a list of sort expressions, each providing `evaluate_to_sort_column()`,
// which returns an ArrayRef (in `.values`) and sort options (`options`)

// ```text
// columns_by_expr for example:
// ├── expr_0 ──┬── ArrayRef_0_0 (from batch_0)
// │ ├── ArrayRef_0_1 (from batch_1)
// │ └── ArrayRef_0_2 (from batch_2)
// ├── expr_1 ──┬── ArrayRef_1_0 (from batch_0)
// │ ├── ArrayRef_1_1 (from batch_1)
// │ └── ArrayRef_1_2 (from batch_2)
// ```
let mut columns_by_expr: Vec<Vec<ArrayRef>> = expr
.iter()
.map(|_| Vec::with_capacity(current_batches.len()))
.collect();

for batch in current_batches {
for (i, e) in expr.iter().enumerate() {
let col = e.evaluate_to_sort_column(batch)?.values;
columns_by_expr[i].push(col);
}
}

// For each sort expression, concatenate arrays from all batches into one global array
let mut sort_columns = Vec::with_capacity(expr.len());
for (arrays, e) in columns_by_expr.into_iter().zip(expr.iter()) {
let array = concat(
&arrays
.iter()
.map(|a| a.as_ref())
.collect::<Vec<&dyn Array>>(),
)?;
sort_columns.push(SortColumn {
values: array,
options: e.options.into(),
});
}

// ===== Phase 2: Compute global sorted indices =====
// Use `lexsort_to_indices` to get global row indices in sorted order (as if all batches were concatenated)
let indices = if !is_multi_column_with_lists(&sort_columns) {
lexsort_to_indices(&sort_columns, None)?
} else {
lexsort_to_indices_multi_columns(sort_columns, None)?
};

// ===== Phase 3: Prepare indices for interleaving =====
// Map each global row offset (over the virtual concatenation of all batches)
// back to a (batch_index, row_index) pair for `interleave_record_batch`
let batch_indices: Vec<(usize, usize)> = current_batches
.iter()
.enumerate()
.flat_map(|(batch_id, batch)| {
(0..batch.num_rows()).map(move |i| (batch_id, i))
})
.collect();

let interleave_indices: Vec<(usize, usize)> = indices
.values()
.iter()
.map(|x| batch_indices[*x as usize])
.collect();

Ok(interleave_indices)
}

/// Sorts a single `RecordBatch` into a single stream.
///
/// `reservation` accounts for the memory used by this batch and
@@ -1382,7 +1475,7 @@ mod tests {
use crate::test::TestMemoryExec;

use arrow::array::*;
use arrow::compute::SortOptions;
use arrow::compute::{concat_batches, SortOptions};
use arrow::datatypes::*;
use datafusion_common::cast::as_primitive_array;
use datafusion_common::test_util::batches_to_string;
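To make the three phases concrete, here is a self-contained sketch of the arrow kernels `build_sorted_indices` combines, simplified to a single `Int32` sort column with default sort options (the real code also evaluates sort expressions per batch and special-cases multi-column list keys via `lexsort_to_indices_multi_columns`):

```rust
use std::sync::Arc;

use arrow::array::{Array, Int32Array, RecordBatch};
use arrow::compute::{concat, interleave_record_batch, lexsort_to_indices, SortColumn};
use arrow::datatypes::{DataType, Field, Schema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
    let batches = vec![
        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(Int32Array::from(vec![3, 1]))])?,
        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(Int32Array::from(vec![2, 0]))])?,
    ];

    // Phase 1: concatenate the sort column from every batch into one global array.
    let columns: Vec<&dyn Array> = batches.iter().map(|b| b.column(0).as_ref()).collect();
    let global = concat(&columns)?;

    // Phase 2: compute the globally sorted row order over the concatenated column.
    let indices = lexsort_to_indices(&[SortColumn { values: global, options: None }], None)?;

    // Phase 3: map each global offset back to (batch_index, row_index), then
    // gather rows from the original batches without concatenating them.
    let batch_indices: Vec<(usize, usize)> = batches
        .iter()
        .enumerate()
        .flat_map(|(batch_id, b)| (0..b.num_rows()).map(move |row| (batch_id, row)))
        .collect();
    let interleave_indices: Vec<(usize, usize)> = indices
        .values()
        .iter()
        .map(|&i| batch_indices[i as usize])
        .collect();

    let refs: Vec<&RecordBatch> = batches.iter().collect();
    let sorted = interleave_record_batch(&refs, &interleave_indices)?;
    assert_eq!(sorted.num_rows(), 4); // rows are now globally ordered: 0, 1, 2, 3
    Ok(())
}
```

Note that only the sort-key columns are ever concatenated; payload columns are gathered directly from the original batches, which is what this patch saves over the old `concat_batches`-then-sort approach.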
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/information_schema.slt
@@ -261,7 +261,7 @@ datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8
datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000
datafusion.execution.skip_physical_aggregate_schema_check false
datafusion.execution.soft_max_rows_per_output_file 50000000
datafusion.execution.sort_in_place_threshold_bytes 1048576
datafusion.execution.sort_in_place_threshold_bytes 1048576000
datafusion.execution.sort_spill_reservation_bytes 10485760
datafusion.execution.split_file_groups_by_statistics false
datafusion.execution.target_partitions 7
@@ -362,7 +362,7 @@ datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8 Aggregat
datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 Number of input rows partial aggregation partition should process, before aggregation ratio check and trying to switch to skipping aggregation mode
datafusion.execution.skip_physical_aggregate_schema_check false When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step.
datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max
datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged.
datafusion.execution.sort_in_place_threshold_bytes 1048576000 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. Note: In theory we should always be able to sort in place, but some corner cases in merge testing failed, so we set a large threshold to avoid them. Future work: potentially remove this option and always sort in place.
datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured).
datafusion.execution.split_file_groups_by_statistics false Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental
datafusion.execution.target_partitions 7 Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of CPU cores on the system
2 changes: 1 addition & 1 deletion docs/source/user-guide/configs.md
@@ -84,7 +84,7 @@ Environment variables are read during `SessionConfig` initialisation so they must be set beforehand
| datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system |
| datafusion.execution.skip_physical_aggregate_schema_check | false | When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. |
| datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). |
| datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. |
| datafusion.execution.sort_in_place_threshold_bytes | 1048576000 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. Note: In theory we should always be able to sort in place, but some corner cases in merge testing failed, so we set a large threshold to avoid them. Future work: potentially remove this option and always sort in place. |
| datafusion.execution.meta_fetch_concurrency | 32 | Number of files to read in parallel when inferring schema and statistics |
| datafusion.execution.minimum_parallel_output_files | 4 | Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. |
| datafusion.execution.soft_max_rows_per_output_file | 50000000 | Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max |