Skip to content

Commit 39f3380

Browse files
committed
wip: cancellations
1 parent bc26717 commit 39f3380

File tree

10 files changed

+185
-61
lines changed

10 files changed

+185
-61
lines changed

vortex-file/src/generic.rs

+18-15
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use vortex_error::{VortexExpect, VortexResult, vortex_err, vortex_panic};
1111
use vortex_io::VortexReadAt;
1212
use vortex_layout::instrument;
1313
use vortex_layout::scan::ScanDriver;
14-
use vortex_layout::segments::{AsyncSegmentReader, SegmentId, SegmentRegistry};
14+
use vortex_layout::segments::{AsyncSegmentReader, SegmentEvent, SegmentId, SegmentStream};
1515
use vortex_metrics::{Counter, VortexMetrics};
1616

1717
use crate::footer::{Footer, Segment};
@@ -96,8 +96,7 @@ impl<R: VortexReadAt> ScanDriver for GenericScanDriver<R> {
9696
self.segment_channel.reader()
9797
}
9898

99-
fn io_stream(self, segments: SegmentRegistry) -> impl Stream<Item = VortexResult<()>> {
100-
let all_possible_segments = segments.into_inner();
99+
fn io_stream(self, segments: SegmentStream) -> impl Stream<Item = VortexResult<()>> {
101100
let segment_requests = self.segment_channel.into_stream();
102101

103102
let segment_map = self.footer.segment_map().clone();
@@ -154,18 +153,22 @@ impl<R: VortexReadAt> ScanDriver for GenericScanDriver<R> {
154153
}
155154
});
156155
let segment_map = self.footer.segment_map().clone();
157-
let prefetch_stream =
158-
stream::iter(all_possible_segments.into_values()).flat_map(move |segment_ids| {
159-
let segment_map = segment_map.clone();
160-
stream::iter(segment_ids.into_iter().filter_map(move |id| {
161-
let location = segment_map.get(*id as usize)?;
162-
Some(FileSegmentRequest {
163-
id,
164-
location: location.clone(),
165-
callback: None,
166-
})
167-
}))
168-
});
156+
let prefetch_stream = segments.filter_map(move |event| {
157+
let value = segment_map.clone();
158+
async move {
159+
match event {
160+
SegmentEvent::Cancel(_id) => None,
161+
SegmentEvent::Request(id) => {
162+
let location = value.get(*id as usize)?;
163+
Some(FileSegmentRequest {
164+
id,
165+
location: location.clone(),
166+
callback: None,
167+
})
168+
}
169+
}
170+
}
171+
});
169172

170173
// Grab all available segment requests from the I/O queue so we get maximal visibility into
171174
// the requests for coalescing.

vortex-file/src/memory.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use futures::{Stream, stream};
55
use vortex_buffer::ByteBuffer;
66
use vortex_error::{VortexResult, vortex_err};
77
use vortex_layout::scan::ScanDriver;
8-
use vortex_layout::segments::{AsyncSegmentReader, SegmentId, SegmentRegistry};
8+
use vortex_layout::segments::{AsyncSegmentReader, SegmentId, SegmentStream};
99
use vortex_metrics::VortexMetrics;
1010

1111
use crate::segments::SegmentCache;
@@ -52,7 +52,7 @@ impl ScanDriver for InMemoryVortexFile {
5252
Arc::new(self.clone())
5353
}
5454

55-
fn io_stream(self, _segments: SegmentRegistry) -> impl Stream<Item = VortexResult<()>> {
55+
fn io_stream(self, _segments: SegmentStream) -> impl Stream<Item = VortexResult<()>> {
5656
stream::repeat_with(|| Ok(()))
5757
}
5858
}

vortex-layout/src/data.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use vortex_flatbuffers::{FlatBuffer, FlatBufferRoot, WriteFlatBuffer, layout};
1212
use crate::LayoutId;
1313
use crate::context::LayoutContext;
1414
use crate::reader::LayoutReader;
15-
use crate::segments::{AsyncSegmentReader, SegmentId, SegmentRegistry};
15+
use crate::segments::{AsyncSegmentReader, SegmentCollector, SegmentId};
1616
use crate::vtable::LayoutVTableRef;
1717

1818
/// [`Layout`] is the lazy equivalent to [`vortex_array::ArrayRef`], providing a hierarchical
@@ -280,7 +280,7 @@ impl Layout {
280280
row_offset: u64,
281281
filter_field_mask: &[FieldMask],
282282
projection_field_mask: &[FieldMask],
283-
segments: &mut SegmentRegistry,
283+
segments: &mut SegmentCollector,
284284
) -> VortexResult<()> {
285285
self.vtable().required_segments(
286286
self,

vortex-layout/src/layouts/chunked/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use vortex_error::VortexResult;
1212
use crate::data::Layout;
1313
use crate::layouts::chunked::reader::ChunkedReader;
1414
use crate::reader::{LayoutReader, LayoutReaderExt};
15-
use crate::segments::AsyncSegmentReader;
15+
use crate::segments::{AsyncSegmentReader, SegmentCollector};
1616
use crate::vtable::LayoutVTable;
1717
use crate::{CHUNKED_LAYOUT_ID, LayoutId};
1818

@@ -61,7 +61,7 @@ impl LayoutVTable for ChunkedLayout {
6161
row_offset: u64,
6262
filter_field_mask: &[FieldMask],
6363
projection_field_mask: &[FieldMask],
64-
segments: &mut crate::segments::SegmentRegistry,
64+
segments: &mut SegmentCollector,
6565
) -> VortexResult<()> {
6666
let nchunks = layout.nchildren() - (if layout.metadata().is_some() { 1 } else { 0 });
6767
let mut offset = row_offset;

vortex-layout/src/layouts/flat/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use vortex_error::VortexResult;
1111

1212
use crate::layouts::flat::reader::FlatReader;
1313
use crate::reader::{LayoutReader, LayoutReaderExt};
14-
use crate::segments::AsyncSegmentReader;
14+
use crate::segments::{AsyncSegmentReader, SegmentCollector};
1515
use crate::vtable::LayoutVTable;
1616
use crate::{FLAT_LAYOUT_ID, Layout, LayoutId};
1717

@@ -54,7 +54,7 @@ impl LayoutVTable for FlatLayout {
5454
row_offset: u64,
5555
_filter_field_mask: &[FieldMask],
5656
_projection_field_mask: &[FieldMask],
57-
segments: &mut crate::segments::SegmentRegistry,
57+
segments: &mut SegmentCollector,
5858
) -> VortexResult<()> {
5959
// if !filter_field_mask
6060
// .iter()

vortex-layout/src/layouts/stats/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use vortex_error::VortexResult;
1313
use crate::data::Layout;
1414
use crate::layouts::stats::reader::StatsReader;
1515
use crate::reader::{LayoutReader, LayoutReaderExt};
16-
use crate::segments::{AsyncSegmentReader, RequiredSegmentKind, SegmentRegistry};
16+
use crate::segments::{AsyncSegmentReader, RequiredSegmentKind, SegmentCollector};
1717
use crate::vtable::LayoutVTable;
1818
use crate::{LayoutId, STATS_LAYOUT_ID};
1919

@@ -53,7 +53,7 @@ impl LayoutVTable for StatsLayout {
5353
row_offset: u64,
5454
filter_field_mask: &[FieldMask],
5555
projection_field_mask: &[FieldMask],
56-
segments: &mut SegmentRegistry,
56+
segments: &mut SegmentCollector,
5757
) -> VortexResult<()> {
5858
if !filter_field_mask.is_empty() {
5959
layout

vortex-layout/src/layouts/struct_/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use vortex_error::{VortexResult, vortex_bail};
1212

1313
use crate::data::Layout;
1414
use crate::reader::{LayoutReader, LayoutReaderExt};
15-
use crate::segments::{AsyncSegmentReader, RequiredSegmentKind};
15+
use crate::segments::{AsyncSegmentReader, RequiredSegmentKind, SegmentCollector};
1616
use crate::vtable::LayoutVTable;
1717
use crate::{LayoutId, STRUCT_LAYOUT_ID};
1818

@@ -52,7 +52,7 @@ impl LayoutVTable for StructLayout {
5252
row_offset: u64,
5353
filter_field_mask: &[FieldMask],
5454
projection_field_mask: &[FieldMask],
55-
segments: &mut crate::segments::SegmentRegistry,
55+
segments: &mut SegmentCollector,
5656
) -> VortexResult<()> {
5757
for_all_matching_children(layout, filter_field_mask, |field_mask, child| {
5858
child.required_segments(

vortex-layout/src/scan/mod.rs

+15-9
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use vortex_mask::Mask;
1717

1818
use crate::scan::filter::FilterExpr;
1919
use crate::scan::unified::UnifiedDriverStream;
20-
use crate::segments::{AsyncSegmentReader, SegmentRegistry};
20+
use crate::segments::{AsyncSegmentReader, RowRangePruner, SegmentCollector, SegmentStream};
2121
use crate::{
2222
ExprEvaluator, Layout, LayoutReader, LayoutReaderExt, RowMask, instrument, range_intersection,
2323
};
@@ -39,7 +39,7 @@ pub trait ScanDriver: 'static + Sized {
3939
/// how frequently this future will be polled, so it should not be used to drive I/O.
4040
///
4141
/// TODO(ngates): make this a future
42-
fn io_stream(self, segments: SegmentRegistry) -> impl Stream<Item = VortexResult<()>>;
42+
fn io_stream(self, segments: SegmentStream) -> impl Stream<Item = VortexResult<()>>;
4343
}
4444

4545
/// A struct for building a scan operation.
@@ -141,12 +141,13 @@ impl<D: ScanDriver> ScanBuilder<D> {
141141

142142
let row_indices = self.row_indices.clone();
143143
let splits = self.split_by.splits(&self.layout, &field_mask)?;
144-
let mut segments = SegmentRegistry::default();
144+
let mut collector = SegmentCollector::default();
145145
let (filter_mask, projection_mask) = self.filter_and_projection_masks()?;
146146
self.layout
147-
.required_segments(0, &filter_mask, &projection_mask, &mut segments)?;
147+
.required_segments(0, &filter_mask, &projection_mask, &mut collector)?;
148+
let (mut row_range_pruner, segments) = collector.finish()?;
148149
if let Some(indices) = &row_indices {
149-
segments.retain_matching(indices.clone());
150+
row_range_pruner.retain_matching(indices.clone());
150151
}
151152

152153
let row_masks = splits
@@ -189,6 +190,7 @@ impl<D: ScanDriver> ScanBuilder<D> {
189190
canonicalize: self.canonicalize,
190191
concurrency: self.concurrency,
191192
prefetch_conjuncts: self.prefetch_conjuncts,
193+
row_range_pruner,
192194
segments,
193195
})
194196
}
@@ -255,7 +257,8 @@ pub struct Scan<D> {
255257
//TODO(adam): bake this into the executors?
256258
concurrency: usize,
257259
prefetch_conjuncts: bool,
258-
segments: SegmentRegistry,
260+
row_range_pruner: RowRangePruner,
261+
segments: SegmentStream,
259262
}
260263

261264
impl<D: ScanDriver> Scan<D> {
@@ -267,9 +270,7 @@ impl<D: ScanDriver> Scan<D> {
267270
// Create a single LayoutReader that is reused for the entire scan.
268271
let segment_reader = self.driver.segment_reader();
269272
let task_executor = self.task_executor.clone();
270-
let reader: Arc<dyn LayoutReader> = self
271-
.layout
272-
.reader(segment_reader.clone(), self.ctx.clone())?;
273+
let reader: Arc<dyn LayoutReader> = self.layout.reader(segment_reader, self.ctx.clone())?;
273274

274275
let pruning = self
275276
.filter
@@ -294,13 +295,15 @@ impl<D: ScanDriver> Scan<D> {
294295
// We start with a stream of row masks
295296
let row_masks = stream::iter(self.row_masks);
296297
let projection = self.projection.clone();
298+
let row_range_pruner = self.row_range_pruner.clone();
297299

298300
let exec_stream = row_masks
299301
.map(move |row_mask| {
300302
let reader = reader.clone();
301303
let projection = projection.clone();
302304
let pruning = pruning.clone();
303305
let reader = reader.clone();
306+
let mut row_range_pruner = row_range_pruner.clone();
304307

305308
// This future is the processing task
306309
instrument!("process", async move {
@@ -316,6 +319,9 @@ impl<D: ScanDriver> Scan<D> {
316319

317320
// Filter out all-false masks
318321
if row_mask.filter_mask().all_false() {
322+
row_range_pruner
323+
.remove(row_mask.begin()..row_mask.end())
324+
.await?;
319325
Ok(None)
320326
} else {
321327
let mut array = reader.evaluate_expr(row_mask, projection).await?;

0 commit comments

Comments
 (0)