Skip to content

Commit

Permalink
opt source code and add tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
kaka11chen committed Nov 21, 2024
1 parent 079a064 commit 210df93
Show file tree
Hide file tree
Showing 13 changed files with 2,404 additions and 27 deletions.
13 changes: 8 additions & 5 deletions be/src/vec/exec/format/parquet/parquet_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ bool FilterMap::can_filter_all(size_t remaining_num_values, size_t filter_map_in
Status FilterMap::generate_nested_filter_map(const std::vector<level_t>& rep_levels,
std::vector<uint8_t>& nested_filter_map_data,
std::unique_ptr<FilterMap>* nested_filter_map,
size_t* current_row_ptr, bool is_cross_page,
size_t start_index) const {
size_t* current_row_ptr, size_t start_index) const {
if (!has_filter() || filter_all()) {
*nested_filter_map = std::make_unique<FilterMap>();
return Status::OK();
return Status::InternalError(fmt::format(
"FilterMap::generate_nested_filter_map failed: has_filter={}, filter_all={}",
has_filter(), filter_all()));
}

if (rep_levels.empty()) {
Expand All @@ -91,10 +91,13 @@ Status FilterMap::generate_nested_filter_map(const std::vector<level_t>& rep_lev

nested_filter_map_data.resize(rep_levels.size());

fprintf(stderr, "nested_filter_map_data.size() in generate_nested_filter_map: %ld\n",
nested_filter_map_data.size());

size_t current_row = current_row_ptr ? *current_row_ptr : 0;

for (size_t i = start_index; i < rep_levels.size(); i++) {
if (!is_cross_page && i > start_index && rep_levels[i] == 0) {
if (i != start_index && rep_levels[i] == 0) {
current_row++;
if (current_row >= _filter_map_size) {
return Status::InvalidArgument(
Expand Down
4 changes: 1 addition & 3 deletions be/src/vec/exec/format/parquet/parquet_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,7 @@ class FilterMap {
Status generate_nested_filter_map(const std::vector<level_t>& rep_levels,
std::vector<uint8_t>& nested_filter_map_data,
std::unique_ptr<FilterMap>* nested_filter_map,
size_t* current_row_ptr, // 当前处理到哪一行
bool is_cross_page, // 是否是跨页的情况
size_t start_index = 0) const; // rep_levels的起始处理位置
size_t* current_row_ptr, size_t start_index = 0) const;

const uint8_t* filter_map_data() const { return _filter_map_data; }
size_t filter_map_size() const { return _filter_map_size; }
Expand Down
41 changes: 25 additions & 16 deletions be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,6 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
size_t* read_rows, bool* eof, bool is_dict_filter,
bool align_rows) {
std::unique_ptr<FilterMap> nested_filter_map;
std::unique_ptr<std::vector<uint8_t>> nested_filter_map_data;

size_t current_row;

FilterMap* current_filter_map = &filter_map;
size_t origin_size = 0;
Expand All @@ -337,17 +334,22 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
} else {
_rep_levels.resize(0);
_def_levels.resize(0);
if (_nested_filter_map_data) {
_nested_filter_map_data->resize(0);
}
}
size_t parsed_rows = 0;
size_t remaining_values = _chunk_reader->remaining_num_values();
bool has_rep_level = _chunk_reader->max_rep_level() > 0;
bool has_def_level = _chunk_reader->max_def_level() > 0;

// Handle repetition levels (indicates nesting structure)
if (has_rep_level) {
LevelDecoder& rep_decoder = _chunk_reader->rep_level_decoder();
// Read repetition levels until batch is full or no more values
while (parsed_rows <= batch_size && remaining_values > 0) {
level_t rep_level = rep_decoder.get_next();
if (rep_level == 0) {
if (rep_level == 0) { // rep_level 0 indicates start of new row
if (parsed_rows == batch_size) {
rep_decoder.rewind_one();
break;
Expand All @@ -358,13 +360,15 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
remaining_values--;
}

if (filter_map.has_filter()) {
nested_filter_map_data = std::make_unique<std::vector<uint8_t>>();
nested_filter_map_data->resize(_rep_levels.size());
current_row = _orig_filter_map_index;
// Generate nested filter map
if (filter_map.has_filter() && (!filter_map.filter_all())) {
if (_nested_filter_map_data == nullptr) {
_nested_filter_map_data.reset(new std::vector<uint8_t>());
}
RETURN_IF_ERROR(filter_map.generate_nested_filter_map(
_rep_levels, *nested_filter_map_data, &nested_filter_map, &current_row, false,
0));
_rep_levels, *_nested_filter_map_data, &nested_filter_map,
&_orig_filter_map_index, origin_size));
// Update current_filter_map to nested_filter_map
current_filter_map = nested_filter_map.get();
}
} else if (!align_rows) {
Expand All @@ -374,15 +378,16 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
_rep_levels.resize(parsed_rows, 0);
}

// Process definition levels (indicates null values)
size_t parsed_values = _chunk_reader->remaining_num_values() - remaining_values;

_def_levels.resize(origin_size + parsed_values);
if (has_def_level) {
_chunk_reader->def_level_decoder().get_levels(&_def_levels[origin_size], parsed_values);
} else {
std::fill(_def_levels.begin() + origin_size, _def_levels.end(), 0);
}

// Handle nullable columns
MutableColumnPtr data_column;
std::vector<uint16_t> null_map;
NullMap* map_data_column = nullptr;
Expand All @@ -399,6 +404,7 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
data_column = doris_column->assume_mutable();
}

// Process definition levels to build null map
size_t has_read = origin_size;
size_t ancestor_nulls = 0;
size_t null_size = 0;
Expand Down Expand Up @@ -445,7 +451,9 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType

size_t num_values = parsed_values - ancestor_nulls;

// Handle filtered values
if (current_filter_map->filter_all()) {
// Skip all values if everything is filtered
if (null_size > 0) {
RETURN_IF_ERROR(_chunk_reader->skip_values(null_size, false));
}
Expand All @@ -470,11 +478,10 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
RETURN_IF_ERROR(_chunk_reader->skip_values(ancestor_nulls, false));
}
}

if (!align_rows) {
*read_rows = parsed_rows;
}
*read_rows += parsed_rows;
_filter_map_index += parsed_values;

// Handle cross-page reading
if (_chunk_reader->remaining_num_values() == 0) {
if (_chunk_reader->has_next_page()) {
RETURN_IF_ERROR(_chunk_reader->next_page());
Expand All @@ -486,6 +493,7 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
}
}

// Apply filtering to repetition and definition levels
if (current_filter_map->has_filter()) {
if (current_filter_map->filter_all()) {
_rep_levels.resize(0);
Expand All @@ -510,7 +518,8 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
}
}

_orig_filter_map_index = current_row + 1;
// Prepare for next row
++_orig_filter_map_index;

if (_rep_levels.size() > 0) {
// make sure the rows of complex type are aligned correctly,
Expand Down
Loading

0 comments on commit 210df93

Please sign in to comment.