Perf: Optimize in memory sort #15380
Conversation
```rust
let mut current_batches = Vec::new();
let mut current_size = 0;

for batch in std::mem::take(&mut self.in_mem_batches) {
```
I think it would be nice to use `pop` (`while let Some(batch) = v.pop()`) here to remove the batch from the vec once sorted, to reduce memory usage. Right now, AFAIK, the batch is retained until after the loop.
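A minimal sketch of the suggested draining pattern (`Batch` is a stand-in type so the example is self-contained):

```rust
// `pop()` moves each element out of the Vec, so its memory can be
// released as soon as it has been processed, instead of keeping every
// batch alive until after the loop finishes.
type Batch = Vec<u8>; // stand-in for arrow's RecordBatch

fn drain_and_process(mut batches: Vec<Batch>) {
    while let Some(batch) = batches.pop() {
        // ... sort / concatenate `batch` here ...
        // `batch` is dropped at the end of each iteration
        let _ = batch.len();
    }
}
```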
Thank you @Dandandan for the review and the good suggestion; addressed!
I think this is already looking quite nice. What do you need to finalize this, @zhuqi-lucas?
Thank you @Dandandan for the review. I think the next step is just to add benchmark results for this PR. It's mergeable as a first version; we can improve it later according to the comments:
@alamb Do we have the CI benchmark running now? If not, I need your help to run it... Thanks a lot! Also, for sort-tpch itself I ran it to measure the improvement, but I did not run the other benchmarks.

Previous sort-tpch:

┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ main ┃ concat_batches_for_sort ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Q1 │ 2241.04ms │ 1816.69ms │ +1.23x faster │
│ Q2 │ 1841.01ms │ 1496.73ms │ +1.23x faster │
│ Q3 │ 12755.85ms │ 12770.18ms │ no change │
│ Q4 │ 4433.49ms │ 3278.70ms │ +1.35x faster │
│ Q5 │ 4414.15ms │ 4409.04ms │ no change │
│ Q6 │ 4543.09ms │ 4597.32ms │ no change │
│ Q7 │ 8012.85ms │ 9026.30ms │ 1.13x slower │
│ Q8 │ 6572.37ms │ 6049.51ms │ +1.09x faster │
│ Q9 │ 6734.63ms │ 6345.69ms │ +1.06x faster │
│ Q10 │ 9896.16ms │ 9564.17ms │ no change │
└──────────────┴────────────┴─────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main) │ 61444.64ms │
│ Total Time (concat_batches_for_sort) │ 59354.33ms │
│ Average Time (main) │ 6144.46ms │
│ Average Time (concat_batches_for_sort) │ 5935.43ms │
│ Queries Faster │ 5 │
│ Queries Slower │ 1 │
│ Queries with No Change │ 4 │
└────────────────────────────────────────┴────────────┘
Latest result, based on the current latest code:

--------------------
Benchmark sort_tpch1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ main ┃ concat_batches_for_sort ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Q1 │ 153.49ms │ 137.57ms │ +1.12x faster │
│ Q2 │ 131.29ms │ 120.93ms │ +1.09x faster │
│ Q3 │ 980.57ms │ 982.22ms │ no change │
│ Q4 │ 252.25ms │ 245.09ms │ no change │
│ Q5 │ 464.81ms │ 449.27ms │ no change │
│ Q6 │ 481.44ms │ 455.45ms │ +1.06x faster │
│ Q7 │ 810.73ms │ 709.74ms │ +1.14x faster │
│ Q8 │ 498.10ms │ 491.12ms │ no change │
│ Q9 │ 503.80ms │ 510.20ms │ no change │
│ Q10 │ 789.02ms │ 706.45ms │ +1.12x faster │
│ Q11 │ 417.39ms │ 411.50ms │ no change │
└──────────────┴──────────┴─────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (main) │ 5482.89ms │
│ Total Time (concat_batches_for_sort) │ 5219.53ms │
│ Average Time (main) │ 498.44ms │
│ Average Time (concat_batches_for_sort) │ 474.50ms │
│ Queries Faster │ 5 │
│ Queries Slower │ 0 │
│ Queries with No Change │ 6 │
└────────────────────────────────────────┴───────────┘
--------------------
Benchmark sort_tpch10.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ main ┃ concat_batches_for_sort ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Q1 │ 2243.52ms │ 1825.64ms │ +1.23x faster │
│ Q2 │ 1842.11ms │ 1639.00ms │ +1.12x faster │
│ Q3 │ 12446.31ms │ 11981.63ms │ no change │
│ Q4 │ 4047.55ms │ 3715.96ms │ +1.09x faster │
│ Q5 │ 4364.46ms │ 4503.51ms │ no change │
│ Q6 │ 4561.01ms │ 4688.31ms │ no change │
│ Q7 │ 8158.01ms │ 7915.54ms │ no change │
│ Q8 │ 6077.40ms │ 5524.08ms │ +1.10x faster │
│ Q9 │ 6347.21ms │ 5853.44ms │ +1.08x faster │
│ Q10 │ 11561.03ms │ 14235.69ms │ 1.23x slower │
│ Q11 │ 6069.42ms │ 5666.77ms │ +1.07x faster │
└──────────────┴────────────┴─────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main) │ 67718.04ms │
│ Total Time (concat_batches_for_sort) │ 67549.58ms │
│ Average Time (main) │ 6156.19ms │
│ Average Time (concat_batches_for_sort) │ 6140.87ms │
│ Queries Faster │ 6 │
│ Queries Slower │ 1 │
│ Queries with No Change │ 4 │
└────────────────────────────────────────┴────────────┘
Thanks for sharing the results @zhuqi-lucas, this is really interesting! I think it mainly shows that we probably should try to use more efficient in-memory sorting here (e.g. an arrow kernel that sorts multiple batches) rather than use `SortPreservingMergeStream`.
🤖 |
I think the `SortPreservingMergeStream` is about as efficient as we know how to make it. Maybe we can look into what overhead makes concat'ing better 🤔 Any per-stream overhead we can improve in `SortPreservingMergeStream` would likely flow directly to any query that does sorts.
Hm, that doesn't make much sense, as
Hm 🤔 ... but that will still take a separate step of sorting the input batches, which, next to the sorting itself, involves a full extra copy. I think the most efficient way would be to sort the indices into the arrays in one step, followed by `interleave`.
It seems that when we merge the sorted batches, we already use `interleave` to merge via the sorted indices; here is the code:

```rust
/// Drains the in_progress row indexes, and builds a new RecordBatch from them
///
/// Will then drop any batches for which all rows have been yielded to the output
///
/// Returns `None` if no pending rows
pub fn build_record_batch(&mut self) -> Result<Option<RecordBatch>> {
    if self.is_empty() {
        return Ok(None);
    }
    let columns = (0..self.schema.fields.len())
        .map(|column_idx| {
            let arrays: Vec<_> = self
                .batches
                .iter()
                .map(|(_, batch)| batch.column(column_idx).as_ref())
                .collect();
            Ok(interleave(&arrays, &self.indices)?)
        })
        .collect::<Result<Vec<_>>>()?;
    self.indices.clear();
    // ...
```

But in this PR we also concat some batches into one batch. Do you mean we could likewise use the indices from each batch to build one batch, just like in the merge phase?
Thanks @alamb for triggering this; it seems stuck.
I mean, theoretically we don't have to. The merging is useful for sorting streams of data, but I think it is expected that sorting batches first, followed by a custom merge implementation, is slower than a single sorting pass based on Rust's std unstable sort (which is optimized for doing a minimal number of comparisons quickly).
A more complete rationale / explanation of the same idea was written here by @2010YOUY01: #15375 (comment)
I think I got it now, thank you @Dandandan. It means we already have those in-memory batches; we just need to sort all elements' indices first (a 2-level index consisting of (batch_idx, row_idx)). We don't need to construct the StreamingMergeBuilder for the in-memory sort; we can do it in a single sorting pass. Let me try this way and compare the performance!
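A minimal sketch of that single-pass idea, under simplifying assumptions (one Int32 sort key, ascending order, nulls ignored; the function name is hypothetical):

```rust
use arrow::array::{Array, ArrayRef, Int32Array, RecordBatch};
use arrow::compute::interleave;

/// Sort the rows of all in-memory batches in one pass over
/// (batch_idx, row_idx) pairs, then gather once with `interleave`.
fn single_pass_sort_column(batches: &[RecordBatch]) -> arrow::error::Result<ArrayRef> {
    // Downcast the sort-key column of every batch once, up front
    let keys: Vec<&Int32Array> = batches
        .iter()
        .map(|b| b.column(0).as_any().downcast_ref::<Int32Array>().unwrap())
        .collect();

    // The 2-level index: one (batch_idx, row_idx) entry per row overall
    let mut indices: Vec<(usize, usize)> = batches
        .iter()
        .enumerate()
        .flat_map(|(bi, b)| (0..b.num_rows()).map(move |ri| (bi, ri)))
        .collect();

    // A single std unstable sort over all rows, comparing through the indices
    indices.sort_unstable_by_key(|&(bi, ri)| keys[bi].value(ri));

    // One gather produces the fully sorted column
    let arrays: Vec<&dyn Array> = batches.iter().map(|b| b.column(0).as_ref()).collect();
    interleave(&arrays, &indices)
}
```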
Very interesting. As a first try, I merged all in-memory batches and did a single sort: some queries become crazy fast and some crazy slow. I think it is because the single concatenated sort runs on one core, losing the parallelism of the merge path.

So as a next step, can we try to make the in-memory sort parallel?

--------------------
Benchmark sort_tpch10.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ main ┃ concat_batches_for_sort ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Q1 │ 2243.52ms │ 1416.52ms │ +1.58x faster │
│ Q2 │ 1842.11ms │ 1096.12ms │ +1.68x faster │
│ Q3 │ 12446.31ms │ 12535.45ms │ no change │
│ Q4 │ 4047.55ms │ 1964.73ms │ +2.06x faster │
│ Q5 │ 4364.46ms │ 5955.70ms │ 1.36x slower │
│ Q6 │ 4561.01ms │ 6275.39ms │ 1.38x slower │
│ Q7 │ 8158.01ms │ 19145.68ms │ 2.35x slower │
│ Q8 │ 6077.40ms │ 5146.80ms │ +1.18x faster │
│ Q9 │ 6347.21ms │ 5544.48ms │ +1.14x faster │
│ Q10 │ 11561.03ms │ 23572.68ms │ 2.04x slower │
│ Q11 │ 6069.42ms │ 4810.88ms │ +1.26x faster │
└──────────────┴────────────┴─────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main) │ 67718.04ms │
│ Total Time (concat_batches_for_sort) │ 87464.44ms │
│ Average Time (main) │ 6156.19ms │
│ Average Time (concat_batches_for_sort) │ 7951.31ms │
│ Queries Faster │ 6 │
│ Queries Slower │ 4 │
│ Queries with No Change │ 1 │
└────────────────────────────────────────┴────────────┘

Patch tried:

```diff
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 7fd1c2b16..ec3cd89f3 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -671,85 +671,14 @@ impl ExternalSorter {
return self.sort_batch_stream(batch, metrics, reservation);
}
- // If less than sort_in_place_threshold_bytes, concatenate and sort in place
- if self.reservation.size() < self.sort_in_place_threshold_bytes {
- // Concatenate memory batches together and sort
- let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
- self.in_mem_batches.clear();
- self.reservation
- .try_resize(get_reserved_byte_for_record_batch(&batch))?;
- let reservation = self.reservation.take();
- return self.sort_batch_stream(batch, metrics, reservation);
- }
-
- let mut merged_batches = Vec::new();
- let mut current_batches = Vec::new();
- let mut current_size = 0;
-
- // Drain in_mem_batches using pop() to release memory earlier.
- // This avoids holding onto the entire vector during iteration.
- // Note:
- // Now we use `sort_in_place_threshold_bytes` to determine, in future we can make it more dynamic.
- while let Some(batch) = self.in_mem_batches.pop() {
- let batch_size = get_reserved_byte_for_record_batch(&batch);
-
- // If adding this batch would exceed the memory threshold, merge current_batches.
- if current_size + batch_size > self.sort_in_place_threshold_bytes
- && !current_batches.is_empty()
- {
- // Merge accumulated batches into one.
- let merged = concat_batches(&self.schema, &current_batches)?;
- current_batches.clear();
-
- // Update memory reservation.
- self.reservation.try_shrink(current_size)?;
- let merged_size = get_reserved_byte_for_record_batch(&merged);
- self.reservation.try_grow(merged_size)?;
-
- merged_batches.push(merged);
- current_size = 0;
- }
-
- current_batches.push(batch);
- current_size += batch_size;
- }
-
- // Merge any remaining batches after the loop.
- if !current_batches.is_empty() {
- let merged = concat_batches(&self.schema, &current_batches)?;
- self.reservation.try_shrink(current_size)?;
- let merged_size = get_reserved_byte_for_record_batch(&merged);
- self.reservation.try_grow(merged_size)?;
- merged_batches.push(merged);
- }
-
- // Create sorted streams directly without using spawn_buffered.
- // This allows for sorting to happen inline and enables earlier batch drop.
- let streams = merged_batches
- .into_iter()
- .map(|batch| {
- let metrics = self.metrics.baseline.intermediate();
- let reservation = self
- .reservation
- .split(get_reserved_byte_for_record_batch(&batch));
-
- // Sort the batch inline.
- let input = self.sort_batch_stream(batch, metrics, reservation)?;
- Ok(input)
- })
- .collect::<Result<_>>()?;
-
- let expressions: LexOrdering = self.expr.iter().cloned().collect();
-
- StreamingMergeBuilder::new()
- .with_streams(streams)
- .with_schema(Arc::clone(&self.schema))
- .with_expressions(expressions.as_ref())
- .with_metrics(metrics)
- .with_batch_size(self.batch_size)
- .with_fetch(None)
- .with_reservation(self.merge_reservation.new_empty())
- .build()
+ // Because batches are all in memory, we can sort them in place
+ // Concatenate memory batches together and sort
+ let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
+ self.in_mem_batches.clear();
+ self.reservation
+ .try_resize(get_reserved_byte_for_record_batch(&batch))?;
+ let reservation = self.reservation.take();
+ self.sort_batch_stream(batch, metrics, reservation)
}
```
The core improvements that I think are important:
Good explanation.
I see, `execute` already takes the partition:

```rust
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
```
In this case, the final merging might become the bottleneck, because SPM does not have internal parallelism either; during the final merge only one core is busy.
Yes, to be clear, I don't argue for removing SortPreservingMergeExec or sorting in two phases altogether, or something similar; I was just reacting to the idea of adding more parallelism to the in-memory sort.
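On the parallelism point, a hedged sketch of what a parallel index sort could look like (this assumes a rayon dependency, which is not something this PR adds; `keys` stands in for the downcast sort-key columns):

```rust
use rayon::slice::ParallelSliceMut;

// Hypothetical: sort the 2-level (batch_idx, row_idx) indices across
// all cores; the gather step afterwards would stay unchanged.
fn par_sort_indices(indices: &mut [(usize, usize)], keys: &[Vec<i32>]) {
    indices.par_sort_unstable_by_key(|&(bi, ri)| keys[bi][ri]);
}
```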
Thank you @2010YOUY01 @Dandandan, it's very interesting. I am thinking:

```text
final_merged_batch_size =
    if      partition_cal_size < min_sort_size => min_sort_size
    else if partition_cal_size > max_sort_size => max_sort_size
    else                                       => partition_cal_size
```

This prevents creating too many small batches (which can fragment merge tasks) or overly large batches. But how can we calculate min_sort_size and max_sort_size?
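As a sketch, that policy is just a clamp. Here `min_sort_size` and `max_sort_size` are assumed tuning knobs for illustration, not existing DataFusion settings:

```rust
// Hypothetical helper: bound the computed per-partition merge size to
// [min_sort_size, max_sort_size] so merged batches are neither tiny
// nor enormous.
fn target_merged_batch_size(
    partition_cal_size: usize,
    min_sort_size: usize,
    max_sort_size: usize,
) -> usize {
    partition_cal_size.clamp(min_sort_size, max_sort_size)
}
```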
🤖 |
Yeah, sorry, I had a bug. Retriggered.
I wonder if we can skip interleave / copying entirely? Specifically, what if we sorted to indices, as you suggested, but then avoided calling `interleave` at all?
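One hedged reading of that idea, as a sketch (the function is hypothetical): after sorting the (batch_idx, row_idx) pairs, emit a zero-copy `RecordBatch::slice` for every maximal run of consecutive rows from the same source batch, instead of gathering everything with `interleave`:

```rust
use arrow::array::RecordBatch;

// Sketch: turn a globally sorted (batch_idx, row_idx) permutation into
// output batches without copying, by slicing maximal contiguous runs.
// Worst case (no run longer than 1 row) this degenerates to one-row
// slices, so a real implementation would likely fall back to
// `interleave` for short runs.
fn emit_sorted_runs(
    batches: &[RecordBatch],
    sorted: &[(usize, usize)],
) -> Vec<RecordBatch> {
    let mut out = Vec::new();
    let mut i = 0;
    while i < sorted.len() {
        let (b, start) = sorted[i];
        let mut len = 1;
        // Extend the run while the next entry is the next row of the same batch
        while i + len < sorted.len() && sorted[i + len] == (b, start + len) {
            len += 1;
        }
        // `slice` shares the underlying buffers: zero copy
        out.push(batches[b].slice(start, len));
        i += len;
    }
    out
}
```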
🤖: Benchmark completed Details
Thanks @alamb, it looks promising.
No performance improvement in this benchmark; I believe the benchmark batch sizes are mostly larger than the sort-in-place threshold, so they will not gain from this PR. Sort-tpch 10, which is not in this benchmark list, should gain performance.
Thanks @zhuqi-lucas for sticking with this issue! I think we're close to having a PR that can be merged, that improves sort performance, and that gives some good insights into where to spend time on follow-up work.
Thank you @Dandandan for the patient review and the great guidance and suggestions!
To me this looks like a good improvement. @alamb, can we rerun the benchmarks on this to make sure we don't get regressions?
Will do. Just FYI, I would like to find some way so that I am not the only one who can easily run benchmarks -- I am using my own home-grown scripts here: https://github.com/alamb/datafusion-benchmarking. But I suspect it would be straightforward for others to use them.
🤖 |
🤖: Benchmark completed Details
🤖 |
I guess #15583 would be optimal, but I can check out and run the scripts myself as well :)
Yeah, the thing that is missing there is some stable set of machines to run the scripts on.
Thank you @alamb @Dandandan, I can check out the scripts too.

The first result, on the small sort-tpch data set, seems to show a Q3 regression; I am confused why it did not happen in the previous run on my local Mac.

And the performance improvement is obvious for the large data set (sort-tpch 10) in my previous local runs.
It seems the latest benchmark was not triggered.
I think it was due to
🤖 |
🤖: Benchmark completed Details
🤖 |
🤖: Benchmark completed Details
Thank you @alamb. The Q3 regression can be reproduced; I will take a look to see if we can optimize it further.
Which issue does this PR close?
Rationale for this change
Perf: automatically `concat_batches` for sort, which improves performance.
It's mergeable as a first version; later we can improve it according to the comments:
#15375 (comment)
What changes are included in this PR?
Perf: automatically `concat_batches` for sort, which improves performance.
Are these changes tested?
Yes
Are there any user-facing changes?
No