Skip to content

Commit 2340de7

Browse files
committed
chore: migrate most partition value callsites to scalar
Signed-off-by: Robert Pack <[email protected]>
1 parent d59fe0b commit 2340de7

File tree

7 files changed

+159
-106
lines changed

7 files changed

+159
-106
lines changed

crates/core/src/kernel/snapshot/log_data.rs

Lines changed: 102 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ use std::sync::Arc;
55
use arrow_array::{Array, Int32Array, Int64Array, MapArray, RecordBatch, StringArray, StructArray};
66
use arrow_select::filter::filter_record_batch;
77
use chrono::{DateTime, Utc};
8+
use delta_kernel::engine::arrow_data::ArrowEngineData;
89
use delta_kernel::expressions::{Scalar, StructData};
9-
use delta_kernel::{Expression, ExpressionHandler};
10+
use delta_kernel::{Expression, ExpressionEvaluator, ExpressionHandler};
1011
use indexmap::IndexMap;
1112
use object_store::path::Path;
1213
use object_store::ObjectMeta;
@@ -34,27 +35,44 @@ pub(crate) trait PartitionsExt {
3435

3536
impl PartitionsExt for IndexMap<&str, Scalar> {
3637
fn hive_partition_path(&self) -> String {
37-
let fields = self
38-
.iter()
39-
.map(|(k, v)| {
40-
let encoded = v.serialize_encoded();
41-
format!("{k}={encoded}")
42-
})
43-
.collect::<Vec<_>>();
44-
fields.join("/")
38+
self.iter()
39+
.map(|(k, v)| format!("{k}={}", v.serialize_encoded()))
40+
.collect::<Vec<_>>()
41+
.join("/")
4542
}
4643
}
4744

4845
impl PartitionsExt for IndexMap<String, Scalar> {
4946
fn hive_partition_path(&self) -> String {
50-
let fields = self
47+
self.iter()
48+
.map(|(k, v)| format!("{k}={}", v.serialize_encoded()))
49+
.collect::<Vec<_>>()
50+
.join("/")
51+
}
52+
}
53+
54+
impl PartitionsExt for StructData {
55+
fn hive_partition_path(&self) -> String {
56+
self.fields()
5157
.iter()
52-
.map(|(k, v)| {
53-
let encoded = v.serialize_encoded();
54-
format!("{k}={encoded}")
55-
})
56-
.collect::<Vec<_>>();
57-
fields.join("/")
58+
.zip(self.values().iter())
59+
.map(|(k, v)| format!("{}={}", k.name(), v.serialize_encoded()))
60+
.collect::<Vec<_>>()
61+
.join("/")
62+
}
63+
}
64+
65+
pub trait StructDataExt {
66+
fn get(&self, key: &str) -> Option<&Scalar>;
67+
}
68+
69+
impl StructDataExt for StructData {
70+
fn get(&self, key: &str) -> Option<&Scalar> {
71+
self.fields()
72+
.iter()
73+
.zip(self.values().iter())
74+
.find(|(k, _)| k.name() == key)
75+
.map(|(_, v)| v)
5876
}
5977
}
6078

@@ -134,13 +152,10 @@ impl DeletionVectorView<'_> {
134152
/// functionality, e.g. parsing partition values.
135153
#[derive(Debug, PartialEq)]
136154
pub struct LogicalFile<'a> {
137-
path: &'a StringArray,
138-
/// The on-disk size of this data file in bytes
139-
size: &'a Int64Array,
140-
/// Last modification time of the file in milliseconds since the epoch.
141-
modification_time: &'a Int64Array,
155+
data: Arc<RecordBatch>,
142156
/// The partition values for this logical file.
143157
partition_values: &'a MapArray,
158+
partition_values_parsed: Option<&'a StructArray>,
144159
/// Struct containing all available statistics for the columns in this file.
145160
stats: &'a StructArray,
146161
/// Array containing the deletion vector data.
@@ -155,7 +170,7 @@ pub struct LogicalFile<'a> {
155170
impl LogicalFile<'_> {
156171
/// Path to the files storage location.
157172
pub fn path(&self) -> Cow<'_, str> {
158-
percent_decode_str(self.path.value(self.index)).decode_utf8_lossy()
173+
percent_decode_str(pick::<StringArray>(&self.data, 0).value(self.index)).decode_utf8_lossy()
159174
}
160175

161176
/// An object store [`Path`] to the file.
@@ -173,12 +188,12 @@ impl LogicalFile<'_> {
173188

174189
/// File size stored on disk.
175190
pub fn size(&self) -> i64 {
176-
self.size.value(self.index)
191+
pick::<Int64Array>(&self.data, 1).value(self.index)
177192
}
178193

179194
/// Last modification time of the file.
180195
pub fn modification_time(&self) -> i64 {
181-
self.modification_time.value(self.index)
196+
pick::<Int64Array>(&self.data, 2).value(self.index)
182197
}
183198

184199
/// Datetime of the last modification time of the file.
@@ -191,6 +206,14 @@ impl LogicalFile<'_> {
191206
))
192207
}
193208

209+
pub fn partition_values_scalar(&self) -> Option<StructData> {
210+
self.partition_values_parsed
211+
.and_then(|arr| match Scalar::from_array(arr, self.index) {
212+
Some(Scalar::Struct(s)) => Some(s),
213+
_ => None,
214+
})
215+
}
216+
194217
/// The partition values for this logical file.
195218
pub fn partition_values(&self) -> DeltaResult<PartitionValues<'_>> {
196219
if self.partition_fields.is_empty() {
@@ -296,11 +319,13 @@ impl LogicalFile<'_> {
296319
deletion_timestamp: Some(Utc::now().timestamp_millis()),
297320
extended_file_metadata: Some(true),
298321
size: Some(self.size()),
299-
partition_values: self.partition_values().ok().map(|pv| {
300-
pv.iter()
322+
partition_values: self.partition_values_scalar().map(|pv| {
323+
pv.fields()
324+
.iter()
325+
.zip(pv.values().iter())
301326
.map(|(k, v)| {
302327
(
303-
k.to_string(),
328+
k.name().to_owned(),
304329
if v.is_null() {
305330
None
306331
} else {
@@ -335,9 +360,8 @@ impl<'a> TryFrom<&LogicalFile<'a>> for ObjectMeta {
335360
/// Helper for processing data from the materialized Delta log.
336361
pub struct FileStatsAccessor<'a> {
337362
partition_fields: PartitionFields<'a>,
338-
paths: &'a StringArray,
363+
data: Arc<RecordBatch>,
339364
sizes: &'a Int64Array,
340-
modification_times: &'a Int64Array,
341365
stats: &'a StructArray,
342366
deletion_vector: Option<DeletionVector<'a>>,
343367
partition_values: &'a MapArray,
@@ -346,15 +370,57 @@ pub struct FileStatsAccessor<'a> {
346370
pointer: usize,
347371
}
348372

373+
lazy_static::lazy_static! {
374+
static ref FILE_SCHEMA: StructType = StructType::new([
375+
StructField::new("path", DataType::STRING, false),
376+
StructField::new("size", DataType::LONG, false),
377+
]);
378+
static ref FILE_PICKER: Arc<dyn ExpressionEvaluator> = ARROW_HANDLER.get_evaluator(
379+
Arc::new(FILE_SCHEMA.clone()),
380+
Expression::struct_from([
381+
Expression::column(["add", "path"]),
382+
Expression::column(["add", "size"]),
383+
Expression::column(["add", "modificationTime"])
384+
]),
385+
DataType::struct_type([
386+
StructField::new("path", DataType::STRING, false),
387+
StructField::new("size", DataType::LONG, false),
388+
StructField::new("modification_time", DataType::LONG, false),
389+
]),
390+
);
391+
}
392+
393+
fn pick<'a, T: Array + 'static>(data: &'a RecordBatch, idx: usize) -> &'a T {
394+
data.column(idx)
395+
.as_any()
396+
.downcast_ref::<T>()
397+
.ok_or_else(|| {
398+
DeltaTableError::generic(format!(
399+
"expected column '{}' to be of type '{}'",
400+
idx,
401+
std::any::type_name::<T>()
402+
))
403+
})
404+
.unwrap()
405+
}
406+
349407
impl<'a> FileStatsAccessor<'a> {
350408
pub(crate) fn try_new(
351409
data: &'a RecordBatch,
352410
metadata: &'a Metadata,
353411
schema: &'a StructType,
354412
) -> DeltaResult<Self> {
355-
let paths = extract_and_cast::<StringArray>(data, "add.path")?;
413+
let file_data = FILE_PICKER.evaluate(&ArrowEngineData::new(data.clone()))?;
414+
let result = file_data
415+
.into_any()
416+
.downcast::<ArrowEngineData>()
417+
.map_err(|_| {
418+
DeltaTableError::generic("failed to downcast evaluator result to ArrowEngineData.")
419+
})?
420+
.record_batch()
421+
.clone();
422+
356423
let sizes = extract_and_cast::<Int64Array>(data, "add.size")?;
357-
let modification_times = extract_and_cast::<Int64Array>(data, "add.modificationTime")?;
358424
let stats = extract_and_cast::<StructArray>(data, "add.stats_parsed")?;
359425
let partition_values = extract_and_cast::<MapArray>(data, "add.partitionValues")?;
360426
let partition_values_parsed =
@@ -398,9 +464,8 @@ impl<'a> FileStatsAccessor<'a> {
398464

399465
Ok(Self {
400466
partition_fields,
401-
paths,
467+
data: Arc::new(result),
402468
sizes,
403-
modification_times,
404469
stats,
405470
deletion_vector,
406471
partition_values,
@@ -418,10 +483,9 @@ impl<'a> FileStatsAccessor<'a> {
418483
)));
419484
}
420485
Ok(LogicalFile {
421-
path: self.paths,
422-
size: self.sizes,
423-
modification_time: self.modification_times,
486+
data: self.data.clone(),
424487
partition_values: self.partition_values,
488+
partition_values_parsed: self.partition_values_parsed.clone(),
425489
partition_fields: self.partition_fields.clone(),
426490
stats: self.stats,
427491
deletion_vector: self.deletion_vector.clone(),
@@ -444,30 +508,6 @@ impl<'a> Iterator for FileStatsAccessor<'a> {
444508
}
445509
}
446510

447-
pub struct LogDataIterator<'a> {
448-
data: &'a RecordBatch,
449-
pointer: usize,
450-
}
451-
452-
impl<'a> LogDataIterator<'a> {
453-
pub(crate) fn new(data: &'a RecordBatch) -> Self {
454-
Self { data, pointer: 0 }
455-
}
456-
457-
pub(crate) fn len(&self) -> usize {
458-
self.data.num_rows()
459-
}
460-
461-
pub fn path(&self) -> &str {
462-
let paths = self
463-
.data
464-
.column_by_name("path")
465-
.and_then(|c| c.as_any().downcast_ref::<StringArray>())
466-
.unwrap();
467-
paths.value(self.pointer)
468-
}
469-
}
470-
471511
/// Provides semantic access to the log data.
472512
///
473513
/// This is a helper struct that provides access to the log data in a more semantic way
@@ -896,8 +936,8 @@ mod tests {
896936

897937
assert_eq!(json_action.path(), struct_action.path());
898938
assert_eq!(
899-
json_action.partition_values().unwrap(),
900-
struct_action.partition_values().unwrap()
939+
json_action.partition_values_scalar(),
940+
struct_action.partition_values_scalar()
901941
);
902942
// assert_eq!(
903943
// json_action.max_values().unwrap(),

crates/core/src/operations/convert_to_delta.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -570,13 +570,16 @@ mod tests {
570570
.unwrap()
571571
.log_data()
572572
.into_iter()
573-
.flat_map(|add| {
574-
add.partition_values()
575-
.unwrap()
576-
.iter()
577-
.map(|(k, v)| (k.to_string(), v.clone()))
578-
.collect::<Vec<_>>()
573+
.filter_map(|add| {
574+
add.partition_values_scalar().map(|pv| {
575+
pv.fields()
576+
.iter()
577+
.zip(pv.values().iter())
578+
.map(|(k, v)| (k.name().to_owned(), v.clone()))
579+
.collect::<Vec<_>>()
580+
})
579581
})
582+
.flatten()
580583
.collect::<Vec<_>>();
581584
partition_values.sort_by_key(|(k, v)| (k.clone(), v.serialize()));
582585
assert_eq!(partition_values, expected_partition_values);

0 commit comments

Comments (0)