Revert "Datafusion-cli: Redesign the datafusion-cli execution and print, make it totally streaming printing without memory overhead" #14948

Merged · 2 commits · Mar 1, 2025
datafusion-cli/src/exec.rs — 69 changes: 34 additions & 35 deletions
@@ -26,6 +26,12 @@ use crate::{
     object_storage::get_object_store,
     print_options::{MaxRows, PrintOptions},
 };
+use futures::StreamExt;
+use std::collections::HashMap;
+use std::fs::File;
+use std::io::prelude::*;
+use std::io::BufReader;
+
 use datafusion::common::instant::Instant;
 use datafusion::common::{plan_datafusion_err, plan_err};
 use datafusion::config::ConfigFileType;
@@ -35,15 +41,13 @@ use datafusion::logical_expr::{DdlStatement, LogicalPlan};
 use datafusion::physical_plan::execution_plan::EmissionType;
 use datafusion::physical_plan::{execute_stream, ExecutionPlanProperties};
 use datafusion::sql::parser::{DFParser, Statement};
-use datafusion::sql::sqlparser;
 use datafusion::sql::sqlparser::dialect::dialect_from_str;
 
+use datafusion::execution::memory_pool::MemoryConsumer;
+use datafusion::physical_plan::spill::get_record_batch_memory_size;
+use datafusion::sql::sqlparser;
 use rustyline::error::ReadlineError;
 use rustyline::Editor;
-use std::collections::HashMap;
-use std::fs::File;
-use std::io::prelude::*;
-use std::io::BufReader;
 use std::sync::Arc;
 use tokio::signal;
 
 /// run and execute SQL statements and commands, against a context with the given print options
@@ -225,17 +229,18 @@ pub(super) async fn exec_and_print(
     for statement in statements {
         let adjusted =
             AdjustedPrintOptions::new(print_options.clone()).with_statement(&statement);
+
         let plan = create_plan(ctx, statement).await?;
         let adjusted = adjusted.with_plan(&plan);
 
         let df = ctx.execute_logical_plan(plan).await?;
         let physical_plan = df.create_physical_plan().await?;
 
-        let is_unbounded = physical_plan.boundedness().is_unbounded();
-        let mut stream = execute_stream(Arc::clone(&physical_plan), task_ctx.clone())?;
+        // Track memory usage for the query result if it's bounded
+        let mut reservation =
+            MemoryConsumer::new("DataFusion-Cli").register(task_ctx.memory_pool());
 
-        // Both bounded and unbounded streams are streaming prints
-        if is_unbounded {
+        if physical_plan.boundedness().is_unbounded() {
             if physical_plan.pipeline_behavior() == EmissionType::Final {
                 return plan_err!(
                     "The given query can generate a valid result only once \
@@ -244,43 +249,37 @@
             }
             // As the input stream comes, we can generate results.
             // However, memory safety is not guaranteed.
-            print_options
-                .print_stream(MaxRows::Unlimited, stream, now)
-                .await?;
+            let stream = execute_stream(physical_plan, task_ctx.clone())?;
+            print_options.print_stream(stream, now).await?;
         } else {
             // Bounded stream; collected results size is limited by the maxrows option
             let schema = physical_plan.schema();
+            let mut stream = execute_stream(physical_plan, task_ctx.clone())?;
+            let mut results = vec![];
+            let mut row_count = 0_usize;
             let max_rows = match print_options.maxrows {
                 MaxRows::Unlimited => usize::MAX,
                 MaxRows::Limited(n) => n,
             };
-            let stdout = std::io::stdout();
-            let mut writer = stdout.lock();
 
-            // If we don't want to print the table, we should use the streaming print same as above
-            if print_options.format != PrintFormat::Table
-                && print_options.format != PrintFormat::Automatic
-            {
-                print_options
-                    .print_stream(print_options.maxrows, stream, now)
-                    .await?;
-                continue;
+            while let Some(batch) = stream.next().await {
+                let batch = batch?;
+                let curr_num_rows = batch.num_rows();
+                // Stop collecting results if the number of rows exceeds the limit
+                // results batch should include the last batch that exceeds the limit
+                if row_count < max_rows + curr_num_rows {
+                    // Try to grow the reservation to accommodate the batch in memory
+                    reservation.try_grow(get_record_batch_memory_size(&batch))?;
+                    results.push(batch);
+                }
+                row_count += curr_num_rows;
             }
-
-            // into_inner will finalize the print options to table if it's automatic
             adjusted
                 .into_inner()
-                .print_table_batch(
-                    print_options,
-                    schema,
-                    &mut stream,
-                    max_rows,
-                    &mut writer,
-                    now,
-                )
-                .await?;
+                .print_batches(schema, &results, now, row_count)?;
+            reservation.free();
         }
     }
 
     Ok(())
 }
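
A note on the memory-safety mechanism this revert restores: for bounded queries the CLI registers a MemoryConsumer named "DataFusion-Cli" against the task context's memory pool and grows the reservation by each collected batch's size (via get_record_batch_memory_size), so buffering results for table output fails with a resources-exhausted error rather than exhausting memory. The following is a minimal standalone sketch of that reservation pattern, not code from this PR; the GreedyMemoryPool, its 1 MiB limit, and the byte counts are illustrative assumptions.

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

fn main() -> Result<()> {
    // Pool with a hard limit of 1 MiB (illustrative value, not from the PR).
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024 * 1024));

    // Same consumer name the CLI uses; the reservation tracks the bytes we hold.
    let mut reservation = MemoryConsumer::new("DataFusion-Cli").register(&pool);

    // Growing within the limit succeeds; in the CLI this is done per collected
    // RecordBatch with reservation.try_grow(get_record_batch_memory_size(&batch))?.
    reservation.try_grow(512 * 1024)?;

    // Growing past the pool limit fails with a resources-exhausted error,
    // which is what stops the CLI from buffering an oversized result set.
    assert!(reservation.try_grow(900 * 1024).is_err());

    // Return the reserved bytes to the pool, mirroring reservation.free() in exec.rs.
    reservation.free();
    Ok(())
}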
