fix(query): panic when sort spilling uses the arrow file format (#16658)
* test

Signed-off-by: coldWater <[email protected]>

* fix

Signed-off-by: coldWater <[email protected]>

---------

Signed-off-by: coldWater <[email protected]>
forsaken628 authored Oct 23, 2024
1 parent 8989dd4 commit 3d19b30
Showing 6 changed files with 114 additions and 85 deletions.
@@ -14,21 +14,20 @@

 use databend_common_expression::BlockMetaInfo;

+use super::super::SortSpillParams;
+
 /// Mark a partially sorted [`DataBlock`] as a block needs to be spilled.
 #[derive(serde::Serialize, serde::Deserialize, Debug)]
-pub struct SortSpillMetaWithParams {
-    pub batch_rows: usize,
-    pub num_merge: usize,
-}
+pub struct SortSpillMetaWithParams(pub SortSpillParams);

 #[typetag::serde(name = "sort_spill")]
 impl BlockMetaInfo for SortSpillMetaWithParams {
     fn equals(&self, _: &Box<dyn BlockMetaInfo>) -> bool {
-        unimplemented!("Unimplemented equals SortSpillMeta")
+        unimplemented!("Unimplemented equals SortSpillMetaWithParams")
     }

     fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
-        unimplemented!("Unimplemented clone SortSpillMeta")
+        unimplemented!("Unimplemented clone SortSpillMetaWithParams")
     }
 }

@@ -27,7 +27,6 @@ use databend_common_expression::types::StringType;
 use databend_common_expression::types::TimestampType;
 use databend_common_expression::with_number_mapped_type;
 use databend_common_expression::BlockEntry;
-use databend_common_expression::BlockMetaInfo;
 use databend_common_expression::DataBlock;
 use databend_common_expression::DataSchemaRef;
 use databend_common_expression::SortColumnDescription;
@@ -54,6 +53,14 @@ use super::TransformSortMergeLimit;
 /// The memory will be doubled during merging.
 const MERGE_RATIO: usize = 2;

+#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy)]
+pub struct SortSpillParams {
+    /// The number of rows of each spilled block.
+    pub batch_rows: usize,
+    /// The number of spilled blocks in each merge of the spill processor.
+    pub num_merge: usize,
+}
+
 pub trait MergeSort<R: Rows> {
     const NAME: &'static str;

@@ -102,12 +109,10 @@ pub struct TransformSortMergeBase<M, R, Converter> {
     max_memory_usage: usize,
     spilling_bytes_threshold: usize,
     spilling_batch_bytes: usize,
-    // The following two fields will be passed to the spill processor.
-    // If these two fields are not zero, it means we need to spill.
-    /// The number of rows of each spilled block.
-    spill_batch_rows: usize,
-    /// The number of spilled blocks in each merge of the spill processor.
-    spill_num_merge: usize,
+
+    // The spill_params will be passed to the spill processor.
+    // If spill_params is Some, it means we need to spill.
+    spill_params: Option<SortSpillParams>,

     _r: PhantomData<R>,
 }
@@ -141,43 +146,43 @@ where
             max_memory_usage,
             spilling_bytes_threshold,
             spilling_batch_bytes,

-            spill_batch_rows: 0,
-            spill_num_merge: 0,
+            spill_params: None,
             may_spill,
             _r: PhantomData,
         })
     }

     fn prepare_spill(&mut self) -> Result<Vec<DataBlock>> {
-        let mut spill_meta = Box::new(SortSpillMeta {}) as Box<dyn BlockMetaInfo>;
-        if self.spill_batch_rows == 0 {
-            debug_assert_eq!(self.spill_num_merge, 0);
+        let mut spill_params = if self.spill_params.is_none() {
             // We use the first memory calculation to estimate the batch size and the number of merge.
-            self.spill_num_merge = self
+            let num_merge = self
                 .inner
                 .num_bytes()
                 .div_ceil(self.spilling_batch_bytes)
                 .max(2);
-            self.spill_batch_rows = self.inner.num_rows().div_ceil(self.spill_num_merge);
+            let batch_rows = self.inner.num_rows().div_ceil(num_merge);
             // The first block to spill will contain the parameters of spilling.
             // Later blocks just contain a empty struct `SortSpillMeta` to save memory.
-            spill_meta = Box::new(SortSpillMetaWithParams {
-                batch_rows: self.spill_batch_rows,
-                num_merge: self.spill_num_merge,
-            }) as Box<dyn BlockMetaInfo>;
+            let params = SortSpillParams {
+                batch_rows,
+                num_merge,
+            };
+            self.spill_params = Some(params);
+            Some(params)
         } else {
-            debug_assert!(self.spill_num_merge > 0);
-        }
+            None
+        };

-        let mut blocks = self.inner.prepare_spill(self.spill_batch_rows)?;
+        let mut blocks = self
+            .inner
+            .prepare_spill(self.spill_params.unwrap().batch_rows)?;

-        // Fill the spill meta.
-        if let Some(b) = blocks.first_mut() {
-            b.replace_meta(spill_meta);
-        }
-        for b in blocks.iter_mut().skip(1) {
-            b.replace_meta(Box::new(SortSpillMeta {}));
+        for b in blocks.iter_mut() {
+            b.replace_meta(match spill_params.take() {
+                Some(params) => Box::new(SortSpillMetaWithParams(params)),
+                None => Box::new(SortSpillMeta {}),
+            });
         }

         debug_assert_eq!(self.inner.num_bytes(), 0);
@@ -242,7 +247,7 @@ where
     fn on_finish(&mut self, _output: bool) -> Result<Vec<DataBlock>> {
         // If the processor has started to spill blocks,
         // gather the final few data in one block.
-        self.inner.on_finish(self.spill_num_merge > 0)
+        self.inner.on_finish(self.spill_params.is_some())
     }
 }

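Note: the first-spill estimation in `prepare_spill` above is easy to sanity-check in isolation. A minimal sketch, not part of the commit — the function name and example figures are hypothetical, and `num_bytes`/`num_rows` stand in for `self.inner.num_bytes()`/`self.inner.num_rows()`:

/// Sketch of the first-spill parameter estimation performed above.
fn estimate_spill_params(num_bytes: usize, num_rows: usize, spilling_batch_bytes: usize) -> (usize, usize) {
    // Spill in at least two batches, otherwise there is nothing to merge.
    let num_merge = num_bytes.div_ceil(spilling_batch_bytes).max(2);
    // Spread the buffered rows evenly over the merge inputs.
    let batch_rows = num_rows.div_ceil(num_merge);
    (batch_rows, num_merge)
}

fn main() {
    // E.g. 96 MiB buffered as 1_000_000 rows with a 32 MiB batch budget
    // yields num_merge = 3 and batch_rows = 333_334.
    assert_eq!(estimate_spill_params(96 << 20, 1_000_000, 32 << 20), (333_334, 3));
}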
27 changes: 11 additions & 16 deletions src/query/service/src/pipelines/builders/builder_sort.rs
@@ -207,25 +207,20 @@ impl SortPipelineBuilder {
         let bytes_limit_per_proc = settings.get_sort_spilling_bytes_threshold_per_proc()?;
         if memory_ratio == 0 && bytes_limit_per_proc == 0 {
             // If these two settings are not set, do not enable sort spill.
             // TODO(spill): enable sort spill by default like aggregate.
             return Ok((0, 0));
         }
-        let memory_ratio = (memory_ratio as f64 / 100_f64).min(1_f64);
-        let max_memory_usage = match settings.get_max_memory_usage()? {
-            0 => usize::MAX,
-            max_memory_usage => {
-                if memory_ratio == 0_f64 {
-                    usize::MAX
-                } else {
-                    (max_memory_usage as f64 * memory_ratio) as usize
-                }
-            }
-        };
-        let spill_threshold_per_core =
-            match settings.get_sort_spilling_bytes_threshold_per_proc()? {
-                0 => max_memory_usage / num_threads,
-                bytes => bytes,
-            };
+
+        let max_memory_usage = match (
+            settings.get_max_memory_usage()?,
+            (memory_ratio as f64 / 100_f64).min(1_f64),
+        ) {
+            (0, _) | (_, 0.0) => usize::MAX,
+            (memory, ratio) => (memory as f64 * ratio) as usize,
+        };
+        let spill_threshold_per_core = match bytes_limit_per_proc {
+            0 => max_memory_usage / num_threads,
+            bytes => bytes,
+        };

         Ok((max_memory_usage, spill_threshold_per_core))
     }
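Note: the rewritten limit calculation is easier to follow with the settings passed in directly. A sketch mirroring the arithmetic of the hunk above — the function name and example values are hypothetical, not part of the commit:

/// Sketch of the spill limit calculation above; 0 means "unlimited"
/// for the memory cap and "derive from the cap" for the byte threshold.
fn sort_spill_limits(
    max_memory_usage: u64,
    memory_ratio: u64, // percentage
    bytes_limit_per_proc: usize,
    num_threads: usize,
) -> (usize, usize) {
    let max_memory_usage = match (max_memory_usage, (memory_ratio as f64 / 100_f64).min(1_f64)) {
        (0, _) | (_, 0.0) => usize::MAX,
        (memory, ratio) => (memory as f64 * ratio) as usize,
    };
    let spill_threshold_per_core = match bytes_limit_per_proc {
        0 => max_memory_usage / num_threads,
        bytes => bytes,
    };
    (max_memory_usage, spill_threshold_per_core)
}

fn main() {
    // 8 GiB server limit, spill at 60% of it, per-core threshold derived over 16 threads.
    let (cap, per_core) = sort_spill_limits(8 << 30, 60, 0, 16);
    assert_eq!(cap, 5_153_960_755);
    assert_eq!(per_core, cap / 16);
}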
@@ -43,6 +43,7 @@ use databend_common_pipeline_transforms::processors::sort::SimpleRowsDesc;
 use databend_common_pipeline_transforms::processors::sort::SortSpillMeta;
 use databend_common_pipeline_transforms::processors::sort::SortSpillMetaWithParams;
 use databend_common_pipeline_transforms::processors::sort::SortedStream;
+use databend_common_pipeline_transforms::processors::SortSpillParams;

 use crate::spillers::Location;
 use crate::spillers::Spiller;
@@ -89,15 +90,17 @@ pub struct TransformSortSpill<A: SortAlgorithm> {
 }

 #[inline(always)]
-fn need_spill(block: &DataBlock) -> bool {
-    block
-        .get_meta()
-        .and_then(SortSpillMeta::downcast_ref_from)
-        .is_some()
-        || block
-            .get_meta()
-            .and_then(SortSpillMetaWithParams::downcast_ref_from)
-            .is_some()
+fn take_spill_meta(block: &mut DataBlock) -> Option<Option<SortSpillParams>> {
+    block.take_meta().map(|meta| {
+        if SortSpillMeta::downcast_ref_from(&meta).is_some() {
+            return None;
+        }
+        Some(
+            SortSpillMetaWithParams::downcast_from(meta)
+                .expect("unknown meta type")
+                .0,
+        )
+    })
 }

 #[async_trait::async_trait]
@@ -142,36 +145,38 @@ where
         }

         if self.input.has_data() {
-            let block = self.input.pull_data().unwrap()?;
+            let mut block = self.input.pull_data().unwrap()?;
+            let meta = take_spill_meta(&mut block);
             return match &self.state {
                 State::Init => {
-                    if need_spill(&block) {
-                        // Need to spill this block.
-                        let meta =
-                            SortSpillMetaWithParams::downcast_ref_from(block.get_meta().unwrap())
-                                .unwrap();
-                        self.batch_rows = meta.batch_rows;
-                        self.num_merge = meta.num_merge;
-
-                        self.input_data = Some(block);
-                        self.state = State::Spill;
-                        Ok(Event::Async)
-                    } else {
-                        // If we get a memory block at initial state, it means we will never spill data.
-                        debug_assert!(self.spiller.columns_layout.is_empty());
-                        self.output_block(block);
-                        self.state = State::NoSpill;
-                        Ok(Event::NeedConsume)
+                    match meta {
+                        Some(Some(params)) => {
+                            // Need to spill this block.
+                            self.batch_rows = params.batch_rows;
+                            self.num_merge = params.num_merge;
+
+                            self.input_data = Some(block);
+                            self.state = State::Spill;
+                            Ok(Event::Async)
+                        }
+                        Some(None) => unreachable!(),
+                        None => {
+                            // If we get a memory block at initial state, it means we will never spill data.
+                            debug_assert!(self.spiller.columns_layout.is_empty());
+                            self.output_block(block);
+                            self.state = State::NoSpill;
+                            Ok(Event::NeedConsume)
+                        }
                     }
                 }
                 State::NoSpill => {
-                    debug_assert!(!need_spill(&block));
+                    debug_assert!(meta.is_none());
                     self.output_block(block);
                     self.state = State::NoSpill;
                     Ok(Event::NeedConsume)
                 }
                 State::Spill => {
-                    if !need_spill(&block) {
+                    if meta.is_none() {
                         // It means we get the last block.
                         // We can launch external merge sort now.
                         self.state = State::Merging;
@@ -470,7 +475,6 @@ mod tests {
     use databend_common_expression::block_debug::pretty_format_blocks;
     use databend_common_expression::types::DataType;
     use databend_common_expression::types::Int32Type;
-    use databend_common_expression::DataBlock;
     use databend_common_expression::DataField;
     use databend_common_expression::DataSchemaRefExt;
     use databend_common_expression::FromData;
@@ -479,10 +483,8 @@
     use rand::rngs::ThreadRng;
     use rand::Rng;

-    use super::TransformSortSpill;
+    use super::*;
     use crate::sessions::QueryContext;
-    use crate::spillers::Spiller;
     use crate::spillers::SpillerConfig;
     use crate::spillers::SpillerType;
     use crate::test_kits::*;
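Note: the `Option<Option<SortSpillParams>>` returned by `take_spill_meta` encodes three cases. A toy model with plain enums replacing the `BlockMetaInfo` downcasts — all names here are hypothetical stand-ins for the real meta types:

#[derive(Clone, Copy, Debug)]
struct SpillParams {
    batch_rows: usize,
    num_merge: usize,
}

enum Meta {
    Plain,                   // the lightweight `SortSpillMeta` marker
    WithParams(SpillParams), // `SortSpillMetaWithParams` on the first spilled block
}

// None          -> an in-memory block; the query never spills.
// Some(None)    -> a spilled block after the first one.
// Some(Some(p)) -> the first spilled block, carrying the spill parameters.
fn take_spill_meta(meta: Option<Meta>) -> Option<Option<SpillParams>> {
    meta.map(|m| match m {
        Meta::Plain => None,
        Meta::WithParams(p) => Some(p),
    })
}

fn main() {
    let first = take_spill_meta(Some(Meta::WithParams(SpillParams {
        batch_rows: 1000,
        num_merge: 4,
    })));
    // In `State::Init` this case moves the processor to `State::Spill`.
    assert!(matches!(first, Some(Some(p)) if p.num_merge == 4));
    // Later spilled blocks carry only the marker.
    assert!(matches!(take_spill_meta(Some(Meta::Plain)), Some(None)));
    // A plain memory block has no spill meta at all.
    assert!(take_spill_meta(None).is_none());
}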
15 changes: 12 additions & 3 deletions src/query/service/src/spillers/serialize.rs
@@ -62,7 +62,7 @@ impl BlocksEncoder {
         }
     }

-    pub(super) fn add_blocks(&mut self, blocks: Vec<DataBlock>) {
+    pub(super) fn add_blocks(&mut self, mut blocks: Vec<DataBlock>) {
         let layout = if self.use_parquet {
             // Currently we splice multiple complete parquet files into one,
             // so that the file contains duplicate headers/footers and metadata,
@@ -71,7 +71,11 @@
             bare_blocks_to_parquet(blocks, &mut self.buf).unwrap();
             Layout::Parquet
         } else {
-            let block = DataBlock::concat(&blocks).unwrap();
+            let block = if blocks.len() == 1 {
+                blocks.remove(0)
+            } else {
+                DataBlock::concat(&blocks).unwrap()
+            };
             let columns_layout = std::iter::once(self.size())
                 .chain(block.columns().iter().map(|entry| {
                     let column = entry
@@ -135,7 +139,12 @@ fn bare_blocks_from_parquet<R: ChunkReader + 'static>(data: R) -> Result<DataBlock> {
         let (block, _) = DataBlock::from_record_batch(&schema, &record_batch)?;
         blocks.push(block);
     }
-    DataBlock::concat(&blocks)
+
+    if blocks.len() == 1 {
+        Ok(blocks.remove(0))
+    } else {
+        DataBlock::concat(&blocks)
+    }
 }

 /// Serialize bare data blocks to parquet format.
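Note: both serialize.rs hunks apply the same pattern — when exactly one block was produced, take it as-is instead of routing it through `DataBlock::concat` again. As a hypothetical shared helper (not part of the commit) the pattern would read:

use databend_common_exception::Result;
use databend_common_expression::DataBlock;

/// Take a lone block as-is; only genuinely multiple blocks are concatenated,
/// so no columns are re-copied and `concat` never sees a one-element slice.
fn concat_or_take(mut blocks: Vec<DataBlock>) -> Result<DataBlock> {
    if blocks.len() == 1 {
        Ok(blocks.remove(0))
    } else {
        DataBlock::concat(&blocks)
    }
}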
19 changes: 19 additions & 0 deletions tests/sqllogictests/suites/query/order.test
@@ -52,3 +52,22 @@ select number from numbers(10) as a order by b.number

 statement error
 select number from (select * from numbers(10) as b) as a order by b.number
+
+statement ok
+set sort_spilling_bytes_threshold_per_proc = 1;
+
+statement ok
+set spilling_file_format = 'arrow';
+
+query I
+select number from numbers(1000) order by number offset 997;
+----
+997
+998
+999
+
+statement ok
+unset sort_spilling_bytes_threshold_per_proc;
+
+statement ok
+unset spilling_file_format;
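Note: the new sqllogictest exercises the fix end to end — `sort_spilling_bytes_threshold_per_proc = 1` forces every block to spill, `spilling_file_format = 'arrow'` selects the code path the commit title says used to panic, and the `offset 997` query then verifies that the spilled runs are merged back in the correct order.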
