Skip to content

Commit 2029d21

Browse files
committed
feat: implementation for ArrowReader::build_deletes_row_selection
1 parent 114317d commit 2029d21

File tree

1 file changed

+241
-5
lines changed

1 file changed

+241
-5
lines changed

crates/iceberg/src/arrow/reader.rs

+241-5
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ use bytes::Bytes;
3333
use fnv::FnvHashSet;
3434
use futures::future::BoxFuture;
3535
use futures::{try_join, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
36-
use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection};
36+
use parquet::arrow::arrow_reader::{
37+
ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
38+
};
3739
use parquet::arrow::async_reader::AsyncFileReader;
3840
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
3941
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
@@ -341,15 +343,104 @@ impl ArrowReader {
341343
/// Using the Parquet page index, we build a `RowSelection` that rejects rows that are indicated
342344
/// as having been deleted by a positional delete, taking into account any row groups that have
343345
/// been skipped entirely by the filter predicate
344-
#[allow(unused)]
345346
fn build_deletes_row_selection(
346347
row_group_metadata: &[RowGroupMetaData],
347348
selected_row_groups: &Option<Vec<usize>>,
348349
mut positional_deletes: RoaringTreemap,
349350
) -> Result<RowSelection> {
350-
// TODO
351+
let mut results: Vec<RowSelector> = Vec::new();
352+
let mut selected_row_groups_idx = 0;
353+
let mut current_page_base_idx: u64 = 0;
354+
355+
for (idx, row_group_metadata) in row_group_metadata.iter().enumerate() {
356+
let page_num_rows = row_group_metadata.num_rows() as u64;
357+
let next_page_base_idx = current_page_base_idx + page_num_rows;
351358

352-
Ok(RowSelection::default())
359+
// if row group selection is enabled,
360+
if let Some(selected_row_groups) = selected_row_groups {
361+
// if we've consumed all the selected row groups, we're done
362+
if selected_row_groups_idx == selected_row_groups.len() {
363+
break;
364+
}
365+
366+
if idx == selected_row_groups[selected_row_groups_idx] {
367+
// we're in a selected row group. Increment selected_row_groups_idx
368+
// so that next time around the for loop we're looking for the next
369+
// selected row group
370+
selected_row_groups_idx += 1;
371+
} else {
372+
// remove any positional deletes from the skipped page so that
373+
// `positional.deletes.min()` can be used
374+
positional_deletes.remove_range(current_page_base_idx..next_page_base_idx);
375+
376+
// still increment the current page base index but then skip to the next row group
377+
// in the file
378+
current_page_base_idx += page_num_rows;
379+
continue;
380+
}
381+
}
382+
383+
let mut next_deleted_row_idx = match positional_deletes.min() {
384+
Some(next_deleted_row_idx) => {
385+
// if the index of the next deleted row is beyond this page, add a selection for
386+
// the remainder of this page and skip to the next page
387+
if next_deleted_row_idx >= next_page_base_idx {
388+
results.push(RowSelector::select(page_num_rows as usize));
389+
continue;
390+
}
391+
392+
next_deleted_row_idx
393+
}
394+
395+
// If there are no more pos deletes, add a selector for the entirety of this page.
396+
_ => {
397+
results.push(RowSelector::select(page_num_rows as usize));
398+
continue;
399+
}
400+
};
401+
402+
let mut current_idx = current_page_base_idx;
403+
'chunks: while next_deleted_row_idx < next_page_base_idx {
404+
// `select` all rows that precede the next delete index
405+
if current_idx < next_deleted_row_idx {
406+
let run_length = next_deleted_row_idx - current_idx;
407+
results.push(RowSelector::select(run_length as usize));
408+
current_idx += run_length;
409+
}
410+
411+
// `skip` all consecutive deleted rows in the current row group
412+
let mut run_length = 0;
413+
while next_deleted_row_idx == current_idx
414+
&& next_deleted_row_idx < next_page_base_idx
415+
{
416+
run_length += 1;
417+
current_idx += 1;
418+
positional_deletes.remove(next_deleted_row_idx);
419+
420+
next_deleted_row_idx = match positional_deletes.min() {
421+
Some(next_deleted_row_idx) => next_deleted_row_idx,
422+
_ => {
423+
// We've processed the final positional delete.
424+
// Conclude the skip and then break so that we select the remaining
425+
// rows in the page and move on to the next row group
426+
results.push(RowSelector::skip(run_length));
427+
break 'chunks;
428+
}
429+
};
430+
}
431+
results.push(RowSelector::skip(run_length));
432+
}
433+
434+
if current_idx < next_page_base_idx {
435+
results.push(RowSelector::select(
436+
(next_page_base_idx - current_idx) as usize,
437+
));
438+
}
439+
440+
current_page_base_idx += page_num_rows;
441+
}
442+
443+
Ok(results.into())
353444
}
354445

355446
fn build_field_id_set_and_map(
@@ -1255,9 +1346,12 @@ mod tests {
12551346
use std::sync::Arc;
12561347

12571348
use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
1349+
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
12581350
use parquet::arrow::ProjectionMask;
1351+
use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
12591352
use parquet::schema::parser::parse_message_type;
1260-
use parquet::schema::types::SchemaDescriptor;
1353+
use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
1354+
use roaring::RoaringTreemap;
12611355

12621356
use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
12631357
use crate::arrow::ArrowReader;
@@ -1423,4 +1517,146 @@ message schema {
14231517
.expect("Some ProjectionMask");
14241518
assert_eq!(mask, ProjectionMask::leaves(&parquet_schema, vec![0]));
14251519
}
1520+
1521+
#[test]
1522+
fn test_build_deletes_row_selection() {
1523+
let schema_descr = get_test_schema_descr();
1524+
1525+
let mut columns = vec![];
1526+
for ptr in schema_descr.columns() {
1527+
let column = ColumnChunkMetaData::builder(ptr.clone()).build().unwrap();
1528+
columns.push(column);
1529+
}
1530+
1531+
let row_groups_metadata = vec![
1532+
build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 0),
1533+
build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 1),
1534+
build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 2),
1535+
build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 3),
1536+
build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 4),
1537+
];
1538+
1539+
let selected_row_groups = Some(vec![1, 3]);
1540+
1541+
/* cases to cover:
1542+
* {skip|select} {first|intermediate|last} {one row|multiple rows} in
1543+
{first|imtermediate|last} {skipped|selected} row group
1544+
* row group selection disabled
1545+
*/
1546+
1547+
let positional_deletes = RoaringTreemap::from_iter(&[
1548+
1, // in skipped rg 0, should be ignored
1549+
3, // run of three consecutive items in skipped rg0
1550+
4, 5, 998, // two consecutive items at end of skipped rg0
1551+
999, 1000, // solitary row at start of selected rg1 (1, 9)
1552+
1010, // run of 3 rows in selected rg1
1553+
1011, 1012, // (3, 485)
1554+
1498, // run of two items at end of selected rg1
1555+
1499, 1500, // run of two items at start of skipped rg2
1556+
1501, 1600, // should ignore, in skipped rg2
1557+
1999, // single row at end of skipped rg2
1558+
2000, // run of two items at start of selected rg3
1559+
2001, // (4, 98)
1560+
2100, // single row in selected row group 3 (1, 99)
1561+
2200, // run of 3 consecutive rows in selected row group 3
1562+
2201, 2202, // (3, 796)
1563+
2999, // single item at end of selected rg3 (1)
1564+
3000, // single item at start of skipped rg4
1565+
]);
1566+
1567+
// using selected row groups 1 and 3
1568+
let result = ArrowReader::build_deletes_row_selection(
1569+
&row_groups_metadata,
1570+
&selected_row_groups,
1571+
positional_deletes.clone(),
1572+
)
1573+
.unwrap();
1574+
1575+
let expected = RowSelection::from(vec![
1576+
RowSelector::skip(1),
1577+
RowSelector::select(9),
1578+
RowSelector::skip(3),
1579+
RowSelector::select(485),
1580+
RowSelector::skip(4),
1581+
RowSelector::select(98),
1582+
RowSelector::skip(1),
1583+
RowSelector::select(99),
1584+
RowSelector::skip(3),
1585+
RowSelector::select(796),
1586+
RowSelector::skip(1),
1587+
]);
1588+
1589+
assert_eq!(result, expected);
1590+
1591+
// selecting all row groups
1592+
let result = ArrowReader::build_deletes_row_selection(
1593+
&row_groups_metadata,
1594+
&None,
1595+
positional_deletes,
1596+
)
1597+
.unwrap();
1598+
1599+
let expected = RowSelection::from(vec![
1600+
RowSelector::select(1),
1601+
RowSelector::skip(1),
1602+
RowSelector::select(1),
1603+
RowSelector::skip(3),
1604+
RowSelector::select(992),
1605+
RowSelector::skip(3),
1606+
RowSelector::select(9),
1607+
RowSelector::skip(3),
1608+
RowSelector::select(485),
1609+
RowSelector::skip(4),
1610+
RowSelector::select(98),
1611+
RowSelector::skip(1),
1612+
RowSelector::select(398),
1613+
RowSelector::skip(3),
1614+
RowSelector::select(98),
1615+
RowSelector::skip(1),
1616+
RowSelector::select(99),
1617+
RowSelector::skip(3),
1618+
RowSelector::select(796),
1619+
RowSelector::skip(2),
1620+
RowSelector::select(499),
1621+
]);
1622+
1623+
assert_eq!(result, expected);
1624+
}
1625+
1626+
fn build_test_row_group_meta(
1627+
schema_descr: SchemaDescPtr,
1628+
columns: Vec<ColumnChunkMetaData>,
1629+
num_rows: i64,
1630+
ordinal: i16,
1631+
) -> RowGroupMetaData {
1632+
RowGroupMetaData::builder(schema_descr.clone())
1633+
.set_num_rows(num_rows)
1634+
.set_total_byte_size(2000)
1635+
.set_column_metadata(columns)
1636+
.set_ordinal(ordinal)
1637+
.build()
1638+
.unwrap()
1639+
}
1640+
1641+
fn get_test_schema_descr() -> SchemaDescPtr {
1642+
use parquet::schema::types::Type as SchemaType;
1643+
1644+
let schema = SchemaType::group_type_builder("schema")
1645+
.with_fields(vec![
1646+
Arc::new(
1647+
SchemaType::primitive_type_builder("a", parquet::basic::Type::INT32)
1648+
.build()
1649+
.unwrap(),
1650+
),
1651+
Arc::new(
1652+
SchemaType::primitive_type_builder("b", parquet::basic::Type::INT32)
1653+
.build()
1654+
.unwrap(),
1655+
),
1656+
])
1657+
.build()
1658+
.unwrap();
1659+
1660+
Arc::new(SchemaDescriptor::new(Arc::new(schema)))
1661+
}
14261662
}

0 commit comments

Comments
 (0)