Skip to content

Commit

Permalink
Support multithreaded reading of compressed buffers in JSON reader (#…
Browse files Browse the repository at this point in the history
…17670)

Addresses #17638 

This PR introduces multithreaded host-side decompression of compressed input buffers passed to the JSON reader, and uses a stream pool to transfer the uncompressed buffers to device.

Authors:
  - Shruti Shivakumar (https://github.com/shrshi)

Approvers:
  - Paul Mattione (https://github.com/pmattione-nvidia)
  - Nghia Truong (https://github.com/ttnghia)

URL: #17670
  • Loading branch information
shrshi authored Jan 9, 2025
1 parent f13d8fc commit 2310159
Showing 1 changed file with 61 additions and 10 deletions.
71 changes: 61 additions & 10 deletions cpp/src/io/json/read_json.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
* Copyright (c) 2022-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,19 +30,33 @@
#include <cudf/utilities/memory_resource.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_pool.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/distance.h>
#include <thrust/iterator/constant_iterator.h>
#include <thrust/scatter.h>

#include <BS_thread_pool.hpp>
#include <BS_thread_pool_utils.hpp>

#include <numeric>

namespace cudf::io::json::detail {

namespace {

namespace pools {

BS::thread_pool& tpool()
{
static BS::thread_pool _tpool(std::thread::hardware_concurrency());
return _tpool;
}

} // namespace pools

class compressed_host_buffer_source final : public datasource {
public:
explicit compressed_host_buffer_source(std::unique_ptr<datasource> const& src,
Expand All @@ -51,8 +65,8 @@ class compressed_host_buffer_source final : public datasource {
{
auto ch_buffer = host_span<uint8_t const>(reinterpret_cast<uint8_t const*>(_dbuf_ptr->data()),
_dbuf_ptr->size());
if (comptype == compression_type::GZIP || comptype == compression_type::ZIP ||
comptype == compression_type::SNAPPY) {
if (_comptype == compression_type::GZIP || _comptype == compression_type::ZIP ||
_comptype == compression_type::SNAPPY) {
_decompressed_ch_buffer_size = cudf::io::detail::get_uncompressed_size(_comptype, ch_buffer);
} else {
_decompressed_buffer = cudf::io::detail::decompress(_comptype, ch_buffer);
Expand Down Expand Up @@ -96,7 +110,22 @@ class compressed_host_buffer_source final : public datasource {
return std::make_unique<non_owning_buffer>(_decompressed_buffer.data() + offset, count);
}

[[nodiscard]] bool supports_device_read() const override { return false; }
std::future<size_t> device_read_async(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream) override
{
auto& thread_pool = pools::tpool();
return thread_pool.submit_task([this, offset, size, dst, stream] {
auto hbuf = host_read(offset, size);
CUDF_CUDA_TRY(
cudaMemcpyAsync(dst, hbuf->data(), hbuf->size(), cudaMemcpyHostToDevice, stream.value()));
stream.synchronize();
return hbuf->size();
});
}

[[nodiscard]] bool supports_device_read() const override { return true; }

[[nodiscard]] size_t size() const override { return _decompressed_ch_buffer_size; }

Expand Down Expand Up @@ -431,6 +460,8 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
// line of file i+1 don't end up on the same JSON line, if file i does not already end with a line
// delimiter.
auto constexpr num_delimiter_chars = 1;
std::vector<std::future<size_t>> thread_tasks;
auto stream_pool = cudf::detail::fork_streams(stream, pools::tpool().get_thread_count());

auto delimiter_map = cudf::detail::make_empty_host_vector<std::size_t>(sources.size(), stream);
std::vector<std::size_t> prefsum_source_sizes(sources.size());
Expand All @@ -447,13 +478,17 @@ device_span<char> ingest_raw_input(device_span<char> buffer,

auto const total_bytes_to_read = std::min(range_size, prefsum_source_sizes.back() - range_offset);
range_offset -= start_source ? prefsum_source_sizes[start_source - 1] : 0;
for (std::size_t i = start_source; i < sources.size() && bytes_read < total_bytes_to_read; i++) {
for (std::size_t i = start_source, cur_stream = 0;
i < sources.size() && bytes_read < total_bytes_to_read;
i++) {
if (sources[i]->is_empty()) continue;
auto data_size = std::min(sources[i]->size() - range_offset, total_bytes_to_read - bytes_read);
auto destination = reinterpret_cast<uint8_t*>(buffer.data()) + bytes_read +
(num_delimiter_chars * delimiter_map.size());
if (sources[i]->is_device_read_preferred(data_size)) {
bytes_read += sources[i]->device_read(range_offset, data_size, destination, stream);
if (sources[i]->supports_device_read()) {
thread_tasks.emplace_back(sources[i]->device_read_async(
range_offset, data_size, destination, stream_pool[cur_stream++ % stream_pool.size()]));
bytes_read += data_size;
} else {
h_buffers.emplace_back(sources[i]->host_read(range_offset, data_size));
auto const& h_buffer = h_buffers.back();
Expand Down Expand Up @@ -481,6 +516,15 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
buffer.data());
}
stream.synchronize();

if (thread_tasks.size()) {
auto const bytes_read = std::accumulate(
thread_tasks.begin(), thread_tasks.end(), std::size_t{0}, [](std::size_t sum, auto& task) {
return sum + task.get();
});
CUDF_EXPECTS(bytes_read == total_bytes_to_read, "something's fishy");
}

return buffer.first(bytes_read + (delimiter_map.size() * num_delimiter_chars));
}

Expand All @@ -505,10 +549,17 @@ table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
return read_json_impl(sources, reader_opts, stream, mr);

std::vector<std::unique_ptr<datasource>> compressed_sources;
for (size_t i = 0; i < sources.size(); i++) {
compressed_sources.emplace_back(
std::make_unique<compressed_host_buffer_source>(sources[i], reader_opts.get_compression()));
std::vector<std::future<std::unique_ptr<compressed_host_buffer_source>>> thread_tasks;
auto& thread_pool = pools::tpool();
for (auto& src : sources) {
thread_tasks.emplace_back(thread_pool.submit_task([&reader_opts, &src] {
return std::make_unique<compressed_host_buffer_source>(src, reader_opts.get_compression());
}));
}
std::transform(thread_tasks.begin(),
thread_tasks.end(),
std::back_inserter(compressed_sources),
[](auto& task) { return task.get(); });
// in read_json_impl, we need the compressed source size to actually be the
// uncompressed source size for correct batching
return read_json_impl(compressed_sources, reader_opts, stream, mr);
Expand Down

0 comments on commit 2310159

Please sign in to comment.