
Commit

add ut && regression test
hubgeter committed Dec 16, 2024
1 parent b862b14 commit 2a523cf
Showing 9 changed files with 775 additions and 15 deletions.
2 changes: 1 addition & 1 deletion be/src/vec/exec/format/table/transactional_hive_reader.cpp
@@ -137,7 +137,7 @@ Status TransactionalHiveReader::init_row_filters(const TFileRangeDesc& range,
}
auto iter = std::find(delete_delta_file_names.begin(), delete_delta_file_names.end(),
remove_bucket_attemptId(file_name));
- if (iter == delete_delta.file_names.end()) {
+ if (iter == delete_delta_file_names.end()) {
continue;
}
auto delete_file = fmt::format("{}/{}", delete_delta.directory_location,
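Note on the fix above: `std::find` searched `delete_delta_file_names`, but the resulting iterator was compared against `delete_delta.file_names.end()` — a different container's end iterator, which is undefined behavior in C++ and breaks the "was it found?" test. A minimal Java sketch of the corrected membership check follows; the names (`deleteDeltaFileNames`, `removeBucketAttemptId`) mirror the C++ identifiers but are hypothetical, not the real Doris API:

```java
import java.util.List;

public final class DeleteDeltaLookup {
    // Hedged sketch of the corrected lookup: the membership test runs against
    // the same collection that was searched, which is what the one-line C++ fix restores.
    static String findDeleteFile(List<String> deleteDeltaFileNames,
                                 String directoryLocation, String fileName) {
        String normalized = removeBucketAttemptId(fileName);
        if (!deleteDeltaFileNames.contains(normalized)) {
            return null; // no delete-delta entry for this data file; caller skips it
        }
        return directoryLocation + "/" + normalized;
    }

    // Plausible stand-in for the C++ helper: strip a trailing "_<attemptId>"
    // so "bucket_00000_0" compares equal to "bucket_00000".
    static String removeBucketAttemptId(String fileName) {
        int idx = fileName.lastIndexOf('_');
        return idx > "bucket".length() ? fileName.substring(0, idx) : fileName;
    }
}
```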
@@ -41,3 +41,55 @@ insert into orc_full_acid_par PARTITION(part_col=20230102) values
(6, 'F');

update orc_full_acid_par set value = 'BB' where id = 2;




create table orc_to_acid_tb (id INT, value STRING)
PARTITIONED BY (part_col INT)
CLUSTERED BY (id) INTO 3 BUCKETS
STORED AS ORC;
INSERT INTO TABLE orc_to_acid_tb PARTITION (part_col=101) VALUES (1, 'A'), (3, 'C');
INSERT INTO TABLE orc_to_acid_tb PARTITION (part_col=102) VALUES (2, 'B');
ALTER TABLE orc_to_acid_tb SET TBLPROPERTIES ('transactional'='true');


create table orc_to_acid_compacted_tb (id INT, value STRING)
PARTITIONED BY (part_col INT)
CLUSTERED BY (id) INTO 3 BUCKETS
STORED AS ORC;
INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col=101) VALUES (1, 'A'), (3, 'C');
INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col=102) VALUES (2, 'B');
ALTER TABLE orc_to_acid_compacted_tb SET TBLPROPERTIES ('transactional'='true');
ALTER TABLE orc_to_acid_compacted_tb COMPACT 'major';
INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col=102) VALUES (4, 'D');
update orc_to_acid_compacted_tb set value = "CC" where id = 3;
update orc_to_acid_compacted_tb set value = "BB" where id = 2;


create table orc_acid_minor (id INT, value STRING)
CLUSTERED BY (id) INTO 3 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');
insert into orc_acid_minor values (1, 'A');
insert into orc_acid_minor values (2, 'B');
insert into orc_acid_minor values (3, 'C');
update orc_acid_minor set value = "BB" where id = 2;
ALTER TABLE orc_acid_minor COMPACT 'minor';
insert into orc_acid_minor values (4, 'D');
update orc_acid_minor set value = "DD" where id = 4;
DELETE FROM orc_acid_minor WHERE id = 3;


create table orc_acid_major (id INT, value STRING)
CLUSTERED BY (id) INTO 3 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');
insert into orc_acid_major values (1, 'A');
insert into orc_acid_major values (2, 'B');
insert into orc_acid_major values (3, 'C');
update orc_acid_major set value = "BB" where id = 2;
ALTER TABLE orc_acid_major COMPACT 'major';
insert into orc_acid_major values (4, 'D');
update orc_acid_major set value = "DD" where id = 4;
DELETE FROM orc_acid_major WHERE id = 3;
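The scripts above cover the layouts the reader has to survive: an ORC table converted to ACID with uncompacted original files (orc_to_acid_tb), a converted table after major compaction plus later updates (orc_to_acid_compacted_tb), and natively transactional tables with compaction, updates, and deletes (orc_acid_minor, orc_acid_major). On disk these operations leave `base_<writeId>`, `delta_<min>_<max>[_<stmtId>]`, and `delete_delta_<min>_<max>` directories, and a reader must take the newest base plus only the deltas above it. A simplified sketch of that selection (hypothetical class; it ignores write-id validation, visibility ids, and pre-ACID original files, which the real `AcidUtil.getAcidState` also handles):

```java
import java.util.ArrayList;
import java.util.List;

public final class AcidDirSelectionSketch {
    static List<String> select(List<String> dirs) {
        // Pick the base directory with the highest write id.
        long bestBaseWriteId = -1;
        String bestBase = null;
        for (String d : dirs) {
            if (d.startsWith("base_")) {
                long writeId = Long.parseLong(d.substring("base_".length()).split("_")[0]);
                if (writeId > bestBaseWriteId) {
                    bestBaseWriteId = writeId;
                    bestBase = d;
                }
            }
        }
        List<String> result = new ArrayList<>();
        if (bestBase != null) {
            result.add(bestBase);
        }
        // Keep only deltas not already folded into the chosen base.
        for (String d : dirs) {
            if (d.startsWith("delete_delta_") || d.startsWith("delta_")) {
                String rest = d.startsWith("delete_delta_")
                        ? d.substring("delete_delta_".length())
                        : d.substring("delta_".length());
                long minWriteId = Long.parseLong(rest.split("_")[0]);
                if (minWriteId > bestBaseWriteId) {
                    result.add(d);
                }
            }
        }
        return result;
    }
}
```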
@@ -21,8 +21,8 @@
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
import org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheValue;
+ import org.apache.doris.fs.FileSystem;
import org.apache.doris.fs.remote.RemoteFile;
- import org.apache.doris.fs.remote.RemoteFileSystem;

import lombok.EqualsAndHashCode;
import lombok.Getter;
@@ -64,7 +64,6 @@ public class AcidUtil {

@Getter
@ToString
- @EqualsAndHashCode
private static class ParsedBase {
private final long writeId;
private final long visibilityId;
@@ -131,7 +130,7 @@ public int compareTo(ParsedDelta other) {
}


- private static boolean isValidMetaDataFile(RemoteFileSystem fileSystem, String baseDir)
+ private static boolean isValidMetaDataFile(FileSystem fileSystem, String baseDir)
throws IOException {
String fileLocation = baseDir + "_metadata_acid";
Status status = fileSystem.exists(fileLocation);
@@ -161,7 +160,7 @@ private static boolean isValidMetaDataFile(RemoteFileSystem fileSystem, String b
return true;
}

- private static boolean isValidBase(RemoteFileSystem remoteFileSystem, String baseDir,
+ private static boolean isValidBase(FileSystem fileSystem, String baseDir,
ParsedBase base, ValidWriteIdList writeIdList) throws IOException {
if (base.writeId == Long.MIN_VALUE) {
//Ref: https://issues.apache.org/jira/browse/HIVE-13369
@@ -173,7 +172,7 @@ private static boolean isValidBase(RemoteFileSystem remoteFileSystem, String bas
}

// Hive 4+: just check the "_v" suffix; before Hive 4: check for the `_metadata_acid` file in baseDir.
- if ((base.visibilityId > 0) || isValidMetaDataFile(remoteFileSystem, baseDir)) {
+ if ((base.visibilityId > 0) || isValidMetaDataFile(fileSystem, baseDir)) {
return writeIdList.isValidBase(base.writeId);
}
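Note: the comment above marks the version split this method relies on. A Hive 4 compactor names base directories `base_<writeId>_v<visibilityId>`, so a positive visibilityId already proves a committed compaction, while pre-Hive 4 compactors drop a `_metadata_acid` marker file inside the base directory instead. A hedged sketch of just that predicate (simplified names, not the real AcidUtil signature):

```java
public final class BaseValiditySketch {
    // Simplified sketch of the Hive 4 vs. pre-Hive 4 base check described above.
    static boolean baseLooksCommitted(long visibilityId, boolean metadataAcidFileExists) {
        // Hive 4+: "_v<visibilityId>" suffix on the dir name; pre-Hive 4: marker file.
        return visibilityId > 0 || metadataAcidFileExists;
    }
}
```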

@@ -208,7 +207,7 @@ private static ParsedDelta parseDelta(String fileName, String deltaPrefix, Strin

if (split2 == -1) {
long max = Long.parseLong(rest.substring(split + 1));
- return new ParsedDelta(min, max, fileName, -1, deleteDelta, visibilityId);
+ return new ParsedDelta(min, max, path, -1, deleteDelta, visibilityId);
}

long max = Long.parseLong(rest.substring(split + 1, split2));
@@ -219,7 +218,7 @@ private static ParsedDelta parseDelta(String fileName, String deltaPrefix, Strin
//The Hive 3 library cannot read Hive 4 transactional tables correctly, and using the Hive 4
// library directly causes many problems, so this method reimplements the logic here.
//Ref: hive/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java#getAcidState
- public static FileCacheValue getAcidState(RemoteFileSystem fileSystem, HivePartition partition,
+ public static FileCacheValue getAcidState(FileSystem fileSystem, HivePartition partition,
Map<String, String> txnValidIds, Map<String, String> catalogProps) throws Exception {

// Ref: https://issues.apache.org/jira/browse/HIVE-18192
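For orientation, `parseDelta` above decodes directory names of the form `delta_<minWriteId>_<maxWriteId>` or `delta_<minWriteId>_<maxWriteId>_<statementId>` (the `_v<visibilityId>` suffix is assumed to have been split off earlier). A self-contained sketch of that parsing, with a simplified result type standing in for `ParsedDelta`:

```java
public final class DeltaNameParser {
    // Simplified stand-in for ParsedDelta; the real class also carries the
    // path, a delete-delta flag, and the visibility id.
    record ParsedDeltaSketch(long min, long max, int statementId) {}

    static ParsedDeltaSketch parse(String dirName, String deltaPrefix) {
        // e.g. dirName = "delta_0000003_0000005_0001", deltaPrefix = "delta_"
        String rest = dirName.substring(deltaPrefix.length());
        int split = rest.indexOf('_');              // separates min from max
        int split2 = rest.indexOf('_', split + 1);  // separates max from statementId, if present
        long min = Long.parseLong(rest.substring(0, split));
        if (split2 == -1) {
            long max = Long.parseLong(rest.substring(split + 1));
            return new ParsedDeltaSketch(min, max, -1); // no statement id
        }
        long max = Long.parseLong(rest.substring(split + 1, split2));
        return new ParsedDeltaSketch(min, max, Integer.parseInt(rest.substring(split2 + 1)));
    }
}
```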
@@ -867,12 +867,13 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
return fileCacheValues;
}

- RemoteFileSystem fileSystem = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
-         new FileSystemCache.FileSystemCacheKey(
-                 LocationPath.getFSIdentity(partitions.get(0).getPath(), bindBrokerName),
-                 catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf));

for (HivePartition partition : partitions) {
+ // Get the filesystem for each partition rather than once up front; reason: https://github.com/apache/doris/pull/23409.
+ RemoteFileSystem fileSystem = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
+         new FileSystemCache.FileSystemCacheKey(
+                 LocationPath.getFSIdentity(partition.getPath(), bindBrokerName),
+                 catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf));

if (!Strings.isNullOrEmpty(remoteUser)) {
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser);
fileCacheValues.add(
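The relocation above matters because partitions of one table can live on different filesystems (the linked PR #23409 has the background), so the cache key must be derived from each partition's own path rather than from `partitions.get(0)`. A hedged sketch of the per-key pattern with hypothetical types (`FsCacheSketch` is not the Doris `FileSystemCache` API):

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical per-identity filesystem cache: partitions sharing a scheme and
// authority reuse one connection; partitions elsewhere get their own.
final class FsCacheSketch<F> {
    private final Map<String, F> cache = new HashMap<>();
    private final Function<String, F> factory;

    FsCacheSketch(Function<String, F> factory) {
        this.factory = factory;
    }

    F forPartitionPath(String partitionPath) {
        // Derive the filesystem identity from this partition's own location.
        URI uri = URI.create(partitionPath);
        String fsIdentity = uri.getScheme() + "://" + uri.getAuthority();
        return cache.computeIfAbsent(fsIdentity, factory);
    }
}
```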
@@ -181,7 +181,7 @@ public boolean createTable(CreateTableStmt stmt) throws UserException {
}
}

- if (props.containsKey("transactional") && props.get("transactional").equals("true")) {
+ if (props.containsKey("transactional") && props.get("transactional").equalsIgnoreCase("true")) {
throw new UserException("Not support create hive transactional table.");
/*
CREATE TABLE trans6(
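The `equalsIgnoreCase` change above is needed because table properties are user-supplied strings: Hive accepts `'transactional'='TRUE'` in any casing, so a case-sensitive `equals("true")` would let such a table slip past this guard. A minimal sketch of the check; flipping the comparison onto the literal also makes a separate `containsKey` test unnecessary, since `equalsIgnoreCase(null)` is simply false:

```java
import java.util.Map;

final class TransactionalCheckSketch {
    // Hedged sketch mirroring the fix: case-insensitive, null-safe property test.
    static boolean isTransactional(Map<String, String> props) {
        return "true".equalsIgnoreCase(props.get("transactional"));
    }
}
```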
@@ -271,7 +271,7 @@ private ExecutorFactory selectInsertExecutorFactory(
boolean emptyInsert = childIsEmptyRelation(physicalSink);
HMSExternalTable hiveExternalTable = (HMSExternalTable) targetTableIf;
if (hiveExternalTable.isHiveTransactionalTable()) {
throw new UserException("Not supported insert into hive table");
throw new UserException("Not supported insert into hive transactional table.");
}

return ExecutorFactory.from(