diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 6618c6aeec28..3d4284519070 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -417,7 +417,7 @@ config_namespace! { /// When sorting, below what size should data be concatenated /// and sorted in a single RecordBatch rather than sorted in /// batches and merged. - pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024 + pub sort_in_place_threshold_bytes: usize, default = 3 * 1024 * 1024 /// Number of files to read in parallel when inferring schema and statistics pub meta_fetch_concurrency: usize, default = 32 diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 21f98fd01260..00d32635e964 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -45,11 +45,14 @@ use crate::{ Statistics, }; -use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray}; -use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays}; +use arrow::array::{Array, ArrayRef, RecordBatch, RecordBatchOptions, StringViewArray}; +use arrow::compute::{ + concat, interleave_record_batch, lexsort_to_indices, take_arrays, SortColumn, +}; use arrow::datatypes::SchemaRef; use datafusion_common::config::SpillCompression; use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError, Result}; + use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::runtime_env::RuntimeEnv; @@ -654,16 +657,25 @@ impl ExternalSorter { return self.sort_batch_stream(batch, metrics, reservation); } - // If less than sort_in_place_threshold_bytes, concatenate and sort in place + // If less than sort_in_place_threshold_bytes, we sort in memory. if self.reservation.size() < self.sort_in_place_threshold_bytes { - // Concatenate memory batches together and sort - let batch = concat_batches(&self.schema, &self.in_mem_batches)?; + let interleave_indices = + self.build_sorted_indices(self.in_mem_batches.as_slice(), &self.expr)?; + + let batches: Vec<&RecordBatch> = self.in_mem_batches.iter().collect(); + let sorted_batch = interleave_record_batch(&batches, &interleave_indices)?; + self.in_mem_batches.clear(); self.reservation - .try_resize(get_reserved_byte_for_record_batch(&batch)) + .try_resize(get_reserved_byte_for_record_batch(&sorted_batch)) .map_err(Self::err_with_oom_context)?; - let reservation = self.reservation.take(); - return self.sort_batch_stream(batch, metrics, reservation); + + metrics.record_output(sorted_batch.num_rows()); + + return Ok(Box::pin(RecordBatchStreamAdapter::new( + Arc::clone(&self.schema), + futures::stream::once(async { Ok(sorted_batch) }), + )) as SendableRecordBatchStream); } let streams = std::mem::take(&mut self.in_mem_batches) @@ -689,6 +701,74 @@ impl ExternalSorter { .build() } + fn build_sorted_indices( + &self, + current_batches: &[RecordBatch], + expr: &LexOrdering, + ) -> Result> { + // ===== Phase 1: Build global sort columns for each sort expression ===== + // For each sort expression, evaluate and collect the corresponding sort column from each in-memory batch + // Here, `self.expr` is a list of sort expressions, each providing `evaluate_to_sort_column()`, + // which returns an ArrayRef (in `.values`) and sort options (`options`) + + // ```text + // columns_by_expr for example: + // ├── expr_0 ──┬── ArrayRef_0_0 (from batch_0) + // │ ├── ArrayRef_0_1 (from batch_1) + // │ └── ArrayRef_0_2 (from batch_2) + // ├── expr_1 ──┬── ArrayRef_1_0 (from batch_0) + // │ ├── ArrayRef_1_1 (from batch_1) + // │ └── ArrayRef_1_2 (from batch_2) + // ``` + let mut columns_by_expr: Vec> = expr + .iter() + .map(|_| Vec::with_capacity(current_batches.len())) + .collect(); + + for batch in current_batches { + for (i, e) in expr.iter().enumerate() { + let col = e.evaluate_to_sort_column(batch)?.values; + columns_by_expr[i].push(col); + } + } + + // For each sort expression, concatenate arrays from all batches into one global array + let mut sort_columns = Vec::with_capacity(expr.len()); + for (arrays, e) in columns_by_expr.into_iter().zip(expr.iter()) { + let array = concat( + &arrays + .iter() + .map(|a| a.as_ref()) + .collect::>(), + )?; + sort_columns.push(SortColumn { + values: array, + options: e.options.into(), + }); + } + + // ===== Phase 2: Compute global sorted indices ===== + // Use `lexsort_to_indices` to get global row indices in sorted order (as if all batches were concatenated) + let indices = lexsort_to_indices(&sort_columns, None)?; + + // Phase 3: Prepare indices for interleaving + let batch_indices: Vec<(usize, usize)> = current_batches + .iter() + .enumerate() + .flat_map(|(batch_id, batch)| { + (0..batch.num_rows()).map(move |i| (batch_id, i)) + }) + .collect(); + + let interleave_indices: Vec<(usize, usize)> = indices + .values() + .iter() + .map(|x| batch_indices[*x as usize]) + .collect(); + + Ok(interleave_indices) + } + /// Sorts a single `RecordBatch` into a single stream. /// /// `reservation` accounts for the memory used by this batch and @@ -1301,7 +1381,7 @@ mod tests { use crate::test::TestMemoryExec; use arrow::array::*; - use arrow::compute::SortOptions; + use arrow::compute::{concat_batches, SortOptions}; use arrow::datatypes::*; use datafusion_common::cast::as_primitive_array; use datafusion_common::test_util::batches_to_string; diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index f76e436e0ad3..a00eedfd8eca 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -261,7 +261,7 @@ datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8 datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 datafusion.execution.skip_physical_aggregate_schema_check false datafusion.execution.soft_max_rows_per_output_file 50000000 -datafusion.execution.sort_in_place_threshold_bytes 1048576 +datafusion.execution.sort_in_place_threshold_bytes 3145728 datafusion.execution.sort_spill_reservation_bytes 10485760 datafusion.execution.spill_compression uncompressed datafusion.execution.split_file_groups_by_statistics false @@ -373,7 +373,7 @@ datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8 Aggregat datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 Number of input rows partial aggregation partition should process, before aggregation ratio check and trying to switch to skipping aggregation mode datafusion.execution.skip_physical_aggregate_schema_check false When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max -datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. +datafusion.execution.sort_in_place_threshold_bytes 3145728 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). datafusion.execution.spill_compression uncompressed Sets the compression codec used when spilling data to disk. Since datafusion writes spill files using the Arrow IPC Stream format, only codecs supported by the Arrow IPC Stream Writer are allowed. Valid values are: uncompressed, lz4_frame, zstd. Note: lz4_frame offers faster (de)compression, but typically results in larger spill files. In contrast, zstd achieves higher compression ratios at the cost of slower (de)compression speed. datafusion.execution.split_file_groups_by_statistics false Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index c618aa18c231..a3e2b11017de 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -85,7 +85,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.skip_physical_aggregate_schema_check | false | When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. | | datafusion.execution.spill_compression | uncompressed | Sets the compression codec used when spilling data to disk. Since datafusion writes spill files using the Arrow IPC Stream format, only codecs supported by the Arrow IPC Stream Writer are allowed. Valid values are: uncompressed, lz4_frame, zstd. Note: lz4_frame offers faster (de)compression, but typically results in larger spill files. In contrast, zstd achieves higher compression ratios at the cost of slower (de)compression speed. | | datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). | -| datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. | +| datafusion.execution.sort_in_place_threshold_bytes | 3145728 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. | | datafusion.execution.meta_fetch_concurrency | 32 | Number of files to read in parallel when inferring schema and statistics | | datafusion.execution.minimum_parallel_output_files | 4 | Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. | | datafusion.execution.soft_max_rows_per_output_file | 50000000 | Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max |