Commit e7d77c9

Merge branch 'branch-25.04' into tokenized-minhash

davidwendt committed Feb 21, 2025
2 parents a392ac1 + 163e27b
Showing 8 changed files with 154 additions and 119 deletions.
9 changes: 6 additions & 3 deletions cpp/include/cudf/strings/string_view.cuh
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -159,8 +159,11 @@ __device__ inline string_view::const_iterator::const_iterator(string_view const&

__device__ inline string_view::const_iterator& string_view::const_iterator::operator++()
{
-  if (byte_pos < bytes)
-    byte_pos += strings::detail::bytes_in_utf8_byte(static_cast<uint8_t>(p[byte_pos]));
+  if (byte_pos < bytes) {
+    // max is used to prevent an infinite loop on invalid UTF-8 data
+    byte_pos +=
+      cuda::std::max(1, strings::detail::bytes_in_utf8_byte(static_cast<uint8_t>(p[byte_pos])));
+  }
++char_pos;
return *this;
}
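
The clamp matters because a byte-width helper in the spirit of strings::detail::bytes_in_utf8_byte has no valid width to report for a bare continuation byte and returns 0, so an iterator that advanced by the raw return value would never move past invalid data. A standalone host-side sketch of the failure mode and the fix (the helper below is re-derived from the UTF-8 lead-byte rules for illustration; it is not the cudf implementation, which clamps with cuda::std::max in device code):

#include <algorithm>
#include <cassert>
#include <cstdint>

// Width of a UTF-8 character given its first byte; 0 for a byte that
// cannot start a character (a continuation byte or an invalid lead byte).
int bytes_in_utf8_byte(uint8_t byte)
{
  if ((byte & 0x80) == 0x00) return 1;  // ASCII
  if ((byte & 0xE0) == 0xC0) return 2;  // 2-byte sequence lead
  if ((byte & 0xF0) == 0xE0) return 3;  // 3-byte sequence lead
  if ((byte & 0xF8) == 0xF0) return 4;  // 4-byte sequence lead
  return 0;                             // continuation / invalid byte
}

int main()
{
  // 0xBF is a bare continuation byte: invalid as the start of a character.
  uint8_t const invalid = 0xBF;
  assert(bytes_in_utf8_byte(invalid) == 0);                // would stall the iterator
  assert(std::max(1, bytes_in_utf8_byte(invalid)) == 1);   // clamped: always advances
  return 0;
}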
39 changes: 20 additions & 19 deletions cpp/src/io/fst/dispatch_dfa.cuh
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2022-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2022-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -209,29 +209,25 @@ struct DispatchFSM : DeviceFSMPolicy {
FstScanTileStateT fst_tile_state)

{
-    cudaError_t error = cudaSuccess;
-    cub::KernelConfig dfa_simulation_config;
-
     using PolicyT = typename ActivePolicyT::AgentDFAPolicy;
-    if (CubDebug(error = dfa_simulation_config.Init<PolicyT>(dfa_kernel))) return error;
 
     // Kernel invocation
     uint32_t grid_size = std::max(
       1u, CUB_QUOTIENT_CEILING(num_chars, PolicyT::BLOCK_THREADS * PolicyT::ITEMS_PER_THREAD));
-    uint32_t block_threads = dfa_simulation_config.block_threads;
-
-    dfa_kernel<<<grid_size, block_threads, 0, stream>>>(dfa,
-                                                        d_chars_in,
-                                                        num_chars,
-                                                        seed_state,
-                                                        d_thread_state_transition,
-                                                        tile_state,
-                                                        fst_tile_state,
-                                                        transduced_out_it,
-                                                        transduced_out_idx_it,
-                                                        d_num_transduced_out_it);
+    dfa_kernel<<<grid_size, PolicyT::BLOCK_THREADS, 0, stream>>>(dfa,
+                                                                 d_chars_in,
+                                                                 num_chars,
+                                                                 seed_state,
+                                                                 d_thread_state_transition,
+                                                                 tile_state,
+                                                                 fst_tile_state,
+                                                                 transduced_out_it,
+                                                                 transduced_out_idx_it,
+                                                                 d_num_transduced_out_it);
 
     // Check for errors
+    cudaError_t error = cudaSuccess;
if (CubDebug(error = cudaPeekAtLastError())) return error;

return error;
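
cub::KernelConfig, which the removed lines used to look up the block size at runtime, is gone from newer CCCL releases; the launch now takes the block size directly from the compile-time constant on the agent policy. A minimal sketch of that pattern, with hypothetical kernel and policy names:

#include <algorithm>
#include <cuda_runtime.h>

struct AgentPolicy {
  static constexpr int BLOCK_THREADS    = 128;
  static constexpr int ITEMS_PER_THREAD = 8;
};

template <int ITEMS_PER_THREAD>
__global__ void add_one_kernel(int const* in, int* out, int n)
{
  // Each thread handles ITEMS_PER_THREAD consecutive elements.
  int const base = (blockIdx.x * blockDim.x + threadIdx.x) * ITEMS_PER_THREAD;
  for (int i = 0; i < ITEMS_PER_THREAD; ++i) {
    int const idx = base + i;
    if (idx < n) out[idx] = in[idx] + 1;
  }
}

template <typename PolicyT>
cudaError_t launch_add_one(int const* in, int* out, int n, cudaStream_t stream)
{
  int const tile_size = PolicyT::BLOCK_THREADS * PolicyT::ITEMS_PER_THREAD;
  int const grid_size = std::max(1, (n + tile_size - 1) / tile_size);
  // The block size is a compile-time policy constant, so no runtime
  // kernel-configuration lookup is needed before the launch.
  add_one_kernel<PolicyT::ITEMS_PER_THREAD>
    <<<grid_size, PolicyT::BLOCK_THREADS, 0, stream>>>(in, out, n);
  return cudaPeekAtLastError();
}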
@@ -394,8 +390,13 @@ struct DispatchFSM : DeviceFSMPolicy {

// Alias the temporary allocations from the single storage blob (or compute the necessary size
// of the blob)
-    error =
-      cub::AliasTemporaries(d_temp_storage, temp_storage_bytes, allocations, allocation_sizes);
+// TODO (@miscco): remove this once rapids moves to CCCL 2.8
+#if CCCL_VERSION_MAJOR >= 3
+    error = cub::detail::AliasTemporaries(
+#else  // ^^^ CCCL 3.x ^^^ / vvv CCCL 2.x vvv
+    error = cub::AliasTemporaries(
+#endif  // CCCL 2.x
+      d_temp_storage, temp_storage_bytes, allocations, allocation_sizes);
if (error != cudaSuccess) return error;

// Return if the caller is simply requesting the size of the storage allocation
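
The guard above only relocates AliasTemporaries into cub::detail for CCCL 3.x; the routine itself implements CUB's two-phase temporary-storage protocol: call once with a null pointer to learn the blob size, then again with real storage to carve it into aligned sub-allocations. A standalone sketch of that protocol (the alignment value and error convention are simplified assumptions, not CUB's exact behavior):

#include <cstddef>

constexpr std::size_t ALIGN = 256;  // assumed device-friendly alignment

inline std::size_t round_up(std::size_t v) { return (v + ALIGN - 1) / ALIGN * ALIGN; }

// Phase 1: d_temp_storage == nullptr -> report required size in temp_storage_bytes.
// Phase 2: d_temp_storage != nullptr -> fill allocations[] with aligned pointers.
template <int NUM_ALLOCATIONS>
int alias_temporaries(void* d_temp_storage,
                      std::size_t& temp_storage_bytes,
                      void* (&allocations)[NUM_ALLOCATIONS],
                      std::size_t const (&allocation_sizes)[NUM_ALLOCATIONS])
{
  std::size_t offsets[NUM_ALLOCATIONS];
  std::size_t total = 0;
  for (int i = 0; i < NUM_ALLOCATIONS; ++i) {
    offsets[i] = total;
    total += round_up(allocation_sizes[i]);
  }
  if (d_temp_storage == nullptr) {  // size-query phase
    temp_storage_bytes = total;
    return 0;
  }
  if (temp_storage_bytes < total) return 1;  // caller's blob is too small
  for (int i = 0; i < NUM_ALLOCATIONS; ++i) {
    allocations[i] = static_cast<char*>(d_temp_storage) + offsets[i];
  }
  return 0;
}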
19 changes: 9 additions & 10 deletions cpp/src/io/fst/logical_stack.cuh
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2022-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2022-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -332,9 +332,8 @@ void sparse_stack_op_to_top_of_stack(StackSymbolItT d_symbols,
// Transforming sequence of stack symbols to stack operations
using StackSymbolToStackOpT = detail::StackSymbolToStackOp<StackOpT, StackSymbolToStackOpTypeT>;

-  // TransformInputIterator converting stack symbols to stack operations
-  using TransformInputItT =
-    cub::TransformInputIterator<StackOpT, StackSymbolToStackOpT, StackSymbolItT>;
+  // transform_iterator converting stack symbols to stack operations
+  using TransformInputItT = thrust::transform_iterator<StackSymbolToStackOpT, StackSymbolItT>;

constexpr bool supports_reset_op = SupportResetOperation == stack_op_support::WITH_RESET_SUPPORT;

@@ -365,8 +364,8 @@ void sparse_stack_op_to_top_of_stack(StackSymbolItT d_symbols,
// with the empty_stack_symbol
StackOpT const empty_stack{0, empty_stack_symbol};

-  cub::TransformInputIterator<StackOpT, detail::RemapEmptyStack<StackOpT>, StackOpT*>
-    kv_ops_scan_in(nullptr, detail::RemapEmptyStack<StackOpT>{empty_stack});
+  thrust::transform_iterator<detail::RemapEmptyStack<StackOpT>, StackOpT*> kv_ops_scan_in(
+    nullptr, detail::RemapEmptyStack<StackOpT>{empty_stack});
StackOpT* kv_ops_scan_out = nullptr;

std::size_t stack_level_scan_bytes = 0;
@@ -532,7 +531,7 @@ void sparse_stack_op_to_top_of_stack(StackSymbolItT d_symbols,
end_bit,
stream));

-  // TransformInputIterator that remaps all operations on stack level 0 to the empty stack symbol
+  // transform_iterator that remaps all operations on stack level 0 to the empty stack symbol
kv_ops_scan_in = {reinterpret_cast<StackOpT*>(d_kv_operations_unsigned.Current()),
detail::RemapEmptyStack<StackOpT>{empty_stack}};
kv_ops_scan_out = reinterpret_cast<StackOpT*>(d_kv_operations_unsigned.Alternate());
@@ -553,9 +552,9 @@ void sparse_stack_op_to_top_of_stack(StackSymbolItT d_symbols,
thrust::device_ptr<StackSymbolT>{d_top_of_stack + num_symbols_out},
read_symbol);

-  // Transform the stack operations to the stack symbol they represent
-  cub::TransformInputIterator<StackSymbolT, detail::StackOpToStackSymbol, StackOpT*>
-    kv_op_to_stack_sym_it(kv_ops_scan_out, detail::StackOpToStackSymbol{});
+  // transform_iterator mapping the stack operations to the stack symbols they represent
+  thrust::transform_iterator<detail::StackOpToStackSymbol, StackOpT*> kv_op_to_stack_sym_it(
+    kv_ops_scan_out, detail::StackOpToStackSymbol{});

// Scatter the stack symbols to the output tape (spots that are not scattered to have been
// pre-filled with the read-symbol)
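
All three migrations in this file follow the same recipe: cub::TransformInputIterator<ValueT, Op, InputIt> becomes thrust::transform_iterator<Op, InputIt>, and the value type is deduced from the functor's operator() instead of being spelled out. A self-contained example of the new form (the functor and data are hypothetical):

#include <thrust/device_vector.h>
#include <thrust/execution_policy.h>
#include <thrust/iterator/transform_iterator.h>
#include <thrust/reduce.h>

#include <vector>

struct times_two {
  __host__ __device__ int operator()(int x) const { return 2 * x; }
};

int main()
{
  std::vector<int> const h_in{1, 2, 3, 4};
  thrust::device_vector<int> d_in(h_in.begin(), h_in.end());

  // Old form (removed from newer CCCL): the result type was an explicit argument.
  //   cub::TransformInputIterator<int, times_two, int const*> it(ptr, times_two{});
  // New form: the result type is deduced from times_two::operator().
  thrust::transform_iterator<times_two, int const*> it(
    thrust::raw_pointer_cast(d_in.data()), times_two{});

  int const sum = thrust::reduce(thrust::device, it, it + d_in.size(), 0);
  return sum == 20 ? 0 : 1;  // 2 + 4 + 6 + 8
}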
33 changes: 22 additions & 11 deletions cpp/src/io/parquet/reader_impl_preprocess.cu
@@ -1463,7 +1463,7 @@ void reader::impl::preprocess_subpass_pages(read_mode mode, size_t chunk_read_li
page_input,
chunk_row_output_iter{pass.pages.device_ptr()});

-  // copy chunk row into the subpass pages
+  // copy chunk_row into the subpass pages
// only need to do this if we are not processing the whole pass in one subpass
if (!subpass.single_subpass) {
thrust::for_each(rmm::exec_policy_nosync(_stream),
@@ -1481,31 +1481,42 @@ void reader::impl::preprocess_subpass_pages(read_mode mode, size_t chunk_read_li
// able to decode for this pass. we will have selected a set of pages for each column in the
// row group, but not every page will have the same number of rows. so, we can only read as many
// rows as the smallest batch (by column) we have decompressed.
-  size_t page_index = 0;
-  size_t max_row = std::numeric_limits<size_t>::max();
+  size_t first_page_index = 0;
+  size_t max_row          = std::numeric_limits<size_t>::max();
   auto const last_pass_row =
     _file_itm_data.input_pass_start_row_count[_file_itm_data._current_input_pass + 1];
   // for each column
   for (size_t idx = 0; idx < subpass.column_page_count.size(); idx++) {
-    auto const& last_page = subpass.pages[page_index + (subpass.column_page_count[idx] - 1)];
-    auto const& chunk = pass.chunks[last_page.chunk_idx];
+    // compute max row for this column in the subpass
+    auto const& last_page = subpass.pages[first_page_index + (subpass.column_page_count[idx] - 1)];
+    auto const& last_chunk = pass.chunks[last_page.chunk_idx];
+    auto max_col_row = static_cast<size_t>(last_chunk.start_row) +
+                       static_cast<size_t>(last_page.chunk_row) +
+                       static_cast<size_t>(last_page.num_rows);
 
-    size_t max_col_row =
-      static_cast<size_t>(chunk.start_row + last_page.chunk_row + last_page.num_rows);
     // special case. list rows can span page boundaries, but we can't tell if that is happening
     // here because we have not yet decoded the pages. the very last row starting in the page may
    // not terminate in the page. to handle this, only decode up to the second to last row in the
     // subpass since we know that will safely completed.
-    bool const is_list = chunk.max_level[level_type::REPETITION] > 0;
+    bool const is_list = last_chunk.max_level[level_type::REPETITION] > 0;
+    // corner case: only decode up to the second-to-last row, except if this is the last page in
+    // the entire pass. this handles the case where we only have 1 chunk, 1 page, and potentially
+    // even just 1 row.
     if (is_list && max_col_row < last_pass_row) {
-      auto const& first_page = subpass.pages[page_index];
-      size_t const min_col_row = static_cast<size_t>(chunk.start_row + first_page.chunk_row);
+      // compute min row for this column in the subpass
+      auto const& first_page  = subpass.pages[first_page_index];
+      auto const& first_chunk = pass.chunks[first_page.chunk_idx];
+      auto const min_col_row =
+        static_cast<size_t>(first_chunk.start_row) + static_cast<size_t>(first_page.chunk_row);
 
       // must have at least 2 rows in the subpass.
       CUDF_EXPECTS((max_col_row - min_col_row) > 1, "Unexpected short subpass");
       max_col_row--;
     }
 
     max_row = min(max_row, max_col_row);
 
-    page_index += subpass.column_page_count[idx];
+    first_page_index += subpass.column_page_count[idx];
}
subpass.skip_rows = pass.skip_rows + pass.processed_rows;
auto const pass_end = pass.skip_rows + pass.num_rows;
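
A worked sketch of the row-limit arithmetic above, with hypothetical numbers: each column contributes max_col_row = chunk.start_row + last_page.chunk_row + last_page.num_rows, a list column gives up its final row unless its page already ends the pass, and the subpass can only decode up to the minimum across columns (the struct and values below are illustrative, not cudf types):

#include <algorithm>
#include <cassert>
#include <cstddef>
#include <limits>

struct LastPageInfo {
  std::size_t chunk_start_row;  // pass.chunks[page.chunk_idx].start_row
  std::size_t chunk_row;        // row offset of the page within its chunk
  std::size_t num_rows;         // rows beginning in this page
  bool is_list;                 // chunk has repetition levels
};

int main()
{
  std::size_t const last_pass_row = 100'000;
  LastPageInfo const columns[] = {
    {0, 4'000, 1'000, false},  // plain column: rows up to 5000 available
    {0, 3'000, 1'500, true},   // list column: rows up to 4500 available
  };

  std::size_t max_row = std::numeric_limits<std::size_t>::max();
  for (auto const& col : columns) {
    std::size_t max_col_row = col.chunk_start_row + col.chunk_row + col.num_rows;
    // A list column's last row may continue into the next page, so stop one
    // row short -- unless this page already ends the whole pass.
    if (col.is_list && max_col_row < last_pass_row) { max_col_row--; }
    max_row = std::min(max_row, max_col_row);
  }
  assert(max_row == 4'499);  // limited by the list column, minus its last row
  return 0;
}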
3 changes: 3 additions & 0 deletions cpp/src/io/parquet/writer_impl_helpers.cpp
@@ -26,6 +26,9 @@
#include <cudf/strings/strings_column_view.hpp>
#include <cudf/structs/structs_column_view.hpp>

+#include <functional>
+#include <string>

namespace cudf::io::parquet::detail {

using namespace cudf::io::detail;