 //! Execution functions
 
-use std::collections::HashMap;
-use std::fs::File;
-use std::io::prelude::*;
-use std::io::BufReader;
-
 use crate::cli_context::CliSessionContext;
 use crate::helper::split_from_semicolon;
 use crate::print_format::PrintFormat;
@@ -31,6 +26,11 @@ use crate::{
     object_storage::get_object_store,
     print_options::{MaxRows, PrintOptions},
 };
+use futures::StreamExt;
+use std::collections::HashMap;
+use std::fs::File;
+use std::io::prelude::*;
+use std::io::BufReader;
 
 use datafusion::common::instant::Instant;
 use datafusion::common::{plan_datafusion_err, plan_err};
@@ -39,10 +39,12 @@ use datafusion::datasource::listing::ListingTableUrl;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::logical_expr::{DdlStatement, LogicalPlan};
 use datafusion::physical_plan::execution_plan::EmissionType;
-use datafusion::physical_plan::{collect, execute_stream, ExecutionPlanProperties};
+use datafusion::physical_plan::{execute_stream, ExecutionPlanProperties};
 use datafusion::sql::parser::{DFParser, Statement};
 use datafusion::sql::sqlparser::dialect::dialect_from_str;
 
+use datafusion::execution::memory_pool::MemoryConsumer;
+use datafusion::physical_plan::spill::get_record_batch_memory_size;
 use datafusion::sql::sqlparser;
 use rustyline::error::ReadlineError;
 use rustyline::Editor;
@@ -235,6 +237,10 @@ pub(super) async fn exec_and_print(
         let df = ctx.execute_logical_plan(plan).await?;
         let physical_plan = df.create_physical_plan().await?;
 
+        // Track memory usage for the query result if it's bounded
+        let mut reservation =
+            MemoryConsumer::new("DataFusion-Cli").register(task_ctx.memory_pool());
+
        if physical_plan.boundedness().is_unbounded() {
            if physical_plan.pipeline_behavior() == EmissionType::Final {
                return plan_err!(
@@ -247,10 +253,29 @@ pub(super) async fn exec_and_print(
             let stream = execute_stream(physical_plan, task_ctx.clone())?;
             print_options.print_stream(stream, now).await?;
         } else {
-            // Bounded stream; collected results are printed after all input consumed.
+            // Bounded stream; collected results size is limited by the maxrows option
             let schema = physical_plan.schema();
-            let results = collect(physical_plan, task_ctx.clone()).await?;
-            adjusted.into_inner().print_batches(schema, &results, now)?;
+            let mut stream = execute_stream(physical_plan, task_ctx.clone())?;
+            let mut results = vec![];
+            let mut row_count = 0_usize;
+            while let Some(batch) = stream.next().await {
+                let batch = batch?;
+                let curr_num_rows = batch.num_rows();
+                if let MaxRows::Limited(max_rows) = print_options.maxrows {
+                    // Stop collecting results if the number of rows exceeds the limit
+                    // results batch should include the last batch that exceeds the limit
+                    if row_count < max_rows + curr_num_rows {
+                        // Try to grow the reservation to accommodate the batch in memory
+                        reservation.try_grow(get_record_batch_memory_size(&batch))?;
+                        results.push(batch);
+                    }
+                }
+                row_count += curr_num_rows;
+            }
+            adjusted
+                .into_inner()
+                .print_batches(schema, &results, now, row_count)?;
+            reservation.free();
         }
     }
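For context on the reservation pattern the new code relies on (not part of this commit): registering a `MemoryConsumer` against the task context's memory pool yields a reservation whose `try_grow` fails once the pool's limit would be exceeded, which is what turns an oversized bounded result set into an error instead of unbounded buffering. Below is a minimal standalone sketch of that behavior; the 1 MiB `GreedyMemoryPool` and the reservation sizes are hypothetical stand-ins for whatever limit the CLI configures.

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

fn main() -> Result<()> {
    // Hypothetical 1 MiB pool standing in for the CLI's configured memory limit.
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024 * 1024));

    // Register a named consumer, as the diff does with "DataFusion-Cli".
    let mut reservation = MemoryConsumer::new("example").register(&pool);

    // Each buffered batch grows the reservation by its in-memory size;
    // a grow that would exceed the pool limit returns an error instead.
    reservation.try_grow(512 * 1024)?;
    assert!(reservation.try_grow(768 * 1024).is_err());

    // Freeing the reservation returns the accounted memory to the pool.
    reservation.free();
    Ok(())
}
```

In the CLI path above, the reservation is grown by `get_record_batch_memory_size(&batch)` for each batch that is kept for printing and freed once the batches have been printed.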