Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature](hive)Support read hive4 transaction tables. #44001

Merged
merged 10 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions be/src/vec/exec/format/table/transactional_hive_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "transactional_hive_reader.h"

#include <re2/re2.h>

#include "runtime/runtime_state.h"
#include "transactional_hive_common.h"
#include "vec/data_types/data_type_factory.hpp"
Expand Down Expand Up @@ -108,15 +110,38 @@ Status TransactionalHiveReader::init_row_filters(const TFileRangeDesc& range,
int64_t num_delete_files = 0;
std::filesystem::path file_path(data_file_path);

//See https://github.com/apache/hive/commit/ffee30e6267e85f00a22767262192abb9681cfb7#diff-5fe26c36b4e029dcd344fc5d484e7347R165
// bucket_xxx_attemptId => bucket_xxx
// bucket_xxx => bucket_xxx
auto remove_bucket_attemptId = [](const std::string& str) {
re2::RE2 pattern("^bucket_\\d+_\\d+$");

if (re2::RE2::FullMatch(str, pattern)) {
size_t pos = str.rfind('_');
if (pos != std::string::npos) {
return str.substr(0, pos);
}
}
return str;
};

SCOPED_TIMER(_transactional_orc_profile.delete_files_read_time);
for (auto& delete_delta : range.table_format_params.transactional_hive_params.delete_deltas) {
const std::string file_name = file_path.filename().string();
auto iter = std::find(delete_delta.file_names.begin(), delete_delta.file_names.end(),
file_name);
if (iter == delete_delta.file_names.end()) {

//need opt.
std::vector<std::string> delete_delta_file_names;
for (const auto& x : delete_delta.file_names) {
delete_delta_file_names.emplace_back(remove_bucket_attemptId(x));
}
auto iter = std::find(delete_delta_file_names.begin(), delete_delta_file_names.end(),
remove_bucket_attemptId(file_name));
if (iter == delete_delta_file_names.end()) {
continue;
}
auto delete_file = fmt::format("{}/{}", delete_delta.directory_location, file_name);
auto delete_file =
fmt::format("{}/{}", delete_delta.directory_location,
delete_delta.file_names[iter - delete_delta_file_names.begin()]);

TFileRangeDesc delete_range;
// must use __set() method to make sure __isset is true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,55 @@ insert into orc_full_acid_par PARTITION(part_col=20230102) values
(6, 'F');

update orc_full_acid_par set value = 'BB' where id = 2;




create table orc_to_acid_tb (id INT, value STRING)
PARTITIONED BY (part_col INT)
CLUSTERED BY (id) INTO 3 BUCKETS
STORED AS ORC;
INSERT INTO TABLE orc_to_acid_tb PARTITION (part_col=101) VALUES (1, 'A'), (3, 'C');
INSERT INTO TABLE orc_to_acid_tb PARTITION (part_col=102) VALUES (2, 'B');
ALTER TABLE orc_to_acid_tb SET TBLPROPERTIES ('transactional'='true');


create table orc_to_acid_compacted_tb (id INT, value STRING)
PARTITIONED BY (part_col INT)
CLUSTERED BY (id) INTO 3 BUCKETS
STORED AS ORC;
INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col=101) VALUES (1, 'A'), (3, 'C');
INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col=102) VALUES (2, 'B');
ALTER TABLE orc_to_acid_compacted_tb SET TBLPROPERTIES ('transactional'='true');
ALTER TABLE orc_to_acid_compacted_tb COMPACT 'major';
INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col=102) VALUES (4, 'D');
update orc_to_acid_compacted_tb set value = "CC" where id = 3;
update orc_to_acid_compacted_tb set value = "BB" where id = 2;


create table orc_acid_minor (id INT, value STRING)
CLUSTERED BY (id) INTO 3 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');
insert into orc_acid_minor values (1, 'A');
insert into orc_acid_minor values (2, 'B');
insert into orc_acid_minor values (3, 'C');
update orc_acid_minor set value = "BB" where id = 2;
ALTER TABLE orc_acid_minor COMPACT 'minor';
insert into orc_acid_minor values (4, 'D');
update orc_acid_minor set value = "DD" where id = 4;
DELETE FROM orc_acid_minor WHERE id = 3;


create table orc_acid_major (id INT, value STRING)
CLUSTERED BY (id) INTO 3 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');
insert into orc_acid_major values (1, 'A');
insert into orc_acid_major values (2, 'B');
insert into orc_acid_major values (3, 'C');
update orc_acid_major set value = "BB" where id = 2;
ALTER TABLE orc_acid_major COMPACT 'minor';
insert into orc_acid_major values (4, 'D');
update orc_acid_major set value = "DD" where id = 4;
DELETE FROM orc_acid_major WHERE id = 3;
Loading
Loading