Fix the ORC decoding bug for the timestamp data (#17570)
This PR introduces a band-aid class `run_cache_manager` to handle an exceptional case in the TIMESTAMP data type, where the DATA stream (seconds) is processed ahead of the SECONDARY stream (nanoseconds) and the excess rows are lost. The fix uses `run_cache_manager` (along with `cache_helper`, an implementation detail) to cache the potentially missed data from the DATA stream and reuse it in the next decoding iteration, thus preventing data loss.
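
As a hedged illustration (not part of the commit), the following host-side C++ sketch models the idea behind the fix: when a DATA run crosses a row-group boundary and exceeds what the SECONDARY stream allows to be consumed in the current iteration, the excess values are stashed and replayed at the start of the next iteration instead of being dropped. All names and the 512/162 split are illustrative, borrowed from the example used in the code comments below.

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

int main()
{
  // One RLE run of 512 "seconds" values that starts in row group 0 but spills into row group 1.
  std::vector<std::int64_t> data_run(512);
  for (std::size_t i = 0; i < data_run.size(); ++i) { data_run[i] = static_cast<std::int64_t>(i); }

  std::size_t const max_consumable = 162;    // limit imposed while processing the SECONDARY stream
  std::vector<std::int64_t> cache;           // stands in for run_cache_manager / cache_helper

  // Iteration 1: consume only what the SECONDARY stream allows; cache the excess.
  std::vector<std::int64_t> consumed(data_run.begin(), data_run.begin() + max_consumable);
  cache.assign(data_run.begin() + max_consumable, data_run.end());

  // Iteration 2: replay the cached values ahead of newly decoded ones, so no rows are lost.
  std::vector<std::int64_t> replayed = cache;
  cache.clear();

  std::cout << "consumed in iteration 1: " << consumed.size() << "\n";  // 162
  std::cout << "replayed in iteration 2: " << replayed.size() << "\n";  // 350
  return 0;
}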





Closes #17155

Authors:
  - Tianyu Liu (https://github.com/kingcrimsontianyu)

Approvers:
  - Matthew Murray (https://github.com/Matt711)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: #17570
kingcrimsontianyu authored Jan 7, 2025
1 parent caf97ef commit 4e97cd4
Showing 4 changed files with 225 additions and 4 deletions.
205 changes: 202 additions & 3 deletions cpp/src/io/orc/stripe_data.cu
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
* Copyright (c) 2019-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -132,6 +132,177 @@ struct orcdec_state_s {
} vals;
};

/**
* @brief Manage caching of the first run of TIMESTAMP's DATA stream for a row group.
*
* This class is used to address a special case where the first run of the DATA stream spans two
* adjacent row groups and its length is greater than the maximum length allowed to be consumed.
* This limit is imposed by the decoder when processing the SECONDARY stream. This class shall be
* instantiated in shared memory and used to cache the DATA stream with a decoded data type of
* `int64_t`. As an optimization, the actual cache is implemented in the cache_helper class as a
* local variable and does not reside in shared memory.
*/
class run_cache_manager {
private:
enum class status : uint8_t {
DISABLED, ///< Run cache manager is disabled. No caching will be performed. If the special case
///< happens, the run cache manager will be set to this status after the cache read
///< is completed. This status also applies when the special case does not happen.
CAN_WRITE_TO_CACHE, ///< Run cache manager is ready for write. If the special case happens, the
///< run cache manager will be set to this status.
CAN_READ_FROM_CACHE, ///< Run cache manager is ready for read. If the special case happens, the
///< run cache manager will be set to this status after the cache write is
///< completed.
};

public:
/**
* @brief Initialize the run cache manager.
*
* @param[in] s ORC decoder state.
*/
__device__ void initialize(orcdec_state_s* s)
{
_status = (s->top.data.index.run_pos[CI_DATA2] > 0 and s->chunk.type_kind == TIMESTAMP)
? status::CAN_WRITE_TO_CACHE
: status::DISABLED;
_reusable_length = 0;
_run_length = 0;
}

private:
status _status; ///< The status of the run cache manager.
uint32_t
_reusable_length; ///< The number of values to be cached and reused later. For example, if a run
///< has a length of 512 but the maximum length allowed to be consumed is
///< capped at 162, then 350 (512 - 162) values will be cached.
uint32_t _run_length; ///< The length of the run, 512 in the above example.
friend class cache_helper;
};

/**
* @brief Helper class to help run_cache_manager cache the first run of TIMESTAMP's DATA stream for
* a row group.
*
* The run_cache_manager is intended to be stored in shared memory, whereas the actual cache is
* in local storage (as an optimization). If a function is to use run_cache_manager, both the
* manager and the cache objects need to be passed. This class is introduced to simplify the
* function call, so that only a single cache_helper object needs to be passed. To that end, public
* methods originally belonging to run_cache_manager have been moved to this class.
*/
class cache_helper {
public:
/**
* @brief Constructor.
*
* @param[in] run_cache_manager_inst An instance of run_cache_manager.
*/
__device__ explicit cache_helper(run_cache_manager& run_cache_manager_inst)
: _manager(run_cache_manager_inst)
{
}

/**
* @brief Set the reusable length.
*
* @param[in] run_length The length of the first run (spanning two adjacent row groups) of the
* DATA stream.
* @param[in] max_length The maximum length allowed to be consumed. This limit is imposed
* by the decoder when processing the SECONDARY stream.
*/
__device__ void set_reusable_length(uint32_t run_length, uint32_t max_length)
{
if (_manager._status == run_cache_manager::status::CAN_WRITE_TO_CACHE) {
_manager._run_length = run_length;
_manager._reusable_length =
(_manager._run_length > max_length) ? (_manager._run_length - max_length) : 0;
}
}

/**
* @brief Adjust the maximum length allowed to be consumed when the length of the first run is
* greater than it.
*
* @param[in] max_length The maximum length allowed to be consumed for the DATA stream.
* @return A new maximum length.
*/
[[nodiscard]] __device__ uint32_t adjust_max_length(uint32_t max_length)
{
auto new_max_length{max_length};
if (_manager._status == run_cache_manager::status::CAN_READ_FROM_CACHE) {
new_max_length -= _manager._reusable_length;
}
return new_max_length;
}

/**
* @brief Copy the excess data from the intermediate buffer for the DATA stream to the cache.
*
* @param[in] src Intermediate buffer for the DATA stream.
*/
__device__ void write_to_cache(int64_t* src)
{
if (_manager._status != run_cache_manager::status::CAN_WRITE_TO_CACHE) { return; }

auto const tid = threadIdx.x;

__syncthreads();

// All threads in the block always take a uniform code path for the following branches.
// _reusable_length ranges between [0, 512].
if (_manager._reusable_length > 0) {
auto const length_to_skip = _manager._run_length - _manager._reusable_length;
if (tid < _manager._reusable_length) {
auto const src_idx = tid + length_to_skip;
_storage = src[src_idx];
}
if (tid == 0) { _manager._status = run_cache_manager::status::CAN_READ_FROM_CACHE; }
} else {
if (tid == 0) { _manager._status = run_cache_manager::status::DISABLED; }
}

__syncthreads();
}

/**
* @brief Copy the cached data to the intermediate buffer for the DATA stream.
*
* @param[in,out] dst Intermediate buffer for the DATA stream.
* @param[in,out] rle Run length decoder state object.
*/
__device__ void read_from_cache(int64_t* dst, orc_rlev2_state_s* rle)
{
if (_manager._status != run_cache_manager::status::CAN_READ_FROM_CACHE) { return; }

auto const tid = threadIdx.x;

// First, shift the data up
auto const dst_idx = tid + _manager._reusable_length;
auto const v = (dst_idx < rle->num_vals + _manager._reusable_length) ? dst[tid] : 0;
__syncthreads();

if (dst_idx < rle->num_vals + _manager._reusable_length) { dst[dst_idx] = v; }
__syncthreads();

// Second, insert the cached data
if (tid < _manager._reusable_length) { dst[tid] = _storage; }
__syncthreads();

if (tid == 0) {
// Disable the run cache manager, since cache write-and-read happens at most once per row
// group.
_manager._status = run_cache_manager::status::DISABLED;
rle->num_vals += _manager._reusable_length;
}

__syncthreads();
}

private:
run_cache_manager& _manager; ///< An instance of run_cache_manager.
int64_t _storage; ///< Per-thread cache storage.
};

/**
* @brief Initializes byte stream, modifying length and start position to keep the read pointer
* 8-byte aligned.
@@ -631,6 +802,8 @@ static const __device__ __constant__ uint8_t ClosestFixedBitsMap[65] = {
* @param[in] maxvals maximum number of values to decode
* @param[in] t thread id
* @param[in] has_buffered_values If true, means there are already buffered values
* @param[in] cache_helper_inst If non-null, the run cache manager will be used to manage
* caching of the first run of the DATA stream.
*
* @return number of values decoded
*/
@@ -640,9 +813,11 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs,
T* vals,
uint32_t maxvals,
int t,
bool has_buffered_values = false)
bool has_buffered_values = false,
cache_helper* cache_helper_inst = nullptr)
{
if (t == 0) {
if (cache_helper_inst != nullptr) { maxvals = cache_helper_inst->adjust_max_length(maxvals); }
uint32_t maxpos = min(bs->len, bs->pos + (bytestream_buffer_size - 8u));
uint32_t lastpos = bs->pos;
auto numvals = 0;
@@ -685,6 +860,9 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs,
l += deltapos;
}
}

if (cache_helper_inst != nullptr) { cache_helper_inst->set_reusable_length(n, maxvals); }

if ((numvals != 0) and (numvals + n > maxvals)) break;
// case where there are buffered values and can't consume a whole chunk
// from decoded values, so skip adding any more to buffer, work on buffered values and then
@@ -866,6 +1044,17 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs,
__syncwarp();
}
__syncthreads();
// Currently run_cache_manager is only designed to fix the TIMESTAMP's DATA stream bug where the
// data type is int64_t.
if constexpr (cuda::std::is_same_v<T, int64_t>) {
if (cache_helper_inst != nullptr) {
// Run cache is read from during the 2nd iteration of the top-level while loop in
// gpuDecodeOrcColumnData().
cache_helper_inst->read_from_cache(vals, rle);
// Run cache is written to during the 1st iteration of the loop.
cache_helper_inst->write_to_cache(vals);
}
}
return rle->num_vals;
}

@@ -1401,6 +1590,8 @@ CUDF_KERNEL void __launch_bounds__(block_size)
// Struct doesn't have any data in itself, so skip
bool const is_valid = s->chunk.type_kind != STRUCT;
size_t const max_num_rows = s->chunk.column_num_rows;
__shared__ run_cache_manager run_cache_manager_inst;
cache_helper cache_helper_inst(run_cache_manager_inst);
if (t == 0 and is_valid) {
// If we have an index, seek to the initial run and update row positions
if (num_rowgroups > 0) {
@@ -1443,6 +1634,8 @@ CUDF_KERNEL void __launch_bounds__(block_size)

bytestream_init(&s->bs, s->chunk.streams[CI_DATA], s->chunk.strm_len[CI_DATA]);
bytestream_init(&s->bs2, s->chunk.streams[CI_DATA2], s->chunk.strm_len[CI_DATA2]);

run_cache_manager_inst.initialize(s);
}
__syncthreads();

@@ -1602,7 +1795,13 @@ CUDF_KERNEL void __launch_bounds__(block_size)
if (is_rlev1(s->chunk.encoding_kind)) {
numvals = Integer_RLEv1<int64_t>(bs, &s->u.rlev1, s->vals.i64, numvals, t);
} else {
numvals = Integer_RLEv2<int64_t>(bs, &s->u.rlev2, s->vals.i64, numvals, t);
numvals = Integer_RLEv2<int64_t>(bs,
&s->u.rlev2,
s->vals.i64,
numvals,
t,
false /* has_buffered_values */,
&cache_helper_inst);
}
if (s->chunk.type_kind == DECIMAL) {
// If we're using an index, we may have to drop values from the initial run
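
For orientation, here is a hedged, host-side C++ model of the life cycle that the diff wires through gpuDecodeOrcColumnData() and Integer_RLEv2(): initialize() arms the cache only when a TIMESTAMP row group starts mid-run in the SECONDARY stream, the first iteration records and writes the excess (CAN_WRITE_TO_CACHE -> CAN_READ_FROM_CACHE), and the second iteration shrinks its budget, replays the cache, and disables it. This is a simplified, single-threaded sketch under those assumptions, not the device code: thread indexing, shared memory, and synchronization are omitted, and the struct and member names are illustrative.

#include <cassert>
#include <cstdint>
#include <iostream>

// Simplified model of the run cache state machine (illustrative only).
enum class status : std::uint8_t { DISABLED, CAN_WRITE_TO_CACHE, CAN_READ_FROM_CACHE };

struct run_cache_model {
  status state{status::DISABLED};
  std::uint32_t run_length{0};
  std::uint32_t reusable_length{0};

  void initialize(bool secondary_starts_mid_run, bool is_timestamp)
  {
    state = (secondary_starts_mid_run && is_timestamp) ? status::CAN_WRITE_TO_CACHE
                                                       : status::DISABLED;
    run_length = reusable_length = 0;
  }

  std::uint32_t adjust_max_length(std::uint32_t max_length) const
  {
    return (state == status::CAN_READ_FROM_CACHE) ? max_length - reusable_length : max_length;
  }

  void set_reusable_length(std::uint32_t n, std::uint32_t max_length)
  {
    if (state != status::CAN_WRITE_TO_CACHE) { return; }
    run_length      = n;
    reusable_length = (n > max_length) ? (n - max_length) : 0;
  }

  void write_to_cache()  // happens after the first decoding iteration
  {
    if (state != status::CAN_WRITE_TO_CACHE) { return; }
    state = (reusable_length > 0) ? status::CAN_READ_FROM_CACHE : status::DISABLED;
  }

  void read_from_cache()  // happens during the second decoding iteration
  {
    if (state != status::CAN_READ_FROM_CACHE) { return; }
    state = status::DISABLED;  // write-and-read happens at most once per row group
  }
};

int main()
{
  run_cache_model m;
  m.initialize(/*secondary_starts_mid_run=*/true, /*is_timestamp=*/true);

  // Iteration 1: the first DATA run (512 values) exceeds the 162 allowed by the SECONDARY stream.
  m.set_reusable_length(512, 162);
  m.write_to_cache();  // 350 values are now cached
  assert(m.state == status::CAN_READ_FROM_CACHE);

  // Iteration 2: the cached 350 values shrink the new budget, then are replayed and the cache
  // is disabled.
  std::cout << "adjusted max length: " << m.adjust_max_length(512) << "\n";  // prints 162
  m.read_from_cache();
  assert(m.state == status::DISABLED);
  return 0;
}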
Binary file not shown.
Binary file not shown.
24 changes: 23 additions & 1 deletion python/cudf/cudf/tests/test_orc.py
@@ -1,4 +1,4 @@
# Copyright (c) 2019-2024, NVIDIA CORPORATION.
# Copyright (c) 2019-2025, NVIDIA CORPORATION.

import datetime
import decimal
@@ -1970,3 +1970,25 @@ def test_row_group_alignment(datadir):
got = cudf.read_orc(buffer)

assert_eq(expected, got)


@pytest.mark.parametrize(
"inputfile",
[
"TestOrcFile.timestamp.desynced.uncompressed.RLEv2.orc",
"TestOrcFile.timestamp.desynced.snappy.RLEv2.orc",
],
)
def test_orc_reader_desynced_timestamp(datadir, inputfile):
# Test a special case where the DATA stream (seconds) in a TIMESTAMP column
# progresses faster than the SECONDARY stream (nanoseconds) at the start of a row
# group. In this case, the "run cache manager" in the decoder kernel is used to
# orchestrate the dual-stream processing.
# For more information, see https://github.com/rapidsai/cudf/issues/17155.

path = datadir / inputfile

expect = pd.read_orc(path)
got = cudf.read_orc(path)

assert_frame_equal(cudf.from_pandas(expect), got)
