From f3081229379a7d92d7193a37a71bc43ad7a3d0fa Mon Sep 17 00:00:00 2001
From: Jason Lowe
Date: Tue, 7 Jan 2025 10:20:27 -0600
Subject: [PATCH] Java Parquet reads via multiple host buffers (#17673)

Adds a custom cuio datasource that can provide file data via multiple
host memory buffers. This allows data that arrives from multiple threads
in multiple buffers to be read directly rather than requiring the buffers
to be concatenated into a single host memory buffer before reading.

Authors:
  - Jason Lowe (https://github.com/jlowe)

Approvers:
  - Alessandro Bellina (https://github.com/abellina)
  - Robert (Bobby) Evans (https://github.com/revans2)

URL: https://github.com/rapidsai/cudf/pull/17673
---
 .../ai/rapids/cudf/ParquetChunkedReader.java  |  59 +++++--
 java/src/main/java/ai/rapids/cudf/Table.java  |  44 +++++-
 java/src/main/native/CMakeLists.txt           |   5 +-
 .../include/multi_host_buffer_source.hpp      |  57 +++++++
 java/src/main/native/src/ChunkedReaderJni.cpp |  58 ++++---
 java/src/main/native/src/TableJni.cpp         |  26 +--
 .../native/src/multi_host_buffer_source.cpp   | 148 ++++++++++++++++++
 .../test/java/ai/rapids/cudf/TableTest.java   |  41 ++++-
 8 files changed, 390 insertions(+), 48 deletions(-)
 create mode 100644 java/src/main/native/include/multi_host_buffer_source.hpp
 create mode 100644 java/src/main/native/src/multi_host_buffer_source.cpp
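A minimal usage sketch of the multi-buffer read path added below (not part of the diff; buffer names, sizes, and fill logic are illustrative):

    ParquetOptions opts = ParquetOptions.DEFAULT;
    // The file's bytes may arrive in several host buffers; they are treated as
    // one logically concatenated file, with no concatenation copy required.
    try (HostMemoryBuffer buf1 = HostMemoryBuffer.allocate(firstPartSize);
         HostMemoryBuffer buf2 = HostMemoryBuffer.allocate(secondPartSize)) {
      // ... fill buf1 with the leading bytes of the file and buf2 with the rest ...
      try (Table table = Table.readParquet(opts, buf1, buf2)) {
        // table now holds the decoded columns on the GPU
      }
    }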
diff --git a/java/src/main/java/ai/rapids/cudf/ParquetChunkedReader.java b/java/src/main/java/ai/rapids/cudf/ParquetChunkedReader.java
index 53af52eff07..5e544e92a77 100644
--- a/java/src/main/java/ai/rapids/cudf/ParquetChunkedReader.java
+++ b/java/src/main/java/ai/rapids/cudf/ParquetChunkedReader.java
@@ -1,6 +1,6 @@
 /*
  *
- * Copyright (c) 2022-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2022-2025, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -62,12 +62,13 @@ public ParquetChunkedReader(long chunkSizeByteLimit, ParquetOptions opts, File f
    * @param filePath Full path of the input Parquet file to read.
    */
   public ParquetChunkedReader(long chunkSizeByteLimit, long passReadLimit, ParquetOptions opts, File filePath) {
-    handle = create(chunkSizeByteLimit, passReadLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(),
-        filePath.getAbsolutePath(), 0, 0, opts.timeUnit().typeId.getNativeId());
-
+    long[] handles = create(chunkSizeByteLimit, passReadLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(),
+        filePath.getAbsolutePath(), null, opts.timeUnit().typeId.getNativeId());
+    handle = handles[0];
     if (handle == 0) {
       throw new IllegalStateException("Cannot create native chunked Parquet reader object.");
     }
+    multiHostBufferSourceHandle = handles[1];
   }
 
   /**
@@ -100,12 +101,41 @@ public ParquetChunkedReader(long chunkSizeByteLimit, ParquetOptions opts, HostMe
   public ParquetChunkedReader(long chunkSizeByteLimit, long passReadLimit, ParquetOptions opts,
                               HostMemoryBuffer buffer, long offset, long len) {
-    handle = create(chunkSizeByteLimit,passReadLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(), null,
-        buffer.getAddress() + offset, len, opts.timeUnit().typeId.getNativeId());
+    long[] addrsSizes = new long[]{ buffer.getAddress() + offset, len };
+    long[] handles = create(chunkSizeByteLimit,passReadLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(), null,
+        addrsSizes, opts.timeUnit().typeId.getNativeId());
+    handle = handles[0];
+    if (handle == 0) {
+      throw new IllegalStateException("Cannot create native chunked Parquet reader object.");
+    }
+    multiHostBufferSourceHandle = handles[1];
+  }
 
+  /**
+   * Construct the reader instance from a read limit and data in host memory buffers.
+   *
+   * @param chunkSizeByteLimit Limit on total number of bytes to be returned per read,
+   *                           or 0 if there is no limit.
+   * @param passReadLimit Limit on the amount of memory used for reading and decompressing data or
+   *                      0 if there is no limit
+   * @param opts The options for Parquet reading.
+   * @param buffers Array of buffers containing the file data. The buffers are logically
+   *                concatenated to construct the file being read.
+   */
+  public ParquetChunkedReader(long chunkSizeByteLimit, long passReadLimit,
+                              ParquetOptions opts, HostMemoryBuffer... buffers) {
+    long[] addrsSizes = new long[buffers.length * 2];
+    for (int i = 0; i < buffers.length; i++) {
+      addrsSizes[i * 2] = buffers[i].getAddress();
+      addrsSizes[(i * 2) + 1] = buffers[i].getLength();
+    }
+    long[] handles = create(chunkSizeByteLimit,passReadLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(), null,
+        addrsSizes, opts.timeUnit().typeId.getNativeId());
+    handle = handles[0];
     if (handle == 0) {
       throw new IllegalStateException("Cannot create native chunked Parquet reader object.");
     }
+    multiHostBufferSourceHandle = handles[1];
   }
 
   /**
@@ -181,6 +211,10 @@ public void close() {
       DataSourceHelper.destroyWrapperDataSource(dataSourceHandle);
       dataSourceHandle = 0;
     }
+    if (multiHostBufferSourceHandle != 0) {
+      destroyMultiHostBufferSource(multiHostBufferSourceHandle);
+      multiHostBufferSourceHandle = 0;
+    }
   }
 
 
@@ -196,6 +230,8 @@ public void close() {
 
   private long dataSourceHandle = 0;
 
+  private long multiHostBufferSourceHandle = 0;
+
   /**
    * Create a native chunked Parquet reader object on heap and return its memory address.
    *
    * @param chunkSizeByteLimit Limit on total number of bytes to be returned per read,
    *                           or 0 if there is no limit.
    * @param passReadLimit Limit on the amount of memory used for reading and decompressing data or
    *                      0 if there is no limit
    * @param filterColumnNames Name of the columns to read, or an empty array if we want to read all.
    * @param binaryToString Whether to convert the corresponding column to String if it is binary.
    * @param filePath Full path of the file to read, or given as null if reading from a buffer.
-   * @param bufferAddrs The address of a buffer to read from, or 0 if we are not using that buffer.
-   * @param length The length of the buffer to read from.
+   * @param bufferAddrsSizes The address and size pairs of buffers to read from, or null if we are not using buffers.
    * @param timeUnit Return type of time unit for timestamps.
    */
-  private static native long create(long chunkSizeByteLimit, long passReadLimit,
-                                    String[] filterColumnNames, boolean[] binaryToString,
-                                    String filePath, long bufferAddrs, long length, int timeUnit);
+  private static native long[] create(long chunkSizeByteLimit, long passReadLimit,
+                                      String[] filterColumnNames, boolean[] binaryToString,
+                                      String filePath, long[] bufferAddrsSizes, int timeUnit);
 
   private static native long createWithDataSource(long chunkedSizeByteLimit,
       String[] filterColumnNames, boolean[] binaryToString, int timeUnit, long dataSourceHandle);
@@ -222,4 +257,6 @@ private static native long createWithDataSource(long chunkedSizeByteLimit,
   private static native long[] readChunk(long handle);
 
   private static native void close(long handle);
+
+  private static native void destroyMultiHostBufferSource(long handle);
 }
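The chunked reader gains the same varargs entry point; a minimal sketch of driving it, mirroring the test added at the end of this patch (the 240000-byte chunk limit is only an example):

    ParquetOptions opts = ParquetOptions.DEFAULT;
    try (ParquetChunkedReader reader = new ParquetChunkedReader(240000, 0, opts, buf1, buf2)) {
      while (reader.hasNext()) {
        try (Table chunk = reader.readChunk()) {
          // process each chunk; decoded chunk sizes respect the byte limit
        }
      }
    }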
diff --git a/java/src/main/java/ai/rapids/cudf/Table.java b/java/src/main/java/ai/rapids/cudf/Table.java
index b01ce31b1f3..298f2cff6f3 100644
--- a/java/src/main/java/ai/rapids/cudf/Table.java
+++ b/java/src/main/java/ai/rapids/cudf/Table.java
@@ -1,6 +1,6 @@
 /*
  *
- * Copyright (c) 2019-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2025, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -313,12 +313,11 @@ private static native long readAndInferJSON(long address, long length,
    *                          all of them
    * @param binaryToString whether to convert this column to String if binary
    * @param filePath the path of the file to read, or null if no path should be read.
-   * @param address the address of the buffer to read from or 0 if we should not.
-   * @param length the length of the buffer to read from.
+   * @param addrsAndSizes the address and size pairs for every buffer or null for no buffers.
    * @param timeUnit return type of TimeStamp in units
    */
   private static native long[] readParquet(String[] filterColumnNames, boolean[] binaryToString, String filePath,
-                                           long address, long length, int timeUnit) throws CudfException;
+                                           long[] addrsAndSizes, int timeUnit) throws CudfException;
 
   private static native long[] readParquetFromDataSource(String[] filterColumnNames,
                                                          boolean[] binaryToString, int timeUnit,
@@ -1357,7 +1356,7 @@ public static Table readParquet(File path) {
    */
   public static Table readParquet(ParquetOptions opts, File path) {
     return new Table(readParquet(opts.getIncludeColumnNames(), opts.getReadBinaryAsString(),
-        path.getAbsolutePath(), 0, 0, opts.timeUnit().typeId.getNativeId()));
+        path.getAbsolutePath(), null, opts.timeUnit().typeId.getNativeId()));
   }
 
   /**
@@ -1402,6 +1401,14 @@ public static Table readParquet(ParquetOptions opts, byte[] buffer, long offset,
     }
   }
 
+  /**
+   * Read parquet formatted data.
+   * @param opts various parquet parsing options.
+   * @param buffer raw parquet formatted bytes.
+   * @param offset the starting offset into buffer.
+   * @param len the number of bytes to parse.
+   * @return the data parsed as a table on the GPU.
+   */
   public static Table readParquet(ParquetOptions opts, byte[] buffer, long offset, long len) {
     return readParquet(opts, buffer, offset, len, DefaultHostMemoryAllocator.get());
   }
@@ -1422,10 +1429,35 @@ public static Table readParquet(ParquetOptions opts, HostMemoryBuffer buffer,
     assert len > 0;
     assert len <= buffer.getLength() - offset;
     assert offset >= 0 && offset < buffer.length;
+    long[] addrsSizes = new long[]{ buffer.getAddress() + offset, len };
+    return new Table(readParquet(opts.getIncludeColumnNames(), opts.getReadBinaryAsString(),
+        null, addrsSizes, opts.timeUnit().typeId.getNativeId()));
+  }
+
+  /**
+   * Read parquet formatted data.
+   * @param opts various parquet parsing options.
+   * @param buffers Buffers containing the Parquet data. The buffers are logically concatenated
+   *                in order to construct the file being read.
+   * @return the data parsed as a table on the GPU.
+   */
+  public static Table readParquet(ParquetOptions opts, HostMemoryBuffer... buffers) {
+    assert buffers.length > 0;
+    long[] addrsSizes = new long[buffers.length * 2];
+    for (int i = 0; i < buffers.length; i++) {
+      addrsSizes[i * 2] = buffers[i].getAddress();
+      addrsSizes[(i * 2) + 1] = buffers[i].getLength();
+    }
     return new Table(readParquet(opts.getIncludeColumnNames(), opts.getReadBinaryAsString(),
-        null, buffer.getAddress() + offset, len, opts.timeUnit().typeId.getNativeId()));
+        null, addrsSizes, opts.timeUnit().typeId.getNativeId()));
   }
 
+  /**
+   * Read parquet formatted data.
+   * @param opts various parquet parsing options.
+   * @param ds custom datasource to provide the Parquet file data
+   * @return the data parsed as a table on the GPU.
+   */
   public static Table readParquet(ParquetOptions opts, DataSource ds) {
     long dataSourceHandle = DataSourceHelper.createWrapperDataSource(ds);
     try {
diff --git a/java/src/main/native/CMakeLists.txt b/java/src/main/native/CMakeLists.txt
index 9ff43feeac6..bd1714aa476 100644
--- a/java/src/main/native/CMakeLists.txt
+++ b/java/src/main/native/CMakeLists.txt
@@ -1,5 +1,5 @@
 # =============================================================================
-# Copyright (c) 2019-2024, NVIDIA CORPORATION.
+# Copyright (c) 2019-2025, NVIDIA CORPORATION.
 #
 # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 # in compliance with the License. You may obtain a copy of the License at
@@ -156,8 +156,9 @@ add_library(
   src/ScalarJni.cpp
   src/TableJni.cpp
   src/aggregation128_utils.cu
-  src/maps_column_view.cu
   src/check_nvcomp_output_sizes.cu
+  src/maps_column_view.cu
+  src/multi_host_buffer_source.cpp
 )
 
 # Disable NVTX if necessary
diff --git a/java/src/main/native/include/multi_host_buffer_source.hpp b/java/src/main/native/include/multi_host_buffer_source.hpp
new file mode 100644
index 00000000000..2aedb2321e4
--- /dev/null
+++ b/java/src/main/native/include/multi_host_buffer_source.hpp
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2025, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include "jni_utils.hpp"
+
+#include <cudf/io/datasource.hpp>
+
+#include <vector>
+
+namespace cudf {
+namespace jni {
+
+/**
+ * @brief A custom datasource providing data from an array of host memory buffers.
+ */
+class multi_host_buffer_source : public cudf::io::datasource {
+  std::vector<uint8_t const*> addrs_;
+  std::vector<size_t> offsets_;
+
+  size_t locate_offset_index(size_t offset);
+
+ public:
+  explicit multi_host_buffer_source(native_jlongArray const& addrs_sizes);
+  std::unique_ptr<buffer> host_read(size_t offset, size_t size) override;
+  size_t host_read(size_t offset, size_t size, uint8_t* dst) override;
+  bool supports_device_read() const override { return true; }
+  bool is_device_read_preferred(size_t size) const override { return true; }
+  std::unique_ptr<buffer> device_read(size_t offset,
+                                      size_t size,
+                                      rmm::cuda_stream_view stream) override;
+  size_t device_read(size_t offset,
+                     size_t size,
+                     uint8_t* dst,
+                     rmm::cuda_stream_view stream) override;
+  std::future<size_t> device_read_async(size_t offset,
+                                        size_t size,
+                                        uint8_t* dst,
+                                        rmm::cuda_stream_view stream) override;
+  size_t size() const override { return offsets_.back(); }
+};
+
+}  // namespace jni
+}  // namespace cudf
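The offsets_ member declared above is a prefix sum of the buffer sizes with the total appended, so locate_offset_index() amounts to an upper-bound binary search. A standalone Java sketch of that lookup, with example values that are not taken from the patch:

    // offsets = {0, 10, 30} describes two buffers of 10 and 20 bytes (total 30).
    static int locateOffsetIndex(long[] offsets, long offset) {
      int lo = 0, hi = offsets.length;            // find first element > offset
      while (lo < hi) {
        int mid = (lo + hi) >>> 1;
        if (offsets[mid] <= offset) lo = mid + 1; else hi = mid;
      }
      return lo - 1;                              // e.g. offset 15 -> buffer 1
    }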
diff --git a/java/src/main/native/src/ChunkedReaderJni.cpp b/java/src/main/native/src/ChunkedReaderJni.cpp
index cf04a87262f..4967e0b2b04 100644
--- a/java/src/main/native/src/ChunkedReaderJni.cpp
+++ b/java/src/main/native/src/ChunkedReaderJni.cpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2022-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2022-2025, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
 
 #include "cudf_jni_apis.hpp"
 #include "jni_utils.hpp"
+#include "multi_host_buffer_source.hpp"
 
 #include
 #include
@@ -36,7 +37,7 @@ extern "C" {
 
 // This function should take all the parameters that `Table.readParquet` takes,
 // plus one more parameter `long chunkSizeByteLimit`.
-JNIEXPORT jlong JNICALL
+JNIEXPORT jlongArray JNICALL
 Java_ai_rapids_cudf_ParquetChunkedReader_create(JNIEnv* env,
                                                 jclass,
                                                 jlong chunk_read_limit,
@@ -44,27 +45,26 @@ Java_ai_rapids_cudf_ParquetChunkedReader_create(JNIEnv* env,
                                                 jobjectArray filter_col_names,
                                                 jbooleanArray j_col_binary_read,
                                                 jstring inp_file_path,
-                                                jlong buffer,
-                                                jlong buffer_length,
+                                                jlongArray addrs_sizes,
                                                 jint unit)
 {
-  JNI_NULL_CHECK(env, j_col_binary_read, "Null col_binary_read", 0);
+  JNI_NULL_CHECK(env, j_col_binary_read, "Null col_binary_read", nullptr);
   bool read_buffer = true;
-  if (buffer == 0) {
-    JNI_NULL_CHECK(env, inp_file_path, "Input file or buffer must be supplied", 0);
+  if (addrs_sizes == nullptr) {
+    JNI_NULL_CHECK(env, inp_file_path, "Input file or buffer must be supplied", nullptr);
     read_buffer = false;
   } else if (inp_file_path != nullptr) {
-    JNI_THROW_NEW(
-      env, cudf::jni::ILLEGAL_ARG_CLASS, "Cannot pass in both a buffer and an inp_file_path", 0);
-  } else if (buffer_length <= 0) {
-    JNI_THROW_NEW(env, cudf::jni::ILLEGAL_ARG_CLASS, "An empty buffer is not supported", 0);
+    JNI_THROW_NEW(env,
+                  cudf::jni::ILLEGAL_ARG_CLASS,
+                  "Cannot pass in both buffers and an inp_file_path",
+                  nullptr);
   }
 
   try {
     cudf::jni::auto_set_device(env);
     cudf::jni::native_jstring filename(env, inp_file_path);
     if (!read_buffer && filename.is_empty()) {
-      JNI_THROW_NEW(env, cudf::jni::ILLEGAL_ARG_CLASS, "inp_file_path cannot be empty", 0);
+      JNI_THROW_NEW(env, cudf::jni::ILLEGAL_ARG_CLASS, "inp_file_path cannot be empty", nullptr);
     }
 
     cudf::jni::native_jstringArray n_filter_col_names(env, filter_col_names);
@@ -75,9 +75,15 @@ Java_ai_rapids_cudf_ParquetChunkedReader_create(JNIEnv* env,
     cudf::jni::native_jbooleanArray n_col_binary_read(env, j_col_binary_read);
     (void)n_col_binary_read;
 
-    auto const source = read_buffer ?
-      cudf::io::source_info(reinterpret_cast<char*>(buffer),
-                            static_cast<std::size_t>(buffer_length))
-      : cudf::io::source_info(filename.get());
+    cudf::jni::native_jlongArray n_addrs_sizes(env, addrs_sizes);
+    std::unique_ptr<cudf::jni::multi_host_buffer_source> multi_buffer_source;
+    cudf::io::source_info source;
+    if (read_buffer) {
+      multi_buffer_source.reset(new cudf::jni::multi_host_buffer_source(n_addrs_sizes));
+      source = cudf::io::source_info(multi_buffer_source.get());
+    } else {
+      source = cudf::io::source_info(filename.get());
+    }
 
     auto opts_builder = cudf::io::parquet_reader_options::builder(source);
     if (n_filter_col_names.size() > 0) {
@@ -86,13 +92,18 @@ Java_ai_rapids_cudf_ParquetChunkedReader_create(JNIEnv* env,
     auto const read_opts = opts_builder.convert_strings_to_categories(false)
                              .timestamp_type(cudf::data_type(static_cast<cudf::type_id>(unit)))
                              .build();
-
-    return reinterpret_cast<jlong>(
+    n_addrs_sizes.cancel();
+    n_col_binary_read.cancel();
+    auto reader_handle = reinterpret_cast<jlong>(
       new cudf::io::chunked_parquet_reader(static_cast<std::size_t>(chunk_read_limit),
                                            static_cast<std::size_t>(pass_read_limit),
                                            read_opts));
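+    // The chunked reader does not take ownership of the multi-buffer source, so
+    // its handle is returned alongside the reader handle; Java stores it in
+    // multiHostBufferSourceHandle and frees it via destroyMultiHostBufferSource().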
+    cudf::jni::native_jlongArray result(env, 2);
+    result[0] = reader_handle;
+    result[1] = cudf::jni::release_as_jlong(multi_buffer_source);
+    return result.get_jArray();
   }
-  CATCH_STD(env, 0);
+  CATCH_STD(env, nullptr);
 }
 
 JNIEXPORT jlong JNICALL
@@ -177,6 +188,17 @@ JNIEXPORT void JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_close(JNIEnv* en
   CATCH_STD(env, );
 }
 
+JNIEXPORT void JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_destroyMultiHostBufferSource(
+  JNIEnv* env, jclass, jlong handle)
+{
+  JNI_NULL_CHECK(env, handle, "handle is null", );
+
+  try {
+    delete reinterpret_cast<cudf::jni::multi_host_buffer_source*>(handle);
+  }
+  CATCH_STD(env, );
+}
+
 //
 // Chunked ORC reader JNI
 //
diff --git a/java/src/main/native/src/TableJni.cpp b/java/src/main/native/src/TableJni.cpp
index ed35f35794d..a6c7ae9ba18 100644
--- a/java/src/main/native/src/TableJni.cpp
+++ b/java/src/main/native/src/TableJni.cpp
@@ -19,6 +19,7 @@
 #include "jni_compiled_expr.hpp"
 #include "jni_utils.hpp"
 #include "jni_writer_data_sink.hpp"
+#include "multi_host_buffer_source.hpp"
 
 #include
 #include
@@ -2071,20 +2072,17 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_readParquet(JNIEnv* env,
                                                                    jobjectArray filter_col_names,
                                                                    jbooleanArray j_col_binary_read,
                                                                    jstring inputfilepath,
-                                                                   jlong buffer,
-                                                                   jlong buffer_length,
+                                                                   jlongArray addrs_and_sizes,
                                                                    jint unit)
 {
   JNI_NULL_CHECK(env, j_col_binary_read, "null col_binary_read", 0);
   bool read_buffer = true;
-  if (buffer == 0) {
+  if (addrs_and_sizes == nullptr) {
     JNI_NULL_CHECK(env, inputfilepath, "input file or buffer must be supplied", NULL);
     read_buffer = false;
   } else if (inputfilepath != NULL) {
     JNI_THROW_NEW(
       env, cudf::jni::ILLEGAL_ARG_CLASS, "cannot pass in both a buffer and an inputfilepath", NULL);
-  } else if (buffer_length <= 0) {
-    JNI_THROW_NEW(env, cudf::jni::ILLEGAL_ARG_CLASS, "An empty buffer is not supported", NULL);
   }
 
   try {
@@ -2096,10 +2094,15 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_readParquet(JNIEnv* env,
     cudf::jni::native_jstringArray n_filter_col_names(env, filter_col_names);
 
     cudf::jni::native_jbooleanArray n_col_binary_read(env, j_col_binary_read);
-
-    auto source = read_buffer ?
-      cudf::io::source_info(reinterpret_cast<char*>(buffer),
-                            static_cast<std::size_t>(buffer_length))
-      : cudf::io::source_info(filename.get());
+    cudf::jni::native_jlongArray n_addrs_sizes(env, addrs_and_sizes);
+    std::unique_ptr<cudf::jni::multi_host_buffer_source> multi_buffer_source;
+    cudf::io::source_info source;
+    if (read_buffer) {
+      multi_buffer_source.reset(new cudf::jni::multi_host_buffer_source(n_addrs_sizes));
+      source = cudf::io::source_info(multi_buffer_source.get());
+    } else {
+      source = cudf::io::source_info(filename.get());
+    }
 
     auto builder = cudf::io::parquet_reader_options::builder(source);
     if (n_filter_col_names.size() > 0) {
@@ -2110,7 +2113,10 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_readParquet(JNIEnv* env,
       builder.convert_strings_to_categories(false)
         .timestamp_type(cudf::data_type(static_cast<cudf::type_id>(unit)))
        .build();
-    return convert_table_for_return(env, cudf::io::read_parquet(opts).tbl);
+    auto tbl = cudf::io::read_parquet(opts).tbl;
+    n_col_binary_read.cancel();
+    n_addrs_sizes.cancel();
+    return convert_table_for_return(env, tbl);
   }
   CATCH_STD(env, NULL);
 }
diff --git a/java/src/main/native/src/multi_host_buffer_source.cpp b/java/src/main/native/src/multi_host_buffer_source.cpp
new file mode 100644
index 00000000000..c577fc680ba
--- /dev/null
+++ b/java/src/main/native/src/multi_host_buffer_source.cpp
@@ -0,0 +1,148 @@
+/*
+ * Copyright (c) 2025, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "multi_host_buffer_source.hpp"
+
+#include <algorithm>
+#include <cstring>
+#include <future>
+#include <sstream>
+
+namespace cudf {
+namespace jni {
+
+multi_host_buffer_source::multi_host_buffer_source(native_jlongArray const& addrs_sizes)
+{
+  if (addrs_sizes.size() % 2 != 0) {
+    throw std::logic_error("addrs_sizes length not a multiple of 2");
+  }
+  auto count = addrs_sizes.size() / 2;
+  addrs_.reserve(count);
+  offsets_.reserve(count + 1);
+  size_t total_size = 0;
+  for (int i = 0; i < addrs_sizes.size(); i += 2) {
+    addrs_.push_back(reinterpret_cast<uint8_t const*>(addrs_sizes[i]));
+    offsets_.push_back(total_size);
+    total_size += addrs_sizes[i + 1];
+  }
+  offsets_.push_back(total_size);
+}
+
+size_t multi_host_buffer_source::locate_offset_index(size_t offset)
+{
+  if (offset < 0 || offset >= offsets_.back()) { throw std::runtime_error("bad offset"); }
+  auto start = offsets_.begin();
+  auto it = std::upper_bound(start, offsets_.end(), offset);
+  return (it - start) - 1;
+}
+
+std::unique_ptr<cudf::io::datasource::buffer> multi_host_buffer_source::host_read(size_t offset,
+                                                                                  size_t size)
+{
+  if (size == 0) { return 0; }
+  if (offset < 0 || offset >= offsets_.back()) { throw std::runtime_error("bad offset"); }
+  auto const end_offset = offset + size;
+  if (end_offset > offsets_.back()) { throw std::runtime_error("read past end of file"); }
+  auto buffer_index = locate_offset_index(offset);
+  auto next_offset = offsets_[buffer_index + 1];
+  if (end_offset <= next_offset) {
+    // read range hits only a single buffer, so return a zero-copy view of the data
+    auto src = addrs_[buffer_index] + offset - offsets_[buffer_index];
+    return std::make_unique<non_owning_buffer>(src, size);
+  }
+  auto buf = std::vector<uint8_t>(size);
+  auto bytes_read = host_read(offset, size, buf.data());
+  if (bytes_read != size) {
+    std::stringstream ss;
+    ss << "Expected host read of " << size << " found " << bytes_read;
+    throw std::logic_error(ss.str());
+  }
+  return std::make_unique<owning_buffer<std::vector<uint8_t>>>(std::move(buf));
+}
+
+size_t multi_host_buffer_source::host_read(size_t offset, size_t size, uint8_t* dst)
+{
+  if (size == 0) { return 0; }
+  if (offset < 0 || offset >= offsets_.back()) { throw std::runtime_error("bad offset"); }
+  if (offset + size > offsets_.back()) { throw std::runtime_error("read past end of file"); }
+  auto buffer_index = locate_offset_index(offset);
+  auto bytes_left = size;
+  while (bytes_left > 0) {
+    auto next_offset = offsets_[buffer_index + 1];
+    auto buffer_left = next_offset - offset;
+    auto buffer_offset = offset - offsets_[buffer_index];
+    auto src = addrs_[buffer_index] + buffer_offset;
+    auto copy_size = std::min(buffer_left, bytes_left);
+    std::memcpy(dst, src, copy_size);
+    offset += copy_size;
+    dst += copy_size;
+    bytes_left -= copy_size;
+    ++buffer_index;
+  }
+  return size;
+}
+
+std::unique_ptr<cudf::io::datasource::buffer> multi_host_buffer_source::device_read(
+  size_t offset, size_t size, rmm::cuda_stream_view stream)
+{
+  rmm::device_buffer buf(size, stream);
+  auto dst = static_cast<uint8_t*>(buf.data());
+  auto bytes_read = device_read(offset, size, dst, stream);
+  if (bytes_read != size) {
+    std::stringstream ss;
+    ss << "Expected device read of " << size << " found " << bytes_read;
+    throw std::logic_error(ss.str());
+  }
+  return std::make_unique<owning_buffer<rmm::device_buffer>>(std::move(buf));
+}
+
+size_t multi_host_buffer_source::device_read(size_t offset,
+                                             size_t size,
+                                             uint8_t* dst,
+                                             rmm::cuda_stream_view stream)
+{
+  if (size == 0) { return 0; }
+  if (offset < 0 || offset >= offsets_.back()) { throw std::runtime_error("bad offset"); }
+  if (offset + size > offsets_.back()) { throw std::runtime_error("read past end of file"); }
+
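+  // Walk the buffers overlapping [offset, offset + size), async-copying each
+  // buffer's portion to the device destination before moving to the next.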
+  auto buffer_index = locate_offset_index(offset);
+  auto bytes_left = size;
+  while (bytes_left > 0) {
+    auto next_offset = offsets_[buffer_index + 1];
+    auto buffer_left = next_offset - offset;
+    auto buffer_offset = offset - offsets_[buffer_index];
+    auto src = addrs_[buffer_index] + buffer_offset;
+    auto copy_size = std::min(buffer_left, bytes_left);
+    CUDF_CUDA_TRY(cudaMemcpyAsync(dst, src, copy_size, cudaMemcpyHostToDevice, stream.value()));
+    offset += copy_size;
+    dst += copy_size;
+    bytes_left -= copy_size;
+    ++buffer_index;
+  }
+  return size;
+}
+
+std::future<size_t> multi_host_buffer_source::device_read_async(size_t offset,
+                                                                size_t size,
+                                                                uint8_t* dst,
+                                                                rmm::cuda_stream_view stream)
+{
+  std::promise<size_t> p;
+  p.set_value(device_read(offset, size, dst, stream));
+  return p.get_future();
+}
+
+}  // namespace jni
+}  // namespace cudf
diff --git a/java/src/test/java/ai/rapids/cudf/TableTest.java b/java/src/test/java/ai/rapids/cudf/TableTest.java
index c7fcb1756b6..7eb32892bad 100644
--- a/java/src/test/java/ai/rapids/cudf/TableTest.java
+++ b/java/src/test/java/ai/rapids/cudf/TableTest.java
@@ -1,6 +1,6 @@
 /*
  *
- * Copyright (c) 2019-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2025, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -47,8 +47,11 @@
 import java.math.BigInteger;
 import java.math.RoundingMode;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SeekableByteChannel;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
 import java.util.*;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -1714,6 +1717,42 @@ void testChunkedReadParquet() {
     }
   }
 
+  @Test
+  void testChunkedReadParquetHostBuffers() throws Exception {
+    long size = TEST_PARQUET_FILE_CHUNKED_READ.length();
+    java.nio.file.Path path = TEST_PARQUET_FILE_CHUNKED_READ.toPath();
+    try (HostMemoryBuffer buf1 = HostMemoryBuffer.allocate(size / 2);
+         HostMemoryBuffer buf2 = HostMemoryBuffer.allocate(size - buf1.getLength())) {
+      try (SeekableByteChannel channel = Files.newByteChannel(path, StandardOpenOption.READ)) {
+        ByteBuffer bb1 = buf1.asByteBuffer();
+        while (bb1.hasRemaining()) {
+          if (channel.read(bb1) == -1) {
+            throw new EOFException("error reading first buffer");
+          }
+        }
+        ByteBuffer bb2 = buf2.asByteBuffer();
+        while (bb2.hasRemaining()) {
+          if (channel.read(bb2) == -1) {
+            throw new EOFException("error reading second buffer");
+          }
+        }
+      }
+      ParquetOptions opts = ParquetOptions.DEFAULT;
+      try (ParquetChunkedReader reader = new ParquetChunkedReader(240000, 0, opts, buf1, buf2)) {
+        int numChunks = 0;
+        long totalRows = 0;
+        while(reader.hasNext()) {
+          ++numChunks;
+          try(Table chunk = reader.readChunk()) {
+            totalRows += chunk.getRowCount();
+          }
+        }
+        assertEquals(2, numChunks);
+        assertEquals(40000, totalRows);
+      }
+    }
+  }
+
   @Test
   void testChunkedReadParquetFromDataSource() throws IOException {
     try (MultiBufferDataSource source = sourceFrom(TEST_PARQUET_FILE_CHUNKED_READ);