Add FileScanConfig::new() API
alamb committed May 23, 2024
1 parent d5542b1 commit c650bdf
Showing 3 changed files with 120 additions and 28 deletions.
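In short: instead of filling in every field of the `FileScanConfig` struct by hand, callers can now use `FileScanConfig::new()` plus builder-style setters. A condensed sketch of the new usage, mirroring the diff below (the schema and file name are illustrative placeholders):

```rust
use std::sync::Arc;
use arrow_schema::Schema;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion_execution::object_store::ObjectStoreUrl;

let file_schema = Arc::new(Schema::empty()); // placeholder schema

// new() fills in the defaults (no projection, no limit, unknown statistics)
let mut config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
    .with_projection(Some(vec![0])) // read only column 0
    .with_limit(Some(10));          // read at most 10 rows

// Each add_file call creates its own single-file group (scanned in parallel)
config.add_file(PartitionedFile::new("example.parquet", 100));
```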
41 changes: 14 additions & 27 deletions datafusion-examples/examples/parquet_index.rs
@@ -38,7 +38,7 @@ use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::*;
 use datafusion_common::config::TableParquetOptions;
 use datafusion_common::{
-    internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue, Statistics,
+    internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
 };
 use datafusion_expr::utils::conjunction;
 use datafusion_expr::{TableProviderFilterPushDown, TableType};
@@ -243,36 +243,23 @@ impl TableProvider for IndexTableProvider {
         // will not be returned.
         let files = self.index.get_files(predicate.clone())?;
 
+        // Create the scan configuration to scan the relevant files
+        let object_store_url = ObjectStoreUrl::parse("file://")?;
+        let mut base_config = FileScanConfig::new(object_store_url, self.schema())
+            .with_projection(projection.cloned())
+            .with_limit(limit);
+
         // Transform to the format needed to pass to ParquetExec
         // Create one file group per file (default to scanning them all in parallel)
-        let file_groups = files
-            .into_iter()
-            .map(|(file_name, file_size)| {
-                let path = self.dir.join(file_name);
-                let canonical_path = fs::canonicalize(path)?;
-                Ok(vec![PartitionedFile::new(
-                    canonical_path.display().to_string(),
-                    file_size,
-                )])
-            })
-            .collect::<Result<Vec<_>>>()?;
-
-        // for now, simply use ParquetExec
-        // TODO make a builder for FileScanConfig
-        let object_store_url = ObjectStoreUrl::parse("file://")?;
-        let base_config = FileScanConfig {
-            object_store_url,
-            file_schema: self.schema(),
-            file_groups,
-            statistics: Statistics::new_unknown(self.index.schema()),
-            projection: projection.cloned(),
-            limit,
-            table_partition_cols: vec![],
-            output_ordering: vec![],
-        };
+        for (file_name, file_size) in files.into_iter() {
+            let path = self.dir.join(file_name);
+            let canonical_path = fs::canonicalize(path)?;
+            let partitioned_file =
+                PartitionedFile::new(canonical_path.display().to_string(), file_size);
+            base_config.add_file(partitioned_file);
+        }
 
         let metadata_size_hint = None;
 
         let table_parquet_options = TableParquetOptions::default();
 
         // TODO make a builder for parquet exec
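As the comments in the example note, creating one file group per file lets the files be scanned in parallel; per the `file_groups` docs below, files within a single group are read sequentially. A hedged sketch of the opposite choice, assuming the same `files: Vec<(String, u64)>` and `base_config` as in the example above:

```rust
// Illustrative alternative to the loop above: put every file into ONE
// group so they are read sequentially, one after the next.
let group: Vec<PartitionedFile> = files
    .into_iter()
    .map(|(file_name, file_size)| PartitionedFile::new(file_name, file_size))
    .collect();
base_config.add_file_group(group);
```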
98 changes: 98 additions & 0 deletions datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -64,12 +64,43 @@ pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {

 /// The base configurations to provide when creating a physical plan for
 /// any given file format.
+///
+/// # Example
+/// ```
+/// # use std::sync::Arc;
+/// # use arrow_schema::Schema;
+/// use datafusion::datasource::listing::PartitionedFile;
+/// # use datafusion::datasource::physical_plan::FileScanConfig;
+/// # use datafusion_execution::object_store::ObjectStoreUrl;
+/// # let file_schema = Arc::new(Schema::empty());
+/// // create FileScanConfig for reading data from file://
+/// let object_store_url = ObjectStoreUrl::local_filesystem();
+/// let mut config = FileScanConfig::new(object_store_url, file_schema)
+///   .with_limit(Some(1000))            // read only the first 1000 records
+///   .with_projection(Some(vec![2, 3])); // project columns 2 and 3
+///
+/// // Read /tmp/file1.parquet with known size of 1234 bytes in a single group
+/// config.add_file(PartitionedFile::new("file1.parquet", 1234));
+///
+/// // Read /tmp/file2.parquet 56 bytes and /tmp/file3.parquet 78 bytes
+/// // in a single file group
+/// config.add_file_group(vec![
+///   PartitionedFile::new("file2.parquet", 56),
+///   PartitionedFile::new("file3.parquet", 78),
+/// ]);
+/// ```
 #[derive(Clone)]
 pub struct FileScanConfig {
     /// Object store URL, used to get an [`ObjectStore`] instance from
     /// [`RuntimeEnv::object_store`]
     ///
+    /// This `ObjectStoreUrl` should be the prefix of the absolute url for files
+    /// such as `file://` or `s3://my_bucket`. It should not include the path to the
+    /// file itself. The relevant URL prefix must be registered via
+    /// [`RuntimeEnv::register_object_store`]
+    ///
     /// [`ObjectStore`]: object_store::ObjectStore
+    /// [`RuntimeEnv::register_object_store`]: datafusion_execution::runtime_env::RuntimeEnv::register_object_store
     /// [`RuntimeEnv::object_store`]: datafusion_execution::runtime_env::RuntimeEnv::object_store
     pub object_store_url: ObjectStoreUrl,
     /// Schema before `projection` is applied. It contains all the columns that may
@@ -87,6 +118,7 @@ pub struct FileScanConfig {
     /// sequentially, one after the next.
     pub file_groups: Vec<Vec<PartitionedFile>>,
     /// Estimated overall statistics of the files, taking `filters` into account.
+    /// Defaults to [`Statistics::new_unknown`].
     pub statistics: Statistics,
     /// Columns on which to project the data. Indexes that are higher than the
     /// number of columns of `file_schema` refer to `table_partition_cols`.
@@ -101,6 +133,72 @@
 }
 
 impl FileScanConfig {
+    /// Create a new `FileScanConfig` with default settings for scanning files.
+    ///
+    /// No file groups are added by default. See [`Self::add_file`] and
+    /// [`Self::add_file_group`].
+    ///
+    /// # Parameters:
+    /// * `object_store_url`: See [`Self::object_store_url`]
+    /// * `file_schema`: See [`Self::file_schema`]
+    pub fn new(object_store_url: ObjectStoreUrl, file_schema: SchemaRef) -> Self {
+        let statistics = Statistics::new_unknown(&file_schema);
+        Self {
+            object_store_url,
+            file_schema,
+            file_groups: vec![],
+            statistics,
+            projection: None,
+            limit: None,
+            table_partition_cols: vec![],
+            output_ordering: vec![],
+        }
+    }
+
+    /// Add a new file as a single file group
+    ///
+    /// See [`Self::file_groups`] for more information
+    pub fn add_file(&mut self, file: PartitionedFile) {
+        self.add_file_group(vec![file])
+    }
+
+    /// Add a new file group
+    ///
+    /// See [`Self::file_groups`] for more information
+    pub fn add_file_group(&mut self, file_group: Vec<PartitionedFile>) {
+        self.file_groups.push(file_group);
+    }
+
+    /// Set the statistics of the files
+    pub fn with_statistics(mut self, statistics: Statistics) -> Self {
+        self.statistics = statistics;
+        self
+    }
+
+    /// Set the projection of the files
+    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
+        self.projection = projection;
+        self
+    }
+
+    /// Set the limit of the files
+    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
+        self.limit = limit;
+        self
+    }
+
+    /// Set the partitioning columns of the files
+    pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
+        self.table_partition_cols = table_partition_cols;
+        self
+    }
+
+    /// Set the output ordering of the files
+    pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
+        self.output_ordering = output_ordering;
+        self
+    }
+
     /// Project the schema and the statistics on the given column indices
     pub fn project(&self) -> (SchemaRef, Statistics, Vec<LexOrdering>) {
         if self.projection.is_none() && self.table_partition_cols.is_empty() {
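The new `with_*` setters compose, since each consumes and returns `Self`. A hedged sketch chaining several of them (the schema, column index, and partition column are illustrative; output ordering is left at its default):

```rust
use std::sync::Arc;
use arrow_schema::{DataType, Field, Schema};
use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion_common::Statistics;
use datafusion_execution::object_store::ObjectStoreUrl;

let file_schema = Arc::new(Schema::new(vec![
    Field::new("a", DataType::Int32, false),
    Field::new("b", DataType::Utf8, false),
]));

let config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), Arc::clone(&file_schema))
    .with_statistics(Statistics::new_unknown(&file_schema)) // same as the default
    .with_projection(Some(vec![1]))                         // keep only column "b"
    .with_limit(Some(100))                                  // stop after 100 rows
    // partition columns are appended after the file schema's columns
    .with_table_partition_cols(vec![Field::new("date", DataType::Utf8, false)]);
```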
9 changes: 8 additions & 1 deletion datafusion/execution/src/object_store.rs
@@ -51,7 +51,14 @@ impl ObjectStoreUrl {
         Ok(Self { url: parsed })
     }
 
-    /// An [`ObjectStoreUrl`] for the local filesystem
+    /// An [`ObjectStoreUrl`] for the local filesystem (`file://`)
+    ///
+    /// # Example
+    /// ```
+    /// # use datafusion_execution::object_store::ObjectStoreUrl;
+    /// let local_fs = ObjectStoreUrl::parse("file://").unwrap();
+    /// assert_eq!(local_fs, ObjectStoreUrl::local_filesystem())
+    /// ```
     pub fn local_filesystem() -> Self {
         Self::parse("file://").unwrap()
     }
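The same parsing works for remote prefixes; a brief sketch grounded in the `object_store_url` field docs above (the bucket name is illustrative):

```rust
use datafusion_execution::object_store::ObjectStoreUrl;

// A prefix such as `s3://my_bucket` (with no path to any file) is valid
let s3_url = ObjectStoreUrl::parse("s3://my_bucket").unwrap();
assert_ne!(s3_url, ObjectStoreUrl::local_filesystem());
```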
