@@ -24,27 +24,151 @@ use std::fs::File;
24
24
use std:: io:: BufReader ;
25
25
use std:: path:: { Path , PathBuf } ;
26
26
use std:: ptr:: NonNull ;
27
+ use std:: sync:: Arc ;
27
28
28
29
use arrow:: array:: ArrayData ;
29
30
use arrow:: datatypes:: { Schema , SchemaRef } ;
30
31
use arrow:: ipc:: { reader:: StreamReader , writer:: StreamWriter } ;
31
32
use arrow:: record_batch:: RecordBatch ;
32
- use tokio:: sync:: mpsc:: Sender ;
33
-
34
- use datafusion_common:: { exec_datafusion_err, HashSet , Result } ;
35
-
36
- fn read_spill ( sender : Sender < Result < RecordBatch > > , path : & Path ) -> Result < ( ) > {
37
- let file = BufReader :: new ( File :: open ( path) ?) ;
38
- // SAFETY: DataFusion's spill writer strictly follows Arrow IPC specifications
39
- // with validated schemas and buffers. Skip redundant validation during read
40
- // to speedup read operation. This is safe for DataFusion as input guaranteed to be correct when written.
41
- let reader = unsafe { StreamReader :: try_new ( file, None ) ?. with_skip_validation ( true ) } ;
42
- for batch in reader {
43
- sender
44
- . blocking_send ( batch. map_err ( Into :: into) )
45
- . map_err ( |e| exec_datafusion_err ! ( "{e}" ) ) ?;
33
+
34
+ use datafusion_common:: { exec_datafusion_err, DataFusionError , HashSet , Result } ;
35
+ use datafusion_common_runtime:: SpawnedTask ;
36
+ use datafusion_execution:: disk_manager:: RefCountedTempFile ;
37
+ use datafusion_execution:: RecordBatchStream ;
38
+ use futures:: { FutureExt as _, Stream } ;
39
+
40
+ /// Stream that reads spill files from disk where each batch is read in a spawned blocking task
41
+ /// It will read one batch at a time and will not do any buffering, to buffer data use [`spawn_buffered`]
42
+ struct SpillReaderStream {
43
+ schema : SchemaRef ,
44
+ state : SpillReaderStreamState ,
45
+ }
46
+
47
+ /// When we poll for the next batch, we will get back both the batch and the reader,
48
+ /// so we can call `next` again.
49
+ type NextRecordBatchResult = Result < ( StreamReader < BufReader < File > > , Option < RecordBatch > ) > ;
50
+
51
+ enum SpillReaderStreamState {
52
+ /// Initial state: the stream was not initialized yet
53
+ /// and the file was not opened
54
+ Uninitialized ( RefCountedTempFile ) ,
55
+
56
+ /// A read is in progress in a spawned blocking task for which we hold the handle.
57
+ ReadInProgress ( SpawnedTask < NextRecordBatchResult > ) ,
58
+
59
+ /// A read has finished and we wait for being polled again in order to start reading the next batch.
60
+ Waiting ( StreamReader < BufReader < File > > ) ,
61
+
62
+ /// The stream has finished, successfully or not.
63
+ Done ,
64
+ }
65
+
66
+ impl SpillReaderStream {
67
+ fn new ( schema : SchemaRef , spill_file : RefCountedTempFile ) -> Self {
68
+ Self {
69
+ schema,
70
+ state : SpillReaderStreamState :: Uninitialized ( spill_file) ,
71
+ }
72
+ }
73
+
74
+ fn poll_next_inner (
75
+ & mut self ,
76
+ cx : & mut std:: task:: Context < ' _ > ,
77
+ ) -> std:: task:: Poll < Option < Result < RecordBatch > > > {
78
+ match & mut self . state {
79
+ SpillReaderStreamState :: Uninitialized ( _) => {
80
+ // Temporarily replace with `Done` to be able to pass the file to the task.
81
+ let SpillReaderStreamState :: Uninitialized ( spill_file) =
82
+ std:: mem:: replace ( & mut self . state , SpillReaderStreamState :: Done )
83
+ else {
84
+ unreachable ! ( )
85
+ } ;
86
+
87
+ let task = SpawnedTask :: spawn_blocking ( move || {
88
+ let file = BufReader :: new ( File :: open ( spill_file. path ( ) ) ?) ;
89
+ let mut reader = StreamReader :: try_new ( file, None ) ?;
90
+
91
+ let next_batch = reader. next ( ) . transpose ( ) ?;
92
+
93
+ Ok ( ( reader, next_batch) )
94
+ } ) ;
95
+
96
+ self . state = SpillReaderStreamState :: ReadInProgress ( task) ;
97
+
98
+ // Poll again immediately so the inner task is polled and the waker is
99
+ // registered.
100
+ self . poll_next_inner ( cx)
101
+ }
102
+
103
+ SpillReaderStreamState :: ReadInProgress ( task) => {
104
+ let result = futures:: ready!( task. poll_unpin( cx) )
105
+ . unwrap_or_else ( |err| Err ( DataFusionError :: External ( Box :: new ( err) ) ) ) ;
106
+
107
+ match result {
108
+ Ok ( ( reader, batch) ) => {
109
+ match batch {
110
+ Some ( batch) => {
111
+ self . state = SpillReaderStreamState :: Waiting ( reader) ;
112
+
113
+ std:: task:: Poll :: Ready ( Some ( Ok ( batch) ) )
114
+ }
115
+ None => {
116
+ // Stream is done
117
+ self . state = SpillReaderStreamState :: Done ;
118
+
119
+ std:: task:: Poll :: Ready ( None )
120
+ }
121
+ }
122
+ }
123
+ Err ( err) => {
124
+ self . state = SpillReaderStreamState :: Done ;
125
+
126
+ std:: task:: Poll :: Ready ( Some ( Err ( err) ) )
127
+ }
128
+ }
129
+ }
130
+
131
+ SpillReaderStreamState :: Waiting ( _) => {
132
+ // Temporarily replace with `Done` to be able to pass the file to the task.
133
+ let SpillReaderStreamState :: Waiting ( mut reader) =
134
+ std:: mem:: replace ( & mut self . state , SpillReaderStreamState :: Done )
135
+ else {
136
+ unreachable ! ( )
137
+ } ;
138
+
139
+ let task = SpawnedTask :: spawn_blocking ( move || {
140
+ let next_batch = reader. next ( ) . transpose ( ) ?;
141
+
142
+ Ok ( ( reader, next_batch) )
143
+ } ) ;
144
+
145
+ self . state = SpillReaderStreamState :: ReadInProgress ( task) ;
146
+
147
+ // Poll again immediately so the inner task is polled and the waker is
148
+ // registered.
149
+ self . poll_next_inner ( cx)
150
+ }
151
+
152
+ SpillReaderStreamState :: Done => std:: task:: Poll :: Ready ( None ) ,
153
+ }
154
+ }
155
+ }
156
+
157
+ impl Stream for SpillReaderStream {
158
+ type Item = Result < RecordBatch > ;
159
+
160
+ fn poll_next (
161
+ self : std:: pin:: Pin < & mut Self > ,
162
+ cx : & mut std:: task:: Context < ' _ > ,
163
+ ) -> std:: task:: Poll < Option < Self :: Item > > {
164
+ self . get_mut ( ) . poll_next_inner ( cx)
165
+ }
166
+ }
167
+
168
+ impl RecordBatchStream for SpillReaderStream {
169
+ fn schema ( & self ) -> SchemaRef {
170
+ Arc :: clone ( & self . schema )
46
171
}
47
- Ok ( ( ) )
48
172
}
49
173
50
174
/// Spill the `RecordBatch` to disk as smaller batches
0 commit comments