From 6e9af4effcb70f3171758ac45c4d16dffd327a72 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Thu, 17 Apr 2025 19:33:11 +0800 Subject: [PATCH 01/16] Make it optimized only for low number column --- datafusion/physical-plan/src/sorts/sort.rs | 177 ++++++++++++++++----- 1 file changed, 135 insertions(+), 42 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 9d0f34cc7f0f..cbbae92365cc 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -44,10 +44,8 @@ use crate::{ Statistics, }; -use arrow::array::{ - Array, RecordBatch, RecordBatchOptions, StringViewArray, UInt32Array, -}; -use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, SortColumn}; +use arrow::array::{Array, ArrayRef, RecordBatch, RecordBatchOptions, StringViewArray, UInt32Array}; +use arrow::compute::{concat, concat_batches, interleave_record_batch, lexsort_to_indices, take_arrays, SortColumn}; use arrow::datatypes::{DataType, SchemaRef}; use arrow::row::{RowConverter, Rows, SortField}; use datafusion_common::{ @@ -662,55 +660,150 @@ impl ExternalSorter { let elapsed_compute = metrics.elapsed_compute().clone(); let _timer = elapsed_compute.timer(); - // Please pay attention that any operation inside of `in_mem_sort_stream` will - // not perform any memory reservation. This is for avoiding the need of handling - // reservation failure and spilling in the middle of the sort/merge. The memory - // space for batches produced by the resulting stream will be reserved by the - // consumer of the stream. + if self.expr.len() <= 2 { + let interleave_indices = self + .build_sorted_indices(self.in_mem_batches.as_slice(), Arc::clone(&self.expr))?; - if self.in_mem_batches.len() == 1 { - let batch = self.in_mem_batches.swap_remove(0); - let reservation = self.reservation.take(); - return self.sort_batch_stream(batch, metrics, reservation); - } + let batches: Vec<&RecordBatch> = self.in_mem_batches.iter().collect(); + let sorted_batch = interleave_record_batch(&batches, &interleave_indices)?; - // If less than sort_in_place_threshold_bytes, concatenate and sort in place - if self.reservation.size() < self.sort_in_place_threshold_bytes { - // Concatenate memory batches together and sort - let batch = concat_batches(&self.schema, &self.in_mem_batches)?; - self.in_mem_batches.clear(); self.reservation - .try_resize(get_reserved_byte_for_record_batch(&batch)) + .try_resize(get_reserved_byte_for_record_batch(&sorted_batch)) .map_err(Self::err_with_oom_context)?; - let reservation = self.reservation.take(); - return self.sort_batch_stream(batch, metrics, reservation); + + Ok(Box::pin(RecordBatchStreamAdapter::new( + Arc::clone(&self.schema), + futures::stream::once(async { Ok(sorted_batch) }), + )) as SendableRecordBatchStream) + } else { + + + // Please pay attention that any operation inside of `in_mem_sort_stream` will + // not perform any memory reservation. This is for avoiding the need of handling + // reservation failure and spilling in the middle of the sort/merge. The memory + // space for batches produced by the resulting stream will be reserved by the + // consumer of the stream. + + if self.in_mem_batches.len() == 1 { + let batch = self.in_mem_batches.swap_remove(0); + let reservation = self.reservation.take(); + return self.sort_batch_stream(batch, metrics, reservation); + } + + // If less than sort_in_place_threshold_bytes, concatenate and sort in place + if self.reservation.size() < self.sort_in_place_threshold_bytes { + // Concatenate memory batches together and sort + let batch = concat_batches(&self.schema, &self.in_mem_batches)?; + self.in_mem_batches.clear(); + self.reservation + .try_resize(get_reserved_byte_for_record_batch(&batch)) + .map_err(Self::err_with_oom_context)?; + let reservation = self.reservation.take(); + return self.sort_batch_stream(batch, metrics, reservation); + } + + let streams = std::mem::take(&mut self.in_mem_batches) + .into_iter() + .map(|batch| { + let metrics = self.metrics.baseline.intermediate(); + let reservation = self + .reservation + .split(get_reserved_byte_for_record_batch(&batch)); + let input = self.sort_batch_stream(batch, metrics, reservation)?; + Ok(spawn_buffered(input, 1)) + }) + .collect::>()?; + + let expressions: LexOrdering = self.expr.iter().cloned().collect(); + + StreamingMergeBuilder::new() + .with_streams(streams) + .with_schema(Arc::clone(&self.schema)) + .with_expressions(expressions.as_ref()) + .with_metrics(metrics) + .with_batch_size(self.batch_size) + .with_fetch(None) + .with_reservation(self.merge_reservation.new_empty()) + .build() + } + } + + + + fn build_sorted_indices( + &self, + current_batches: &[RecordBatch], + expr: Arc<[PhysicalSortExpr]>, + ) -> Result> { + // ===== Phase 1: Build global sort columns for each sort expression ===== + // For each sort expression, evaluate and collect the corresponding sort column from each in-memory batch + // Here, `self.expr` is a list of sort expressions, each providing `evaluate_to_sort_column()`, + // which returns an ArrayRef (in `.values`) and sort options (`options`) + + // ```text + // columns_by_expr for example: + // ├── expr_0 ──┬── ArrayRef_0_0 (from batch_0) + // │ ├── ArrayRef_0_1 (from batch_1) + // │ └── ArrayRef_0_2 (from batch_2) + // ├── expr_1 ──┬── ArrayRef_1_0 (from batch_0) + // │ ├── ArrayRef_1_1 (from batch_1) + // │ └── ArrayRef_1_2 (from batch_2) + // ``` + let mut columns_by_expr: Vec> = expr + .iter() + .map(|_| Vec::with_capacity(current_batches.len())) + .collect(); + + for batch in current_batches { + for (i, e) in expr.iter().enumerate() { + let col = e.evaluate_to_sort_column(batch)?.values; + columns_by_expr[i].push(col); + } + } + + // For each sort expression, concatenate arrays from all batches into one global array + let mut sort_columns = Vec::with_capacity(expr.len()); + for (arrays, e) in columns_by_expr.into_iter().zip(expr.iter()) { + let array = concat( + &arrays + .iter() + .map(|a| a.as_ref()) + .collect::>(), + )?; + sort_columns.push(SortColumn { + values: array, + options: e.options.into(), + }); } - let streams = std::mem::take(&mut self.in_mem_batches) - .into_iter() - .map(|batch| { - let metrics = self.metrics.baseline.intermediate(); - let reservation = self - .reservation - .split(get_reserved_byte_for_record_batch(&batch)); - let input = self.sort_batch_stream(batch, metrics, reservation)?; - Ok(spawn_buffered(input, 1)) + // ===== Phase 2: Compute global sorted indices ===== + // Use `lexsort_to_indices` to get global row indices in sorted order (as if all batches were concatenated) + let indices = if !is_multi_column_with_lists(&sort_columns) { + lexsort_to_indices(&sort_columns, None)? + } else { + lexsort_to_indices_multi_columns(sort_columns, None)? + }; + + // Phase 3: Prepare indices for interleaving + let batch_indices: Vec<(usize, usize)> = current_batches + .iter() + .enumerate() + .flat_map(|(batch_id, batch)| { + (0..batch.num_rows()).map(move |i| (batch_id, i)) }) - .collect::>()?; + .collect(); - let expressions: LexOrdering = self.expr.iter().cloned().collect(); + let interleave_indices: Vec<(usize, usize)> = indices + .values() + .iter() + .map(|x| batch_indices[*x as usize]) + .collect(); - StreamingMergeBuilder::new() - .with_streams(streams) - .with_schema(Arc::clone(&self.schema)) - .with_expressions(expressions.as_ref()) - .with_metrics(metrics) - .with_batch_size(self.batch_size) - .with_fetch(None) - .with_reservation(self.merge_reservation.new_empty()) - .build() + Ok(interleave_indices) } + + /// Sorts a single `RecordBatch` into a single stream. /// /// `reservation` accounts for the memory used by this batch and From 47a60828c5c177198a0ec2f57a507c17d0c5ea11 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Thu, 17 Apr 2025 19:37:28 +0800 Subject: [PATCH 02/16] fmt --- datafusion/physical-plan/src/sorts/sort.rs | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index cbbae92365cc..a1fa9dcc1ed8 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -44,8 +44,13 @@ use crate::{ Statistics, }; -use arrow::array::{Array, ArrayRef, RecordBatch, RecordBatchOptions, StringViewArray, UInt32Array}; -use arrow::compute::{concat, concat_batches, interleave_record_batch, lexsort_to_indices, take_arrays, SortColumn}; +use arrow::array::{ + Array, ArrayRef, RecordBatch, RecordBatchOptions, StringViewArray, UInt32Array, +}; +use arrow::compute::{ + concat, concat_batches, interleave_record_batch, lexsort_to_indices, take_arrays, + SortColumn, +}; use arrow::datatypes::{DataType, SchemaRef}; use arrow::row::{RowConverter, Rows, SortField}; use datafusion_common::{ @@ -661,8 +666,10 @@ impl ExternalSorter { let _timer = elapsed_compute.timer(); if self.expr.len() <= 2 { - let interleave_indices = self - .build_sorted_indices(self.in_mem_batches.as_slice(), Arc::clone(&self.expr))?; + let interleave_indices = self.build_sorted_indices( + self.in_mem_batches.as_slice(), + Arc::clone(&self.expr), + )?; let batches: Vec<&RecordBatch> = self.in_mem_batches.iter().collect(); let sorted_batch = interleave_record_batch(&batches, &interleave_indices)?; @@ -676,8 +683,6 @@ impl ExternalSorter { futures::stream::once(async { Ok(sorted_batch) }), )) as SendableRecordBatchStream) } else { - - // Please pay attention that any operation inside of `in_mem_sort_stream` will // not perform any memory reservation. This is for avoiding the need of handling // reservation failure and spilling in the middle of the sort/merge. The memory @@ -728,8 +733,6 @@ impl ExternalSorter { } } - - fn build_sorted_indices( &self, current_batches: &[RecordBatch], @@ -802,8 +805,6 @@ impl ExternalSorter { Ok(interleave_indices) } - - /// Sorts a single `RecordBatch` into a single stream. /// /// `reservation` accounts for the memory used by this batch and From 84ec285260e98acd26ad6f167ba55c3153180591 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Thu, 17 Apr 2025 20:21:01 +0800 Subject: [PATCH 03/16] fix metrics --- datafusion/physical-plan/src/sorts/sort.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index a1fa9dcc1ed8..92a36fad6240 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -678,6 +678,8 @@ impl ExternalSorter { .try_resize(get_reserved_byte_for_record_batch(&sorted_batch)) .map_err(Self::err_with_oom_context)?; + metrics.record_output(sorted_batch.num_rows()); + Ok(Box::pin(RecordBatchStreamAdapter::new( Arc::clone(&self.schema), futures::stream::once(async { Ok(sorted_batch) }), From 694556ad10119e1fcef71e291f3e6aa5280643f0 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Thu, 17 Apr 2025 21:33:38 +0800 Subject: [PATCH 04/16] Fix test --- datafusion/physical-plan/src/sorts/sort.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 92a36fad6240..3dac2c93d846 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -674,6 +674,7 @@ impl ExternalSorter { let batches: Vec<&RecordBatch> = self.in_mem_batches.iter().collect(); let sorted_batch = interleave_record_batch(&batches, &interleave_indices)?; + self.in_mem_batches.clear(); self.reservation .try_resize(get_reserved_byte_for_record_batch(&sorted_batch)) .map_err(Self::err_with_oom_context)?; From 9b527dda398bc1d4d8e43ff987d692617865fe40 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Thu, 17 Apr 2025 22:33:14 +0800 Subject: [PATCH 05/16] fix test --- datafusion/core/tests/memory_limit/mod.rs | 2 +- datafusion/physical-plan/src/sorts/sort.rs | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 01342d1604fc..245f5007d739 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -268,7 +268,7 @@ async fn sort_spill_reservation() { let base_config = SessionConfig::new() // do not allow the sort to use the 'concat in place' path - .with_sort_in_place_threshold_bytes(10); + .with_sort_in_place_threshold_bytes(0); // This test case shows how sort_spill_reservation works by // purposely sorting data that requires non trivial memory to diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 3dac2c93d846..597b1f437293 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -665,7 +665,10 @@ impl ExternalSorter { let elapsed_compute = metrics.elapsed_compute().clone(); let _timer = elapsed_compute.timer(); - if self.expr.len() <= 2 { + // Note, in theory in memory batches should have limited size, but some testing + // cases testing the memory limit use `sort_in_place_threshold_bytes` to, so here we + // set a larger limit to avoid testing failure. + if self.expr.len() <= 2 && self.reservation.size() < 1000 * self.sort_in_place_threshold_bytes { let interleave_indices = self.build_sorted_indices( self.in_mem_batches.as_slice(), Arc::clone(&self.expr), From 438edd21041866103388486ddd35631c36526446 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Thu, 17 Apr 2025 22:35:29 +0800 Subject: [PATCH 06/16] fix test --- datafusion/physical-plan/src/sorts/sort.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 597b1f437293..593e3123d436 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -668,7 +668,9 @@ impl ExternalSorter { // Note, in theory in memory batches should have limited size, but some testing // cases testing the memory limit use `sort_in_place_threshold_bytes` to, so here we // set a larger limit to avoid testing failure. - if self.expr.len() <= 2 && self.reservation.size() < 1000 * self.sort_in_place_threshold_bytes { + if self.expr.len() <= 2 + && self.reservation.size() < 1000 * self.sort_in_place_threshold_bytes + { let interleave_indices = self.build_sorted_indices( self.in_mem_batches.as_slice(), Arc::clone(&self.expr), From 9477014cd9a75aff5d473e12dfe79dc92c9a852b Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Thu, 17 Apr 2025 23:07:50 +0800 Subject: [PATCH 07/16] Fix test fail --- datafusion/core/tests/memory_limit/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 245f5007d739..2058a96f1cfa 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -458,7 +458,7 @@ async fn test_stringview_external_sort() { .with_memory_pool(Arc::new(FairSpillPool::new(60 * 1024 * 1024))); let runtime = builder.build_arc().unwrap(); - let config = SessionConfig::new().with_sort_spill_reservation_bytes(40 * 1024 * 1024); + let config = SessionConfig::new().with_sort_spill_reservation_bytes(40 * 1024 * 1024).with_sort_in_place_threshold_bytes(0); let ctx = SessionContext::new_with_config_rt(config, runtime); ctx.register_table("t", Arc::new(table)).unwrap(); @@ -481,6 +481,7 @@ async fn test_stringview_external_sort() { async fn test_in_mem_buffer_almost_full() { let config = SessionConfig::new() .with_sort_spill_reservation_bytes(3000000) + .with_sort_in_place_threshold_bytes(0) .with_target_partitions(1); let runtime = RuntimeEnvBuilder::new() .with_memory_pool(Arc::new(FairSpillPool::new(10 * 1024 * 1024))) From 2ac37f47eb206ee37f5d56724db1777a518991c5 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Thu, 17 Apr 2025 23:09:03 +0800 Subject: [PATCH 08/16] Fix fmt --- datafusion/core/tests/memory_limit/mod.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 2058a96f1cfa..16e6330050e1 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -458,7 +458,9 @@ async fn test_stringview_external_sort() { .with_memory_pool(Arc::new(FairSpillPool::new(60 * 1024 * 1024))); let runtime = builder.build_arc().unwrap(); - let config = SessionConfig::new().with_sort_spill_reservation_bytes(40 * 1024 * 1024).with_sort_in_place_threshold_bytes(0); + let config = SessionConfig::new() + .with_sort_spill_reservation_bytes(40 * 1024 * 1024) + .with_sort_in_place_threshold_bytes(0); let ctx = SessionContext::new_with_config_rt(config, runtime); ctx.register_table("t", Arc::new(table)).unwrap(); From 3b93a11dd550c3f64f51f304932ce39bf26a280e Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Thu, 17 Apr 2025 23:22:53 +0800 Subject: [PATCH 09/16] Fix test --- datafusion/physical-plan/src/sorts/sort.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 593e3123d436..470547a162da 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -1763,7 +1763,7 @@ mod tests { async fn test_sort_spill_utf8_strings() -> Result<()> { let session_config = SessionConfig::new() .with_batch_size(100) - .with_sort_in_place_threshold_bytes(20 * 1024) + .with_sort_in_place_threshold_bytes(0) .with_sort_spill_reservation_bytes(100 * 1024); let runtime = RuntimeEnvBuilder::new() .with_memory_limit(500 * 1024, 1.0) From 96b2e24ee2e3f598f44c48ee47222185c554dd52 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Fri, 18 Apr 2025 17:01:57 +0800 Subject: [PATCH 10/16] Clean code and address comments --- datafusion/common/src/config.rs | 2 +- datafusion/core/tests/memory_limit/mod.rs | 7 +- datafusion/physical-plan/src/sorts/sort.rs | 99 ++++++++++------------ docs/source/user-guide/configs.md | 2 +- 4 files changed, 49 insertions(+), 61 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 1e0f63d6d81c..769871d650d6 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -343,7 +343,7 @@ config_namespace! { /// When sorting, below what size should data be concatenated /// and sorted in a single RecordBatch rather than sorted in /// batches and merged. - pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024 + pub sort_in_place_threshold_bytes: usize, default = 1000 * 1024 * 1024 /// Number of files to read in parallel when inferring schema and statistics pub meta_fetch_concurrency: usize, default = 32 diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 16e6330050e1..01342d1604fc 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -268,7 +268,7 @@ async fn sort_spill_reservation() { let base_config = SessionConfig::new() // do not allow the sort to use the 'concat in place' path - .with_sort_in_place_threshold_bytes(0); + .with_sort_in_place_threshold_bytes(10); // This test case shows how sort_spill_reservation works by // purposely sorting data that requires non trivial memory to @@ -458,9 +458,7 @@ async fn test_stringview_external_sort() { .with_memory_pool(Arc::new(FairSpillPool::new(60 * 1024 * 1024))); let runtime = builder.build_arc().unwrap(); - let config = SessionConfig::new() - .with_sort_spill_reservation_bytes(40 * 1024 * 1024) - .with_sort_in_place_threshold_bytes(0); + let config = SessionConfig::new().with_sort_spill_reservation_bytes(40 * 1024 * 1024); let ctx = SessionContext::new_with_config_rt(config, runtime); ctx.register_table("t", Arc::new(table)).unwrap(); @@ -483,7 +481,6 @@ async fn test_stringview_external_sort() { async fn test_in_mem_buffer_almost_full() { let config = SessionConfig::new() .with_sort_spill_reservation_bytes(3000000) - .with_sort_in_place_threshold_bytes(0) .with_target_partitions(1); let runtime = RuntimeEnvBuilder::new() .with_memory_pool(Arc::new(FairSpillPool::new(10 * 1024 * 1024))) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 470547a162da..9296eadc8aa2 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -48,8 +48,7 @@ use arrow::array::{ Array, ArrayRef, RecordBatch, RecordBatchOptions, StringViewArray, UInt32Array, }; use arrow::compute::{ - concat, concat_batches, interleave_record_batch, lexsort_to_indices, take_arrays, - SortColumn, + concat, interleave_record_batch, lexsort_to_indices, take_arrays, SortColumn, }; use arrow::datatypes::{DataType, SchemaRef}; use arrow::row::{RowConverter, Rows, SortField}; @@ -665,11 +664,27 @@ impl ExternalSorter { let elapsed_compute = metrics.elapsed_compute().clone(); let _timer = elapsed_compute.timer(); - // Note, in theory in memory batches should have limited size, but some testing - // cases testing the memory limit use `sort_in_place_threshold_bytes` to, so here we - // set a larger limit to avoid testing failure. + // Please pay attention that any operation inside of `in_mem_sort_stream` will + // not perform any memory reservation. This is for avoiding the need of handling + // reservation failure and spilling in the middle of the sort/merge. The memory + // space for batches produced by the resulting stream will be reserved by the + // consumer of the stream. + + if self.in_mem_batches.len() == 1 { + let batch = self.in_mem_batches.swap_remove(0); + let reservation = self.reservation.take(); + return self.sort_batch_stream(batch, metrics, reservation); + } + + // If less than sort_in_place_threshold_bytes, we sort in memory. + // Note: + // In theory we should always be able to sort in place, but some corner cases for merging testing + // failed, so we set a large threshold to avoid that. + // Also, we only support sort expressions with less than 3 columns for now. Because from testing, when + // columns > 3, the performance of in-place sort is worse than sort/merge. + // Need to further investigate the performance of in-place sort when columns > 3. if self.expr.len() <= 2 - && self.reservation.size() < 1000 * self.sort_in_place_threshold_bytes + && self.reservation.size() < self.sort_in_place_threshold_bytes { let interleave_indices = self.build_sorted_indices( self.in_mem_batches.as_slice(), @@ -686,59 +701,35 @@ impl ExternalSorter { metrics.record_output(sorted_batch.num_rows()); - Ok(Box::pin(RecordBatchStreamAdapter::new( + return Ok(Box::pin(RecordBatchStreamAdapter::new( Arc::clone(&self.schema), futures::stream::once(async { Ok(sorted_batch) }), - )) as SendableRecordBatchStream) - } else { - // Please pay attention that any operation inside of `in_mem_sort_stream` will - // not perform any memory reservation. This is for avoiding the need of handling - // reservation failure and spilling in the middle of the sort/merge. The memory - // space for batches produced by the resulting stream will be reserved by the - // consumer of the stream. - - if self.in_mem_batches.len() == 1 { - let batch = self.in_mem_batches.swap_remove(0); - let reservation = self.reservation.take(); - return self.sort_batch_stream(batch, metrics, reservation); - } - - // If less than sort_in_place_threshold_bytes, concatenate and sort in place - if self.reservation.size() < self.sort_in_place_threshold_bytes { - // Concatenate memory batches together and sort - let batch = concat_batches(&self.schema, &self.in_mem_batches)?; - self.in_mem_batches.clear(); - self.reservation - .try_resize(get_reserved_byte_for_record_batch(&batch)) - .map_err(Self::err_with_oom_context)?; - let reservation = self.reservation.take(); - return self.sort_batch_stream(batch, metrics, reservation); - } + )) as SendableRecordBatchStream); + } - let streams = std::mem::take(&mut self.in_mem_batches) - .into_iter() - .map(|batch| { - let metrics = self.metrics.baseline.intermediate(); - let reservation = self - .reservation - .split(get_reserved_byte_for_record_batch(&batch)); - let input = self.sort_batch_stream(batch, metrics, reservation)?; - Ok(spawn_buffered(input, 1)) - }) - .collect::>()?; + let streams = std::mem::take(&mut self.in_mem_batches) + .into_iter() + .map(|batch| { + let metrics = self.metrics.baseline.intermediate(); + let reservation = self + .reservation + .split(get_reserved_byte_for_record_batch(&batch)); + let input = self.sort_batch_stream(batch, metrics, reservation)?; + Ok(spawn_buffered(input, 1)) + }) + .collect::>()?; - let expressions: LexOrdering = self.expr.iter().cloned().collect(); + let expressions: LexOrdering = self.expr.iter().cloned().collect(); - StreamingMergeBuilder::new() - .with_streams(streams) - .with_schema(Arc::clone(&self.schema)) - .with_expressions(expressions.as_ref()) - .with_metrics(metrics) - .with_batch_size(self.batch_size) - .with_fetch(None) - .with_reservation(self.merge_reservation.new_empty()) - .build() - } + StreamingMergeBuilder::new() + .with_streams(streams) + .with_schema(Arc::clone(&self.schema)) + .with_expressions(expressions.as_ref()) + .with_metrics(metrics) + .with_batch_size(self.batch_size) + .with_fetch(None) + .with_reservation(self.merge_reservation.new_empty()) + .build() } fn build_sorted_indices( diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 7a46d59d893e..83c149f40229 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -84,7 +84,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system | | datafusion.execution.skip_physical_aggregate_schema_check | false | When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. | | datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). | -| datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. | +| datafusion.execution.sort_in_place_threshold_bytes | 1048576000 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. | | datafusion.execution.meta_fetch_concurrency | 32 | Number of files to read in parallel when inferring schema and statistics | | datafusion.execution.minimum_parallel_output_files | 4 | Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. | | datafusion.execution.soft_max_rows_per_output_file | 50000000 | Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max | From 149d3155c575a94ce146bb969544dbc37fd6c63a Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Fri, 18 Apr 2025 17:10:06 +0800 Subject: [PATCH 11/16] Fix test --- datafusion/physical-plan/src/sorts/sort.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 9296eadc8aa2..3c9329fbb688 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -1458,7 +1458,7 @@ mod tests { use crate::test::TestMemoryExec; use arrow::array::*; - use arrow::compute::SortOptions; + use arrow::compute::{concat, SortOptions}; use arrow::datatypes::*; use datafusion_common::cast::as_primitive_array; use datafusion_common::test_util::batches_to_string; From 4cceab64c6edc8aaf5ac0a503616e28c625c3fbf Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Fri, 18 Apr 2025 17:18:02 +0800 Subject: [PATCH 12/16] Add fix --- datafusion/physical-plan/src/sorts/sort.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 3c9329fbb688..5a4d5927775b 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -1458,7 +1458,7 @@ mod tests { use crate::test::TestMemoryExec; use arrow::array::*; - use arrow::compute::{concat, SortOptions}; + use arrow::compute::{concat_batches, SortOptions}; use arrow::datatypes::*; use datafusion_common::cast::as_primitive_array; use datafusion_common::test_util::batches_to_string; From 3e1c6032bed32601ce67310f203a1c553ec8e4ba Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Fri, 18 Apr 2025 17:25:01 +0800 Subject: [PATCH 13/16] polish comments --- datafusion/common/src/config.rs | 4 ++++ datafusion/physical-plan/src/sorts/sort.rs | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 769871d650d6..7304ed968090 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -343,6 +343,10 @@ config_namespace! { /// When sorting, below what size should data be concatenated /// and sorted in a single RecordBatch rather than sorted in /// batches and merged. + /// Note: + /// In theory we should always be able to sort in place, but some corner cases for merging testing + /// failed, so we set a large threshold to avoid that. + /// Future work: potential remove this option and always sort in place. pub sort_in_place_threshold_bytes: usize, default = 1000 * 1024 * 1024 /// Number of files to read in parallel when inferring schema and statistics diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 5a4d5927775b..fd43017226af 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -1754,7 +1754,7 @@ mod tests { async fn test_sort_spill_utf8_strings() -> Result<()> { let session_config = SessionConfig::new() .with_batch_size(100) - .with_sort_in_place_threshold_bytes(0) + .with_sort_in_place_threshold_bytes(20 * 1024) .with_sort_spill_reservation_bytes(100 * 1024); let runtime = RuntimeEnvBuilder::new() .with_memory_limit(500 * 1024, 1.0) From ea4d85ec057249f7899053be1a4d38ecd852f440 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Fri, 18 Apr 2025 17:59:08 +0800 Subject: [PATCH 14/16] fix test --- datafusion/core/tests/memory_limit/mod.rs | 5 ++++- docs/source/user-guide/configs.md | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 01342d1604fc..132fffb58d82 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -458,7 +458,9 @@ async fn test_stringview_external_sort() { .with_memory_pool(Arc::new(FairSpillPool::new(60 * 1024 * 1024))); let runtime = builder.build_arc().unwrap(); - let config = SessionConfig::new().with_sort_spill_reservation_bytes(40 * 1024 * 1024); + let config = SessionConfig::new() + .with_sort_spill_reservation_bytes(40 * 1024 * 1024) + .with_sort_in_place_threshold_bytes(1024 * 1024); let ctx = SessionContext::new_with_config_rt(config, runtime); ctx.register_table("t", Arc::new(table)).unwrap(); @@ -481,6 +483,7 @@ async fn test_stringview_external_sort() { async fn test_in_mem_buffer_almost_full() { let config = SessionConfig::new() .with_sort_spill_reservation_bytes(3000000) + .with_sort_in_place_threshold_bytes(1024 * 1024) .with_target_partitions(1); let runtime = RuntimeEnvBuilder::new() .with_memory_pool(Arc::new(FairSpillPool::new(10 * 1024 * 1024))) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 83c149f40229..ade9b86eeb44 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -84,7 +84,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system | | datafusion.execution.skip_physical_aggregate_schema_check | false | When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. | | datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). | -| datafusion.execution.sort_in_place_threshold_bytes | 1048576000 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. | +| datafusion.execution.sort_in_place_threshold_bytes | 1048576000 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. Note: In theory we should always be able to sort in place, but some corner cases for merging testing failed, so we set a large threshold to avoid that. Future work: potential remove this option and always sort in place. | | datafusion.execution.meta_fetch_concurrency | 32 | Number of files to read in parallel when inferring schema and statistics | | datafusion.execution.minimum_parallel_output_files | 4 | Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. | | datafusion.execution.soft_max_rows_per_output_file | 50000000 | Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max | From 9c9d779aa13293fb8ce9d12ef251524655bb6eba Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Fri, 18 Apr 2025 18:44:56 +0800 Subject: [PATCH 15/16] fix slt --- datafusion/sqllogictest/test_files/information_schema.slt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 87abaadb516f..1a5f04454c1d 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -260,7 +260,7 @@ datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8 datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 datafusion.execution.skip_physical_aggregate_schema_check false datafusion.execution.soft_max_rows_per_output_file 50000000 -datafusion.execution.sort_in_place_threshold_bytes 1048576 +datafusion.execution.sort_in_place_threshold_bytes 1048576000 datafusion.execution.sort_spill_reservation_bytes 10485760 datafusion.execution.split_file_groups_by_statistics false datafusion.execution.target_partitions 7 @@ -360,7 +360,7 @@ datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8 Aggregat datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 Number of input rows partial aggregation partition should process, before aggregation ratio check and trying to switch to skipping aggregation mode datafusion.execution.skip_physical_aggregate_schema_check false When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max -datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. +datafusion.execution.sort_in_place_threshold_bytes 1048576000 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. Note: In theory we should always be able to sort in place, but some corner cases for merging testing failed, so we set a large threshold to avoid that. Future work: potential remove this option and always sort in place. datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). datafusion.execution.split_file_groups_by_statistics false Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental datafusion.execution.target_partitions 7 Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of CPU cores on the system From b12c9123f49b9fbe2cea5c848e057a120d9e92b2 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Sat, 17 May 2025 22:35:03 +0800 Subject: [PATCH 16/16] Add smaller optimization --- datafusion/common/src/config.rs | 6 +----- datafusion/core/tests/memory_limit/mod.rs | 4 +--- datafusion/sqllogictest/test_files/information_schema.slt | 4 ++-- docs/source/user-guide/configs.md | 2 +- 4 files changed, 5 insertions(+), 11 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 7304ed968090..1e0f63d6d81c 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -343,11 +343,7 @@ config_namespace! { /// When sorting, below what size should data be concatenated /// and sorted in a single RecordBatch rather than sorted in /// batches and merged. - /// Note: - /// In theory we should always be able to sort in place, but some corner cases for merging testing - /// failed, so we set a large threshold to avoid that. - /// Future work: potential remove this option and always sort in place. - pub sort_in_place_threshold_bytes: usize, default = 1000 * 1024 * 1024 + pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024 /// Number of files to read in parallel when inferring schema and statistics pub meta_fetch_concurrency: usize, default = 32 diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 132fffb58d82..f382943e5deb 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -459,8 +459,7 @@ async fn test_stringview_external_sort() { let runtime = builder.build_arc().unwrap(); let config = SessionConfig::new() - .with_sort_spill_reservation_bytes(40 * 1024 * 1024) - .with_sort_in_place_threshold_bytes(1024 * 1024); + .with_sort_spill_reservation_bytes(40 * 1024 * 1024); let ctx = SessionContext::new_with_config_rt(config, runtime); ctx.register_table("t", Arc::new(table)).unwrap(); @@ -483,7 +482,6 @@ async fn test_stringview_external_sort() { async fn test_in_mem_buffer_almost_full() { let config = SessionConfig::new() .with_sort_spill_reservation_bytes(3000000) - .with_sort_in_place_threshold_bytes(1024 * 1024) .with_target_partitions(1); let runtime = RuntimeEnvBuilder::new() .with_memory_pool(Arc::new(FairSpillPool::new(10 * 1024 * 1024))) diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 1a5f04454c1d..87abaadb516f 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -260,7 +260,7 @@ datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8 datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 datafusion.execution.skip_physical_aggregate_schema_check false datafusion.execution.soft_max_rows_per_output_file 50000000 -datafusion.execution.sort_in_place_threshold_bytes 1048576000 +datafusion.execution.sort_in_place_threshold_bytes 1048576 datafusion.execution.sort_spill_reservation_bytes 10485760 datafusion.execution.split_file_groups_by_statistics false datafusion.execution.target_partitions 7 @@ -360,7 +360,7 @@ datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8 Aggregat datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 Number of input rows partial aggregation partition should process, before aggregation ratio check and trying to switch to skipping aggregation mode datafusion.execution.skip_physical_aggregate_schema_check false When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max -datafusion.execution.sort_in_place_threshold_bytes 1048576000 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. Note: In theory we should always be able to sort in place, but some corner cases for merging testing failed, so we set a large threshold to avoid that. Future work: potential remove this option and always sort in place. +datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). datafusion.execution.split_file_groups_by_statistics false Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental datafusion.execution.target_partitions 7 Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of CPU cores on the system diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index ade9b86eeb44..7a46d59d893e 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -84,7 +84,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system | | datafusion.execution.skip_physical_aggregate_schema_check | false | When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. | | datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). | -| datafusion.execution.sort_in_place_threshold_bytes | 1048576000 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. Note: In theory we should always be able to sort in place, but some corner cases for merging testing failed, so we set a large threshold to avoid that. Future work: potential remove this option and always sort in place. | +| datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. | | datafusion.execution.meta_fetch_concurrency | 32 | Number of files to read in parallel when inferring schema and statistics | | datafusion.execution.minimum_parallel_output_files | 4 | Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. | | datafusion.execution.soft_max_rows_per_output_file | 50000000 | Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max |