
Commit aeb25fe

Revert "Datafusion-cli: Redesign the datafusion-cli execution and print, make it totally streaming printing without memory overhead (apache#14877)"
This reverts commit 53fc94f.
1 parent e19010b · commit aeb25fe
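
Note: the restored execution path (see the exec.rs diff below) first checks whether the physical plan is unbounded before deciding to stream results or to collect them. The following is a minimal sketch of that check using the DataFusion APIs referenced in the diff (ExecutionPlanProperties, EmissionType, plan_err!); the helper name should_stream_results and its error message are illustrative only, not part of this commit.

use std::sync::Arc;

use datafusion::common::plan_err;
use datafusion::error::Result;
use datafusion::physical_plan::execution_plan::EmissionType;
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};

/// Decide whether query results should be printed as they stream.
/// Unbounded plans must stream, but only if they can emit incrementally;
/// bounded plans are collected (with memory accounting) and printed at the end.
fn should_stream_results(plan: &Arc<dyn ExecutionPlan>) -> Result<bool> {
    if plan.boundedness().is_unbounded() {
        if plan.pipeline_behavior() == EmissionType::Final {
            // A plan that emits only once its never-ending input finishes
            // can never produce a printable result.
            return plan_err!("unbounded plans that emit only a final result are not supported");
        }
        Ok(true)
    } else {
        Ok(false)
    }
}

A caller would pass the Arc<dyn ExecutionPlan> produced by DataFrame::create_physical_plan (as in the diff) and either stream the results or collect them accordingly.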

File tree

3 files changed: +69, -678 lines


datafusion-cli/src/exec.rs

Lines changed: 34 additions & 35 deletions
@@ -26,6 +26,12 @@ use crate::{
     object_storage::get_object_store,
     print_options::{MaxRows, PrintOptions},
 };
+use futures::StreamExt;
+use std::collections::HashMap;
+use std::fs::File;
+use std::io::prelude::*;
+use std::io::BufReader;
+
 use datafusion::common::instant::Instant;
 use datafusion::common::{plan_datafusion_err, plan_err};
 use datafusion::config::ConfigFileType;
@@ -35,15 +41,13 @@ use datafusion::logical_expr::{DdlStatement, LogicalPlan};
 use datafusion::physical_plan::execution_plan::EmissionType;
 use datafusion::physical_plan::{execute_stream, ExecutionPlanProperties};
 use datafusion::sql::parser::{DFParser, Statement};
-use datafusion::sql::sqlparser;
 use datafusion::sql::sqlparser::dialect::dialect_from_str;
+
+use datafusion::execution::memory_pool::MemoryConsumer;
+use datafusion::physical_plan::spill::get_record_batch_memory_size;
+use datafusion::sql::sqlparser;
 use rustyline::error::ReadlineError;
 use rustyline::Editor;
-use std::collections::HashMap;
-use std::fs::File;
-use std::io::prelude::*;
-use std::io::BufReader;
-use std::sync::Arc;
 use tokio::signal;
 
 /// run and execute SQL statements and commands, against a context with the given print options
@@ -225,17 +229,18 @@ pub(super) async fn exec_and_print(
     for statement in statements {
         let adjusted =
             AdjustedPrintOptions::new(print_options.clone()).with_statement(&statement);
+
         let plan = create_plan(ctx, statement).await?;
         let adjusted = adjusted.with_plan(&plan);
 
         let df = ctx.execute_logical_plan(plan).await?;
         let physical_plan = df.create_physical_plan().await?;
 
-        let is_unbounded = physical_plan.boundedness().is_unbounded();
-        let mut stream = execute_stream(Arc::clone(&physical_plan), task_ctx.clone())?;
+        // Track memory usage for the query result if it's bounded
+        let mut reservation =
+            MemoryConsumer::new("DataFusion-Cli").register(task_ctx.memory_pool());
 
-        // Both bounded and unbounded streams are streaming prints
-        if is_unbounded {
+        if physical_plan.boundedness().is_unbounded() {
             if physical_plan.pipeline_behavior() == EmissionType::Final {
                 return plan_err!(
                     "The given query can generate a valid result only once \
@@ -244,43 +249,37 @@
             }
             // As the input stream comes, we can generate results.
             // However, memory safety is not guaranteed.
-            print_options
-                .print_stream(MaxRows::Unlimited, stream, now)
-                .await?;
+            let stream = execute_stream(physical_plan, task_ctx.clone())?;
+            print_options.print_stream(stream, now).await?;
         } else {
             // Bounded stream; collected results size is limited by the maxrows option
             let schema = physical_plan.schema();
+            let mut stream = execute_stream(physical_plan, task_ctx.clone())?;
+            let mut results = vec![];
+            let mut row_count = 0_usize;
             let max_rows = match print_options.maxrows {
                 MaxRows::Unlimited => usize::MAX,
                 MaxRows::Limited(n) => n,
            };
-            let stdout = std::io::stdout();
-            let mut writer = stdout.lock();
-
-            // If we don't want to print the table, we should use the streaming print same as above
-            if print_options.format != PrintFormat::Table
-                && print_options.format != PrintFormat::Automatic
-            {
-                print_options
-                    .print_stream(print_options.maxrows, stream, now)
-                    .await?;
-                continue;
+            while let Some(batch) = stream.next().await {
+                let batch = batch?;
+                let curr_num_rows = batch.num_rows();
+                // Stop collecting results if the number of rows exceeds the limit
+                // results batch should include the last batch that exceeds the limit
+                if row_count < max_rows + curr_num_rows {
+                    // Try to grow the reservation to accommodate the batch in memory
+                    reservation.try_grow(get_record_batch_memory_size(&batch))?;
+                    results.push(batch);
+                }
+                row_count += curr_num_rows;
             }
-
-            // into_inner will finalize the print options to table if it's automatic
             adjusted
                 .into_inner()
-                .print_table_batch(
-                    print_options,
-                    schema,
-                    &mut stream,
-                    max_rows,
-                    &mut writer,
-                    now,
-                )
-                .await?;
+                .print_batches(schema, &results, now, row_count)?;
+            reservation.free();
         }
     }
+
     Ok(())
 }

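For context, the bounded branch restored above collects record batches while charging their estimated size to the task's memory pool, so an oversized result set produces a memory error instead of exhausting RAM. Below is a self-contained sketch of that pattern using the same crate APIs as the diff (MemoryConsumer, try_grow, get_record_batch_memory_size); the helper collect_with_limit and the consumer name "results" are hypothetical, not part of this commit.

use std::sync::Arc;

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::execution::memory_pool::MemoryConsumer;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::spill::get_record_batch_memory_size;
use datafusion::physical_plan::{execute_stream, ExecutionPlan};
use futures::StreamExt;

/// Collect at most `max_rows` rows (plus the batch that crosses the limit),
/// charging every kept batch against the task's memory pool so an oversized
/// result fails early instead of exhausting memory.
async fn collect_with_limit(
    plan: Arc<dyn ExecutionPlan>,
    task_ctx: Arc<TaskContext>,
    max_rows: usize,
) -> Result<Vec<RecordBatch>> {
    // Register a consumer so collected batches are accounted for in the pool.
    let mut reservation =
        MemoryConsumer::new("results").register(task_ctx.memory_pool());
    let mut stream = execute_stream(plan, Arc::clone(&task_ctx))?;

    let mut results = vec![];
    let mut row_count = 0_usize;
    while let Some(batch) = stream.next().await {
        let batch = batch?;
        let rows = batch.num_rows();
        // Keep batches until the row limit is exceeded; the batch that
        // crosses the limit is kept so the printer can truncate it.
        if row_count < max_rows + rows {
            reservation.try_grow(get_record_batch_memory_size(&batch))?;
            results.push(batch);
        }
        row_count += rows;
    }
    // The committed code frees the reservation only after printing;
    // freeing here keeps the sketch self-contained.
    reservation.free();
    Ok(results)
}

In the committed code the reservation is freed after print_batches, which keeps the pool accounting accurate for as long as the collected results are held.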