diff --git a/Cargo.lock b/Cargo.lock index ce6c464c1de4..e8c3735f7e92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1927,6 +1927,7 @@ version = "47.0.0" dependencies = [ "arrow", "async-trait", + "chrono", "datafusion-catalog", "datafusion-common", "datafusion-datasource", diff --git a/datafusion/catalog-listing/Cargo.toml b/datafusion/catalog-listing/Cargo.toml index 734580202232..bdddaeaa1334 100644 --- a/datafusion/catalog-listing/Cargo.toml +++ b/datafusion/catalog-listing/Cargo.toml @@ -48,6 +48,7 @@ object_store = { workspace = true } tokio = { workspace = true } [dev-dependencies] +chrono = { workspace = true } tempfile = { workspace = true } [lints] diff --git a/datafusion/catalog-listing/src/helpers.rs b/datafusion/catalog-listing/src/helpers.rs index 8efb74d4ea1e..52874129fb40 100644 --- a/datafusion/catalog-listing/src/helpers.rs +++ b/datafusion/catalog-listing/src/helpers.rs @@ -17,6 +17,7 @@ //! Helper functions for the table implementation +use std::collections::HashMap as StdHashMap; use std::mem; use std::sync::Arc; @@ -28,7 +29,8 @@ use datafusion_datasource::PartitionedFile; use datafusion_expr::{BinaryExpr, Operator}; use arrow::{ - array::{Array, ArrayRef, AsArray, StringBuilder}, + array::{Array, ArrayRef, AsArray, BooleanArray, StringBuilder}, + buffer::BooleanBuffer, compute::{and, cast, prep_null_mask_filter}, datatypes::{DataType, Field, Fields, Schema}, record_batch::RecordBatch, @@ -279,26 +281,59 @@ async fn prune_partitions( .collect(); let schema = Arc::new(Schema::new(fields)); - let df_schema = DFSchema::from_unqualified_fields( - partition_cols - .iter() - .map(|(n, d)| Field::new(n, d.clone(), true)) - .collect(), - Default::default(), - )?; - let batch = RecordBatch::try_new(schema, arrays)?; // TODO: Plumb this down let props = ExecutionProps::new(); + // Don't retain partitions that evaluated to null + let prepared = apply_filters(&batch, filters, &props)?; + + // If all rows are retained, return all partitions + if prepared.true_count() == prepared.len() { + return Ok(partitions); + } + + // Sanity check + assert_eq!(prepared.len(), partitions.len()); + + let filtered = partitions + .into_iter() + .zip(prepared.values()) + .filter_map(|(p, f)| f.then_some(p)) + .collect(); + + Ok(filtered) +} + +/// Applies the given filters to the input batch and returns a boolean mask that represents +/// the result of the filters applied to each row. 
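The mask semantics this helper provides can be summarized with a small sketch. This is illustrative only (the helper name `conjoin_masks` is not part of the patch); it assumes the `arrow` crate and mirrors how the per-filter masks are AND-ed together and how NULL rows are folded to false via `prep_null_mask_filter`:

```rust
// Illustrative sketch only: `conjoin_masks` is not part of this patch. Each filter
// yields a BooleanArray; the masks are AND-ed, and NULL rows are treated as false.
use arrow::array::BooleanArray;
use arrow::compute::{and, prep_null_mask_filter};
use arrow::error::ArrowError;

fn conjoin_masks(masks: &[BooleanArray]) -> Result<Option<BooleanArray>, ArrowError> {
    let mut iter = masks.iter();
    let Some(first) = iter.next() else {
        // No filters: the caller keeps every row (an all-true mask).
        return Ok(None);
    };
    let mut acc = first.clone();
    for mask in iter {
        acc = and(&acc, mask)?;
    }
    // Rows that evaluated to NULL must not be retained.
    if acc.null_count() > 0 {
        acc = prep_null_mask_filter(&acc);
    }
    Ok(Some(acc))
}
```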
+pub(crate) fn apply_filters( + batch: &RecordBatch, + filters: &[Expr], + props: &ExecutionProps, +) -> Result { + if filters.is_empty() { + return Ok(BooleanArray::new( + BooleanBuffer::new_set(batch.num_rows()), + None, + )); + } + + let num_rows = batch.num_rows(); + + let df_schema = DFSchema::from_unqualified_fields( + batch.schema().fields().clone(), + StdHashMap::default(), + )?; + // Applies `filter` to `batch` returning `None` on error let do_filter = |filter| -> Result { - let expr = create_physical_expr(filter, &df_schema, &props)?; - expr.evaluate(&batch)?.into_array(partitions.len()) + let expr = create_physical_expr(filter, &df_schema, props)?; + expr.evaluate(batch)?.into_array(num_rows) }; - //.Compute the conjunction of the filters + // Compute the conjunction of the filters let mask = filters .iter() .map(|f| do_filter(f).map(|a| a.as_boolean().clone())) @@ -307,25 +342,16 @@ async fn prune_partitions( let mask = match mask { Some(Ok(mask)) => mask, Some(Err(err)) => return Err(err), - None => return Ok(partitions), + None => return Ok(BooleanArray::new(BooleanBuffer::new_set(num_rows), None)), }; - // Don't retain partitions that evaluated to null + // Don't retain rows that evaluated to null let prepared = match mask.null_count() { 0 => mask, _ => prep_null_mask_filter(&mask), }; - // Sanity check - assert_eq!(prepared.len(), partitions.len()); - - let filtered = partitions - .into_iter() - .zip(prepared.values()) - .filter_map(|(p, f)| f.then_some(p)) - .collect(); - - Ok(filtered) + Ok(prepared) } #[derive(Debug)] @@ -545,12 +571,41 @@ mod tests { use std::ops::Not; use super::*; + use arrow::array::{ + BooleanArray as ArrowBooleanArray, Int32Array, Int64Array, StringArray, + }; use datafusion_expr::{ case, col, lit, AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF, }; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_plan::ExecutionPlan; + // Helper function for testing prune_partitions + async fn test_prune_partitions( + table_path: &str, + partition_paths: Vec<&str>, + filters: &[Expr], + partition_cols: &[(String, DataType)], + ) -> Result> { + let url = ListingTableUrl::parse(table_path).unwrap(); + + // Create partition objects + let partitions = partition_paths + .into_iter() + .map(|path| Partition { + path: Path::from(path), + depth: path.matches('/').count(), + files: None, + }) + .collect(); + + // Run the prune_partitions function + let pruned = prune_partitions(&url, partitions, filters, partition_cols).await?; + + // Return the paths of the pruned partitions for easier assertion + Ok(pruned.into_iter().map(|p| p.path.to_string()).collect()) + } + #[test] fn test_split_files() { let new_partitioned_file = |path: &str| PartitionedFile::new(path.to_owned(), 10); @@ -1005,6 +1060,617 @@ mod tests { ); } + #[test] + fn test_apply_filters_empty() { + // Test with empty filters + let schema = Arc::new(Schema::new(vec![ + Field::new("col1", DataType::Int32, false), + Field::new("col2", DataType::Utf8, false), + ])); + + let batch = RecordBatch::try_new( + schema, + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])), + Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])), + ], + ) + .unwrap(); + + let props = ExecutionProps::new(); + let result = apply_filters(&batch, &[], &props).unwrap(); + + // With empty filters, all rows should be selected + assert_eq!(result.len(), 5); + assert_eq!(result.true_count(), 5); + + let expected_values = [true, true, true, true, true]; + for (i, val) in 
expected_values.iter().enumerate() { + assert_eq!(result.value(i), *val); + } + } + + #[test] + fn test_apply_filters_single_filter() { + let schema = Arc::new(Schema::new(vec![ + Field::new("col1", DataType::Int32, false), + Field::new("col2", DataType::Utf8, false), + ])); + + let batch = RecordBatch::try_new( + schema, + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])), + Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])), + ], + ) + .unwrap(); + + let props = ExecutionProps::new(); + + // Test with a filter on col1 + let filter = col("col1").gt(lit(3)); + let result = apply_filters(&batch, &[filter], &props).unwrap(); + + assert_eq!(result.len(), 5); + assert_eq!(result.true_count(), 2); + + let expected_values = [false, false, false, true, true]; + for (i, val) in expected_values.iter().enumerate() { + assert_eq!(result.value(i), *val); + } + + // Test with a filter on col2 + let filter = col("col2").eq(lit("b")); + let result = apply_filters(&batch, &[filter], &props).unwrap(); + + assert_eq!(result.len(), 5); + assert_eq!(result.true_count(), 1); + + let expected_values = [false, true, false, false, false]; + for (i, val) in expected_values.iter().enumerate() { + assert_eq!(result.value(i), *val); + } + } + + #[test] + fn test_apply_filters_multiple_filters() { + let schema = Arc::new(Schema::new(vec![ + Field::new("col1", DataType::Int32, false), + Field::new("col2", DataType::Utf8, false), + Field::new("col3", DataType::Boolean, false), + ])); + + let batch = RecordBatch::try_new( + schema, + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])), + Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])), + Arc::new(ArrowBooleanArray::from(vec![ + true, true, false, true, false, + ])), + ], + ) + .unwrap(); + + let props = ExecutionProps::new(); + + // Test with multiple filters (AND semantics) + let filter1 = col("col1").gt(lit(2)); + let filter2 = col("col3").eq(lit(true)); + let result = apply_filters(&batch, &[filter1, filter2], &props).unwrap(); + + assert_eq!(result.len(), 5); + assert_eq!(result.true_count(), 1); + + let expected_values = [false, false, false, true, false]; + for (i, val) in expected_values.iter().enumerate() { + assert_eq!(result.value(i), *val); + } + } + + #[test] + fn test_apply_filters_with_nulls() { + let schema = Arc::new(Schema::new(vec![ + Field::new("col1", DataType::Int32, true), + Field::new("col2", DataType::Utf8, true), + ])); + + // Create arrays with nulls + let mut int_builder = Int32Array::builder(5); + int_builder.append_value(1); + int_builder.append_null(); + int_builder.append_value(3); + int_builder.append_value(4); + int_builder.append_null(); + let int_array = int_builder.finish(); + + let mut str_builder = StringBuilder::new(); + str_builder.append_value("a"); + str_builder.append_value("b"); + str_builder.append_null(); + str_builder.append_value("d"); + str_builder.append_value("e"); + let str_array = str_builder.finish(); + + let batch = + RecordBatch::try_new(schema, vec![Arc::new(int_array), Arc::new(str_array)]) + .unwrap(); + + let props = ExecutionProps::new(); + + // Filter that involves nulls + let filter = col("col1").is_not_null(); + let result = apply_filters(&batch, &[filter], &props).unwrap(); + + assert_eq!(result.len(), 5); + assert_eq!(result.true_count(), 3); + + let expected_values = [true, false, true, true, false]; + for (i, val) in expected_values.iter().enumerate() { + assert_eq!(result.value(i), *val); + } + + // Multiple filters with nulls + let filter1 = col("col1").is_not_null(); + let 
filter2 = col("col2").is_not_null(); + let result = apply_filters(&batch, &[filter1, filter2], &props).unwrap(); + + assert_eq!(result.len(), 5); + assert_eq!(result.true_count(), 2); + + let expected_values = [true, false, false, true, false]; + for (i, val) in expected_values.iter().enumerate() { + assert_eq!(result.value(i), *val); + } + } + + #[test] + fn test_apply_filters_with_complex_expressions() { + let schema = Arc::new(Schema::new(vec![ + Field::new("col1", DataType::Int32, false), + Field::new("col2", DataType::Int64, false), + Field::new("col3", DataType::Utf8, false), + ])); + + let batch = RecordBatch::try_new( + schema, + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])), + Arc::new(Int64Array::from(vec![10, 20, 30, 40, 50])), + Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])), + ], + ) + .unwrap(); + + let props = ExecutionProps::new(); + + // Test with a complex filter expression + let filter = col("col1") + .gt(lit(2)) + .and(col("col2").lt(lit(50i64))) // Ensure the literal matches the column type + .or(col("col3").eq(lit("e"))); + + let result = apply_filters(&batch, &[filter], &props).unwrap(); + + assert_eq!(result.len(), 5); + assert_eq!(result.true_count(), 3); + + // col1 > 2 AND col2 < 50 => row 2 (col1=3, col2=30) and row 3 (col1=4, col2=40) + // OR col3 = "e" => row 4 (col3="e") + let expected_values = [false, false, true, true, true]; + for (i, val) in expected_values.iter().enumerate() { + assert_eq!(result.value(i), *val); + } + } + + #[test] + fn test_apply_filters_case_expression() { + let schema = Arc::new(Schema::new(vec![ + Field::new("col1", DataType::Int32, false), + Field::new("col2", DataType::Utf8, false), + ])); + + let batch = RecordBatch::try_new( + schema, + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])), + Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])), + ], + ) + .unwrap(); + + let props = ExecutionProps::new(); + + // Test with a CASE expression + let case_expr = case(col("col1")) + .when(lit(1), lit(true)) + .when(lit(3), lit(true)) + .when(lit(5), lit(true)) + .otherwise(lit(false)) + .unwrap(); + + let result = apply_filters(&batch, &[case_expr], &props).unwrap(); + + assert_eq!(result.len(), 5); + assert_eq!(result.true_count(), 3); + + let expected_values = [true, false, true, false, true]; + for (i, val) in expected_values.iter().enumerate() { + assert_eq!(result.value(i), *val); + } + } + + #[tokio::test] + async fn test_prune_partitions_empty_filters() { + let table_path = "file:///table"; + let partition_paths = vec![ + "table/year=2021/month=01/day=01", + "table/year=2021/month=01/day=02", + "table/year=2021/month=02/day=01", + "table/year=2022/month=01/day=01", + ]; + + let partition_cols = vec![ + ("year".to_string(), DataType::Int32), + ("month".to_string(), DataType::Int32), + ("day".to_string(), DataType::Int32), + ]; + + // With empty filters, all partitions should be returned + let pruned = test_prune_partitions( + table_path, + partition_paths.clone(), + &[], + &partition_cols, + ) + .await + .unwrap(); + + assert_eq!(pruned.len(), partition_paths.len()); + for (i, path) in partition_paths.iter().enumerate() { + assert_eq!(pruned[i], *path); + } + } + + #[tokio::test] + async fn test_prune_partitions_single_filter() { + let table_path = "file:///table"; + let partition_paths = vec![ + "table/year=2021/month=01/day=01", + "table/year=2021/month=01/day=02", + "table/year=2021/month=02/day=01", + "table/year=2022/month=01/day=01", + ]; + + let partition_cols = vec![ + ("year".to_string(), 
DataType::Int32), + ("month".to_string(), DataType::Int32), + ("day".to_string(), DataType::Int32), + ]; + + // Filter by year=2021 + let filter = col("year").eq(lit(2021)); + let pruned = test_prune_partitions( + table_path, + partition_paths.clone(), + &[filter], + &partition_cols, + ) + .await + .unwrap(); + + assert_eq!(pruned.len(), 3); + assert_eq!( + pruned, + vec![ + "table/year=2021/month=01/day=01", + "table/year=2021/month=01/day=02", + "table/year=2021/month=02/day=01", + ] + ); + + // Filter by month=01 + let filter = col("month").eq(lit(1)); + let pruned = test_prune_partitions( + table_path, + partition_paths.clone(), + &[filter], + &partition_cols, + ) + .await + .unwrap(); + + assert_eq!(pruned.len(), 3); + assert_eq!( + pruned, + vec![ + "table/year=2021/month=01/day=01", + "table/year=2021/month=01/day=02", + "table/year=2022/month=01/day=01", + ] + ); + + // Filter by day=01 + let filter = col("day").eq(lit(1)); + let pruned = test_prune_partitions( + table_path, + partition_paths.clone(), + &[filter], + &partition_cols, + ) + .await + .unwrap(); + + assert_eq!(pruned.len(), 3); + assert_eq!( + pruned, + vec![ + "table/year=2021/month=01/day=01", + "table/year=2021/month=02/day=01", + "table/year=2022/month=01/day=01", + ] + ); + } + + #[tokio::test] + async fn test_prune_partitions_multiple_filters() { + let table_path = "file:///table"; + let partition_paths = vec![ + "table/year=2021/month=01/day=01", + "table/year=2021/month=01/day=02", + "table/year=2021/month=02/day=01", + "table/year=2022/month=01/day=01", + "table/year=2022/month=02/day=02", + ]; + + let partition_cols = vec![ + ("year".to_string(), DataType::Int32), + ("month".to_string(), DataType::Int32), + ("day".to_string(), DataType::Int32), + ]; + + // Filter by year=2021 AND month=01 + let filter1 = col("year").eq(lit(2021)); + let filter2 = col("month").eq(lit(1)); + let pruned = test_prune_partitions( + table_path, + partition_paths.clone(), + &[filter1, filter2], + &partition_cols, + ) + .await + .unwrap(); + + assert_eq!(pruned.len(), 2); + assert_eq!( + pruned, + vec![ + "table/year=2021/month=01/day=01", + "table/year=2021/month=01/day=02", + ] + ); + + // Filter by year=2022 AND day=02 + let filter1 = col("year").eq(lit(2022)); + let filter2 = col("day").eq(lit(2)); + let pruned = test_prune_partitions( + table_path, + partition_paths.clone(), + &[filter1, filter2], + &partition_cols, + ) + .await + .unwrap(); + + assert_eq!(pruned.len(), 1); + assert_eq!(pruned, vec!["table/year=2022/month=02/day=02",]); + } + + #[tokio::test] + async fn test_prune_partitions_complex_filters() { + let table_path = "file:///table"; + let partition_paths = vec![ + "table/year=2021/month=01/day=01", + "table/year=2021/month=01/day=02", + "table/year=2021/month=02/day=01", + "table/year=2022/month=01/day=01", + "table/year=2022/month=02/day=02", + ]; + + let partition_cols = vec![ + ("year".to_string(), DataType::Int32), + ("month".to_string(), DataType::Int32), + ("day".to_string(), DataType::Int32), + ]; + + // Filter by year=2021 OR day=02 + let filter = col("year").eq(lit(2021)).or(col("day").eq(lit(2))); + let pruned = test_prune_partitions( + table_path, + partition_paths.clone(), + &[filter], + &partition_cols, + ) + .await + .unwrap(); + + // This should match all paths with year=2021 or day=02 + assert_eq!(pruned.len(), 4); + + // Check that all the expected paths are included + assert!(pruned.contains(&"table/year=2021/month=01/day=01".to_string())); + 
assert!(pruned.contains(&"table/year=2021/month=01/day=02".to_string())); + assert!(pruned.contains(&"table/year=2021/month=02/day=01".to_string())); + assert!(pruned.contains(&"table/year=2022/month=02/day=02".to_string())); + + // Filter by (year=2021 AND month=01) OR (day=02 AND year=2022) + let filter = col("year") + .eq(lit(2021)) + .and(col("month").eq(lit(1))) + .or(col("day").eq(lit(2)).and(col("year").eq(lit(2022)))); + let pruned = test_prune_partitions( + table_path, + partition_paths.clone(), + &[filter], + &partition_cols, + ) + .await + .unwrap(); + + assert_eq!(pruned.len(), 3); + assert_eq!( + pruned, + vec![ + "table/year=2021/month=01/day=01", + "table/year=2021/month=01/day=02", + "table/year=2022/month=02/day=02", + ] + ); + } + + #[tokio::test] + async fn test_prune_partitions_no_matches() { + let table_path = "file:///table"; + let partition_paths = vec![ + "table/year=2021/month=01/day=01", + "table/year=2021/month=01/day=02", + "table/year=2021/month=02/day=01", + "table/year=2022/month=01/day=01", + ]; + + let partition_cols = vec![ + ("year".to_string(), DataType::Int32), + ("month".to_string(), DataType::Int32), + ("day".to_string(), DataType::Int32), + ]; + + // Filter by year=2023 (no matches) + let filter = col("year").eq(lit(2023)); + let pruned = test_prune_partitions( + table_path, + partition_paths.clone(), + &[filter], + &partition_cols, + ) + .await + .unwrap(); + + assert_eq!(pruned.len(), 0); + } + + #[tokio::test] + async fn test_prune_partitions_range_filters() { + let table_path = "file:///table"; + let partition_paths = vec![ + "table/year=2020/month=01/day=01", + "table/year=2021/month=01/day=01", + "table/year=2021/month=06/day=15", + "table/year=2022/month=01/day=01", + "table/year=2023/month=01/day=01", + ]; + + let partition_cols = vec![ + ("year".to_string(), DataType::Int32), + ("month".to_string(), DataType::Int32), + ("day".to_string(), DataType::Int32), + ]; + + // Filter by year >= 2021 AND year <= 2022 + let filter1 = col("year").gt_eq(lit(2021)); + let filter2 = col("year").lt_eq(lit(2022)); + let pruned = test_prune_partitions( + table_path, + partition_paths.clone(), + &[filter1, filter2], + &partition_cols, + ) + .await + .unwrap(); + + assert_eq!(pruned.len(), 3); + assert_eq!( + pruned, + vec![ + "table/year=2021/month=01/day=01", + "table/year=2021/month=06/day=15", + "table/year=2022/month=01/day=01", + ] + ); + + // Filter by year > 2021 + let filter = col("year").gt(lit(2021)); + let pruned = test_prune_partitions( + table_path, + partition_paths.clone(), + &[filter], + &partition_cols, + ) + .await + .unwrap(); + + assert_eq!(pruned.len(), 2); + assert_eq!( + pruned, + vec![ + "table/year=2022/month=01/day=01", + "table/year=2023/month=01/day=01", + ] + ); + } + + #[tokio::test] + async fn test_prune_partitions_with_missing_partition_values() { + let table_path = "file:///table"; + let partition_paths = vec![ + "table/year=2021/month=01", // missing day partition + "table/year=2021/month=01/day=01", + "table/year=2021", // missing month and day partitions + "table/year=2022/month=01/day=01", + ]; + + let partition_cols = vec![ + ("year".to_string(), DataType::Int32), + ("month".to_string(), DataType::Int32), + ("day".to_string(), DataType::Int32), + ]; + + // Filter by year=2021 + let filter = col("year").eq(lit(2021)); + let pruned = test_prune_partitions( + table_path, + partition_paths.clone(), + &[filter], + &partition_cols, + ) + .await + .unwrap(); + + // All partitions with year=2021 should be included, + // even those 
with missing month or day values + assert_eq!(pruned.len(), 3); + assert!(pruned.contains(&"table/year=2021/month=01".to_string())); + assert!(pruned.contains(&"table/year=2021/month=01/day=01".to_string())); + assert!(pruned.contains(&"table/year=2021".to_string())); + + // Filter by month=01 + let filter = col("month").eq(lit(1)); + let pruned = test_prune_partitions( + table_path, + partition_paths.clone(), + &[filter], + &partition_cols, + ) + .await + .unwrap(); + + // Only partitions with month=01 should be included + assert_eq!(pruned.len(), 3); + assert!(pruned.contains(&"table/year=2021/month=01".to_string())); + assert!(pruned.contains(&"table/year=2021/month=01/day=01".to_string())); + assert!(pruned.contains(&"table/year=2022/month=01/day=01".to_string())); + } + pub fn make_test_store_and_state( files: &[(&str, u64)], ) -> (Arc, Arc) { diff --git a/datafusion/catalog-listing/src/metadata.rs b/datafusion/catalog-listing/src/metadata.rs new file mode 100644 index 000000000000..bbc4ce8e6bd0 --- /dev/null +++ b/datafusion/catalog-listing/src/metadata.rs @@ -0,0 +1,471 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use arrow::{ + array::RecordBatch, + datatypes::{Field, Fields, Schema}, +}; +use datafusion_common::Result; +use datafusion_datasource::{metadata::MetadataColumn, PartitionedFile}; +use datafusion_expr::{execution_props::ExecutionProps, Expr}; + +use crate::helpers::apply_filters; + +/// Determine if the given file matches the input metadata filters. +/// `filters` should only contain expressions that can be evaluated +/// using only the metadata columns. 
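As a caller-side sketch (not part of the patch), the filters passed in should reference only the metadata columns; the helper below is hypothetical and assumes `datafusion_expr`'s `col`/`lit` builders, matching the expressions exercised in the tests further down:

```rust
// Hypothetical helper: builds filters that reference only metadata columns
// (`location`, `size`, `last_modified`), as expected by apply_metadata_filters.
use datafusion_expr::{col, lit, Expr};

fn metadata_only_filters() -> Vec<Expr> {
    vec![
        col("location").like(lit("%.parquet")),
        col("size").gt(lit(1024u64)),
    ]
}
```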
+pub fn apply_metadata_filters( + file: PartitionedFile, + filters: &[Expr], + metadata_cols: &[MetadataColumn], +) -> Result> { + // if no metadata col => simply return all the files + if metadata_cols.is_empty() { + return Ok(Some(file)); + } + + let mut builders: Vec<_> = metadata_cols.iter().map(|col| col.builder(1)).collect(); + + for builder in builders.iter_mut() { + builder.append(&file.object_meta); + } + + let arrays = builders + .into_iter() + .map(|builder| builder.finish()) + .collect::>(); + + let fields: Fields = metadata_cols + .iter() + .map(|col| Field::new(col.to_string(), col.arrow_type(), true)) + .collect(); + let schema = Arc::new(Schema::new(fields)); + + let batch = RecordBatch::try_new(schema, arrays)?; + + let props = ExecutionProps::new(); + + // Don't retain rows that evaluated to null + let prepared = apply_filters(&batch, filters, &props)?; + + // If the filter evaluates to true, return the file + if prepared.true_count() == 1 { + return Ok(Some(file)); + } + + Ok(None) +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::{DateTime, TimeZone, Utc}; + use datafusion_common::ScalarValue; + use datafusion_expr::{col, lit}; + use object_store::{path::Path, ObjectMeta}; + + // Helper function to create a test file with specific metadata + fn create_test_file( + path: &str, + size: u64, + last_modified: DateTime, + ) -> PartitionedFile { + let object_meta = ObjectMeta { + location: Path::from(path), + last_modified, + size, + e_tag: None, + version: None, + }; + + PartitionedFile { + object_meta, + partition_values: vec![], + range: None, + statistics: None, + extensions: None, + metadata_size_hint: None, + } + } + + #[test] + fn test_apply_metadata_filters_empty_filters() { + // Create a test file + let file = create_test_file( + "test/file.parquet", + 1024, // 1KB + Utc.with_ymd_and_hms(2023, 1, 1, 10, 0, 0).unwrap(), + ); + + // Test with empty filters + let result = apply_metadata_filters( + file.clone(), + &[], + &[ + MetadataColumn::Location, + MetadataColumn::Size, + MetadataColumn::LastModified, + ], + ) + .unwrap(); + + // With empty filters, the file should be returned + assert!(result.is_some()); + assert_eq!( + result.unwrap().object_meta.location.as_ref(), + "test/file.parquet" + ); + } + + #[test] + fn test_apply_metadata_filters_empty_metadata_cols() { + // Create a test file + let file = create_test_file( + "test/file.parquet", + 1024, // 1KB + Utc.with_ymd_and_hms(2023, 1, 1, 10, 0, 0).unwrap(), + ); + + // Test with a filter but empty metadata columns + let filter = col("location").eq(lit("test/file.parquet")); + let result = apply_metadata_filters(file.clone(), &[filter], &[]).unwrap(); + + // With no metadata columns, the file should be returned regardless of the filter + assert!(result.is_some()); + assert_eq!( + result.unwrap().object_meta.location.as_ref(), + "test/file.parquet" + ); + } + + #[test] + fn test_apply_metadata_filters_location() { + // Create a test file + let file = create_test_file( + "test/file.parquet", + 1024, // 1KB + Utc.with_ymd_and_hms(2023, 1, 1, 10, 0, 0).unwrap(), + ); + + // Test with location filter - matching + let filter = col("location").eq(lit("test/file.parquet")); + let result = + apply_metadata_filters(file.clone(), &[filter], &[MetadataColumn::Location]) + .unwrap(); + + // The file should match + assert!(result.is_some()); + assert_eq!( + result.unwrap().object_meta.location.as_ref(), + "test/file.parquet" + ); + + // Test with location filter - not matching + let filter = 
col("location").eq(lit("test/different.parquet")); + let result = + apply_metadata_filters(file.clone(), &[filter], &[MetadataColumn::Location]) + .unwrap(); + + // The file should not match + assert!(result.is_none()); + + // Test with location filter - partial match (contains) + let filter = col("location").like(lit("%file.parquet")); + let result = + apply_metadata_filters(file.clone(), &[filter], &[MetadataColumn::Location]) + .unwrap(); + + // The file should match + assert!(result.is_some()); + } + + #[test] + fn test_apply_metadata_filters_size() { + // Create a test file + let file = create_test_file( + "test/file.parquet", + 1024, // 1KB + Utc.with_ymd_and_hms(2023, 1, 1, 10, 0, 0).unwrap(), + ); + + // Test with size filter - matching + let filter = col("size").eq(lit(1024u64)); + let result = + apply_metadata_filters(file.clone(), &[filter], &[MetadataColumn::Size]) + .unwrap(); + + // The file should match + assert!(result.is_some()); + + // Test with size filter - not matching + let filter = col("size").eq(lit(2048u64)); + let result = + apply_metadata_filters(file.clone(), &[filter], &[MetadataColumn::Size]) + .unwrap(); + + // The file should not match + assert!(result.is_none()); + + // Test with size filter - range comparison + let filter = col("size") + .gt(lit(512u64)) + .and(col("size").lt(lit(2048u64))); + let result = + apply_metadata_filters(file.clone(), &[filter], &[MetadataColumn::Size]) + .unwrap(); + + // The file should match + assert!(result.is_some()); + } + + #[test] + fn test_apply_metadata_filters_last_modified() { + let timestamp = Utc.with_ymd_and_hms(2023, 1, 1, 10, 0, 0).unwrap(); + + // Create a test file + let file = create_test_file( + "test/file.parquet", + 1024, // 1KB + timestamp, + ); + + // Convert to micros timestamp for comparison with the Arrow type + let timestamp_micros = timestamp.timestamp_micros(); + + // Test with last_modified filter - matching + let filter = col("last_modified").eq(lit(ScalarValue::TimestampMicrosecond( + Some(timestamp_micros), + Some("UTC".to_string().into()), + ))); + let result = apply_metadata_filters( + file.clone(), + &[filter], + &[MetadataColumn::LastModified], + ) + .unwrap(); + + // The file should match + assert!(result.is_some()); + + // Test with last_modified filter - not matching + let different_timestamp = Utc.with_ymd_and_hms(2023, 2, 1, 10, 0, 0).unwrap(); + let different_timestamp_micros = different_timestamp.timestamp_micros(); + let filter = col("last_modified").eq(lit(ScalarValue::TimestampMicrosecond( + Some(different_timestamp_micros), + Some("UTC".to_string().into()), + ))); + let result = apply_metadata_filters( + file.clone(), + &[filter], + &[MetadataColumn::LastModified], + ) + .unwrap(); + + // The file should not match + assert!(result.is_none()); + + // Test with last_modified filter - range comparison + let earlier = Utc.with_ymd_and_hms(2022, 12, 1, 0, 0, 0).unwrap(); + let earlier_micros = earlier.timestamp_micros(); + let later = Utc.with_ymd_and_hms(2023, 2, 1, 0, 0, 0).unwrap(); + let later_micros = later.timestamp_micros(); + + let filter = col("last_modified") + .gt(lit(ScalarValue::TimestampMicrosecond( + Some(earlier_micros), + Some("UTC".to_string().into()), + ))) + .and( + col("last_modified").lt(lit(ScalarValue::TimestampMicrosecond( + Some(later_micros), + Some("UTC".to_string().into()), + ))), + ); + + let result = apply_metadata_filters( + file.clone(), + &[filter], + &[MetadataColumn::LastModified], + ) + .unwrap(); + + // The file should match + 
assert!(result.is_some()); + } + + #[test] + fn test_apply_metadata_filters_multiple_columns() { + let timestamp = Utc.with_ymd_and_hms(2023, 1, 1, 10, 0, 0).unwrap(); + let timestamp_micros = timestamp.timestamp_micros(); + + // Create a test file + let file = create_test_file( + "test/file.parquet", + 1024, // 1KB + timestamp, + ); + + // Test with multiple metadata columns - all matching + let filter = col("location") + .eq(lit("test/file.parquet")) + .and(col("size").eq(lit(1024u64))); + + let result = apply_metadata_filters( + file.clone(), + &[filter], + &[MetadataColumn::Location, MetadataColumn::Size], + ) + .unwrap(); + + // The file should match + assert!(result.is_some()); + + // Test with multiple metadata columns - one not matching + let filter = col("location") + .eq(lit("test/file.parquet")) + .and(col("size").eq(lit(2048u64))); + + let result = apply_metadata_filters( + file.clone(), + &[filter], + &[MetadataColumn::Location, MetadataColumn::Size], + ) + .unwrap(); + + // The file should not match + assert!(result.is_none()); + + // Test with all three metadata columns + let filter = col("location") + .eq(lit("test/file.parquet")) + .and(col("size").gt(lit(512u64))) + .and( + col("last_modified").eq(lit(ScalarValue::TimestampMicrosecond( + Some(timestamp_micros), + Some("UTC".to_string().into()), + ))), + ); + + let result = apply_metadata_filters( + file.clone(), + &[filter], + &[ + MetadataColumn::Location, + MetadataColumn::Size, + MetadataColumn::LastModified, + ], + ) + .unwrap(); + + // The file should match + assert!(result.is_some()); + } + + #[test] + fn test_apply_metadata_filters_complex_expressions() { + let timestamp = Utc.with_ymd_and_hms(2023, 1, 1, 10, 0, 0).unwrap(); + let timestamp_micros = timestamp.timestamp_micros(); + + // Create a test file + let file = create_test_file( + "test/file.parquet", + 1024, // 1KB + timestamp, + ); + + // Test with a complex expression (OR condition) + let filter = col("location") + .eq(lit("test/different.parquet")) + .or(col("size").eq(lit(1024u64))); + + let result = apply_metadata_filters( + file.clone(), + &[filter], + &[MetadataColumn::Location, MetadataColumn::Size], + ) + .unwrap(); + + // The file should match because one condition is true + assert!(result.is_some()); + + // Test with a more complex nested expression + let filter = col("location").like(lit("%file.parquet")).and( + col("size").lt(lit(2048u64)).or(col("last_modified").gt(lit( + ScalarValue::TimestampMicrosecond( + Some(timestamp_micros), + Some("UTC".to_string().into()), + ), + ))), + ); + + let result = apply_metadata_filters( + file.clone(), + &[filter], + &[ + MetadataColumn::Location, + MetadataColumn::Size, + MetadataColumn::LastModified, + ], + ) + .unwrap(); + + // The file should match + assert!(result.is_some()); + } + + #[test] + fn test_apply_metadata_filters_multiple_filters() { + let timestamp = Utc.with_ymd_and_hms(2023, 1, 1, 10, 0, 0).unwrap(); + + // Create a test file + let file = create_test_file( + "test/file.parquet", + 1024, // 1KB + timestamp, + ); + + // Test with multiple separate filters (AND semantics) + let filter1 = col("location").eq(lit("test/file.parquet")); + let filter2 = col("size").eq(lit(1024u64)); + + let result = apply_metadata_filters( + file.clone(), + &[filter1, filter2], + &[MetadataColumn::Location, MetadataColumn::Size], + ) + .unwrap(); + + // The file should match + assert!(result.is_some()); + + // Test with multiple separate filters - one not matching + let filter1 = 
col("location").eq(lit("test/file.parquet")); + let filter2 = col("size").eq(lit(2048u64)); + + let result = apply_metadata_filters( + file.clone(), + &[filter1, filter2], + &[MetadataColumn::Location, MetadataColumn::Size], + ) + .unwrap(); + + // The file should not match + assert!(result.is_none()); + } +} diff --git a/datafusion/catalog-listing/src/mod.rs b/datafusion/catalog-listing/src/mod.rs index fb0a960f37b6..6cce7cd2dacb 100644 --- a/datafusion/catalog-listing/src/mod.rs +++ b/datafusion/catalog-listing/src/mod.rs @@ -25,3 +25,4 @@ #![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))] pub mod helpers; +pub mod metadata; diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 84a63faffbbd..36a9a5effede 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -19,7 +19,7 @@ use super::helpers::{expr_applicable_for_cols, pruned_partition_list}; use super::{ListingTableUrl, PartitionedFile}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::{any::Any, str::FromStr, sync::Arc}; use crate::datasource::{ @@ -31,12 +31,17 @@ use crate::datasource::{ }; use crate::execution::context::SessionState; use datafusion_catalog::TableProvider; +use datafusion_catalog_listing::metadata::apply_metadata_filters; use datafusion_common::{config_err, DataFusionError, Result}; -use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory; -use datafusion_expr::dml::InsertOp; -use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown}; -use datafusion_expr::{SortExpr, TableType}; +use datafusion_datasource::{ + file_scan_config::{FileScanConfig, FileScanConfigBuilder}, + metadata::MetadataColumn, +}; +use datafusion_expr::{ + dml::InsertOp, utils::conjunction, Expr, SortExpr, TableProviderFilterPushDown, + TableType, +}; use datafusion_physical_plan::empty::EmptyExec; use datafusion_physical_plan::{ExecutionPlan, Statistics}; @@ -310,6 +315,9 @@ pub struct ListingOptions { /// multiple equivalent orderings, the outer `Vec` will have a /// single element. pub file_sort_order: Vec>, + /// Additional columns to include in the table schema, based on the metadata of the files. + /// See [Self::with_metadata_cols] for details. + pub metadata_cols: Vec, } impl ListingOptions { @@ -327,6 +335,7 @@ impl ListingOptions { collect_stat: true, target_partitions: 1, file_sort_order: vec![], + metadata_cols: vec![], } } @@ -445,6 +454,51 @@ impl ListingOptions { self } + /// Set metadata columns on [`ListingOptions`] and returns self. + /// + /// "metadata columns" are columns that are computed from the `ObjectMeta` of the files from object store. + /// + /// Available metadata columns: + /// - `location`: The full path to the object + /// - `last_modified`: The last modified time + /// - `size`: The size in bytes of the object + /// + /// For example, given the following files in object store: + /// + /// ```text + /// /mnt/nyctaxi/tripdata01.parquet + /// /mnt/nyctaxi/tripdata02.parquet + /// /mnt/nyctaxi/tripdata03.parquet + /// ``` + /// + /// If the `last_modified` field in the `ObjectMeta` for `tripdata01.parquet` is `2024-01-01 12:00:00`, + /// then the table schema will include a column named `last_modified` with the value `2024-01-01 12:00:00` + /// for all rows read from `tripdata01.parquet`. 
+ /// + /// | [other columns] | last_modified | + /// |-----------------|-----------------------| + /// | ... | 2024-01-01 12:00:00 | + /// | ... | 2024-01-02 15:30:00 | + /// | ... | 2024-01-03 09:15:00 | + /// + /// # Example + /// ``` + /// # use std::sync::Arc; + /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat}; + /// # use datafusion_datasource::metadata::MetadataColumn; + /// + /// let listing_options = ListingOptions::new(Arc::new( + /// ParquetFormat::default() + /// )) + /// .with_metadata_cols(vec![MetadataColumn::LastModified]); + /// + /// assert_eq!(listing_options.metadata_cols, vec![MetadataColumn::LastModified]); + /// ``` + pub fn with_metadata_cols(mut self, metadata_cols: Vec) -> Self { + self.metadata_cols = metadata_cols; + self + } + /// Set stat collection on [`ListingOptions`] and returns self. /// /// ``` @@ -533,6 +587,26 @@ impl ListingOptions { Ok(schema) } + /// Validates that the metadata columns do not already exist in the schema, + /// and that there are no duplicate metadata columns of the same variant. + pub fn validate_metadata_cols(&self, schema: &SchemaRef) -> Result<()> { + let mut seen = HashSet::with_capacity(self.metadata_cols.len()); + + for col in self.metadata_cols.iter() { + // Check if column already exists in the schema + if schema.column_with_name(col.name()).is_some() { + return plan_err!("Column {} already exists in schema", col); + } + + // Check for duplicate metadata columns + if !seen.insert(*col) { + return plan_err!("Duplicate metadata column: {}", col); + } + } + + Ok(()) + } + /// Infers the partition columns stored in `LOCATION` and compares /// them with the columns provided in `PARTITIONED BY` to help prevent /// accidental corrupts of partitioned tables. @@ -746,9 +820,10 @@ pub struct ListingTable { /// - Represents the actual fields found in files like Parquet, CSV, etc. /// - Used when reading the raw data from files file_schema: SchemaRef, - /// `table_schema` combines `file_schema` + partition columns + /// `table_schema` combines `file_schema` + partition columns + metadata columns /// - Partition columns are derived from directory paths (not stored in files) /// - These are columns like "year=2022/month=01" in paths like `/data/year=2022/month=01/file.parquet` + /// - Metadata columns are optional columns that are not part of the file schema but are derived from the file metadata table_schema: SchemaRef, options: ListingOptions, definition: Option, @@ -776,6 +851,12 @@ impl ListingTable { builder.push(Field::new(part_col_name, part_col_type.clone(), false)); } + // Validate and add metadata columns to the schema + options.validate_metadata_cols(&file_schema)?; + for col in &options.metadata_cols { + builder.push(col.field()); + } + let table_schema = Arc::new( builder .finish() @@ -843,16 +924,39 @@ impl ListingTable { fn try_create_output_ordering(&self) -> Result> { create_ordering(&self.table_schema, &self.options.file_sort_order) } + + /// Returns the partition column fields from the table schema. + fn partition_column_fields(&self) -> Result> { + self.options + .table_partition_cols + .iter() + .map(|col| Ok(self.table_schema.field_with_name(&col.0)?)) + .collect::>>() + } + + /// Returns the partition column names from the table schema. + fn partition_column_names(&self) -> Result> { + Ok(self + .options + .table_partition_cols + .iter() + .map(|col| col.0.as_str())) + } + + /// Returns the metadata column names from the table schema. 
+ fn metadata_column_names(&self) -> impl Iterator { + self.options.metadata_cols.iter().map(|col| col.name()) + } } -// Expressions can be used for parttion pruning if they can be evaluated using -// only the partiton columns and there are partition columns. -fn can_be_evaluted_for_partition_pruning( - partition_column_names: &[&str], +// Expressions can be used for extended columns (partition/metadata) pruning if they can be evaluated using +// only the extended columns and there are extended columns. +fn can_be_evaluated_for_extended_col_pruning( + extended_column_names: &[&str], expr: &Expr, ) -> bool { - !partition_column_names.is_empty() - && expr_applicable_for_cols(partition_column_names, expr) + !extended_column_names.is_empty() + && expr_applicable_for_cols(extended_column_names, expr) } #[async_trait] @@ -880,23 +984,19 @@ impl TableProvider for ListingTable { filters: &[Expr], limit: Option, ) -> Result> { - // extract types of partition columns - let table_partition_cols = self - .options - .table_partition_cols - .iter() - .map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone())) - .collect::>>()?; + // extract types of extended columns (partition + metadata columns) + let partition_col_names = self.partition_column_names()?.collect::>(); + let metadata_col_names = self.metadata_column_names().collect::>(); - let table_partition_col_names = table_partition_cols - .iter() - .map(|field| field.name().as_str()) - .collect::>(); - // If the filters can be resolved using only partition cols, there is no need to - // pushdown it to TableScan, otherwise, `unhandled` pruning predicates will be generated + // If the filters can be resolved using only partition/metadata cols, there is no need to + // push it down to the TableScan, otherwise, `unhandled` pruning predicates will be generated let (partition_filters, filters): (Vec<_>, Vec<_>) = filters.iter().cloned().partition(|filter| { - can_be_evaluted_for_partition_pruning(&table_partition_col_names, filter) + can_be_evaluated_for_extended_col_pruning(&partition_col_names, filter) + }); + let (metadata_filters, filters): (Vec<_>, Vec<_>) = + filters.iter().cloned().partition(|filter| { + can_be_evaluated_for_extended_col_pruning(&metadata_col_names, filter) }); // We should not limit the number of partitioned files to scan if there are filters and limit @@ -904,7 +1004,12 @@ impl TableProvider for ListingTable { let statistic_file_limit = if filters.is_empty() { limit } else { None }; let (mut partitioned_file_lists, statistics) = self - .list_files_for_scan(state, &partition_filters, statistic_file_limit) + .list_files_for_scan( + state, + &partition_filters, + &metadata_filters, + statistic_file_limit, + ) .await?; // if no files need to be read, return an `EmptyExec` @@ -960,6 +1065,17 @@ impl TableProvider for ListingTable { return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty())))); }; + let table_partition_cols = self + .partition_column_fields()? 
+ .into_iter() + .cloned() + .collect(); + + let metadata_cols = self + .metadata_column_names() + .filter_map(|c| MetadataColumn::from_str(c).ok()) + .collect::>(); + // create the execution plan self.options .format @@ -977,6 +1093,7 @@ impl TableProvider for ListingTable { .with_limit(limit) .with_output_ordering(output_ordering) .with_table_partition_cols(table_partition_cols) + .with_metadata_cols(metadata_cols) .build(), filters.as_ref(), ) @@ -987,18 +1104,19 @@ impl TableProvider for ListingTable { &self, filters: &[&Expr], ) -> Result> { - let partition_column_names = self - .options - .table_partition_cols - .iter() - .map(|col| col.0.as_str()) + let extended_column_names = self + .partition_column_names()? + .chain(self.metadata_column_names()) .collect::>(); + filters .iter() .map(|filter| { - if can_be_evaluted_for_partition_pruning(&partition_column_names, filter) - { - // if filter can be handled by partition pruning, it is exact + if can_be_evaluated_for_extended_col_pruning( + &extended_column_names, + filter, + ) { + // if filter can be handled by pruning from the extended columns, it is exact return Ok(TableProviderFilterPushDown::Exact); } @@ -1109,7 +1227,8 @@ impl ListingTable { async fn list_files_for_scan<'a>( &'a self, ctx: &'a dyn Session, - filters: &'a [Expr], + partition_filters: &'a [Expr], + metadata_filters: &'a [Expr], limit: Option, ) -> Result<(Vec, Statistics)> { let store = if let Some(url) = self.table_paths.first() { @@ -1123,7 +1242,7 @@ impl ListingTable { ctx, store.as_ref(), table_path, - filters, + partition_filters, &self.options.file_extension, &self.options.table_partition_cols, ) @@ -1132,8 +1251,26 @@ impl ListingTable { let meta_fetch_concurrency = ctx.config_options().execution.meta_fetch_concurrency; let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency); - // collect the statistics if required by the config + + let metadata_cols = self + .metadata_column_names() + .map(MetadataColumn::from_str) + .collect::>>()?; + + // collect the statistics if required by the config + filter out files that don't match the metadata filters let files = file_list + .filter_map(|par_file| async { + if metadata_cols.is_empty() { + return Some(par_file); + } + + let Ok(par_file) = par_file else { + return Some(par_file); + }; + + apply_metadata_filters(par_file, metadata_filters, &metadata_cols) + .transpose() + }) .map(|part_file| async { let part_file = part_file?; let statistics = if self.options.collect_stat { @@ -1293,6 +1430,8 @@ mod tests { #[cfg(feature = "parquet")] use crate::datasource::file_format::parquet::ParquetFormat; use crate::datasource::{provider_as_source, DefaultTableSource, MemTable}; + use std::collections::HashMap; + use crate::execution::options::ArrowReadOptions; use crate::prelude::*; use crate::test::{columns, object_store::register_test_store}; @@ -1832,7 +1971,9 @@ mod tests { let table = ListingTable::try_new(config)?; - let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?; + let (file_list, _) = table + .list_files_for_scan(&ctx.state(), &[], &[], None) + .await?; assert_eq!(file_list.len(), output_partitioning); @@ -1867,7 +2008,9 @@ mod tests { let table = ListingTable::try_new(config)?; - let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?; + let (file_list, _) = table + .list_files_for_scan(&ctx.state(), &[], &[], None) + .await?; assert_eq!(file_list.len(), output_partitioning); @@ -1917,7 +2060,9 @@ mod tests { let table = 
ListingTable::try_new(config)?; - let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?; + let (file_list, _) = table + .list_files_for_scan(&ctx.state(), &[], &[], None) + .await?; assert_eq!(file_list.len(), output_partitioning); @@ -2471,4 +2616,57 @@ mod tests { Ok(()) } + + #[test] + fn test_validate_metadata_cols() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + ])); + + // Test valid case - no metadata columns + let options = ListingOptions::new(Arc::new(CsvFormat::default())); + assert!(options.validate_metadata_cols(&schema).is_ok()); + + // Test valid case - all different metadata columns + let options = ListingOptions::new(Arc::new(CsvFormat::default())) + .with_metadata_cols(vec![ + MetadataColumn::Location, + MetadataColumn::Size, + MetadataColumn::LastModified, + ]); + assert!(options.validate_metadata_cols(&schema).is_ok()); + + // Test invalid case - duplicate metadata column + let options = ListingOptions::new(Arc::new(CsvFormat::default())) + .with_metadata_cols(vec![ + MetadataColumn::Location, + MetadataColumn::Size, + MetadataColumn::Size, // Duplicate + ]); + let result = options.validate_metadata_cols(&schema); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Duplicate metadata column")); + + // Test invalid case - column already exists in schema + let schema_with_location = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("location", DataType::Utf8, false), // Same name as Location metadata + ])); + + let options = ListingOptions::new(Arc::new(CsvFormat::default())) + .with_metadata_cols(vec![MetadataColumn::Location]); + + let result = options.validate_metadata_cols(&schema_with_location); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("already exists in schema")); + + Ok(()) + } } diff --git a/datafusion/core/tests/sql/path_partition.rs b/datafusion/core/tests/sql/path_partition.rs index 160084213c7c..3631b13c7429 100644 --- a/datafusion/core/tests/sql/path_partition.rs +++ b/datafusion/core/tests/sql/path_partition.rs @@ -24,6 +24,7 @@ use std::ops::Range; use std::sync::Arc; use arrow::datatypes::DataType; +use datafusion::assert_batches_sorted_eq; use datafusion::datasource::listing::ListingTableUrl; use datafusion::datasource::physical_plan::ParquetSource; use datafusion::datasource::source::DataSourceExec; @@ -41,6 +42,8 @@ use datafusion_catalog::TableProvider; use datafusion_common::stats::Precision; use datafusion_common::test_util::batches_to_sort_string; use datafusion_common::ScalarValue; +use datafusion_datasource::file_scan_config::FileScanConfig; +use datafusion_datasource::metadata::MetadataColumn; use datafusion_execution::config::SessionConfig; use datafusion_expr::{col, lit, Expr, Operator}; use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal}; @@ -73,6 +76,7 @@ async fn parquet_partition_pruning_filter() -> Result<()> { ("month", DataType::Int32), ("day", DataType::Int32), ], + &[], "mirror:///", "alltypes_plain.parquet", ) @@ -563,6 +567,143 @@ async fn parquet_overlapping_columns() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_metadata_columns() -> Result<()> { + let ctx = SessionContext::new(); + + let table = create_partitioned_alltypes_parquet_table( + &ctx, + &[ + "year=2021/month=09/day=09/file.parquet", + "year=2021/month=10/day=09/file.parquet", + 
"year=2021/month=10/day=28/file.parquet", + ], + &[ + ("year", DataType::Int32), + ("month", DataType::Int32), + ("day", DataType::Int32), + ], + &[ + MetadataColumn::Location, + MetadataColumn::Size, + MetadataColumn::LastModified, + ], + "mirror:///", + "alltypes_plain.parquet", + ) + .await; + + ctx.register_table("t", table).unwrap(); + + let result = ctx + .sql("SELECT id, size, location, last_modified FROM t WHERE size > 1500 ORDER BY id LIMIT 10") + .await? + .collect() + .await?; + + let expected = [ + "+----+------+----------------------------------------+----------------------+", + "| id | size | location | last_modified |", + "+----+------+----------------------------------------+----------------------+", + "| 0 | 1851 | year=2021/month=09/day=09/file.parquet | 1970-01-01T00:00:00Z |", + "| 0 | 1851 | year=2021/month=10/day=09/file.parquet | 1970-01-01T00:00:00Z |", + "| 0 | 1851 | year=2021/month=10/day=28/file.parquet | 1970-01-01T00:00:00Z |", + "| 1 | 1851 | year=2021/month=09/day=09/file.parquet | 1970-01-01T00:00:00Z |", + "| 1 | 1851 | year=2021/month=10/day=09/file.parquet | 1970-01-01T00:00:00Z |", + "| 1 | 1851 | year=2021/month=10/day=28/file.parquet | 1970-01-01T00:00:00Z |", + "| 2 | 1851 | year=2021/month=09/day=09/file.parquet | 1970-01-01T00:00:00Z |", + "| 2 | 1851 | year=2021/month=10/day=09/file.parquet | 1970-01-01T00:00:00Z |", + "| 2 | 1851 | year=2021/month=10/day=28/file.parquet | 1970-01-01T00:00:00Z |", + "| 3 | 1851 | year=2021/month=10/day=28/file.parquet | 1970-01-01T00:00:00Z |", + "+----+------+----------------------------------------+----------------------+", + ]; + assert_batches_sorted_eq!(expected, &result); + + Ok(()) +} + +#[tokio::test] +async fn test_metadata_columns_pushdown() -> Result<()> { + let ctx = SessionContext::new(); + + let table = create_partitioned_alltypes_parquet_table( + &ctx, + &[ + "year=2021/month=09/day=09/file.parquet", + "year=2021/month=10/day=09/file.parquet", + "year=2021/month=10/day=28/file.parquet", + ], + &[ + ("year", DataType::Int32), + ("month", DataType::Int32), + ("day", DataType::Int32), + ], + &[ + MetadataColumn::Location, + MetadataColumn::Size, + MetadataColumn::LastModified, + ], + "mirror:///", + "alltypes_plain.parquet", + ) + .await; + + // The metadata filters can be resolved using only the metadata columns. 
+ let filters = [ + Expr::eq( + col("location"), + lit("year=2021/month=09/day=09/file.parquet"), + ), + Expr::gt(col("size"), lit(400u64)), + Expr::gt_eq( + col("last_modified"), + lit(ScalarValue::TimestampMicrosecond( + Some(0), + Some("UTC".into()), + )), + ), + Expr::gt(col("id"), lit(1)), + ]; + let exec = table.scan(&ctx.state(), None, &filters, None).await?; + let data_source_exec = exec.as_any().downcast_ref::().unwrap(); + let data_source = data_source_exec.data_source(); + let file_source = data_source + .as_any() + .downcast_ref::() + .unwrap(); + let parquet_config = file_source + .file_source() + .as_any() + .downcast_ref::() + .unwrap(); + let pred = parquet_config.predicate().unwrap(); + // Only the last filter should be pushdown to TableScan + let expected = Arc::new(BinaryExpr::new( + Arc::new(Column::new_with_schema("id", &exec.schema()).unwrap()), + Operator::Gt, + Arc::new(Literal::new(ScalarValue::Int32(Some(1)))), + )); + + assert!(pred.as_any().is::()); + let pred = pred.as_any().downcast_ref::().unwrap(); + + assert_eq!(pred, expected.as_ref()); + + // Only the first file should be scanned + let scan_files = file_source + .file_groups + .iter() + .flat_map(|x| x.iter().map(|y| y.path()).collect::>()) + .collect::>(); + assert_eq!(scan_files.len(), 1); + assert_eq!( + scan_files[0].to_string(), + "year=2021/month=09/day=09/file.parquet" + ); + + Ok(()) +} + fn register_partitioned_aggregate_csv( ctx: &SessionContext, store_paths: &[&str], @@ -607,6 +748,7 @@ async fn register_partitioned_alltypes_parquet( ctx, store_paths, partition_cols, + &[], table_path, source_file, ) @@ -619,6 +761,7 @@ async fn create_partitioned_alltypes_parquet_table( ctx: &SessionContext, store_paths: &[&str], partition_cols: &[(&str, DataType)], + metadata_cols: &[MetadataColumn], table_path: &str, source_file: &str, ) -> Arc { @@ -636,7 +779,8 @@ async fn create_partitioned_alltypes_parquet_table( .iter() .map(|x| (x.0.to_owned(), x.1.clone())) .collect::>(), - ); + ) + .with_metadata_cols(metadata_cols.to_vec()); let table_path = ListingTableUrl::parse(table_path).unwrap(); let store_path = diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index fb756cc11fbb..0f2ce3aa7912 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -20,7 +20,7 @@ use std::{ any::Any, borrow::Cow, collections::HashMap, fmt::Debug, fmt::Formatter, - fmt::Result as FmtResult, marker::PhantomData, sync::Arc, + fmt::Result as FmtResult, marker::PhantomData, str::FromStr, sync::Arc, }; use crate::file_groups::FileGroup; @@ -29,6 +29,7 @@ use crate::{ file::FileSource, file_compression_type::FileCompressionType, file_stream::FileStream, + metadata::MetadataColumn, source::{DataSource, DataSourceExec}, statistics::MinMaxStatistics, PartitionedFile, @@ -63,6 +64,7 @@ use datafusion_physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, }; use log::{debug, warn}; +use object_store::ObjectMeta; /// The base configurations for a [`DataSourceExec`], the a physical plan for /// any given file format. @@ -160,13 +162,15 @@ pub struct FileScanConfig { /// Table constraints pub constraints: Constraints, /// Columns on which to project the data. Indexes that are higher than the - /// number of columns of `file_schema` refer to `table_partition_cols`. + /// number of columns of `file_schema` refer to `table_partition_cols` and then `metadata_cols`. 
pub projection: Option>, /// The maximum number of records to read from this plan. If `None`, /// all records after filtering are returned. pub limit: Option, /// The partitioning columns pub table_partition_cols: Vec, + /// The metadata columns + pub metadata_cols: Vec, /// All equivalent lexicographical orderings that describe the schema. pub output_ordering: Vec, /// File compression type @@ -242,6 +246,7 @@ pub struct FileScanConfigBuilder { limit: Option, projection: Option>, table_partition_cols: Vec, + metadata_cols: Vec, constraints: Option, file_groups: Vec, statistics: Option, @@ -275,6 +280,7 @@ impl FileScanConfigBuilder { limit: None, projection: None, table_partition_cols: vec![], + metadata_cols: vec![], constraints: None, batch_size: None, } @@ -309,6 +315,12 @@ impl FileScanConfigBuilder { self } + /// Set the metadata columns + pub fn with_metadata_cols(mut self, metadata_cols: Vec) -> Self { + self.metadata_cols = metadata_cols; + self + } + /// Set the table constraints pub fn with_constraints(mut self, constraints: Constraints) -> Self { self.constraints = Some(constraints); @@ -394,6 +406,7 @@ impl FileScanConfigBuilder { limit, projection, table_partition_cols, + metadata_cols, constraints, file_groups, statistics, @@ -419,6 +432,7 @@ impl FileScanConfigBuilder { limit, projection, table_partition_cols, + metadata_cols, constraints, file_groups, output_ordering, @@ -443,6 +457,7 @@ impl From for FileScanConfigBuilder { limit: config.limit, projection: config.projection, table_partition_cols: config.table_partition_cols, + metadata_cols: config.metadata_cols, constraints: Some(config.constraints), batch_size: config.batch_size, } @@ -662,6 +677,7 @@ impl FileScanConfig { projection: None, limit: None, table_partition_cols: vec![], + metadata_cols: vec![], output_ordering: vec![], file_compression_type: FileCompressionType::UNCOMPRESSED, new_lines_in_values: false, @@ -696,7 +712,8 @@ impl FileScanConfig { match &self.projection { Some(proj) => proj.clone(), None => (0..self.file_schema.fields().len() - + self.table_partition_cols.len()) + + self.table_partition_cols.len() + + self.metadata_cols.len()) .collect(), } } @@ -730,11 +747,24 @@ impl FileScanConfig { .projection_indices() .into_iter() .map(|idx| { - if idx < self.file_schema.fields().len() { - self.file_schema.field(idx).clone() - } else { - let partition_idx = idx - self.file_schema.fields().len(); - self.table_partition_cols[partition_idx].clone() + let file_schema_len = self.file_schema.fields().len(); + let partition_cols_len = self.table_partition_cols.len(); + + match idx { + // File schema columns + i if i < file_schema_len => self.file_schema.field(i).clone(), + + // Partition columns + i if i < file_schema_len + partition_cols_len => { + let partition_idx = i - file_schema_len; + self.table_partition_cols[partition_idx].clone() + } + + // Metadata columns + i => { + let metadata_idx = i - file_schema_len - partition_cols_len; + self.metadata_cols[metadata_idx].field() + } } }) .collect(); @@ -801,6 +831,12 @@ impl FileScanConfig { self } + /// Set the metadata columns of the files + pub fn with_metadata_cols(mut self, metadata_cols: Vec) -> Self { + self.metadata_cols = metadata_cols; + self + } + /// Set the output ordering of the files #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")] pub fn with_output_ordering(mut self, output_ordering: Vec) -> Self { @@ -873,7 +909,7 @@ impl FileScanConfig { }) } - /// Projects only file schema, ignoring partition columns + /// 
Projects only file schema, ignoring partition/metadata columns pub fn projected_file_schema(&self) -> SchemaRef { let fields = self.file_column_projection_indices().map(|indices| { indices @@ -1121,13 +1157,13 @@ impl DisplayAs for FileScanConfig { } } -/// A helper that projects partition columns into the file record batches. +/// A helper that projects extended (i.e. partition/metadata) columns into the file record batches. /// /// One interesting trick is the usage of a cache for the key buffers of the partition column /// dictionaries. Indeed, the partition columns are constant, so the dictionaries that represent them /// have all their keys equal to 0. This enables us to re-use the same "all-zero" buffer across batches, /// which makes the space consumption of the partition columns O(batch_size) instead of O(record_count). -pub struct PartitionColumnProjector { +pub struct ExtendedColumnProjector { /// An Arrow buffer initialized to zeros that represents the key array of all partition /// columns (partition columns are materialized by dictionary arrays with only one /// value in the dictionary, thus all the keys are equal to zero). @@ -1136,15 +1172,22 @@ pub struct PartitionColumnProjector { /// schema. Sorted by index in the target schema so that we can iterate on it to /// insert the partition columns in the target record batch. projected_partition_indexes: Vec<(usize, usize)>, + /// Similar to `projected_partition_indexes` but only stores the indexes in the target schema + projected_metadata_indexes: Vec, /// The schema of the table once the projection was applied. projected_schema: SchemaRef, } -impl PartitionColumnProjector { - // Create a projector to insert the partitioning columns into batches read from files - // - `projected_schema`: the target schema with both file and partitioning columns +impl ExtendedColumnProjector { + // Create a projector to insert the partitioning/metadata columns into batches read from files + // - `projected_schema`: the target schema with file, partitioning and metadata columns // - `table_partition_cols`: all the partitioning column names - pub fn new(projected_schema: SchemaRef, table_partition_cols: &[String]) -> Self { + // - `metadata_cols`: all the metadata column names + pub fn new( + projected_schema: SchemaRef, + table_partition_cols: &[String], + metadata_cols: &[MetadataColumn], + ) -> Self { let mut idx_map = HashMap::new(); for (partition_idx, partition_name) in table_partition_cols.iter().enumerate() { if let Ok(schema_idx) = projected_schema.index_of(partition_name) { @@ -1155,25 +1198,42 @@ impl PartitionColumnProjector { let mut projected_partition_indexes: Vec<_> = idx_map.into_iter().collect(); projected_partition_indexes.sort_by(|(_, a), (_, b)| a.cmp(b)); + let mut projected_metadata_indexes = vec![]; + for metadata_name in metadata_cols.iter() { + if let Ok(schema_idx) = projected_schema.index_of(metadata_name.name()) { + projected_metadata_indexes.push(schema_idx); + } + } + // Sort to ensure that the final metadata column vector is expanded properly + if !projected_metadata_indexes.is_empty() { + projected_metadata_indexes.sort(); + } + Self { - projected_partition_indexes, key_buffer_cache: Default::default(), + projected_partition_indexes, + projected_metadata_indexes, projected_schema, } } - // Transform the batch read from the file by inserting the partitioning columns + // Transform the batch read from the file by inserting both partitioning and metadata columns // to the right positions as deduced from 
`projected_schema` // - `file_batch`: batch read from the file, with internal projection applied // - `partition_values`: the list of partition values, one for each partition column + // - `metadata`: the metadata of the file containing information like location, size, etc. pub fn project( &mut self, file_batch: RecordBatch, partition_values: &[ScalarValue], + metadata: &ObjectMeta, ) -> Result { - let expected_cols = - self.projected_schema.fields().len() - self.projected_partition_indexes.len(); + // Calculate expected number of columns from the file (excluding partition and metadata columns) + let expected_cols = self.projected_schema.fields().len() + - self.projected_partition_indexes.len() + - self.projected_metadata_indexes.len(); + // Verify the file batch has the expected number of columns if file_batch.columns().len() != expected_cols { return exec_err!( "Unexpected batch schema from file, expected {} cols but got {}", @@ -1182,18 +1242,21 @@ impl PartitionColumnProjector { ); } + // Start with the columns from the file batch let mut cols = file_batch.columns().to_vec(); + + // Insert partition columns for &(pidx, sidx) in &self.projected_partition_indexes { - let p_value = - partition_values - .get(pidx) - .ok_or(DataFusionError::Execution( - "Invalid partitioning found on disk".to_string(), - ))?; + // Get the partition value from the provided values + let p_value = partition_values.get(pidx).ok_or_else(|| { + DataFusionError::Execution( + "Invalid partitioning found on disk".to_string(), + ) + })?; let mut partition_value = Cow::Borrowed(p_value); - // check if user forgot to dict-encode the partition value + // Check if user forgot to dict-encode the partition value and apply auto-fix if needed let field = self.projected_schema.field(sidx); let expected_data_type = field.data_type(); let actual_data_type = partition_value.data_type(); @@ -1207,6 +1270,7 @@ impl PartitionColumnProjector { } } + // Create array and insert at the correct schema position cols.insert( sidx, create_output_array( @@ -1214,9 +1278,25 @@ impl PartitionColumnProjector { partition_value.as_ref(), file_batch.num_rows(), )?, - ) + ); } + // Insert metadata columns + for &sidx in &self.projected_metadata_indexes { + // Get the metadata column type from the field name + let field_name = self.projected_schema.field(sidx).name(); + let metadata_col = MetadataColumn::from_str(field_name).map_err(|e| { + DataFusionError::Execution(format!("Invalid metadata column: {}", e)) + })?; + + // Convert metadata to scalar value based on the column type + let scalar_value = metadata_col.to_scalar_value(metadata); + + // Create array and insert at the correct schema position + cols.insert(sidx, scalar_value.to_array_of_size(file_batch.num_rows())?); + } + + // Create a new record batch with all columns in the correct order RecordBatch::try_new_with_options( Arc::clone(&self.projected_schema), cols, @@ -1521,11 +1601,15 @@ mod tests { generate_test_files, test_util::MockSource, tests::aggr_test_schema, verify_sort_integrity, }; + use object_store::{path::Path, ObjectMeta}; use super::*; use arrow::{ - array::{Int32Array, RecordBatch}, + array::{ + Int32Array, RecordBatch, StringArray, TimestampMicrosecondArray, UInt64Array, + }, compute::SortOptions, + datatypes::TimeUnit, }; use datafusion_common::stats::Precision; @@ -1558,6 +1642,16 @@ mod tests { schema.fields().iter().map(|f| f.name().clone()).collect() } + fn test_object_meta() -> ObjectMeta { + ObjectMeta { + location: Path::from("test"), + size: 100, + 
last_modified: chrono::Utc::now(), + e_tag: None, + version: None, + } + } + #[test] fn physical_plan_config_no_projection() { let file_schema = aggr_test_schema(); @@ -1719,12 +1813,13 @@ mod tests { let proj_schema = conf.projected_schema(); // created a projector for that projected schema - let mut proj = PartitionColumnProjector::new( + let mut proj = ExtendedColumnProjector::new( proj_schema, &partition_cols .iter() .map(|x| x.0.clone()) .collect::>(), + &[], ); // project first batch @@ -1737,6 +1832,7 @@ mod tests { wrap_partition_value_in_dict(ScalarValue::from("10")), wrap_partition_value_in_dict(ScalarValue::from("26")), ], + &test_object_meta(), ) .expect("Projection of partition columns into record batch failed"); let expected = [ @@ -1765,6 +1861,7 @@ mod tests { wrap_partition_value_in_dict(ScalarValue::from("10")), wrap_partition_value_in_dict(ScalarValue::from("27")), ], + &test_object_meta(), ) .expect("Projection of partition columns into record batch failed"); let expected = [ @@ -1795,6 +1892,7 @@ mod tests { wrap_partition_value_in_dict(ScalarValue::from("10")), wrap_partition_value_in_dict(ScalarValue::from("28")), ], + &test_object_meta(), ) .expect("Projection of partition columns into record batch failed"); let expected = [ @@ -1823,6 +1921,7 @@ mod tests { ScalarValue::from("10"), ScalarValue::from("26"), ], + &test_object_meta(), ) .expect("Projection of partition columns into record batch failed"); let expected = [ @@ -2220,6 +2319,37 @@ mod tests { .unwrap() } + /// Create a test ObjectMeta with given path, size and a fixed timestamp + fn create_test_object_meta(path: &str, size: usize) -> ObjectMeta { + let timestamp = chrono::DateTime::parse_from_rfc3339("2023-01-01T00:00:00Z") + .unwrap() + .with_timezone(&chrono::Utc); + + ObjectMeta { + location: Path::from(path), + size: size.try_into().unwrap(), + last_modified: timestamp, + e_tag: None, + version: None, + } + } + + /// Create a configuration with metadata columns + fn config_with_metadata( + file_schema: SchemaRef, + projection: Option>, + metadata_cols: Vec, + ) -> FileScanConfig { + FileScanConfigBuilder::new( + ObjectStoreUrl::local_filesystem(), + file_schema.clone(), + Arc::new(MockSource::default()), + ) + .with_projection(projection) + .with_metadata_cols(metadata_cols) + .build() + } + #[test] fn test_file_scan_config_builder() { let file_schema = aggr_test_schema(); @@ -2329,6 +2459,289 @@ mod tests { } } + #[test] + fn test_projected_schema_with_metadata_col() { + let file_schema = aggr_test_schema(); + let metadata_cols = vec![ + MetadataColumn::Location, + MetadataColumn::Size, + MetadataColumn::LastModified, + ]; + + let conf = config_with_metadata(Arc::clone(&file_schema), None, metadata_cols); + + // Get projected schema + let schema = conf.projected_schema(); + + // Verify schema has all file schema fields plus metadata columns + assert_eq!(schema.fields().len(), file_schema.fields().len() + 3); + + // Check that metadata fields are added at the end + let file_schema_len = file_schema.fields().len(); + assert_eq!(schema.field(file_schema_len).name(), "location"); + assert_eq!(schema.field(file_schema_len + 1).name(), "size"); + assert_eq!(schema.field(file_schema_len + 2).name(), "last_modified"); + + // Check data types + assert_eq!(schema.field(file_schema_len).data_type(), &DataType::Utf8); + assert_eq!( + schema.field(file_schema_len + 1).data_type(), + &DataType::UInt64 + ); + assert_eq!( + schema.field(file_schema_len + 2).data_type(), + &DataType::Timestamp(TimeUnit::Microsecond, 
Some("UTC".into())) + ); + } + + #[test] + fn test_projected_schema_with_projection_and_metadata_cols() { + let file_schema = aggr_test_schema(); + let metadata_cols = vec![MetadataColumn::Location, MetadataColumn::Size]; + + // Create projection that includes only the first two columns from file schema plus metadata + let file_schema_len = file_schema.fields().len(); + let projection = Some(vec![ + 0, + 1, // First two columns from file schema + file_schema_len, // Location metadata column + file_schema_len + 1, // Size metadata column + ]); + + let conf = + config_with_metadata(Arc::clone(&file_schema), projection, metadata_cols); + + // Get projected schema + let schema = conf.projected_schema(); + + // Verify schema has only the projected columns + assert_eq!(schema.fields().len(), 4); + + // Check that the first two columns are from the file schema + assert_eq!(schema.field(0).name(), file_schema.field(0).name()); + assert_eq!(schema.field(1).name(), file_schema.field(1).name()); + + // Check that metadata fields are correctly projected + assert_eq!(schema.field(2).name(), "location"); + assert_eq!(schema.field(3).name(), "size"); + } + + #[test] + fn test_projected_schema_with_partition_and_metadata_cols() { + let file_schema = aggr_test_schema(); + let partition_cols = to_partition_cols(vec![ + ( + "year".to_owned(), + wrap_partition_type_in_dict(DataType::Int32), + ), + ( + "month".to_owned(), + wrap_partition_type_in_dict(DataType::Int32), + ), + ]); + let metadata_cols = vec![MetadataColumn::Location, MetadataColumn::Size]; + + // Create config with partition and metadata columns + let conf = FileScanConfigBuilder::new( + ObjectStoreUrl::local_filesystem(), + file_schema.clone(), + Arc::new(MockSource::default()), + ) + .with_table_partition_cols(partition_cols) + .with_metadata_cols(metadata_cols) + .build(); + + // Get projected schema + let schema = conf.projected_schema(); + + // Verify schema has all file schema fields plus partition and metadata columns + let expected_len = file_schema.fields().len() + 2 + 2; // file + partition + metadata + assert_eq!(schema.fields().len(), expected_len); + + // Check order of columns: file, partition, metadata + let file_len = file_schema.fields().len(); + assert_eq!(schema.field(file_len).name(), "year"); + assert_eq!(schema.field(file_len + 1).name(), "month"); + assert_eq!(schema.field(file_len + 2).name(), "location"); + assert_eq!(schema.field(file_len + 3).name(), "size"); + } + + #[test] + fn test_metadata_column_projector() { + let file_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + + // Create metadata columns + let metadata_cols = vec![ + MetadataColumn::Location, + MetadataColumn::Size, + MetadataColumn::LastModified, + ]; + + // Create test object metadata + let object_meta = create_test_object_meta("bucket/file.parquet", 1024); + + // Create projected schema with metadata columns + let projected_fields = vec![ + file_schema.field(0).clone(), + file_schema.field(1).clone(), + metadata_cols[0].field(), + metadata_cols[1].field(), + metadata_cols[2].field(), + ]; + let projected_schema = Arc::new(Schema::new(projected_fields)); + + // Create projector + let mut projector = ExtendedColumnProjector::new( + Arc::clone(&projected_schema), + &[], // No partition columns + &metadata_cols, + ); + + // Create a test record batch + let a_values = Int32Array::from(vec![1, 2, 3]); + let b_values = Int32Array::from(vec![4, 5, 6]); + let file_batch = 
RecordBatch::try_new( + Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])), + vec![Arc::new(a_values), Arc::new(b_values)], + ) + .unwrap(); + + // Apply projection + let result = projector.project(file_batch, &[], &object_meta).unwrap(); + + // Verify result + assert_eq!(result.num_columns(), 5); + assert_eq!(result.num_rows(), 3); + + // Check metadata column values + let location_col = result + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(location_col.value(0), "bucket/file.parquet"); + + let size_col = result + .column(3) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(size_col.value(0), 1024); + + let timestamp_col = result + .column(4) + .as_any() + .downcast_ref::() + .unwrap(); + // The timestamp should match what we set in create_test_object_meta + let expected_ts = chrono::DateTime::parse_from_rfc3339("2023-01-01T00:00:00Z") + .unwrap() + .with_timezone(&chrono::Utc) + .timestamp_micros(); + assert_eq!(timestamp_col.value(0), expected_ts); + } + + #[test] + fn test_extended_column_projector_partition_and_metadata() { + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + + // Create partition values + let partition_cols = vec!["year".to_string(), "month".to_string()]; + let partition_values = vec![ + ScalarValue::Dictionary( + Box::new(DataType::UInt16), + Box::new(ScalarValue::Int32(Some(2023))), + ), + ScalarValue::Dictionary( + Box::new(DataType::UInt16), + Box::new(ScalarValue::Int32(Some(1))), + ), + ]; + + // Create metadata columns + let metadata_cols = vec![MetadataColumn::Location, MetadataColumn::Size]; + + // Create test object metadata + let object_meta = create_test_object_meta("bucket/file.parquet", 1024); + + // Create projected schema with file, partition and metadata columns + let projected_fields = vec![ + file_schema.field(0).clone(), + Field::new( + "year", + DataType::Dictionary( + Box::new(DataType::UInt16), + Box::new(DataType::Int32), + ), + true, + ), + Field::new( + "month", + DataType::Dictionary( + Box::new(DataType::UInt16), + Box::new(DataType::Int32), + ), + true, + ), + metadata_cols[0].field(), + metadata_cols[1].field(), + ]; + let projected_schema = Arc::new(Schema::new(projected_fields)); + + // Create projector + let mut projector = ExtendedColumnProjector::new( + Arc::clone(&projected_schema), + &partition_cols, + &metadata_cols, + ); + + // Create a test record batch with just the file columns + let a_values = Int32Array::from(vec![1, 2, 3]); + let file_batch = RecordBatch::try_new( + Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])), + vec![Arc::new(a_values)], + ) + .unwrap(); + + // Apply projection + let result = projector + .project(file_batch, &partition_values, &object_meta) + .unwrap(); + + // Verify result + assert_eq!(result.num_columns(), 5); + assert_eq!(result.num_rows(), 3); + + // Columns should be in order: file column, partition columns, metadata columns + assert_eq!(result.schema().field(0).name(), "a"); + assert_eq!(result.schema().field(1).name(), "year"); + assert_eq!(result.schema().field(2).name(), "month"); + assert_eq!(result.schema().field(3).name(), "location"); + assert_eq!(result.schema().field(4).name(), "size"); + + // Check metadata column values + let location_col = result + .column(3) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(location_col.value(0), "bucket/file.parquet"); + + let size_col = result + .column(4) + .as_any() + 
.downcast_ref::() + .unwrap(); + assert_eq!(size_col.value(0), 1024); + } + #[test] fn test_file_scan_config_builder_new_from() { let schema = aggr_test_schema(); diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs index 1dc53bd6b931..c4a077514c8c 100644 --- a/datafusion/datasource/src/file_stream.rs +++ b/datafusion/datasource/src/file_stream.rs @@ -28,7 +28,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use crate::file_meta::FileMeta; -use crate::file_scan_config::{FileScanConfig, PartitionColumnProjector}; +use crate::file_scan_config::{ExtendedColumnProjector, FileScanConfig}; use crate::PartitionedFile; use arrow::datatypes::SchemaRef; use datafusion_common::error::Result; @@ -41,6 +41,7 @@ use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; use datafusion_common::instant::Instant; use datafusion_common::ScalarValue; +use object_store::ObjectMeta; use futures::future::BoxFuture; use futures::stream::BoxStream; @@ -58,8 +59,8 @@ pub struct FileStream { /// A dynamic [`FileOpener`]. Calling `open()` returns a [`FileOpenFuture`], /// which can be resolved to a stream of `RecordBatch`. file_opener: Arc, - /// The partition column projector - pc_projector: PartitionColumnProjector, + /// The extended (partitioning + metadata) column projector + col_projector: ExtendedColumnProjector, /// The stream state state: FileStreamState, /// File stream specific metrics @@ -79,13 +80,14 @@ impl FileStream { metrics: &ExecutionPlanMetricsSet, ) -> Result { let projected_schema = config.projected_schema(); - let pc_projector = PartitionColumnProjector::new( + let col_projector = ExtendedColumnProjector::new( Arc::clone(&projected_schema), &config .table_partition_cols .iter() .map(|x| x.name().clone()) .collect::>(), + &config.metadata_cols, ); let file_group = config.file_groups[partition].clone(); @@ -95,7 +97,7 @@ impl FileStream { projected_schema, remain: config.limit, file_opener, - pc_projector, + col_projector, state: FileStreamState::Idle, file_stream_metrics: FileStreamMetrics::new(metrics, partition), baseline_metrics: BaselineMetrics::new(metrics, partition), @@ -116,20 +118,22 @@ impl FileStream { /// /// Since file opening is mostly IO (and may involve a /// bunch of sequential IO), it can be parallelized with decoding. 
- fn start_next_file(&mut self) -> Option)>> { + fn start_next_file( + &mut self, + ) -> Option, ObjectMeta)>> { let part_file = self.file_iter.pop_front()?; let file_meta = FileMeta { - object_meta: part_file.object_meta, + object_meta: part_file.object_meta.clone(), range: part_file.range, extensions: part_file.extensions, metadata_size_hint: part_file.metadata_size_hint, }; Some( - self.file_opener - .open(file_meta) - .map(|future| (future, part_file.partition_values)), + self.file_opener.open(file_meta).map(|future| { + (future, part_file.partition_values, part_file.object_meta) + }), ) } @@ -140,10 +144,11 @@ impl FileStream { self.file_stream_metrics.time_opening.start(); match self.start_next_file().transpose() { - Ok(Some((future, partition_values))) => { + Ok(Some((future, partition_values, object_meta))) => { self.state = FileStreamState::Open { future, partition_values, + object_meta, } } Ok(None) => return Poll::Ready(None), @@ -156,9 +161,11 @@ impl FileStream { FileStreamState::Open { future, partition_values, + object_meta, } => match ready!(future.poll_unpin(cx)) { Ok(reader) => { let partition_values = mem::take(partition_values); + let object_meta = object_meta.clone(); // include time needed to start opening in `start_next_file` self.file_stream_metrics.time_opening.stop(); @@ -167,13 +174,19 @@ impl FileStream { self.file_stream_metrics.time_scanning_total.start(); match next { - Ok(Some((next_future, next_partition_values))) => { + Ok(Some(( + next_future, + next_partition_values, + next_object_meta, + ))) => { self.state = FileStreamState::Scan { partition_values, + object_meta, reader, next: Some(( NextOpen::Pending(next_future), next_partition_values, + next_object_meta, )), }; } @@ -181,6 +194,7 @@ impl FileStream { self.state = FileStreamState::Scan { reader, partition_values, + object_meta, next: None, }; } @@ -208,9 +222,10 @@ impl FileStream { reader, partition_values, next, + object_meta, } => { // We need to poll the next `FileOpenFuture` here to drive it forward - if let Some((next_open_future, _)) = next { + if let Some((next_open_future, _, _)) = next { if let NextOpen::Pending(f) = next_open_future { if let Poll::Ready(reader) = f.as_mut().poll(cx) { *next_open_future = NextOpen::Ready(reader); @@ -222,8 +237,8 @@ impl FileStream { self.file_stream_metrics.time_scanning_until_data.stop(); self.file_stream_metrics.time_scanning_total.stop(); let result = self - .pc_projector - .project(batch, partition_values) + .col_projector + .project(batch, partition_values, object_meta) .map_err(|e| ArrowError::ExternalError(e.into())) .map(|batch| match &mut self.remain { Some(remain) => { @@ -256,7 +271,7 @@ impl FileStream { match self.on_error { // If `OnError::Skip` we skip the file as soon as we hit the first error OnError::Skip => match mem::take(next) { - Some((future, partition_values)) => { + Some((future, partition_values, object_meta)) => { self.file_stream_metrics.time_opening.start(); match future { @@ -264,6 +279,7 @@ impl FileStream { self.state = FileStreamState::Open { future, partition_values, + object_meta, } } NextOpen::Ready(reader) => { @@ -272,6 +288,7 @@ impl FileStream { reader, )), partition_values, + object_meta, } } } @@ -289,7 +306,7 @@ impl FileStream { self.file_stream_metrics.time_scanning_total.stop(); match mem::take(next) { - Some((future, partition_values)) => { + Some((future, partition_values, object_meta)) => { self.file_stream_metrics.time_opening.start(); match future { @@ -297,6 +314,7 @@ impl FileStream { self.state = 
FileStreamState::Open { future, partition_values, + object_meta, } } NextOpen::Ready(reader) => { @@ -305,6 +323,7 @@ impl FileStream { reader, )), partition_values, + object_meta, } } } @@ -388,19 +407,23 @@ pub enum FileStreamState { future: FileOpenFuture, /// The partition values for this file partition_values: Vec, + /// The object meta for this file + object_meta: ObjectMeta, }, /// Scanning the [`BoxStream`] returned by the completion of a [`FileOpenFuture`] /// returned by [`FileOpener::open`] Scan { /// Partitioning column values for the current batch_iter partition_values: Vec, + /// The object metadata for the current file + object_meta: ObjectMeta, /// The reader instance reader: BoxStream<'static, Result>, /// A [`FileOpenFuture`] for the next file to be processed, /// and its corresponding partition column values, if any. /// This allows the next file to be opened in parallel while the /// current file is read. - next: Option<(NextOpen, Vec)>, + next: Option<(NextOpen, Vec, ObjectMeta)>, }, /// Encountered an error Error, @@ -536,7 +559,7 @@ mod tests { use crate::file_meta::FileMeta; use crate::file_stream::{FileOpenFuture, FileOpener, FileStream, OnError}; use crate::test_util::MockSource; - use arrow::array::RecordBatch; + use arrow::array::{Array, RecordBatch}; use arrow::datatypes::Schema; use datafusion_common::{assert_batches_eq, internal_err}; @@ -983,4 +1006,321 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_extended_column_projector() -> Result<()> { + use crate::file_scan_config::ExtendedColumnProjector; + use crate::metadata::MetadataColumn; + use arrow::array::{StringArray, UInt64Array}; + use arrow::datatypes::{DataType, Field, Schema}; + use chrono::Utc; + use datafusion_common::ScalarValue; + use object_store::path::Path; + use object_store::ObjectMeta; + use std::sync::Arc; + + // Create a simple file batch + let file_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("value", DataType::Utf8, true), + ])); + + let file_batch = RecordBatch::try_new( + file_schema.clone(), + vec![ + Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["a", "b", "c"])), + ], + )?; + + // Create a test object meta + let timestamp = Utc::now(); + let object_meta = ObjectMeta { + location: Path::from("test/file.parquet"), + last_modified: timestamp, + size: 1024, + e_tag: None, + version: None, + }; + + // Test 1: Projector with partition columns + + // Create a schema that includes both file columns and a partition column + let schema_with_partition = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("value", DataType::Utf8, true), + Field::new("year", DataType::Utf8, true), + ])); + + // Create the partition values + let partition_values = vec![ScalarValue::Utf8(Some("2023".to_string()))]; + + // Create the projector + let mut projector = ExtendedColumnProjector::new( + schema_with_partition.clone(), + &["year".to_string()], + &[], + ); + + // Project the batch + let projected_batch = + projector.project(file_batch.clone(), &partition_values, &object_meta)?; + + // Verify the projected batch + assert_eq!(projected_batch.schema().fields().len(), 3); + assert_eq!(projected_batch.num_rows(), 3); + assert_eq!(projected_batch.column(0).len(), 3); + assert_eq!(projected_batch.column(1).len(), 3); + assert_eq!(projected_batch.column(2).len(), 3); + + // Check the partition column + let year_col = projected_batch.column(2); + let year_array = 
year_col.as_any().downcast_ref::().unwrap(); + assert_eq!(year_array.value(0), "2023"); + assert_eq!(year_array.value(1), "2023"); + assert_eq!(year_array.value(2), "2023"); + + // Test 2: Projector with metadata columns + + // Create a schema that includes both file columns and metadata columns + let schema_with_metadata = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("value", DataType::Utf8, true), + Field::new("location", DataType::Utf8, true), + Field::new("size", DataType::UInt64, true), + ])); + + // Create the projector with metadata columns + let mut projector = ExtendedColumnProjector::new( + schema_with_metadata.clone(), + &[], + &[MetadataColumn::Location, MetadataColumn::Size], + ); + + // Project the batch + let projected_batch = projector.project(file_batch.clone(), &[], &object_meta)?; + + // Verify the projected batch + assert_eq!(projected_batch.schema().fields().len(), 4); + assert_eq!(projected_batch.num_rows(), 3); + + // Check the metadata columns + let location_col = projected_batch.column(2); + let location_array = location_col.as_any().downcast_ref::().unwrap(); + assert_eq!(location_array.value(0), "test/file.parquet"); + assert_eq!(location_array.value(1), "test/file.parquet"); + assert_eq!(location_array.value(2), "test/file.parquet"); + + let size_col = projected_batch.column(3); + let size_array = size_col.as_any().downcast_ref::().unwrap(); + assert_eq!(size_array.value(0), 1024); + assert_eq!(size_array.value(1), 1024); + assert_eq!(size_array.value(2), 1024); + + // Test 3: Projector with both partition and metadata columns + + // Create a schema that includes file, partition, and metadata columns + let schema_combined = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("value", DataType::Utf8, true), + Field::new("year", DataType::Utf8, true), + Field::new("location", DataType::Utf8, true), + Field::new("size", DataType::UInt64, true), + ])); + + // Create the projector + let mut projector = ExtendedColumnProjector::new( + schema_combined.clone(), + &["year".to_string()], + &[MetadataColumn::Location, MetadataColumn::Size], + ); + + // Project the batch + let projected_batch = + projector.project(file_batch.clone(), &partition_values, &object_meta)?; + + // Verify the projected batch + assert_eq!(projected_batch.schema().fields().len(), 5); + assert_eq!(projected_batch.num_rows(), 3); + + // Check the partition column + let year_col = projected_batch.column(2); + let year_array = year_col.as_any().downcast_ref::().unwrap(); + assert_eq!(year_array.value(0), "2023"); + + // Check the metadata columns + let location_col = projected_batch.column(3); + let location_array = location_col.as_any().downcast_ref::().unwrap(); + assert_eq!(location_array.value(0), "test/file.parquet"); + + let size_col = projected_batch.column(4); + let size_array = size_col.as_any().downcast_ref::().unwrap(); + assert_eq!(size_array.value(0), 1024); + + // Test 4: Projector with columns in mixed order + + // Create a schema with columns in a different order + let schema_mixed = Arc::new(Schema::new(vec![ + Field::new("location", DataType::Utf8, true), // metadata column first + Field::new("id", DataType::Int32, false), // file column + Field::new("year", DataType::Utf8, true), // partition column + Field::new("value", DataType::Utf8, true), // file column + Field::new("size", DataType::UInt64, true), // metadata column last + ])); + + // Create the projector + let mut projector = ExtendedColumnProjector::new( + 
schema_mixed.clone(), + &["year".to_string()], + &[MetadataColumn::Location, MetadataColumn::Size], + ); + + // We need to reorder the file batch to match the expected file columns in the mixed schema + // The expected order is: [id, value] + let file_batch_reordered = RecordBatch::try_new( + Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("value", DataType::Utf8, true), + ])), + vec![ + Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["a", "b", "c"])), + ], + )?; + + // Project the batch + let projected_batch = + projector.project(file_batch_reordered, &partition_values, &object_meta)?; + + // Verify the projected batch + assert_eq!(projected_batch.schema().fields().len(), 5); + assert_eq!(projected_batch.num_rows(), 3); + + let schema = projected_batch.schema(); + let field_indices: std::collections::HashMap<_, _> = schema + .fields() + .iter() + .enumerate() + .map(|(i, f)| (f.name().as_str(), i)) + .collect(); + + // Check location column + let location_idx = *field_indices.get("location").unwrap(); + let location_col = projected_batch.column(location_idx); + let location_array = location_col.as_any().downcast_ref::().unwrap(); + assert_eq!(location_array.value(0), "test/file.parquet"); + + // Check ID column + let id_idx = *field_indices.get("id").unwrap(); + let id_col = projected_batch.column(id_idx); + let id_array = id_col + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(id_array.value(0), 1); + + let year_idx = *field_indices.get("year").unwrap(); + let year_col = projected_batch.column(year_idx); + let year_array = year_col.as_any().downcast_ref::().unwrap(); + assert_eq!(year_array.value(0), "a"); + + // Check value column + let value_idx = *field_indices.get("value").unwrap(); + let value_col = projected_batch.column(value_idx); + let value_array = value_col.as_any().downcast_ref::().unwrap(); + assert_eq!(value_array.value(0), "2023"); + + // Check size column + let size_idx = *field_indices.get("size").unwrap(); + let size_col = projected_batch.column(size_idx); + let size_array = size_col.as_any().downcast_ref::().unwrap(); + assert_eq!(size_array.value(0), 1024); + + // Test 5: Test with dictionary-encoded partition values + + // Create a schema with dictionary type for the partition column + // We need to use UInt16 as the key type to match the implementation's default + let schema_with_dict = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("value", DataType::Utf8, true), + Field::new( + "year", + DataType::Dictionary( + Box::new(DataType::UInt16), + Box::new(DataType::Utf8), + ), + true, + ), + ])); + + // Create the partition values with dictionary encoding + // Must use the same UInt16 key type to match + let partition_values = vec![ScalarValue::Dictionary( + Box::new(DataType::UInt16), + Box::new(ScalarValue::Utf8(Some("2023".to_string()))), + )]; + + // Create the projector + let mut projector = ExtendedColumnProjector::new( + schema_with_dict.clone(), + &["year".to_string()], + &[], + ); + + // Project the batch + let projected_batch = + projector.project(file_batch.clone(), &partition_values, &object_meta)?; + + // Verify the projected batch + assert_eq!(projected_batch.schema().fields().len(), 3); + assert_eq!(projected_batch.num_rows(), 3); + + // Check that the partition column has the correct dictionary type + { + let schema_ref = projected_batch.schema(); + let year_field = schema_ref.field(2); + + if let DataType::Dictionary(key_type, _) 
= year_field.data_type() { + // Ensure the key type is UInt16 as expected + assert_eq!(**key_type, DataType::UInt16); + } else { + panic!("Expected Dictionary type, got {:?}", year_field.data_type()); + } + } + + // Test 6: Auto-fix for non-dictionary partition values + + // Create a projector expecting dictionary-encoded values + let mut projector = ExtendedColumnProjector::new( + schema_with_dict.clone(), + &["year".to_string()], + &[], + ); + + // Use non-dictionary values (the projector should auto-fix) + let non_dict_values = vec![ScalarValue::Utf8(Some("2023".to_string()))]; + + // Project the batch + let projected_batch = + projector.project(file_batch.clone(), &non_dict_values, &object_meta)?; + + // Verify the projected batch + assert_eq!(projected_batch.schema().fields().len(), 3); + assert_eq!(projected_batch.num_rows(), 3); + + // Check that the partition column has the correct dictionary type + { + let schema_ref = projected_batch.schema(); + let year_field = schema_ref.field(2); + + if let DataType::Dictionary(key_type, _) = year_field.data_type() { + // Ensure the key type is UInt16 as expected + assert_eq!(**key_type, DataType::UInt16); + } else { + panic!("Expected Dictionary type, got {:?}", year_field.data_type()); + } + } + + Ok(()) + } } diff --git a/datafusion/datasource/src/metadata.rs b/datafusion/datasource/src/metadata.rs new file mode 100644 index 000000000000..5afbae52fffc --- /dev/null +++ b/datafusion/datasource/src/metadata.rs @@ -0,0 +1,348 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Functions that support extracting metadata from files based on their ObjectMeta. 
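+//!
+//! A minimal usage sketch (not part of this diff; the `ObjectMeta` values and
+//! the `use` paths below are illustrative assumptions):
+//!
+//! ```ignore
+//! use datafusion_datasource::metadata::MetadataColumn;
+//! use object_store::{path::Path, ObjectMeta};
+//!
+//! let meta = ObjectMeta {
+//!     location: Path::from("data/file.parquet"),
+//!     last_modified: chrono::Utc::now(),
+//!     size: 1024,
+//!     e_tag: None,
+//!     version: None,
+//! };
+//!
+//! // Each variant maps to a column name, an Arrow type, and a scalar value
+//! // extracted from a file's `ObjectMeta`.
+//! assert_eq!(MetadataColumn::Size.name(), "size");
+//! assert_eq!(
+//!     MetadataColumn::Size.to_scalar_value(&meta),
+//!     datafusion_common::ScalarValue::UInt64(Some(1024))
+//! );
+//!
+//! // A builder accumulates one value per file into an Arrow array.
+//! let mut builder = MetadataColumn::Location.builder(1);
+//! builder.append(&meta);
+//! let array = builder.finish(); // string array: ["data/file.parquet"]
+//! ```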
+ +use std::fmt; +use std::str::FromStr; +use std::sync::Arc; + +use datafusion_common::plan_err; +use datafusion_common::Result; + +use arrow::{ + array::{Array, StringBuilder, TimestampMicrosecondBuilder, UInt64Builder}, + datatypes::{DataType, Field, TimeUnit}, +}; +use datafusion_common::ScalarValue; + +use datafusion_common::DataFusionError; +use object_store::ObjectMeta; + +/// A metadata column that can be used to filter files +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum MetadataColumn { + /// The location of the file in object store + Location, + /// The last modified timestamp of the file + LastModified, + /// The size of the file in bytes + Size, +} + +impl fmt::Display for MetadataColumn { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.name()) + } +} + +impl MetadataColumn { + /// The name of the metadata column (one of `location`, `last_modified`, or `size`) + pub fn name(&self) -> &str { + match self { + MetadataColumn::Location => "location", + MetadataColumn::LastModified => "last_modified", + MetadataColumn::Size => "size", + } + } + + /// Returns the arrow type of this metadata column + pub fn arrow_type(&self) -> DataType { + match self { + MetadataColumn::Location => DataType::Utf8, + MetadataColumn::LastModified => { + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())) + } + MetadataColumn::Size => DataType::UInt64, + } + } + + /// Returns the arrow field for this metadata column + pub fn field(&self) -> Field { + Field::new(self.to_string(), self.arrow_type(), true) + } + + /// Returns the scalar value for this metadata column given an object meta + pub fn to_scalar_value(&self, meta: &ObjectMeta) -> ScalarValue { + match self { + MetadataColumn::Location => { + ScalarValue::Utf8(Some(meta.location.to_string())) + } + MetadataColumn::LastModified => ScalarValue::TimestampMicrosecond( + Some(meta.last_modified.timestamp_micros()), + Some("UTC".into()), + ), + MetadataColumn::Size => ScalarValue::UInt64(Some(meta.size)), + } + } + + pub fn builder(&self, capacity: usize) -> MetadataBuilder { + match self { + MetadataColumn::Location => MetadataBuilder::Location( + StringBuilder::with_capacity(capacity, capacity * 10), + ), + MetadataColumn::LastModified => MetadataBuilder::LastModified( + TimestampMicrosecondBuilder::with_capacity(capacity).with_data_type( + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + ), + ), + MetadataColumn::Size => { + MetadataBuilder::Size(UInt64Builder::with_capacity(capacity)) + } + } + } +} + +impl FromStr for MetadataColumn { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result { + match s { + "location" => Ok(MetadataColumn::Location), + "last_modified" => Ok(MetadataColumn::LastModified), + "size" => Ok(MetadataColumn::Size), + _ => plan_err!( + "Invalid metadata column: {}, expected: location, last_modified, or size", + s + ), + } + } +} + +pub enum MetadataBuilder { + Location(StringBuilder), + LastModified(TimestampMicrosecondBuilder), + Size(UInt64Builder), +} + +impl MetadataBuilder { + pub fn append(&mut self, meta: &ObjectMeta) { + match self { + Self::Location(builder) => builder.append_value(&meta.location), + Self::LastModified(builder) => { + builder.append_value(meta.last_modified.timestamp_micros()) + } + Self::Size(builder) => builder.append_value(meta.size), + } + } + + pub fn finish(self) -> Arc { + match self { + MetadataBuilder::Location(mut builder) => Arc::new(builder.finish()), + MetadataBuilder::LastModified(mut builder) => 
Arc::new(builder.finish()), + MetadataBuilder::Size(mut builder) => Arc::new(builder.finish()), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{StringArray, TimestampMicrosecondArray, UInt64Array}; + use chrono::{DateTime, TimeZone, Utc}; + use object_store::path::Path; + use std::str::FromStr; + + // Helper function to create a test ObjectMeta + fn create_test_object_meta( + path: &str, + size: usize, + timestamp: DateTime, + ) -> ObjectMeta { + ObjectMeta { + location: Path::from(path), + last_modified: timestamp, + size: size.try_into().unwrap(), + e_tag: None, + version: None, + } + } + + #[test] + fn test_metadata_column_name() { + assert_eq!(MetadataColumn::Location.name(), "location"); + assert_eq!(MetadataColumn::LastModified.name(), "last_modified"); + assert_eq!(MetadataColumn::Size.name(), "size"); + } + + #[test] + fn test_metadata_column_arrow_type() { + assert_eq!(MetadataColumn::Location.arrow_type(), DataType::Utf8); + assert_eq!( + MetadataColumn::LastModified.arrow_type(), + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())) + ); + assert_eq!(MetadataColumn::Size.arrow_type(), DataType::UInt64); + } + + #[test] + fn test_metadata_column_field() { + let field = MetadataColumn::Location.field(); + assert_eq!(field.name(), "location"); + assert_eq!(field.data_type(), &DataType::Utf8); + assert!(field.is_nullable()); + + let field = MetadataColumn::LastModified.field(); + assert_eq!(field.name(), "last_modified"); + assert_eq!( + field.data_type(), + &DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())) + ); + assert!(field.is_nullable()); + + let field = MetadataColumn::Size.field(); + assert_eq!(field.name(), "size"); + assert_eq!(field.data_type(), &DataType::UInt64); + assert!(field.is_nullable()); + } + + #[test] + fn test_metadata_column_to_scalar_value() { + let timestamp = Utc.with_ymd_and_hms(2023, 1, 1, 10, 0, 0).unwrap(); + let meta = create_test_object_meta("test/file.parquet", 1024, timestamp); + + // Test Location scalar value + let scalar = MetadataColumn::Location.to_scalar_value(&meta); + assert_eq!( + scalar, + ScalarValue::Utf8(Some("test/file.parquet".to_string())) + ); + + // Test LastModified scalar value + let scalar = MetadataColumn::LastModified.to_scalar_value(&meta); + assert_eq!( + scalar, + ScalarValue::TimestampMicrosecond( + Some(timestamp.timestamp_micros()), + Some("UTC".into()) + ) + ); + + // Test Size scalar value + let scalar = MetadataColumn::Size.to_scalar_value(&meta); + assert_eq!(scalar, ScalarValue::UInt64(Some(1024))); + } + + #[test] + fn test_metadata_column_from_str() { + // Test valid values + assert_eq!( + MetadataColumn::from_str("location").unwrap(), + MetadataColumn::Location + ); + assert_eq!( + MetadataColumn::from_str("last_modified").unwrap(), + MetadataColumn::LastModified + ); + assert_eq!( + MetadataColumn::from_str("size").unwrap(), + MetadataColumn::Size + ); + + // Test invalid value + let err = MetadataColumn::from_str("invalid").unwrap_err(); + assert!(err.to_string().contains("Invalid metadata column")); + } + + #[test] + fn test_metadata_column_display() { + assert_eq!(format!("{}", MetadataColumn::Location), "location"); + assert_eq!(format!("{}", MetadataColumn::LastModified), "last_modified"); + assert_eq!(format!("{}", MetadataColumn::Size), "size"); + } + + #[test] + fn test_metadata_builder_location() { + let timestamp = Utc.with_ymd_and_hms(2023, 1, 1, 10, 0, 0).unwrap(); + let meta1 = create_test_object_meta("file1.parquet", 1024, timestamp); + let 
meta2 = create_test_object_meta("file2.parquet", 2048, timestamp); + + // Create a location builder and append values + let mut builder = MetadataColumn::Location.builder(2); + builder.append(&meta1); + builder.append(&meta2); + + // Finish and check the array + let array = builder.finish(); + let string_array = array.as_any().downcast_ref::().unwrap(); + + assert_eq!(string_array.len(), 2); + assert_eq!(string_array.value(0), "file1.parquet"); + assert_eq!(string_array.value(1), "file2.parquet"); + } + + #[test] + fn test_metadata_builder_last_modified() { + let timestamp1 = Utc.with_ymd_and_hms(2023, 1, 1, 10, 0, 0).unwrap(); + let timestamp2 = Utc.with_ymd_and_hms(2023, 1, 2, 10, 0, 0).unwrap(); + + let meta1 = create_test_object_meta("file1.parquet", 1024, timestamp1); + let meta2 = create_test_object_meta("file2.parquet", 2048, timestamp2); + + // Create a last_modified builder and append values + let mut builder = MetadataColumn::LastModified.builder(2); + builder.append(&meta1); + builder.append(&meta2); + + // Finish and check the array + let array = builder.finish(); + let ts_array = array + .as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!(ts_array.len(), 2); + assert_eq!(ts_array.value(0), timestamp1.timestamp_micros()); + assert_eq!(ts_array.value(1), timestamp2.timestamp_micros()); + } + + #[test] + fn test_metadata_builder_size() { + let timestamp = Utc.with_ymd_and_hms(2023, 1, 1, 10, 0, 0).unwrap(); + let meta1 = create_test_object_meta("file1.parquet", 1024, timestamp); + let meta2 = create_test_object_meta("file2.parquet", 2048, timestamp); + + // Create a size builder and append values + let mut builder = MetadataColumn::Size.builder(2); + builder.append(&meta1); + builder.append(&meta2); + + // Finish and check the array + let array = builder.finish(); + let uint64_array = array.as_any().downcast_ref::().unwrap(); + + assert_eq!(uint64_array.len(), 2); + assert_eq!(uint64_array.value(0), 1024); + assert_eq!(uint64_array.value(1), 2048); + } + + #[test] + fn test_metadata_builder_empty() { + // Test with empty builders + let location_builder = MetadataColumn::Location.builder(0); + let location_array = location_builder.finish(); + assert_eq!(location_array.len(), 0); + + let last_modified_builder = MetadataColumn::LastModified.builder(0); + let last_modified_array = last_modified_builder.finish(); + assert_eq!(last_modified_array.len(), 0); + + let size_builder = MetadataColumn::Size.builder(0); + let size_array = size_builder.finish(); + assert_eq!(size_array.len(), 0); + } +} diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index 3e44851d145b..746b023b5949 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -38,6 +38,7 @@ pub mod file_scan_config; pub mod file_sink_config; pub mod file_stream; pub mod memory; +pub mod metadata; pub mod schema_adapter; pub mod sink; pub mod source;
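Taken together, the new pieces compose as follows. The sketch below is not part of the diff: the `example` wrapper, the placeholder `file_source` argument, and the import paths are assumptions, while `FileScanConfigBuilder::with_metadata_cols`, `MetadataColumn`, and `projected_schema` come from the changes above.

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::metadata::MetadataColumn;
use datafusion_execution::object_store::ObjectStoreUrl;

// Build a scan config whose projected schema exposes the three metadata columns.
fn example(file_source: Arc<dyn FileSource>) {
    // A minimal file schema; a real table would derive this from the files.
    let file_schema = Arc::new(Schema::new(vec![Field::new(
        "id",
        DataType::Int32,
        false,
    )]));

    let config = FileScanConfigBuilder::new(
        ObjectStoreUrl::local_filesystem(),
        file_schema,
        file_source,
    )
    .with_metadata_cols(vec![
        MetadataColumn::Location,
        MetadataColumn::Size,
        MetadataColumn::LastModified,
    ])
    .build();

    // The metadata fields are appended after the file (and any partition) columns:
    // location: Utf8, size: UInt64, last_modified: Timestamp(Microsecond, "UTC").
    // At scan time, ExtendedColumnProjector fills them from each file's ObjectMeta.
    let schema = config.projected_schema();
    let n = schema.fields().len();
    assert_eq!(schema.field(n - 3).name(), "location");
    assert_eq!(schema.field(n - 2).name(), "size");
    assert_eq!(schema.field(n - 1).name(), "last_modified");
}

On the listing-table path the same columns become filterable, which is what the pushdown test at the top of this section exercises: predicates on location, size, and last_modified are consumed for file pruning, while only the ordinary column filter (id > 1) reaches the Parquet predicate.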