Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[opt](parquet-reader)Implement late materialization of parquet complex types. #44098

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 53 additions & 99 deletions be/src/vec/exec/format/parquet/parquet_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,19 @@ const int32_t ParquetInt96::JULIAN_EPOCH_OFFSET_DAYS = 2440588;
const int64_t ParquetInt96::MICROS_IN_DAY = 86400000000;
const int64_t ParquetInt96::NANOS_PER_MICROSECOND = 1000;

ColumnSelectVector::ColumnSelectVector(const uint8_t* filter_map, size_t filter_map_size,
bool filter_all) {
build(filter_map, filter_map_size, filter_all);
}

void ColumnSelectVector::build(const uint8_t* filter_map, size_t filter_map_size, bool filter_all) {
Status FilterMap::init(const uint8_t* filter_map_data, size_t filter_map_size, bool filter_all) {
_filter_all = filter_all;
_filter_map = filter_map;
_filter_map_data = filter_map_data;
_filter_map_size = filter_map_size;
if (filter_all) {
_has_filter = true;
_filter_ratio = 1;
} else if (filter_map == nullptr) {
} else if (filter_map_data == nullptr) {
_has_filter = false;
_filter_ratio = 0;
} else {
size_t filter_count =
simd::count_zero_num(reinterpret_cast<const int8_t*>(filter_map), filter_map_size);
size_t filter_count = simd::count_zero_num(reinterpret_cast<const int8_t*>(filter_map_data),
filter_map_size);
if (filter_count == filter_map_size) {
_has_filter = true;
_filter_all = true;
Expand All @@ -58,109 +53,68 @@ void ColumnSelectVector::build(const uint8_t* filter_map, size_t filter_map_size
_filter_ratio = 0;
}
}
return Status::OK();
}

void ColumnSelectVector::set_run_length_null_map(const std::vector<uint16_t>& run_length_null_map,
size_t num_values, NullMap* null_map) {
_num_values = num_values;
_num_nulls = 0;
_read_index = 0;
size_t map_index = 0;
bool is_null = false;
if (_has_filter) {
// No run length null map is generated when _filter_all = true
DCHECK(!_filter_all);
_data_map.resize(num_values);
for (auto& run_length : run_length_null_map) {
if (is_null) {
_num_nulls += run_length;
for (int i = 0; i < run_length; ++i) {
_data_map[map_index++] = FILTERED_NULL;
}
} else {
for (int i = 0; i < run_length; ++i) {
_data_map[map_index++] = FILTERED_CONTENT;
}
}
is_null = !is_null;
}
size_t num_read = 0;
DCHECK_LE(_filter_map_index + num_values, _filter_map_size);
for (size_t i = 0; i < num_values; ++i) {
if (_filter_map[_filter_map_index++]) {
_data_map[i] = _data_map[i] == FILTERED_NULL ? NULL_DATA : CONTENT;
num_read++;
}
}
_num_filtered = num_values - num_read;
if (null_map != nullptr && num_read > 0) {
NullMap& map_data_column = *null_map;
auto null_map_index = map_data_column.size();
map_data_column.resize(null_map_index + num_read);
if (_num_nulls == 0) {
memset(map_data_column.data() + null_map_index, 0, num_read);
} else if (_num_nulls == num_values) {
memset(map_data_column.data() + null_map_index, 1, num_read);
} else {
for (size_t i = 0; i < num_values; ++i) {
if (_data_map[i] == CONTENT) {
map_data_column[null_map_index++] = (UInt8) false;
} else if (_data_map[i] == NULL_DATA) {
map_data_column[null_map_index++] = (UInt8) true;
}
}
}
}
} else {
_num_filtered = 0;
_run_length_null_map = &run_length_null_map;
if (null_map != nullptr) {
NullMap& map_data_column = *null_map;
auto null_map_index = map_data_column.size();
map_data_column.resize(null_map_index + num_values);

for (auto& run_length : run_length_null_map) {
if (is_null) {
memset(map_data_column.data() + null_map_index, 1, run_length);
null_map_index += run_length;
_num_nulls += run_length;
} else {
memset(map_data_column.data() + null_map_index, 0, run_length);
null_map_index += run_length;
}
is_null = !is_null;
}
} else {
for (auto& run_length : run_length_null_map) {
if (is_null) {
_num_nulls += run_length;
}
is_null = !is_null;
}
}
}
}

bool ColumnSelectVector::can_filter_all(size_t remaining_num_values) {
bool FilterMap::can_filter_all(size_t remaining_num_values, size_t filter_map_index) {
if (!_has_filter) {
return false;
}
if (_filter_all) {
// all data in normal columns can be skipped when _filter_all = true,
// so the remaining_num_values should be less than the remaining filter map size.
DCHECK_LE(remaining_num_values + _filter_map_index, _filter_map_size);
DCHECK_LE(remaining_num_values + filter_map_index, _filter_map_size);
// return true always, to make sure that the data in normal columns can be skipped.
return true;
}
if (remaining_num_values + _filter_map_index > _filter_map_size) {
if (remaining_num_values + filter_map_index > _filter_map_size) {
return false;
}
return simd::count_zero_num(reinterpret_cast<const int8_t*>(_filter_map + _filter_map_index),
remaining_num_values) == remaining_num_values;
}
return simd::count_zero_num(
reinterpret_cast<const int8_t*>(_filter_map_data + filter_map_index),
remaining_num_values) == remaining_num_values;
}

Status FilterMap::generate_nested_filter_map(const std::vector<level_t>& rep_levels,
std::vector<uint8_t>& nested_filter_map_data,
std::unique_ptr<FilterMap>* nested_filter_map,
size_t* current_row_ptr, size_t start_index) const {
if (!has_filter() || filter_all()) {
return Status::InternalError(fmt::format(
"FilterMap::generate_nested_filter_map failed: has_filter={}, filter_all={}",
has_filter(), filter_all()));
}

if (rep_levels.empty()) {
return Status::OK();
}

nested_filter_map_data.resize(rep_levels.size());

void ColumnSelectVector::skip(size_t num_values) {
_filter_map_index += num_values;
size_t current_row = current_row_ptr ? *current_row_ptr : 0;

for (size_t i = start_index; i < rep_levels.size(); i++) {
if (i != start_index && rep_levels[i] == 0) {
current_row++;
if (current_row >= _filter_map_size) {
return Status::InvalidArgument(fmt::format(
"current_row >= _filter_map_size. current_row: {}, _filter_map_size: {}",
current_row, _filter_map_size));
}
}
nested_filter_map_data[i] = _filter_map_data[current_row];
}

if (current_row_ptr) {
*current_row_ptr = current_row;
}

auto new_filter = std::make_unique<FilterMap>();
RETURN_IF_ERROR(
new_filter->init(nested_filter_map_data.data(), nested_filter_map_data.size(), false));
*nested_filter_map = std::move(new_filter);

return Status::OK();
}

ParsedVersion::ParsedVersion(std::string application, std::optional<std::string> version,
Expand Down
156 changes: 126 additions & 30 deletions be/src/vec/exec/format/parquet/parquet_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <ostream>
#include <regex>
#include <string>
#include <unordered_set>
#include <vector>

#include "vec/columns/column_nullable.h"
Expand Down Expand Up @@ -69,42 +70,148 @@ struct ParquetInt96 {
#pragma pack()
static_assert(sizeof(ParquetInt96) == 12, "The size of ParquetInt96 is not 12.");

class ColumnSelectVector {
class FilterMap {
public:
enum DataReadType : uint8_t { CONTENT = 0, NULL_DATA, FILTERED_CONTENT, FILTERED_NULL };
FilterMap() = default;
Status init(const uint8_t* filter_map_data, size_t filter_map_size, bool filter_all);

ColumnSelectVector(const uint8_t* filter_map, size_t filter_map_size, bool filter_all);
Status generate_nested_filter_map(const std::vector<level_t>& rep_levels,
std::vector<uint8_t>& nested_filter_map_data,
std::unique_ptr<FilterMap>* nested_filter_map,
size_t* current_row_ptr, size_t start_index = 0) const;

ColumnSelectVector() = default;
const uint8_t* filter_map_data() const { return _filter_map_data; }
size_t filter_map_size() const { return _filter_map_size; }
bool has_filter() const { return _has_filter; }
bool filter_all() const { return _filter_all; }
double filter_ratio() const { return _has_filter ? _filter_ratio : 0; }

void build(const uint8_t* filter_map, size_t filter_map_size, bool filter_all);
bool can_filter_all(size_t remaining_num_values, size_t filter_map_index);

const uint8_t* filter_map() { return _filter_map; }
private:
bool _has_filter = false;
bool _filter_all = false;
const uint8_t* _filter_map_data = nullptr;
size_t _filter_map_size = 0;
double _filter_ratio = 0;
};

size_t num_values() const { return _num_values; }
class ColumnSelectVector {
public:
enum DataReadType : uint8_t { CONTENT = 0, NULL_DATA, FILTERED_CONTENT, FILTERED_NULL };

size_t num_nulls() const { return _num_nulls; }
ColumnSelectVector() = default;

size_t num_filtered() const { return _num_filtered; }
Status init(const std::vector<uint16_t>& run_length_null_map, size_t num_values,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function 'init' exceeds recommended size/complexity thresholds [readability-function-size]

    Status init(const std::vector<uint16_t>& run_length_null_map, size_t num_values,
           ^
Additional context

be/src/vec/exec/format/parquet/parquet_common.h:104: 98 lines including whitespace and comments (threshold 80)

    Status init(const std::vector<uint16_t>& run_length_null_map, size_t num_values,
           ^

NullMap* null_map, FilterMap* filter_map, size_t filter_map_index,
const std::unordered_set<size_t>* skipped_indices = nullptr) {
_num_values = num_values;
_num_nulls = 0;
_read_index = 0;
size_t map_index = 0;
bool is_null = false;
_has_filter = filter_map->has_filter();

if (filter_map->has_filter()) {
// No run length null map is generated when _filter_all = true
DCHECK(!filter_map->filter_all());
_data_map.resize(num_values);
for (auto& run_length : run_length_null_map) {
if (is_null) {
_num_nulls += run_length;
for (int i = 0; i < run_length; ++i) {
_data_map[map_index++] = FILTERED_NULL;
}
} else {
for (int i = 0; i < run_length; ++i) {
_data_map[map_index++] = FILTERED_CONTENT;
}
}
is_null = !is_null;
}

double filter_ratio() const { return _has_filter ? _filter_ratio : 0; }
size_t num_read = 0;
size_t i = 0;
size_t valid_count = 0;

void fallback_filter() { _has_filter = false; }
while (valid_count < num_values) {
DCHECK_LT(filter_map_index + i, filter_map->filter_map_size());

bool has_filter() const { return _has_filter; }
if (skipped_indices != nullptr &&
skipped_indices->count(filter_map_index + i) > 0) {
++i;
continue;
}

bool can_filter_all(size_t remaining_num_values);
if (filter_map->filter_map_data()[filter_map_index + i]) {
_data_map[valid_count] =
_data_map[valid_count] == FILTERED_NULL ? NULL_DATA : CONTENT;
num_read++;
}
++valid_count;
++i;
}

bool filter_all() const { return _filter_all; }
_num_filtered = num_values - num_read;

void skip(size_t num_values);
if (null_map != nullptr && num_read > 0) {
NullMap& map_data_column = *null_map;
auto null_map_index = map_data_column.size();
map_data_column.resize(null_map_index + num_read);

void reset() {
if (_has_filter) {
_filter_map_index = 0;
if (_num_nulls == 0) {
memset(map_data_column.data() + null_map_index, 0, num_read);
} else if (_num_nulls == num_values) {
memset(map_data_column.data() + null_map_index, 1, num_read);
} else {
for (size_t i = 0; i < num_values; ++i) {
if (_data_map[i] == CONTENT) {
map_data_column[null_map_index++] = (UInt8) false;
} else if (_data_map[i] == NULL_DATA) {
map_data_column[null_map_index++] = (UInt8) true;
}
}
}
}
} else {
_num_filtered = 0;
_run_length_null_map = &run_length_null_map;
if (null_map != nullptr) {
NullMap& map_data_column = *null_map;
auto null_map_index = map_data_column.size();
map_data_column.resize(null_map_index + num_values);

for (auto& run_length : run_length_null_map) {
if (is_null) {
memset(map_data_column.data() + null_map_index, 1, run_length);
null_map_index += run_length;
_num_nulls += run_length;
} else {
memset(map_data_column.data() + null_map_index, 0, run_length);
null_map_index += run_length;
}
is_null = !is_null;
}
} else {
for (auto& run_length : run_length_null_map) {
if (is_null) {
_num_nulls += run_length;
}
is_null = !is_null;
}
}
}
return Status::OK();
}

size_t num_values() const { return _num_values; }

size_t num_nulls() const { return _num_nulls; }

size_t num_filtered() const { return _num_filtered; }

bool has_filter() const { return _has_filter; }

template <bool has_filter>
size_t get_next_run(DataReadType* data_read_type) {
DCHECK_EQ(_has_filter, has_filter);
Expand Down Expand Up @@ -137,22 +244,11 @@ class ColumnSelectVector {
}
}

void set_run_length_null_map(const std::vector<uint16_t>& run_length_null_map,
size_t num_values, NullMap* null_map = nullptr);

private:
std::vector<DataReadType> _data_map;
// the length of non-null values and null values are arranged in turn.
const std::vector<uint16_t>* _run_length_null_map;
bool _has_filter = false;
// only used when the whole batch is skipped
bool _filter_all = false;
const uint8_t* _filter_map = nullptr;
size_t _filter_map_size = 0;
double _filter_ratio = 0;
size_t _filter_map_index = 0;

// generated in set_run_length_null_map
bool _has_filter;
size_t _num_values;
size_t _num_nulls;
size_t _num_filtered;
Expand Down
Loading
Loading