refactor: polish memory usage when copy into parquet.
1. Serialize blocks to parquet early to reduce memory usage.
2. Limit the number of writer threads based on the memory budget and max_file_size.
3. Adjust the default max_file_size (from 256 MiB down to 64 MiB).
youngsofun committed May 22, 2024
1 parent b23080d commit b8dd3e8
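
In short, the sizing rules introduced here reduce to roughly the following condensed sketch, distilled from the pipeline.rs changes shown below (the free-standing helper and its signature are illustrative, not part of the commit):

// Rough restatement of the sizing logic in append_data_to_parquet_files.
fn plan_parquet_unload(
    is_single: bool,      // copy_options.single
    max_file_size: usize, // copy_options.max_file_size, 0 means "use default"
    mem_limit: usize,     // max_memory_usage setting
    max_threads: usize,   // max_threads setting
) -> (Option<usize>, usize) {
    // Serializing a block to parquet can roughly double its memory footprint,
    // so only half of the memory budget is treated as usable.
    let mem_limit = mem_limit / 2;
    if is_single {
        // Single output file: no size limit, keep one writer.
        return (None, 1);
    }
    let target_file_size = if max_file_size == 0 {
        64 * 1024 * 1024 // new default: 64 MiB per file (previously 256 MiB)
    } else {
        max_file_size.min(mem_limit)
    };
    // Cap writer threads so that threads * target_file_size stays within the budget.
    let threads = max_threads.min(mem_limit / target_file_size).max(1);
    (Some(target_file_size), threads)
}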
Showing 9 changed files with 203 additions and 75 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock


3 changes: 3 additions & 0 deletions src/query/storages/stage/Cargo.toml
@@ -26,7 +26,9 @@ databend-common-pipeline-transforms = { path = "../../pipeline/transforms" }
databend-common-settings = { path = "../../settings" }
databend-common-storage = { path = "../../../common/storage" }
databend-common-storages-parquet = { path = "../parquet" }
databend-storages-common-table-meta = { path = "../common/table_meta" }

arrow-schema = { workspace = true }
async-backtrace = { workspace = true }
async-trait = { workspace = true }
bstr = "1.9.1"
@@ -36,6 +38,7 @@ enum-as-inner = "0.6.0"
futures = { workspace = true }
log = { workspace = true }
opendal = { workspace = true }
parquet_rs = { workspace = true }
serde = { workspace = true }

serde_json = { workspace = true }
25 changes: 6 additions & 19 deletions src/query/storages/stage/src/append/do_append.rs
@@ -34,45 +34,32 @@ impl StageTable {
) -> databend_common_exception::Result<()> {
let settings = ctx.get_settings();

let single = self.table_info.stage_info.copy_options.single;
let max_file_size = if single {
usize::MAX
} else {
let max_file_size = self.table_info.stage_info.copy_options.max_file_size;
if max_file_size == 0 {
// 256M per file by default.
256 * 1024 * 1024
} else {
let mem_limit = (settings.get_max_memory_usage()? / 2) as usize;
max_file_size.min(mem_limit)
}
};
let fmt = self.table_info.stage_info.file_format_params.clone();
let mem_limit = settings.get_max_memory_usage()? as usize;
let max_threads = settings.get_max_threads()? as usize;

let op = StageTable::get_op(&self.table_info.stage_info)?;
let fmt = self.table_info.stage_info.file_format_params.clone();
let uuid = uuid::Uuid::new_v4().to_string();
let group_id = AtomicUsize::new(0);
match fmt {
FileFormatParams::Parquet(_) => append_data_to_parquet_files(
pipeline,
ctx.clone(),
self.table_info.clone(),
op,
max_file_size,
max_threads,
uuid,
&group_id,
mem_limit,
max_threads,
)?,
_ => append_data_to_row_based_files(
pipeline,
ctx.clone(),
self.table_info.clone(),
op,
max_file_size,
max_threads,
uuid,
&group_id,
mem_limit,
max_threads,
)?,
};
if !self.table_info.stage_info.copy_options.detailed_output {
45 changes: 24 additions & 21 deletions src/query/storages/stage/src/append/parquet_file/pipeline.rs
@@ -12,54 +12,57 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_catalog::plan::StageTableInfo;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_formats::FileFormatOptionsExt;
use databend_common_pipeline_core::Pipeline;
use opendal::Operator;

use super::limit_file_size_processor::LimitFileSizeProcessor;
use super::writer_processor::ParquetFileWriter;

// LimitFileSizeProcessor * 1: slice/group block to batches (as a block meta) that are suitable as a file.
// ParquetFileSink * N: simply serialize blocks in each meta to a whole file and write out.
/// - LimitFileSizeProcessor * 1: slice/group block to batches (as a block meta) to avoid files being too small when there are many threads.
/// - ParquetFileSink * N: serialize incoming blocks to Vec to reduce memory, and flush when they are large enough.
#[allow(clippy::too_many_arguments)]
pub(crate) fn append_data_to_parquet_files(
pipeline: &mut Pipeline,
ctx: Arc<dyn TableContext>,
table_info: StageTableInfo,
op: Operator,
max_file_size: usize,
max_threads: usize,
uuid: String,
group_id: &std::sync::atomic::AtomicUsize,
mem_limit: usize,
max_threads: usize,
) -> Result<()> {
let is_single = table_info.stage_info.copy_options.single;
let max_file_size = table_info.stage_info.copy_options.max_file_size;
// when serializing block to parquet, the memory may be doubled
let mem_limit = mem_limit / 2;
pipeline.try_resize(1)?;
pipeline.add_transform(|input, output| {
LimitFileSizeProcessor::try_create(input, output, max_file_size)
})?;
if max_file_size != usize::MAX {
let max_file_size = if is_single {
None
} else {
let max_file_size = if max_file_size == 0 {
64 * 1024 * 1024
} else {
max_file_size.min(mem_limit)
};
pipeline.add_transform(|input, output| {
LimitFileSizeProcessor::try_create(input, output, max_file_size)
})?;

let max_threads = max_threads.min(mem_limit / max_file_size).max(1);
pipeline.try_resize(max_threads)?;
}
Some(max_file_size)
};
pipeline.add_transform(|input, output| {
let gid = group_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let mut options_ext =
FileFormatOptionsExt::create_from_settings(&ctx.get_settings(), false)?;
let output_format = options_ext.get_output_format(
table_info.schema(),
table_info.stage_info.file_format_params.clone(),
)?;
ParquetFileWriter::try_create(
input,
output,
table_info.clone(),
output_format,
op.clone(),
uuid.clone(),
gid,
max_file_size,
)
})?;
Ok(())
138 changes: 110 additions & 28 deletions src/query/storages/stage/src/append/parquet_file/writer_processor.rs
@@ -17,18 +17,24 @@ use std::collections::VecDeque;
use std::mem;
use std::sync::Arc;

use arrow_schema::Schema as ArrowSchema;
use async_trait::async_trait;
use databend_common_catalog::plan::StageTableInfo;
use databend_common_exception::Result;
use databend_common_expression::converts::arrow::table_schema_to_arrow_schema;
use databend_common_expression::BlockMetaInfoDowncast;
use databend_common_expression::DataBlock;
use databend_common_formats::output_format::OutputFormat;
use databend_common_pipeline_core::processors::Event;
use databend_common_pipeline_core::processors::InputPort;
use databend_common_pipeline_core::processors::OutputPort;
use databend_common_pipeline_core::processors::Processor;
use databend_common_pipeline_core::processors::ProcessorPtr;
use databend_storages_common_table_meta::table::TableCompression;
use opendal::Operator;
use parquet_rs::arrow::ArrowWriter;
use parquet_rs::basic::Encoding;
use parquet_rs::file::properties::EnabledStatistics;
use parquet_rs::file::properties::WriterProperties;

use super::block_batch::BlockBatch;
use crate::append::output::DataSummary;
@@ -40,53 +46,112 @@ pub struct ParquetFileWriter {
output: Arc<OutputPort>,

table_info: StageTableInfo,
output_format: Box<dyn OutputFormat>,
arrow_schema: Arc<ArrowSchema>,

unload_output: UnloadOutput,
unload_output_blocks: Option<VecDeque<DataBlock>>,
input_data: Option<DataBlock>,
file_to_write: Option<(Vec<u8>, DataSummary)>,
input_data: Vec<DataBlock>,

input_bytes: usize,
row_counts: usize,
writer: ArrowWriter<Vec<u8>>,

file_to_write: Option<(Vec<u8>, DataSummary)>,
data_accessor: Operator,

// the result of statement
unload_output: UnloadOutput,
unload_output_blocks: Option<VecDeque<DataBlock>>,

uuid: String,
group_id: usize,
batch_id: usize,

targe_file_size: Option<usize>,
}

const MAX_BUFFER_SIZE: usize = 64 * 1024 * 1024;
// this is number of rows, not size
const MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;

fn create_writer(
arrow_schema: Arc<ArrowSchema>,
targe_file_size: Option<usize>,
) -> Result<ArrowWriter<Vec<u8>>> {
let props = WriterProperties::builder()
.set_compression(TableCompression::Zstd.into())
.set_max_row_group_size(MAX_ROW_GROUP_SIZE)
.set_encoding(Encoding::PLAIN)
.set_dictionary_enabled(false)
.set_statistics_enabled(EnabledStatistics::None)
.set_bloom_filter_enabled(false)
.build();
let buf_size = match targe_file_size {
Some(n) if n < MAX_BUFFER_SIZE => n,
_ => MAX_BUFFER_SIZE,
};
let writer = ArrowWriter::try_new(Vec::with_capacity(buf_size), arrow_schema, Some(props))?;
Ok(writer)
}

impl ParquetFileWriter {
pub fn try_create(
input: Arc<InputPort>,
output: Arc<OutputPort>,
table_info: StageTableInfo,
output_format: Box<dyn OutputFormat>,
data_accessor: Operator,
uuid: String,
group_id: usize,
targe_file_size: Option<usize>,
) -> Result<ProcessorPtr> {
let unload_output =
UnloadOutput::create(table_info.stage_info.copy_options.detailed_output);

let arrow_schema = Arc::new(table_schema_to_arrow_schema(&table_info.schema));
let writer = create_writer(arrow_schema.clone(), targe_file_size)?;

Ok(ProcessorPtr::create(Box::new(ParquetFileWriter {
input,
output,
table_info,
output_format,
arrow_schema,
unload_output,
unload_output_blocks: None,
input_data: None,
writer,
input_data: Vec::new(),
input_bytes: 0,
file_to_write: None,
data_accessor,
uuid,
group_id,
batch_id: 0,
targe_file_size,
row_counts: 0,
})))
}
pub fn reinit_writer(&mut self) -> Result<()> {
self.writer = create_writer(self.arrow_schema.clone(), self.targe_file_size)?;
self.row_counts = 0;
self.input_bytes = 0;
Ok(())
}

fn flush(&mut self) -> Result<()> {
_ = self.writer.finish();
let buf = mem::take(self.writer.inner_mut());
let output_bytes = buf.len();
self.file_to_write = Some((buf, DataSummary {
row_counts: self.row_counts,
input_bytes: self.input_bytes,
output_bytes,
}));
self.reinit_writer()?;
Ok(())
}
}

#[async_trait]
impl Processor for ParquetFileWriter {
fn name(&self) -> String {
"ParquetFileSink".to_string()
"ParquetFileWriter".to_string()
}

fn as_any(&mut self) -> &mut dyn Any {
@@ -100,10 +165,13 @@ impl Processor for ParquetFileWriter {
} else if self.file_to_write.is_some() {
self.input.set_not_need_data();
Ok(Event::Async)
} else if self.input_data.is_some() {
} else if !self.input_data.is_empty() {
self.input.set_not_need_data();
Ok(Event::Sync)
} else if self.input.is_finished() {
if self.row_counts > 0 {
return Ok(Event::Sync);
}
if self.unload_output.is_empty() {
self.output.finish();
return Ok(Event::Finished);
@@ -123,7 +191,15 @@
Ok(Event::NeedConsume)
}
} else if self.input.has_data() {
self.input_data = Some(self.input.pull_data().unwrap()?);
let block = self.input.pull_data().unwrap()?;
if self.targe_file_size.is_none() {
self.input_data.push(block);
} else {
let block_meta = block.get_owned_meta().unwrap();
let blocks = BlockBatch::downcast_from(block_meta).unwrap();
self.input_data.extend_from_slice(&blocks.blocks);
}

self.input.set_not_need_data();
Ok(Event::Sync)
} else {
@@ -133,23 +209,29 @@
}

fn process(&mut self) -> Result<()> {
let block = self.input_data.take().unwrap();
let block_meta = block.get_owned_meta().unwrap();
let blocks = BlockBatch::downcast_from(block_meta).unwrap();
let mut input_bytes = 0;
let mut row_counts = 0;
for b in blocks.blocks {
input_bytes += b.memory_size();
row_counts += b.num_rows();
self.output_format.serialize_block(&b)?;
while let Some(b) = self.input_data.pop() {
self.input_bytes += b.memory_size();
self.row_counts += b.num_rows();
let batch = b.to_record_batch(&self.table_info.schema)?;
self.writer.write(&batch)?;

if let Some(target) = self.targe_file_size {
if self.row_counts > 0 {
// written row groups: compressed, controlled by MAX_ROW_GROUP_SIZE
let file_size = self.writer.bytes_written();
// in_progress row group: each column leaf has an at most 1MB uncompressed buffer and multi compressed pages
// may result in small file for schema with many columns
let in_progress = self.writer.in_progress_size();
if file_size + in_progress >= target {
self.flush()?;
return Ok(());
}
}
}
}
if self.input.is_finished() && self.row_counts > 0 {
self.flush()?;
}
let data = self.output_format.finalize()?;
let output_bytes = data.len();
self.file_to_write = Some((data, DataSummary {
row_counts,
input_bytes,
output_bytes,
}));
Ok(())
}

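The flush decision added to process() above boils down to a single predicate; a minimal sketch follows (the free-standing function and the 80-column figure are illustrative assumptions, not part of the commit):

// `bytes_written` covers row groups already flushed by the ArrowWriter
// (compressed, bounded by MAX_ROW_GROUP_SIZE rows each); `in_progress_size`
// is the writer's estimate for the current, unflushed row group
// (uncompressed buffers of up to about 1 MiB per leaf column).
fn should_flush(bytes_written: usize, in_progress_size: usize, target_file_size: usize) -> bool {
    bytes_written + in_progress_size >= target_file_size
}

// Illustration of the wide-schema caveat noted in the code comment: with a
// 64 MiB target and, say, 80 leaf columns, the in-progress estimate alone can
// approach 80 MiB uncompressed, so the predicate fires early and the flushed,
// compressed file may come out well below the target size.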