
feat: Improve datafusion-cli memory usage and considering reserve mem… #14766


Merged (13 commits) on Feb 21, 2025
4 changes: 3 additions & 1 deletion datafusion-cli/src/command.rs
@@ -62,7 +62,9 @@ impl Command {
Self::Help => {
let now = Instant::now();
let command_batch = all_commands_info();
print_options.print_batches(command_batch.schema(), &[command_batch], now)
let schema = command_batch.schema();
let num_rows = command_batch.num_rows();
print_options.print_batches(schema, &[command_batch], now, num_rows)
}
Self::ListTables => {
exec_and_print(ctx, print_options, "SHOW TABLES".into()).await
43 changes: 34 additions & 9 deletions datafusion-cli/src/exec.rs
@@ -17,11 +17,6 @@

//! Execution functions

use std::collections::HashMap;
use std::fs::File;
use std::io::prelude::*;
use std::io::BufReader;

use crate::cli_context::CliSessionContext;
use crate::helper::split_from_semicolon;
use crate::print_format::PrintFormat;
@@ -31,6 +26,11 @@ use crate::{
object_storage::get_object_store,
print_options::{MaxRows, PrintOptions},
};
use futures::StreamExt;
use std::collections::HashMap;
use std::fs::File;
use std::io::prelude::*;
use std::io::BufReader;

use datafusion::common::instant::Instant;
use datafusion::common::{plan_datafusion_err, plan_err};
@@ -39,10 +39,12 @@ use datafusion::datasource::listing::ListingTableUrl;
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::{DdlStatement, LogicalPlan};
use datafusion::physical_plan::execution_plan::EmissionType;
use datafusion::physical_plan::{collect, execute_stream, ExecutionPlanProperties};
use datafusion::physical_plan::{execute_stream, ExecutionPlanProperties};
use datafusion::sql::parser::{DFParser, Statement};
use datafusion::sql::sqlparser::dialect::dialect_from_str;

use datafusion::execution::memory_pool::MemoryConsumer;
use datafusion::physical_plan::spill::get_record_batch_memory_size;
use datafusion::sql::sqlparser;
use rustyline::error::ReadlineError;
use rustyline::Editor;
@@ -235,6 +237,10 @@ pub(super) async fn exec_and_print(
let df = ctx.execute_logical_plan(plan).await?;
let physical_plan = df.create_physical_plan().await?;

// Track memory usage for the query result if it's bounded
let mut reservation =
MemoryConsumer::new("DataFusion-Cli").register(task_ctx.memory_pool());

if physical_plan.boundedness().is_unbounded() {
if physical_plan.pipeline_behavior() == EmissionType::Final {
return plan_err!(
@@ -247,10 +253,29 @@
let stream = execute_stream(physical_plan, task_ctx.clone())?;
print_options.print_stream(stream, now).await?;
} else {
// Bounded stream; collected results are printed after all input consumed.
// Bounded stream; collected results size is limited by the maxrows option
let schema = physical_plan.schema();
let results = collect(physical_plan, task_ctx.clone()).await?;
adjusted.into_inner().print_batches(schema, &results, now)?;
let mut stream = execute_stream(physical_plan, task_ctx.clone())?;
let mut results = vec![];
let mut row_count = 0_usize;
while let Some(batch) = stream.next().await {
let batch = batch?;
let curr_num_rows = batch.num_rows();
if let MaxRows::Limited(max_rows) = print_options.maxrows {
// Stop collecting results if the number of rows exceeds the limit
// results batch should include the last batch that exceeds the limit
if row_count < max_rows + curr_num_rows {
// Try to grow the reservation to accommodate the batch in memory
reservation.try_grow(get_record_batch_memory_size(&batch))?;
results.push(batch);
}
}
row_count += curr_num_rows;
}
adjusted
.into_inner()
.print_batches(schema, &results, now, row_count)?;
Contributor:
Would be nice if it could even print based on the stream instead of collecting.

Contributor Author:
Thanks for the review @Dandandan, good point. The collected results only hold about maxrows rows' worth of batches, so the remaining concern is when maxrows is set to a huge number.

Contributor Author:
Created a follow-up ticket: #14810

Contributor:
Nice

reservation.free();
}
}

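For context, the reservation logic in the exec.rs hunk above follows DataFusion's memory_pool API: a named MemoryConsumer is registered against the task context's memory pool, try_grow is called for each buffered batch (sized with get_record_batch_memory_size), and free releases the reservation once the results have been printed. Below is a minimal standalone sketch of that pattern; the GreedyMemoryPool, the 1 MiB limit, and the consumer name are illustrative stand-ins, not the CLI's actual configuration.

use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int64Array};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};
use datafusion::physical_plan::spill::get_record_batch_memory_size;

fn main() -> datafusion::error::Result<()> {
    // Hypothetical 1 MiB pool; the CLI builds its pool from its configured memory limit instead.
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024 * 1024));

    // Register a named consumer, mirroring MemoryConsumer::new("DataFusion-Cli") in the diff.
    let mut reservation = MemoryConsumer::new("example-cli-results").register(&pool);

    // Build a small batch standing in for one batch pulled from the query stream.
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let array: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(schema, vec![array])?;

    // Grow the reservation by the batch's in-memory size; an Err means the pool limit was hit.
    reservation.try_grow(get_record_batch_memory_size(&batch))?;

    // Release everything once the buffered results have been printed.
    reservation.free();
    Ok(())
}
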
2 changes: 1 addition & 1 deletion datafusion-cli/src/print_options.rs
@@ -102,14 +102,14 @@ impl PrintOptions {
schema: SchemaRef,
batches: &[RecordBatch],
query_start_time: Instant,
row_count: usize,
Contributor:
this is a great idea

) -> Result<()> {
let stdout = std::io::stdout();
let mut writer = stdout.lock();

self.format
.print_batches(&mut writer, schema, batches, self.maxrows, true)?;

let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
let formatted_exec_details = get_execution_details_formatted(
row_count,
if self.format == PrintFormat::Table {
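A note on the new row_count parameter: because the collection loop in exec.rs stops buffering once maxrows is reached, summing num_rows over the retained batches would undercount what the query actually produced, so the caller now passes the count it tracked while draining the stream. A small illustrative snippet of that discrepancy (the batch contents and counts are made up):

use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int64Array};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;

fn main() -> datafusion::error::Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let array: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(schema, vec![array])?;

    // Suppose the query produced ten such batches but only two were retained under maxrows.
    let retained = vec![batch.clone(), batch];
    let retained_rows: usize = retained.iter().map(|b| b.num_rows()).sum();
    let total_rows: usize = 10 * 3; // tracked incrementally while draining the stream

    assert_eq!(retained_rows, 6);
    assert_ne!(retained_rows, total_rows);
    Ok(())
}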