Commit eda3a7e

Support for metadata columns (location, size, last_modified) in ListingTableProvider (#74)
* Initial work on metadata columns
* Metadata filtering working
* Working on plumbing to file scan config
* wip
* All wired up
* Working!
* Use MetadataColumn enum
* Add integration tests for metadata selection + pushdown filtering

UPSTREAM NOTE: This PR was submitted upstream: apache#15181
1 parent 7ca928b commit eda3a7e
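This commit lets files listed by a ListingTableProvider expose their object-store metadata (location, size, last_modified) as queryable columns, with filters on those columns pushed down to file pruning. A minimal sketch of the intended query shape, assuming a listing table named "logs" has already been registered with the metadata columns enabled (the ListingOptions wiring lives in table.rs, which is not part of the hunks shown below):

use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // Hypothetical setup: "logs" is a listing table registered with the
    // location/size/last_modified metadata columns enabled.
    let df = ctx
        .sql("SELECT location, size FROM logs WHERE size > 1000000")
        .await?;
    df.show().await?;
    Ok(())
}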

File tree

7 files changed: +722 additions, −117 deletions


datafusion/core/src/datasource/listing/helpers.rs

Lines changed: 49 additions & 24 deletions
@@ -28,7 +28,8 @@ use datafusion_common::{HashMap, Result, ScalarValue};
 use datafusion_expr::{BinaryExpr, Operator};
 
 use arrow::{
-    array::{Array, ArrayRef, AsArray, StringBuilder},
+    array::{Array, ArrayRef, AsArray, BooleanArray, StringBuilder},
+    buffer::BooleanBuffer,
     compute::{and, cast, prep_null_mask_filter},
     datatypes::{DataType, Field, Schema},
     record_batch::RecordBatch,
@@ -277,26 +278,59 @@ async fn prune_partitions(
         .collect();
     let schema = Arc::new(Schema::new(fields));
 
-    let df_schema = DFSchema::from_unqualified_fields(
-        partition_cols
-            .iter()
-            .map(|(n, d)| Field::new(n, d.clone(), true))
-            .collect(),
-        Default::default(),
-    )?;
-
     let batch = RecordBatch::try_new(schema, arrays)?;
 
     // TODO: Plumb this down
     let props = ExecutionProps::new();
 
+    // Don't retain partitions that evaluated to null
+    let prepared = apply_filters(&batch, filters, &props)?;
+
+    // If all rows are retained, return all partitions
+    if prepared.true_count() == prepared.len() {
+        return Ok(partitions);
+    }
+
+    // Sanity check
+    assert_eq!(prepared.len(), partitions.len());
+
+    let filtered = partitions
+        .into_iter()
+        .zip(prepared.values())
+        .filter_map(|(p, f)| f.then_some(p))
+        .collect();
+
+    Ok(filtered)
+}
+
+/// Applies the given filters to the input batch and returns a boolean mask that represents
+/// the result of the filters applied to each row.
+pub(crate) fn apply_filters(
+    batch: &RecordBatch,
+    filters: &[Expr],
+    props: &ExecutionProps,
+) -> Result<BooleanArray> {
+    if filters.is_empty() {
+        return Ok(BooleanArray::new(
+            BooleanBuffer::new_set(batch.num_rows()),
+            None,
+        ));
+    }
+
+    let num_rows = batch.num_rows();
+
+    let df_schema = DFSchema::from_unqualified_fields(
+        batch.schema().fields().clone(),
+        HashMap::default(),
+    )?;
+
     // Applies `filter` to `batch` returning `None` on error
     let do_filter = |filter| -> Result<ArrayRef> {
-        let expr = create_physical_expr(filter, &df_schema, &props)?;
-        expr.evaluate(&batch)?.into_array(partitions.len())
+        let expr = create_physical_expr(filter, &df_schema, props)?;
+        expr.evaluate(batch)?.into_array(num_rows)
     };
 
-    //.Compute the conjunction of the filters
+    // Compute the conjunction of the filters
    let mask = filters
        .iter()
        .map(|f| do_filter(f).map(|a| a.as_boolean().clone()))
@@ -305,25 +339,16 @@ async fn prune_partitions(
     let mask = match mask {
         Some(Ok(mask)) => mask,
         Some(Err(err)) => return Err(err),
-        None => return Ok(partitions),
+        None => return Ok(BooleanArray::new(BooleanBuffer::new_set(num_rows), None)),
     };
 
-    // Don't retain partitions that evaluated to null
+    // Don't retain rows that evaluated to null
     let prepared = match mask.null_count() {
         0 => mask,
         _ => prep_null_mask_filter(&mask),
     };
 
-    // Sanity check
-    assert_eq!(prepared.len(), partitions.len());
-
-    let filtered = partitions
-        .into_iter()
-        .zip(prepared.values())
-        .filter_map(|(p, f)| f.then_some(p))
-        .collect();
-
-    Ok(filtered)
+    Ok(prepared)
 }
 
 #[derive(Debug)]
datafusion/core/src/datasource/listing/metadata.rs

Lines changed: 202 additions & 0 deletions
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Functions that support extracting metadata from files.
+
+use std::fmt;
+use std::str::FromStr;
+use std::sync::Arc;
+
+use super::PartitionedFile;
+use crate::datasource::listing::helpers::apply_filters;
+use datafusion_common::plan_err;
+use datafusion_common::Result;
+
+use arrow::{
+    array::{Array, StringBuilder, TimestampMicrosecondBuilder, UInt64Builder},
+    datatypes::{DataType, Field, Schema, TimeUnit},
+    record_batch::RecordBatch,
+};
+use arrow_schema::Fields;
+use datafusion_common::ScalarValue;
+use datafusion_expr::execution_props::ExecutionProps;
+
+use datafusion_common::DataFusionError;
+use datafusion_expr::Expr;
+use object_store::ObjectMeta;
+
+/// A metadata column that can be used to filter files
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum MetadataColumn {
+    /// The location of the file in object store
+    Location,
+    /// The last modified timestamp of the file
+    LastModified,
+    /// The size of the file in bytes
+    Size,
+}
+
+impl fmt::Display for MetadataColumn {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}", self.name())
+    }
+}
+
+impl MetadataColumn {
+    /// The name of the metadata column (one of `location`, `last_modified`, or `size`)
+    pub fn name(&self) -> &str {
+        match self {
+            MetadataColumn::Location => "location",
+            MetadataColumn::LastModified => "last_modified",
+            MetadataColumn::Size => "size",
+        }
+    }
+
+    /// Returns the arrow type of this metadata column
+    pub fn arrow_type(&self) -> DataType {
+        match self {
+            MetadataColumn::Location => DataType::Utf8,
+            MetadataColumn::LastModified => {
+                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()))
+            }
+            MetadataColumn::Size => DataType::UInt64,
+        }
+    }
+
+    /// Returns the arrow field for this metadata column
+    pub fn field(&self) -> Field {
+        Field::new(self.to_string(), self.arrow_type(), true)
+    }
+
+    /// Returns the scalar value for this metadata column given an object meta
+    pub fn to_scalar_value(&self, meta: &ObjectMeta) -> ScalarValue {
+        match self {
+            MetadataColumn::Location => {
+                ScalarValue::Utf8(Some(meta.location.to_string()))
+            }
+            MetadataColumn::LastModified => ScalarValue::TimestampMicrosecond(
+                Some(meta.last_modified.timestamp_micros()),
+                Some("UTC".into()),
+            ),
+            MetadataColumn::Size => ScalarValue::UInt64(Some(meta.size as u64)),
+        }
+    }
+
+    pub(crate) fn builder(&self, capacity: usize) -> MetadataBuilder {
+        match self {
+            MetadataColumn::Location => MetadataBuilder::Location(
+                StringBuilder::with_capacity(capacity, capacity * 10),
+            ),
+            MetadataColumn::LastModified => MetadataBuilder::LastModified(
+                TimestampMicrosecondBuilder::with_capacity(capacity).with_data_type(
+                    DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
+                ),
+            ),
+            MetadataColumn::Size => {
+                MetadataBuilder::Size(UInt64Builder::with_capacity(capacity))
+            }
+        }
+    }
+}
+
+impl FromStr for MetadataColumn {
+    type Err = DataFusionError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s {
+            "location" => Ok(MetadataColumn::Location),
+            "last_modified" => Ok(MetadataColumn::LastModified),
+            "size" => Ok(MetadataColumn::Size),
+            _ => plan_err!(
+                "Invalid metadata column: {}, expected: location, last_modified, or size",
+                s
+            ),
+        }
+    }
+}
+
+pub(crate) enum MetadataBuilder {
+    Location(StringBuilder),
+    LastModified(TimestampMicrosecondBuilder),
+    Size(UInt64Builder),
+}
+
+impl MetadataBuilder {
+    pub fn append(&mut self, meta: &ObjectMeta) {
+        match self {
+            Self::Location(builder) => builder.append_value(&meta.location),
+            Self::LastModified(builder) => {
+                builder.append_value(meta.last_modified.timestamp_micros())
+            }
+            Self::Size(builder) => builder.append_value(meta.size as u64),
+        }
+    }
+
+    pub fn finish(self) -> Arc<dyn Array> {
+        match self {
+            MetadataBuilder::Location(mut builder) => Arc::new(builder.finish()),
+            MetadataBuilder::LastModified(mut builder) => Arc::new(builder.finish()),
+            MetadataBuilder::Size(mut builder) => Arc::new(builder.finish()),
+        }
+    }
+}
+
+/// Determine if the given file matches the input metadata filters.
+/// `filters` should only contain expressions that can be evaluated
+/// using only the metadata columns.
+pub(crate) fn apply_metadata_filters(
+    file: PartitionedFile,
+    filters: &[Expr],
+    metadata_cols: &[MetadataColumn],
+) -> Result<Option<PartitionedFile>> {
+    // if no metadata col => simply return all the files
+    if metadata_cols.is_empty() {
+        return Ok(Some(file));
+    }
+
+    let mut builders: Vec<_> = metadata_cols.iter().map(|col| col.builder(1)).collect();
+
+    for builder in builders.iter_mut() {
+        builder.append(&file.object_meta);
+    }
+
+    let arrays = builders
+        .into_iter()
+        .map(|builder| builder.finish())
+        .collect::<Vec<_>>();
+
+    let fields: Fields = metadata_cols
+        .iter()
+        .map(|col| Field::new(col.to_string(), col.arrow_type(), true))
+        .collect();
+    let schema = Arc::new(Schema::new(fields));
+
+    let batch = RecordBatch::try_new(schema, arrays)?;
+
+    // TODO: Plumb this down
+    let props = ExecutionProps::new();
+
+    // Don't retain rows that evaluated to null
+    let prepared = apply_filters(&batch, filters, &props)?;
+
+    // If the filter evaluates to true, return the file
+    if prepared.true_count() == 1 {
+        return Ok(Some(file));
+    }
+
+    Ok(None)
+}
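The public surface of this new file is the MetadataColumn enum: it knows its column name, Arrow type, and how to turn an ObjectMeta into a scalar. A small sketch of that round trip, assuming the object_store version in use here (the exact ObjectMeta field set varies between object_store releases):

use std::str::FromStr;

use chrono::Utc;
use datafusion::datasource::listing::MetadataColumn;
use object_store::{path::Path, ObjectMeta};

fn metadata_column_sketch() -> datafusion_common::Result<()> {
    // Parse a column name into the enum; unknown names yield a plan error.
    let col = MetadataColumn::from_str("size")?;
    assert_eq!(col.name(), "size");
    assert_eq!(col.arrow_type(), arrow::datatypes::DataType::UInt64);

    // Turn a file's object-store metadata into a scalar for that column.
    let meta = ObjectMeta {
        location: Path::from("data/part-0.parquet"), // illustrative path
        last_modified: Utc::now(),
        size: 1024,
        e_tag: None,
        version: None,
    };
    let scalar = col.to_scalar_value(&meta);
    assert_eq!(scalar, datafusion_common::ScalarValue::UInt64(Some(1024)));
    Ok(())
}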

datafusion/core/src/datasource/listing/mod.rs

Lines changed: 2 additions & 0 deletions
@@ -19,6 +19,7 @@
 //! to get the list of files to process.
 
 mod helpers;
+mod metadata;
 mod table;
 mod url;
 
@@ -31,6 +32,7 @@ use std::pin::Pin;
 use std::sync::Arc;
 
 pub use self::url::ListingTableUrl;
+pub use metadata::MetadataColumn;
 pub use table::{ListingOptions, ListingTable, ListingTableConfig};
 
 /// Stream of files get listed from object store
