Commit ebf5f60

feat: handle partition filters via kernel expressions
Signed-off-by: Robert Pack <[email protected]>
1 parent 2340de7

File tree

12 files changed: +519 -408 lines
crates/core/src/kernel/arrow/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -11,6 +11,7 @@ use lazy_static::lazy_static;
 pub(crate) mod extract;
 pub(crate) mod json;
 
+pub(crate) const LIST_ARRAY_ROOT: &str = "element";
 const MAP_ROOT_DEFAULT: &str = "key_value";
 const MAP_KEY_DEFAULT: &str = "key";
 const MAP_VALUE_DEFAULT: &str = "value";
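A short aside on the new constant, as an illustration only (not code from this commit): the Parquet/Delta convention names a list's child field "element", whereas arrow-rs builders default to "item", which is presumably why the child-field name is pinned in a constant when converting kernel schemas to Arrow.

use std::sync::Arc;
use arrow_schema::{DataType, Field};

fn main() {
    // Child field named per the Delta/Parquet convention rather than arrow-rs's
    // default "item"; this is the name LIST_ARRAY_ROOT pins down.
    let element = Arc::new(Field::new("element", DataType::Int64, true));
    let list_type = DataType::List(element);
    if let DataType::List(field) = &list_type {
        assert_eq!(field.name().as_str(), "element");
    }
}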

crates/core/src/kernel/snapshot/log_data.rs

Lines changed: 197 additions & 93 deletions
@@ -1,7 +1,7 @@
 use std::borrow::Cow;
-use std::collections::HashMap;
 use std::sync::Arc;
 
+use arrow::array::AsArray;
 use arrow_array::{Array, Int32Array, Int64Array, MapArray, RecordBatch, StringArray, StructArray};
 use arrow_select::filter::filter_record_batch;
 use chrono::{DateTime, Utc};
@@ -26,22 +26,10 @@ const COL_MIN_VALUES: &str = "minValues";
 const COL_MAX_VALUES: &str = "maxValues";
 const COL_NULL_COUNT: &str = "nullCount";
 
-pub(crate) type PartitionFields<'a> = Arc<IndexMap<&'a str, &'a StructField>>;
-pub(crate) type PartitionValues<'a> = IndexMap<&'a str, Scalar>;
-
 pub(crate) trait PartitionsExt {
     fn hive_partition_path(&self) -> String;
 }
 
-impl PartitionsExt for IndexMap<&str, Scalar> {
-    fn hive_partition_path(&self) -> String {
-        self.iter()
-            .map(|(k, v)| format!("{k}={}", v.serialize_encoded()))
-            .collect::<Vec<_>>()
-            .join("/")
-    }
-}
-
 impl PartitionsExt for IndexMap<String, Scalar> {
     fn hive_partition_path(&self) -> String {
         self.iter()
@@ -163,8 +151,6 @@ pub struct LogicalFile<'a> {
 
     /// Pointer to a specific row in the log data.
    index: usize,
-    /// Schema fields the table is partitioned by.
-    partition_fields: PartitionFields<'a>,
 }
 
 impl LogicalFile<'_> {
@@ -214,61 +200,6 @@ impl LogicalFile<'_> {
         })
     }
 
-    /// The partition values for this logical file.
-    pub fn partition_values(&self) -> DeltaResult<PartitionValues<'_>> {
-        if self.partition_fields.is_empty() {
-            return Ok(IndexMap::new());
-        }
-        let map_value = self.partition_values.value(self.index);
-        let keys = map_value
-            .column(0)
-            .as_any()
-            .downcast_ref::<StringArray>()
-            .ok_or(DeltaTableError::generic(
-                "expected partition values key field to be of type string",
-            ))?;
-        let values = map_value
-            .column(1)
-            .as_any()
-            .downcast_ref::<StringArray>()
-            .ok_or(DeltaTableError::generic(
-                "expected partition values value field to be of type string",
-            ))?;
-
-        let values = keys
-            .iter()
-            .zip(values.iter())
-            .map(|(k, v)| {
-                let (key, field) = self.partition_fields.get_key_value(k.unwrap()).unwrap();
-                let field_type = match field.data_type() {
-                    DataType::Primitive(p) => Ok(p),
-                    _ => Err(DeltaTableError::generic(
-                        "nested partitioning values are not supported",
-                    )),
-                }?;
-                Ok((
-                    *key,
-                    v.map(|vv| field_type.parse_scalar(vv))
-                        .transpose()?
-                        .unwrap_or(Scalar::Null(field.data_type().clone())),
-                ))
-            })
-            .collect::<DeltaResult<HashMap<_, _>>>()?;
-
-        // NOTE: we recreate the map as a IndexMap to ensure the order of the keys is consistently
-        // the same as the order of partition fields.
-        self.partition_fields
-            .iter()
-            .map(|(k, f)| {
-                let val = values
-                    .get(*k)
-                    .cloned()
-                    .unwrap_or(Scalar::Null(f.data_type.clone()));
-                Ok((*k, val))
-            })
-            .collect::<DeltaResult<IndexMap<_, _>>>()
-    }
-
     /// Defines a deletion vector
     pub fn deletion_vector(&self) -> Option<DeletionVectorView<'_>> {
         self.deletion_vector.as_ref().and_then(|arr| {
@@ -346,11 +277,11 @@
 impl<'a> TryFrom<&LogicalFile<'a>> for ObjectMeta {
     type Error = DeltaTableError;
 
-    fn try_from(file_stats: &LogicalFile<'a>) -> Result<Self, Self::Error> {
+    fn try_from(value: &LogicalFile<'a>) -> Result<Self, Self::Error> {
         Ok(ObjectMeta {
-            location: file_stats.object_store_path(),
-            size: file_stats.size() as usize,
-            last_modified: file_stats.modification_datetime()?,
+            location: value.object_store_path(),
+            size: value.size() as usize,
+            last_modified: value.modification_datetime()?,
             version: None,
             e_tag: None,
         })
@@ -359,7 +290,6 @@ impl<'a> TryFrom<&LogicalFile<'a>> for ObjectMeta {
 
 /// Helper for processing data from the materialized Delta log.
 pub struct FileStatsAccessor<'a> {
-    partition_fields: PartitionFields<'a>,
     data: Arc<RecordBatch>,
     sizes: &'a Int64Array,
     stats: &'a StructArray,
@@ -425,22 +355,6 @@ impl<'a> FileStatsAccessor<'a> {
         let partition_values = extract_and_cast::<MapArray>(data, "add.partitionValues")?;
         let partition_values_parsed =
             extract_and_cast_opt::<StructArray>(data, "add.partitionValues_parsed");
-        let partition_fields = Arc::new(
-            metadata
-                .partition_columns
-                .iter()
-                .map(|c| {
-                    Ok((
-                        c.as_str(),
-                        schema
-                            .field(c.as_str())
-                            .ok_or(DeltaTableError::PartitionError {
-                                partition: c.clone(),
-                            })?,
-                    ))
-                })
-                .collect::<DeltaResult<IndexMap<_, _>>>()?,
-        );
         let deletion_vector = extract_and_cast_opt::<StructArray>(data, "add.deletionVector");
         let deletion_vector = deletion_vector.and_then(|dv| {
             if dv.null_count() == dv.len() {
@@ -463,7 +377,6 @@ impl<'a> FileStatsAccessor<'a> {
         });
 
         Ok(Self {
-            partition_fields,
             data: Arc::new(result),
             sizes,
             stats,
@@ -486,7 +399,6 @@ impl<'a> FileStatsAccessor<'a> {
             data: self.data.clone(),
             partition_values: self.partition_values,
             partition_values_parsed: self.partition_values_parsed.clone(),
-            partition_fields: self.partition_fields.clone(),
             stats: self.stats,
             deletion_vector: self.deletion_vector.clone(),
             index,
@@ -508,6 +420,198 @@ impl<'a> Iterator for FileStatsAccessor<'a> {
     }
 }
 
+pub struct LogFileView<'a> {
+    data: &'a RecordBatch,
+    curr: Option<usize>,
+}
+
+impl LogFileView<'_> {
+    fn index(&self) -> usize {
+        self.curr.expect("index initialized")
+    }
+
+    /// Path to the files storage location.
+    pub fn path(&self) -> Cow<'_, str> {
+        percent_decode_str(pick::<StringArray>(&self.data, 0).value(self.index()))
+            .decode_utf8_lossy()
+    }
+
+    /// An object store [`Path`] to the file.
+    ///
+    /// this tries to parse the file string and if that fails, it will return the string as is.
+    // TODO assert consistent handling of the paths encoding when reading log data so this logic can be removed.
+    pub fn object_store_path(&self) -> Path {
+        let path = self.path();
+        // Try to preserve percent encoding if possible
+        match Path::parse(path.as_ref()) {
+            Ok(path) => path,
+            Err(_) => Path::from(path.as_ref()),
+        }
+    }
+
+    /// File size stored on disk.
+    pub fn size(&self) -> i64 {
+        pick::<Int64Array>(&self.data, 1).value(self.index())
+    }
+
+    /// Last modified time of the file.
+    pub fn modification_time(&self) -> i64 {
+        pick::<Int64Array>(&self.data, 2).value(self.index())
+    }
+
+    /// Datetime of the last modification time of the file.
+    pub fn modification_datetime(&self) -> DeltaResult<chrono::DateTime<Utc>> {
+        DateTime::from_timestamp_millis(self.modification_time()).ok_or(DeltaTableError::from(
+            crate::protocol::ProtocolError::InvalidField(format!(
+                "invalid modification_time: {:?}",
+                self.modification_time()
+            )),
+        ))
+    }
+
+    pub fn partition_values(&self) -> Option<StructData> {
+        self.data
+            .column_by_name("partition_values")
+            .and_then(|c| c.as_struct_opt())
+            .and_then(|arr| match Scalar::from_array(arr, self.index()) {
+                Some(Scalar::Struct(s)) => Some(s),
+                _ => None,
+            })
+    }
+
+    fn stats(&self) -> Option<&StructArray> {
+        self.data
+            .column_by_name("stats")
+            .and_then(|c| c.as_struct_opt())
+    }
+
+    /// The number of records stored in the data file.
+    pub fn num_records(&self) -> Option<usize> {
+        self.stats().and_then(|c| {
+            c.column_by_name(COL_NUM_RECORDS)
+                .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
+                .map(|a| a.value(self.index()) as usize)
+        })
+    }
+
+    /// Struct containing all available null counts for the columns in this file.
+    pub fn null_counts(&self) -> Option<Scalar> {
+        self.stats().and_then(|c| {
+            c.column_by_name(COL_NULL_COUNT)
+                .and_then(|c| Scalar::from_array(c.as_ref(), self.index()))
+        })
+    }
+
+    /// Struct containing all available min values for the columns in this file.
+    pub fn min_values(&self) -> Option<Scalar> {
+        self.stats().and_then(|c| {
+            c.column_by_name(COL_MIN_VALUES)
+                .and_then(|c| Scalar::from_array(c.as_ref(), self.index()))
+        })
+    }
+
+    /// Struct containing all available max values for the columns in this file.
+    pub fn max_values(&self) -> Option<Scalar> {
+        self.stats().and_then(|c| {
+            c.column_by_name(COL_MAX_VALUES)
+                .and_then(|c| Scalar::from_array(c.as_ref(), self.index()))
        })
+    }
+}
+
+impl<'a> Iterator for LogFileView<'a> {
+    type Item = LogFileView<'a>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.data.num_rows() < 1 {
+            return None;
+        }
+        if self.curr.is_some() && self.index() >= self.data.num_rows() - 1 {
+            return None;
+        }
+        self.curr = self.curr.map(|c| c + 1).or(Some(0));
+        Some(Self {
+            data: self.data,
+            curr: self.curr,
+        })
+    }
+}
+
+impl<'a> TryFrom<&LogFileView<'a>> for ObjectMeta {
+    type Error = DeltaTableError;
+
+    fn try_from(value: &LogFileView<'a>) -> Result<Self, Self::Error> {
+        Ok(ObjectMeta {
+            location: value.object_store_path(),
+            size: value.size() as usize,
+            last_modified: value.modification_datetime()?,
+            version: None,
+            e_tag: None,
+        })
+    }
+}
+
+pub struct LogDataView {
+    data: RecordBatch,
+    metadata: Metadata,
+    schema: StructType,
+}
+
+impl LogDataView {
+    pub(crate) fn new(data: RecordBatch, metadata: Metadata, schema: StructType) -> Self {
+        Self {
+            data,
+            metadata,
+            schema,
+        }
+    }
+
+    fn partition_data(&self) -> Option<RecordBatch> {
+        self.data
+            .column_by_name("partition_values")
+            .and_then(|c| c.as_any().downcast_ref::<StructArray>())
+            .map(|c| c.into())
+    }
+
+    pub fn with_partition_filter(self, predicate: Option<&Expression>) -> DeltaResult<Self> {
+        if let (Some(pred), Some(data)) = (predicate, self.partition_data()) {
+            let data = ArrowEngineData::new(data);
+            let evaluator = ARROW_HANDLER.get_evaluator(
+                Arc::new(data.record_batch().schema_ref().as_ref().try_into()?),
+                pred.clone(),
+                DataType::BOOLEAN,
+            );
+            let result = ArrowEngineData::try_from_engine_data(evaluator.evaluate(&data)?)?;
+            let filter = result.record_batch().column(0).as_boolean();
+            return Ok(Self {
+                data: filter_record_batch(&self.data, filter)?,
+                metadata: self.metadata,
+                schema: self.schema,
+            });
+        }
+        Ok(self)
+    }
+
+    pub fn iter(&self) -> impl Iterator<Item = LogFileView<'_>> {
+        LogFileView {
+            data: &self.data,
+            curr: None,
+        }
+    }
+}
+
+impl<'a> IntoIterator for &'a LogDataView {
+    type Item = LogFileView<'a>;
+    type IntoIter = LogFileView<'a>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        LogFileView {
+            data: &self.data,
+            curr: None,
+        }
+    }
+}
+
 /// Provides semanitc access to the log data.
 ///
 /// This is a helper struct that provides access to the log data in a more semantic way
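To make the shape of the new view API concrete, a small consumption sketch (illustrative only, not part of the commit): iterate a `LogDataView`, for example one returned by `with_partition_filter`, and read the per-file accessors added above. Crate-internal types (`LogDataView`, `LogFileView`) are assumed to be in scope.

// Walk the files that survive the partition filter and report basic facts
// using the accessors introduced in this commit.
fn summarize(view: &LogDataView) {
    for file in view {
        // `path()` percent-decodes the add action's path; `size()` is the on-disk size.
        println!("{} ({} bytes)", file.path(), file.size());
        if let Some(records) = file.num_records() {
            println!("  records: {records}");
        }
        // Stats are optional; files written without statistics simply return None.
        if file.min_values().is_some() && file.max_values().is_some() {
            println!("  column-level min/max statistics available");
        }
    }
}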
