diff --git a/Cargo.lock b/Cargo.lock
index 26bc608dd..f93f5f3b7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1199,6 +1199,12 @@ dependencies = [
  "syn 1.0.109",
 ]
 
+[[package]]
+name = "bytemuck"
+version = "1.21.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ef657dfab802224e671f5818e9a4935f9b1957ed18e58292690cc39e7a4092a3"
+
 [[package]]
 name = "byteorder"
 version = "1.5.0"
@@ -2971,6 +2977,7 @@ dependencies = [
  "pretty_assertions",
  "rand 0.8.5",
  "reqwest",
+ "roaring",
  "rust_decimal",
  "serde",
  "serde_bytes",
@@ -4945,6 +4952,16 @@ version = "1.0.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422"
 
+[[package]]
+name = "roaring"
+version = "0.10.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a652edd001c53df0b3f96a36a8dc93fce6866988efc16808235653c6bcac8bf2"
+dependencies = [
+ "bytemuck",
+ "byteorder",
+]
+
 [[package]]
 name = "rsa"
 version = "0.9.7"
diff --git a/Cargo.toml b/Cargo.toml
index 8d093a27b..42516e503 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -86,6 +86,7 @@ port_scanner = "0.1.5"
 rand = "0.8.5"
 regex = "1.10.5"
 reqwest = { version = "0.12.2", default-features = false, features = ["json"] }
+roaring = "0.10"
 rust_decimal = "1.31"
 serde = { version = "1.0.204", features = ["rc"] }
 serde_bytes = "0.11.15"
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 7320c455d..5bcacb095 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -72,6 +72,7 @@ parquet = { workspace = true, features = ["async"] }
 paste = { workspace = true }
 rand = { workspace = true }
 reqwest = { workspace = true }
+roaring = { workspace = true }
 rust_decimal = { workspace = true }
 serde = { workspace = true }
 serde_bytes = { workspace = true }
diff --git a/crates/iceberg/src/arrow/delete_file_manager.rs b/crates/iceberg/src/arrow/delete_file_manager.rs
new file mode 100644
index 000000000..0d46f4768
--- /dev/null
+++ b/crates/iceberg/src/arrow/delete_file_manager.rs
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::delete_vector::DeleteVector;
+use crate::expr::BoundPredicate;
+use crate::io::FileIO;
+use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
+use crate::spec::SchemaRef;
+use crate::{Error, ErrorKind, Result};
+
+#[allow(unused)]
+pub trait DeleteFileManager {
+    /// Read the delete file referred to in the task
+    ///
+    /// Returns the raw contents of the delete file as a RecordBatch stream
+    fn read_delete_file(task: &FileScanTaskDeleteFile) -> Result<ArrowRecordBatchStream>;
+}
+
+#[allow(unused)]
+#[derive(Clone, Debug)]
+pub(crate) struct CachingDeleteFileManager {
+    file_io: FileIO,
+    concurrency_limit_data_files: usize,
+}
+
+impl DeleteFileManager for CachingDeleteFileManager {
+    fn read_delete_file(_task: &FileScanTaskDeleteFile) -> Result<ArrowRecordBatchStream> {
+        // TODO, implementation in https://github.com/apache/iceberg-rust/pull/982
+
+        Err(Error::new(
+            ErrorKind::FeatureUnsupported,
+            "Reading delete files is not yet supported",
+        ))
+    }
+}
+
+#[allow(unused_variables)]
+impl CachingDeleteFileManager {
+    pub fn new(file_io: FileIO, concurrency_limit_data_files: usize) -> CachingDeleteFileManager {
+        Self {
+            file_io,
+            concurrency_limit_data_files,
+        }
+    }
+
+    pub(crate) async fn load_deletes(
+        &self,
+        delete_file_entries: Vec<FileScanTaskDeleteFile>,
+    ) -> Result<()> {
+        // TODO
+
+        if !delete_file_entries.is_empty() {
+            Err(Error::new(
+                ErrorKind::FeatureUnsupported,
+                "Reading delete files is not yet supported",
+            ))
+        } else {
+            Ok(())
+        }
+    }
+
+    pub(crate) fn build_delete_predicate(
+        &self,
+        snapshot_schema: SchemaRef,
+    ) -> Result<Option<BoundPredicate>> {
+        // TODO
+
+        Ok(None)
+    }
+
+    pub(crate) fn get_positional_delete_indexes_for_data_file(
+        &self,
+        data_file_path: &str,
+    ) -> Option<DeleteVector> {
+        // TODO
+
+        None
+    }
+}
diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs
index 0c885e65f..0369db075 100644
--- a/crates/iceberg/src/arrow/mod.rs
+++ b/crates/iceberg/src/arrow/mod.rs
@@ -19,6 +19,7 @@
 mod schema;
 pub use schema::*;
 
+pub(crate) mod delete_file_manager;
 mod reader;
 pub(crate) mod record_batch_projector;
 pub(crate) mod record_batch_transformer;
diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index 6915ef920..4d8acf5ee 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -36,11 +36,13 @@ use futures::{try_join, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
 use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection};
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
-use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
+use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
 use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
 
+use crate::arrow::delete_file_manager::CachingDeleteFileManager;
 use crate::arrow::record_batch_transformer::RecordBatchTransformer;
 use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
+use crate::delete_vector::DeleteVector;
 use crate::error::Result;
 use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
 use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
@@ -104,7 +106,11 @@ impl ArrowReaderBuilder {
     pub fn build(self) -> ArrowReader {
         ArrowReader {
             batch_size: self.batch_size,
-            file_io: self.file_io,
+            file_io: self.file_io.clone(),
+            delete_file_manager: CachingDeleteFileManager::new(
+                self.file_io.clone(),
+                self.concurrency_limit_data_files,
+            ),
             concurrency_limit_data_files: self.concurrency_limit_data_files,
             row_group_filtering_enabled: self.row_group_filtering_enabled,
             row_selection_enabled: self.row_selection_enabled,
@@ -117,6 +123,7 @@ impl ArrowReaderBuilder {
 pub struct ArrowReader {
     batch_size: Option<usize>,
     file_io: FileIO,
+    delete_file_manager: CachingDeleteFileManager,
 
     /// the maximum number of data files that can be fetched at the same time
     concurrency_limit_data_files: usize,
@@ -143,6 +150,7 @@ impl ArrowReader {
                     task,
                     batch_size,
                     file_io,
+                    self.delete_file_manager.clone(),
                     row_group_filtering_enabled,
                     row_selection_enabled,
                 )
@@ -160,32 +168,22 @@ impl ArrowReader {
         task: FileScanTask,
         batch_size: Option<usize>,
         file_io: FileIO,
+        delete_file_manager: CachingDeleteFileManager,
         row_group_filtering_enabled: bool,
         row_selection_enabled: bool,
     ) -> Result<ArrowRecordBatchStream> {
-        // TODO: add support for delete files
-        if !task.deletes.is_empty() {
-            return Err(Error::new(
-                ErrorKind::FeatureUnsupported,
-                "Delete files are not yet supported",
-            ));
-        }
-
-        // Get the metadata for the Parquet file we need to read and build
-        // a reader for the data within
-        let parquet_file = file_io.new_input(&task.data_file_path)?;
-        let (parquet_metadata, parquet_reader) =
-            try_join!(parquet_file.metadata(), parquet_file.reader())?;
-        let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
-
-        let should_load_page_index = row_selection_enabled && task.predicate.is_some();
-
-        // Start creating the record batch stream, which wraps the parquet file reader
-        let mut record_batch_stream_builder = ParquetRecordBatchStreamBuilder::new_with_options(
-            parquet_file_reader,
-            ArrowReaderOptions::new().with_page_index(should_load_page_index),
-        )
-        .await?;
+        let should_load_page_index =
+            (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
+
+        // concurrently retrieve delete files and create RecordBatchStreamBuilder
+        let (_, mut record_batch_stream_builder) = try_join!(
+            delete_file_manager.load_deletes(task.deletes.clone()),
+            Self::create_parquet_record_batch_stream_builder(
+                &task.data_file_path,
+                file_io.clone(),
+                should_load_page_index,
+            )
+        )?;
 
         // Create a projection mask for the batch stream to select which columns in the
         // Parquet file that we want in the response
@@ -197,7 +195,7 @@ impl ArrowReader {
         )?;
         record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask);
 
-        // RecordBatchTransformer performs any required transformations on the RecordBatches
+        // RecordBatchTransformer performs any transformations required on the RecordBatches
         // that come back from the file, such as type promotion, default column insertion
         // and column re-ordering
         let mut record_batch_transformer =
@@ -207,49 +205,102 @@ impl ArrowReader {
             record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
         }
 
-        if let Some(predicate) = &task.predicate {
+        let delete_predicate = delete_file_manager.build_delete_predicate(task.schema.clone())?;
+
+        // In addition to the optional predicate supplied in the `FileScanTask`,
+        // we also have an optional predicate resulting from equality delete files.
+        // If both are present, we logical-AND them together to form a single filter
+        // predicate that we can pass to the `RecordBatchStreamBuilder`.
+        let final_predicate = match (&task.predicate, delete_predicate) {
+            (None, None) => None,
+            (Some(predicate), None) => Some(predicate.clone()),
+            (None, Some(ref predicate)) => Some(predicate.clone()),
+            (Some(filter_predicate), Some(delete_predicate)) => {
+                Some(filter_predicate.clone().and(delete_predicate))
+            }
+        };
+
+        // There are two possible sources both for potential lists of selected RowGroup indices,
+        //  and for `RowSelection`s.
+        // Selected RowGroup index lists can come from two sources:
+        //   * When there are equality delete files that are applicable;
+        //   * When there is a scan predicate and row_group_filtering_enabled = true.
+        // `RowSelection`s can be created in either or both of the following cases:
+        //   * When there are positional delete files that are applicable;
+        //   * When there is a scan predicate and row_selection_enabled = true.
+        // Note that, in the former case, we only perform row group filtering when
+        //  there is a scan predicate AND row_group_filtering_enabled = true,
+        //  but we perform row selection filtering if there are applicable
+        //  equality delete files OR (there is a scan predicate AND row_selection_enabled),
+        //  since the only implemented method of applying positional deletes is
+        //  by using a `RowSelection`.
+        let mut selected_row_group_indices = None;
+        let mut row_selection = None;
+
+        if let Some(predicate) = final_predicate {
             let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
                 record_batch_stream_builder.parquet_schema(),
-                predicate,
+                &predicate,
             )?;
 
             let row_filter = Self::get_row_filter(
-                predicate,
+                &predicate,
                 record_batch_stream_builder.parquet_schema(),
                 &iceberg_field_ids,
                 &field_id_map,
             )?;
             record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);
 
-            let mut selected_row_groups = None;
             if row_group_filtering_enabled {
                 let result = Self::get_selected_row_group_indices(
-                    predicate,
+                    &predicate,
                     record_batch_stream_builder.metadata(),
                     &field_id_map,
                     &task.schema,
                 )?;
 
-                selected_row_groups = Some(result);
+                selected_row_group_indices = Some(result);
             }
 
             if row_selection_enabled {
-                let row_selection = Self::get_row_selection(
-                    predicate,
+                row_selection = Some(Self::get_row_selection_for_filter_predicate(
+                    &predicate,
                     record_batch_stream_builder.metadata(),
-                    &selected_row_groups,
+                    &selected_row_group_indices,
                     &field_id_map,
                     &task.schema,
-                )?;
-
-                record_batch_stream_builder =
-                    record_batch_stream_builder.with_row_selection(row_selection);
+                )?);
             }
+        }
 
-            if let Some(selected_row_groups) = selected_row_groups {
-                record_batch_stream_builder =
-                    record_batch_stream_builder.with_row_groups(selected_row_groups);
-            }
+        let positional_delete_indexes =
+            delete_file_manager.get_positional_delete_indexes_for_data_file(&task.data_file_path);
+
+        if let Some(positional_delete_indexes) = positional_delete_indexes {
+            let delete_row_selection = Self::build_deletes_row_selection(
+                record_batch_stream_builder.metadata().row_groups(),
+                &selected_row_group_indices,
+                positional_delete_indexes,
+            )?;
+
+            // merge the row selection from the delete files with the row selection
+            // from the filter predicate, if there is one from the filter predicate
+            row_selection = match row_selection {
+                None => Some(delete_row_selection),
+                Some(filter_row_selection) => {
+                    Some(filter_row_selection.intersection(&delete_row_selection))
+                }
+            };
+        }
+
+        if let Some(row_selection) = row_selection {
+            record_batch_stream_builder =
+                record_batch_stream_builder.with_row_selection(row_selection);
+        }
+
+        if let Some(selected_row_group_indices) = selected_row_group_indices {
+            record_batch_stream_builder =
+                record_batch_stream_builder.with_row_groups(selected_row_group_indices);
         }
 
         // Build the batch stream and send all the RecordBatches that it generates
@@ -265,6 +316,43 @@ impl ArrowReader {
         Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
     }
 
+    async fn create_parquet_record_batch_stream_builder(
+        data_file_path: &str,
+        file_io: FileIO,
+        should_load_page_index: bool,
+    ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader<impl FileRead + Sized>>> {
+        // Get the metadata for the Parquet file we need to read and build
+        // a reader for the data within
+        let parquet_file = file_io.new_input(data_file_path)?;
+        let (parquet_metadata, parquet_reader) =
+            try_join!(parquet_file.metadata(), parquet_file.reader())?;
+        let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
+
+        // Create the record batch stream builder, which wraps the parquet file reader
+        let record_batch_stream_builder = ParquetRecordBatchStreamBuilder::new_with_options(
+            parquet_file_reader,
+            ArrowReaderOptions::new().with_page_index(should_load_page_index),
+        )
+        .await?;
+        Ok(record_batch_stream_builder)
+    }
+
+    /// Computes a `RowSelection` from positional delete indices.
+    ///
+    /// Using the Parquet page index, we build a `RowSelection` that rejects rows that are indicated
+    /// as having been deleted by a positional delete, taking into account any row groups that have
+    /// been skipped entirely by the filter predicate
+    #[allow(unused)]
+    fn build_deletes_row_selection(
+        row_group_metadata: &[RowGroupMetaData],
+        selected_row_groups: &Option<Vec<usize>>,
+        mut positional_deletes: DeleteVector,
+    ) -> Result<RowSelection> {
+        // TODO
+
+        Ok(RowSelection::default())
+    }
+
     fn build_field_id_set_and_map(
         parquet_schema: &SchemaDescriptor,
         predicate: &BoundPredicate,
@@ -475,7 +563,7 @@ impl ArrowReader {
         Ok(results)
     }
 
-    fn get_row_selection(
+    fn get_row_selection_for_filter_predicate(
         predicate: &BoundPredicate,
         parquet_metadata: &Arc<ParquetMetaData>,
         selected_row_groups: &Option<Vec<usize>>,
diff --git a/crates/iceberg/src/delete_vector.rs b/crates/iceberg/src/delete_vector.rs
new file mode 100644
index 000000000..d57f7e4be
--- /dev/null
+++ b/crates/iceberg/src/delete_vector.rs
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use roaring::RoaringTreemap;
+
+#[allow(unused)]
+pub struct DeleteVector {
+    inner: RoaringTreemap,
+}
+
+impl DeleteVector {
+    pub fn iter(&self) -> DeleteVectorIterator {
+        let mut iter = self.inner.bitmaps();
+        match iter.next() {
+            Some((high_bits, bitmap)) => {
+                DeleteVectorIterator {
+                    inner: Some(DeleteVectorIteratorInner {
+                        // iter,
+                        high_bits: (high_bits as u64) << 32,
+                        bitmap_iter: bitmap.iter(),
+                    }),
+                }
+            }
+            _ => DeleteVectorIterator { inner: None },
+        }
+    }
+}
+
+pub struct DeleteVectorIterator<'a> {
+    inner: Option<DeleteVectorIteratorInner<'a>>,
+}
+
+struct DeleteVectorIteratorInner<'a> {
+    // TODO: roaring::treemap::iter::BitmapIter is currently private.
+    //   See https://github.com/RoaringBitmap/roaring-rs/issues/312
+    // iter: roaring::treemap::iter::BitmapIter<'a>,
+    high_bits: u64,
+    bitmap_iter: roaring::bitmap::Iter<'a>,
+}
+
+impl Iterator for DeleteVectorIterator<'_> {
+    type Item = u64;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let Some(ref mut inner) = &mut self.inner else {
+            return None;
+        };
+
+        if let Some(lower) = inner.bitmap_iter.next() {
+            return Some(inner.high_bits | lower as u64);
+        };
+
+        // TODO: roaring::treemap::iter::BitmapIter is currently private.
+        //   See https://github.com/RoaringBitmap/roaring-rs/issues/312
+
+        // replace with commented-out code below once BitmapIter is pub,
+        // or use RoaringTreemap::iter if `advance_to` gets implemented natively
+        None
+
+        // let Some((high_bits, bitmap)) = inner.iter.next() else {
+        //     self.inner = None;
+        //     return None;
+        // };
+        //
+        // inner.high_bits = (high_bits as u64) << 32;
+        // inner.bitmap_iter = bitmap.iter();
+        //
+        // self.next()
+    }
+}
+
+impl<'a> DeleteVectorIterator<'a> {
+    pub fn advance_to(&'a mut self, _pos: u64) {
+        // TODO
+    }
+}
diff --git a/crates/iceberg/src/expr/predicate.rs b/crates/iceberg/src/expr/predicate.rs
index 64bfaa062..18b570055 100644
--- a/crates/iceberg/src/expr/predicate.rs
+++ b/crates/iceberg/src/expr/predicate.rs
@@ -726,6 +726,12 @@ pub enum BoundPredicate {
     Set(SetExpression),
 }
 
+impl BoundPredicate {
+    pub(crate) fn and(self, other: BoundPredicate) -> BoundPredicate {
+        BoundPredicate::And(LogicalExpression::new([Box::new(self), Box::new(other)]))
+    }
+}
+
 impl Display for BoundPredicate {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         match self {
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index d684be54c..80444df4a 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -87,4 +87,5 @@ pub(crate) mod delete_file_index;
 mod utils;
 pub mod writer;
 
+mod delete_vector;
 mod puffin;
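
// A standalone sketch, not part of the patch: it demonstrates the 32-bit split
// that `DeleteVector::iter` relies on. A `RoaringTreemap` stores each u64 as a
// high-32-bit key mapped to a `RoaringBitmap` of the low 32 bits, so walking
// `bitmaps()` and recombining each pair with `<< 32` and `|` must agree with
// the treemap's own u64 iterator. Only APIs already used in the patch above
// (`RoaringTreemap::bitmaps`, `RoaringBitmap::iter`) are assumed.
use roaring::RoaringTreemap;

fn main() {
    let mut treemap = RoaringTreemap::new();
    // delete positions spanning more than one underlying 32-bit bitmap
    for pos in [0u64, 1024, u32::MAX as u64 + 1, 5 * (u32::MAX as u64) + 42] {
        treemap.insert(pos);
    }

    // recombine (high, low) pairs exactly as DeleteVectorIterator::next does
    let recombined: Vec<u64> = treemap
        .bitmaps()
        .flat_map(|(high, bitmap)| {
            let high_bits = (high as u64) << 32;
            bitmap.iter().map(move |low| high_bits | low as u64)
        })
        .collect();

    // the reconstruction must match RoaringTreemap's native u64 iteration
    assert!(recombined.into_iter().eq(treemap.iter()));
}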
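
// A hypothetical sketch of how positional deletes could be mapped onto a
// parquet `RowSelection`, the step that `build_deletes_row_selection` leaves
// as a TODO above (the real implementation lands in
// https://github.com/apache/iceberg-rust/pull/982 and may differ). The helper
// name `row_selection_for_group` and the per-row-group framing are assumptions
// for illustration; `RowSelector` and `RowSelection` are real parquet-crate APIs.
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

// `deleted` must yield strictly increasing positions relative to the start of
// the row group, as iterating a DeleteVector would
fn row_selection_for_group(
    group_row_count: u64,
    deleted: impl IntoIterator<Item = u64>,
) -> RowSelection {
    let mut selectors: Vec<RowSelector> = Vec::new();
    let mut cursor = 0u64;

    for pos in deleted {
        if pos >= group_row_count {
            break; // later positions belong to subsequent row groups
        }
        if pos > cursor {
            // keep the rows between the previous delete and this one
            selectors.push(RowSelector::select((pos - cursor) as usize));
        }
        // drop the deleted row itself
        selectors.push(RowSelector::skip(1));
        cursor = pos + 1;
    }

    if cursor < group_row_count {
        selectors.push(RowSelector::select((group_row_count - cursor) as usize));
    }

    RowSelection::from(selectors)
}

fn main() {
    // rows 3 and 7 deleted in a ten-row group: keeps rows 0-2, 4-6 and 8-9
    let deletes = row_selection_for_group(10, [3u64, 7]);

    // a predicate-derived selection keeping only rows 0-4, as the reader
    // produces when row_selection_enabled is set
    let filter = RowSelection::from(vec![RowSelector::select(5), RowSelector::skip(5)]);

    // the patched reader merges the two sources with `intersection`,
    // leaving rows 0, 1, 2 and 4
    let merged = filter.intersection(&deletes);
    println!("{merged:?}");
}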