Fix: External sort failing on StringView due to shared buffers #14823

Merged 6 commits on Feb 26, 2025
69 changes: 67 additions & 2 deletions datafusion/core/tests/memory_limit/mod.rs
@@ -23,9 +23,10 @@ use std::sync::{Arc, LazyLock};

#[cfg(feature = "extended_tests")]
mod memory_limit_validation;
use arrow::array::{ArrayRef, DictionaryArray, RecordBatch};
use arrow::array::{ArrayRef, DictionaryArray, Int32Array, RecordBatch, StringViewArray};
use arrow::compute::SortOptions;
use arrow::datatypes::{Int32Type, SchemaRef};
use arrow_schema::{DataType, Field, Schema};
use datafusion::assert_batches_eq;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::source::DataSourceExec;
@@ -41,14 +42,15 @@ use datafusion_catalog::streaming::StreamingTable;
use datafusion_catalog::Session;
use datafusion_common::{assert_contains, Result};
use datafusion_execution::memory_pool::{
GreedyMemoryPool, MemoryPool, TrackConsumersPool,
FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool,
};
use datafusion_execution::TaskContext;
use datafusion_expr::{Expr, TableType};
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_optimizer::join_selection::JoinSelection;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::spill::get_record_batch_memory_size;
use rand::Rng;
use test_utils::AccessLogGenerator;

use async_trait::async_trait;
@@ -403,6 +405,69 @@ async fn oom_with_tracked_consumer_pool() {
.await
}

/// Regression case: if a spilled `StringViewArray`'s buffer is referenced by
/// other batches that also need to be spilled, the spill writer will
/// repeatedly write out the same buffer, and after reading back, each batch's
/// size will explode.
///
/// This test setup causes 10 spills, and each spill sorts around 20 batches.
/// If the spilled record batches explode in memory size, this test will fail.
#[tokio::test]
async fn test_stringview_external_sort() {
let mut rng = rand::thread_rng();
let array_length = 1000;
let num_batches = 200;
// Batches contain two columns: random 100-byte string, and random i32
let mut batches = Vec::with_capacity(num_batches);

for _ in 0..num_batches {
let strings: Vec<String> = (0..array_length)
.map(|_| {
(0..100)
.map(|_| rng.gen_range(0..=u8::MAX) as char)
.collect()
})
.collect();

let string_array = StringViewArray::from(strings);
let array_ref: ArrayRef = Arc::new(string_array);

let random_numbers: Vec<i32> =
(0..array_length).map(|_| rng.gen_range(0..=1000)).collect();
let int_array = Int32Array::from(random_numbers);
let int_array_ref: ArrayRef = Arc::new(int_array);

let batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("strings", DataType::Utf8View, false),
Field::new("random_numbers", DataType::Int32, false),
])),
vec![array_ref, int_array_ref],
)
.unwrap();
batches.push(batch);
}

// Run a SQL query that sorts the batches by the int column
let schema = batches[0].schema();
let table = MemTable::try_new(schema, vec![batches]).unwrap();
let builder = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(FairSpillPool::new(60 * 1024 * 1024)));
let runtime = builder.build_arc().unwrap();

let config = SessionConfig::new().with_sort_spill_reservation_bytes(40 * 1024 * 1024);

let ctx = SessionContext::new_with_config_rt(config, runtime);
ctx.register_table("t", Arc::new(table)).unwrap();

let df = ctx
.sql("explain analyze SELECT * FROM t ORDER BY random_numbers")
.await
.unwrap();

let _ = df.collect().await.expect("Query execution failed");
}

/// Run the query with the specified memory limit,
/// and verifies the expected errors are returned
#[derive(Clone, Debug)]
131 changes: 89 additions & 42 deletions datafusion/physical-plan/src/sorts/sort.rs
@@ -24,7 +24,7 @@ use std::fmt;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use crate::common::{spawn_buffered, IPCWriter};
use crate::common::spawn_buffered;
use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
use crate::expressions::PhysicalSortExpr;
use crate::limit::LimitStream;
@@ -44,7 +44,9 @@ use crate::{
Statistics,
};

use arrow::array::{Array, RecordBatch, RecordBatchOptions, UInt32Array};
use arrow::array::{
Array, RecordBatch, RecordBatchOptions, StringViewArray, UInt32Array,
};
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, SortColumn};
use arrow::datatypes::{DataType, SchemaRef};
use arrow::row::{RowConverter, SortField};
@@ -300,6 +302,7 @@ impl ExternalSorter {
if input.num_rows() == 0 {
return Ok(());
}

self.reserve_memory_for_merge()?;

let size = get_reserved_byte_for_record_batch(&input);
@@ -397,6 +400,8 @@ impl ExternalSorter {
return Ok(0);
}

self.organize_stringview_arrays()?;

debug!("Spilling sort data of ExternalSorter to disk whilst inserting");

let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?;
@@ -414,6 +419,69 @@
Ok(used)
}

/// Reconstruct `self.in_mem_batches` to organize the payload buffers of each
/// `StringViewArray` in sequential order by calling `gc()` on them.
///
/// Note this is a workaround until <https://github.com/apache/arrow-rs/issues/7185> is
/// available.
///
/// # Rationale
/// After (merge-based) sorting, all batches will be sorted into a single run,
/// but physically this sorted run is chunked into many small batches. For
/// `StringViewArray`s inside each sorted run, their inner buffers are not
/// re-constructed by default, leading to non-sequential payload locations
/// (permuted by the `interleave()` Arrow kernel). A single payload buffer might
/// be shared by multiple `RecordBatch`es.
/// When writing each batch to disk, the writer has to write all referenced buffers,
/// because they have to be read back one by one to reduce memory usage. This
/// causes extra disk reads and writes, and potentially execution failure.
///
/// # Example
/// Before sorting:
/// batch1 -> buffer1
/// batch2 -> buffer2
///
/// After sorting:
/// sorted_batch1 -> buffer1
///               -> buffer2
/// sorted_batch2 -> buffer1
///               -> buffer2
///
/// When spilling each batch, the writer then has to write out all referenced
/// buffers repeatedly.
fn organize_stringview_arrays(&mut self) -> Result<()> {
let mut organized_batches = Vec::with_capacity(self.in_mem_batches.len());

for batch in self.in_mem_batches.drain(..) {
let mut new_columns: Vec<Arc<dyn Array>> =
Vec::with_capacity(batch.num_columns());

let mut arr_mutated = false;
for array in batch.columns() {
if let Some(string_view_array) =
array.as_any().downcast_ref::<StringViewArray>()
{
let new_array = string_view_array.gc();
Contributor:

Will string_view_array.gc() affect performance when it is called many times?

Contributor:

Updated. If I understand correctly, it should not affect performance much, because we only retain the used buffer data?

before gc:

/// sorted_batch1 -> buffer1
///               -> buffer2
/// sorted_batch2 -> buffer1
///               -> buffer2

after gc:

/// sorted_batch1 -> new buffer (used data of buffer1, used data of buffer2)
/// sorted_batch2 -> new buffer (used data of buffer1, used data of buffer2)

Contributor Author (@2010YOUY01, Feb 24, 2025):

Good point, there is some inefficiency here; I filed apache/arrow-rs#7184. Once that is done on the arrow side, we can remove the copies here to speed it up.
IMO it won't cause a regression for DataFusion: it's only done when a spill is triggered, and if we don't copy here it can cause larger inefficiency or fail some memory-limited sort queries.
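
To make the compaction discussed in this thread concrete, here is a minimal standalone sketch; the string lengths and the slice are made-up illustration data, while `StringViewArray::slice`, `gc`, and `data_buffers` are existing arrow-rs APIs:

```rust
use arrow::array::StringViewArray;

fn main() {
    // Strings longer than 12 bytes are not inlined in the views; they live in
    // shared payload buffers (illustration data, ~100 bytes per string).
    let long = "x".repeat(100);
    let array = StringViewArray::from(vec![long.clone(); 8]);

    // A slice still references the original payload buffers, so its payload
    // footprint is the full 8 strings even though it only exposes 2 of them.
    let sliced = array.slice(0, 2);
    let before: usize = sliced.data_buffers().iter().map(|b| b.len()).sum();

    // `gc()` copies only the referenced bytes into a fresh, compact buffer.
    let compacted = sliced.gc();
    let after: usize = compacted.data_buffers().iter().map(|b| b.len()).sum();

    println!("payload bytes before gc: {before}, after gc: {after}");
    assert!(after < before);
}
```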

new_columns.push(Arc::new(new_array));
arr_mutated = true;
} else {
new_columns.push(Arc::clone(array));
}
}

let organized_batch = if arr_mutated {
RecordBatch::try_new(batch.schema(), new_columns)?
} else {
batch
};

organized_batches.push(organized_batch);
}

self.in_mem_batches = organized_batches;

Ok(())
}

/// Sorts the in_mem_batches in place
///
/// Sorting may have freed memory, especially if fetch is `Some`. If
@@ -439,54 +507,29 @@ impl ExternalSorter {
// `self.in_mem_batches` is already taken away by the sort_stream, now it is empty.
// We'll gradually collect the sorted stream into self.in_mem_batches, or directly
// write sorted batches to disk when the memory is insufficient.
let mut spill_writer: Option<IPCWriter> = None;
while let Some(batch) = sorted_stream.next().await {
let batch = batch?;
match &mut spill_writer {
None => {
let sorted_size = get_reserved_byte_for_record_batch(&batch);
if self.reservation.try_grow(sorted_size).is_err() {
// Directly write in_mem_batches as well as all the remaining batches in
// sorted_stream to disk. Further batches fetched from `sorted_stream` will
// be handled by the `Some(writer)` matching arm.
let spill_file =
self.runtime.disk_manager.create_tmp_file("Sorting")?;
let mut writer = IPCWriter::new(spill_file.path(), &self.schema)?;
// Flush everything in memory to the spill file
for batch in self.in_mem_batches.drain(..) {
writer.write(&batch)?;
}
// as well as the newly sorted batch
writer.write(&batch)?;
spill_writer = Some(writer);
self.reservation.free();
self.spills.push(spill_file);
} else {
self.in_mem_batches.push(batch);
self.in_mem_batches_sorted = true;
}
}
Some(writer) => {
writer.write(&batch)?;
}
let sorted_size = get_reserved_byte_for_record_batch(&batch);
if self.reservation.try_grow(sorted_size).is_err() {
// Although the reservation is not enough, the batch is
// already in memory, so it's okay to combine it with previously
// sorted batches, and spill together.
self.in_mem_batches.push(batch);
self.spill().await?; // reservation is freed in spill()
} else {
self.in_mem_batches.push(batch);
self.in_mem_batches_sorted = true;
}
}

// Drop early to free up memory reserved by the sorted stream, otherwise the
// upcoming `self.reserve_memory_for_merge()` may fail due to insufficient memory.
drop(sorted_stream);

if let Some(writer) = &mut spill_writer {
writer.finish()?;
self.metrics.spill_count.add(1);
self.metrics.spilled_rows.add(writer.num_rows);
self.metrics.spilled_bytes.add(writer.num_bytes);
}

// Sorting may free up some memory especially when fetch is `Some`. If we have
// not freed more than 50% of the memory, then we have to spill to free up more
// memory for inserting more batches.
if spill_writer.is_none() && self.reservation.size() > before / 2 {
if self.reservation.size() > before / 2 {
// We have not freed more than 50% of the memory, so we have to spill to
// free up more memory
self.spill().await?;
@@ -1422,10 +1465,14 @@ mod tests {
let spill_count = metrics.spill_count().unwrap();
let spilled_rows = metrics.spilled_rows().unwrap();
let spilled_bytes = metrics.spilled_bytes().unwrap();
// Processing 840 KB of data using 400 KB of memory requires at least 2 spills
// It will spill roughly 18000 rows and 800 KBytes.
// We leave a little wiggle room for the actual numbers.
assert!((2..=10).contains(&spill_count));

// This test case processes 840KB of data using 400KB of memory. Note
// that buffered batches can't be dropped until all sorted batches are
// generated, so we can only buffer `sort_spill_reservation_bytes` of sorted
// batches.
// The number of spills is roughly calculated as:
// `number_of_batches / (sort_spill_reservation_bytes / batch_size)`
assert!((12..=18).contains(&spill_count));
Contributor Author:

This is caused by the refactor above; the old implementation forgot to update the statistics, so several counts were missed.

Contributor:

Should we also update the comments?

Contributor Author:

Updated in 92ca3b0.
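
As a rough illustration of the spill-count estimate in the comment above, here is a small sketch; the numbers are hypothetical and not the test's actual configuration:

```rust
// All values are made up purely to illustrate the estimate
// `number_of_batches / (sort_spill_reservation_bytes / batch_size)`.
fn estimated_spill_count(
    number_of_batches: usize,
    sort_spill_reservation_bytes: usize,
    batch_size: usize,
) -> usize {
    // How many sorted batches can be buffered before a spill is forced.
    let batches_per_spill = sort_spill_reservation_bytes / batch_size;
    number_of_batches / batches_per_spill
}

fn main() {
    // e.g. 100 batches of ~8 KiB each under a 64 KiB reservation => ~12 spills,
    // which falls inside the asserted (12..=18) range.
    assert_eq!(estimated_spill_count(100, 64 * 1024, 8 * 1024), 12);
}
```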

assert!((15000..=20000).contains(&spilled_rows));
assert!((700000..=900000).contains(&spilled_bytes));
