JoinOptimization: Add build side pushdown to probe side #13054
Conversation
@Dandandan Sorry for the late response, I have reinvestigated it and formatted the code. Really appreciate your suggestion. Thanks a lot.
Really nice @Lordworms, will have a good look later today.
        return Ok(Transformed::yes(Arc::new(new_hash_join)));
    }
    Ok(Transformed::no(plan))
} else if let Some(parquet_exec) = plan.as_any().downcast_ref::<ParquetExec>() {
I wonder if we could allow this cool optimization on any ExecutionPlan by adding a "with_dynamic_filter" method to the ExecutionPlan trait?
Maybe also a "supports_dynamic_filter" to know when to call "with_dynamic_filter".
So, the same principle as the existing static filter pushdown setup.
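For illustration, a minimal sketch of what such a trait extension could look like; the method names and the DynamicFilterInfo placeholder follow this discussion and are not existing DataFusion API:

use std::sync::Arc;

use datafusion::error::{DataFusionError, Result};
use datafusion::physical_plan::ExecutionPlan;

// Placeholder for the filter state introduced in this PR.
pub struct DynamicFilterInfo;

// Hypothetical extension mirroring the static filter pushdown API.
pub trait DynamicFilterSupport: ExecutionPlan {
    // Operators opt in to receiving a dynamically built filter.
    fn supports_dynamic_filter(&self) -> bool {
        false
    }

    // Return a copy of this node with the dynamic filter attached.
    fn with_dynamic_filter(
        &self,
        _filter: Arc<DynamicFilterInfo>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Err(DataFusionError::NotImplemented(
            "dynamic filter pushdown not supported for this operator".into(),
        ))
    }
}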
That makes a lot of sense
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I'll refactor this, but could we support ParquetExec first? I think for other physical scan execs, the way to add a dynamic filter is to add a FilterExec above them, but in Parquet we could utilize the predicate to add filters dynamically.
@@ -711,10 +724,15 @@ impl DisplayAs for ParquetExec {
        )
    })
    .unwrap_or_default();

let dynamic_filter =
    format!("dynamic_filter: {:?}", self.dynamic_filters);
I think it's best to only write this if available. Also, it would be good to implement Display for DynamicFilterInfo as well.
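A minimal sketch of such a Display impl, using a simplified stand-in for DynamicFilterInfo (the real struct in this PR holds accumulators; the field here is hypothetical):

use std::fmt;

// Simplified stand-in; the PR's DynamicFilterInfo keeps per-column
// min/max accumulators behind a mutex.
pub struct DynamicFilterInfo {
    bounds: Option<(String, String)>, // (min, max) rendered as strings
}

impl fmt::Display for DynamicFilterInfo {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Only print something meaningful once the build side produced bounds.
        match &self.bounds {
            Some((min, max)) => write!(f, "dynamic_filter=[{min}, {max}]"),
            None => write!(f, "dynamic_filter=<pending>"),
        }
    }
}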
Sorry about that, this was for debugging use; I'll refactor it.
    }
}

impl PhysicalOptimizerRule for JoinFilterPushdown {
This is executed at plan time; wouldn't it be more reasonable to execute this during execution of the HashJoin, after the build side is loaded?
    data_types: Vec<&DataType>,
    total_batches: usize,
) -> Result<Self, DataFusionError> {
    let (max_accumulators, min_accumulators) = data_types
I wonder if we can use a logical plan / physical plan instead of manually invoking min/max accumulators? Not sure if that makes sense :)
I don't know either. First I wanted to try using the min/max ScalarFunction, but the ScalarFunction translation from logical_expr to physical_expr is not implemented yet... I guess using accumulators would be more intuitive?
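For reference, a minimal sketch of driving MinAccumulator/MaxAccumulator directly over build-side batches; the module paths are from recent DataFusion versions and may differ:

use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int32Array};
use datafusion::arrow::datatypes::DataType;
use datafusion::error::Result;
use datafusion::functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
use datafusion::logical_expr::Accumulator;

fn main() -> Result<()> {
    // One accumulator pair per join key column, fed with every build-side batch.
    let mut min_acc = MinAccumulator::try_new(&DataType::Int32)?;
    let mut max_acc = MaxAccumulator::try_new(&DataType::Int32)?;

    let build_side_key: ArrayRef = Arc::new(Int32Array::from(vec![7, 3, 42]));
    min_acc.update_batch(&[Arc::clone(&build_side_key)])?;
    max_acc.update_batch(&[build_side_key])?;

    // Once the build side is fully loaded, the bounds become filter literals.
    println!("min = {}, max = {}", min_acc.evaluate()?, max_acc.evaluate()?);
    Ok(())
}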
}

    inner.batch_count = inner.batch_count.saturating_sub(1);
    if records.num_rows() == 0 {
Couldn't the input of a join partition be empty, in which case it will never be finalized?
Haven't tried that; I'll add a test for it.
Yes, it happens sometimes; I haven't found a way to solve it yet.
I added a new check: if a partition id has been recorded and it returns an empty batch again, we treat it as an empty partition and return true.
let max_expr: Arc<dyn PhysicalExpr> = Arc::new(Literal::new(max_scalar));
let min_expr: Arc<dyn PhysicalExpr> = Arc::new(Literal::new(min_scalar));

let range_condition: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
Maybe it makes sense to not create a dynamic filter in certain conditions, e.g. when min is the minimum value of the data type and max is the maximum of the data type (the expression would not filter out any rows)?
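One possible shape for that guard, as a sketch (only i32 shown; real code would match on all supported types):

use datafusion::common::ScalarValue;

// Returns true when the collected bounds cannot prune anything, so
// building the dynamic filter would only add evaluation cost.
fn bounds_are_trivial(min: &ScalarValue, max: &ScalarValue) -> bool {
    match (min, max) {
        (ScalarValue::Int32(Some(lo)), ScalarValue::Int32(Some(hi))) => {
            *lo == i32::MIN && *hi == i32::MAX
        }
        // Unknown or null bounds: be conservative and skip the filter.
        _ => true,
    }
}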
let max_scalar = max_value.clone();
let min_scalar = min_value.clone();

let max_expr: Arc<dyn PhysicalExpr> = Arc::new(Literal::new(max_scalar));
For readability, wouldn't it be better to use a LogicalPlan expression here and convert it to physical? This might also support some optimizations for free (e.g. expression simplification)...
got it.
struct DynamicFilterInfoInner {
    max_accumulators: Vec<MaxAccumulator>,
    min_accumulators: Vec<MinAccumulator>,
I wonder if we can just use batches: Vec<Vec<Arc<dyn Array>>> here and compute any filters from that?
This makes it pretty easy to support additional filters. It's pretty easy to add an InListExpr filter based on this (I have some code locally).
I agree, I'll refactor it; after that maybe I can incorporate your commit.
Cool, after the changes have cooled down a bit I can update and share my branch.
Adding something like that on top is actually quite easy after making the make_set helper public:
+ let col =
+     Arc::<datafusion_physical_expr::expressions::Column>::clone(column);
+ let batches: Vec<&dyn Array> = inner.batches[i]
+     .iter()
+     .map(|x| x as &dyn Array)
+     .collect();
+
+ // use inlist rather than min/max, use some threshold to avoid big inputs
+ // TODO: big inputs could use a bloom filter
+ if batches.iter().map(|x| x.len()).sum::<usize>() < 100 {
+     let batches = concat(batches.as_slice())?;
+     let set = make_set(&batches)?;
+
+     let unique_condition = InListExpr::new(col, vec![], false, Some(set));
+     condition = Arc::new(BinaryExpr::new(
+         Arc::new(unique_condition),
          Operator::And,
-         range_condition,
-     ))
-     as Arc<dyn PhysicalExpr>)),
- None => Ok(Some(range_condition)),
+         condition,
+     ));
  }
+ Ok(Some(condition))
Something that would be fun to try is creating a PhysicalExpr from the JoinHashMap; that way we can build a good filter without any overhead.
I can do that
I was trying to do this, but since the hash function is a one-way function, I can only combine the record batches to get the original values... I didn't see the benefit here; I think one way to reduce overhead is to just support the max/min filter here (as DuckDB does)... If there is a better way, please tell me, I am getting a little confused here. Thanks
So the idea is just to filter out values whose hash isn't present in the hashmap; that way we can filter out most values.
It's fine to focus on min/max first. The positive side is that evaluation is fast, but it won't filter out a lot of values if the min/max range from the left side is as big as the right side's (even though the number of values is much smaller on the left side).
> So the idea is just to filter out values whose hash isn't present in the hashmap; that way we can filter out most values.

I got it; I was thinking about rebuilding the original array using the hashmap and records 😅... I'll add the logic.
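A sketch of that hash-membership idea, with the JoinHashMap modeled as a plain HashSet of build-side hashes (create_hashes is DataFusion's hashing helper; the ahash crate is the hasher DataFusion uses):

use std::collections::HashSet;

use ahash::RandomState;
use datafusion::arrow::array::{ArrayRef, BooleanArray};
use datafusion::common::hash_utils::create_hashes;
use datafusion::error::Result;

// Keep only probe rows whose join-key hash exists on the build side.
// Hash collisions can only keep extra rows, never drop matching ones,
// so the mask is safe to apply before the join.
fn probe_membership_mask(
    probe_keys: &[ArrayRef],
    build_hashes: &HashSet<u64>,
    random_state: &RandomState,
) -> Result<BooleanArray> {
    let mut hashes = vec![0u64; probe_keys[0].len()];
    create_hashes(probe_keys, random_state, &mut hashes)?;
    Ok(hashes.iter().map(|h| Some(build_hashes.contains(h))).collect())
}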
let data_type = arrays[0].data_type();
match data_type {
    DataType::Int8 => process_min_max!(arrays, Int8Array, Int8, i8),
This currently supports numeric-type columns (which add a range filter); string types would be supported in a next PR (adding an IN (xx, xx) filter).
String types could support a range filter as well, no?
I'll add it.
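For what it's worth, a sketch of a string range filter: since strings compare lexicographically, the same min/max shape works (the schema, column name, and bounds here are illustrative):

use std::sync::Arc;

use datafusion::arrow::datatypes::Schema;
use datafusion::common::ScalarValue;
use datafusion::error::Result;
use datafusion::logical_expr::Operator;
use datafusion::physical_expr::expressions::{BinaryExpr, Column, Literal};
use datafusion::physical_expr::PhysicalExpr;

// Builds p_brand >= min AND p_brand <= max from build-side string bounds.
fn string_range_filter(schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
    let column = Arc::new(Column::new_with_schema("p_brand", schema)?);
    let min = Arc::new(Literal::new(ScalarValue::Utf8(Some("Brand#1".into()))));
    let max = Arc::new(Literal::new(ScalarValue::Utf8(Some("Brand#3".into()))));
    let ge: Arc<dyn PhysicalExpr> =
        Arc::new(BinaryExpr::new(Arc::clone(&column), Operator::GtEq, min));
    let le: Arc<dyn PhysicalExpr> =
        Arc::new(BinaryExpr::new(column, Operator::LtEq, max));
    Ok(Arc::new(BinaryExpr::new(ge, Operator::And, le)))
}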
Could you update the display of sources to include these pushed-down filters? I believe it would be clearer if we could see them in the output (I'm also curious to see how this impacts the test plans). Additionally, are there any benchmark results you could share with us?

How would you display them in sources? The dynamic filter will only be added during execution, so it will only be available through e.g. ParquetExec after loading the build side.

Doesn’t

Yes, that's correct; I just wanted to stress that the actual filter isn't added during the optimization phase, only a "placeholder" that might be filled with some filter expressions during execution for particular columns. So it only shows

Maybe I can show them in 'explain analyze'.
@@ -1406,12 +1403,24 @@ impl HashJoinStream {
    self.hashes_buffer.resize(batch.num_rows(), 0);
    create_hashes(&keys_values, &self.random_state, &mut self.hashes_buffer)?;

    let (filtered_batch, filtered_hashes) =
        if let Some(dynamic_filter) = &self.dynamic_filter_info {
            dynamic_filter.filter_probe_batch(
Hmm, filter_probe_batch shouldn't be added as a filter on the hash_join but rather as a PhysicalExpr (returning boolean) to filter the ParquetExec.
> Hmm, filter_probe_batch shouldn't be added as a filter on the hash_join but rather as a PhysicalExpr (returning boolean) to filter the ParquetExec.

Sure, I'll refactor that. Other than that, I found a problem with ParquetExec after rebasing on main (#13298): the pushdown can cost much more than applying the filter directly. I am not sure whether it is an 'optimization' or not...
I repushed with an adaptive way to generate the dynamic filter (when the unique-value count exceeds a threshold, generate a range filter; otherwise an InList filter). Due to #13298 I didn't push the filter down to the parquet scan; I pushed it down to the file stream instead. I also tried with random data like:

import pandas as pd
import numpy as np

part = pd.DataFrame({
    'p_partkey': np.random.randint(10, 21, size=1000),
    'p_brand': np.random.choice(['Brand#1', 'Brand#2', 'Brand#3'], 1000),
    'p_container': np.random.choice(['SM BOX', 'LG BOX', 'MED BOX'], 1000)
})

lineitem = pd.DataFrame({
    'l_partkey': np.random.randint(1, 1000001, size=100000000),
    'l_quantity': np.random.uniform(1, 50, size=100000000),
    'l_extendedprice': np.random.uniform(100, 10000, size=100000000)
})

which only got about a 40% improvement. I think we could try pushing the filter down to the parquet scan once the filter for reading parquet is optimized. Also, perhaps we need to improve the performance of InList: when I tried 20 elements, it was 5 times slower than using the range filter.
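As a sketch, the adaptive selection described above could look roughly like this (the threshold and plain-Rust types are illustrative; the PR works on Arrow arrays):

// Choose an InList filter for small key sets, otherwise a min/max range.
enum DynamicFilter {
    InList(Vec<i32>),
    Range { min: i32, max: i32 },
}

fn choose_filter(build_keys: &[i32]) -> Option<DynamicFilter> {
    const INLIST_THRESHOLD: usize = 20; // illustrative cutoff

    let mut distinct = build_keys.to_vec();
    distinct.sort_unstable();
    distinct.dedup();

    // Empty build side: no filter to build.
    let (&min, &max) = (distinct.first()?, distinct.last()?);
    if distinct.len() <= INLIST_THRESHOLD {
        Some(DynamicFilter::InList(distinct))
    } else {
        Some(DynamicFilter::Range { min, max })
    }
}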
Which issue does this PR close?
Closes #7955
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?