Skip to content

Commit c88e532

Browse files
authored
prefetch conjuncts flag for pruning (#2577)
1 parent eec3b5b commit c88e532

File tree

3 files changed

+72
-38
lines changed

3 files changed

+72
-38
lines changed

vortex-datafusion/src/persistent/opener.rs

+1
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ impl FileOpener for VortexFileOpener {
9090
.scan()
9191
.with_projection(projection.clone())
9292
.with_some_filter(filter.clone())
93+
.with_prefetch_conjuncts(true)
9394
.with_canonicalize(true)
9495
// DataFusion likes ~8k row batches. Ideally we would respect the config,
9596
// but at the moment our scanner has too much overhead to process small

vortex-layout/src/scan/filter.rs

+59-37
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,16 @@ pub struct FilterExpr {
4545
selectivity_threshold: f64,
4646
/// The quantile to use from the selectivity histogram of each conjunct.
4747
selectivity_quantile: f64,
48+
/// Do not block on conjunct evaluation for data fetching.
49+
prefetch_conjuncts: bool,
4850
}
4951

5052
impl FilterExpr {
51-
pub fn try_new(scope_dtype: StructDType, expr: ExprRef) -> VortexResult<Self> {
53+
pub fn try_new(
54+
scope_dtype: StructDType,
55+
expr: ExprRef,
56+
prefetch_conjuncts: bool,
57+
) -> VortexResult<Self> {
5258
// Find all the fields involved in the expression.
5359
let fields: Arc<[FieldName]> = immediate_scope_access(&expr, &scope_dtype)?
5460
.into_iter()
@@ -85,6 +91,7 @@ impl FilterExpr {
8591
})
8692
.take(nconjuncts)
8793
.collect(),
94+
prefetch_conjuncts,
8895
// The initial ordering is naive, we could order this by how well we expect each
8996
// comparison operator to perform. e.g. == might be more selective than <=? Not obvious.
9097
ordering: RwLock::new((0..nconjuncts).collect()),
@@ -199,6 +206,31 @@ pub struct FilterEvaluation {
199206
}
200207

201208
impl FilterEvaluation {
209+
async fn fetch_fields<E: ExprEvaluator>(
210+
&mut self,
211+
fields_to_read: Vec<usize>,
212+
evaluator: &E,
213+
) -> VortexResult<()> {
214+
// Construct futures to read the *full* field. We don't push down our mask as a
215+
// selection mask here, perhaps we should?
216+
let field_readers = fields_to_read
217+
.iter()
218+
.map(|&field_idx| self.filter_expr.fields[field_idx].clone())
219+
.map(|field_name| {
220+
evaluator.evaluate_expr(
221+
RowMask::new(Mask::new_true(self.mask.len()), self.row_offset),
222+
get_item(field_name, ident()),
223+
)
224+
})
225+
.collect::<Vec<_>>();
226+
227+
let field_arrays = try_join_all(field_readers).await?;
228+
for (field_idx, field_array) in fields_to_read.iter().zip_eq(field_arrays) {
229+
self.field_arrays[*field_idx] = Some(field_array);
230+
}
231+
Ok(())
232+
}
233+
202234
pub async fn evaluate<E: ExprEvaluator>(&mut self, evaluator: E) -> VortexResult<RowMask> {
203235
// First, we run all conjuncts through the evaluators pruning function. This helps trim
204236
// down the mask based on cheap statistics.
@@ -224,53 +256,43 @@ impl FilterEvaluation {
224256
return Ok(RowMask::new(self.mask.clone(), self.row_offset));
225257
}
226258

259+
if self.filter_expr.prefetch_conjuncts {
260+
self.fetch_fields((0..self.filter_expr.fields.len()).collect(), &evaluator)
261+
.await?;
262+
}
263+
227264
// Then we loop over the conjuncts and evaluate them.
228265
loop {
229-
let Some(next_conjunct) = self
266+
let Some(mut next_conjunct) = self
230267
.filter_expr
231268
.next_conjunct(&self.remaining, &self.field_arrays)
232269
else {
233270
// If there are no more conjuncts, then we've finished
234271
return Ok(RowMask::new(self.mask.clone(), self.row_offset));
235272
};
236273

237-
// Figure out which fields are needed for the next conjunct.
238-
// TODO(ngates): convert this into a conjunct group, where a group should only be
239-
// created if it has been observed to prune away to zero (therefore short-circuiting
240-
// the subsequent conjunct groups).
241-
let fields_to_read = self.filter_expr.conjunct_fields[next_conjunct]
242-
.iter()
243-
.filter(|&field_idx| self.field_arrays[*field_idx].is_none())
244-
.copied()
245-
.collect::<Vec<usize>>();
246-
247-
// Construct futures to read the *full* field. We don't push down our mask as a
248-
// selection mask here, perhaps we should?
249-
let field_readers = fields_to_read
250-
.iter()
251-
.map(|&field_idx| self.filter_expr.fields[field_idx].clone())
252-
.map(|field_name| {
253-
evaluator.evaluate_expr(
254-
RowMask::new(Mask::new_true(self.mask.len()), self.row_offset),
255-
get_item(field_name, ident()),
256-
)
257-
})
258-
.collect::<Vec<_>>();
259-
260-
let field_arrays = try_join_all(field_readers).await?;
261-
for (field_idx, field_array) in fields_to_read.iter().zip_eq(field_arrays) {
262-
self.field_arrays[*field_idx] = Some(field_array);
274+
if !self.filter_expr.prefetch_conjuncts {
275+
// Figure out which fields are needed for the next conjunct.
276+
// TODO(ngates): convert this into a conjunct group, where a group should only be
277+
// created if it has been observed to prune away to zero (therefore short-circuiting
278+
// the subsequent conjunct groups).
279+
let fields_to_read = self.filter_expr.conjunct_fields[next_conjunct]
280+
.iter()
281+
.filter(|&field_idx| self.field_arrays[*field_idx].is_none())
282+
.copied()
283+
.collect::<Vec<usize>>();
284+
285+
self.fetch_fields(fields_to_read, &evaluator).await?;
286+
// Now we've fetched some fields, we find the _now_ preferred conjunct to evaluate based
287+
// on the fields we actually have. This may have changed from before, for example if
288+
// we have `5 < X <= 10`, then we may have fetched X to evaluate `5 < X`, but now we
289+
// know that `X <= 10` is more selective and worth running first.
290+
next_conjunct = self
291+
.filter_expr
292+
.next_conjunct(&self.remaining, &self.field_arrays)
293+
.vortex_expect("we know there is another conjunct");
263294
}
264295

265-
// Now we've fetched some fields, we find the _now_ preferred conjunct to evaluate based
266-
// on the fields we actually have. This may have changed from before, for example if
267-
// we have `5 < X <= 10`, then we may have fetched X to evaluate `5 < X`, but now we
268-
// know that `X <= 10` is more selective and worth running first.
269-
let next_conjunct = self
270-
.filter_expr
271-
.next_conjunct(&self.remaining, &self.field_arrays)
272-
.vortex_expect("we know there is another conjunct");
273-
274296
log::debug!(
275297
"Evaluating conjunct {}",
276298
self.filter_expr.conjuncts[next_conjunct],

vortex-layout/src/scan/mod.rs

+12-1
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ pub struct ScanBuilder<D: ScanDriver> {
5353
canonicalize: bool,
5454
// The number of splits to make progress on concurrently.
5555
concurrency: usize,
56+
prefetch_conjuncts: bool,
5657
}
5758

5859
impl<D: ScanDriver> ScanBuilder<D> {
@@ -67,7 +68,8 @@ impl<D: ScanDriver> ScanBuilder<D> {
6768
row_indices: None,
6869
split_by: SplitBy::Layout,
6970
canonicalize: false,
70-
concurrency: 32,
71+
prefetch_conjuncts: false,
72+
concurrency: 1024,
7173
}
7274
}
7375

@@ -114,6 +116,12 @@ impl<D: ScanDriver> ScanBuilder<D> {
114116
self
115117
}
116118

119+
/// The number of row splits to make progress on concurrently, must be greater than 0.
120+
pub fn with_prefetch_conjuncts(mut self, prefetch: bool) -> Self {
121+
self.prefetch_conjuncts = prefetch;
122+
self
123+
}
124+
117125
pub fn with_task_executor(mut self, task_executor: TaskExecutor) -> Self {
118126
self.task_executor = Some(task_executor);
119127
self
@@ -189,6 +197,7 @@ impl<D: ScanDriver> ScanBuilder<D> {
189197
row_masks,
190198
canonicalize: self.canonicalize,
191199
concurrency: self.concurrency,
200+
prefetch_conjuncts: self.prefetch_conjuncts,
192201
})
193202
}
194203

@@ -215,6 +224,7 @@ pub struct Scan<D> {
215224
canonicalize: bool,
216225
//TODO(adam): bake this into the executors?
217226
concurrency: usize,
227+
prefetch_conjuncts: bool,
218228
}
219229

220230
impl<D: ScanDriver> Scan<D> {
@@ -243,6 +253,7 @@ impl<D: ScanDriver> Scan<D> {
243253
})?
244254
.clone(),
245255
filter.clone(),
256+
self.prefetch_conjuncts,
246257
)?);
247258

248259
VortexResult::Ok(pruning)

0 commit comments

Comments
 (0)