@@ -29,9 +29,8 @@ use crate::{
29
29
DisplayFormatType , Distribution , EquivalenceProperties , ExecutionPlan , Partitioning ,
30
30
} ;
31
31
32
- use arrow:: array:: ArrayRef ;
33
32
use arrow:: datatypes:: SchemaRef ;
34
- use arrow:: record_batch:: { RecordBatch , RecordBatchOptions } ;
33
+ use arrow:: record_batch:: RecordBatch ;
35
34
use datafusion_common:: stats:: Precision ;
36
35
use datafusion_common:: { internal_err, DataFusionError , Result } ;
37
36
use datafusion_execution:: TaskContext ;
@@ -507,26 +506,15 @@ impl LimitStream {
507
506
//
508
507
self . fetch -= batch. num_rows ( ) ;
509
508
Some ( batch)
510
- } else {
509
+ } else if batch . num_rows ( ) >= self . fetch {
511
510
let batch_rows = self . fetch ;
512
511
self . fetch = 0 ;
513
512
self . input = None ; // clear input so it can be dropped early
514
513
515
- let limited_columns: Vec < ArrayRef > = batch
516
- . columns ( )
517
- . iter ( )
518
- . map ( |col| col. slice ( 0 , col. len ( ) . min ( batch_rows) ) )
519
- . collect ( ) ;
520
- let options =
521
- RecordBatchOptions :: new ( ) . with_row_count ( Option :: from ( batch_rows) ) ;
522
- Some (
523
- RecordBatch :: try_new_with_options (
524
- batch. schema ( ) ,
525
- limited_columns,
526
- & options,
527
- )
528
- . unwrap ( ) ,
529
- )
514
+ // It is guaranteed that batch_rows is <= batch.num_rows
515
+ Some ( batch. slice ( 0 , batch_rows) )
516
+ } else {
517
+ unreachable ! ( )
530
518
}
531
519
}
532
520
}
@@ -575,6 +563,7 @@ mod tests {
575
563
use crate :: { common, test} ;
576
564
577
565
use crate :: aggregates:: { AggregateExec , AggregateMode , PhysicalGroupBy } ;
566
+ use arrow_array:: RecordBatchOptions ;
578
567
use arrow_schema:: Schema ;
579
568
use datafusion_physical_expr:: expressions:: col;
580
569
use datafusion_physical_expr:: PhysicalExpr ;
0 commit comments