forked from databendlabs/databend
-
Notifications
You must be signed in to change notification settings - Fork 0
/
transform_filter.rs
77 lines (68 loc) · 2.27 KB
/
transform_filter.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// Copyright 2020 The FuseQuery Authors.
//
// Code is licensed under AGPL License, Version 3.0.
use async_trait::async_trait;
use std::sync::Arc;
use crate::datablocks::DataBlock;
use crate::datastreams::{ExpressionStream, SendableDataBlockStream};
use crate::datavalues::{BooleanArray, DataSchema, DataSchemaRef};
use crate::error::{FuseQueryError, FuseQueryResult};
use crate::functions::Function;
use crate::planners::ExpressionPlan;
use crate::processors::{EmptyProcessor, IProcessor};
use arrow::compute::filter_record_batch;
/// Pipeline processor that filters incoming data blocks with a `WHERE` predicate.
pub struct FilterTransform {
    /// Predicate function built from the `WHERE` expression plan.
    func: Function,
    /// Upstream processor whose output stream is filtered.
    input: Arc<dyn IProcessor>,
}
impl FilterTransform {
    /// Builds a filter transform from a `WHERE` predicate.
    ///
    /// The returned transform is initially wired to an `EmptyProcessor`; use
    /// `connect_to` to attach the real upstream processor.
    ///
    /// # Errors
    ///
    /// Returns `FuseQueryError::Internal` if the predicate contains an aggregate
    /// function (not allowed in `WHERE`), and propagates any error raised while
    /// converting the predicate into a `Function`.
    pub fn try_create(predicate: ExpressionPlan) -> FuseQueryResult<Self> {
        if predicate.is_aggregate() {
            return Err(FuseQueryError::Internal(format!(
                "Aggregate function {:?} is found in WHERE in query",
                predicate
            )));
        }
        let func = predicate.to_function()?;
        Ok(FilterTransform {
            func,
            input: Arc::new(EmptyProcessor::create()),
        })
    }

    /// Evaluates the predicate against `block` and returns a new block that keeps
    /// only the rows selected by the resulting boolean mask (applied through
    /// Arrow's `filter_record_batch`).
    ///
    /// Only the first function in `funcs` is used as the predicate. `_schema` is
    /// unused.
    ///
    /// # Errors
    ///
    /// Returns `FuseQueryError::Internal` if `funcs` is empty or if the predicate
    /// does not evaluate to a `BooleanArray`. Errors from predicate evaluation and
    /// Arrow conversion/filtering are propagated.
    pub fn expression_executor(
        _schema: &DataSchemaRef,
        block: DataBlock,
        funcs: Vec<Function>,
    ) -> FuseQueryResult<DataBlock> {
        // `funcs` is owned, so move the predicate out instead of cloning it.
        // Indexing would panic on an empty vector; report an error instead.
        let mut func = funcs.into_iter().next().ok_or_else(|| {
            FuseQueryError::Internal("filter predicate function is missing".to_string())
        })?;
        let result = func.eval(&block)?.to_array(block.num_rows())?;
        let filter_result = result
            .as_any()
            .downcast_ref::<BooleanArray>()
            .ok_or_else(|| {
                FuseQueryError::Internal("cannot downcast to boolean array".to_string())
            })?;
        Ok(DataBlock::try_from_arrow_batch(&filter_record_batch(
            &block.to_arrow_batch()?,
            filter_result,
        )?)?)
    }
}
#[async_trait]
impl IProcessor for FilterTransform {
    /// Display name of this processor.
    fn name(&self) -> &str {
        "FilterTransform"
    }

    /// Replaces the upstream processor feeding this filter.
    fn connect_to(&mut self, input: Arc<dyn IProcessor>) -> FuseQueryResult<()> {
        self.input = input;
        Ok(())
    }

    /// Runs the upstream processor and wraps its output in an `ExpressionStream`.
    /// The stream applies `expression_executor` with this transform's predicate
    /// to each block. The stream is given an empty schema.
    async fn execute(&self) -> FuseQueryResult<SendableDataBlockStream> {
        let upstream = self.input.execute().await?;
        let schema = Arc::new(DataSchema::empty());
        let predicates = vec![self.func.clone()];
        let filtered = ExpressionStream::try_create(
            upstream,
            schema,
            predicates,
            Self::expression_executor,
        )?;
        Ok(Box::pin(filtered))
    }
}