From 0268b3d4700ab4261246f435542070a6aab97774 Mon Sep 17 00:00:00 2001
From: Vukasin Milovanovic
Date: Wed, 26 Feb 2025 16:36:51 -0800
Subject: [PATCH] remove rest of nvcomp from pq reader

---
 cpp/src/io/comp/io_uncomp.hpp              | 21 +++++
 cpp/src/io/comp/uncomp.cpp                 | 24 ++++--
 cpp/src/io/parquet/reader_impl_chunking.cu | 94 +++++++---------------
 3 files changed, 68 insertions(+), 71 deletions(-)

diff --git a/cpp/src/io/comp/io_uncomp.hpp b/cpp/src/io/comp/io_uncomp.hpp
index dbd41f49d65..ab9a6c54258 100644
--- a/cpp/src/io/comp/io_uncomp.hpp
+++ b/cpp/src/io/comp/io_uncomp.hpp
@@ -71,5 +71,26 @@ void decompress(compression_type compression,
  */
 size_t get_uncompressed_size(compression_type compression, host_span<uint8_t const> src);
 
+/**
+ * @brief Struct to hold information about decompression.
+ *
+ * This struct contains details about the decompression process, including
+ * the type of compression, the number of pages, the maximum size
+ * of a decompressed page, and the total decompressed size.
+ */
+struct decompression_info {
+  compression_type type;
+  size_t num_pages;
+  size_t max_page_decompressed_size;
+  size_t total_decompressed_size;
+};
+
+/**
+ * @brief Function which returns total scratch space required based on computed decompression_info
+ * data.
+ *
+ */
+[[nodiscard]] size_t get_decompression_scratch_size(decompression_info const& di);
+
 }  // namespace io::detail
 }  // namespace CUDF_EXPORT cudf
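As a usage note for the declarations added above: the sketch below shows how a caller could fold per-page decompressed sizes into a single decompression_info and ask for the scratch estimate. It is a minimal, hypothetical example, not code from this patch; it assumes only the two declarations in this hunk plus the cudf::io::compression_type enum these headers already use, and the helper name and input values are invented for illustration.

    #include "io/comp/io_uncomp.hpp"

    #include <algorithm>
    #include <cstddef>
    #include <vector>

    // Hypothetical helper: aggregate per-page decompressed sizes for one batch of
    // pages and query the scratch-space estimate for the given codec.
    size_t scratch_estimate(std::vector<size_t> const& page_decomp_sizes,
                            cudf::io::compression_type type)
    {
      cudf::io::detail::decompression_info info{type, 0, 0, 0};
      for (auto const sz : page_decomp_sizes) {
        info.num_pages += 1;
        info.max_page_decompressed_size = std::max(info.max_page_decompressed_size, sz);
        info.total_decompressed_size += sz;
      }
      return cudf::io::detail::get_decompression_scratch_size(info);
    }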
diff --git a/cpp/src/io/comp/uncomp.cpp b/cpp/src/io/comp/uncomp.cpp
index 591b3e90440..3f98e09b192 100644
--- a/cpp/src/io/comp/uncomp.cpp
+++ b/cpp/src/io/comp/uncomp.cpp
@@ -729,10 +729,7 @@ void host_decompress(compression_type compression,
   }
 }
 
-[[nodiscard]] bool use_host_decompression(
-  compression_type compression,
-  [[maybe_unused]] device_span<device_span<uint8_t const> const> inputs,
-  [[maybe_unused]] device_span<device_span<uint8_t> const> outputs)
+[[nodiscard]] bool use_host_decompression(compression_type compression)
 {
   CUDF_EXPECTS(
     not host_decompression_supported(compression) or device_decompression_supported(compression),
@@ -743,6 +740,23 @@ void host_decompress(compression_type compression,
   return getenv_or("LIBCUDF_HOST_DECOMPRESSION", std::string{"OFF"}) == "ON";
 }
 
+[[nodiscard]] size_t get_decompression_scratch_size(decompression_info const& di)
+{
+  if (di.type == compression_type::NONE or use_host_decompression(di.type)) { return 0; }
+
+  auto const nvcomp_type = to_nvcomp_compression(di.type);
+  auto nvcomp_disabled   = nvcomp_type.has_value() ? nvcomp::is_decompression_disabled(*nvcomp_type)
+                                                   : "invalid compression type";
+  if (not nvcomp_disabled) {
+    return nvcomp::batched_decompress_temp_size(
+      nvcomp_type.value(), di.num_pages, di.max_page_decompressed_size, di.total_decompressed_size);
+  }
+
+  if (di.type == compression_type::BROTLI) return get_gpu_debrotli_scratch_size(di.num_pages);
+  // only Brotli kernel requires scratch memory
+  return 0;
+}
+
 void decompress(compression_type compression,
                 device_span<device_span<uint8_t const> const> inputs,
                 device_span<device_span<uint8_t> const> outputs,
@@ -753,7 +767,7 @@ void decompress(compression_type compression,
 {
   CUDF_FUNC_RANGE();
   if (inputs.empty()) { return; }
-  if (use_host_decompression(compression, inputs, outputs)) {
+  if (use_host_decompression(compression)) {
     return host_decompress(compression, inputs, outputs, results, stream);
   } else {
     return device_decompress(
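Side note on the simplified use_host_decompression: with the span parameters removed, the host path is now chosen purely from the compression type plus the LIBCUDF_HOST_DECOMPRESSION environment variable. A rough standalone equivalent of that environment check, with std::getenv standing in for the internal getenv_or helper, might look like the sketch below (illustrative only, not part of the patch).

    #include <cstdlib>
    #include <string>

    // Opt-in host decompression: mirrors getenv_or("LIBCUDF_HOST_DECOMPRESSION", "OFF") == "ON"
    // from the hunk above. Unset, or any value other than "ON", keeps the device path.
    bool host_decompression_requested()
    {
      char const* env = std::getenv("LIBCUDF_HOST_DECOMPRESSION");
      return env != nullptr and std::string{env} == "ON";
    }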
diff --git a/cpp/src/io/parquet/reader_impl_chunking.cu b/cpp/src/io/parquet/reader_impl_chunking.cu
index 06f80c1468c..1f6b6af793a 100644
--- a/cpp/src/io/parquet/reader_impl_chunking.cu
+++ b/cpp/src/io/parquet/reader_impl_chunking.cu
@@ -17,7 +17,6 @@
 #include "compact_protocol_reader.hpp"
 #include "io/comp/gpuinflate.hpp"
 #include "io/comp/io_uncomp.hpp"
-#include "io/comp/nvcomp_adapter.hpp"
 #include "io/utilities/time_utils.cuh"
 #include "reader_impl.hpp"
 #include "reader_impl_chunking.hpp"
@@ -46,9 +45,9 @@ namespace cudf::io::parquet::detail {
 
 namespace {
 
-namespace nvcomp = cudf::io::detail::nvcomp;
 using cudf::io::detail::compression_result;
 using cudf::io::detail::compression_status;
+using cudf::io::detail::decompression_info;
 
 struct split_info {
   row_range rows;
@@ -998,13 +997,6 @@ void detect_malformed_pages(device_span<PageInfo const> pages,
   }
 }
 
-struct decompression_info {
-  Compression codec;
-  size_t num_pages;
-  size_t max_page_decompressed_size;
-  size_t total_decompressed_size;
-};
-
 /**
  * @brief Functor which retrieves per-page decompression information.
  *
@@ -1014,7 +1006,17 @@ struct get_decomp_info {
 
   __device__ decompression_info operator()(PageInfo const& p) const
   {
-    return {static_cast<Compression>(chunks[p.chunk_idx].codec),
+    auto const comp_type = [codec = chunks[p.chunk_idx].codec]() {
+      switch (codec) {
+        case SNAPPY: return compression_type::SNAPPY;
+        case GZIP: return compression_type::GZIP;
+        case BROTLI: return compression_type::BROTLI;
+        case ZSTD: return compression_type::ZSTD;
+        case LZ4_RAW: return compression_type::LZ4;
+        default: return compression_type::NONE;
+      }
+    }();
+    return {comp_type,
             1,
             static_cast<size_t>(p.uncompressed_page_size),
             static_cast<size_t>(p.uncompressed_page_size)};
@@ -1029,54 +1031,13 @@ struct decomp_sum {
   __device__ decompression_info operator()(decompression_info const& a,
                                            decompression_info const& b) const
   {
-    return {a.codec,
+    return {a.type,
             a.num_pages + b.num_pages,
             cuda::std::max(a.max_page_decompressed_size, b.max_page_decompressed_size),
             a.total_decompressed_size + b.total_decompressed_size};
   }
 };
 
-/**
- * @brief Functor which returns total scratch space required based on computed decompression_info
- * data.
- *
- */
-struct get_decomp_scratch {
-  size_t operator()(decompression_info const& di) const
-  {
-    switch (di.codec) {
-      case UNCOMPRESSED:
-      case GZIP: return 0;
-
-      case BROTLI: return cudf::io::detail::get_gpu_debrotli_scratch_size(di.num_pages);
-
-      case SNAPPY:
-        if (cudf::io::nvcomp_integration::is_stable_enabled()) {
-          return nvcomp::batched_decompress_temp_size(nvcomp::compression_type::SNAPPY,
-                                                      di.num_pages,
-                                                      di.max_page_decompressed_size,
-                                                      di.total_decompressed_size);
-        } else {
-          return 0;
-        }
-        break;
-
-      case ZSTD:
-        return nvcomp::batched_decompress_temp_size(nvcomp::compression_type::ZSTD,
-                                                    di.num_pages,
-                                                    di.max_page_decompressed_size,
-                                                    di.total_decompressed_size);
-      case LZ4_RAW:
-        return nvcomp::batched_decompress_temp_size(nvcomp::compression_type::LZ4,
-                                                    di.num_pages,
-                                                    di.max_page_decompressed_size,
-                                                    di.total_decompressed_size);
-
-      default: CUDF_FAIL("Invalid compression codec for parquet decompression");
-    }
-  }
-};
-
 /**
  * @brief Add the cost of decompression codec scratch space to the per-page cumulative
  * size information.
@@ -1103,14 +1064,14 @@ void include_decompression_scratch_size(device_span<ColumnChunkDesc const> chunk
                            thrust::equal_to{},
                            decomp_sum{});
 
-  // retrieve to host so we can call nvcomp to get compression scratch sizes
+  // retrieve to host so we can get compression scratch sizes
   auto h_decomp_info = cudf::detail::make_host_vector_sync(decomp_info, stream);
   auto temp_cost     = cudf::detail::make_host_vector<size_t>(pages.size(), stream);
   thrust::transform(thrust::host,
                     h_decomp_info.begin(),
                     h_decomp_info.end(),
                     temp_cost.begin(),
-                    get_decomp_scratch{});
+                    cudf::io::detail::get_decompression_scratch_size);
 
   // add to the cumulative_page_info data
   rmm::device_uvector<size_t> d_temp_cost = cudf::detail::make_device_uvector_async(
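The hunk above swaps the local get_decomp_scratch functor for the shared cudf::io::detail::get_decompression_scratch_size. As a rough host-side illustration of the same pattern, the sketch below combines per-page infos the way decomp_sum does and then queries the scratch size; plain std:: loops stand in for the thrust device calls, and the helper names are hypothetical.

    #include "io/comp/io_uncomp.hpp"

    #include <algorithm>
    #include <cstddef>
    #include <vector>

    using cudf::io::detail::decompression_info;
    using cudf::io::detail::get_decompression_scratch_size;

    // Combine two per-page infos as decomp_sum does: count pages, keep the largest
    // page size, and accumulate the total decompressed size.
    decompression_info combine(decompression_info const& a, decompression_info const& b)
    {
      return {a.type,
              a.num_pages + b.num_pages,
              std::max(a.max_page_decompressed_size, b.max_page_decompressed_size),
              a.total_decompressed_size + b.total_decompressed_size};
    }

    // Hypothetical helper: scratch estimate for the pages of one column chunk.
    size_t chunk_scratch_cost(std::vector<decompression_info> const& pages)
    {
      if (pages.empty()) { return 0; }
      decompression_info acc{pages.front().type, 0, 0, 0};
      for (auto const& p : pages) { acc = combine(acc, p); }
      return get_decompression_scratch_size(acc);
    }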
@@ -1295,15 +1256,15 @@ void reader::impl::setup_next_subpass(read_mode mode)
 
   auto const num_columns = _input_columns.size();
 
-  // if the user has passed a very small value (under the hardcoded minimum_subpass_expected_size),
-  // respect it.
+  // if the user has passed a very small value (under the hardcoded
+  // minimum_subpass_expected_size), respect it.
   auto const min_subpass_size = std::min(_input_pass_read_limit, minimum_subpass_expected_size);
 
   // what do we do if the base memory size (the compressed data) itself is approaching or larger
   // than the overall read limit? we are still going to be decompressing in subpasses, but we have
-  // to assume some reasonable minimum size needed to safely decompress a single subpass. so always
-  // reserve at least that much space. this can result in using up to 2x the specified user limit
-  // but should only ever happen with unrealistically low numbers.
+  // to assume some reasonable minimum size needed to safely decompress a single subpass. so
+  // always reserve at least that much space. this can result in using up to 2x the specified user
+  // limit but should only ever happen with unrealistically low numbers.
   size_t const remaining_read_limit =
     _input_pass_read_limit == 0 ? 0
                                 : pass.base_mem_size + min_subpass_size >= _input_pass_read_limit
@@ -1372,8 +1333,8 @@ void reader::impl::setup_next_subpass(read_mode mode)
   if (subpass.single_subpass) {
     subpass.pages = pass.pages;
   }
-  // copy the appropriate subset of pages from each column and store the mapping back to the source
-  // (pass) pages
+  // copy the appropriate subset of pages from each column and store the mapping back to the
+  // source (pass) pages
   else {
     subpass.page_buf = cudf::detail::hostdevice_vector<PageInfo>(total_pages, total_pages, _stream);
     subpass.page_src_index = rmm::device_uvector<size_t>(total_pages, _stream);
@@ -1571,7 +1532,8 @@ void reader::impl::compute_input_passes()
     return;
   }
 
-  // generate passes. make sure to account for the case where a single row group doesn't fit within
+  // generate passes. make sure to account for the case where a single row group doesn't fit
+  // within
   //
   std::size_t const comp_read_limit =
     _input_pass_read_limit > 0
@@ -1594,8 +1556,8 @@ void reader::impl::compute_input_passes()
     auto const [compressed_rg_size, _ /*compressed + uncompressed*/] =
       get_row_group_size(row_group);
 
-    // We must use the effective size of the first row group we are reading to accurately calculate
-    // the first non-zero input_pass_start_row_count.
+    // We must use the effective size of the first row group we are reading to accurately
+    // calculate the first non-zero input_pass_start_row_count.
     auto const row_group_rows =
       (skip_rows) ? rgi.start_row + row_group.num_rows - skip_rows : row_group.num_rows;
 
     // can we add this row group
     if (cur_pass_byte_size + compressed_rg_size >= comp_read_limit) {
       // A single row group (the current one) is larger than the read limit:
-      // We always need to include at least one row group, so end the pass at the end of the current
-      // row group
+      // We always need to include at least one row group, so end the pass at the end of the
+      // current row group
       if (cur_rg_start == cur_rg_index) {
         _file_itm_data.input_pass_row_group_offsets.push_back(cur_rg_index + 1);
         _file_itm_data.input_pass_start_row_count.push_back(cur_row_count + row_group_rows);
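The compute_input_passes hunks above mostly re-wrap comments, but the rule they describe is worth spelling out: row groups accumulate into the current pass until the compressed-size limit would be exceeded, and a pass always holds at least one row group even when that single group is over the limit. The sketch below is an illustrative standalone version of that rule, not the reader's actual code; the caller is assumed to pass an effective limit (for example SIZE_MAX when no limit is set).

    #include <cstddef>
    #include <vector>

    // Returns the row-group index at which each pass starts (plus a final
    // end-of-file entry), splitting whenever the running compressed size of the
    // current pass would reach the limit. A pass never ends up empty.
    std::vector<std::size_t> split_into_passes(std::vector<std::size_t> const& rg_sizes,
                                               std::size_t read_limit)
    {
      std::vector<std::size_t> pass_offsets{0};
      std::size_t cur_bytes = 0;
      for (std::size_t i = 0; i < rg_sizes.size(); ++i) {
        if (cur_bytes > 0 and cur_bytes + rg_sizes[i] >= read_limit) {
          pass_offsets.push_back(i);  // close the pass before this row group
          cur_bytes = 0;
        }
        cur_bytes += rg_sizes[i];
      }
      pass_offsets.push_back(rg_sizes.size());
      return pass_offsets;
    }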