diff --git a/.github/actions/load-dat/action.yaml b/.github/actions/load-dat/action.yaml new file mode 100644 index 0000000000..6d40707b3c --- /dev/null +++ b/.github/actions/load-dat/action.yaml @@ -0,0 +1,26 @@ +name: Delta Acceptance Tests +description: Load Delta Lake acceptance test data + +inputs: + version: + description: "The dat release version to download" + required: false + default: "0.0.3" + + target-directory: + description: target directory for acceptance test data + required: false + default: ${{ github.workspace }}/dat + +runs: + using: composite + + steps: + - name: load DAT + shell: bash + run: | + rm -rf ${{ inputs.target-directory }} + curl -OL https://github.com/delta-incubator/dat/releases/download/v${{ inputs.version }}/deltalake-dat-v${{ inputs.version }}.tar.gz + mkdir -p ${{ inputs.target-directory }} + tar --no-same-permissions -xzf deltalake-dat-v${{ inputs.version }}.tar.gz --directory ${{ inputs.target-directory }} + rm deltalake-dat-v${{ inputs.version }}.tar.gz diff --git a/.github/actions/setup-env/action.yml b/.github/actions/setup-env/action.yml index 8339c45449..74f05ea84d 100644 --- a/.github/actions/setup-env/action.yml +++ b/.github/actions/setup-env/action.yml @@ -4,12 +4,12 @@ description: "Set up Python, virtual environment, and Rust toolchain" inputs: python-version: description: "The Python version to set up" - required: true + required: false default: "3.10" rust-toolchain: description: "The Rust toolchain to set up" - required: true + required: false default: "stable" runs: diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index ba23352e66..aa634cb789 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -25,7 +25,7 @@ jobs: uses: actions-rs/toolchain@v1 with: profile: default - toolchain: '1.82' + toolchain: "1.82" override: true - name: Build @@ -40,7 +40,7 @@ jobs: uses: actions-rs/toolchain@v1 with: profile: default - toolchain: '1.82' + toolchain: "1.82" override: true - name: Format @@ -62,7 +62,7 @@ jobs: uses: actions-rs/toolchain@v1 with: profile: default - toolchain: '1.82' + toolchain: "1.82" override: true - name: build and lint with clippy @@ -92,9 +92,12 @@ jobs: uses: actions-rs/toolchain@v1 with: profile: default - toolchain: '1.82' + toolchain: "1.82" override: true + - name: Load DAT data + uses: ./.github/actions/load-dat + - name: Run tests run: cargo test --verbose --features ${{ env.DEFAULT_FEATURES }} @@ -121,7 +124,7 @@ jobs: uses: actions-rs/toolchain@v1 with: profile: default - toolchain: '1.82' + toolchain: "1.82" override: true # Install Java and Hadoop for HDFS integration tests @@ -136,6 +139,9 @@ jobs: tar -xf hadoop-3.4.0.tar.gz -C $GITHUB_WORKSPACE echo "$GITHUB_WORKSPACE/hadoop-3.4.0/bin" >> $GITHUB_PATH + - name: Load DAT data + uses: ./.github/actions/load-dat + - name: Start emulated services run: docker compose up -d @@ -162,7 +168,7 @@ jobs: uses: actions-rs/toolchain@v1 with: profile: default - toolchain: '1.82' + toolchain: "1.82" override: true - name: Download Lakectl @@ -177,4 +183,3 @@ jobs: - name: Run tests with rustls (default) run: | cargo test --features integration_test_lakefs,lakefs,datafusion - diff --git a/.github/workflows/codecov.yml b/.github/workflows/codecov.yml index a790cea087..c9c9057a7c 100644 --- a/.github/workflows/codecov.yml +++ b/.github/workflows/codecov.yml @@ -16,17 +16,25 @@ jobs: CARGO_TERM_COLOR: always steps: - uses: actions/checkout@v4 + - name: Install rust uses: actions-rs/toolchain@v1 with: profile: default toolchain: '1.82'
override: true + - name: Install cargo-llvm-cov uses: taiki-e/install-action@cargo-llvm-cov + - uses: Swatinem/rust-cache@v2 + + - name: Load DAT data + uses: ./.github/actions/load-dat + - name: Generate code coverage run: cargo llvm-cov --features ${DEFAULT_FEATURES} --workspace --codecov --output-path codecov.json -- --skip read_table_version_hdfs --skip test_read_tables_lakefs + - name: Upload coverage to Codecov uses: codecov/codecov-action@v4 with: diff --git a/.gitignore b/.gitignore index f23e3b772d..ec48dec4de 100644 --- a/.gitignore +++ b/.gitignore @@ -22,7 +22,7 @@ __blobstorage__ .githubchangeloggenerator.cache.log .githubchangeloggenerator.cache/ .githubchangeloggenerator* -data +.zed/ # Add all Cargo.lock files except for those in binary crates Cargo.lock @@ -33,5 +33,6 @@ Cargo.lock justfile site __pycache__ +dat/ .zed .zed/ diff --git a/Cargo.toml b/Cargo.toml index 05c7fb658d..233057f7d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,11 +26,16 @@ debug = true debug = "line-tables-only" [workspace.dependencies] -delta_kernel = { version = "0.9.0", features = [ +delta_kernel = { git = "https://github.com/roeap/delta-kernel-rs", rev = "b0cd12264ae4ada8d51cff02b25864258568eb88", features = [ "arrow_54", "developer-visibility", "default-engine-rustls", ] } +# delta_kernel = { path = "../delta-kernel-rs/kernel", features = [ +# "arrow_54", +# "developer-visibility", +# "default-engine-rustls", +# ] } # arrow arrow = { version = "54" } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 7f016ee840..5ad59b8bd1 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -15,7 +15,7 @@ rust-version.workspace = true features = ["datafusion", "json", "unity-experimental"] [dependencies] -delta_kernel.workspace = true +delta_kernel = { workspace = true } # arrow arrow = { workspace = true } @@ -29,10 +29,7 @@ arrow-ord = { workspace = true } arrow-row = { workspace = true } arrow-schema = { workspace = true, features = ["serde"] } arrow-select = { workspace = true } -parquet = { workspace = true, features = [ - "async", - "object_store", -] } +parquet = { workspace = true, features = ["async", "object_store"] } pin-project-lite = "^0.2.7" # datafusion @@ -49,7 +46,7 @@ datafusion-functions-aggregate = { workspace = true, optional = true } # serde serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } -strum = { workspace = true} +strum = { workspace = true } # "stdlib" bytes = { workspace = true } @@ -75,6 +72,9 @@ tokio = { workspace = true, features = [ "parking_lot", ] } +# cahce +quick_cache = { version = "0.6.9" } + # other deps (these should be organized and pulled into workspace.dependencies as necessary) cfg-if = "1" dashmap = "6" @@ -100,6 +100,7 @@ humantime = { version = "2.1.0" } [dev-dependencies] criterion = "0.5" ctor = "0" +datatest-stable = "0.2" deltalake-test = { path = "../test", features = ["datafusion"] } dotenvy = "0" fs_extra = "1.2.0" @@ -130,3 +131,8 @@ python = ["arrow/pyarrow"] native-tls = ["delta_kernel/default-engine"] rustls = ["delta_kernel/default-engine-rustls"] cloud = ["object_store/cloud"] + +[[test]] +name = "dat" +harness = false + diff --git a/crates/core/src/kernel/mod.rs b/crates/core/src/kernel/mod.rs index 6a9ba71c94..9e98c7051c 100644 --- a/crates/core/src/kernel/mod.rs +++ b/crates/core/src/kernel/mod.rs @@ -10,6 +10,7 @@ pub mod error; pub mod models; pub mod scalars; mod snapshot; +pub mod snapshot_next; pub mod transaction; pub use error::*; diff --git 
a/crates/core/src/kernel/snapshot_next/cache.rs b/crates/core/src/kernel/snapshot_next/cache.rs new file mode 100644 index 0000000000..594d599942 --- /dev/null +++ b/crates/core/src/kernel/snapshot_next/cache.rs @@ -0,0 +1,186 @@ +use std::sync::Arc; + +use bytes::Bytes; +use chrono::{DateTime, Utc}; +use futures::stream::BoxStream; +use futures::StreamExt; +use object_store::path::Path; +use object_store::{ + Attributes, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, + ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result as ObjectStoreResult, +}; +use quick_cache::sync::Cache; + +#[derive(Debug, Clone)] +struct Entry { + data: Bytes, + last_modified: DateTime<Utc>, + attributes: Attributes, + e_tag: Option<String>, +} + +impl Entry { + fn new( + data: Bytes, + last_modified: DateTime<Utc>, + e_tag: Option<String>, + attributes: Attributes, + ) -> Self { + Self { + data, + last_modified, + e_tag, + attributes, + } + } +} + +/// An object store implementation that conditionally caches file requests. +/// +/// This implementation caches the file requests based on the evaluation +/// of a condition. The condition is evaluated on the path of the file and +/// can be configured to meet the requirements of the user. +/// +/// This is __not__ a general purpose cache and is specifically designed to cache +/// the commit files of a Delta table. E.g. it is assumed that files written to +/// the object store are immutable and no attempt is made to invalidate the cache +/// when files are updated in the remote object store. +#[derive(Clone)] +pub(super) struct CommitCacheObjectStore { + inner: Arc<dyn ObjectStore>, + check: Arc<dyn Fn(&Path) -> bool + Send + Sync>, + cache: Arc<Cache<Path, Entry>>, +} + +impl std::fmt::Debug for CommitCacheObjectStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ConditionallyCachedObjectStore") + .field("object_store", &self.inner) + .finish() + } +} + +impl std::fmt::Display for CommitCacheObjectStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "ConditionallyCachedObjectStore({})", self.inner) + } +} + +fn cache_json(path: &Path) -> bool { + path.extension() + .map_or(false, |ext| ext.eq_ignore_ascii_case("json")) +} + +impl CommitCacheObjectStore { + /// Create a new conditionally cached object store.
+ pub fn new(inner: Arc) -> Self { + Self { + inner, + check: Arc::new(cache_json), + cache: Arc::new(Cache::new(100)), + } + } + + async fn get_opts_impl( + &self, + location: &Path, + options: GetOptions, + ) -> ObjectStoreResult { + if options.range.is_some() || !(self.check)(location) || options.head { + return self.inner.get_opts(location, options).await; + } + + let entry = if let Some(entry) = self.cache.get(location) { + entry + } else { + let response = self.inner.get_opts(location, options.clone()).await?; + let attributes = response.attributes.clone(); + let meta = response.meta.clone(); + let data = response.bytes().await?; + let entry = Entry::new(data, meta.last_modified, meta.e_tag, attributes); + self.cache.insert(location.clone(), entry.clone()); + entry + }; + + let meta = ObjectMeta { + location: location.clone(), + last_modified: entry.last_modified, + size: entry.data.len(), + e_tag: entry.e_tag, + version: None, + }; + let (range, data) = (0..entry.data.len(), entry.data); + let stream = futures::stream::once(futures::future::ready(Ok(data))); + Ok(GetResult { + payload: GetResultPayload::Stream(stream.boxed()), + attributes: entry.attributes, + meta, + range, + }) + } +} + +#[async_trait::async_trait] +impl ObjectStore for CommitCacheObjectStore { + async fn put_opts( + &self, + location: &Path, + bytes: PutPayload, + options: PutOptions, + ) -> ObjectStoreResult { + self.inner.put_opts(location, bytes, options).await + } + + async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult { + self.get_opts_impl(location, options).await + } + + async fn head(&self, location: &Path) -> ObjectStoreResult { + self.inner.head(location).await + } + + async fn delete(&self, location: &Path) -> ObjectStoreResult<()> { + self.cache.remove(location); + self.inner.delete(location).await + } + + fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, ObjectStoreResult> { + self.inner.list(prefix) + } + + fn list_with_offset( + &self, + prefix: Option<&Path>, + offset: &Path, + ) -> BoxStream<'_, ObjectStoreResult> { + self.inner.list_with_offset(prefix, offset) + } + + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult { + self.inner.list_with_delimiter(prefix).await + } + + async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { + self.inner.copy(from, to).await + } + + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { + self.inner.copy_if_not_exists(from, to).await + } + + async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { + self.inner.rename_if_not_exists(from, to).await + } + + async fn put_multipart(&self, location: &Path) -> ObjectStoreResult> { + self.inner.put_multipart(location).await + } + + async fn put_multipart_opts( + &self, + location: &Path, + options: PutMultipartOpts, + ) -> ObjectStoreResult> { + self.inner.put_multipart_opts(location, options).await + } +} diff --git a/crates/core/src/kernel/snapshot_next/eager.rs b/crates/core/src/kernel/snapshot_next/eager.rs new file mode 100644 index 0000000000..96866ea642 --- /dev/null +++ b/crates/core/src/kernel/snapshot_next/eager.rs @@ -0,0 +1,165 @@ +use std::sync::Arc; + +use arrow::compute::concat_batches; +use arrow_array::RecordBatch; +use delta_kernel::actions::visitors::SetTransactionMap; +use delta_kernel::actions::{Add, Metadata, Protocol, SetTransaction}; +use delta_kernel::engine::arrow_extensions::ScanExt; +use delta_kernel::scan::scan_row_schema; +use 
delta_kernel::schema::Schema; +use delta_kernel::table_properties::TableProperties; +use delta_kernel::{ExpressionRef, Table, Version}; +use itertools::Itertools; +use object_store::ObjectStore; +use url::Url; + +use super::iterators::AddIterator; +use super::lazy::LazySnapshot; +use super::{Snapshot, SnapshotError}; +use crate::kernel::CommitInfo; +use crate::{DeltaResult, DeltaTableConfig}; + +/// An eager snapshot of a Delta Table at a specific version. +/// +/// This snapshot loads some log data eagerly and keeps it in memory. +#[derive(Clone)] +pub struct EagerSnapshot { + snapshot: LazySnapshot, + files: Option, +} + +impl Snapshot for EagerSnapshot { + fn table_root(&self) -> &Url { + self.snapshot.table_root() + } + + fn version(&self) -> Version { + self.snapshot.version() + } + + fn schema(&self) -> Arc { + self.snapshot.schema() + } + + fn protocol(&self) -> &Protocol { + self.snapshot.protocol() + } + + fn metadata(&self) -> &Metadata { + self.snapshot.metadata() + } + + fn table_properties(&self) -> &TableProperties { + self.snapshot.table_properties() + } + + fn logical_files( + &self, + predicate: Option, + ) -> DeltaResult> + '_>> { + let scan = self.snapshot.inner.as_ref().clone(); + let builder = scan.into_scan_builder().with_predicate(predicate).build()?; + let iter: Vec<_> = builder + .scan_metadata_from_existing_arrow( + self.snapshot.engine_ref().as_ref(), + self.version(), + Some(self.file_data()?.clone()), + )? + .collect(); + Ok(Box::new(iter.into_iter().map(|sc| Ok(sc?.scan_files)))) + } + + fn tombstones(&self) -> DeltaResult>>> { + self.snapshot.tombstones() + } + + fn application_transactions(&self) -> DeltaResult { + self.snapshot.application_transactions() + } + + fn application_transaction(&self, app_id: &str) -> DeltaResult> { + self.snapshot.application_transaction(app_id) + } + + fn commit_infos( + &self, + start_version: Option, + limit: Option, + ) -> DeltaResult>> { + self.snapshot.commit_infos(start_version, limit) + } + + fn update(&mut self, target_version: Option) -> DeltaResult { + self.update_impl(target_version) + } +} + +impl EagerSnapshot { + /// Create a new [`EagerSnapshot`] instance + pub async fn try_new( + table_root: impl AsRef, + store: Arc, + config: DeltaTableConfig, + version: impl Into>, + ) -> DeltaResult { + let snapshot = + LazySnapshot::try_new(Table::try_from_uri(table_root)?, store, version).await?; + let files = config + .require_files + .then(|| -> DeltaResult<_> { + let all: Vec = snapshot.logical_files(None)?.try_collect()?; + if all.is_empty() { + return Ok(RecordBatch::new_empty(Arc::new( + (&scan_row_schema()).try_into()?, + ))); + } + Ok(concat_batches(&all[0].schema(), &all)?) + }) + .transpose()?; + Ok(Self { snapshot, files }) + } + + pub fn file_data(&self) -> DeltaResult<&RecordBatch> { + Ok(self + .files + .as_ref() + .ok_or(SnapshotError::FilesNotInitialized)?) + } + + pub fn file_actions(&self) -> DeltaResult> + '_> { + AddIterator::try_new(self.file_data()?) + } + + /// Get the number of files in the current snapshot + pub fn files_count(&self) -> DeltaResult { + Ok(self + .files + .as_ref() + .map(|f| f.num_rows()) + .ok_or(SnapshotError::FilesNotInitialized)?) + } + + fn update_impl(&mut self, target_version: Option) -> DeltaResult { + let mut snapshot = self.snapshot.clone(); + let current = snapshot.version(); + if !snapshot.update(target_version.clone())? 
{ + return Ok(false); + } + + let scan = snapshot.inner.clone().scan_builder().build()?; + let engine = snapshot.engine_ref().clone(); + let files: Vec<_> = scan + .scan_metadata_from_existing_arrow( + engine.as_ref(), + current, + Some(self.file_data()?.clone()), + )? + .map_ok(|s| s.scan_files) + .try_collect()?; + + self.files = Some(concat_batches(&files[0].schema(), &files)?); + self.snapshot = snapshot; + + Ok(true) + } +} diff --git a/crates/core/src/kernel/snapshot_next/iterators.rs b/crates/core/src/kernel/snapshot_next/iterators.rs new file mode 100644 index 0000000000..2d609ed0ee --- /dev/null +++ b/crates/core/src/kernel/snapshot_next/iterators.rs @@ -0,0 +1,263 @@ +use std::collections::HashSet; +use std::sync::Arc; + +use arrow_array::cast::AsArray; +use arrow_array::types::Int64Type; +use arrow_array::{ + Array, ArrayRef, BooleanArray, Int64Array, RecordBatch, StringArray, StructArray, +}; +use chrono::{DateTime, Utc}; +use delta_kernel::actions::visitors::AddVisitor; +use delta_kernel::actions::Add; +use delta_kernel::actions::ADD_NAME; +use delta_kernel::engine::arrow_data::ArrowEngineData; +use delta_kernel::engine::arrow_expression::ProvidesColumnByName; +use delta_kernel::engine_data::{GetData, RowVisitor}; +use delta_kernel::expressions::{Scalar, StructData}; + +use crate::kernel::scalars::ScalarExt; +use crate::{DeltaResult, DeltaTableError}; + +pub struct AddIterator<'a> { + paths: &'a StringArray, + getters: Arc>>, + index: usize, +} + +impl AddIterator<'_> { + pub fn try_new(actions: &RecordBatch) -> DeltaResult> { + validate_add(&actions)?; + + let visitor = AddVisitor::default(); + let fields = visitor.selected_column_names_and_types(); + + let mut mask = HashSet::new(); + for column in fields.0 { + for i in 0..column.len() { + mask.insert(&column[..i + 1]); + } + } + + let mut getters = vec![]; + ArrowEngineData::extract_columns(&mut vec![], &mut getters, fields.1, &mask, actions)?; + + let paths = extract_column(actions, &[ADD_NAME, "path"])?.as_string::(); + + Ok(AddIterator { + paths, + getters: Arc::new(getters), + index: 0, + }) + } +} + +impl Iterator for AddIterator<'_> { + type Item = DeltaResult; + + fn next(&mut self) -> Option { + if self.index < self.paths.len() { + let path = self.paths.value(self.index).to_string(); + let add = AddVisitor::visit_add(self.index, path, self.getters.as_slice()) + .map_err(DeltaTableError::from); + self.index += 1; + Some(add) + } else { + None + } + } +} + +#[derive(Clone)] +pub struct LogicalFileView { + files: RecordBatch, + index: usize, +} + +impl LogicalFileView { + /// Path of the file. + pub fn path(&self) -> &str { + self.files.column(0).as_string::().value(self.index) + } + + /// Size of the file in bytes. + pub fn size(&self) -> i64 { + self.files + .column(1) + .as_primitive::() + .value(self.index) + } + + /// Modification time of the file in milliseconds since epoch. + pub fn modification_time(&self) -> i64 { + self.files + .column(2) + .as_primitive::() + .value(self.index) + } + + /// Datetime of the last modification time of the file. 
+ pub fn modification_datetime(&self) -> DeltaResult> { + DateTime::from_timestamp_millis(self.modification_time()).ok_or(DeltaTableError::from( + crate::protocol::ProtocolError::InvalidField(format!( + "invalid modification_time: {:?}", + self.modification_time() + )), + )) + } + + pub fn stats(&self) -> Option<&str> { + let col = self.files.column(3).as_string::(); + col.is_valid(self.index).then(|| col.value(self.index)) + } + + pub fn partition_values(&self) -> Option { + self.files + .column_by_name("fileConstantValues") + .and_then(|col| col.as_struct_opt()) + .and_then(|s| s.column_by_name("partitionValues")) + .and_then(|arr| { + arr.is_valid(self.index) + .then(|| match Scalar::from_array(arr, self.index) { + Some(Scalar::Struct(s)) => Some(s), + _ => None, + }) + .flatten() + }) + } +} + +pub struct LogicalFileViewIterator +where + I: IntoIterator>, +{ + inner: I::IntoIter, + batch: Option, + current: usize, +} + +impl LogicalFileViewIterator +where + I: IntoIterator>, +{ + /// Create a new [LogicalFileViewIterator]. + /// + /// If `iter` is an infallible iterator, use `.map(Ok)`. + pub fn new(iter: I) -> Self { + Self { + inner: iter.into_iter(), + batch: None, + current: 0, + } + } +} + +impl Iterator for LogicalFileViewIterator +where + I: IntoIterator>, +{ + type Item = DeltaResult; + + fn next(&mut self) -> Option { + if let Some(batch) = &self.batch { + if self.current < batch.num_rows() { + let item = LogicalFileView { + files: batch.clone(), + index: self.current, + }; + self.current += 1; + return Some(Ok(item)); + } + } + match self.inner.next() { + Some(Ok(batch)) => { + if validate_logical_file(&batch).is_err() { + return Some(Err(DeltaTableError::generic( + "Invalid logical file data encountered.", + ))); + } + self.batch = Some(batch); + self.current = 0; + self.next() + } + Some(Err(e)) => Some(Err(e)), + None => None, + } + } + + fn size_hint(&self) -> (usize, Option) { + self.inner.size_hint() + } +} + +pub(crate) fn validate_add(batch: &RecordBatch) -> DeltaResult<()> { + validate_column::(batch, &[ADD_NAME, "path"])?; + validate_column::(batch, &[ADD_NAME, "size"])?; + validate_column::(batch, &[ADD_NAME, "modificationTime"])?; + validate_column::(batch, &[ADD_NAME, "dataChange"])?; + Ok(()) +} + +fn validate_logical_file(batch: &RecordBatch) -> DeltaResult<()> { + validate_column::(batch, &["path"])?; + validate_column::(batch, &["size"])?; + validate_column::(batch, &["modificationTime"])?; + // validate_column::(batch, &["deletionVector"])?; + // validate_column::(batch, &["fileConstantValues"])?; + Ok(()) +} + +fn validate_column<'a, T: Array + 'static>( + actions: &'a RecordBatch, + col: &'a [impl AsRef], +) -> DeltaResult<()> { + if let Ok(arr) = extract_column(actions, col) { + if arr.as_any().downcast_ref::().is_none() { + return Err(DeltaTableError::from( + crate::protocol::ProtocolError::InvalidField(format!("Invalid column: {:?}", arr)), + )); + } + if arr.null_count() > 0 { + return Err(DeltaTableError::from( + crate::protocol::ProtocolError::InvalidField(format!( + "Column has null values: {:?}", + arr + )), + )); + } + } else { + return Err(DeltaTableError::from( + crate::protocol::ProtocolError::InvalidField("Column not found".to_string()), + )); + } + Ok(()) +} + +fn extract_column<'a>( + mut parent: &'a dyn ProvidesColumnByName, + col: &[impl AsRef], +) -> DeltaResult<&'a ArrayRef> { + let mut field_names = col.iter(); + let Some(mut field_name) = field_names.next() else { + return Err(arrow_schema::ArrowError::SchemaError( + "Empty column 
path".to_string(), + ))?; + }; + loop { + let child = parent.column_by_name(field_name.as_ref()).ok_or_else(|| { + arrow_schema::ArrowError::SchemaError(format!("No such field: {}", field_name.as_ref())) + })?; + field_name = match field_names.next() { + Some(name) => name, + None => return Ok(child), + }; + parent = child + .as_any() + .downcast_ref::() + .ok_or_else(|| { + arrow_schema::ArrowError::SchemaError(format!( + "Not a struct: {}", + field_name.as_ref() + )) + })?; + } +} diff --git a/crates/core/src/kernel/snapshot_next/lazy.rs b/crates/core/src/kernel/snapshot_next/lazy.rs new file mode 100644 index 0000000000..41a260c6cd --- /dev/null +++ b/crates/core/src/kernel/snapshot_next/lazy.rs @@ -0,0 +1,301 @@ +//! Snapshot of a Delta Table at a specific version. +//! +use std::io::{BufRead, BufReader, Cursor}; +use std::sync::{Arc, LazyLock}; + +use arrow::array::AsArray; +use arrow_array::RecordBatch; +use arrow_select::filter::filter_record_batch; +use delta_kernel::actions::set_transaction::SetTransactionScanner; +use delta_kernel::actions::visitors::SetTransactionMap; +use delta_kernel::actions::{get_log_schema, REMOVE_NAME, SIDECAR_NAME}; +use delta_kernel::actions::{Metadata, Protocol, SetTransaction}; +use delta_kernel::engine::arrow_data::ArrowEngineData; +use delta_kernel::engine::arrow_extensions::{ExpressionEvaluatorExt, ScanExt}; +use delta_kernel::engine::default::executor::tokio::{ + TokioBackgroundExecutor, TokioMultiThreadExecutor, +}; +use delta_kernel::engine::default::DefaultEngine; +use delta_kernel::log_segment::LogSegment; +use delta_kernel::schema::{DataType, Schema}; +use delta_kernel::snapshot::Snapshot as SnapshotInner; +use delta_kernel::table_properties::TableProperties; +use delta_kernel::{ + Engine, EvaluationHandler, Expression, ExpressionEvaluator, ExpressionRef, Table, Version, +}; +use itertools::Itertools; +use object_store::ObjectStore; +use url::Url; + +use super::cache::CommitCacheObjectStore; +use super::Snapshot; +use crate::kernel::{Action, CommitInfo, ARROW_HANDLER}; +use crate::{DeltaResult, DeltaTableError}; + +// TODO: avoid repetitive parsing of json stats + +#[derive(Clone)] +pub struct LazySnapshot { + pub(super) inner: Arc, + engine: Arc, +} + +impl Snapshot for LazySnapshot { + fn table_root(&self) -> &Url { + self.inner.table_root() + } + + fn version(&self) -> Version { + self.inner.version() + } + + fn schema(&self) -> Arc { + self.inner.schema() + } + + fn protocol(&self) -> &Protocol { + self.inner.protocol() + } + + fn metadata(&self) -> &Metadata { + self.inner.metadata() + } + + fn table_properties(&self) -> &TableProperties { + self.inner.table_properties() + } + + fn logical_files( + &self, + predicate: Option, + ) -> DeltaResult> + '_>> { + let scan = self + .inner + .clone() + .scan_builder() + .with_predicate(predicate) + .build()?; + + // Move scan_metadata_arrow to a separate variable to avoid returning a reference to a local variable + let engine = self.engine.clone(); + let scan_result: Vec<_> = scan + .scan_metadata_arrow(engine.as_ref())? 
+ .map(|sc| Ok(sc?.scan_files)) + .collect(); + Ok(Box::new(scan_result.into_iter())) + } + + fn tombstones(&self) -> DeltaResult>>> { + static META_PREDICATE: LazyLock = + LazyLock::new(|| Arc::new(Expression::column([REMOVE_NAME, "path"]).is_not_null())); + static EVALUATOR: LazyLock> = LazyLock::new(|| { + ARROW_HANDLER.new_expression_evaluator( + get_log_schema().project(&[REMOVE_NAME]).unwrap(), + META_PREDICATE.as_ref().clone(), + DataType::BOOLEAN, + ) + }); + let read_schema = get_log_schema().project(&[REMOVE_NAME])?; + let read_schema2 = get_log_schema().project(&[REMOVE_NAME, SIDECAR_NAME])?; + Ok(Box::new( + self.inner + .log_segment() + .read_actions( + self.engine.as_ref(), + read_schema, + read_schema2, + Some(META_PREDICATE.clone()), + )? + .map_ok(|(d, _)| { + let batch = RecordBatch::from(ArrowEngineData::try_from_engine_data(d)?); + let selection = EVALUATOR.evaluate_arrow(batch.clone())?; + let filter = selection.column(0).as_boolean_opt().ok_or_else(|| { + DeltaTableError::generic("failed to downcast to BooleanArray") + })?; + Ok(filter_record_batch(&batch, filter)?) + }) + .flatten(), + )) + } + + fn application_transactions(&self) -> DeltaResult { + let scanner = SetTransactionScanner::new(self.inner.clone()); + Ok(scanner.application_transactions(self.engine.as_ref())?) + } + + fn application_transaction(&self, app_id: &str) -> DeltaResult> { + let scanner = SetTransactionScanner::new(self.inner.clone()); + Ok(scanner.application_transaction(self.engine.as_ref(), app_id)?) + } + + fn commit_infos( + &self, + start_version: Option, + limit: Option, + ) -> DeltaResult>> { + // let start_version = start_version.into(); + let fs_client = self.engine.storage_handler(); + let end_version = start_version.unwrap_or_else(|| self.version()); + let start_version = limit + .and_then(|limit| { + if limit == 0 { + Some(end_version) + } else { + Some(end_version.saturating_sub(limit as u64 - 1)) + } + }) + .unwrap_or(0); + + let log_root = self.inner.table_root().join("_delta_log").unwrap(); + let mut log_segment = LogSegment::for_table_changes( + fs_client.as_ref(), + log_root, + start_version, + end_version, + )?; + log_segment.ascending_commit_files.reverse(); + let files = log_segment + .ascending_commit_files + .iter() + .map(|commit_file| (commit_file.location.location.clone(), None)) + .collect_vec(); + + Ok(Box::new( + fs_client + .read_files(files)? + .zip(log_segment.ascending_commit_files.into_iter()) + .filter_map(|(data, path)| { + data.ok().and_then(|d| { + let reader = BufReader::new(Cursor::new(d)); + for line in reader.lines() { + match line.and_then(|l| Ok(serde_json::from_str::(&l)?)) { + Ok(Action::CommitInfo(commit_info)) => { + return Some((path.version, commit_info)) + } + Err(_) => return None, + _ => continue, + }; + } + None + }) + }), + )) + } + + fn update(&mut self, target_version: Option) -> DeltaResult { + let snapshot = + SnapshotInner::try_new_from(self.inner.clone(), self.engine.as_ref(), target_version)?; + let did_update = snapshot.version() != self.inner.version(); + self.inner = snapshot; + Ok(did_update) + } +} + +impl LazySnapshot { + /// Create a new [`Snapshot`] instance. + pub fn new(inner: Arc, engine: Arc) -> Self { + Self { inner, engine } + } + + /// Create a new [`Snapshot`] instance for a table. + pub async fn try_new( + table: Table, + store: Arc, + version: impl Into>, + ) -> DeltaResult { + // TODO: how to deal with the dedicated IO runtime? Would this already be covered by the + // object store implementation pass to this? 
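+        // The passed store is wrapped in a `CommitCacheObjectStore` so commit (JSON)
+        // files are fetched from the backing store only once, and the kernel engine is
+        // built with an executor matching the current tokio runtime flavor.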
+ let store = Arc::new(CommitCacheObjectStore::new(store)); + let handle = tokio::runtime::Handle::current(); + let engine: Arc<dyn Engine> = match handle.runtime_flavor() { + tokio::runtime::RuntimeFlavor::MultiThread => Arc::new(DefaultEngine::new( + store, + Arc::new(TokioMultiThreadExecutor::new(handle)), + )), + tokio::runtime::RuntimeFlavor::CurrentThread => Arc::new(DefaultEngine::new( + store, + Arc::new(TokioBackgroundExecutor::new()), + )), + _ => return Err(DeltaTableError::generic("unsupported runtime flavor")), + }; + + let snapshot = table.snapshot(engine.as_ref(), version.into())?; + Ok(Self::new(Arc::new(snapshot), engine)) + } + + /// A shared reference to the engine used for interacting with the Delta Table. + pub(crate) fn engine_ref(&self) -> &Arc<dyn Engine> { + &self.engine + } + + /// Get the timestamp of the given version in milliseconds since epoch. + /// + /// Extracts the timestamp from the commit file of the given version + /// from the current log segment. If the commit file is not part of the + /// current log segment, `None` is returned. + pub fn version_timestamp(&self, version: Version) -> Option<i64> { + self.inner + .log_segment() + .ascending_commit_files + .iter() + .find(|f| f.version == version) + .map(|f| f.location.last_modified) + } +} + +#[cfg(test)] +mod tests { + use delta_kernel::schema::StructType; + use deltalake_test::utils::*; + use deltalake_test::TestResult; + + use super::*; + + async fn load_snapshot() -> TestResult<()> { + let ctx = IntegrationContext::new(Box::::default())?; + ctx.load_table(TestTables::Simple).await?; + + let store = ctx + .table_builder(TestTables::Simple) + .build_storage()? + .object_store(None); + let table = Table::try_from_uri("memory:///")?; + let snapshot = LazySnapshot::try_new(table, store, None).await?; + + let schema_string = r#"{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}}]}"#; + let expected: StructType = serde_json::from_str(schema_string)?; + assert_eq!(snapshot.schema().as_ref(), &expected); + + let infos = snapshot.commit_infos(None, None)?.collect_vec(); + assert_eq!(infos.len(), 5); + + let tombstones: Vec<_> = snapshot.tombstones()?.try_collect()?; + let num_tombstones = tombstones.iter().map(|b| b.num_rows() as i64).sum::<i64>(); + assert_eq!(num_tombstones, 31); + + let expected = vec![ + "part-00000-2befed33-c358-4768-a43c-3eda0d2a499d-c000.snappy.parquet", + "part-00000-c1777d7d-89d9-4790-b38a-6ee7e24456b1-c000.snappy.parquet", + "part-00001-7891c33d-cedc-47c3-88a6-abcfb049d3b4-c000.snappy.parquet", + "part-00004-315835fe-fb44-4562-98f6-5e6cfa3ae45d-c000.snappy.parquet", + "part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet", + ]; + let file_names: Vec<_> = snapshot + .logical_files_view(None)? + .map_ok(|f| f.path().to_owned()) + .try_collect()?; + assert_eq!(file_names, expected); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn load_snapshot_multi() -> TestResult<()> { + load_snapshot().await + } + + #[tokio::test(flavor = "current_thread")] + async fn load_snapshot_current() -> TestResult<()> { + load_snapshot().await + } +} diff --git a/crates/core/src/kernel/snapshot_next/mod.rs b/crates/core/src/kernel/snapshot_next/mod.rs new file mode 100644 index 0000000000..a558d84b92 --- /dev/null +++ b/crates/core/src/kernel/snapshot_next/mod.rs @@ -0,0 +1,343 @@ +//! Snapshot of a Delta Table at a specific version.
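+//!
+//! A minimal usage sketch (illustrative only; `store` is assumed to be an
+//! `ObjectStore` rooted at the table location, and the calling code is async):
+//!
+//! ```ignore
+//! use delta_kernel::Table;
+//!
+//! let table = Table::try_from_uri("memory:///")?;
+//! let snapshot = LazySnapshot::try_new(table, store, None).await?;
+//! println!("version: {}", snapshot.version());
+//! for file in snapshot.logical_files_view(None)? {
+//!     println!("{}", file?.path());
+//! }
+//! ```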
+ +use std::sync::Arc; + +use arrow_array::RecordBatch; +use delta_kernel::actions::visitors::SetTransactionMap; +use delta_kernel::actions::{Metadata, Protocol, SetTransaction}; +use delta_kernel::expressions::{Scalar, StructData}; +use delta_kernel::scan::scan_row_schema; +use delta_kernel::schema::Schema; +use delta_kernel::table_properties::TableProperties; +use delta_kernel::{ExpressionRef, Version}; +use iterators::{LogicalFileView, LogicalFileViewIterator}; +use url::Url; + +use crate::kernel::actions::CommitInfo; +use crate::{DeltaResult, DeltaTableError}; + +pub use eager::EagerSnapshot; +pub use lazy::LazySnapshot; + +mod cache; +mod eager; +mod iterators; +mod lazy; + +// TODO: avoid repetitive parsing of json stats + +#[derive(thiserror::Error, Debug)] +enum SnapshotError { + #[error("Tried accessing file data at snapshot initialized with no files.")] + FilesNotInitialized, +} + +impl From for DeltaTableError { + fn from(e: SnapshotError) -> Self { + match &e { + SnapshotError::FilesNotInitialized => DeltaTableError::generic(e), + } + } +} + +/// Helper trait to extract individual values from a `StructData`. +pub trait StructDataExt { + fn get(&self, key: &str) -> Option<&Scalar>; +} + +impl StructDataExt for StructData { + fn get(&self, key: &str) -> Option<&Scalar> { + self.fields() + .iter() + .zip(self.values().iter()) + .find(|(k, _)| k.name() == key) + .map(|(_, v)| v) + } +} + +/// In-memory representation of a specific snapshot of a Delta table. While a `DeltaTable` exists +/// throughout time, `Snapshot`s represent a view of a table at a specific point in time; they +/// have a defined schema (which may change over time for any given table), specific version, and +/// frozen log segment. +pub trait Snapshot { + /// Location where the Delta Table (metadata) is stored. + fn table_root(&self) -> &Url; + + /// Version of this `Snapshot` in the table. + fn version(&self) -> Version; + + /// Table [`Schema`] at this `Snapshot`s version. + fn schema(&self) -> Arc; + + /// Table [`Metadata`] at this `Snapshot`s version. + /// + /// Metadata contains information about the table, such as the table name, + /// the schema, the partition columns, the configuration, etc. + fn metadata(&self) -> &Metadata; + + /// Table [`Protocol`] at this `Snapshot`s version. + /// + /// The protocol indicates the min reader / writer version required to + /// read / write the table. For modern readers / writers, the reader / + /// writer features active in the table are also available. + fn protocol(&self) -> &Protocol; + + /// Get the [`TableProperties`] for this [`Snapshot`]. + fn table_properties(&self) -> &TableProperties; + + fn logical_file_schema(&self) -> Schema { + scan_row_schema() + } + + /// Get all logical files present in the current snapshot. + /// + /// # Parameters + /// - `predicate`: An optional predicate to filter the files based on file statistics. + /// + /// # Returns + /// An iterator of [`RecordBatch`]es, where each batch contains logical file data. + fn logical_files( + &self, + predicate: Option, + ) -> DeltaResult> + '_>>; + + fn logical_files_view( + &self, + predicate: Option, + ) -> DeltaResult> + '_>> { + #[allow(deprecated)] + Ok(Box::new(LogicalFileViewIterator::new( + self.logical_files(predicate)?, + ))) + } + + /// Get all tombstones in the table. + /// + /// Remove Actions (tombstones) are records that indicate that a file has been deleted. + /// They are returned mostly for the purposes of VACUUM operations. 
+ /// + /// # Returns + /// An iterator of [`RecordBatch`]es, where each batch contains remove action data. + fn tombstones(&self) -> DeltaResult>>>; + + /// Scan the Delta Log to obtain the latest transaction for all applications + /// + /// This method requires a full scan of the log to find all transactions. + /// When a specific application id is requested, it is much more efficient to use + /// [`application_transaction`](Self::application_transaction) instead. + fn application_transactions(&self) -> DeltaResult; + + /// Scan the Delta Log for the latest transaction entry for a specific application. + /// + /// Initiates a log scan, but terminates as soon as the transaction + /// for the given application is found. + /// + /// # Parameters + /// - `app_id`: The application id for which to fetch the transaction. + /// + /// # Returns + /// The latest transaction for the given application id, if it exists. + fn application_transaction(&self, app_id: &str) -> DeltaResult>; + + /// Get commit info for the table. + /// + /// The [`CommitInfo`]s are returned in descending order of version + /// with the most recent commit first, starting from the `start_version`. + /// + /// [`CommitInfo`]s are read on a best-effort basis. If the action + /// for a version is not available or cannot be parsed, it is skipped. + /// + /// # Parameters + /// - `start_version`: The version from which to start fetching commit info. + /// Defaults to the latest version. + /// - `limit`: The maximum number of commit infos to fetch. + /// + /// # Returns + /// An iterator of commit info tuples. The first element of the tuple is the version + /// of the commit, the second element is the corresponding commit info. + // TODO(roeap): this is currently using our commit info, we should be using + // the definition from kernel, once handling over there has matured. + fn commit_infos( + &self, + start_version: Option, + limit: Option, + ) -> DeltaResult>>; + + /// Update the snapshot to a specific version. + /// + /// The target version must be greater than the current version of the snapshot. + /// + /// # Parameters + /// - `target_version`: The version to update the snapshot to. Defaults to latest. + /// + /// # Returns + /// A boolean indicating if the snapshot was updated.
+ fn update(&mut self, target_version: Option) -> DeltaResult; +} + +impl Snapshot for Box { + fn table_root(&self) -> &Url { + self.as_ref().table_root() + } + + fn version(&self) -> Version { + self.as_ref().version() + } + + fn schema(&self) -> Arc { + self.as_ref().schema() + } + + fn metadata(&self) -> &Metadata { + self.as_ref().metadata() + } + + fn protocol(&self) -> &Protocol { + self.as_ref().protocol() + } + + fn table_properties(&self) -> &TableProperties { + self.as_ref().table_properties() + } + + fn logical_files( + &self, + predicate: Option, + ) -> DeltaResult> + '_>> { + self.as_ref().logical_files(predicate) + } + + fn tombstones(&self) -> DeltaResult>>> { + self.as_ref().tombstones() + } + + fn application_transactions(&self) -> DeltaResult { + self.as_ref().application_transactions() + } + + fn application_transaction(&self, app_id: &str) -> DeltaResult> { + self.as_ref().application_transaction(app_id) + } + + fn commit_infos( + &self, + start_version: Option, + limit: Option, + ) -> DeltaResult>> { + self.as_ref().commit_infos(start_version, limit) + } + + fn update(&mut self, target_version: Option) -> DeltaResult { + self.as_mut().update(target_version) + } +} + +#[cfg(test)] +mod tests { + use std::{future::Future, pin::Pin}; + + use delta_kernel::Table; + use deltalake_test::utils::*; + + use super::*; + + fn get_lazy( + ctx: &IntegrationContext, + table: TestTables, + version: Option, + ) -> TestResult>>>>> { + let store = ctx.table_builder(table).build_storage()?.object_store(None); + let table = Table::try_from_uri("memory:///")?; + Ok(Box::pin(async move { + Ok(Box::new(LazySnapshot::try_new(table, store, version).await?) as Box) + })) + } + + fn get_eager( + ctx: &IntegrationContext, + table: TestTables, + version: Option, + ) -> TestResult>>>>> { + let store = ctx.table_builder(table).build_storage()?.object_store(None); + let config = Default::default(); + Ok(Box::pin(async move { + Ok( + Box::new(EagerSnapshot::try_new("memory:///", store, config, version).await?) + as Box, + ) + })) + } + + #[tokio::test] + async fn test_snapshots() -> TestResult { + let context = IntegrationContext::new(Box::::default())?; + context.load_table(TestTables::Checkpoints).await?; + context.load_table(TestTables::Simple).await?; + context.load_table(TestTables::SimpleWithCheckpoint).await?; + context.load_table(TestTables::WithDvSmall).await?; + + test_snapshot(&context, get_lazy).await?; + test_snapshot(&context, get_eager).await?; + + Ok(()) + } + + // NOTE: test needs to be async, so that we can pick up the runtime from the context + async fn test_snapshot(ctx: &IntegrationContext, get_snapshot: F) -> TestResult<()> + where + F: Fn( + &IntegrationContext, + TestTables, + Option, + ) -> TestResult>>>>>, + { + for version in 0..=12 { + let snapshot = get_snapshot(ctx, TestTables::Checkpoints, Some(version))?.await?; + assert_eq!(snapshot.version(), version); + + test_commit_infos(snapshot.as_ref())?; + test_logical_files(snapshot.as_ref())?; + test_logical_files_view(snapshot.as_ref())?; + } + + let mut snapshot = get_snapshot(ctx, TestTables::Checkpoints, Some(0))?.await?; + for version in 1..=12 { + snapshot.update(Some(version))?; + assert_eq!(snapshot.version(), version); + + test_commit_infos(snapshot.as_ref())?; + test_logical_files(snapshot.as_ref())?; + test_logical_files_view(snapshot.as_ref())?; + } + + Ok(()) + } + + fn test_logical_files(snapshot: &dyn Snapshot) -> TestResult<()> { + let logical_files = snapshot + .logical_files(None)? 
+ .collect::, _>>()?; + let num_files = logical_files + .iter() + .map(|b| b.num_rows() as i64) + .sum::(); + assert_eq!((num_files as u64), snapshot.version()); + Ok(()) + } + + fn test_logical_files_view(snapshot: &dyn Snapshot) -> TestResult<()> { + let num_files_view = snapshot + .logical_files_view(None)? + .map(|f| f.unwrap().path().to_string()) + .count() as u64; + assert_eq!(num_files_view, snapshot.version()); + Ok(()) + } + + fn test_commit_infos(snapshot: &dyn Snapshot) -> TestResult<()> { + let commit_infos = snapshot.commit_infos(None, Some(100))?.collect::>(); + assert_eq!((commit_infos.len() as u64), snapshot.version() + 1); + assert_eq!(commit_infos.first().unwrap().0, snapshot.version()); + Ok(()) + } +} diff --git a/crates/core/src/operations/vacuum.rs b/crates/core/src/operations/vacuum.rs index 1951ae7f9d..3ba1dc1381 100644 --- a/crates/core/src/operations/vacuum.rs +++ b/crates/core/src/operations/vacuum.rs @@ -217,6 +217,7 @@ impl VacuumBuilder { self.log_store.object_store(None).clone(), ) .await?; + let valid_files = self.snapshot.file_paths_iter().collect::>(); let mut files_to_delete = vec![]; diff --git a/crates/core/src/protocol/checkpoints.rs b/crates/core/src/protocol/checkpoints.rs index 34bd8ef5f3..193ceacc63 100644 --- a/crates/core/src/protocol/checkpoints.rs +++ b/crates/core/src/protocol/checkpoints.rs @@ -171,16 +171,11 @@ pub async fn create_checkpoint_for( return Err(CheckpointError::StaleTableVersion(version, state.version()).into()); } - // TODO: checkpoints _can_ be multi-part... haven't actually found a good reference for - // an appropriate split point yet though so only writing a single part currently. - // See https://github.com/delta-io/delta-rs/issues/288 let last_checkpoint_path = log_store.log_path().child("_last_checkpoint"); - - debug!("Writing parquet bytes to checkpoint buffer."); let tombstones = state .unexpired_tombstones(log_store.object_store(None).clone()) .await - .map_err(|_| ProtocolError::Generic("filed to get tombstones".into()))? + .map_err(|_| ProtocolError::Generic("failed to get tombstones".into()))? .collect::>(); let (checkpoint, parquet_bytes) = parquet_bytes_from_state(state, tombstones)?; @@ -292,9 +287,9 @@ fn parquet_bytes_from_state( // and omit metadata columns if at least one remove action has `extended_file_metadata=false`. // We've added the additional check on `size.is_some` because in delta-spark the primitive long type // is used, hence we want to omit possible errors when `extended_file_metadata=true`, but `size=null` - let use_extended_remove_schema = tombstones + let use_extended_remove_schema = !tombstones .iter() - .all(|r| r.extended_file_metadata == Some(true) && r.size.is_some()); + .any(|r| r.extended_file_metadata == Some(false) || r.size.is_none()); // If use_extended_remove_schema=false for some of the tombstones, then it should be for each. 
if !use_extended_remove_schema { diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs index 9eb54ee02c..94effd22d8 100644 --- a/crates/core/src/storage/mod.rs +++ b/crates/core/src/storage/mod.rs @@ -645,7 +645,6 @@ mod tests { ); } - #[cfg(feature = "cloud")] #[test] #[cfg(feature = "cloud")] fn test_retry_config_from_options() { diff --git a/crates/core/tests/dat.rs b/crates/core/tests/dat.rs new file mode 100644 index 0000000000..82daf5c20e --- /dev/null +++ b/crates/core/tests/dat.rs @@ -0,0 +1,99 @@ +use std::path::Path; +use std::sync::Arc; + +use delta_kernel::Table; +use deltalake_core::kernel::snapshot_next::{LazySnapshot, Snapshot}; +use deltalake_test::acceptance::read_dat_case; + +static SKIPPED_TESTS: &[&str; 1] = &["iceberg_compat_v1"]; + +fn reader_test_lazy(path: &Path) -> datatest_stable::Result<()> { + let root_dir = format!( + "{}/{}", + env!["CARGO_MANIFEST_DIR"], + path.parent().unwrap().to_str().unwrap() + ); + for skipped in SKIPPED_TESTS { + if root_dir.ends_with(skipped) { + println!("Skipping test: {}", skipped); + return Ok(()); + } + } + + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()? + .block_on(async { + let case = read_dat_case(root_dir).unwrap(); + + let table = Table::try_from_uri(case.table_root().unwrap()).expect("table"); + let snapshot = LazySnapshot::try_new( + table, + Arc::new(object_store::local::LocalFileSystem::default()), + None, + ) + .await + .unwrap(); + + let table_info = case.table_summary().expect("load summary"); + assert_eq!(snapshot.version(), table_info.version); + assert_eq!( + ( + snapshot.protocol().min_reader_version(), + snapshot.protocol().min_writer_version() + ), + (table_info.min_reader_version, table_info.min_writer_version) + ); + }); + Ok(()) +} + +fn reader_test_eager(path: &Path) -> datatest_stable::Result<()> { + let root_dir = format!( + "{}/{}", + env!["CARGO_MANIFEST_DIR"], + path.parent().unwrap().to_str().unwrap() + ); + for skipped in SKIPPED_TESTS { + if root_dir.ends_with(skipped) { + println!("Skipping test: {}", skipped); + return Ok(()); + } + } + + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()? 
+ .block_on(async { + let case = read_dat_case(root_dir).unwrap(); + + let table = Table::try_from_uri(case.table_root().unwrap()).expect("table"); + let snapshot = LazySnapshot::try_new( + table, + Arc::new(object_store::local::LocalFileSystem::default()), + None, + ) + .await + .unwrap(); + + let table_info = case.table_summary().expect("load summary"); + assert_eq!(snapshot.version(), table_info.version); + assert_eq!( + ( + snapshot.protocol().min_reader_version(), + snapshot.protocol().min_writer_version() + ), + (table_info.min_reader_version, table_info.min_writer_version) + ); + }); + Ok(()) +} + +datatest_stable::harness!( + reader_test_lazy, + "../../dat/out/reader_tests/generated/", + r"test_case_info\.json", + reader_test_eager, + "../../dat/out/reader_tests/generated/", + r"test_case_info\.json" +); diff --git a/crates/test/Cargo.toml b/crates/test/Cargo.toml index 2d13097c3e..d793a08407 100644 --- a/crates/test/Cargo.toml +++ b/crates/test/Cargo.toml @@ -5,9 +5,18 @@ edition = "2021" publish = false [dependencies] +delta_kernel = { workspace = true } +deltalake-core = { version = "0.25.0", path = "../core" } + +arrow-array = { workspace = true, features = ["chrono-tz"] } +arrow-cast = { workspace = true } +arrow-ord = { workspace = true } +arrow-schema = { workspace = true, features = ["serde"] } +arrow-select = { workspace = true } +parquet = { workspace = true, features = ["async", "object_store"] } + bytes = { workspace = true } chrono = { workspace = true, default-features = false, features = ["clock"] } -deltalake-core = { version = "0.25.0", path = "../core" } dotenvy = "0" fs_extra = "1.3.0" futures = { version = "0.3" } @@ -16,7 +25,9 @@ rand = "0.8" serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } tempfile = "3" +thiserror = { workspace = true } tokio = { version = "1", features = ["macros", "rt-multi-thread"] } +url = { workspace = true } [features] default = [] diff --git a/crates/test/src/acceptance/data.rs b/crates/test/src/acceptance/data.rs new file mode 100644 index 0000000000..6d8ae4dbca --- /dev/null +++ b/crates/test/src/acceptance/data.rs @@ -0,0 +1,130 @@ +use std::{path::Path, sync::Arc}; + +use arrow_array::{Array, RecordBatch}; +use arrow_ord::sort::{lexsort_to_indices, SortColumn}; +use arrow_schema::{DataType, Schema}; +use arrow_select::{concat::concat_batches, take::take}; +use delta_kernel::DeltaResult; +use futures::{stream::TryStreamExt, StreamExt}; +use object_store::{local::LocalFileSystem, ObjectStore}; +use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder}; + +use super::TestCaseInfo; +use crate::TestResult; + +pub async fn read_golden(path: &Path, _version: Option<&str>) -> DeltaResult { + let expected_root = path.join("expected").join("latest").join("table_content"); + let store = Arc::new(LocalFileSystem::new_with_prefix(&expected_root)?); + let files: Vec<_> = store.list(None).try_collect().await?; + let mut batches = vec![]; + let mut schema = None; + for meta in files.into_iter() { + if let Some(ext) = meta.location.extension() { + if ext == "parquet" { + let reader = ParquetObjectReader::new(store.clone(), meta); + let builder = ParquetRecordBatchStreamBuilder::new(reader).await?; + if schema.is_none() { + schema = Some(builder.schema().clone()); + } + let mut stream = builder.build()?; + while let Some(batch) = stream.next().await { + batches.push(batch?); + } + } + } + } + let all_data = concat_batches(&schema.unwrap(), &batches)?; + Ok(all_data) +} + +pub fn 
sort_record_batch(batch: RecordBatch) -> DeltaResult { + // Sort by all columns + let mut sort_columns = vec![]; + for col in batch.columns() { + match col.data_type() { + DataType::Struct(_) | DataType::List(_) | DataType::Map(_, _) => { + // can't sort structs, lists, or maps + } + _ => sort_columns.push(SortColumn { + values: col.clone(), + options: None, + }), + } + } + let indices = lexsort_to_indices(&sort_columns, None)?; + let columns = batch + .columns() + .iter() + .map(|c| take(c, &indices, None).unwrap()) + .collect(); + Ok(RecordBatch::try_new(batch.schema(), columns)?) +} + +// Ensure that two schema have the same field names, and dict_id/ordering. +// We ignore: +// - data type: This is checked already in `assert_columns_match` +// - nullability: parquet marks many things as nullable that we don't in our schema +// - metadata: because that diverges from the real data to the golden tabled data +fn assert_schema_fields_match(schema: &Schema, golden: &Schema) { + for (schema_field, golden_field) in schema.fields.iter().zip(golden.fields.iter()) { + assert!( + schema_field.name() == golden_field.name(), + "Field names don't match" + ); + assert!( + schema_field.dict_id() == golden_field.dict_id(), + "Field dict_id doesn't match" + ); + assert!( + schema_field.dict_is_ordered() == golden_field.dict_is_ordered(), + "Field dict_is_ordered doesn't match" + ); + } +} + +// some things are equivalent, but don't show up as equivalent for `==`, so we normalize here +fn normalize_col(col: Arc) -> Arc { + if let DataType::Timestamp(unit, Some(zone)) = col.data_type() { + if **zone == *"+00:00" { + arrow_cast::cast::cast(&col, &DataType::Timestamp(*unit, Some("UTC".into()))) + .expect("Could not cast to UTC") + } else { + col + } + } else { + col + } +} + +fn assert_columns_match(actual: &[Arc], expected: &[Arc]) { + for (actual, expected) in actual.iter().zip(expected) { + let actual = normalize_col(actual.clone()); + let expected = normalize_col(expected.clone()); + // note that array equality includes data_type equality + // See: https://arrow.apache.org/rust/arrow_data/equal/fn.equal.html + assert_eq!( + &actual, &expected, + "Column data didn't match. 
Got {actual:?}, expected {expected:?}" + ); + } +} + +pub async fn assert_scan_data( + all_data: Vec, + test_case: &TestCaseInfo, +) -> TestResult<()> { + let all_data = concat_batches(&all_data[0].schema(), all_data.iter()).unwrap(); + let all_data = sort_record_batch(all_data)?; + + let golden = read_golden(test_case.root_dir(), None).await?; + let golden = sort_record_batch(golden)?; + + assert_columns_match(all_data.columns(), golden.columns()); + assert_schema_fields_match(all_data.schema().as_ref(), golden.schema().as_ref()); + assert!( + all_data.num_rows() == golden.num_rows(), + "Didn't have same number of rows" + ); + + Ok(()) +} diff --git a/crates/test/src/acceptance/meta.rs b/crates/test/src/acceptance/meta.rs new file mode 100644 index 0000000000..6a44f2cb69 --- /dev/null +++ b/crates/test/src/acceptance/meta.rs @@ -0,0 +1,73 @@ +use std::collections::HashMap; +use std::fs::File; +use std::path::{Path, PathBuf}; + +use delta_kernel::{Error, Version}; +use serde::{Deserialize, Serialize}; +use url::Url; + +#[derive(Debug, thiserror::Error)] +pub enum AssertionError { + #[error("Invalid test case data")] + InvalidTestCase, + + #[error("Kernel error: {0}")] + KernelError(#[from] Error), +} + +pub type TestResult = std::result::Result; + +#[derive(Serialize, Deserialize, PartialEq, Eq, Debug)] +struct TestCaseInfoJson { + name: String, + description: String, +} + +#[derive(PartialEq, Eq, Debug)] +pub struct TestCaseInfo { + name: String, + description: String, + root_dir: PathBuf, +} + +impl TestCaseInfo { + /// Root path for this test cases Delta table. + pub fn table_root(&self) -> TestResult { + let table_root = self.root_dir.join("delta"); + Url::from_directory_path(table_root).map_err(|_| AssertionError::InvalidTestCase) + } + + pub fn root_dir(&self) -> &PathBuf { + &self.root_dir + } + + pub fn table_summary(&self) -> TestResult { + let info_path = self + .root_dir() + .join("expected/latest/table_version_metadata.json"); + let file = File::open(info_path).map_err(|_| AssertionError::InvalidTestCase)?; + let info: TableVersionMetaData = + serde_json::from_reader(file).map_err(|_| AssertionError::InvalidTestCase)?; + Ok(info) + } +} + +#[derive(Serialize, Deserialize, PartialEq, Eq, Debug)] +pub struct TableVersionMetaData { + pub version: Version, + pub properties: HashMap, + pub min_reader_version: i32, + pub min_writer_version: i32, +} + +pub fn read_dat_case(case_root: impl AsRef) -> TestResult { + let info_path = case_root.as_ref().join("test_case_info.json"); + let file = File::open(info_path).map_err(|_| AssertionError::InvalidTestCase)?; + let info: TestCaseInfoJson = + serde_json::from_reader(file).map_err(|_| AssertionError::InvalidTestCase)?; + Ok(TestCaseInfo { + root_dir: case_root.as_ref().into(), + name: info.name, + description: info.description, + }) +} diff --git a/crates/test/src/acceptance/mod.rs b/crates/test/src/acceptance/mod.rs new file mode 100644 index 0000000000..521fd294ae --- /dev/null +++ b/crates/test/src/acceptance/mod.rs @@ -0,0 +1,5 @@ +pub mod data; +pub mod meta; + +pub use data::*; +pub use meta::*; diff --git a/crates/test/src/lib.rs b/crates/test/src/lib.rs index 6930f6a718..2095730475 100644 --- a/crates/test/src/lib.rs +++ b/crates/test/src/lib.rs @@ -14,6 +14,7 @@ use deltalake_core::DeltaTableBuilder; use deltalake_core::{ObjectStore, Path}; use tempfile::TempDir; +pub mod acceptance; pub mod clock; pub mod concurrent; #[cfg(feature = "datafusion")]