Skip to content

Commit

Permalink
Update vendored DuckDB sources to 18da189
Browse files Browse the repository at this point in the history
  • Loading branch information
duckdblabs-bot committed Feb 8, 2025
1 parent 18da189 commit 8cff889
Show file tree
Hide file tree
Showing 42 changed files with 1,087 additions and 469 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt

Large diffs are not rendered by default.

265 changes: 90 additions & 175 deletions src/duckdb/extension/parquet/column_reader.cpp

Large diffs are not rendered by default.

54 changes: 54 additions & 0 deletions src/duckdb/extension/parquet/decoder/byte_stream_split_decoder.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#include "decoder/byte_stream_split_decoder.hpp"
#include "column_reader.hpp"
#include "parquet_reader.hpp"

namespace duckdb {

ByteStreamSplitDecoder::ByteStreamSplitDecoder(ColumnReader &reader)
: reader(reader), decoded_data_buffer(reader.encoding_buffers[0]) {
}

void ByteStreamSplitDecoder::InitializePage() {
auto &block = reader.block;
// Subtract 1 from length as the block is allocated with 1 extra byte,
// but the byte stream split encoder needs to know the correct data size.
bss_decoder = make_uniq<BssDecoder>(block->ptr, block->len - 1);
block->inc(block->len);
}

void ByteStreamSplitDecoder::Read(uint8_t *defines, idx_t read_count, Vector &result, idx_t result_offset) {
idx_t valid_count = reader.GetValidCount(defines, read_count, result_offset);

auto &allocator = reader.reader.allocator;
decoded_data_buffer.reset();
switch (reader.schema.type) {
case duckdb_parquet::Type::FLOAT:
decoded_data_buffer.resize(allocator, sizeof(float) * valid_count);
bss_decoder->GetBatch<float>(decoded_data_buffer.ptr, valid_count);
break;
case duckdb_parquet::Type::DOUBLE:
decoded_data_buffer.resize(allocator, sizeof(double) * valid_count);
bss_decoder->GetBatch<double>(decoded_data_buffer.ptr, valid_count);
break;
default:
throw std::runtime_error("BYTE_STREAM_SPLIT encoding is only supported for FLOAT or DOUBLE data");
}

reader.Plain(decoded_data_buffer, defines, read_count, result_offset, result);
}

void ByteStreamSplitDecoder::Skip(uint8_t *defines, idx_t skip_count) {
idx_t valid_count = reader.GetValidCount(defines, skip_count);
switch (reader.schema.type) {
case duckdb_parquet::Type::FLOAT:
bss_decoder->Skip<float>(valid_count);
break;
case duckdb_parquet::Type::DOUBLE:
bss_decoder->Skip<double>(valid_count);
break;
default:
throw std::runtime_error("BYTE_STREAM_SPLIT encoding is only supported for FLOAT or DOUBLE data");
}
}

} // namespace duckdb
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#include "decoder/delta_binary_packed_decoder.hpp"
#include "column_reader.hpp"
#include "parquet_reader.hpp"

namespace duckdb {

DeltaBinaryPackedDecoder::DeltaBinaryPackedDecoder(ColumnReader &reader)
: reader(reader), decoded_data_buffer(reader.encoding_buffers[0]) {
}

void DeltaBinaryPackedDecoder::InitializePage() {
auto &block = reader.block;
dbp_decoder = make_uniq<DbpDecoder>(block->ptr, block->len);
block->inc(block->len);
}

void DeltaBinaryPackedDecoder::Read(uint8_t *defines, idx_t read_count, Vector &result, idx_t result_offset) {
idx_t valid_count = reader.GetValidCount(defines, read_count, result_offset);

auto &allocator = reader.reader.allocator;
decoded_data_buffer.reset();
switch (reader.schema.type) {
case duckdb_parquet::Type::INT32:
decoded_data_buffer.resize(allocator, sizeof(int32_t) * (valid_count));
dbp_decoder->GetBatch<int32_t>(decoded_data_buffer.ptr, valid_count);

break;
case duckdb_parquet::Type::INT64:
decoded_data_buffer.resize(allocator, sizeof(int64_t) * (valid_count));
dbp_decoder->GetBatch<int64_t>(decoded_data_buffer.ptr, valid_count);
break;

default:
throw std::runtime_error("DELTA_BINARY_PACKED should only be INT32 or INT64");
}
// Plain() will put NULLs in the right place
reader.Plain(decoded_data_buffer, defines, read_count, result_offset, result);
}

void DeltaBinaryPackedDecoder::Skip(uint8_t *defines, idx_t skip_count) {
idx_t valid_count = reader.GetValidCount(defines, skip_count);
switch (reader.schema.type) {
case duckdb_parquet::Type::INT32:
dbp_decoder->Skip<int32_t>(valid_count);

break;
case duckdb_parquet::Type::INT64:
dbp_decoder->Skip<int64_t>(valid_count);
break;

default:
throw std::runtime_error("DELTA_BINARY_PACKED should only be INT32 or INT64");
}
}

} // namespace duckdb
103 changes: 103 additions & 0 deletions src/duckdb/extension/parquet/decoder/delta_byte_array_decoder.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
#include "decoder/delta_byte_array_decoder.hpp"
#include "column_reader.hpp"
#include "parquet_reader.hpp"
#include "reader/templated_column_reader.hpp"

namespace duckdb {

DeltaByteArrayDecoder::DeltaByteArrayDecoder(ColumnReader &reader) : reader(reader) {
}

void DeltaByteArrayDecoder::ReadDbpData(Allocator &allocator, ResizeableBuffer &buffer, ResizeableBuffer &result_buffer,
idx_t &value_count) {
auto decoder = make_uniq<DbpDecoder>(buffer.ptr, buffer.len);
value_count = decoder->TotalValues();
result_buffer.reset();
result_buffer.resize(allocator, sizeof(uint32_t) * value_count);
decoder->GetBatch<uint32_t>(result_buffer.ptr, value_count);
decoder->Finalize();
buffer.inc(buffer.len - decoder->BufferPtr().len);
}

void DeltaByteArrayDecoder::InitializePage() {
if (reader.type.InternalType() != PhysicalType::VARCHAR) {
throw std::runtime_error("Delta Byte Array encoding is only supported for string/blob data");
}
auto &block = *reader.block;
auto &allocator = reader.reader.allocator;
idx_t prefix_count, suffix_count;
auto &prefix_buffer = reader.encoding_buffers[0];
auto &suffix_buffer = reader.encoding_buffers[1];
ReadDbpData(allocator, block, prefix_buffer, prefix_count);
ReadDbpData(allocator, block, suffix_buffer, suffix_count);
if (prefix_count != suffix_count) {
throw std::runtime_error("DELTA_BYTE_ARRAY - prefix and suffix counts are different - corrupt file?");
}
if (prefix_count == 0) {
// no values
byte_array_data = make_uniq<Vector>(LogicalType::VARCHAR, nullptr);
return;
}
auto prefix_data = reinterpret_cast<uint32_t *>(prefix_buffer.ptr);
auto suffix_data = reinterpret_cast<uint32_t *>(suffix_buffer.ptr);
byte_array_data = make_uniq<Vector>(LogicalType::VARCHAR, prefix_count);
byte_array_count = prefix_count;
delta_offset = 0;
auto string_data = FlatVector::GetData<string_t>(*byte_array_data);
for (idx_t i = 0; i < prefix_count; i++) {
auto str_len = prefix_data[i] + suffix_data[i];
block.available(suffix_data[i]);
string_data[i] = StringVector::EmptyString(*byte_array_data, str_len);
auto result_data = string_data[i].GetDataWriteable();
if (prefix_data[i] > 0) {
if (i == 0 || prefix_data[i] > string_data[i - 1].GetSize()) {
throw std::runtime_error("DELTA_BYTE_ARRAY - prefix is out of range - corrupt file?");
}
memcpy(result_data, string_data[i - 1].GetData(), prefix_data[i]);
}
memcpy(result_data + prefix_data[i], block.ptr, suffix_data[i]);
block.inc(suffix_data[i]);
string_data[i].Finalize();
}
}

void DeltaByteArrayDecoder::Read(uint8_t *defines, idx_t read_count, Vector &result, idx_t result_offset) {
if (!byte_array_data) {
throw std::runtime_error("Internal error - DeltaByteArray called but there was no byte_array_data set");
}
auto result_ptr = FlatVector::GetData<string_t>(result);
auto &result_mask = FlatVector::Validity(result);
auto string_data = FlatVector::GetData<string_t>(*byte_array_data);
for (idx_t row_idx = 0; row_idx < read_count; row_idx++) {
if (defines && defines[row_idx + result_offset] != reader.max_define) {
result_mask.SetInvalid(row_idx + result_offset);
continue;
}
if (delta_offset >= byte_array_count) {
throw IOException("DELTA_BYTE_ARRAY - length mismatch between values and byte array lengths (attempted "
"read of %d from %d entries) - corrupt file?",
delta_offset + 1, byte_array_count);
}
result_ptr[row_idx + result_offset] = string_data[delta_offset++];
}
StringVector::AddHeapReference(result, *byte_array_data);
}

void DeltaByteArrayDecoder::Skip(uint8_t *defines, idx_t skip_count) {
if (!byte_array_data) {
throw std::runtime_error("Internal error - DeltaByteArray called but there was no byte_array_data set");
}
for (idx_t row_idx = 0; row_idx < skip_count; row_idx++) {
if (defines && defines[row_idx] != reader.max_define) {
continue;
}
if (delta_offset >= byte_array_count) {
throw IOException("DELTA_BYTE_ARRAY - length mismatch between values and byte array lengths (attempted "
"read of %d from %d entries) - corrupt file?",
delta_offset + 1, byte_array_count);
}
delta_offset++;
}
}

} // namespace duckdb
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#include "decoder/delta_length_byte_array_decoder.hpp"
#include "decoder/delta_byte_array_decoder.hpp"
#include "column_reader.hpp"
#include "parquet_reader.hpp"
#include "reader/templated_column_reader.hpp"

namespace duckdb {

DeltaLengthByteArrayDecoder::DeltaLengthByteArrayDecoder(ColumnReader &reader)
: reader(reader), length_buffer(reader.encoding_buffers[0]) {
}

void DeltaLengthByteArrayDecoder::InitializePage() {
if (reader.type.InternalType() != PhysicalType::VARCHAR) {
throw std::runtime_error("Delta Length Byte Array encoding is only supported for string/blob data");
}
// read the binary packed lengths
auto &block = *reader.block;
auto &allocator = reader.reader.allocator;
DeltaByteArrayDecoder::ReadDbpData(allocator, block, length_buffer, byte_array_count);
length_idx = 0;
}

void DeltaLengthByteArrayDecoder::Read(uint8_t *defines, idx_t read_count, Vector &result, idx_t result_offset) {
auto &block = *reader.block;
auto length_data = reinterpret_cast<uint32_t *>(length_buffer.ptr);
auto result_data = FlatVector::GetData<string_t>(result);
auto &result_mask = FlatVector::Validity(result);
for (idx_t row_idx = 0; row_idx < read_count; row_idx++) {
auto result_idx = result_offset + row_idx;
if (defines && defines[result_idx] != reader.max_define) {
result_mask.SetInvalid(result_idx);
continue;
}
if (length_idx >= byte_array_count) {
throw IOException(
"DELTA_LENGTH_BYTE_ARRAY - length mismatch between values and byte array lengths (attempted "
"read of %d from %d entries) - corrupt file?",
length_idx, byte_array_count);
}
auto str_len = length_data[length_idx++];
block.available(str_len);
result_data[result_idx] = StringVector::EmptyString(result, str_len);
auto str_data = result_data[result_idx].GetDataWriteable();
memcpy(str_data, block.ptr, str_len);
block.inc(str_len);
result_data[result_idx].Finalize();
}
}

void DeltaLengthByteArrayDecoder::Skip(uint8_t *defines, idx_t skip_count) {
auto &block = *reader.block;
auto length_data = reinterpret_cast<uint32_t *>(length_buffer.ptr);
for (idx_t row_idx = 0; row_idx < skip_count; row_idx++) {
if (defines && defines[row_idx] != reader.max_define) {
continue;
}
if (length_idx >= byte_array_count) {
throw IOException(
"DELTA_LENGTH_BYTE_ARRAY - length mismatch between values and byte array lengths (attempted "
"read of %d from %d entries) - corrupt file?",
length_idx, byte_array_count);
}
auto str_len = length_data[length_idx++];
block.inc(str_len);
}
}

} // namespace duckdb
100 changes: 100 additions & 0 deletions src/duckdb/extension/parquet/decoder/dictionary_decoder.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#include "decoder/dictionary_decoder.hpp"
#include "column_reader.hpp"
#include "parquet_reader.hpp"

namespace duckdb {

DictionaryDecoder::DictionaryDecoder(ColumnReader &reader)
: reader(reader), offset_buffer(reader.encoding_buffers[0]), valid_sel(STANDARD_VECTOR_SIZE),
dictionary_selection_vector(STANDARD_VECTOR_SIZE), dictionary_size(0) {
}

void DictionaryDecoder::InitializeDictionary(idx_t new_dictionary_size) {
auto old_dict_size = dictionary_size;
dictionary_size = new_dictionary_size;
// we use the first value in the dictionary to keep a NULL
if (!dictionary) {
dictionary = make_uniq<Vector>(reader.type, dictionary_size + 1);
} else if (dictionary_size > old_dict_size) {
dictionary->Resize(old_dict_size, dictionary_size + 1);
}
dictionary_id = reader.reader.file_name + "_" + reader.schema.name + "_" + std::to_string(reader.chunk_read_offset);
// we use the last entry as a NULL, dictionary vectors don't have a separate validity mask
auto &dict_validity = FlatVector::Validity(*dictionary);
dict_validity.Reset(dictionary_size + 1);
dict_validity.SetInvalid(dictionary_size);
reader.Plain(reader.block, nullptr, dictionary_size, 0, *dictionary);
}

void DictionaryDecoder::InitializePage() {
// where is it otherwise??
auto &block = reader.block;
auto dict_width = block->read<uint8_t>();
dict_decoder = make_uniq<RleBpDecoder>(block->ptr, block->len, dict_width);
block->inc(block->len);
}

void DictionaryDecoder::ConvertDictToSelVec(uint32_t *offsets, const SelectionVector &rows, idx_t count,
idx_t result_offset) {
D_ASSERT(count <= STANDARD_VECTOR_SIZE);
for (idx_t idx = 0; idx < count; idx++) {
auto row_idx = rows.get_index(idx);
auto offset = offsets[idx];
if (offset >= dictionary_size) {
throw std::runtime_error("Parquet file is likely corrupted, dictionary offset out of range");
}
dictionary_selection_vector.set_index(row_idx, offset);
}
}

void DictionaryDecoder::Read(uint8_t *defines, idx_t read_count, Vector &result, idx_t result_offset) {
if (!dictionary || dictionary_size < 0) {
throw std::runtime_error("Parquet file is likely corrupted, missing dictionary");
}
idx_t valid_count = read_count;
if (defines) {
valid_count = 0;
for (idx_t i = 0; i < read_count; i++) {
valid_sel.set_index(valid_count, i);
dictionary_selection_vector.set_index(i, dictionary_size);
valid_count += defines[result_offset + i] == reader.max_define;
}
}
if (valid_count == read_count) {
// all values are valid - we can directly decompress the offsets into the selection vector
dict_decoder->GetBatch<uint32_t>(data_ptr_cast(dictionary_selection_vector.data()), valid_count);
// we do still need to verify the offsets though
for (idx_t idx = 0; idx < valid_count; idx++) {
if (dictionary_selection_vector.get_index(idx) >= dictionary_size) {
throw std::runtime_error("Parquet file is likely corrupted, dictionary offset out of range");
}
}
} else if (valid_count > 0) {
// for the valid entries - decode the offsets
offset_buffer.resize(reader.reader.allocator, sizeof(uint32_t) * valid_count);
dict_decoder->GetBatch<uint32_t>(offset_buffer.ptr, valid_count);
ConvertDictToSelVec(reinterpret_cast<uint32_t *>(offset_buffer.ptr), valid_sel, valid_count, result_offset);
}
#ifdef DEBUG
dictionary_selection_vector.Verify(read_count, dictionary_size + 1);
#endif
if (result_offset == 0) {
result.Dictionary(*dictionary, dictionary_size + 1, dictionary_selection_vector, read_count);
DictionaryVector::SetDictionaryId(result, dictionary_id);
D_ASSERT(result.GetVectorType() == VectorType::DICTIONARY_VECTOR);
} else {
D_ASSERT(result.GetVectorType() == VectorType::FLAT_VECTOR);
VectorOperations::Copy(*dictionary, result, dictionary_selection_vector, read_count, 0, result_offset);
}
}

void DictionaryDecoder::Skip(uint8_t *defines, idx_t skip_count) {
if (!dictionary || dictionary_size < 0) {
throw std::runtime_error("Parquet file is likely corrupted, missing dictionary");
}
idx_t valid_count = reader.GetValidCount(defines, skip_count);
// skip past the valid offsets
dict_decoder->Skip(valid_count);
}

} // namespace duckdb
Loading

0 comments on commit 8cff889

Please sign in to comment.