Skip to content

Commit

Permalink
remove rest of nvcomp from pq reader
Browse files Browse the repository at this point in the history
  • Loading branch information
vuule committed Feb 27, 2025
1 parent 826a35c commit 0268b3d
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 71 deletions.
21 changes: 21 additions & 0 deletions cpp/src/io/comp/io_uncomp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,26 @@ void decompress(compression_type compression,
*/
size_t get_uncompressed_size(compression_type compression, host_span<uint8_t const> src);

/**
* @brief Struct to hold information about decompression.
*
* This struct contains details about the decompression process, including
* the type of compression, the number of pages, the maximum size
* of a decompressed page, and the total decompressed size.
*/
struct decompression_info {
compression_type type;
size_t num_pages;
size_t max_page_decompressed_size;
size_t total_decompressed_size;
};

/**
* @brief Functor which returns total scratch space required based on computed decompression_info
* data.
*
*/
[[nodiscard]] size_t get_decompression_scratch_size(decompression_info const& di);

} // namespace io::detail
} // namespace CUDF_EXPORT cudf
24 changes: 19 additions & 5 deletions cpp/src/io/comp/uncomp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -729,10 +729,7 @@ void host_decompress(compression_type compression,
}
}

[[nodiscard]] bool use_host_decompression(
compression_type compression,
[[maybe_unused]] device_span<device_span<uint8_t const> const> inputs,
[[maybe_unused]] device_span<device_span<uint8_t> const> outputs)
[[nodiscard]] bool use_host_decompression(compression_type compression)
{
CUDF_EXPECTS(
not host_decompression_supported(compression) or device_decompression_supported(compression),
Expand All @@ -743,6 +740,23 @@ void host_decompress(compression_type compression,
return getenv_or("LIBCUDF_HOST_DECOMPRESSION", std::string{"OFF"}) == "ON";
}

[[nodiscard]] size_t get_decompression_scratch_size(decompression_info const& di)
{
if (di.type == compression_type::NONE or use_host_decompression(di.type)) { return 0; }

auto const nvcomp_type = to_nvcomp_compression(di.type);
auto nvcomp_disabled = nvcomp_type.has_value() ? nvcomp::is_decompression_disabled(*nvcomp_type)
: "invalid compression type";
if (not nvcomp_disabled) {
nvcomp::batched_decompress_temp_size(
nvcomp_type.value(), di.num_pages, di.max_page_decompressed_size, di.total_decompressed_size);
}

if (di.type == compression_type::BROTLI) return get_gpu_debrotli_scratch_size(di.num_pages);
// only Brotli kernel requires scratch memory
return 0;
}

void decompress(compression_type compression,
device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
Expand All @@ -753,7 +767,7 @@ void decompress(compression_type compression,
{
CUDF_FUNC_RANGE();
if (inputs.empty()) { return; }
if (use_host_decompression(compression, inputs, outputs)) {
if (use_host_decompression(compression)) {
return host_decompress(compression, inputs, outputs, results, stream);
} else {
return device_decompress(
Expand Down
94 changes: 28 additions & 66 deletions cpp/src/io/parquet/reader_impl_chunking.cu
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include "compact_protocol_reader.hpp"
#include "io/comp/gpuinflate.hpp"
#include "io/comp/io_uncomp.hpp"
#include "io/comp/nvcomp_adapter.hpp"
#include "io/utilities/time_utils.cuh"
#include "reader_impl.hpp"
#include "reader_impl_chunking.hpp"
Expand Down Expand Up @@ -46,9 +45,9 @@ namespace cudf::io::parquet::detail {

namespace {

namespace nvcomp = cudf::io::detail::nvcomp;
using cudf::io::detail::compression_result;
using cudf::io::detail::compression_status;
using cudf::io::detail::decompression_info;

struct split_info {
row_range rows;
Expand Down Expand Up @@ -998,13 +997,6 @@ void detect_malformed_pages(device_span<PageInfo const> pages,
}
}

struct decompression_info {
Compression codec;
size_t num_pages;
size_t max_page_decompressed_size;
size_t total_decompressed_size;
};

/**
* @brief Functor which retrieves per-page decompression information.
*
Expand All @@ -1014,7 +1006,17 @@ struct get_decomp_info {

__device__ decompression_info operator()(PageInfo const& p) const
{
return {static_cast<Compression>(chunks[p.chunk_idx].codec),
auto const comp_type = [codec = chunks[p.chunk_idx].codec]() {
switch (codec) {
case SNAPPY: return compression_type::SNAPPY;
case GZIP: return compression_type::GZIP;
case BROTLI: return compression_type::BROTLI;
case ZSTD: return compression_type::ZSTD;
case LZ4_RAW: return compression_type::LZ4;
default: return compression_type::NONE;
}
}();
return {comp_type,
1,
static_cast<size_t>(p.uncompressed_page_size),
static_cast<size_t>(p.uncompressed_page_size)};
Expand All @@ -1029,54 +1031,13 @@ struct decomp_sum {
__device__ decompression_info operator()(decompression_info const& a,
decompression_info const& b) const
{
return {a.codec,
return {a.type,
a.num_pages + b.num_pages,
cuda::std::max(a.max_page_decompressed_size, b.max_page_decompressed_size),
a.total_decompressed_size + b.total_decompressed_size};
}
};

/**
* @brief Functor which returns total scratch space required based on computed decompression_info
* data.
*
*/
struct get_decomp_scratch {
size_t operator()(decompression_info const& di) const
{
switch (di.codec) {
case UNCOMPRESSED:
case GZIP: return 0;

case BROTLI: return cudf::io::detail::get_gpu_debrotli_scratch_size(di.num_pages);

case SNAPPY:
if (cudf::io::nvcomp_integration::is_stable_enabled()) {
return nvcomp::batched_decompress_temp_size(nvcomp::compression_type::SNAPPY,
di.num_pages,
di.max_page_decompressed_size,
di.total_decompressed_size);
} else {
return 0;
}
break;

case ZSTD:
return nvcomp::batched_decompress_temp_size(nvcomp::compression_type::ZSTD,
di.num_pages,
di.max_page_decompressed_size,
di.total_decompressed_size);
case LZ4_RAW:
return nvcomp::batched_decompress_temp_size(nvcomp::compression_type::LZ4,
di.num_pages,
di.max_page_decompressed_size,
di.total_decompressed_size);

default: CUDF_FAIL("Invalid compression codec for parquet decompression");
}
}
};

/**
* @brief Add the cost of decompression codec scratch space to the per-page cumulative
* size information.
Expand All @@ -1103,14 +1064,14 @@ void include_decompression_scratch_size(device_span<ColumnChunkDesc const> chunk
thrust::equal_to<int32_t>{},
decomp_sum{});

// retrieve to host so we can call nvcomp to get compression scratch sizes
// retrieve to host so we can get compression scratch sizes
auto h_decomp_info = cudf::detail::make_host_vector_sync(decomp_info, stream);
auto temp_cost = cudf::detail::make_host_vector<size_t>(pages.size(), stream);
thrust::transform(thrust::host,
h_decomp_info.begin(),
h_decomp_info.end(),
temp_cost.begin(),
get_decomp_scratch{});
cudf::io::detail::get_decompression_scratch_size);

// add to the cumulative_page_info data
rmm::device_uvector<size_t> d_temp_cost = cudf::detail::make_device_uvector_async(
Expand Down Expand Up @@ -1295,15 +1256,15 @@ void reader::impl::setup_next_subpass(read_mode mode)

auto const num_columns = _input_columns.size();

// if the user has passed a very small value (under the hardcoded minimum_subpass_expected_size),
// respect it.
// if the user has passed a very small value (under the hardcoded
// minimum_subpass_expected_size), respect it.
auto const min_subpass_size = std::min(_input_pass_read_limit, minimum_subpass_expected_size);

// what do we do if the base memory size (the compressed data) itself is approaching or larger
// than the overall read limit? we are still going to be decompressing in subpasses, but we have
// to assume some reasonable minimum size needed to safely decompress a single subpass. so always
// reserve at least that much space. this can result in using up to 2x the specified user limit
// but should only ever happen with unrealistically low numbers.
// to assume some reasonable minimum size needed to safely decompress a single subpass. so
// always reserve at least that much space. this can result in using up to 2x the specified user
// limit but should only ever happen with unrealistically low numbers.
size_t const remaining_read_limit =
_input_pass_read_limit == 0 ? 0
: pass.base_mem_size + min_subpass_size >= _input_pass_read_limit
Expand Down Expand Up @@ -1372,8 +1333,8 @@ void reader::impl::setup_next_subpass(read_mode mode)
if (subpass.single_subpass) {
subpass.pages = pass.pages;
}
// copy the appropriate subset of pages from each column and store the mapping back to the source
// (pass) pages
// copy the appropriate subset of pages from each column and store the mapping back to the
// source (pass) pages
else {
subpass.page_buf = cudf::detail::hostdevice_vector<PageInfo>(total_pages, total_pages, _stream);
subpass.page_src_index = rmm::device_uvector<size_t>(total_pages, _stream);
Expand Down Expand Up @@ -1571,7 +1532,8 @@ void reader::impl::compute_input_passes()
return;
}

// generate passes. make sure to account for the case where a single row group doesn't fit within
// generate passes. make sure to account for the case where a single row group doesn't fit
// within
//
std::size_t const comp_read_limit =
_input_pass_read_limit > 0
Expand All @@ -1594,8 +1556,8 @@ void reader::impl::compute_input_passes()
auto const [compressed_rg_size, _ /*compressed + uncompressed*/] =
get_row_group_size(row_group);

// We must use the effective size of the first row group we are reading to accurately calculate
// the first non-zero input_pass_start_row_count.
// We must use the effective size of the first row group we are reading to accurately
// calculate the first non-zero input_pass_start_row_count.
auto const row_group_rows =
(skip_rows) ? rgi.start_row + row_group.num_rows - skip_rows : row_group.num_rows;

Expand All @@ -1605,8 +1567,8 @@ void reader::impl::compute_input_passes()
// can we add this row group
if (cur_pass_byte_size + compressed_rg_size >= comp_read_limit) {
// A single row group (the current one) is larger than the read limit:
// We always need to include at least one row group, so end the pass at the end of the current
// row group
// We always need to include at least one row group, so end the pass at the end of the
// current row group
if (cur_rg_start == cur_rg_index) {
_file_itm_data.input_pass_row_group_offsets.push_back(cur_rg_index + 1);
_file_itm_data.input_pass_start_row_count.push_back(cur_row_count + row_group_rows);
Expand Down

0 comments on commit 0268b3d

Please sign in to comment.