Skip to content

Commit

Permalink
Merge branch 'branch-25.04' into enh/interval/non_subtype
Browse files Browse the repository at this point in the history
  • Loading branch information
mroeschke authored Feb 20, 2025
2 parents 80a81fe + cc5626b commit 878ea84
Show file tree
Hide file tree
Showing 18 changed files with 208 additions and 115 deletions.
33 changes: 22 additions & 11 deletions cpp/src/io/parquet/reader_impl_preprocess.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1463,7 +1463,7 @@ void reader::impl::preprocess_subpass_pages(read_mode mode, size_t chunk_read_li
page_input,
chunk_row_output_iter{pass.pages.device_ptr()});

// copy chunk row into the subpass pages
// copy chunk_row into the subpass pages
// only need to do this if we are not processing the whole pass in one subpass
if (!subpass.single_subpass) {
thrust::for_each(rmm::exec_policy_nosync(_stream),
Expand All @@ -1481,31 +1481,42 @@ void reader::impl::preprocess_subpass_pages(read_mode mode, size_t chunk_read_li
// able to decode for this pass. we will have selected a set of pages for each column in the
// row group, but not every page will have the same number of rows. so, we can only read as many
// rows as the smallest batch (by column) we have decompressed.
size_t page_index = 0;
size_t max_row = std::numeric_limits<size_t>::max();
size_t first_page_index = 0;
size_t max_row = std::numeric_limits<size_t>::max();
auto const last_pass_row =
_file_itm_data.input_pass_start_row_count[_file_itm_data._current_input_pass + 1];
// for each column
for (size_t idx = 0; idx < subpass.column_page_count.size(); idx++) {
auto const& last_page = subpass.pages[page_index + (subpass.column_page_count[idx] - 1)];
auto const& chunk = pass.chunks[last_page.chunk_idx];
// compute max row for this column in the subpass
auto const& last_page = subpass.pages[first_page_index + (subpass.column_page_count[idx] - 1)];
auto const& last_chunk = pass.chunks[last_page.chunk_idx];
auto max_col_row = static_cast<size_t>(last_chunk.start_row) +
static_cast<size_t>(last_page.chunk_row) +
static_cast<size_t>(last_page.num_rows);

size_t max_col_row =
static_cast<size_t>(chunk.start_row + last_page.chunk_row + last_page.num_rows);
// special case. list rows can span page boundaries, but we can't tell if that is happening
// here because we have not yet decoded the pages. the very last row starting in the page may
// not terminate in the page. to handle this, only decode up to the second to last row in the
// subpass since we know that will safely completed.
bool const is_list = chunk.max_level[level_type::REPETITION] > 0;
bool const is_list = last_chunk.max_level[level_type::REPETITION] > 0;
// corner case: only decode up to the second-to-last row, except if this is the last page in the
// entire pass. this handles the case where we only have 1 chunk, 1 page, and potentially even
// just 1 row.
if (is_list && max_col_row < last_pass_row) {
auto const& first_page = subpass.pages[page_index];
size_t const min_col_row = static_cast<size_t>(chunk.start_row + first_page.chunk_row);
// compute min row for this column in the subpass
auto const& first_page = subpass.pages[first_page_index];
auto const& first_chunk = pass.chunks[first_page.chunk_idx];
auto const min_col_row =
static_cast<size_t>(first_chunk.start_row) + static_cast<size_t>(first_page.chunk_row);

// must have at least 2 rows in the subpass.
CUDF_EXPECTS((max_col_row - min_col_row) > 1, "Unexpected short subpass");
max_col_row--;
}

max_row = min(max_row, max_col_row);

page_index += subpass.column_page_count[idx];
first_page_index += subpass.column_page_count[idx];
}
subpass.skip_rows = pass.skip_rows + pass.processed_rows;
auto const pass_end = pass.skip_rows + pass.num_rows;
Expand Down
Loading

0 comments on commit 878ea84

Please sign in to comment.