@@ -2,12 +2,15 @@ use std::cmp::Ordering;
 use std::future;
 use std::marker::PhantomData;
 use std::ops::Range;
+use std::pin::Pin;
 use std::sync::Arc;
+use std::task::{Context, Poll};
 
 use dashmap::{DashMap, Entry};
 use futures::stream::FuturesUnordered;
 use futures::{FutureExt, Stream, StreamExt, TryStreamExt, select, stream};
 use moka::future::CacheBuilder;
+use pin_project_lite::pin_project;
 use vortex_buffer::{Alignment, ByteBuffer};
 use vortex_error::{VortexExpect, VortexResult, vortex_err, vortex_panic};
 use vortex_io::VortexReadAt;
@@ -124,17 +127,6 @@ impl InflightSegments {
     }
 
     pub fn cancel(&self, segment_id: SegmentId) {
-        if self
-            .0
-            .get(&segment_id)
-            .filter(|e| e.value().completion_callback.is_some())
-            .is_some()
-        {
-            // TODO(os): figure out how this can happen
-            println!("refusing to cancel explicit request {segment_id}");
-            return;
-        }
-
         if let Some((
             _,
             InflightSegment {
@@ -227,16 +219,17 @@ impl<R: VortexReadAt> ScanDriver for GenericScanDriver<R> {
         // Note that we can provide a somewhat arbitrarily high capacity here since we're going to
         // deduplicate and coalesce. Meaning the resulting stream will at-most cover the entire
         // file and therefore be reasonably bounded.
-        // TODO(os): smarter select
-        let segment_requests = segment_requests.ready_chunks(1024);
-        let prefetch_stream = prefetch_stream.ready_chunks(self.options.io_concurrency);
-
         // Coalesce the segment requests to minimize the number of I/O operations.
         let perf_hint = self.read.performance_hint();
-        let io_stream = stream::select(segment_requests, prefetch_stream)
-            .map(move |r| coalesce(r, perf_hint.coalescing_window(), perf_hint.max_read()))
-            .flat_map(stream::iter)
-            .inspect(move |(coalesced, _)| self.metrics.record(coalesced));
+        let io_stream = SegmentRequestStream {
+            requests: segment_requests,
+            prefetch: prefetch_stream,
+            requests_ready_chunks: 1024,
+            prefetch_ready_chunks: self.options.io_concurrency,
+        }
+        .map(move |r| coalesce(r, perf_hint.coalescing_window(), perf_hint.max_read()))
+        .flat_map(stream::iter)
+        .inspect(move |(coalesced, _)| self.metrics.record(coalesced));
 
         // Submit the coalesced requests to the I/O.
         let read = self.read.clone();
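Note: the `coalesce` helper called above is not part of this diff. As a rough sketch of the idea it implements (merge nearby reads whose gap fits inside the performance hint's coalescing window, as long as the merged read stays under the max read size), here is a standalone illustration; the name `coalesce_ranges` and the exact merge policy are assumptions for this example, not the crate's implementation.

```rust
use std::ops::Range;

/// Merge sorted byte ranges when the gap between neighbours is at most `window`
/// and the merged range stays within `max_read`. Illustration only.
fn coalesce_ranges(mut ranges: Vec<Range<u64>>, window: u64, max_read: u64) -> Vec<Range<u64>> {
    ranges.sort_by_key(|r| r.start);
    let mut out: Vec<Range<u64>> = Vec::new();
    for r in ranges {
        if let Some(last) = out.last_mut() {
            let gap_ok = r.start.saturating_sub(last.end) <= window;
            let size_ok = r.end.max(last.end) - last.start <= max_read;
            if gap_ok && size_ok {
                // Extend the previous read instead of issuing a new one.
                last.end = last.end.max(r.end);
                continue;
            }
        }
        out.push(r);
    }
    out
}

fn main() {
    let reads = vec![0..100, 120..200, 10_000..10_050];
    // With a 64-byte window the first two reads merge; the far-away third stays separate.
    assert_eq!(
        coalesce_ranges(reads, 64, 1 << 20),
        vec![0..200, 10_000..10_050]
    );
}
```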
@@ -270,6 +263,73 @@ impl<R: VortexReadAt> ScanDriver for GenericScanDriver<R> {
     }
 }
 
+pin_project! {
+    struct SegmentRequestStream<Requests, Prefetch> {
+        #[pin]
+        pub requests: Requests,
+        #[pin]
+        pub prefetch: Prefetch,
+        requests_ready_chunks: usize,
+        prefetch_ready_chunks: usize,
+    }
+}
+
+impl<Requests, Prefetch> Stream for SegmentRequestStream<Requests, Prefetch>
+where
+    Requests: Stream<Item = SegmentRequest>,
+    Prefetch: Stream<Item = SegmentRequest>,
+{
+    type Item = Vec<SegmentRequest>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let mut this = self.project();
+        let mut items = Vec::new();
+        let requests_ended = loop {
+            match this.requests.as_mut().poll_next(cx) {
+                Poll::Ready(Some(item)) => {
+                    if items.is_empty() {
+                        items.reserve(*this.requests_ready_chunks);
+                    }
+                    items.push(item);
+                    if items.len() >= *this.requests_ready_chunks {
+                        return Poll::Ready(Some(items));
+                    }
+                }
+                Poll::Pending => break false,
+                Poll::Ready(None) => break true,
+            }
+        };
+
+        let prefetch_limit =
+            (items.len() + *this.prefetch_ready_chunks).min(*this.requests_ready_chunks);
+
+        let prefetch_ended = loop {
+            match this.prefetch.as_mut().poll_next(cx) {
+                Poll::Ready(Some(item)) => {
+                    if items.is_empty() {
+                        items.reserve(*this.requests_ready_chunks);
+                    }
+                    items.push(item);
+                    if items.len() >= prefetch_limit {
+                        return Poll::Ready(Some(items));
+                    }
+                }
+                Poll::Pending => break false,
+                Poll::Ready(None) => break true,
+            }
+        };
+
+        if requests_ended && prefetch_ended {
+            return Poll::Ready(None);
+        }
+        if items.is_empty() {
+            Poll::Pending
+        } else {
+            Poll::Ready(Some(items))
+        }
+    }
+}
+
 #[derive(Debug)]
 struct SegmentRequest {
     id: SegmentId,
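The new `SegmentRequestStream` replaces the earlier `stream::select` + `ready_chunks` pair with a single combinator that drains explicit segment requests before polling prefetches and caps how many prefetch items can join a batch. As a rough approximation of the left-biased behaviour using stock `futures` combinators (illustration only; it does not reproduce the per-batch prefetch cap that the new stream adds):

```rust
use futures::executor::block_on;
use futures::stream::{self, PollNext, StreamExt};

fn main() {
    block_on(async {
        // Explicit segment requests (high priority) and speculative prefetches
        // (low priority), modelled here as plain integers.
        let requests = stream::iter([1, 2, 3]);
        let prefetch = stream::iter([100, 200, 300]);

        // Left-biased select: always drain `requests` before polling `prefetch`,
        // then batch whatever is immediately ready for coalescing.
        let batches: Vec<Vec<i32>> =
            stream::select_with_strategy(requests, prefetch, |_: &mut ()| PollNext::Left)
                .ready_chunks(4)
                .collect()
                .await;

        assert_eq!(batches, vec![vec![1, 2, 3, 100], vec![200, 300]]);
    });
}
```

`select_with_strategy` alone cannot limit how many prefetch items piggyback on a batch of explicit requests, which appears to be the motivation for the bespoke stream above.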