Skip to content

Commit 04a0d07

Browse files
sddliurenjie1024
andauthored
Scan Delete Support Part 3: ArrowReader::build_deletes_row_selection implementation (#951)
Third part of delete file read support. See #630. Builds on top of #950. `build_deletes_row_selection` computes a `RowSelection` from a `RoaringTreemap` representing the indexes of rows in a data file that have been marked as deleted by positional delete files that apply to the data file being read (and, in the future, delete vectors). The resulting `RowSelection` will be merged with a `RowSelection` resulting from the scan's filter predicate (if present) and supplied to the `ParquetRecordBatchStreamBuilder` so that deleted rows are omitted from the `RecordBatchStream` returned by the reader. NB: I encountered quite a few edge cases in this method and the logic is quite complex. There is a good chance that a keen-eyed reviewer will be able to conceive of an edge case that I haven't covered. --------- Co-authored-by: Renjie Liu <[email protected]>
1 parent cbd9ca4 commit 04a0d07

File tree

5 files changed

+314
-54
lines changed

5 files changed

+314
-54
lines changed

Cargo.lock

+1-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ port_scanner = "0.1.5"
8383
rand = "0.8.5"
8484
regex = "1.10.5"
8585
reqwest = { version = "0.12.2", default-features = false, features = ["json"] }
86-
roaring = "0.10"
86+
roaring = { version = "0.10", git = "https://github.com/RoaringBitmap/roaring-rs.git" }
8787
rust_decimal = "1.31"
8888
serde = { version = "1.0.204", features = ["rc"] }
8989
serde_bytes = "0.11.15"

crates/iceberg/src/arrow/delete_file_manager.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::sync::Arc;
19+
1820
use crate::delete_vector::DeleteVector;
1921
use crate::expr::BoundPredicate;
2022
use crate::io::FileIO;
@@ -85,7 +87,7 @@ impl CachingDeleteFileManager {
8587
pub(crate) fn get_positional_delete_indexes_for_data_file(
8688
&self,
8789
data_file_path: &str,
88-
) -> Option<DeleteVector> {
90+
) -> Option<Arc<DeleteVector>> {
8991
// TODO
9092

9193
None

crates/iceberg/src/arrow/reader.rs

+252-8
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ use bytes::Bytes;
3333
use fnv::FnvHashSet;
3434
use futures::future::BoxFuture;
3535
use futures::{try_join, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
36-
use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection};
36+
use parquet::arrow::arrow_reader::{
37+
ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
38+
};
3739
use parquet::arrow::async_reader::AsyncFileReader;
3840
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
3941
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
@@ -280,7 +282,7 @@ impl ArrowReader {
280282
let delete_row_selection = Self::build_deletes_row_selection(
281283
record_batch_stream_builder.metadata().row_groups(),
282284
&selected_row_group_indices,
283-
positional_delete_indexes,
285+
positional_delete_indexes.as_ref(),
284286
)?;
285287

286288
// merge the row selection from the delete files with the row selection
@@ -342,15 +344,109 @@ impl ArrowReader {
342344
/// Using the Parquet page index, we build a `RowSelection` that rejects rows that are indicated
343345
/// as having been deleted by a positional delete, taking into account any row groups that have
344346
/// been skipped entirely by the filter predicate
345-
#[allow(unused)]
346347
fn build_deletes_row_selection(
347-
row_group_metadata: &[RowGroupMetaData],
348+
row_group_metadata_list: &[RowGroupMetaData],
348349
selected_row_groups: &Option<Vec<usize>>,
349-
mut positional_deletes: DeleteVector,
350+
positional_deletes: &DeleteVector,
350351
) -> Result<RowSelection> {
351-
// TODO
352+
let mut results: Vec<RowSelector> = Vec::new();
353+
let mut selected_row_groups_idx = 0;
354+
let mut current_row_group_base_idx: u64 = 0;
355+
let mut delete_vector_iter = positional_deletes.iter();
356+
let mut next_deleted_row_idx_opt = delete_vector_iter.next();
357+
358+
for (idx, row_group_metadata) in row_group_metadata_list.iter().enumerate() {
359+
let row_group_num_rows = row_group_metadata.num_rows() as u64;
360+
let next_row_group_base_idx = current_row_group_base_idx + row_group_num_rows;
361+
362+
// if row group selection is enabled,
363+
if let Some(selected_row_groups) = selected_row_groups {
364+
// if we've consumed all the selected row groups, we're done
365+
if selected_row_groups_idx == selected_row_groups.len() {
366+
break;
367+
}
368+
369+
if idx == selected_row_groups[selected_row_groups_idx] {
370+
// we're in a selected row group. Increment selected_row_groups_idx
371+
// so that next time around the for loop we're looking for the next
372+
// selected row group
373+
selected_row_groups_idx += 1;
374+
} else {
375+
// remove any positional deletes from the skipped page so that
376+
// `positional.deletes.min()` can be used
377+
delete_vector_iter.advance_to(next_row_group_base_idx);
378+
next_deleted_row_idx_opt = delete_vector_iter.next();
379+
380+
// still increment the current page base index but then skip to the next row group
381+
// in the file
382+
current_row_group_base_idx += row_group_num_rows;
383+
continue;
384+
}
385+
}
386+
387+
let mut next_deleted_row_idx = match next_deleted_row_idx_opt {
388+
Some(next_deleted_row_idx) => {
389+
// if the index of the next deleted row is beyond this row group, add a selection for
390+
// the remainder of this row group and skip to the next row group
391+
if next_deleted_row_idx >= next_row_group_base_idx {
392+
results.push(RowSelector::select(row_group_num_rows as usize));
393+
continue;
394+
}
395+
396+
next_deleted_row_idx
397+
}
398+
399+
// If there are no more pos deletes, add a selector for the entirety of this row group.
400+
_ => {
401+
results.push(RowSelector::select(row_group_num_rows as usize));
402+
continue;
403+
}
404+
};
405+
406+
let mut current_idx = current_row_group_base_idx;
407+
'chunks: while next_deleted_row_idx < next_row_group_base_idx {
408+
// `select` all rows that precede the next delete index
409+
if current_idx < next_deleted_row_idx {
410+
let run_length = next_deleted_row_idx - current_idx;
411+
results.push(RowSelector::select(run_length as usize));
412+
current_idx += run_length;
413+
}
414+
415+
// `skip` all consecutive deleted rows in the current row group
416+
let mut run_length = 0;
417+
while next_deleted_row_idx == current_idx
418+
&& next_deleted_row_idx < next_row_group_base_idx
419+
{
420+
run_length += 1;
421+
current_idx += 1;
422+
423+
next_deleted_row_idx_opt = delete_vector_iter.next();
424+
next_deleted_row_idx = match next_deleted_row_idx_opt {
425+
Some(next_deleted_row_idx) => next_deleted_row_idx,
426+
_ => {
427+
// We've processed the final positional delete.
428+
// Conclude the skip and then break so that we select the remaining
429+
// rows in the row group and move on to the next row group
430+
results.push(RowSelector::skip(run_length));
431+
break 'chunks;
432+
}
433+
};
434+
}
435+
if run_length > 0 {
436+
results.push(RowSelector::skip(run_length));
437+
}
438+
}
439+
440+
if current_idx < next_row_group_base_idx {
441+
results.push(RowSelector::select(
442+
(next_row_group_base_idx - current_idx) as usize,
443+
));
444+
}
445+
446+
current_row_group_base_idx += row_group_num_rows;
447+
}
352448

353-
Ok(RowSelection::default())
449+
Ok(results.into())
354450
}
355451

356452
fn build_field_id_set_and_map(
@@ -1284,15 +1380,19 @@ mod tests {
12841380
use arrow_array::{ArrayRef, RecordBatch, StringArray};
12851381
use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
12861382
use futures::TryStreamExt;
1383+
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
12871384
use parquet::arrow::{ArrowWriter, ProjectionMask};
12881385
use parquet::basic::Compression;
1386+
use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
12891387
use parquet::file::properties::WriterProperties;
12901388
use parquet::schema::parser::parse_message_type;
1291-
use parquet::schema::types::SchemaDescriptor;
1389+
use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
1390+
use roaring::RoaringTreemap;
12921391
use tempfile::TempDir;
12931392

12941393
use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
12951394
use crate::arrow::{ArrowReader, ArrowReaderBuilder};
1395+
use crate::delete_vector::DeleteVector;
12961396
use crate::expr::visitors::bound_predicate_visitor::visit;
12971397
use crate::expr::{Bind, Predicate, Reference};
12981398
use crate::io::FileIO;
@@ -1593,4 +1693,148 @@ message schema {
15931693

15941694
(file_io, schema, table_location, tmp_dir)
15951695
}
1696+
1697+
/// Checks `ArrowReader::build_deletes_row_selection` against a five-row-group file layout,
/// first with only row groups 1 and 3 selected and then with row group selection disabled.
#[test]
fn test_build_deletes_row_selection() {
    let schema_descr = get_test_schema_descr();

    let columns: Vec<ColumnChunkMetaData> = schema_descr
        .columns()
        .iter()
        .map(|ptr| ColumnChunkMetaData::builder(ptr.clone()).build().unwrap())
        .collect();

    // Row counts per row group; absolute row ranges are therefore
    // rg0 = [0, 1000), rg1 = [1000, 1500), rg2 = [1500, 2000),
    // rg3 = [2000, 3000), rg4 = [3000, 3500).
    let row_counts: [i64; 5] = [1000, 500, 500, 1000, 500];
    let row_groups_metadata: Vec<RowGroupMetaData> = row_counts
        .iter()
        .enumerate()
        .map(|(ordinal, &num_rows)| {
            build_test_row_group_meta(
                schema_descr.clone(),
                columns.clone(),
                num_rows,
                ordinal as i16,
            )
        })
        .collect();

    let selected_row_groups = Some(vec![1, 3]);

    /* cases to cover:
     * {skip|select} {first|intermediate|last} {one row|multiple rows} in
       {first|intermediate|last} {skipped|selected} row group
     * row group selection disabled
    */

    // The (skip, select) pairs in the comments refer to the expectation for the
    // "row groups 1 and 3 selected" case further down.
    let deleted_rows: &[u64] = &[
        // solitary row in skipped rg0, should be ignored
        1,
        // run of three consecutive rows in skipped rg0
        3, 4, 5,
        // two consecutive rows at end of skipped rg0
        998, 999,
        // solitary row at start of selected rg1 (1, 9)
        1000,
        // run of 3 rows in selected rg1 (3, 485)
        1010, 1011, 1012,
        // run of two rows at end of selected rg1
        1498, 1499,
        // run of two rows at start of skipped rg2
        1500, 1501,
        // should ignore, in skipped rg2
        1600,
        // single row at end of skipped rg2
        1999,
        // run of two rows at start of selected rg3; together with the two rows at the
        // end of rg1 this is expected as a single skip (4, 98)
        2000, 2001,
        // single row in selected rg3 (1, 99)
        2100,
        // run of 3 consecutive rows in selected rg3 (3, 796)
        2200, 2201, 2202,
        // single row at end of selected rg3 (1)
        2999,
        // single row at start of skipped rg4
        3000,
    ];

    let positional_deletes = DeleteVector::new(RoaringTreemap::from_iter(deleted_rows));

    // using selected row groups 1 and 3
    let selection_for_rg_1_and_3 = ArrowReader::build_deletes_row_selection(
        &row_groups_metadata,
        &selected_row_groups,
        &positional_deletes,
    )
    .unwrap();

    let expected_for_rg_1_and_3: RowSelection = vec![
        RowSelector::skip(1),
        RowSelector::select(9),
        RowSelector::skip(3),
        RowSelector::select(485),
        RowSelector::skip(4),
        RowSelector::select(98),
        RowSelector::skip(1),
        RowSelector::select(99),
        RowSelector::skip(3),
        RowSelector::select(796),
        RowSelector::skip(1),
    ]
    .into();

    assert_eq!(selection_for_rg_1_and_3, expected_for_rg_1_and_3);

    // selecting all row groups
    let selection_for_all = ArrowReader::build_deletes_row_selection(
        &row_groups_metadata,
        &None,
        &positional_deletes,
    )
    .unwrap();

    let expected_for_all: RowSelection = vec![
        RowSelector::select(1),
        RowSelector::skip(1),
        RowSelector::select(1),
        RowSelector::skip(3),
        RowSelector::select(992),
        RowSelector::skip(3),
        RowSelector::select(9),
        RowSelector::skip(3),
        RowSelector::select(485),
        RowSelector::skip(4),
        RowSelector::select(98),
        RowSelector::skip(1),
        RowSelector::select(398),
        RowSelector::skip(3),
        RowSelector::select(98),
        RowSelector::skip(1),
        RowSelector::select(99),
        RowSelector::skip(3),
        RowSelector::select(796),
        RowSelector::skip(2),
        RowSelector::select(499),
    ]
    .into();

    assert_eq!(selection_for_all, expected_for_all);
}
1803+
1804+
fn build_test_row_group_meta(
1805+
schema_descr: SchemaDescPtr,
1806+
columns: Vec<ColumnChunkMetaData>,
1807+
num_rows: i64,
1808+
ordinal: i16,
1809+
) -> RowGroupMetaData {
1810+
RowGroupMetaData::builder(schema_descr.clone())
1811+
.set_num_rows(num_rows)
1812+
.set_total_byte_size(2000)
1813+
.set_column_metadata(columns)
1814+
.set_ordinal(ordinal)
1815+
.build()
1816+
.unwrap()
1817+
}
1818+
1819+
fn get_test_schema_descr() -> SchemaDescPtr {
1820+
use parquet::schema::types::Type as SchemaType;
1821+
1822+
let schema = SchemaType::group_type_builder("schema")
1823+
.with_fields(vec![
1824+
Arc::new(
1825+
SchemaType::primitive_type_builder("a", parquet::basic::Type::INT32)
1826+
.build()
1827+
.unwrap(),
1828+
),
1829+
Arc::new(
1830+
SchemaType::primitive_type_builder("b", parquet::basic::Type::INT32)
1831+
.build()
1832+
.unwrap(),
1833+
),
1834+
])
1835+
.build()
1836+
.unwrap();
1837+
1838+
Arc::new(SchemaDescriptor::new(Arc::new(schema)))
1839+
}
15961840
}

0 commit comments

Comments
 (0)