Skip to content

Commit 4c2ef08

Browse files
committed
refactor: pos deletes for task represented as RoaringBitmap.
* refactor: only pass row groups metadata rather than entire parquet metadata to . This makes it easier to test as we don't need to mock up a full
1 parent 0022a07 commit 4c2ef08

File tree

5 files changed

+28
-6
lines changed

5 files changed

+28
-6
lines changed

Cargo.lock

+17
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ port_scanner = "0.1.5"
8686
rand = "0.8.5"
8787
regex = "1.10.5"
8888
reqwest = { version = "0.12.2", default-features = false, features = ["json"] }
89+
roaring = "0.10"
8990
rust_decimal = "1.31"
9091
serde = { version = "1.0.204", features = ["rc"] }
9192
serde_bytes = "0.11.15"

crates/iceberg/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ parquet = { workspace = true, features = ["async"] }
7272
paste = { workspace = true }
7373
rand = { workspace = true }
7474
reqwest = { workspace = true }
75+
roaring = { workspace = true }
7576
rust_decimal = { workspace = true }
7677
serde = { workspace = true }
7778
serde_bytes = { workspace = true }

crates/iceberg/src/arrow/delete_file_manager.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use roaring::RoaringTreemap;
19+
1820
use crate::expr::BoundPredicate;
1921
use crate::io::FileIO;
2022
use crate::scan::FileScanTaskDeleteFile;
@@ -54,7 +56,7 @@ impl DeleteFileManager {
5456
pub(crate) fn get_positional_delete_indexes_for_data_file(
5557
&self,
5658
data_file_path: &str,
57-
) -> Option<Vec<usize>> {
59+
) -> Option<RoaringTreemap> {
5860
// TODO
5961

6062
None

crates/iceberg/src/arrow/reader.rs

+6-5
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,9 @@ use futures::{try_join, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
3636
use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection};
3737
use parquet::arrow::async_reader::AsyncFileReader;
3838
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
39-
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
39+
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
4040
use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
41+
use roaring::RoaringTreemap;
4142

4243
use crate::arrow::delete_file_manager::DeleteFileManager;
4344
use crate::arrow::record_batch_transformer::RecordBatchTransformer;
@@ -276,9 +277,9 @@ impl ArrowReader {
276277

277278
if let Some(positional_delete_indexes) = positional_delete_indexes {
278279
let delete_row_selection = Self::build_deletes_row_selection(
279-
record_batch_stream_builder.metadata(),
280+
record_batch_stream_builder.metadata().row_groups(),
280281
&selected_row_group_indices,
281-
&positional_delete_indexes,
282+
positional_delete_indexes,
282283
)?;
283284

284285
// merge the row selection from the delete files with the row selection
@@ -342,9 +343,9 @@ impl ArrowReader {
342343
/// been skipped entirely by the filter predicate
343344
#[allow(unused)]
344345
fn build_deletes_row_selection(
345-
parquet_metadata: &Arc<ParquetMetaData>,
346+
row_group_metadata: &[RowGroupMetaData],
346347
selected_row_groups: &Option<Vec<usize>>,
347-
positional_deletes: &[usize],
348+
mut positional_deletes: RoaringTreemap,
348349
) -> Result<RowSelection> {
349350
// TODO
350351

0 commit comments

Comments
 (0)