Skip to content

Commit 9a3fa7c

Browse files
committed
refactor: delete row selection uses roaring treemap iterator and advance_to
1 parent 477a6ba commit 9a3fa7c

File tree

5 files changed

+98
-80
lines changed

5 files changed

+98
-80
lines changed

Cargo.lock

+1-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ port_scanner = "0.1.5"
8383
rand = "0.8.5"
8484
regex = "1.10.5"
8585
reqwest = { version = "0.12.2", default-features = false, features = ["json"] }
86-
roaring = "0.10"
86+
roaring = { version = "0.10", git = "https://github.com/RoaringBitmap/roaring-rs.git" }
8787
rust_decimal = "1.31"
8888
serde = { version = "1.0.204", features = ["rc"] }
8989
serde_bytes = "0.11.15"

crates/iceberg/src/arrow/delete_file_manager.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::sync::Arc;
19+
1820
use crate::delete_vector::DeleteVector;
1921
use crate::expr::BoundPredicate;
2022
use crate::io::FileIO;
@@ -85,7 +87,7 @@ impl CachingDeleteFileManager {
8587
pub(crate) fn get_positional_delete_indexes_for_data_file(
8688
&self,
8789
data_file_path: &str,
88-
) -> Option<DeleteVector> {
90+
) -> Option<Arc<DeleteVector>> {
8991
// TODO
9092

9193
None

crates/iceberg/src/arrow/reader.rs

+36-34
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ impl ArrowReader {
282282
let delete_row_selection = Self::build_deletes_row_selection(
283283
record_batch_stream_builder.metadata().row_groups(),
284284
&selected_row_group_indices,
285-
positional_delete_indexes,
285+
positional_delete_indexes.as_ref(),
286286
)?;
287287

288288
// merge the row selection from the delete files with the row selection
@@ -345,17 +345,19 @@ impl ArrowReader {
345345
/// as having been deleted by a positional delete, taking into account any row groups that have
346346
/// been skipped entirely by the filter predicate
347347
fn build_deletes_row_selection(
348-
row_group_metadata: &[RowGroupMetaData],
348+
row_group_metadata_list: &[RowGroupMetaData],
349349
selected_row_groups: &Option<Vec<usize>>,
350-
mut positional_deletes: DeleteVector,
350+
positional_deletes: &DeleteVector,
351351
) -> Result<RowSelection> {
352352
let mut results: Vec<RowSelector> = Vec::new();
353353
let mut selected_row_groups_idx = 0;
354-
let mut current_page_base_idx: u64 = 0;
354+
let mut current_row_group_base_idx: u64 = 0;
355+
let mut delete_vector_iter = positional_deletes.iter();
356+
let mut next_deleted_row_idx_opt = delete_vector_iter.next();
355357

356-
for (idx, row_group_metadata) in row_group_metadata.iter().enumerate() {
357-
let page_num_rows = row_group_metadata.num_rows() as u64;
358-
let next_page_base_idx = current_page_base_idx + page_num_rows;
358+
for (idx, row_group_metadata) in row_group_metadata_list.iter().enumerate() {
359+
let row_group_num_rows = row_group_metadata.num_rows() as u64;
360+
let next_row_group_base_idx = current_row_group_base_idx + row_group_num_rows;
359361

360362
// if row group selection is enabled,
361363
if let Some(selected_row_groups) = selected_row_groups {
@@ -372,36 +374,37 @@ impl ArrowReader {
372374
} else {
373375
// remove any positional deletes from the skipped page so that
374376
// `positional.deletes.min()` can be used
375-
positional_deletes.remove_range(current_page_base_idx..next_page_base_idx);
377+
delete_vector_iter.advance_to(next_row_group_base_idx);
378+
next_deleted_row_idx_opt = delete_vector_iter.next();
376379

377380
// still increment the current page base index but then skip to the next row group
378381
// in the file
379-
current_page_base_idx += page_num_rows;
382+
current_row_group_base_idx += row_group_num_rows;
380383
continue;
381384
}
382385
}
383386

384-
let mut next_deleted_row_idx = match positional_deletes.min() {
387+
let mut next_deleted_row_idx = match next_deleted_row_idx_opt {
385388
Some(next_deleted_row_idx) => {
386-
// if the index of the next deleted row is beyond this page, add a selection for
387-
// the remainder of this page and skip to the next page
388-
if next_deleted_row_idx >= next_page_base_idx {
389-
results.push(RowSelector::select(page_num_rows as usize));
389+
// if the index of the next deleted row is beyond this row group, add a selection for
390+
// the remainder of this row group and skip to the next row group
391+
if next_deleted_row_idx >= next_row_group_base_idx {
392+
results.push(RowSelector::select(row_group_num_rows as usize));
390393
continue;
391394
}
392395

393396
next_deleted_row_idx
394397
}
395398

396-
// If there are no more pos deletes, add a selector for the entirety of this page.
399+
// If there are no more pos deletes, add a selector for the entirety of this row group.
397400
_ => {
398-
results.push(RowSelector::select(page_num_rows as usize));
401+
results.push(RowSelector::select(row_group_num_rows as usize));
399402
continue;
400403
}
401404
};
402405

403-
let mut current_idx = current_page_base_idx;
404-
'chunks: while next_deleted_row_idx < next_page_base_idx {
406+
let mut current_idx = current_row_group_base_idx;
407+
'chunks: while next_deleted_row_idx < next_row_group_base_idx {
405408
// `select` all rows that precede the next delete index
406409
if current_idx < next_deleted_row_idx {
407410
let run_length = next_deleted_row_idx - current_idx;
@@ -412,18 +415,18 @@ impl ArrowReader {
412415
// `skip` all consecutive deleted rows in the current row group
413416
let mut run_length = 0;
414417
while next_deleted_row_idx == current_idx
415-
&& next_deleted_row_idx < next_page_base_idx
418+
&& next_deleted_row_idx < next_row_group_base_idx
416419
{
417420
run_length += 1;
418421
current_idx += 1;
419-
positional_deletes.remove(next_deleted_row_idx);
420422

421-
next_deleted_row_idx = match positional_deletes.min() {
423+
next_deleted_row_idx_opt = delete_vector_iter.next();
424+
next_deleted_row_idx = match next_deleted_row_idx_opt {
422425
Some(next_deleted_row_idx) => next_deleted_row_idx,
423426
_ => {
424427
// We've processed the final positional delete.
425428
// Conclude the skip and then break so that we select the remaining
426-
// rows in the page and move on to the next row group
429+
// rows in the row group and move on to the next row group
427430
results.push(RowSelector::skip(run_length));
428431
break 'chunks;
429432
}
@@ -432,13 +435,13 @@ impl ArrowReader {
432435
results.push(RowSelector::skip(run_length));
433436
}
434437

435-
if current_idx < next_page_base_idx {
438+
if current_idx < next_row_group_base_idx {
436439
results.push(RowSelector::select(
437-
(next_page_base_idx - current_idx) as usize,
440+
(next_row_group_base_idx - current_idx) as usize,
438441
));
439442
}
440443

441-
current_page_base_idx += page_num_rows;
444+
current_row_group_base_idx += row_group_num_rows;
442445
}
443446

444447
Ok(results.into())
@@ -1375,18 +1378,19 @@ mod tests {
13751378
use arrow_array::{ArrayRef, RecordBatch, StringArray};
13761379
use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
13771380
use futures::TryStreamExt;
1381+
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
13781382
use parquet::arrow::{ArrowWriter, ProjectionMask};
13791383
use parquet::basic::Compression;
1380-
use parquet::file::properties::WriterProperties;
1381-
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
13821384
use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
1385+
use parquet::file::properties::WriterProperties;
13831386
use parquet::schema::parser::parse_message_type;
1384-
use tempfile::TempDir;
13851387
use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
13861388
use roaring::RoaringTreemap;
1389+
use tempfile::TempDir;
13871390

13881391
use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
13891392
use crate::arrow::{ArrowReader, ArrowReaderBuilder};
1393+
use crate::delete_vector::DeleteVector;
13901394
use crate::expr::visitors::bound_predicate_visitor::visit;
13911395
use crate::expr::{Bind, Predicate, Reference};
13921396
use crate::io::FileIO;
@@ -1733,16 +1737,14 @@ message schema {
17331737
2999, // single item at end of selected rg3 (1)
17341738
3000, // single item at start of skipped rg4
17351739
]);
1736-
1737-
let positional_deletes = DeleteVector {
1738-
inner: positional_deletes
1739-
};
1740+
1741+
let positional_deletes = DeleteVector::new(positional_deletes);
17401742

17411743
// using selected row groups 1 and 3
17421744
let result = ArrowReader::build_deletes_row_selection(
17431745
&row_groups_metadata,
17441746
&selected_row_groups,
1745-
positional_deletes.clone(),
1747+
&positional_deletes,
17461748
)
17471749
.unwrap();
17481750

@@ -1766,7 +1768,7 @@ message schema {
17661768
let result = ArrowReader::build_deletes_row_selection(
17671769
&row_groups_metadata,
17681770
&None,
1769-
positional_deletes,
1771+
&positional_deletes,
17701772
)
17711773
.unwrap();
17721774

crates/iceberg/src/delete_vector.rs

+57-42
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use roaring::bitmap::Iter;
19+
use roaring::treemap::BitmapIter;
1820
use roaring::RoaringTreemap;
1921

2022
#[allow(unused)]
@@ -23,68 +25,81 @@ pub struct DeleteVector {
2325
}
2426

2527
impl DeleteVector {
26-
pub fn iter(&self) -> DeleteVectorIterator {
27-
let mut iter = self.inner.bitmaps();
28-
match iter.next() {
29-
Some((high_bits, bitmap)) => {
30-
DeleteVectorIterator {
31-
inner: Some(DeleteVectorIteratorInner {
32-
// iter,
33-
high_bits: (high_bits as u64) << 32,
34-
bitmap_iter: bitmap.iter(),
35-
}),
36-
}
37-
}
38-
_ => DeleteVectorIterator { inner: None },
28+
#[allow(unused)]
29+
pub(crate) fn new(roaring_treemap: RoaringTreemap) -> DeleteVector {
30+
DeleteVector {
31+
inner: roaring_treemap,
3932
}
4033
}
34+
35+
pub fn iter(&self) -> DeleteVectorIterator {
36+
let outer = self.inner.bitmaps();
37+
DeleteVectorIterator { outer, inner: None }
38+
}
4139
}
4240

41+
// Ideally, we'd just wrap `roaring::RoaringTreemap`'s iterator, `roaring::treemap::Iter` here.
42+
// But right now, it does not have a corresponding implementation of `roaring::bitmap::Iter::advance_to`,
43+
// which is very handy in ArrowReader::build_deletes_row_selection.
44+
// There is a PR open on roaring to add this (https://github.com/RoaringBitmap/roaring-rs/pull/314)
45+
// and if that gets merged then we can simplify `DeleteVectorIterator` here, refactoring `advance_to`
46+
// to just a wrapper around the underlying iterator's method.
4347
pub struct DeleteVectorIterator<'a> {
48+
// NB: `BitMapIter` was only exposed publicly in https://github.com/RoaringBitmap/roaring-rs/pull/316
49+
// which is not yet released. As a consequence our Cargo.toml temporarily uses a git reference for
50+
// the roaring dependency.
51+
outer: BitmapIter<'a>,
4452
inner: Option<DeleteVectorIteratorInner<'a>>,
4553
}
4654

4755
struct DeleteVectorIteratorInner<'a> {
48-
// TODO: roaring::treemap::iter::BitmapIter is currently private.
49-
// See https://github.com/RoaringBitmap/roaring-rs/issues/312
50-
// iter: roaring::treemap::iter::BitmapIter<'a>,
51-
high_bits: u64,
52-
bitmap_iter: roaring::bitmap::Iter<'a>,
56+
high_bits: u32,
57+
bitmap_iter: Iter<'a>,
5358
}
5459

5560
impl Iterator for DeleteVectorIterator<'_> {
5661
type Item = u64;
5762

5863
fn next(&mut self) -> Option<Self::Item> {
59-
let Some(ref mut inner) = &mut self.inner else {
60-
return None;
61-
};
62-
63-
if let Some(lower) = inner.bitmap_iter.next() {
64-
return Some(inner.high_bits & lower as u64);
65-
};
66-
67-
// TODO: roaring::treemap::iter::BitmapIter is currently private.
68-
// See https://github.com/RoaringBitmap/roaring-rs/issues/312
64+
if let Some(ref mut inner) = &mut self.inner {
65+
if let Some(inner_next) = inner.bitmap_iter.next() {
66+
return Some(u64::from(inner.high_bits) << 32 | u64::from(inner_next));
67+
}
68+
}
6969

70-
// replace with commented-out code below once BitmapIter is pub,
71-
// or use RoaringTreemap::iter if `advance_to` gets implemented natively
72-
None
70+
if let Some((high_bits, next_bitmap)) = self.outer.next() {
71+
self.inner = Some(DeleteVectorIteratorInner {
72+
high_bits,
73+
bitmap_iter: next_bitmap.iter(),
74+
})
75+
} else {
76+
return None;
77+
}
7378

74-
// let Some((high_bits, bitmap)) = inner.iter.next() else {
75-
// self.inner = None;
76-
// return None;
77-
// };
78-
//
79-
// inner.high_bits = (high_bits as u64) << 32;
80-
// inner.bitmap_iter = bitmap.iter();
81-
//
82-
// self.next()
79+
self.next()
8380
}
8481
}
8582

8683
impl<'a> DeleteVectorIterator<'a> {
87-
pub fn advance_to(&'a mut self, _pos: u64) {
88-
// TODO
84+
pub fn advance_to(&mut self, pos: u64) {
85+
let hi = (pos >> 32) as u32;
86+
let lo = pos as u32;
87+
88+
let Some(ref mut inner) = self.inner else {
89+
return;
90+
};
91+
92+
while inner.high_bits < hi {
93+
let Some((next_hi, next_bitmap)) = self.outer.next() else {
94+
return;
95+
};
96+
97+
*inner = DeleteVectorIteratorInner {
98+
high_bits: next_hi,
99+
bitmap_iter: next_bitmap.iter(),
100+
}
101+
}
102+
103+
inner.bitmap_iter.advance_to(lo);
89104
}
90105
}

0 commit comments

Comments
 (0)