21
21
//! Note: Most traits here need to be marked `Sync + Send` to be
22
22
//! compliant with the `SendableRecordBatchStream` trait.
23
23
24
- use std:: collections:: VecDeque ;
25
24
use std:: mem;
26
25
use std:: pin:: Pin ;
27
26
use std:: task:: { Context , Poll } ;
@@ -38,12 +37,15 @@ use crate::physical_plan::RecordBatchStream;
38
37
use arrow:: datatypes:: SchemaRef ;
39
38
use arrow:: error:: ArrowError ;
40
39
use arrow:: record_batch:: RecordBatch ;
40
+ use chrono:: TimeZone ;
41
41
use datafusion_common:: instant:: Instant ;
42
42
use datafusion_common:: ScalarValue ;
43
43
44
44
use futures:: future:: BoxFuture ;
45
45
use futures:: stream:: BoxStream ;
46
46
use futures:: { ready, FutureExt , Stream , StreamExt } ;
47
+ use object_store:: path:: Path ;
48
+ use object_store:: ObjectMeta ;
47
49
48
50
/// A fallible future that resolves to a stream of [`RecordBatch`]
49
51
pub type FileOpenFuture =
@@ -76,7 +78,8 @@ pub trait FileOpener: Unpin {
76
78
/// A stream that iterates record batch by record batch, file over file.
77
79
pub struct FileStream < F : FileOpener > {
78
80
/// An iterator over input files.
79
- file_iter : VecDeque < PartitionedFile > ,
81
+ files : Vec < PartitionedFile > ,
82
+ cur_file_idx : usize ,
80
83
/// The stream schema (file schema including partition columns and after
81
84
/// projection).
82
85
projected_schema : SchemaRef ,
@@ -263,7 +266,8 @@ impl<F: FileOpener> FileStream<F> {
263
266
let files = config. file_groups [ partition] . clone ( ) ;
264
267
265
268
Ok ( Self {
266
- file_iter : files. into ( ) ,
269
+ files,
270
+ cur_file_idx : 0 ,
267
271
projected_schema,
268
272
remain : config. limit ,
269
273
file_opener,
@@ -289,18 +293,36 @@ impl<F: FileOpener> FileStream<F> {
289
293
/// Since file opening is mostly IO (and may involve a
290
294
/// bunch of sequential IO), it can be parallelized with decoding.
291
295
fn start_next_file ( & mut self ) -> Option < Result < ( FileOpenFuture , Vec < ScalarValue > ) > > {
292
- let part_file = self . file_iter . pop_front ( ) ?;
296
+ if self . cur_file_idx == self . files . len ( ) {
297
+ return None ;
298
+ }
299
+
300
+ let part_file = & mut self . files [ self . cur_file_idx ] ;
301
+ self . cur_file_idx += 1 ;
302
+
303
+ let object_meta = mem:: replace (
304
+ & mut part_file. object_meta ,
305
+ ObjectMeta {
306
+ location : Path :: default ( ) ,
307
+ last_modified : chrono:: Utc . timestamp_nanos ( 0 ) ,
308
+ size : 0 ,
309
+ e_tag : None ,
310
+ version : None ,
311
+ } ,
312
+ ) ;
313
+
314
+ let partition_values = mem:: take ( & mut part_file. partition_values ) ;
293
315
294
316
let file_meta = FileMeta {
295
- object_meta : part_file . object_meta ,
296
- range : part_file. range ,
297
- extensions : part_file. extensions ,
317
+ object_meta,
318
+ range : part_file. range . clone ( ) ,
319
+ extensions : part_file. extensions . clone ( ) ,
298
320
} ;
299
321
300
322
Some (
301
323
self . file_opener
302
324
. open ( file_meta)
303
- . map ( |future| ( future, part_file . partition_values ) ) ,
325
+ . map ( |future| ( future, partition_values) ) ,
304
326
)
305
327
}
306
328
0 commit comments