
POC for adaptive Parquet predicate pushdown (bitmap/range) with page cache (3 data pages) #7454


Open
wants to merge 91 commits into base: main

91 commits
cc6dd14
update
XiangpengHao Sep 4, 2024
5837fc7
update
XiangpengHao Dec 22, 2024
fec6313
update
XiangpengHao Dec 28, 2024
948db87
update
XiangpengHao Dec 29, 2024
8c50d90
poc reader
XiangpengHao Dec 30, 2024
f5422ce
update
XiangpengHao Dec 31, 2024
dfdc1b6
avoid recreating new buffers
XiangpengHao Dec 31, 2024
3c526f8
update
XiangpengHao Dec 31, 2024
53f5fad
bug fix
XiangpengHao Dec 31, 2024
56980de
selective cache
XiangpengHao Jan 1, 2025
4dd1b6b
clean up changes
XiangpengHao Jan 1, 2025
f8f983e
clean up more and format
XiangpengHao Jan 1, 2025
882aaf1
cleanup and add docs
XiangpengHao Jan 1, 2025
c8bdbcf
switch to mutex instead of rwlock
XiangpengHao Jan 2, 2025
cdb1d85
revert irrelevant changes
XiangpengHao Jan 2, 2025
69720e5
submodule
XiangpengHao Jan 3, 2025
a9550ab
update
XiangpengHao Jan 3, 2025
be1435f
rebase
XiangpengHao Jan 6, 2025
e4d9eb7
Merge remote-tracking branch 'upstream/main' into better-decoder
XiangpengHao Jan 8, 2025
21e015b
remove unrelated changes
XiangpengHao Jan 8, 2025
bbc3595
Merge remote-tracking branch 'upstream/main' into better-decoder
XiangpengHao Jan 10, 2025
547fb46
fix clippy
XiangpengHao Jan 10, 2025
05c8c8f
make various ci improvements
XiangpengHao Jan 10, 2025
314fda1
Merge remote-tracking branch 'apache/main' into better-decoder
alamb Mar 21, 2025
c895dd2
whitespace
alamb Mar 21, 2025
3cf0a98
Reduce some ugliness, avoid unwrap
alamb Mar 21, 2025
7b72f9d
more factory
alamb Mar 21, 2025
5bdf51a
lint
alamb Mar 22, 2025
a77e1e7
Merge remote-tracking branch 'apache/main' into better-decoder
alamb Mar 26, 2025
90a55d5
Isolate reader cache more
alamb Mar 26, 2025
9ffa81c
Merge remote-tracking branch 'apache/main' into better-decoder
alamb Mar 27, 2025
7c10b4a
Merge remote-tracking branch 'apache/main' into better-decoder
alamb Mar 28, 2025
822760c
Add benchmark for parquet reader with row_filter and project settings
zhuqi-lucas Apr 10, 2025
31a544f
fix clippy
zhuqi-lucas Apr 10, 2025
b16428d
change benchmark to use async read to trigger the page cache
zhuqi-lucas Apr 11, 2025
1aacd01
fix
zhuqi-lucas Apr 11, 2025
2d58006
Merge remote-tracking branch 'upstream/main' into benchmark_row_filter
zhuqi-lucas Apr 11, 2025
768826e
fix
zhuqi-lucas Apr 11, 2025
f624b91
Update comments, add background
alamb Apr 11, 2025
6c28e44
incrementally addressing the comments
zhuqi-lucas Apr 11, 2025
69a2617
Fix bool random
zhuqi-lucas Apr 11, 2025
b044813
Merge commit '69a2617' into alamb/docs_for_bench
alamb Apr 11, 2025
6a37818
fixup
alamb Apr 11, 2025
2f6ccbb
Add fn switch and project enum
zhuqi-lucas Apr 11, 2025
994c747
Merge pull request #1 from alamb/alamb/docs_for_bench
zhuqi-lucas Apr 11, 2025
d0a656b
Fix clippy
zhuqi-lucas Apr 11, 2025
67480b9
Address comment
zhuqi-lucas Apr 12, 2025
16bc1bf
Add float(half set) and int(full set) change
zhuqi-lucas Apr 12, 2025
a4bedbd
Merge branch 'benchmark_row_filter' of github.com:zhuqi-lucas/arrow-r…
zhuqi-lucas Apr 12, 2025
d0ab2fe
Fix corner case: skipping a page should also set the dict page to none
zhuqi-lucas Apr 12, 2025
7638c41
Address comments
zhuqi-lucas Apr 13, 2025
8fc992b
Merge branch 'benchmark_row_filter' into better-decoder
zhuqi-lucas Apr 14, 2025
9271cc9
Set compression
zhuqi-lucas Apr 14, 2025
8e00ac5
fix
zhuqi-lucas Apr 14, 2025
36346aa
Merge branch 'benchmark_row_filter' into better-decoder
zhuqi-lucas Apr 14, 2025
890519e
Update comments
alamb Apr 14, 2025
7eb0476
refactor filter column indexes
alamb Apr 14, 2025
22c7b39
Read from in memory buffer
alamb Apr 14, 2025
86878ab
Merge remote-tracking branch 'apache/main' into benchmark_row_filter
alamb Apr 14, 2025
5ae9b58
cleanup
alamb Apr 14, 2025
1effe88
Test both sync and async readers
alamb Apr 14, 2025
74abec0
Merge branch 'benchmark_row_filter' into better-decoder
zhuqi-lucas Apr 15, 2025
6ea0eef
Merge branch 'main' into better-decoder
zhuqi-lucas Apr 18, 2025
0c3aa9b
Improve the performance for skip record
zhuqi-lucas Apr 19, 2025
a1d3496
Init the boolean_selector
zhuqi-lucas Apr 25, 2025
2d6c866
Init version for unified select
zhuqi-lucas Apr 26, 2025
1e9b6e5
Change to use filter
zhuqi-lucas Apr 27, 2025
21dadbe
Fix then
zhuqi-lucas Apr 27, 2025
3fe4cef
Adaptive push down
zhuqi-lucas Apr 27, 2025
e5aad7c
Fix
zhuqi-lucas Apr 27, 2025
fa0bf69
Merge branch 'better-decoder' into unified_select
zhuqi-lucas Apr 30, 2025
6432de2
Init: combine page cache with unified select
zhuqi-lucas May 1, 2025
d26de88
Perf: avoid cache misses to prevent some ClickBench regressions
zhuqi-lucas May 5, 2025
04ca371
Revert "Improve the performance for skip record"
zhuqi-lucas May 7, 2025
c099788
Merge branch 'polish_better_decoder' into unified_select
zhuqi-lucas May 7, 2025
c045a4a
Combine with page cache
zhuqi-lucas May 7, 2025
6e32d3b
Need to fix
zhuqi-lucas May 8, 2025
b80f596
Add performance good result
zhuqi-lucas May 8, 2025
01d1dea
Fix
zhuqi-lucas May 9, 2025
7bcd011
Remove unused for the improvement
zhuqi-lucas May 11, 2025
123c3ad
Fix part of test
zhuqi-lucas May 11, 2025
efcc0de
Add new testing
zhuqi-lucas May 11, 2025
8495fbc
Merge remote-tracking branch 'upstream/main' into unified_select
zhuqi-lucas May 11, 2025
61290ab
Fix clippy
zhuqi-lucas May 11, 2025
b67b92c
Fix test
zhuqi-lucas May 11, 2025
774bed5
Clippy fix
zhuqi-lucas May 11, 2025
0ad20e5
Clippy fix
zhuqi-lucas May 11, 2025
ceceb8e
Fix 3 data page testing
zhuqi-lucas May 11, 2025
bc02e2a
Fix encryption error handling for new page cache logic
zhuqi-lucas May 12, 2025
a4065cc
Update parquet testing
zhuqi-lucas May 12, 2025
269b396
Clean up code
zhuqi-lucas May 12, 2025
5 changes: 4 additions & 1 deletion parquet/src/arrow/array_reader/byte_view_array.rs
@@ -432,7 +432,8 @@ impl ByteViewArrayDecoderDictionary {
}
}

/// Reads the next indexes from self.decoder
/// Reads the next `len` indexes from self.decoder
///
/// the indexes are assumed to be indexes into `dict`
/// the output values are written to output
///
@@ -464,6 +465,8 @@ impl ByteViewArrayDecoderDictionary {
}
}

output.views.reserve(len);

// Calculate the offset of the dictionary buffers in the output buffers
// For example if the 2nd buffer in the dictionary is the 5th buffer in the output buffers,
// then the base_buffer_idx is 5 - 2 = 3
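The byte_view_array.rs hunk above adds `output.views.reserve(len)` before the dictionary indexes are resolved one at a time. A minimal sketch of that reserve-then-append pattern, using a plain `Vec<u128>` as a stand-in for the view buffer (names here are illustrative, not the crate's API):

```rust
/// Append dictionary-resolved views into `output_views`.
///
/// Reserving `keys.len()` slots up front turns many amortized grow
/// operations into a single allocation, which is what the added
/// `reserve(len)` call buys in the real decoder.
fn append_dict_views(output_views: &mut Vec<u128>, dict_views: &[u128], keys: &[usize]) {
    output_views.reserve(keys.len());
    for &k in keys {
        // Each key is an index into the dictionary's view array.
        output_views.push(dict_views[k]);
    }
}

fn main() {
    let dict = vec![10u128, 20, 30];
    let mut out = Vec::new();
    append_dict_views(&mut out, &dict, &[2, 0, 2]);
    assert_eq!(out, vec![30, 10, 30]);
}
```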
73 changes: 53 additions & 20 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -17,16 +17,6 @@

//! Contains reader which reads parquet data into arrow [`RecordBatch`]

use arrow_array::cast::AsArray;
use arrow_array::Array;
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
use arrow_select::filter::prep_null_mask_filter;
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelector};
use std::collections::VecDeque;
use std::sync::Arc;

pub use crate::arrow::array_reader::RowGroups;
use crate::arrow::array_reader::{build_array_reader, ArrayReader};
use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
@@ -38,6 +28,16 @@ use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::schema::types::SchemaDescriptor;
use arrow_array::cast::AsArray;
use arrow_array::{Array, BooleanArray};
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
use arrow_select::filter::prep_null_mask_filter;
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelector};
use std::collections::VecDeque;
use std::mem::take;
use std::sync::Arc;

mod filter;
mod selection;
@@ -761,6 +761,7 @@ impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
.map(|i| i[rg_idx][self.column_idx].page_locations.clone());
let total_rows = rg.num_rows() as usize;
let reader = self.reader.clone();
// todo: add cache???

SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
.add_crypto_context(
@@ -792,18 +793,20 @@ pub struct ParquetRecordBatchReader {
batch_size: usize,
array_reader: Box<dyn ArrayReader>,
schema: SchemaRef,
selection: Option<VecDeque<RowSelector>>,
selection: Option<RowSelection>,
}

impl Iterator for ParquetRecordBatchReader {
type Item = Result<RecordBatch, ArrowError>;

fn next(&mut self) -> Option<Self::Item> {
let mut read_records = 0;
let mut filter: Option<BooleanArray> = None;
match self.selection.as_mut() {
Some(selection) => {
while read_records < self.batch_size && !selection.is_empty() {
let front = selection.pop_front().unwrap();
Some(RowSelection::Ranges(selectors)) => {
let mut selectors: VecDeque<RowSelector> = VecDeque::from_iter(take(selectors));
while read_records < self.batch_size && !selectors.is_empty() {
let front = selectors.pop_front().unwrap();
if front.skip {
let skipped = match self.array_reader.skip_records(front.row_count) {
Ok(skipped) => skipped,
@@ -833,7 +836,7 @@ impl Iterator for ParquetRecordBatchReader {
Some(remaining) if remaining != 0 => {
// if the page row count is less than batch_size we must set the batch size to the page row count.
// this check avoids an infinite loop
selection.push_front(RowSelector::select(remaining));
selectors.push_front(RowSelector::select(remaining));
need_read
}
_ => front.row_count,
@@ -844,7 +847,14 @@
Err(error) => return Some(Err(error.into())),
}
}
self.selection = Some(RowSelection::Ranges(selectors.into()));
}
Some(RowSelection::BitMap(bitmap)) => {
self.array_reader.read_records(bitmap.len()).unwrap();
filter = Some(bitmap.clone());
self.selection = None;
}

None => {
if let Err(error) = self.array_reader.read_records(self.batch_size) {
return Some(Err(error.into()));
@@ -861,9 +871,31 @@
)
});

match struct_array {
Err(err) => Some(Err(err)),
Ok(e) => (e.len() > 0).then(|| Ok(RecordBatch::from(e))),
let batch: RecordBatch = match struct_array {
Err(err) => return Some(Err(err)),
Ok(e) => e.into(),
};

if let Some(filter) = filter.as_mut() {
if batch.num_rows() == 0 {
return None;
}
if filter.len() != batch.num_rows() {
return Some(Err(ArrowError::ComputeError(format!(
"Filter length ({}) does not match batch rows ({})",
filter.len(),
batch.num_rows()
))));
}

match arrow_select::filter::filter_record_batch(&batch, filter) {
Ok(filtered_batch) => Some(Ok(filtered_batch)),
Err(e) => Some(Err(e)),
}
} else if batch.num_rows() > 0 {
Some(Ok(batch))
} else {
None
}
}
}
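The `Ranges` branch above pops selectors until `batch_size` rows are read, pushing the unconsumed remainder of a split run back on the front of the queue. A hedged, I/O-free sketch of that bounded-consumption shape (simplified types; the real code interleaves `skip_records`/`read_records` calls and error handling):

```rust
use std::collections::VecDeque;

/// Simplified stand-in for the run-length selector in the diff above.
struct RowSelector {
    row_count: usize,
    skip: bool,
}

/// Consume selectors until `batch_size` rows are selected, returning
/// (skipped, read) counts. A run larger than the remaining budget is
/// split and its tail pushed back for the next batch.
fn consume(selectors: &mut VecDeque<RowSelector>, batch_size: usize) -> (usize, usize) {
    let (mut skipped, mut read) = (0usize, 0usize);
    while read < batch_size {
        let Some(front) = selectors.pop_front() else { break };
        if front.skip {
            skipped += front.row_count;
            continue;
        }
        let need = batch_size - read;
        if front.row_count > need {
            // Split the run: take what fits, return the rest to the queue.
            selectors.push_front(RowSelector { row_count: front.row_count - need, skip: false });
            read += need;
        } else {
            read += front.row_count;
        }
    }
    (skipped, read)
}

fn main() {
    let mut q = VecDeque::from(vec![
        RowSelector { row_count: 5, skip: false },
        RowSelector { row_count: 3, skip: true },
        RowSelector { row_count: 10, skip: false },
    ]);
    let (skipped, read) = consume(&mut q, 8);
    assert_eq!((skipped, read), (3, 8));
    assert_eq!(q.front().unwrap().row_count, 7); // tail of the split run
}
```

The `BitMap` branch, by contrast, decodes the whole span and applies the mask with `filter_record_batch` in one pass, which is why a bitmap tends to win when selected rows are scattered while ranges win on long contiguous runs; that trade-off is what the PR title calls "adaptive".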
@@ -907,7 +939,7 @@ impl ParquetRecordBatchReader {
batch_size,
array_reader,
schema: Arc::new(Schema::new(levels.fields.clone())),
selection: selection.map(|s| s.trim().into()),
selection: selection.map(|s| s.trim()),
})
}

Expand All @@ -928,7 +960,7 @@ impl ParquetRecordBatchReader {
batch_size,
array_reader,
schema: Arc::new(schema),
selection: selection.map(|s| s.trim().into()),
selection: selection.map(|s| s.trim()),
}
}
}
@@ -1002,6 +1034,7 @@ pub(crate) fn evaluate_predicate(
filter.len()
));
}

match filter.null_count() {
0 => filters.push(filter),
_ => filters.push(prep_null_mask_filter(&filter)),
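The `null_count` branch above routes predicate results containing nulls through `prep_null_mask_filter`. What that normalization accomplishes can be sketched with `Option<bool>` standing in for a nullable boolean array (an illustrative helper, not the arrow-select API):

```rust
/// A predicate may evaluate to null (here `None`) for some rows; for row
/// selection a null must behave like `false`, so the mask is normalized
/// into a plain boolean vector before it can drive filtering.
fn prep_null_mask(filter: &[Option<bool>]) -> Vec<bool> {
    filter.iter().map(|v| v.unwrap_or(false)).collect()
}

fn main() {
    let mask = prep_null_mask(&[Some(true), None, Some(false), None]);
    assert_eq!(mask, vec![true, false, false, false]);
}
```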