Skip to content

Commit

Permalink
[feature](hive)Support read hive4 transaction tables. (#44001)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Problem Summary:
Support read hive4 transaction tables.

1. Since there are compilation problems when directly using hive4 API to
get the file need to read, a function called
`ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java#getAcidState`
is implemented.

2. At the same time, it is forbidden to create, insert data, and delete
aicd tables.

3. fix `'transactional_properties' = 'insert_only'` table read Incorrect
data read.
 
TODO : merge HMSTransaction ,HiveTransaction class 
              merge  HiveTransactionMgr, HiveTransactionManager class

### Release note

[feature](hive)Support read hive4 transaction tables
  • Loading branch information
hubgeter authored Jan 3, 2025
1 parent 67c874c commit c398cba
Show file tree
Hide file tree
Showing 18 changed files with 1,453 additions and 150 deletions.
33 changes: 29 additions & 4 deletions be/src/vec/exec/format/table/transactional_hive_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "transactional_hive_reader.h"

#include <re2/re2.h>

#include "runtime/runtime_state.h"
#include "transactional_hive_common.h"
#include "vec/data_types/data_type_factory.hpp"
Expand Down Expand Up @@ -108,15 +110,38 @@ Status TransactionalHiveReader::init_row_filters(const TFileRangeDesc& range,
int64_t num_delete_files = 0;
std::filesystem::path file_path(data_file_path);

//See https://github.com/apache/hive/commit/ffee30e6267e85f00a22767262192abb9681cfb7#diff-5fe26c36b4e029dcd344fc5d484e7347R165
// bucket_xxx_attemptId => bucket_xxx
// bucket_xxx => bucket_xxx
auto remove_bucket_attemptId = [](const std::string& str) {
re2::RE2 pattern("^bucket_\\d+_\\d+$");

if (re2::RE2::FullMatch(str, pattern)) {
size_t pos = str.rfind('_');
if (pos != std::string::npos) {
return str.substr(0, pos);
}
}
return str;
};

SCOPED_TIMER(_transactional_orc_profile.delete_files_read_time);
for (auto& delete_delta : range.table_format_params.transactional_hive_params.delete_deltas) {
const std::string file_name = file_path.filename().string();
auto iter = std::find(delete_delta.file_names.begin(), delete_delta.file_names.end(),
file_name);
if (iter == delete_delta.file_names.end()) {

//need opt.
std::vector<std::string> delete_delta_file_names;
for (const auto& x : delete_delta.file_names) {
delete_delta_file_names.emplace_back(remove_bucket_attemptId(x));
}
auto iter = std::find(delete_delta_file_names.begin(), delete_delta_file_names.end(),
remove_bucket_attemptId(file_name));
if (iter == delete_delta_file_names.end()) {
continue;
}
auto delete_file = fmt::format("{}/{}", delete_delta.directory_location, file_name);
auto delete_file =
fmt::format("{}/{}", delete_delta.directory_location,
delete_delta.file_names[iter - delete_delta_file_names.begin()]);

TFileRangeDesc delete_range;
// must use __set() method to make sure __isset is true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,55 @@ insert into orc_full_acid_par PARTITION(part_col=20230102) values
(6, 'F');

update orc_full_acid_par set value = 'BB' where id = 2;




create table orc_to_acid_tb (id INT, value STRING)
PARTITIONED BY (part_col INT)
CLUSTERED BY (id) INTO 3 BUCKETS
STORED AS ORC;
INSERT INTO TABLE orc_to_acid_tb PARTITION (part_col=101) VALUES (1, 'A'), (3, 'C');
INSERT INTO TABLE orc_to_acid_tb PARTITION (part_col=102) VALUES (2, 'B');
ALTER TABLE orc_to_acid_tb SET TBLPROPERTIES ('transactional'='true');


create table orc_to_acid_compacted_tb (id INT, value STRING)
PARTITIONED BY (part_col INT)
CLUSTERED BY (id) INTO 3 BUCKETS
STORED AS ORC;
INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col=101) VALUES (1, 'A'), (3, 'C');
INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col=102) VALUES (2, 'B');
ALTER TABLE orc_to_acid_compacted_tb SET TBLPROPERTIES ('transactional'='true');
ALTER TABLE orc_to_acid_compacted_tb COMPACT 'major';
INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col=102) VALUES (4, 'D');
update orc_to_acid_compacted_tb set value = "CC" where id = 3;
update orc_to_acid_compacted_tb set value = "BB" where id = 2;


create table orc_acid_minor (id INT, value STRING)
CLUSTERED BY (id) INTO 3 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');
insert into orc_acid_minor values (1, 'A');
insert into orc_acid_minor values (2, 'B');
insert into orc_acid_minor values (3, 'C');
update orc_acid_minor set value = "BB" where id = 2;
ALTER TABLE orc_acid_minor COMPACT 'minor';
insert into orc_acid_minor values (4, 'D');
update orc_acid_minor set value = "DD" where id = 4;
DELETE FROM orc_acid_minor WHERE id = 3;


create table orc_acid_major (id INT, value STRING)
CLUSTERED BY (id) INTO 3 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');
insert into orc_acid_major values (1, 'A');
insert into orc_acid_major values (2, 'B');
insert into orc_acid_major values (3, 'C');
update orc_acid_major set value = "BB" where id = 2;
ALTER TABLE orc_acid_major COMPACT 'minor';
insert into orc_acid_major values (4, 'D');
update orc_acid_major set value = "DD" where id = 4;
DELETE FROM orc_acid_major WHERE id = 3;
Loading

0 comments on commit c398cba

Please sign in to comment.