
[Epic] A collection of dynamic filtering related items #15512

Open · 1 of 4 tasks
alamb opened this issue Mar 31, 2025 · 12 comments
Labels
enhancement New feature or request

Comments

@alamb
Contributor

alamb commented Mar 31, 2025

Is your feature request related to a problem or challenge?

This is a collection of various items related to "dynamic filtering"

Roughly speaking dynamic filters are filters who values are known during runtime but not planning (for example from the values of topk)

Here is some more background:

Related items

alamb added the enhancement (New feature or request) label on Mar 31, 2025
@adriangb
Contributor

adriangb commented Apr 1, 2025

Adding #15534

@adriangb
Contributor

adriangb commented Apr 3, 2025

I've started breaking out the work:

@acking-you
Contributor

acking-you commented Apr 17, 2025

I briefly looked at the descriptions of these optimizations. For example, the method of dynamically handling the "order by limit" process using statistics is really cool! @alamb

Idea

But I have some new ideas that seem more universally applicable to "order by limit" (q23):

┌──────────────────────────────────────┐
│ [Step 1] Filter RowGroups            │
│ - Use Parquet metadata to skip RGs   │
│   WHERE RG.min(EventTime) > cutoff   │
└───────────────────┬──────────────────┘
                    ↓
┌──────────────────────────────────────┐
│ [Step 2] Read EventTime + Filter     │
│ - Scan EventTime column in valid RGs │
│ - Sort values → Track top 10 rows    │
└───────────────────┬──────────────────┘
                    ↓
┌──────────────────────────────────────┐
│ [Step 3] Record Row Locations        │
│ - Map top 10 EventTime to physical   │
│   positions (RG_ID + Row_Offset)     │
└───────────────────┬──────────────────┘
                    ↓
┌──────────────────────────────────────┐
│ [Step 4] Fetch 10 Rows               │
│ - Directly read rows from Parquet    │
│   via recorded positions (non-seq)   │
└───────────────────┬──────────────────┘
                    ↓
              Final Result
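
A rough sketch of how Steps 1-4 could be emulated today with two passes in DataFusion (this is only an illustration; the hits table name, the file path, and the EventTime column type are assumptions): the cutoff learned in the first pass becomes an ordinary predicate, so the Parquet reader can prune row groups with its min/max statistics.

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    ctx.register_parquet("hits", "hits.parquet", ParquetReadOptions::default())
        .await?;

    // Pass 1 (Steps 1-2): read only the sort column and find the 10th smallest value.
    let cutoff = ctx
        .sql("SELECT MAX(EventTime) AS cutoff \
              FROM (SELECT EventTime FROM hits ORDER BY EventTime LIMIT 10) t")
        .await?
        .collect()
        .await?;
    println!("{cutoff:?}"); // feed this value back into the second query as a literal

    // Pass 2 (Steps 3-4): with the cutoff as a predicate (placeholder literal below),
    // every row group whose min(EventTime) is above it can be skipped before the
    // other columns are fetched.
    ctx.sql("SELECT * FROM hits \
             WHERE EventTime <= TIMESTAMP '2013-07-15 00:00:00' \
             ORDER BY EventTime LIMIT 10")
        .await?
        .show()
        .await?;
    Ok(())
}
```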

Explore

Currently, q23 takes approximately 6 seconds to execute. I have confirmed that DataFusion does not have the aforementioned optimizations and still scans a very large number of rows and columns. By the way, is there a convenient way in datafusion-cli to view statistics on the number of rows and columns scanned? Currently, I directly print the batch information in the Like expression, which gives the following output (it seems endless, and the amount of data being scanned appears to be very large, all with exactly 105 columns):

[screenshot: the printed batch output repeats seemingly without end, each batch containing exactly 105 columns]

Some concerns

Parquet is composed of RowGroups. Is it difficult to read an individual page? In my previous work, I’ve seen optimizations for this scenario (based on DataFusion), but it used a custom columnar storage format, which was easier to implement. At that time, when working with very large datasets (similar to the hits dataset), the query time for "order by limit" was reduced to around 2 seconds.

Summary

Reading the bulk of the data can be deferred by first sorting on only the "order by" columns, which is very effective for the "order by limit" scenario. I'm not sure whether DataFusion currently does this.

@zhuqi-lucas
Contributor

zhuqi-lucas commented Apr 17, 2025

Thank you @acking-you for the idea. Is it similar to parquet filter pushdown? We are already trying to make it the default.
#3463

With parquet filter pushdown, we will only scan the filtered pages, but the topk filter pushdown is still in progress.
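
For anyone who wants to try it before it becomes the default, here is a minimal sketch of enabling it on a session (the config field names and the example table/query are assumptions based on recent DataFusion releases):

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Evaluate pushed-down filters inside the Parquet scan, and let DataFusion
    // reorder them from cheapest to most expensive.
    let mut config = SessionConfig::new();
    config.options_mut().execution.parquet.pushdown_filters = true;
    config.options_mut().execution.parquet.reorder_filters = true;

    let ctx = SessionContext::new_with_config(config);
    ctx.register_parquet("hits", "hits.parquet", ParquetReadOptions::default())
        .await?;

    // With pushdown enabled, the scan only decodes the projected columns for rows
    // that survive the predicate.
    ctx.sql("SELECT * FROM hits WHERE URL LIKE '%google%' ORDER BY EventTime LIMIT 10")
        .await?
        .show()
        .await?;
    Ok(())
}
```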

@acking-you
Contributor

acking-you commented Apr 17, 2025

Thank you @acking-you for the idea. Is it similar to parquet filter pushdown? We are already trying to make it the default. #3463

With parquet filter pushdown, we will only scan the filtered pages, but the topk filter pushdown is still in progress.

This optimization targets the select * ... order by ... limit scenario, where it can bring significant improvements. If we only push down the filter, we don't get the effect of materializing all of the columns for just the limit rows. Therefore, for the order by limit scenario, we need lazy materialization of columns.

The approach is quite easy to understand: complete the sort by reading only the order by columns, and then read all of the columns for just the required "limit" rows.

Take select * from data order by timestamp desc limit 10 as an example

The most straightforward execution process for "order by limit" might look like the following:

///  Initial RecordBatch:
///    +----------------+---------------------+
///    |     Columns     |      Rows           |
///    +----------------+---------------------+
///    | *All columns*   | row1, row2, ..., rowN |
///    +----------------+---------------------+
///
///  Full Table Scan:
///    (Read all columns and rows into memory)
///
///  Scanned RecordBatch:
///    +----------------+---------------------+
///    | timestamp      | ... (other columns) |
///    +----------------+---------------------+
///    | 2023-09-01     | ...                 |
///    | 2023-09-02     | ...                 |
///    | ...            | ...                 |
///    +----------------+---------------------+
///
///  Sort by `timestamp DESC`:
///    (Re-order rows via stable sort on timestamp)
///
///  Sorted RecordBatch:
///    +----------------+---------------------+
///    | timestamp      | ... (other columns) |
///    +----------------+---------------------+
///    | 2023-09-15     | ...                 |
///    | 2023-09-14     | ...                 |
///    | ...            | ...                 |
///    +----------------+---------------------+
///
///  Apply `LIMIT 10`:
///    (Select top 10 rows from sorted batch)
///
///  Final RecordBatch:
///    +----------------+---------------------+
///    | timestamp      | ... (other columns) |
///    +----------------+---------------------+
///    | 2023-09-15     | ...                 |
///    | 2023-09-14     | ...                 |
///    | ... (8 more)   | ...                 |
///    +----------------+---------------------+

With delayed materialization of the non-ordered columns, the process instead looks like this:

///  Initial Data Source:
///    +----------------+---------------------+
///    |     Columns     |      Rows           |
///    +----------------+---------------------+
///    | timestamp      | 2023-09-01, ...     |
///    | other_col1     | data1, ...          |
///    | other_col2     | data2, ...          |
///    | ...            | ...                 |
///    +----------------+---------------------+
///
///  Projection Scan:
///    (Only read `timestamp` + generate row IDs)
///
///  Scanned RecordBatch:
///    +----------------+---------------------+
///    | row_id         | timestamp           |
///    +----------------+---------------------+
///    | 0              | 2023-09-01          |
///    | 1              | 2023-09-02          |
///    | ...            | ...                 |
///    | N-1            | 2023-09-15          |
///    +----------------+---------------------+
///
///  Sort by `timestamp DESC`:
///    (Sort only row_id and timestamp)
///
///  Sorted Indexes: [15, 14, 8, ...]  -- List of original row numbers sorted by timestamp
///  Sorted Timestamps:
///    +----------------+
///    | 2023-09-15     |
///    | 2023-09-14     |
///    | ...            |
///    +----------------+
///
///  Apply `LIMIT 10`:
///    (Select top 10 row_ids)
///
///  Final Indexes: [15, 14, 8, 3, 7, 2, 5, 11, 9, 6]
///
///  Fetch Other Columns by row_id:
///    (Random access to original data via indexes)
///
///  Final RecordBatch:
///    +----------------+---------------------+
///    | timestamp      | other_col1 | ...     |
///    +----------------+---------------------+
///    | 2023-09-15     | data15     | ...     |  -- The original row corresponding to row_id=15
///    | 2023-09-14     | data14     | ...     |  -- The original row corresponding to row_id=14
///    | ... (8 more)   | ...        | ...     |
///    +----------------+---------------------+
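
A hand-rolled sketch of the second diagram using two queries (the data table, its timestamp column, and the file path are illustrative): pass 1 sorts and limits on the timestamp column alone, and pass 2 fetches the remaining columns only for the winning keys.

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    ctx.register_parquet("data", "data.parquet", ParquetReadOptions::default())
        .await?;

    // Pass 1: the sort + limit only ever touches the `timestamp` column.
    let top = ctx
        .sql(r#"SELECT "timestamp" FROM data ORDER BY "timestamp" DESC LIMIT 10"#)
        .await?;
    let _ = ctx.register_table("top10", top.into_view())?;

    // Pass 2: materialize the other columns only for the 10 winning keys
    // (assumes `timestamp` identifies those rows well enough for this example;
    // a real operator would track row ids / positions instead).
    ctx.sql(
        r#"SELECT d.* FROM data d
           JOIN top10 t ON d."timestamp" = t."timestamp"
           ORDER BY d."timestamp" DESC"#,
    )
    .await?
    .show()
    .await?;
    Ok(())
}
```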

My opinion

In my previous attempt (link), I found that in order to read 10 rows of data, DataFusion would end up scanning an additional 104 columns, which is significant overhead. I believe this approach would be very helpful for that scenario.

@acking-you
Contributor

I also noticed that recently, a PR was merged in ClickHouse that does exactly what was described above: ClickHouse/ClickHouse#55518, with the corresponding issue: ClickHouse/ClickHouse#45868. @zhuqi-lucas

@adriangb
Contributor

@acking-you have you seen #15301?

@acking-you
Contributor

@acking-you have you seen #15301?

Thank you for your hint. I haven't looked into this part of the code yet. I only reviewed the optimization method you described in #15037. It seems that there is no mention of deferred materialization for "order by limit" (perhaps I missed it since the content of the issue is quite long). So, have we considered optimizing "order by limit" in two phases? I'm planning to review and study the PR you mentioned over the weekend. Thanks again for your reply.

@adriangb
Contributor

DataFusion already does late materialization: it orders filters from least to most expensive, then scans only the columns that the filters need. Once it has applied all filters, it materializes the projection. It's not the default yet, but it will be soon. Please see #3463, which @zhuqi-lucas linked to above.

@acking-you
Contributor

DataFusion already does late materialization: it orders filters from least to most expensive, then scans only the columns that the filters need. Once it has applied all filters, it materializes the projection. It's not the default yet, but it will be soon. Please see #3463, which @zhuqi-lucas linked to above.

I'm sorry, I thought the late materialization only applied to the filter columns and did not take the order by column into account. That's my fault; I should have looked deeper.

@adriangb
Contributor

Well the point of this recent work is to create filters from the order by clause :)

@alamb
Contributor Author

alamb commented Apr 19, 2025

Currently, q23 takes approximately 6 seconds to execute. I have confirmed that DataFusion does not have the aforementioned optimizations and still scans a very large number of rows and columns. By the way, is there a convenient way in datafusion-cli to view statistics on the number of rows and columns scanned? Currently, I directly print the batch information in the Like expression, which gives the following output (it seems endless, and the amount of data being scanned appears to be very large, all with exactly 105 columns):

@acking-you I agree with your analysis and arrived at a similar conclusion in this ticket:

It seems that there is no mention of deferred materialization for "order by limit" (perhaps I missed it since the content of the issue is quite long). So, have we considered optimizing "order by limit" in two phases? I'm planning to review and study the PR you mentioned over the weekend. Thanks again for your reply.

I also agree that the deferred materialization is key to improving performance massively. I believe this is the effect of #3463, though it does not use that term.

Please see #3463 which @zhuqi-lucas linked to above.

So the TL;DR is that by combining the following two items

I think DataFusion will have the equivalent of this deferred materialization
