diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 635b93b81b12..08e1fe076055 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -45,9 +45,11 @@ use crate::{
 };
 
 use arrow::array::{
-    Array, RecordBatch, RecordBatchOptions, StringViewArray, UInt32Array,
+    Array, ArrayRef, RecordBatch, RecordBatchOptions, StringViewArray, UInt32Array,
+};
+use arrow::compute::{
+    concat, interleave_record_batch, lexsort_to_indices, take_arrays, SortColumn,
 };
-use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, SortColumn};
 use arrow::datatypes::{DataType, SchemaRef};
 use arrow::row::{RowConverter, Rows, SortField};
 use datafusion_common::{
@@ -674,16 +676,35 @@ impl ExternalSorter {
             return self.sort_batch_stream(batch, metrics, reservation);
         }
 
-        // If less than sort_in_place_threshold_bytes, concatenate and sort in place
-        if self.reservation.size() < self.sort_in_place_threshold_bytes {
-            // Concatenate memory batches together and sort
-            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
+        // If the reserved size is below sort_in_place_threshold_bytes, sort in memory.
+        // Note:
+        // In theory we should always be able to sort in place, but some corner cases
+        // failed during merge testing, so we keep this conservative size threshold.
+        // We also only take this path for sort expressions with at most two columns,
+        // because testing showed that in-place sort with more sort columns performs
+        // worse than sort/merge; that regime still needs further investigation.
+        if self.expr.len() <= 2
+            && self.reservation.size() < self.sort_in_place_threshold_bytes
+        {
+            let interleave_indices = self.build_sorted_indices(
+                self.in_mem_batches.as_slice(),
+                Arc::clone(&self.expr),
+            )?;
+
+            let batches: Vec<&RecordBatch> = self.in_mem_batches.iter().collect();
+            let sorted_batch = interleave_record_batch(&batches, &interleave_indices)?;
+
             self.in_mem_batches.clear();
             self.reservation
-                .try_resize(get_reserved_byte_for_record_batch(&batch))
+                .try_resize(get_reserved_byte_for_record_batch(&sorted_batch))
                 .map_err(Self::err_with_oom_context)?;
-            let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
+
+            metrics.record_output(sorted_batch.num_rows());
+
+            return Ok(Box::pin(RecordBatchStreamAdapter::new(
+                Arc::clone(&self.schema),
+                futures::stream::once(async { Ok(sorted_batch) }),
+            )) as SendableRecordBatchStream);
         }
 
         let streams = std::mem::take(&mut self.in_mem_batches)
@@ -711,6 +732,78 @@ impl ExternalSorter {
             .build()
     }
 
+    fn build_sorted_indices(
+        &self,
+        current_batches: &[RecordBatch],
+        expr: Arc<[PhysicalSortExpr]>,
+    ) -> Result<Vec<(usize, usize)>> {
+        // ===== Phase 1: Build global sort columns for each sort expression =====
+        // For each sort expression, evaluate and collect the corresponding sort column from each in-memory batch.
+        // Here, `self.expr` is a list of sort expressions, each providing `evaluate_to_sort_column()`,
+        // which returns an ArrayRef (in `.values`) and sort options (in `options`).
+
+        // ```text
+        // columns_by_expr for example:
+        // ├── expr_0 ──┬── ArrayRef_0_0 (from batch_0)
+        // │            ├── ArrayRef_0_1 (from batch_1)
+        // │            └── ArrayRef_0_2 (from batch_2)
+        // ├── expr_1 ──┬── ArrayRef_1_0 (from batch_0)
+        // │            ├── ArrayRef_1_1 (from batch_1)
+        // │            └── ArrayRef_1_2 (from batch_2)
+        // ```
+        let mut columns_by_expr: Vec<Vec<ArrayRef>> = expr
+            .iter()
+            .map(|_| Vec::with_capacity(current_batches.len()))
+            .collect();
+
+        for batch in current_batches {
+            for (i, e) in expr.iter().enumerate() {
+                let col = e.evaluate_to_sort_column(batch)?.values;
+                columns_by_expr[i].push(col);
+            }
+        }
+
+        // For each sort expression, concatenate the arrays from all batches into one global array
+        let mut sort_columns = Vec::with_capacity(expr.len());
+        for (arrays, e) in columns_by_expr.into_iter().zip(expr.iter()) {
+            let array = concat(
+                &arrays
+                    .iter()
+                    .map(|a| a.as_ref())
+                    .collect::<Vec<_>>(),
+            )?;
+            sort_columns.push(SortColumn {
+                values: array,
+                options: e.options.into(),
+            });
+        }
+
+        // ===== Phase 2: Compute global sorted indices =====
+        // Use `lexsort_to_indices` to get global row indices in sorted order (as if all batches were concatenated)
+        let indices = if !is_multi_column_with_lists(&sort_columns) {
+            lexsort_to_indices(&sort_columns, None)?
+        } else {
+            lexsort_to_indices_multi_columns(sort_columns, None)?
+        };
+
+        // ===== Phase 3: Prepare (batch_idx, row_idx) indices for interleaving =====
+        let batch_indices: Vec<(usize, usize)> = current_batches
+            .iter()
+            .enumerate()
+            .flat_map(|(batch_id, batch)| {
+                (0..batch.num_rows()).map(move |i| (batch_id, i))
+            })
+            .collect();
+
+        let interleave_indices: Vec<(usize, usize)> = indices
+            .values()
+            .iter()
+            .map(|x| batch_indices[*x as usize])
+            .collect();
+
+        Ok(interleave_indices)
+    }
+
     /// Sorts a single `RecordBatch` into a single stream.
     ///
     /// `reservation` accounts for the memory used by this batch and
@@ -1382,7 +1475,7 @@ mod tests {
     use crate::test::TestMemoryExec;
 
     use arrow::array::*;
-    use arrow::compute::SortOptions;
+    use arrow::compute::{concat_batches, SortOptions};
     use arrow::datatypes::*;
     use datafusion_common::cast::as_primitive_array;
     use datafusion_common::test_util::batches_to_string;
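For readers who want to see the three-phase technique in isolation, below is a minimal standalone sketch using only the arrow kernels this patch imports (`concat`, `lexsort_to_indices`, `SortColumn`, `interleave_record_batch`). The two-column schema, the batch contents, and sorting on a single ascending `id` key are made up for illustration; the patch itself sorts on arbitrary `PhysicalSortExpr`s and handles the list-column case separately.

```rust
use std::sync::Arc;

use arrow::array::{Int32Array, RecordBatch, StringArray};
use arrow::compute::{concat, interleave_record_batch, lexsort_to_indices, SortColumn};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::Result;

fn main() -> Result<()> {
    // Hypothetical input: two small unsorted batches with the same schema.
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]));
    let batch1 = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![
            Arc::new(Int32Array::from(vec![3, 1])),
            Arc::new(StringArray::from(vec!["c", "a"])),
        ],
    )?;
    let batch2 = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![
            Arc::new(Int32Array::from(vec![2, 4])),
            Arc::new(StringArray::from(vec!["b", "d"])),
        ],
    )?;
    let batches = [&batch1, &batch2];

    // Phase 1: concatenate only the sort-key column across batches;
    // the payload columns are never copied at this point.
    let key = concat(&[batch1.column(0).as_ref(), batch2.column(0).as_ref()])?;
    let sort_columns = [SortColumn { values: key, options: None }];

    // Phase 2: compute globally sorted row indices, as if all batches
    // had been concatenated into one.
    let indices = lexsort_to_indices(&sort_columns, None)?;

    // Phase 3: translate each global index back to a (batch, row) pair
    // and gather the sorted output straight from the original batches.
    let batch_indices: Vec<(usize, usize)> = batches
        .iter()
        .enumerate()
        .flat_map(|(b, batch)| (0..batch.num_rows()).map(move |r| (b, r)))
        .collect();
    let interleave_indices: Vec<(usize, usize)> = indices
        .values()
        .iter()
        .map(|&i| batch_indices[i as usize])
        .collect();
    let sorted = interleave_record_batch(&batches, &interleave_indices)?;

    // ids [3, 1] and [2, 4] sort to 1, 2, 3, 4.
    assert_eq!(interleave_indices, [(0, 1), (1, 0), (0, 0), (1, 1)]);
    assert_eq!(sorted.num_rows(), 4);
    Ok(())
}
```

The design point of the patch, mirrored in the sketch: compared with the old `concat_batches` + sort path, only the key columns are materialized to compute the ordering, and `interleave_record_batch` then assembles the single sorted batch directly from the original in-memory batches, avoiding the intermediate fully concatenated copy.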