From 56883650b7783674d4919a785dc1e022274ec2d7 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Thu, 6 Feb 2025 21:45:23 +0000 Subject: [PATCH 01/14] bug fix --- cpp/src/io/json/read_json.cu | 96 +++++++++++++++++++++++++++--------- 1 file changed, 72 insertions(+), 24 deletions(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index 4b0af7d6e81..93bb43f5f05 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -220,10 +220,11 @@ size_type find_first_delimiter(device_span d_data, * @param stream CUDA stream used for device memory operations and kernel launches * @returns Data source owning buffer enclosing the bytes read */ -datasource::owning_buffer get_record_range_raw_input( - host_span> sources, - json_reader_options const& reader_opts, - rmm::cuda_stream_view stream) +std::pair, + std::optional>> +get_record_range_raw_input(host_span> sources, + json_reader_options const& reader_opts, + rmm::cuda_stream_view stream) { CUDF_FUNC_RANGE(); @@ -260,7 +261,8 @@ datasource::owning_buffer get_record_range_raw_input( if (first_delim_pos == -1) { // return empty owning datasource buffer auto empty_buf = rmm::device_buffer(0, stream); - return datasource::owning_buffer(std::move(empty_buf)); + return std::make_pair(datasource::owning_buffer(std::move(empty_buf)), + std::nullopt); } else if (!should_load_till_last_source) { // Find next delimiter std::int64_t next_delim_pos = -1; @@ -302,10 +304,12 @@ datasource::owning_buffer get_record_range_raw_input( CUDF_EXPECTS(static_cast(next_delim_pos - first_delim_pos - shift_for_nonzero_offset) < batch_limit, "The size of the JSON buffer returned by every batch cannot exceed INT_MAX bytes"); - return datasource::owning_buffer( - std::move(buffer), - reinterpret_cast(buffer.data()) + first_delim_pos + shift_for_nonzero_offset, - next_delim_pos - first_delim_pos - shift_for_nonzero_offset); + return std::make_pair( + datasource::owning_buffer( + std::move(buffer), + reinterpret_cast(buffer.data()) + first_delim_pos + shift_for_nonzero_offset, + next_delim_pos - first_delim_pos - shift_for_nonzero_offset), + std::nullopt); } // Add delimiter to end of buffer - possibly adding an empty line to the input buffer - iff we are @@ -323,34 +327,56 @@ datasource::owning_buffer get_record_range_raw_input( num_chars++; } - return datasource::owning_buffer( - std::move(buffer), - reinterpret_cast(buffer.data()) + first_delim_pos + shift_for_nonzero_offset, - num_chars); + return std::make_pair( + datasource::owning_buffer( + std::move(buffer), + reinterpret_cast(buffer.data()) + first_delim_pos + shift_for_nonzero_offset, + num_chars), + std::nullopt); } // Helper function to read the current batch using byte range offsets and size // passed -table_with_metadata read_batch(host_span> sources, - json_reader_options const& reader_opts, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) +std::pair> read_batch( + host_span> sources, + json_reader_options const& reader_opts, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); + auto bufviews = get_record_range_raw_input(sources, reader_opts, stream); + /* datasource::owning_buffer bufview = get_record_range_raw_input(sources, reader_opts, stream); + */ // If input JSON buffer has single quotes and option to normalize single quotes is enabled, // invoke pre-processing FST if (reader_opts.is_enabled_normalize_single_quotes()) { normalize_single_quotes( - bufview, reader_opts.get_delimiter(), stream, 
cudf::get_current_device_resource_ref()); + bufviews.first, reader_opts.get_delimiter(), stream, cudf::get_current_device_resource_ref()); } - auto buffer = - cudf::device_span(reinterpret_cast(bufview.data()), bufview.size()); + auto buffer = cudf::device_span(reinterpret_cast(bufviews.first.data()), + bufviews.first.size()); stream.synchronize(); - return device_parse_nested_json(buffer, reader_opts, stream, mr); + auto first_partial_table = device_parse_nested_json(buffer, reader_opts, stream, mr); + + if (!bufviews.second.has_value()) + return std::make_pair(std::move(first_partial_table), std::nullopt); + + if (reader_opts.is_enabled_normalize_single_quotes()) { + normalize_single_quotes(bufviews.second.value(), + reader_opts.get_delimiter(), + stream, + cudf::get_current_device_resource_ref()); + } + buffer = cudf::device_span( + reinterpret_cast(bufviews.second.value().data()), bufviews.second.value().size()); + stream.synchronize(); + auto second_partial_table = device_parse_nested_json(buffer, reader_opts, stream, mr); + + return std::make_pair(std::move(first_partial_table), std::move(second_partial_table)); } table_with_metadata read_json_impl(host_span> sources, @@ -421,11 +447,17 @@ table_with_metadata read_json_impl(host_span> source * unnecessary concatenate. The size of batch_offsets is 1 if all sources are empty, * or if end_bytes_size is larger than total_source_size. */ - if (batch_offsets.size() <= 2) return read_batch(sources, reader_opts, stream, mr); + // if (batch_offsets.size() <= 2) return read_batch(sources, reader_opts, stream, mr); std::vector partial_tables; json_reader_options batched_reader_opts{reader_opts}; + auto partial_tables_ = + read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref()); + partial_tables.emplace_back(std::move(partial_tables_.first)); + if (partial_tables_.second.has_value()) + partial_tables.emplace_back(std::move(partial_tables_.second.value())); + // recursive lambda to construct schema_element. 
Here, we assume that the table from the // first batch contains all the columns in the concatenated table, and that the partial tables // from all following batches contain the same set of columns @@ -476,8 +508,15 @@ table_with_metadata read_json_impl(host_span> source }; batched_reader_opts.set_byte_range_offset(batch_offsets[0]); batched_reader_opts.set_byte_range_size(batch_offsets[1] - batch_offsets[0]); + partial_tables_ = + read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref()); + partial_tables.emplace_back(std::move(partial_tables_.first)); + if (partial_tables_.second.has_value()) + partial_tables.emplace_back(std::move(partial_tables_.second.value())); + /* partial_tables.emplace_back( read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref())); + */ auto& tbl = partial_tables.back().tbl; std::vector children; @@ -496,14 +535,23 @@ table_with_metadata read_json_impl(host_span> source batched_reader_opts.set_byte_range_offset(batch_offsets[batch_offset_pos]); batched_reader_opts.set_byte_range_size(batch_offsets[batch_offset_pos + 1] - batch_offsets[batch_offset_pos]); + /* auto partial_table = read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref()); - if (partial_table.tbl->num_columns() == 0 && partial_table.tbl->num_rows() == 0) { + */ + auto partial_tables_ = + read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref()); + + if (partial_tables_.first.tbl->num_columns() == 0 && + partial_tables_.first.tbl->num_rows() == 0) { CUDF_EXPECTS(batch_offset_pos == batch_offsets.size() - 2, "Only the partial table generated by the last batch can be empty"); break; } - partial_tables.emplace_back(std::move(partial_table)); + // partial_tables.emplace_back(std::move(partial_table)); + partial_tables.emplace_back(std::move(partial_tables_.first)); + if (partial_tables_.second.has_value()) + partial_tables.emplace_back(std::move(partial_tables_.second.value())); } auto expects_schema_equality = From e040d5bac43ed1724f1a4c02513d45a440bb499c Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Sat, 8 Feb 2025 23:34:09 +0000 Subject: [PATCH 02/14] pre formatting --- cpp/src/io/json/read_json.cu | 179 +++++++++++++++----------- cpp/tests/io/json/json_test.cpp | 3 +- cpp/tests/large_strings/json_tests.cu | 69 ++++++++++ 3 files changed, 178 insertions(+), 73 deletions(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index 93bb43f5f05..8427e22173e 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -233,13 +233,8 @@ get_record_range_raw_input(host_span> sources, auto const delimiter = reader_opts.get_delimiter(); auto const num_extra_delimiters = num_delimiter_chars * sources.size(); std::size_t const chunk_offset = reader_opts.get_byte_range_offset(); - std::size_t chunk_size = reader_opts.get_byte_range_size(); - - CUDF_EXPECTS(total_source_size ? chunk_offset < total_source_size : !chunk_offset, - "Invalid offsetting", - std::invalid_argument); - auto should_load_till_last_source = !chunk_size || chunk_size >= total_source_size - chunk_offset; - chunk_size = should_load_till_last_source ? total_source_size - chunk_offset : chunk_size; + std::size_t const chunk_size = reader_opts.get_byte_range_size(); + auto const should_load_till_last_source = chunk_offset + chunk_size == total_source_size; int num_subchunks_prealloced = should_load_till_last_source ? 
0 : max_subchunks_prealloced; std::size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size); @@ -258,12 +253,21 @@ get_record_range_raw_input(host_span> sources, auto const shift_for_nonzero_offset = std::min(chunk_offset, 1); auto const first_delim_pos = chunk_offset == 0 ? 0 : find_first_delimiter(readbufspan, delimiter, stream); + auto is_last_char_delimiter = [delimiter, readbufspan, stream]() { + char last_char; + cudf::detail::cuda_memcpy( + host_span(&last_char, 1, false), + readbufspan.subspan(readbufspan.size() - 1, 1), + stream); + return last_char == delimiter; + }(); + if (first_delim_pos == -1) { // return empty owning datasource buffer auto empty_buf = rmm::device_buffer(0, stream); return std::make_pair(datasource::owning_buffer(std::move(empty_buf)), std::nullopt); - } else if (!should_load_till_last_source) { + } else if (!should_load_till_last_source && !is_last_char_delimiter) { // Find next delimiter std::int64_t next_delim_pos = -1; std::size_t next_subchunk_start = chunk_offset + chunk_size; @@ -300,16 +304,36 @@ get_record_range_raw_input(host_span> sources, } } - auto const batch_limit = static_cast(std::numeric_limits::max()); - CUDF_EXPECTS(static_cast(next_delim_pos - first_delim_pos - shift_for_nonzero_offset) < - batch_limit, - "The size of the JSON buffer returned by every batch cannot exceed INT_MAX bytes"); + auto const batch_limit = getenv_or("LIBCUDF_JSON_BATCH_SIZE", static_cast(std::numeric_limits::max())); + if(static_cast(next_delim_pos - first_delim_pos - shift_for_nonzero_offset) < batch_limit) { + return std::make_pair( + datasource::owning_buffer( + std::move(buffer), + reinterpret_cast(buffer.data()) + first_delim_pos + shift_for_nonzero_offset, + next_delim_pos - first_delim_pos - shift_for_nonzero_offset + 1), + std::nullopt); + } + device_span bufsubspan = bufspan.subspan(first_delim_pos + shift_for_nonzero_offset, next_delim_pos - first_delim_pos - shift_for_nonzero_offset); + auto rev_it_begin = thrust::make_reverse_iterator(bufsubspan.end()); + auto rev_it_end = thrust::make_reverse_iterator(bufsubspan.begin()); + auto const second_last_delimiter_it = + thrust::find(rmm::exec_policy(stream), rev_it_begin, rev_it_end, delimiter); + CUDF_EXPECTS(second_last_delimiter_it != rev_it_end, "A single JSON line cannot be larger than 2GB"); + auto const last_line_size = static_cast(thrust::distance(rev_it_begin, second_last_delimiter_it)); + CUDF_EXPECTS(last_line_size < batch_limit, "A single JSON line cannot be larger than 2GB"); + + rmm::device_buffer second_buffer(bufsubspan.data() + static_cast(thrust::distance(second_last_delimiter_it, rev_it_end)), last_line_size + 1, stream); + return std::make_pair( datasource::owning_buffer( std::move(buffer), reinterpret_cast(buffer.data()) + first_delim_pos + shift_for_nonzero_offset, - next_delim_pos - first_delim_pos - shift_for_nonzero_offset), - std::nullopt); + next_delim_pos - first_delim_pos - shift_for_nonzero_offset - last_line_size), + datasource::owning_buffer( + std::move(second_buffer), + reinterpret_cast(second_buffer.data()), + second_buffer.size()) + ); } // Add delimiter to end of buffer - possibly adding an empty line to the input buffer - iff we are @@ -317,7 +341,7 @@ get_record_range_raw_input(host_span> sources, // table generated from the JSONL input remains unchanged since empty lines are ignored by the // parser. 
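  // Note: with the is_last_char_delimiter check introduced earlier in this function, the extra
  // delimiter below is appended only when the ingested range does not already end in the record
  // delimiter, so the buffer is never terminated twice.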
size_t num_chars = readbufspan.size() - first_delim_pos - shift_for_nonzero_offset; - if (num_chars) { + if (num_chars && !is_last_char_delimiter) { auto last_char = delimiter; cudf::detail::cuda_memcpy_async( device_span(reinterpret_cast(buffer.data()), buffer.size()) @@ -345,10 +369,6 @@ std::pair> read_batch( { CUDF_FUNC_RANGE(); auto bufviews = get_record_range_raw_input(sources, reader_opts, stream); - /* - datasource::owning_buffer bufview = - get_record_range_raw_input(sources, reader_opts, stream); - */ // If input JSON buffer has single quotes and option to normalize single quotes is enabled, // invoke pre-processing FST @@ -360,7 +380,14 @@ std::pair> read_batch( auto buffer = cudf::device_span(reinterpret_cast(bufviews.first.data()), bufviews.first.size()); stream.synchronize(); + std::printf("first hbuf = "); + auto hbuf = cudf::detail::make_std_vector_sync(buffer, stream); + for(auto c : hbuf) + std::printf("%c", c); + std::printf("==========================\n"); + auto first_partial_table = device_parse_nested_json(buffer, reader_opts, stream, mr); + stream.synchronize(); if (!bufviews.second.has_value()) return std::make_pair(std::move(first_partial_table), std::nullopt); @@ -374,6 +401,12 @@ std::pair> read_batch( buffer = cudf::device_span( reinterpret_cast(bufviews.second.value().data()), bufviews.second.value().size()); stream.synchronize(); + std::printf("second hbuf = "); + hbuf = cudf::detail::make_std_vector_sync(buffer, stream); + for(auto c : hbuf) + std::printf("%c", c); + std::printf("==========================\n"); + auto second_partial_table = device_parse_nested_json(buffer, reader_opts, stream, mr); return std::make_pair(std::move(first_partial_table), std::move(second_partial_table)); @@ -398,7 +431,11 @@ table_with_metadata read_json_impl(host_span> source reader_opts.is_enabled_lines() || total_source_size < std::numeric_limits::max(), "Parsing Regular JSON inputs of size greater than INT_MAX bytes is not supported"); - std::size_t chunk_offset = reader_opts.get_byte_range_offset(); + // TODO: move the offset and size logic from get_record_range_raw_input here + std::size_t const chunk_offset = reader_opts.get_byte_range_offset(); + CUDF_EXPECTS(total_source_size ? chunk_offset < total_source_size : !chunk_offset, + "Invalid offsetting", + std::invalid_argument); std::size_t chunk_size = reader_opts.get_byte_range_size(); chunk_size = !chunk_size ? total_source_size - chunk_offset : std::min(chunk_size, total_source_size - chunk_offset); @@ -442,21 +479,26 @@ table_with_metadata read_json_impl(host_span> source } i++; } + /* * If there is a single batch, then we can directly return the table without the * unnecessary concatenate. The size of batch_offsets is 1 if all sources are empty, * or if end_bytes_size is larger than total_source_size. 
*/ - // if (batch_offsets.size() <= 2) return read_batch(sources, reader_opts, stream, mr); - std::vector partial_tables; json_reader_options batched_reader_opts{reader_opts}; - - auto partial_tables_ = - read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref()); - partial_tables.emplace_back(std::move(partial_tables_.first)); - if (partial_tables_.second.has_value()) - partial_tables.emplace_back(std::move(partial_tables_.second.value())); + batched_reader_opts.set_byte_range_offset(chunk_offset); + batched_reader_opts.set_byte_range_size(chunk_size); + + auto insert_partial_tables = [&partial_tables](std::pair> &&partial_table_pair) { + if (partial_table_pair.first.tbl->num_columns() == 0 && partial_table_pair.first.tbl->num_rows() == 0) return false; + partial_tables.emplace_back(std::move(partial_table_pair.first)); + if (partial_table_pair.second.has_value()) { + if (partial_table_pair.second.value().tbl->num_columns() == 0 && partial_table_pair.second.value().tbl->num_rows() == 0) return false; + partial_tables.emplace_back(std::move(partial_table_pair.second.value())); + } + return true; + }; // recursive lambda to construct schema_element. Here, we assume that the table from the // first batch contains all the columns in the concatenated table, and that the partial tables @@ -506,54 +548,47 @@ table_with_metadata read_json_impl(host_span> source return schema; }; - batched_reader_opts.set_byte_range_offset(batch_offsets[0]); - batched_reader_opts.set_byte_range_size(batch_offsets[1] - batch_offsets[0]); - partial_tables_ = - read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref()); - partial_tables.emplace_back(std::move(partial_tables_.first)); - if (partial_tables_.second.has_value()) - partial_tables.emplace_back(std::move(partial_tables_.second.value())); - /* - partial_tables.emplace_back( - read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref())); - */ - - auto& tbl = partial_tables.back().tbl; - std::vector children; - for (size_type j = 0; j < tbl->num_columns(); j++) { - children.emplace_back(tbl->get_column(j)); + + //std::printf("batch_offsets.size() = %lu\n", batch_offsets.size()); + if (batch_offsets.size() <= 2) { + auto has_inserted = insert_partial_tables(read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref())); + if(!has_inserted) { + return table_with_metadata{std::make_unique(std::vector>{}), {std::vector{}}}; + } } - batched_reader_opts.set_dtypes( - construct_schema(children, partial_tables.back().metadata.schema_info, schema)); - batched_reader_opts.enable_prune_columns(true); - - // Dispatch individual batches to read_batch and push the resulting table into - // partial_tables array. Note that the reader options need to be updated for each - // batch to adjust byte range offset and byte range size. 
- for (std::size_t batch_offset_pos = 1; batch_offset_pos < batch_offsets.size() - 1; - batch_offset_pos++) { - batched_reader_opts.set_byte_range_offset(batch_offsets[batch_offset_pos]); - batched_reader_opts.set_byte_range_size(batch_offsets[batch_offset_pos + 1] - - batch_offsets[batch_offset_pos]); - /* - auto partial_table = - read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref()); - */ - auto partial_tables_ = - read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref()); - - if (partial_tables_.first.tbl->num_columns() == 0 && - partial_tables_.first.tbl->num_rows() == 0) { - CUDF_EXPECTS(batch_offset_pos == batch_offsets.size() - 2, - "Only the partial table generated by the last batch can be empty"); - break; + else { + batched_reader_opts.set_byte_range_offset(batch_offsets[0]); + batched_reader_opts.set_byte_range_size(batch_offsets[1] - batch_offsets[0]); + insert_partial_tables(read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref())); + + auto& tbl = partial_tables.back().tbl; + std::vector children; + for (size_type j = 0; j < tbl->num_columns(); j++) { + children.emplace_back(tbl->get_column(j)); + } + batched_reader_opts.set_dtypes( + construct_schema(children, partial_tables.back().metadata.schema_info, schema)); + batched_reader_opts.enable_prune_columns(true); + + // Dispatch individual batches to read_batch and push the resulting table into + // partial_tables array. Note that the reader options need to be updated for each + // batch to adjust byte range offset and byte range size. + for (std::size_t batch_offset_pos = 1; batch_offset_pos < batch_offsets.size() - 1; + batch_offset_pos++) { + batched_reader_opts.set_byte_range_offset(batch_offsets[batch_offset_pos]); + batched_reader_opts.set_byte_range_size(batch_offsets[batch_offset_pos + 1] - + batch_offsets[batch_offset_pos]); + auto has_inserted = insert_partial_tables(read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref())); + + if(!has_inserted) { + CUDF_EXPECTS(batch_offset_pos == batch_offsets.size() - 2, + "Only the partial table generated by the last batch can be empty"); + break; + } } - // partial_tables.emplace_back(std::move(partial_table)); - partial_tables.emplace_back(std::move(partial_tables_.first)); - if (partial_tables_.second.has_value()) - partial_tables.emplace_back(std::move(partial_tables_.second.value())); } + //std::printf("partial_tables.size() = %lu\n", partial_tables.size()); auto expects_schema_equality = std::all_of(partial_tables.begin() + 1, partial_tables.end(), diff --git a/cpp/tests/io/json/json_test.cpp b/cpp/tests/io/json/json_test.cpp index 00f46975fdc..a717fb8cd76 100644 --- a/cpp/tests/io/json/json_test.cpp +++ b/cpp/tests/io/json/json_test.cpp @@ -667,11 +667,12 @@ TEST_F(JsonReaderTest, JsonLinesByteRange) outfile << "[1000]\n[2000]\n[3000]\n[4000]\n[5000]\n[6000]\n[7000]\n[8000]\n[9000]\n"; outfile.close(); + // Reading 0]\n[3000]\n[4000]\n[5000]\n cudf::io::json_reader_options in_options = cudf::io::json_reader_options::builder(cudf::io::source_info{fname}) .lines(true) .byte_range_offset(11) - .byte_range_size(20); + .byte_range_size(24); cudf::io::table_with_metadata result = cudf::io::read_json(in_options); diff --git a/cpp/tests/large_strings/json_tests.cu b/cpp/tests/large_strings/json_tests.cu index 205fb12c4dd..adaebeb75cb 100644 --- a/cpp/tests/large_strings/json_tests.cu +++ b/cpp/tests/large_strings/json_tests.cu @@ -16,6 +16,7 @@ 
#include "../io/json/json_utils.cuh" #include "io/comp/comp.hpp" +#include "io/comp/io_uncomp.hpp" #include "large_strings_fixture.hpp" #include @@ -195,3 +196,71 @@ TEST_P(JsonLargeReaderTest, MultiBatchWithNulls) // Read full test data via existing, nested JSON lines reader CUDF_EXPECT_NO_THROW(cudf::io::read_json(cjson_lines_options)); } + +TEST_P(JsonLargeReaderTest, MultiBatchFailing) +{ + cudf::io::compression_type const comptype = GetParam(); + + std::size_t batch_size_upper_bound = std::numeric_limits::max() / 16; + // set smaller batch_size to reduce file size and execution time + this->set_batch_size(batch_size_upper_bound); + + std::string json_string = R"( + { "a": { "y" : 6}, "b" : [1, 2, 3], "c": "11" } + { "a": { "y" : 6}, "b" : [4, 5 ], "c": "12" } + { "a": { "y" : 6}, "b" : [6 ], "c": "13" } + { "a": { "y" : 6}, "b" : [7 ], "c": "14" } + )"; + /* + constexpr std::size_t expected_file_size = 1.0 * static_cast(batch_size_upper_bound); + std::size_t log_repetitions = + static_cast(std::floor(std::log2(expected_file_size / json_string.size()))); + json_string.reserve(json_string.size() * (1UL << log_repetitions)); + for (std::size_t i = 0; i < log_repetitions; i++) { + json_string += json_string; + } + */ + batch_size_upper_bound = json_string.size(); + this->set_batch_size(batch_size_upper_bound); + std::string really_long_string = R"(haha)"; + std::size_t log_repetitions = + static_cast(std::floor(std::log2(static_cast(json_string.size()) / really_long_string.size()))); + really_long_string.reserve(really_long_string.size() * (1UL << log_repetitions)); + for (std::size_t i = 0; i < log_repetitions; i++) { + really_long_string += really_long_string; + } + std::string last_line = R"({ "a": { "y" : 6}, "b" : [1, 2, 3], "c": ")"; + last_line += really_long_string + "\" }\n"; + json_string += last_line; + + std::vector cdata; + if (comptype != cudf::io::compression_type::NONE) { + cdata = cudf::io::detail::compress( + comptype, + cudf::host_span(reinterpret_cast(json_string.data()), + json_string.size()), + cudf::get_default_stream()); + } else { + cdata = std::vector( + reinterpret_cast(json_string.data()), + reinterpret_cast(json_string.data()) + json_string.size()); + } + + constexpr int num_sources = 1; + std::vector> chostbufs( + num_sources, + cudf::host_span(reinterpret_cast(cdata.data()), + cdata.size())); + + // Initialize parsing options (reading json lines) + cudf::io::json_reader_options cjson_lines_options = + cudf::io::json_reader_options::builder( + cudf::io::source_info{ + cudf::host_span>(chostbufs.data(), chostbufs.size())}) + .lines(true) + .compression(comptype) + .recovery_mode(cudf::io::json_recovery_mode_t::FAIL); + + // Read full test data via existing, nested JSON lines reader + CUDF_EXPECT_NO_THROW(cudf::io::read_json(cjson_lines_options)); +} From 8f8b9e7fe34070874d19e26428f39ff6cc2a2e51 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Mon, 10 Feb 2025 22:15:43 +0000 Subject: [PATCH 03/14] small bug fix --- cpp/src/io/json/read_json.cu | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index 8427e22173e..f4a33fc69c8 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -380,12 +380,6 @@ std::pair> read_batch( auto buffer = cudf::device_span(reinterpret_cast(bufviews.first.data()), bufviews.first.size()); stream.synchronize(); - std::printf("first hbuf = "); - auto hbuf = cudf::detail::make_std_vector_sync(buffer, stream); - for(auto 
c : hbuf) - std::printf("%c", c); - std::printf("==========================\n"); - auto first_partial_table = device_parse_nested_json(buffer, reader_opts, stream, mr); stream.synchronize(); @@ -401,12 +395,6 @@ std::pair> read_batch( buffer = cudf::device_span( reinterpret_cast(bufviews.second.value().data()), bufviews.second.value().size()); stream.synchronize(); - std::printf("second hbuf = "); - hbuf = cudf::detail::make_std_vector_sync(buffer, stream); - for(auto c : hbuf) - std::printf("%c", c); - std::printf("==========================\n"); - auto second_partial_table = device_parse_nested_json(buffer, reader_opts, stream, mr); return std::make_pair(std::move(first_partial_table), std::move(second_partial_table)); @@ -549,7 +537,6 @@ table_with_metadata read_json_impl(host_span> source return schema; }; - //std::printf("batch_offsets.size() = %lu\n", batch_offsets.size()); if (batch_offsets.size() <= 2) { auto has_inserted = insert_partial_tables(read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref())); if(!has_inserted) { @@ -588,7 +575,7 @@ table_with_metadata read_json_impl(host_span> source } } - //std::printf("partial_tables.size() = %lu\n", partial_tables.size()); + if(partial_tables.size() == 1) return std::move(partial_tables[0]); auto expects_schema_equality = std::all_of(partial_tables.begin() + 1, partial_tables.end(), From e031b823828f8b54db83bbe63d8d45b37a02cba8 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Mon, 10 Feb 2025 22:16:59 +0000 Subject: [PATCH 04/14] formatting --- cpp/src/io/json/read_json.cu | 96 ++++++++++++++++----------- cpp/tests/large_strings/json_tests.cu | 7 +- 2 files changed, 59 insertions(+), 44 deletions(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index f4a33fc69c8..0000052d531 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -228,12 +228,12 @@ get_record_range_raw_input(host_span> sources, { CUDF_FUNC_RANGE(); - std::size_t const total_source_size = sources_size(sources, 0, 0); - auto constexpr num_delimiter_chars = 1; - auto const delimiter = reader_opts.get_delimiter(); - auto const num_extra_delimiters = num_delimiter_chars * sources.size(); - std::size_t const chunk_offset = reader_opts.get_byte_range_offset(); - std::size_t const chunk_size = reader_opts.get_byte_range_size(); + std::size_t const total_source_size = sources_size(sources, 0, 0); + auto constexpr num_delimiter_chars = 1; + auto const delimiter = reader_opts.get_delimiter(); + auto const num_extra_delimiters = num_delimiter_chars * sources.size(); + std::size_t const chunk_offset = reader_opts.get_byte_range_offset(); + std::size_t const chunk_size = reader_opts.get_byte_range_size(); auto const should_load_till_last_source = chunk_offset + chunk_size == total_source_size; int num_subchunks_prealloced = should_load_till_last_source ? 0 : max_subchunks_prealloced; @@ -255,10 +255,9 @@ get_record_range_raw_input(host_span> sources, chunk_offset == 0 ? 
0 : find_first_delimiter(readbufspan, delimiter, stream); auto is_last_char_delimiter = [delimiter, readbufspan, stream]() { char last_char; - cudf::detail::cuda_memcpy( - host_span(&last_char, 1, false), - readbufspan.subspan(readbufspan.size() - 1, 1), - stream); + cudf::detail::cuda_memcpy(host_span(&last_char, 1, false), + readbufspan.subspan(readbufspan.size() - 1, 1), + stream); return last_char == delimiter; }(); @@ -304,8 +303,10 @@ get_record_range_raw_input(host_span> sources, } } - auto const batch_limit = getenv_or("LIBCUDF_JSON_BATCH_SIZE", static_cast(std::numeric_limits::max())); - if(static_cast(next_delim_pos - first_delim_pos - shift_for_nonzero_offset) < batch_limit) { + auto const batch_limit = getenv_or( + "LIBCUDF_JSON_BATCH_SIZE", static_cast(std::numeric_limits::max())); + if (static_cast(next_delim_pos - first_delim_pos - shift_for_nonzero_offset) < + batch_limit) { return std::make_pair( datasource::owning_buffer( std::move(buffer), @@ -313,17 +314,24 @@ get_record_range_raw_input(host_span> sources, next_delim_pos - first_delim_pos - shift_for_nonzero_offset + 1), std::nullopt); } - device_span bufsubspan = bufspan.subspan(first_delim_pos + shift_for_nonzero_offset, next_delim_pos - first_delim_pos - shift_for_nonzero_offset); + device_span bufsubspan = + bufspan.subspan(first_delim_pos + shift_for_nonzero_offset, + next_delim_pos - first_delim_pos - shift_for_nonzero_offset); auto rev_it_begin = thrust::make_reverse_iterator(bufsubspan.end()); - auto rev_it_end = thrust::make_reverse_iterator(bufsubspan.begin()); + auto rev_it_end = thrust::make_reverse_iterator(bufsubspan.begin()); auto const second_last_delimiter_it = thrust::find(rmm::exec_policy(stream), rev_it_begin, rev_it_end, delimiter); - CUDF_EXPECTS(second_last_delimiter_it != rev_it_end, "A single JSON line cannot be larger than 2GB"); - auto const last_line_size = static_cast(thrust::distance(rev_it_begin, second_last_delimiter_it)); - CUDF_EXPECTS(last_line_size < batch_limit, "A single JSON line cannot be larger than 2GB"); + CUDF_EXPECTS(second_last_delimiter_it != rev_it_end, + "A single JSON line cannot be larger than 2GB"); + auto const last_line_size = + static_cast(thrust::distance(rev_it_begin, second_last_delimiter_it)); + CUDF_EXPECTS(last_line_size < batch_limit, "A single JSON line cannot be larger than 2GB"); + + rmm::device_buffer second_buffer(bufsubspan.data() + static_cast(thrust::distance( + second_last_delimiter_it, rev_it_end)), + last_line_size + 1, + stream); - rmm::device_buffer second_buffer(bufsubspan.data() + static_cast(thrust::distance(second_last_delimiter_it, rev_it_end)), last_line_size + 1, stream); - return std::make_pair( datasource::owning_buffer( std::move(buffer), @@ -332,8 +340,7 @@ get_record_range_raw_input(host_span> sources, datasource::owning_buffer( std::move(second_buffer), reinterpret_cast(second_buffer.data()), - second_buffer.size()) - ); + second_buffer.size())); } // Add delimiter to end of buffer - possibly adding an empty line to the input buffer - iff we are @@ -420,7 +427,7 @@ table_with_metadata read_json_impl(host_span> source "Parsing Regular JSON inputs of size greater than INT_MAX bytes is not supported"); // TODO: move the offset and size logic from get_record_range_raw_input here - std::size_t const chunk_offset = reader_opts.get_byte_range_offset(); + std::size_t const chunk_offset = reader_opts.get_byte_range_offset(); CUDF_EXPECTS(total_source_size ? 
chunk_offset < total_source_size : !chunk_offset, "Invalid offsetting", std::invalid_argument); @@ -478,15 +485,21 @@ table_with_metadata read_json_impl(host_span> source batched_reader_opts.set_byte_range_offset(chunk_offset); batched_reader_opts.set_byte_range_size(chunk_size); - auto insert_partial_tables = [&partial_tables](std::pair> &&partial_table_pair) { - if (partial_table_pair.first.tbl->num_columns() == 0 && partial_table_pair.first.tbl->num_rows() == 0) return false; - partial_tables.emplace_back(std::move(partial_table_pair.first)); - if (partial_table_pair.second.has_value()) { - if (partial_table_pair.second.value().tbl->num_columns() == 0 && partial_table_pair.second.value().tbl->num_rows() == 0) return false; - partial_tables.emplace_back(std::move(partial_table_pair.second.value())); - } - return true; - }; + auto insert_partial_tables = + [&partial_tables]( + std::pair>&& partial_table_pair) { + if (partial_table_pair.first.tbl->num_columns() == 0 && + partial_table_pair.first.tbl->num_rows() == 0) + return false; + partial_tables.emplace_back(std::move(partial_table_pair.first)); + if (partial_table_pair.second.has_value()) { + if (partial_table_pair.second.value().tbl->num_columns() == 0 && + partial_table_pair.second.value().tbl->num_rows() == 0) + return false; + partial_tables.emplace_back(std::move(partial_table_pair.second.value())); + } + return true; + }; // recursive lambda to construct schema_element. Here, we assume that the table from the // first batch contains all the columns in the concatenated table, and that the partial tables @@ -538,15 +551,17 @@ table_with_metadata read_json_impl(host_span> source }; if (batch_offsets.size() <= 2) { - auto has_inserted = insert_partial_tables(read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref())); - if(!has_inserted) { - return table_with_metadata{std::make_unique
(std::vector>{}), {std::vector{}}}; + auto has_inserted = insert_partial_tables( + read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref())); + if (!has_inserted) { + return table_with_metadata{std::make_unique
(std::vector>{}), + {std::vector{}}}; } - } - else { + } else { batched_reader_opts.set_byte_range_offset(batch_offsets[0]); batched_reader_opts.set_byte_range_size(batch_offsets[1] - batch_offsets[0]); - insert_partial_tables(read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref())); + insert_partial_tables( + read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref())); auto& tbl = partial_tables.back().tbl; std::vector children; @@ -565,9 +580,10 @@ table_with_metadata read_json_impl(host_span> source batched_reader_opts.set_byte_range_offset(batch_offsets[batch_offset_pos]); batched_reader_opts.set_byte_range_size(batch_offsets[batch_offset_pos + 1] - batch_offsets[batch_offset_pos]); - auto has_inserted = insert_partial_tables(read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref())); + auto has_inserted = insert_partial_tables( + read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref())); - if(!has_inserted) { + if (!has_inserted) { CUDF_EXPECTS(batch_offset_pos == batch_offsets.size() - 2, "Only the partial table generated by the last batch can be empty"); break; @@ -575,7 +591,7 @@ table_with_metadata read_json_impl(host_span> source } } - if(partial_tables.size() == 1) return std::move(partial_tables[0]); + if (partial_tables.size() == 1) return std::move(partial_tables[0]); auto expects_schema_equality = std::all_of(partial_tables.begin() + 1, partial_tables.end(), diff --git a/cpp/tests/large_strings/json_tests.cu b/cpp/tests/large_strings/json_tests.cu index adaebeb75cb..d0142ecc3d0 100644 --- a/cpp/tests/large_strings/json_tests.cu +++ b/cpp/tests/large_strings/json_tests.cu @@ -223,8 +223,8 @@ TEST_P(JsonLargeReaderTest, MultiBatchFailing) batch_size_upper_bound = json_string.size(); this->set_batch_size(batch_size_upper_bound); std::string really_long_string = R"(haha)"; - std::size_t log_repetitions = - static_cast(std::floor(std::log2(static_cast(json_string.size()) / really_long_string.size()))); + std::size_t log_repetitions = static_cast( + std::floor(std::log2(static_cast(json_string.size()) / really_long_string.size()))); really_long_string.reserve(really_long_string.size() * (1UL << log_repetitions)); for (std::size_t i = 0; i < log_repetitions; i++) { really_long_string += really_long_string; @@ -249,8 +249,7 @@ TEST_P(JsonLargeReaderTest, MultiBatchFailing) constexpr int num_sources = 1; std::vector> chostbufs( num_sources, - cudf::host_span(reinterpret_cast(cdata.data()), - cdata.size())); + cudf::host_span(reinterpret_cast(cdata.data()), cdata.size())); // Initialize parsing options (reading json lines) cudf::io::json_reader_options cjson_lines_options = From baf30e7793aa005a02e446346816f02fa369aa70 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Mon, 10 Feb 2025 23:57:47 +0000 Subject: [PATCH 05/14] comments for tests --- cpp/tests/large_strings/json_tests.cu | 61 +++++++++++++++++---------- 1 file changed, 38 insertions(+), 23 deletions(-) diff --git a/cpp/tests/large_strings/json_tests.cu b/cpp/tests/large_strings/json_tests.cu index d0142ecc3d0..230c545951d 100644 --- a/cpp/tests/large_strings/json_tests.cu +++ b/cpp/tests/large_strings/json_tests.cu @@ -19,6 +19,7 @@ #include "io/comp/io_uncomp.hpp" #include "large_strings_fixture.hpp" +#include #include #include @@ -197,33 +198,27 @@ TEST_P(JsonLargeReaderTest, MultiBatchWithNulls) CUDF_EXPECT_NO_THROW(cudf::io::read_json(cjson_lines_options)); } -TEST_P(JsonLargeReaderTest, 
MultiBatchFailing) +TEST_P(JsonLargeReaderTest, MultiBatchDoubleBufferInput) { cudf::io::compression_type const comptype = GetParam(); - std::size_t batch_size_upper_bound = std::numeric_limits::max() / 16; - // set smaller batch_size to reduce file size and execution time - this->set_batch_size(batch_size_upper_bound); - - std::string json_string = R"( + // This test constructs a JSON input of size two times the batch size but sets the batch boundary + // to be after the start of the last record in the batch i.e. the size of the last record in the + // input is approximately the same as the size of all preceding records. Since the reader now ends + // up reading twice the allowed batch size per batch, it has to split the read buffer in two, each + // of size <= the batch size. + std::string json_string = R"( { "a": { "y" : 6}, "b" : [1, 2, 3], "c": "11" } { "a": { "y" : 6}, "b" : [4, 5 ], "c": "12" } { "a": { "y" : 6}, "b" : [6 ], "c": "13" } { "a": { "y" : 6}, "b" : [7 ], "c": "14" } )"; - /* - constexpr std::size_t expected_file_size = 1.0 * static_cast(batch_size_upper_bound); - std::size_t log_repetitions = - static_cast(std::floor(std::log2(expected_file_size / json_string.size()))); - json_string.reserve(json_string.size() * (1UL << log_repetitions)); - for (std::size_t i = 0; i < log_repetitions; i++) { - json_string += json_string; - } - */ - batch_size_upper_bound = json_string.size(); - this->set_batch_size(batch_size_upper_bound); - std::string really_long_string = R"(haha)"; - std::size_t log_repetitions = static_cast( + std::size_t const batch_size = json_string.size() + 1; + // set smaller batch_size to reduce file size and execution time + this->set_batch_size(batch_size); + + std::string really_long_string = R"(libcudf)"; + std::size_t const log_repetitions = static_cast( std::floor(std::log2(static_cast(json_string.size()) / really_long_string.size()))); really_long_string.reserve(really_long_string.size() * (1UL << log_repetitions)); for (std::size_t i = 0; i < log_repetitions; i++) { @@ -246,7 +241,7 @@ TEST_P(JsonLargeReaderTest, MultiBatchFailing) reinterpret_cast(json_string.data()) + json_string.size()); } - constexpr int num_sources = 1; + constexpr int num_sources = 3; std::vector> chostbufs( num_sources, cudf::host_span(reinterpret_cast(cdata.data()), cdata.size())); @@ -257,9 +252,29 @@ TEST_P(JsonLargeReaderTest, MultiBatchFailing) cudf::io::source_info{ cudf::host_span>(chostbufs.data(), chostbufs.size())}) .lines(true) - .compression(comptype) - .recovery_mode(cudf::io::json_recovery_mode_t::FAIL); + .compression(comptype); // Read full test data via existing, nested JSON lines reader - CUDF_EXPECT_NO_THROW(cudf::io::read_json(cjson_lines_options)); + auto result = cudf::io::read_json(cjson_lines_options); + + ASSERT_EQ(result.tbl->num_columns(), 3); + ASSERT_EQ(result.tbl->num_rows(), 15); + + ASSERT_EQ(result.metadata.schema_info.size(), 3); + EXPECT_EQ(result.metadata.schema_info[0].name, "a"); + EXPECT_EQ(result.metadata.schema_info[1].name, "b"); + EXPECT_EQ(result.metadata.schema_info[2].name, "c"); + + EXPECT_EQ(result.tbl->get_column(2).type().id(), cudf::type_id::STRING); + auto expected_c_col = std::vector{"11", "12", "13", "14", really_long_string}; + auto single_src_ccol_size = expected_c_col.size(); + expected_c_col.resize(single_src_ccol_size * num_sources); + for (int i = 1; i <= num_sources - 1; i++) + std::copy(expected_c_col.begin(), + expected_c_col.begin() + single_src_ccol_size, + expected_c_col.begin() + (i * single_src_ccol_size)); + + 
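+  // The expected "c" column holds one copy of {"11", "12", "13", "14", really_long_string} per
+  // source, so this comparison checks that the two read buffers produced by the split were parsed
+  // and concatenated back in the original record order.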
CUDF_TEST_EXPECT_COLUMNS_EQUAL( + result.tbl->get_column(2), + cudf::test::strings_column_wrapper(expected_c_col.begin(), expected_c_col.end())); } From bbed6a344b80d30d7d4b8fc28b5bbb3191411b2f Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Tue, 11 Feb 2025 01:20:50 +0000 Subject: [PATCH 06/14] formatting --- cpp/tests/large_strings/json_tests.cu | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cpp/tests/large_strings/json_tests.cu b/cpp/tests/large_strings/json_tests.cu index 230c545951d..dfb0426e5b5 100644 --- a/cpp/tests/large_strings/json_tests.cu +++ b/cpp/tests/large_strings/json_tests.cu @@ -203,10 +203,10 @@ TEST_P(JsonLargeReaderTest, MultiBatchDoubleBufferInput) cudf::io::compression_type const comptype = GetParam(); // This test constructs a JSON input of size two times the batch size but sets the batch boundary - // to be after the start of the last record in the batch i.e. the size of the last record in the - // input is approximately the same as the size of all preceding records. Since the reader now ends - // up reading twice the allowed batch size per batch, it has to split the read buffer in two, each - // of size <= the batch size. + // after the start of the last record in the batch i.e. the input is constructed such that the + // size of the last record is approximately the same as the size of all preceding records. Since + // the reader now ends up reading twice the allowed batch size per batch, it has to split the read + // buffer in two, each part of size <= the batch size. std::string json_string = R"( { "a": { "y" : 6}, "b" : [1, 2, 3], "c": "11" } { "a": { "y" : 6}, "b" : [4, 5 ], "c": "12" } @@ -255,7 +255,7 @@ TEST_P(JsonLargeReaderTest, MultiBatchDoubleBufferInput) .compression(comptype); // Read full test data via existing, nested JSON lines reader - auto result = cudf::io::read_json(cjson_lines_options); + auto const result = cudf::io::read_json(cjson_lines_options); ASSERT_EQ(result.tbl->num_columns(), 3); ASSERT_EQ(result.tbl->num_rows(), 15); From 973f50d47ff39ff834e6ecb99f029ecbfe3e3b7d Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Tue, 11 Feb 2025 01:23:02 +0000 Subject: [PATCH 07/14] cleaned up tests --- cpp/tests/io/json/json_test.cpp | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/cpp/tests/io/json/json_test.cpp b/cpp/tests/io/json/json_test.cpp index a717fb8cd76..86b62b059b3 100644 --- a/cpp/tests/io/json/json_test.cpp +++ b/cpp/tests/io/json/json_test.cpp @@ -660,7 +660,7 @@ TEST_P(JsonReaderParamTest, JsonLinesFileInput) CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(1), float64_wrapper{{1.1, 2.2}}); } -TEST_F(JsonReaderTest, JsonLinesByteRange) +TEST_F(JsonReaderTest, JsonLinesByteRangeCompleteRecord) { const std::string fname = temp_env->get_temp_dir() + "JsonLinesByteRangeTest.json"; std::ofstream outfile(fname, std::ofstream::out); @@ -685,6 +685,31 @@ TEST_F(JsonReaderTest, JsonLinesByteRange) CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), int64_wrapper{{3000, 4000, 5000}}); } +TEST_F(JsonReaderTest, JsonLinesByteRangeIncompleteRecord) +{ + const std::string fname = temp_env->get_temp_dir() + "JsonLinesByteRangeTest.json"; + std::ofstream outfile(fname, std::ofstream::out); + outfile << "[1000]\n[2000]\n[3000]\n[4000]\n[5000]\n[6000]\n[7000]\n[8000]\n[9000]\n"; + outfile.close(); + + // Reading 0]\n[3000]\n[4000]\n[50 + cudf::io::json_reader_options in_options = + cudf::io::json_reader_options::builder(cudf::io::source_info{fname}) + 
.lines(true) + .byte_range_offset(11) + .byte_range_size(20); + + cudf::io::table_with_metadata result = cudf::io::read_json(in_options); + + EXPECT_EQ(result.tbl->num_columns(), 1); + EXPECT_EQ(result.tbl->num_rows(), 3); + + EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::INT64); + EXPECT_EQ(result.metadata.schema_info[0].name, "0"); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), int64_wrapper{{3000, 4000, 5000}}); +} + TEST_F(JsonReaderTest, JsonLinesByteRangeWithRealloc) { std::string long_string = "haha"; From b8f02a8904bb28b800e6d1f619180d21c7b4fa4b Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Tue, 11 Feb 2025 02:27:54 +0000 Subject: [PATCH 08/14] adding comments --- cpp/src/io/json/read_json.cu | 50 ++++++++++++++++++++++++++---------- 1 file changed, 37 insertions(+), 13 deletions(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index 0000052d531..9154adcfaf4 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -208,17 +208,27 @@ size_type find_first_delimiter(device_span d_data, } /** - * @brief Get the byte range between record starts and ends starting from the given range. + * @brief Get the byte range between record starts and ends starting from the given range. The + * actual byte range read and returned will contain complete JSONL records, and will include the + * delimiter at the end of the last record. * * if get_byte_range_offset == 0, then we can skip the first delimiter search * if get_byte_range_offset != 0, then we need to search for the first delimiter in given range. * if not found, skip this chunk, if found, then search for first delimiter in next range until we - * find a delimiter. Use this as actual range for parsing. + * find a delimiter. Use this as actual range for parsing. If the size of actual byte range to be + * parsed is greater than the integer limit (or the requested batch size), then split the ingested + * buffer in two. Note that as long as no single record in the JSONL input is of size larger than + * the requested batch size, we are guaranteed that each of the two buffers will be within the batch + * size limit - the size of the first buffer is capped at the batch limit by the batching logic + * itself, and the second buffer contains only the last record which was incomplete in the initial + * byte range requested. If the size of the actual byte range to be parsed does not exceed batch + * limits, then the second buffer is empty. * * @param sources Data sources to read from * @param reader_opts JSON reader options with range offset and range size * @param stream CUDA stream used for device memory operations and kernel launches - * @returns Data source owning buffer enclosing the bytes read + * @returns A pair of data source owning buffers together enclosing the bytes read. The second + * buffer may or may not be empty depending on the condition described above. 
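+ * @note Both buffers returned here are parsed independently by read_batch, and the resulting
+ * partial tables are concatenated with those of the other batches in read_json_impl.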
*/ std::pair, std::optional>> @@ -228,12 +238,14 @@ get_record_range_raw_input(host_span> sources, { CUDF_FUNC_RANGE(); - std::size_t const total_source_size = sources_size(sources, 0, 0); - auto constexpr num_delimiter_chars = 1; - auto const delimiter = reader_opts.get_delimiter(); - auto const num_extra_delimiters = num_delimiter_chars * sources.size(); - std::size_t const chunk_offset = reader_opts.get_byte_range_offset(); - std::size_t const chunk_size = reader_opts.get_byte_range_size(); + std::size_t const total_source_size = sources_size(sources, 0, 0); + auto constexpr num_delimiter_chars = 1; + auto const delimiter = reader_opts.get_delimiter(); + auto const num_extra_delimiters = num_delimiter_chars * sources.size(); + std::size_t const chunk_offset = reader_opts.get_byte_range_offset(); + std::size_t const chunk_size = reader_opts.get_byte_range_size(); + // Sanity checks for the byte range offset and size are handled by the batching logic. + // We only need to check if we are reading until the end of the last source in this function. auto const should_load_till_last_source = chunk_offset + chunk_size == total_source_size; int num_subchunks_prealloced = should_load_till_last_source ? 0 : max_subchunks_prealloced; @@ -253,6 +265,9 @@ get_record_range_raw_input(host_span> sources, auto const shift_for_nonzero_offset = std::min(chunk_offset, 1); auto const first_delim_pos = chunk_offset == 0 ? 0 : find_first_delimiter(readbufspan, delimiter, stream); + // If the requested byte range ends with a delimiter, we do not need to add an extra delimiter at + // the end of the ingested buffer, nor do we need to continue reading any more of the input since + // the last position of the byte range coincides with the end of a record. auto is_last_char_delimiter = [delimiter, readbufspan, stream]() { char last_char; cudf::detail::cuda_memcpy(host_span(&last_char, 1, false), @@ -291,6 +306,7 @@ get_record_range_raw_input(host_span> sources, // delimiter character next_delim_pos = buffer_offset + readbufspan.size(); } else { + // Reallocate-and-retry policy // Our buffer_size estimate is insufficient to read until the end of the line! We need to // allocate more memory and try again! num_subchunks_prealloced *= 2; @@ -303,10 +319,17 @@ get_record_range_raw_input(host_span> sources, } } - auto const batch_limit = getenv_or( + // If the size of the ingested buffer is less than the batch size, we can simply return the + // buffer as is, and set the optional second buffer to null. If the size of the ingested buffer + // exceed the batch size limits due to the reallocate-and-retry policy, we split the ingested + // buffer in two parts. The second part only contains the last record in the buffer, while the + // first part contains all the remaining lines. + // As long as the size of no record exceeds the batch size limit placed, we are guaranteed that + // the returned buffer(s) will be below the batch limit. 
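+  // Illustrative example (assuming LIBCUDF_JSON_BATCH_SIZE is left at its default): if the
+  // ingested range grew past the batch size during the reallocate-and-retry loop above, the first
+  // buffer keeps every complete record up to and including the delimiter that precedes the final
+  // record, while the second buffer holds only that final record and its trailing delimiter.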
+ auto const batch_size = getenv_or( "LIBCUDF_JSON_BATCH_SIZE", static_cast(std::numeric_limits::max())); if (static_cast(next_delim_pos - first_delim_pos - shift_for_nonzero_offset) < - batch_limit) { + batch_size) { return std::make_pair( datasource::owning_buffer( std::move(buffer), @@ -322,10 +345,11 @@ get_record_range_raw_input(host_span> sources, auto const second_last_delimiter_it = thrust::find(rmm::exec_policy(stream), rev_it_begin, rev_it_end, delimiter); CUDF_EXPECTS(second_last_delimiter_it != rev_it_end, - "A single JSON line cannot be larger than 2GB"); + "A single JSON line cannot be larger than the integer limit"); auto const last_line_size = static_cast(thrust::distance(rev_it_begin, second_last_delimiter_it)); - CUDF_EXPECTS(last_line_size < batch_limit, "A single JSON line cannot be larger than 2GB"); + CUDF_EXPECTS(last_line_size < batch_size, + "A single JSON line cannot be larger than the integer limit"); rmm::device_buffer second_buffer(bufsubspan.data() + static_cast(thrust::distance( second_last_delimiter_it, rev_it_end)), From 9a75ddf1dedc4e8ef489f92096c0fc060b441e4d Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Tue, 11 Feb 2025 02:48:44 +0000 Subject: [PATCH 09/14] adding more comments --- cpp/src/io/json/read_json.cu | 42 ++++++++++++++++++++---------------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index 9154adcfaf4..7926624d3ce 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -374,7 +374,7 @@ get_record_range_raw_input(host_span> sources, size_t num_chars = readbufspan.size() - first_delim_pos - shift_for_nonzero_offset; if (num_chars && !is_last_char_delimiter) { auto last_char = delimiter; - cudf::detail::cuda_memcpy_async( + cudf::detail::cuda_memcpy( device_span(reinterpret_cast(buffer.data()), buffer.size()) .subspan(readbufspan.size(), 1), host_span(&last_char, 1, false), @@ -390,8 +390,8 @@ get_record_range_raw_input(host_span> sources, std::nullopt); } -// Helper function to read the current batch using byte range offsets and size -// passed +// Helper function to read the current batch using the byte range offsets and size +// passed, normalize it, and construct a partial table. std::pair> read_batch( host_span> sources, json_reader_options const& reader_opts, @@ -399,35 +399,39 @@ std::pair> read_batch( rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - auto bufviews = get_record_range_raw_input(sources, reader_opts, stream); + // The second owning buffer in the pair returned by get_record_range_raw_input may not be + // populated depending on the size of the actual byte range read. The first owning buffer will + // always be non-empty. 
+ auto owning_buffers = get_record_range_raw_input(sources, reader_opts, stream); // If input JSON buffer has single quotes and option to normalize single quotes is enabled, // invoke pre-processing FST if (reader_opts.is_enabled_normalize_single_quotes()) { - normalize_single_quotes( - bufviews.first, reader_opts.get_delimiter(), stream, cudf::get_current_device_resource_ref()); + normalize_single_quotes(owning_buffers.first, + reader_opts.get_delimiter(), + stream, + cudf::get_current_device_resource_ref()); + stream.synchronize(); } - auto buffer = cudf::device_span(reinterpret_cast(bufviews.first.data()), - bufviews.first.size()); - stream.synchronize(); + auto buffer = cudf::device_span( + reinterpret_cast(owning_buffers.first.data()), owning_buffers.first.size()); auto first_partial_table = device_parse_nested_json(buffer, reader_opts, stream, mr); - stream.synchronize(); - - if (!bufviews.second.has_value()) + if (!owning_buffers.second.has_value()) return std::make_pair(std::move(first_partial_table), std::nullopt); + // Repeat the normalization and table construction steps for the second buffer if it exists if (reader_opts.is_enabled_normalize_single_quotes()) { - normalize_single_quotes(bufviews.second.value(), + normalize_single_quotes(owning_buffers.second.value(), reader_opts.get_delimiter(), stream, cudf::get_current_device_resource_ref()); + stream.synchronize(); } buffer = cudf::device_span( - reinterpret_cast(bufviews.second.value().data()), bufviews.second.value().size()); - stream.synchronize(); + reinterpret_cast(owning_buffers.second.value().data()), + owning_buffers.second.value().size()); auto second_partial_table = device_parse_nested_json(buffer, reader_opts, stream, mr); - return std::make_pair(std::move(first_partial_table), std::move(second_partial_table)); } @@ -440,8 +444,7 @@ table_with_metadata read_json_impl(host_span> source * The batched JSON reader enforces that the size of each batch is at most INT_MAX * bytes (~2.14GB). Batches are defined to be byte range chunks - characterized by * chunk offset and chunk size - that may span across multiple source files. - * Note that the batched reader does not work for compressed inputs or for regular - * JSON inputs. + * Note that batching sources does not work for for regular JSON inputs. */ std::size_t const total_source_size = sources_size(sources, 0, 0); @@ -715,7 +718,8 @@ device_span ingest_raw_input(device_span buffer, thread_tasks.begin(), thread_tasks.end(), std::size_t{0}, [](std::size_t sum, auto& task) { return sum + task.get(); }); - CUDF_EXPECTS(bytes_read == total_bytes_to_read, "something's fishy"); + CUDF_EXPECTS(bytes_read == total_bytes_to_read, + "Incorrect number of bytes read by multithreaded reader"); } return buffer.first(bytes_read + (delimiter_map.size() * num_delimiter_chars)); From fc9f6da0918fb6f933f176593c0f250ba040f9d1 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Tue, 11 Feb 2025 03:23:43 +0000 Subject: [PATCH 10/14] added more comments --- cpp/src/io/json/read_json.cu | 56 +++++++++++++++++++----------------- 1 file changed, 29 insertions(+), 27 deletions(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index 7926624d3ce..1ae09180b07 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -390,8 +390,10 @@ get_record_range_raw_input(host_span> sources, std::nullopt); } -// Helper function to read the current batch using the byte range offsets and size -// passed, normalize it, and construct a partial table. 
+/** + * @brief Helper function to read the current batch using the byte range offsets and size + * passed, normalize it, and construct a partial table. + */ std::pair> read_batch( host_span> sources, json_reader_options const& reader_opts, @@ -435,17 +437,21 @@ std::pair> read_batch( return std::make_pair(std::move(first_partial_table), std::move(second_partial_table)); } +/** + * @brief Helper function that implements the batching logic for the JSONL reader. + * The goal of the batched reader is to handle reading mutiple JSONL sources whose total cumulative + * size exceeds the integer limit imposed by the JSON tokenizer. The batching logic divides the + * requested input byte range spanning sources into smaller batches, each of which itself spans + * multiple sources. The batches are constructed such that the byte subrange in each batch does not + * exceed the batch size, which is either set using the environment variable + * LIBCUDF_JSON_BATCH_SIZE, or is set to a little under the integer limit. Note that batching + * sources does not work for for regular JSON inputs. + */ table_with_metadata read_json_impl(host_span> sources, json_reader_options const& reader_opts, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - /* - * The batched JSON reader enforces that the size of each batch is at most INT_MAX - * bytes (~2.14GB). Batches are defined to be byte range chunks - characterized by - * chunk offset and chunk size - that may span across multiple source files. - * Note that batching sources does not work for for regular JSON inputs. - */ std::size_t const total_source_size = sources_size(sources, 0, 0); // Batching is enabled only for JSONL inputs, not regular JSON files @@ -453,7 +459,7 @@ table_with_metadata read_json_impl(host_span> source reader_opts.is_enabled_lines() || total_source_size < std::numeric_limits::max(), "Parsing Regular JSON inputs of size greater than INT_MAX bytes is not supported"); - // TODO: move the offset and size logic from get_record_range_raw_input here + // Sanity checks of byte range offset and clamping of byte range size std::size_t const chunk_offset = reader_opts.get_byte_range_offset(); CUDF_EXPECTS(total_source_size ? chunk_offset < total_source_size : !chunk_offset, "Invalid offsetting", @@ -463,13 +469,10 @@ table_with_metadata read_json_impl(host_span> source : std::min(chunk_size, total_source_size - chunk_offset); std::size_t const batch_size = get_batch_size(chunk_size); - /* - * Identify the position (zero-indexed) of starting source file from which to begin - * batching based on byte range offset. If the offset is larger than the sum of all - * source sizes, then start_source is total number of source files i.e. no file is - * read - */ - + // Identify the position (zero-indexed) of starting source file from which to begin + // batching based on byte range offset. If the offset is larger than the sum of all + // source sizes, then start_source is total number of source files i.e. no file is + // read. // Prefix sum of source file sizes std::size_t pref_source_size = 0; // Starting source file from which to being batching evaluated using byte range offset @@ -480,12 +483,10 @@ table_with_metadata read_json_impl(host_span> source } return sources.size(); }(); - /* - * Construct batches of byte ranges spanning source files, with the starting position of batches - * indicated by `batch_offsets`. 
`pref_bytes_size` gives the bytes position from which the current - * batch begins, and `end_bytes_size` gives the terminal bytes position after which reading - * stops. - */ + // Construct batches of byte ranges spanning source files, with the starting position of batches + // indicated by `batch_offsets`. `pref_bytes_size` gives the bytes position from which the current + // batch begins, and `end_bytes_size` gives the terminal bytes position after which reading + // stops. std::size_t pref_bytes_size = chunk_offset; std::size_t end_bytes_size = chunk_offset + chunk_size; std::vector batch_offsets{pref_bytes_size}; @@ -502,16 +503,14 @@ table_with_metadata read_json_impl(host_span> source i++; } - /* - * If there is a single batch, then we can directly return the table without the - * unnecessary concatenate. The size of batch_offsets is 1 if all sources are empty, - * or if end_bytes_size is larger than total_source_size. - */ std::vector partial_tables; json_reader_options batched_reader_opts{reader_opts}; batched_reader_opts.set_byte_range_offset(chunk_offset); batched_reader_opts.set_byte_range_size(chunk_size); + // lambda to insert the partial tables into the vector. Since read_batch function returns a pair + // of partial tables where the second table is optional, we insert a table into the vector only if + // it is non-empty auto insert_partial_tables = [&partial_tables]( std::pair>&& partial_table_pair) { @@ -578,6 +577,7 @@ table_with_metadata read_json_impl(host_span> source }; if (batch_offsets.size() <= 2) { + // single batch auto has_inserted = insert_partial_tables( read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref())); if (!has_inserted) { @@ -585,6 +585,7 @@ table_with_metadata read_json_impl(host_span> source {std::vector{}}}; } } else { + // multiple batches batched_reader_opts.set_byte_range_offset(batch_offsets[0]); batched_reader_opts.set_byte_range_size(batch_offsets[1] - batch_offsets[0]); insert_partial_tables( @@ -618,6 +619,7 @@ table_with_metadata read_json_impl(host_span> source } } + // If there is a single partial table, then there is no need to concatenate if (partial_tables.size() == 1) return std::move(partial_tables[0]); auto expects_schema_equality = std::all_of(partial_tables.begin() + 1, From febdbe7f24c04981aba0b1bb3114040db64cd4a8 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Tue, 11 Feb 2025 03:27:00 +0000 Subject: [PATCH 11/14] formatting --- cpp/src/io/json/read_json.cu | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index 1ae09180b07..aba4b085e20 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -439,7 +439,7 @@ std::pair> read_batch( /** * @brief Helper function that implements the batching logic for the JSONL reader. - * The goal of the batched reader is to handle reading mutiple JSONL sources whose total cumulative + * The goal of the batched reader is to handle reading multiple JSONL sources whose total cumulative * size exceeds the integer limit imposed by the JSON tokenizer. The batching logic divides the * requested input byte range spanning sources into smaller batches, each of which itself spans * multiple sources. 
The batches are constructed such that the byte subrange in each batch does not From b6d21fe2acdee377494d5d87220828c1ef7b384d Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Tue, 11 Feb 2025 22:06:04 +0000 Subject: [PATCH 12/14] pr reviews; adding another test --- cpp/src/io/json/read_json.cu | 6 ++-- cpp/tests/large_strings/json_tests.cu | 51 +++++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 3 deletions(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index aba4b085e20..87268725f1c 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -345,11 +345,11 @@ get_record_range_raw_input(host_span> sources, auto const second_last_delimiter_it = thrust::find(rmm::exec_policy(stream), rev_it_begin, rev_it_end, delimiter); CUDF_EXPECTS(second_last_delimiter_it != rev_it_end, - "A single JSON line cannot be larger than the integer limit"); + "A single JSON line cannot be larger than the batch size limit"); auto const last_line_size = static_cast(thrust::distance(rev_it_begin, second_last_delimiter_it)); CUDF_EXPECTS(last_line_size < batch_size, - "A single JSON line cannot be larger than the integer limit"); + "A single JSON line cannot be larger than the batch size limit"); rmm::device_buffer second_buffer(bufsubspan.data() + static_cast(thrust::distance( second_last_delimiter_it, rev_it_end)), @@ -462,7 +462,7 @@ table_with_metadata read_json_impl(host_span> source // Sanity checks of byte range offset and clamping of byte range size std::size_t const chunk_offset = reader_opts.get_byte_range_offset(); CUDF_EXPECTS(total_source_size ? chunk_offset < total_source_size : !chunk_offset, - "Invalid offsetting", + "Invalid byte range offset", std::invalid_argument); std::size_t chunk_size = reader_opts.get_byte_range_size(); chunk_size = !chunk_size ? total_source_size - chunk_offset diff --git a/cpp/tests/large_strings/json_tests.cu b/cpp/tests/large_strings/json_tests.cu index dfb0426e5b5..b3f6a99ed51 100644 --- a/cpp/tests/large_strings/json_tests.cu +++ b/cpp/tests/large_strings/json_tests.cu @@ -20,6 +20,7 @@ #include "large_strings_fixture.hpp" #include +#include #include #include @@ -278,3 +279,53 @@ TEST_P(JsonLargeReaderTest, MultiBatchDoubleBufferInput) result.tbl->get_column(2), cudf::test::strings_column_wrapper(expected_c_col.begin(), expected_c_col.end())); } + +TEST_P(JsonLargeReaderTest, OverBatchLimitLine) +{ + cudf::io::compression_type const comptype = GetParam(); + + // This test constructs a JSONL input of size three times the batch limit. The input contains a + // single JSONL which will be completely read in the first batch itself. 
Since we cannot divide a + // single line, we expect the test to throw + std::string json_string = R"({ "a": { "y" : 6}, "b" : [1, 2, 3], "c": ")"; + std::string really_long_string = R"(libcudf)"; + std::size_t const log_repetitions = 5; + really_long_string.reserve(really_long_string.size() * (1UL << log_repetitions)); + for (std::size_t i = 0; i < log_repetitions; i++) { + really_long_string += really_long_string; + } + json_string += really_long_string + "\" }\n"; + + std::size_t const batch_size = json_string.size() / 3; + // set smaller batch_size to reduce file size and execution time + this->set_batch_size(batch_size); + + std::vector cdata; + if (comptype != cudf::io::compression_type::NONE) { + cdata = cudf::io::detail::compress( + comptype, + cudf::host_span(reinterpret_cast(json_string.data()), + json_string.size()), + cudf::get_default_stream()); + } else { + cdata = std::vector( + reinterpret_cast(json_string.data()), + reinterpret_cast(json_string.data()) + json_string.size()); + } + + constexpr int num_sources = 1; + std::vector> chostbufs( + num_sources, + cudf::host_span(reinterpret_cast(cdata.data()), cdata.size())); + + // Initialize parsing options (reading json lines) + cudf::io::json_reader_options cjson_lines_options = + cudf::io::json_reader_options::builder( + cudf::io::source_info{ + cudf::host_span>(chostbufs.data(), chostbufs.size())}) + .lines(true) + .compression(comptype); + + // Read full test data via existing, nested JSON lines reader + EXPECT_THROW(cudf::io::read_json(cjson_lines_options), cudf::logic_error); +} From b79a163de21bf1ed3a2098a101cd61d30fe5a53c Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Fri, 14 Feb 2025 01:45:33 +0000 Subject: [PATCH 13/14] corner case fix --- cpp/src/io/json/read_json.cu | 39 ++++++++++++++++----------------- cpp/tests/io/json/json_test.cpp | 7 +++--- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index 57e4068e130..c72c5d827c6 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -266,23 +266,26 @@ get_record_range_raw_input(host_span> sources, auto const shift_for_nonzero_offset = std::min(chunk_offset, 1); auto const first_delim_pos = chunk_offset == 0 ? 0 : find_first_delimiter(readbufspan, delimiter, stream); - // If the requested byte range ends with a delimiter, we do not need to add an extra delimiter at - // the end of the ingested buffer, nor do we need to continue reading any more of the input since - // the last position of the byte range coincides with the end of a record. - auto is_last_char_delimiter = [delimiter, readbufspan, stream]() { - char last_char; - cudf::detail::cuda_memcpy(host_span(&last_char, 1, false), - readbufspan.subspan(readbufspan.size() - 1, 1), - stream); - return last_char == delimiter; - }(); + // If we read till the end of the last source, we cannot be sure + // if the last record read ends with a delimiter. In such cases, we add a delimiter + // nevertheless; even if the record terminates + // with a delimiter, adding a extra delimiter does not affect the table constructed since the + // parser ignores empty lines. 
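The comment above leans on the parser skipping empty lines, so appending one more delimiter to a record that already ends with one leaves the parsed table unchanged. A small check of that property through the public reader; the host-buffer setup mirrors the tests above and the helper name is illustrative only.

#include <cudf/io/json.hpp>
#include <cudf/utilities/span.hpp>

#include <cstddef>
#include <string>
#include <vector>

// Parse a JSONL host buffer; "{\"a\": 1}\n" and "{\"a\": 1}\n\n" should both yield one row,
// since the trailing empty line is ignored by the parser.
cudf::io::table_with_metadata parse_jsonl(std::string const& json_string)
{
  std::vector<cudf::host_span<std::byte const>> hostbufs{cudf::host_span<std::byte const>(
    reinterpret_cast<std::byte const*>(json_string.data()), json_string.size())};
  cudf::io::json_reader_options opts =
    cudf::io::json_reader_options::builder(
      cudf::io::source_info{
        cudf::host_span<cudf::host_span<std::byte const>>(hostbufs.data(), hostbufs.size())})
      .lines(true);
  return cudf::io::read_json(opts);
}
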
+ auto insert_delimiter = [delimiter, stream](device_span subspan) { + auto last_char = delimiter; + cudf::detail::cuda_memcpy(subspan, host_span(&last_char, 1, false), stream); + }; + + // If the requested byte range ends with a delimiter at the end of line n, we will still need to + // continue reading since the next batch begins at the start of the n+1^th record and skips the + // entire line until the first delimiter is encountered at the end of the line. if (first_delim_pos == -1) { // return empty owning datasource buffer auto empty_buf = rmm::device_buffer(0, stream); return std::make_pair(datasource::owning_buffer(std::move(empty_buf)), std::nullopt); - } else if (!should_load_till_last_source && !is_last_char_delimiter) { + } else if (!should_load_till_last_source) { // Find next delimiter std::int64_t next_delim_pos = -1; std::size_t next_subchunk_start = chunk_offset + chunk_size; @@ -306,6 +309,7 @@ get_record_range_raw_input(host_span> sources, // If we have reached the end of source list but the source does not terminate with a // delimiter character next_delim_pos = buffer_offset + readbufspan.size(); + insert_delimiter(bufspan.subspan(next_delim_pos, 1)); } else { // Reallocate-and-retry policy // Our buffer_size estimate is insufficient to read until the end of the line! We need to @@ -370,17 +374,12 @@ get_record_range_raw_input(host_span> sources, } // Add delimiter to end of buffer - possibly adding an empty line to the input buffer - iff we are - // reading till the end of the last source i.e. should_load_till_last_source is true Note that the - // table generated from the JSONL input remains unchanged since empty lines are ignored by the + // reading till the end of the last source i.e. should_load_till_last_source is true. Note that + // the table generated from the JSONL input remains unchanged since empty lines are ignored by the // parser. 
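The reallocate-and-retry path added earlier in this patch only has to guarantee that every batch ends on a record boundary, growing the read request when the size estimate falls short of the end of the current line. A simplified host-side analogue of that loop, assuming a hypothetical read_at(offset, size) callback over the concatenated sources (not the reader's actual API):

#include <cstddef>
#include <functional>
#include <string>

// Read at least `estimated_size` bytes starting at `offset`, then keep growing the request
// until the last record is terminated by a delimiter, or the sources are exhausted, in
// which case a delimiter is appended; this mirrors the retry loop in the reader.
std::string read_record_range(std::function<std::string(std::size_t, std::size_t)> read_at,
                              std::size_t offset,
                              std::size_t estimated_size,
                              char delimiter = '\n')
{
  std::size_t request = estimated_size ? estimated_size : 1;
  while (true) {
    std::string buf        = read_at(offset, request);
    bool const reached_eof = buf.size() < request;
    // The first delimiter at or beyond the end of the requested range terminates the batch.
    auto const pos = buf.find(delimiter, estimated_size == 0 ? 0 : estimated_size - 1);
    if (pos != std::string::npos) { return buf.substr(0, pos + 1); }
    if (reached_eof) { return buf + delimiter; }  // last source does not end with a delimiter
    request *= 2;  // estimate too small to reach the end of the record: grow and retry
  }
}
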
size_t num_chars = readbufspan.size() - first_delim_pos - shift_for_nonzero_offset; - if (num_chars && !is_last_char_delimiter) { - auto last_char = delimiter; - cudf::detail::cuda_memcpy( - device_span(reinterpret_cast(buffer.data()), buffer.size()) - .subspan(readbufspan.size(), 1), - host_span(&last_char, 1, false), - stream); + if (num_chars) { + insert_delimiter(bufspan.subspan(readbufspan.size(), 1)); num_chars++; } diff --git a/cpp/tests/io/json/json_test.cpp b/cpp/tests/io/json/json_test.cpp index 86b62b059b3..89666c073cd 100644 --- a/cpp/tests/io/json/json_test.cpp +++ b/cpp/tests/io/json/json_test.cpp @@ -667,7 +667,7 @@ TEST_F(JsonReaderTest, JsonLinesByteRangeCompleteRecord) outfile << "[1000]\n[2000]\n[3000]\n[4000]\n[5000]\n[6000]\n[7000]\n[8000]\n[9000]\n"; outfile.close(); - // Reading 0]\n[3000]\n[4000]\n[5000]\n + // Requesting 0]\n[3000]\n[4000]\n[5000]\n but reading 0]\n[3000]\n[4000]\n[5000]\n[6000]\n cudf::io::json_reader_options in_options = cudf::io::json_reader_options::builder(cudf::io::source_info{fname}) .lines(true) @@ -677,12 +677,13 @@ TEST_F(JsonReaderTest, JsonLinesByteRangeCompleteRecord) cudf::io::table_with_metadata result = cudf::io::read_json(in_options); EXPECT_EQ(result.tbl->num_columns(), 1); - EXPECT_EQ(result.tbl->num_rows(), 3); + EXPECT_EQ(result.tbl->num_rows(), 4); EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::INT64); EXPECT_EQ(result.metadata.schema_info[0].name, "0"); - CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), int64_wrapper{{3000, 4000, 5000}}); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), + int64_wrapper{{3000, 4000, 5000, 6000}}); } TEST_F(JsonReaderTest, JsonLinesByteRangeIncompleteRecord) From 597189a398c9ea0b45c3ba6c60dbb12dcbe7a360 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Fri, 14 Feb 2025 23:34:15 +0000 Subject: [PATCH 14/14] pr reviews --- cpp/src/io/json/read_json.cu | 39 ++++++++++++++++++------------------ 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index c72c5d827c6..0c95c2b05e8 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -78,7 +78,7 @@ class compressed_host_buffer_source final : public datasource { } } - size_t host_read(size_t offset, size_t size, uint8_t* dst) override + std::size_t host_read(std::size_t offset, std::size_t size, uint8_t* dst) override { auto ch_buffer = host_span(reinterpret_cast(_dbuf_ptr->data()), _dbuf_ptr->size()); @@ -97,7 +97,7 @@ class compressed_host_buffer_source final : public datasource { return count; } - std::unique_ptr host_read(size_t offset, size_t size) override + std::unique_ptr host_read(std::size_t offset, std::size_t size) override { auto ch_buffer = host_span(reinterpret_cast(_dbuf_ptr->data()), _dbuf_ptr->size()); @@ -114,10 +114,10 @@ class compressed_host_buffer_source final : public datasource { return std::make_unique(_decompressed_buffer.data() + offset, count); } - std::future device_read_async(size_t offset, - size_t size, - uint8_t* dst, - rmm::cuda_stream_view stream) override + std::future device_read_async(std::size_t offset, + std::size_t size, + uint8_t* dst, + rmm::cuda_stream_view stream) override { auto& thread_pool = pools::tpool(); return thread_pool.submit_task([this, offset, size, dst, stream] { @@ -131,12 +131,12 @@ class compressed_host_buffer_source final : public datasource { [[nodiscard]] bool supports_device_read() const override { return true; } - [[nodiscard]] size_t size() const 
override { return _decompressed_ch_buffer_size; } + [[nodiscard]] std::size_t size() const override { return _decompressed_ch_buffer_size; } private: std::unique_ptr _dbuf_ptr; compression_type _comptype; - size_t _decompressed_ch_buffer_size; + std::size_t _decompressed_ch_buffer_size; std::vector _decompressed_buffer; }; @@ -325,15 +325,16 @@ get_record_range_raw_input(host_span> sources, } // If the size of the ingested buffer is less than the batch size, we can simply return the - // buffer as is, and set the optional second buffer to null. If the size of the ingested buffer - // exceed the batch size limits due to the reallocate-and-retry policy, we split the ingested - // buffer in two parts. The second part only contains the last record in the buffer, while the - // first part contains all the remaining lines. + // buffer as is, and set the optional second buffer to null. + // If the size of the ingested buffer exceed the batch size limits due to the + // reallocate-and-retry policy, we split the ingested buffer in two parts. The second part + // only contains the last record in the buffer, while the first part contains all the remaining + // lines. // As long as the size of no record exceeds the batch size limit placed, we are guaranteed that // the returned buffer(s) will be below the batch limit. auto const batch_size = getenv_or( - "LIBCUDF_JSON_BATCH_SIZE", static_cast(std::numeric_limits::max())); - if (static_cast(next_delim_pos - first_delim_pos - shift_for_nonzero_offset) < + "LIBCUDF_JSON_BATCH_SIZE", static_cast(std::numeric_limits::max())); + if (static_cast(next_delim_pos - first_delim_pos - shift_for_nonzero_offset) < batch_size) { return std::make_pair( datasource::owning_buffer( @@ -353,11 +354,11 @@ get_record_range_raw_input(host_span> sources, "A single JSON line cannot be larger than the batch size limit"); auto const last_line_size = next_delim_pos - requested_size + - static_cast(thrust::distance(rev_it_begin, second_last_delimiter_it)); + static_cast(thrust::distance(rev_it_begin, second_last_delimiter_it)); CUDF_EXPECTS(last_line_size < batch_size, "A single JSON line cannot be larger than the batch size limit"); - rmm::device_buffer second_buffer(bufsubspan.data() + static_cast(thrust::distance( + rmm::device_buffer second_buffer(bufsubspan.data() + static_cast(thrust::distance( second_last_delimiter_it, rev_it_end)), last_line_size + 1, stream); @@ -377,7 +378,7 @@ get_record_range_raw_input(host_span> sources, // reading till the end of the last source i.e. should_load_till_last_source is true. Note that // the table generated from the JSONL input remains unchanged since empty lines are ignored by the // parser. - size_t num_chars = readbufspan.size() - first_delim_pos - shift_for_nonzero_offset; + std::size_t num_chars = readbufspan.size() - first_delim_pos - shift_for_nonzero_offset; if (num_chars) { insert_delimiter(bufspan.subspan(readbufspan.size(), 1)); num_chars++; @@ -654,7 +655,7 @@ device_span ingest_raw_input(device_span buffer, // line of file i+1 don't end up on the same JSON line, if file i does not already end with a line // delimiter. 
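The two-part split of an over-sized ingested buffer introduced earlier in this patch, last record into a second buffer, everything before it into the first, has a straightforward host-side analogue. The sketch below is illustrative only; the device code performs the same search with thrust::find over reverse iterators.

#include <string>
#include <utility>

// Split a delimiter-terminated JSONL buffer so that the second piece holds only the last
// record and the first piece holds all preceding records.
std::pair<std::string, std::string> split_last_record(std::string const& buf,
                                                      char delimiter = '\n')
{
  // Search backwards, skipping the delimiter that terminates the final record.
  auto const split_pos =
    buf.size() >= 2 ? buf.rfind(delimiter, buf.size() - 2) : std::string::npos;
  if (split_pos == std::string::npos) {
    // Only one record present; the reader instead rejects this case, since a single JSON
    // line may not exceed the batch size limit.
    return {buf, std::string{}};
  }
  return {buf.substr(0, split_pos + 1),  // all complete records except the last
          buf.substr(split_pos + 1)};    // the last record, still delimiter-terminated
}
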
auto constexpr num_delimiter_chars = 1; - std::vector> thread_tasks; + std::vector> thread_tasks; auto delimiter_map = cudf::detail::make_empty_host_vector(sources.size(), stream); std::vector prefsum_source_sizes(sources.size()); @@ -672,7 +673,7 @@ device_span ingest_raw_input(device_span buffer, auto const total_bytes_to_read = std::min(range_size, prefsum_source_sizes.back() - range_offset); range_offset -= start_source ? prefsum_source_sizes[start_source - 1] : 0; - size_t const num_streams = + std::size_t const num_streams = std::min({sources.size() - start_source + 1, cudf::detail::global_cuda_stream_pool().get_stream_pool_size(), pools::tpool().get_thread_count()});
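For reference, the prefix-sum bookkeeping that ingest_raw_input uses to pick the first source to read and the byte offset within it can be expressed compactly on the host. This is a sketch of the same logic with illustrative names, not the reader's internal helper.

#include <algorithm>
#include <cstddef>
#include <iterator>
#include <numeric>
#include <utility>
#include <vector>

// Returns {index of the first source to read, byte offset local to that source}. If the
// requested offset lies at or past the total size, the index equals the number of sources,
// i.e. nothing is read.
std::pair<std::size_t, std::size_t> locate_start_source(
  std::vector<std::size_t> const& source_sizes, std::size_t range_offset)
{
  std::vector<std::size_t> prefix(source_sizes.size());
  std::partial_sum(source_sizes.begin(), source_sizes.end(), prefix.begin());
  // First source whose cumulative size exceeds the requested offset.
  auto const it = std::upper_bound(prefix.begin(), prefix.end(), range_offset);
  if (it == prefix.end()) { return {source_sizes.size(), 0}; }
  auto const start = static_cast<std::size_t>(std::distance(prefix.begin(), it));
  return {start, range_offset - (start ? prefix[start - 1] : 0)};
}

For example, with source sizes {5, 3} a range offset of 6 resolves to source 1 at local offset 1, matching the adjustment of range_offset by the prefix sum of the preceding sources in the hunk above.
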