Example for building an external index for parquet files #10546
Comments
Update: I have a basic example in #10549 ready for review / merge.
Sorry for jumping in here, maybe this isn't the best issue but it's hard to keep up with all of the amazing work you're doing @alamb! I wanted to pitch a use case I've been thinking about: storing a secondary index in a searchable location that is accessed asynchronously. Think a relational database with ACID guarantees. In particular, the key would be that the hooks for selection / pruning are async and that the filters are passed in: I'd push the filters down into the metadata store and run an actual query there that returns the files / row groups to scan. This is in contrast to #10549, for example, where the index is in memory and fully materialized. I realize that
Thanks @adriangb ❤️
Yes, I agree this is a very common use case in modern database / data systems and one I hope will be easier to implement with some of these APIs (btw see #10813 for an even lower level API which I think brings this idea to its lowest level).
I agree that you could do an datafusion/datafusion-examples/examples/parquet_index.rs Lines 223 to 263 in 586241f
One thing that is still unclear in my mind is what other APIs we could offer to make it easier to implement an external index. Most of the code in parquet_index.rs is there to create the in-memory index. Maybe we could create an example that shows how to implement a remote index 🤔
I do think that example would be nice, it's basically what I was trying to build 😄 My approach was going to be something like:

    async fn scan(
        &self,
        state: &SessionState,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let object_store_url = ObjectStoreUrl::parse("file://")?;
        let mut file_scan_config = FileScanConfig::new(object_store_url, self.schema())
            .with_projection(projection.cloned())
            .with_limit(limit);
        // Use the index to get row groups to be scanned
        // Index does best effort to parse filters and push them down into the metadata store
        let partitioned_files_with_row_group_selection = self.index.get_files(filters).await?;
        for file in partitioned_files_with_row_group_selection {
            file_scan_config = file_scan_config.with_file(PartitionedFile::new(
                file.canonical_path.display().to_string(),
                file.file_size,
            ).with_extensions(Arc::new(file.access_plan())));
        }
        let df_schema = DFSchema::try_from(self.schema())?;
        // convert filters like [`a = 1`, `b = 2`] to a single filter like `a = 1 AND b = 2`
        let predicate = conjunction(filters.to_vec());
        let predicate = predicate
            .map(|predicate| state.create_physical_expr(predicate, &df_schema))
            .transpose()?
            .unwrap_or_else(|| datafusion_physical_expr::expressions::lit(true));
        let exec = ParquetExec::builder(file_scan_config)
            .with_predicate(predicate)
            .build_arc();
        Ok(exec)
    }

(several functions and types made up) Does this sound about in line with what you would think of as an example? I think implementing the async store as a familiar RDBMS (SQLite via SQLx?) would be a good example.
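One of the made-up pieces in the snippet above is `file.access_plan()`. As a rough, standard-library-only sketch of what could sit behind it, the code below groups the `(file, row group)` rows returned by an index into one scan/skip mask per file. `IndexRow`, `FilePlan`, and `build_plans` are hypothetical names for illustration and are not DataFusion APIs; the sketch also assumes the index stores the total row group count of each file.

```rust
use std::collections::BTreeMap;

/// Hypothetical row returned by the index: one matching row group of one file.
#[derive(Debug, Clone, PartialEq)]
pub struct IndexRow {
    pub path: String,
    pub row_group: usize,
    /// Total number of row groups in the file (assumed to be stored in the index).
    pub row_groups_in_file: usize,
}

/// Hypothetical per-file plan: `scan[i]` is true when row group `i` must be read.
#[derive(Debug, Clone, PartialEq)]
pub struct FilePlan {
    pub path: String,
    pub scan: Vec<bool>,
}

/// Groups index rows by file. Files with no matching row appear nowhere in the
/// output, which means they are pruned entirely.
pub fn build_plans(rows: &[IndexRow]) -> Vec<FilePlan> {
    // BTreeMap keeps the output ordered by path, so results are deterministic.
    let mut masks: BTreeMap<&str, Vec<bool>> = BTreeMap::new();
    for row in rows {
        let mask = masks
            .entry(row.path.as_str())
            .or_insert_with(|| vec![false; row.row_groups_in_file]);
        // Out-of-range row group indexes are ignored in this sketch.
        if let Some(slot) = mask.get_mut(row.row_group) {
            *slot = true;
        }
    }
    masks
        .into_iter()
        .map(|(path, scan)| FilePlan { path: path.to_string(), scan })
        .collect()
}
```

In a real table provider each `FilePlan` would be translated into whatever per-file extension the Parquet reader accepts; that translation is deliberately left out here.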
Yes, that is very much in line. Using SQLite via SQLx would be cool, though I don't think we would want to add new dependencies to the core datafusion crates themselves. I made a new repo in datafusion-contrib here https://github.com/datafusion-contrib/datafusion-async-parquet-index and invited you to be an admin, in case you want to do things there.
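To make the "push the filters down into the metadata store" idea concrete, here is a minimal sketch that turns a few simple filters into a parameterized SQL query. Everything in it is an assumption made for illustration: the `row_group_stats` table, its `<column>_min` / `<column>_max` columns, and the `SimpleFilter` type (a stand-in for real `Expr` values). It uses only the standard library; actually running the query (for example through SQLx) is not shown.

```rust
/// Comparison operators this sketch knows how to push down.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Op {
    Eq,
    Lt,
    Gt,
}

/// Hypothetical simplified filter of the form `column <op> value`.
#[derive(Debug, Clone, PartialEq)]
pub struct SimpleFilter {
    pub column: String,
    pub op: Op,
    pub value: i64,
}

/// Builds a query against a hypothetical `row_group_stats` table that has one
/// row per row group plus `<column>_min` / `<column>_max` columns.
/// Returns the SQL text and the values to bind to its `?` placeholders.
pub fn index_query(filters: &[SimpleFilter]) -> (String, Vec<i64>) {
    let mut clauses = Vec::new();
    let mut params = Vec::new();
    for f in filters {
        // Best effort: skip filters whose column is not a plain identifier.
        // Skipping is safe because it only makes the pruning less selective.
        let is_ident = !f.column.is_empty()
            && f.column.chars().all(|c| c.is_ascii_alphanumeric() || c == '_');
        if !is_ident {
            continue;
        }
        match f.op {
            Op::Eq => {
                clauses.push(format!("{c}_min <= ? AND {c}_max >= ?", c = f.column));
                params.push(f.value);
                params.push(f.value);
            }
            Op::Lt => {
                clauses.push(format!("{c}_min < ?", c = f.column));
                params.push(f.value);
            }
            Op::Gt => {
                clauses.push(format!("{c}_max > ?", c = f.column));
                params.push(f.value);
            }
        }
    }
    let mut sql = String::from("SELECT file_path, row_group FROM row_group_stats");
    if !clauses.is_empty() {
        sql.push_str(" WHERE ");
        sql.push_str(&clauses.join(" AND "));
    }
    (sql, params)
}
```

Because the index is only used to skip data, the full predicate still has to be applied when the files are read, which is why dropping an unsupported filter here does not change query results.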
Is your feature request related to a problem or challenge?
It is common in databases and other analytic systems to have additional external "indexes" (perhaps stored in the "metadata catalog", perhaps stored alongside the data files, perhaps embedded in the files, perhaps elsewhere).
These indexes are used to speed up queries by "pruning" the files -- specifically evaluating a predicate on the index and then only reading files / portions of files that would pass the filters in the query
Implementing such an index requires three steps:
DataFusion already has code to do 2 (PruningPredicate) and 3 (ParquetExec), but I am not sure how obvious it is to put them together
DataFusion actually also has basic support for 1 (e.g. the ListingTableProvider reads statistics and prunes based on their statistics)
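As a minimal illustration of the pruning idea described above (evaluate a predicate against the index, then read only the files that might match), here is a standard-library-only sketch. It is not DataFusion's `PruningPredicate`; `FileStats`, `Predicate`, and `prune` are hypothetical names, and the index is assumed to hold the min/max of a single integer column per file.

```rust
/// Hypothetical index entry: min/max of one integer column for one file.
#[derive(Debug, Clone, PartialEq)]
pub struct FileStats {
    pub file_name: String,
    pub min: i64,
    pub max: i64,
}

/// A simplified predicate on that column (stand-in for a real expression).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Predicate {
    Eq(i64),
    Lt(i64),
    Gt(i64),
}

impl Predicate {
    /// Returns false only when no value in `[min, max]` can satisfy the
    /// predicate, i.e. when the file can be skipped safely.
    pub fn may_match(&self, min: i64, max: i64) -> bool {
        match *self {
            Predicate::Eq(v) => min <= v && v <= max,
            Predicate::Lt(v) => min < v,
            Predicate::Gt(v) => max > v,
        }
    }
}

/// Evaluates the predicate against the index and returns the names of the
/// files that still have to be read.
pub fn prune<'a>(index: &'a [FileStats], predicate: Predicate) -> Vec<&'a str> {
    index
        .iter()
        .filter(|stats| predicate.may_match(stats.min, stats.max))
        .map(|stats| stats.file_name.as_str())
        .collect()
}
```

The check is conservative: a file that passes may still contain no matching rows, so the query's filter must still run on the data that is read.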
We have some version of this in InfluxDB today, and we are in the process of extending it. However, I think the use case is much more general and could be useful for other systems as well (e.g. full text indexes across parquet).
I also think creating an example will help motivate the work from @NGA-TRAN in #10453 and myself in #9929
Describe the solution you'd like
No response
Describe alternatives you've considered
No response
Additional context
No response