diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.cpp b/be/src/vec/exec/format/table/transactional_hive_reader.cpp
index bc4262b7451d53..18642ab1218b4d 100644
--- a/be/src/vec/exec/format/table/transactional_hive_reader.cpp
+++ b/be/src/vec/exec/format/table/transactional_hive_reader.cpp
@@ -17,6 +17,8 @@
 
 #include "transactional_hive_reader.h"
 
+#include <re2/re2.h>
+
 #include "runtime/runtime_state.h"
 #include "transactional_hive_common.h"
 #include "vec/data_types/data_type_factory.hpp"
@@ -108,15 +110,38 @@ Status TransactionalHiveReader::init_row_filters(const TFileRangeDesc& range,
     int64_t num_delete_files = 0;
     std::filesystem::path file_path(data_file_path);
 
+    // See https://github.com/apache/hive/commit/ffee30e6267e85f00a22767262192abb9681cfb7#diff-5fe26c36b4e029dcd344fc5d484e7347R165
+    // bucket_xxx_attemptId => bucket_xxx
+    // bucket_xxx           => bucket_xxx
+    auto remove_bucket_attemptId = [](const std::string& str) {
+        re2::RE2 pattern("^bucket_\\d+_\\d+$");
+
+        if (re2::RE2::FullMatch(str, pattern)) {
+            size_t pos = str.rfind('_');
+            if (pos != std::string::npos) {
+                return str.substr(0, pos);
+            }
+        }
+        return str;
+    };
+
     SCOPED_TIMER(_transactional_orc_profile.delete_files_read_time);
     for (auto& delete_delta : range.table_format_params.transactional_hive_params.delete_deltas) {
         const std::string file_name = file_path.filename().string();
-        auto iter = std::find(delete_delta.file_names.begin(), delete_delta.file_names.end(),
-                              file_name);
-        if (iter == delete_delta.file_names.end()) {
+
+        // TODO: optimize; the normalized name list is rebuilt for every delete delta.
+        std::vector<std::string> delete_delta_file_names;
+        for (const auto& x : delete_delta.file_names) {
+            delete_delta_file_names.emplace_back(remove_bucket_attemptId(x));
+        }
+        auto iter = std::find(delete_delta_file_names.begin(), delete_delta_file_names.end(),
+                              remove_bucket_attemptId(file_name));
+        if (iter == delete_delta_file_names.end()) {
            continue;
        }
-        auto delete_file = fmt::format("{}/{}", delete_delta.directory_location, file_name);
+        auto delete_file =
+                fmt::format("{}/{}", delete_delta.directory_location,
+                            delete_delta.file_names[iter - delete_delta_file_names.begin()]);
 
        TFileRangeDesc delete_range;
        // must use __set() method to make sure __isset is true
diff --git a/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run25.hql b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run25.hql
index 814df4cdc5ff90..da6400bdff0220 100755
--- a/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run25.hql
+++ b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run25.hql
@@ -41,3 +41,55 @@ insert into orc_full_acid_par PARTITION(part_col=20230102) values (6, 'F');
 
 update orc_full_acid_par set value = 'BB' where id = 2;
 
+
+
+
+create table orc_to_acid_tb (id INT, value STRING)
+PARTITIONED BY (part_col INT)
+CLUSTERED BY (id) INTO 3 BUCKETS
+STORED AS ORC;
+INSERT INTO TABLE orc_to_acid_tb PARTITION (part_col=101) VALUES (1, 'A'), (3, 'C');
+INSERT INTO TABLE orc_to_acid_tb PARTITION (part_col=102) VALUES (2, 'B');
+ALTER TABLE orc_to_acid_tb SET TBLPROPERTIES ('transactional'='true');
+
+
+create table orc_to_acid_compacted_tb (id INT, value STRING)
+PARTITIONED BY (part_col INT)
+CLUSTERED BY (id) INTO 3 BUCKETS
+STORED AS ORC;
+INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col=101) VALUES (1, 'A'), (3, 'C');
+INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col=102) VALUES (2, 'B');
+ALTER TABLE orc_to_acid_compacted_tb SET TBLPROPERTIES ('transactional'='true');
+ALTER TABLE orc_to_acid_compacted_tb COMPACT 'major';
+INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col=102) VALUES (4, 'D');
+update orc_to_acid_compacted_tb set value = "CC" where id = 3;
+update orc_to_acid_compacted_tb set value = "BB" where id = 2;
+
+
+create table orc_acid_minor (id INT, value STRING)
+CLUSTERED BY (id) INTO 3 BUCKETS
+STORED AS ORC
+TBLPROPERTIES ('transactional' = 'true');
+insert into orc_acid_minor values (1, 'A');
+insert into orc_acid_minor values (2, 'B');
+insert into orc_acid_minor values (3, 'C');
+update orc_acid_minor set value = "BB" where id = 2;
+ALTER TABLE orc_acid_minor COMPACT 'minor';
+insert into orc_acid_minor values (4, 'D');
+update orc_acid_minor set value = "DD" where id = 4;
+DELETE FROM orc_acid_minor WHERE id = 3;
+
+
+create table orc_acid_major (id INT, value STRING)
+CLUSTERED BY (id) INTO 3 BUCKETS
+STORED AS ORC
+TBLPROPERTIES ('transactional' = 'true');
+insert into orc_acid_major values (1, 'A');
+insert into orc_acid_major values (2, 'B');
+insert into orc_acid_major values (3, 'C');
+update orc_acid_major set value = "BB" where id = 2;
+ALTER TABLE orc_acid_major COMPACT 'major';
+insert into orc_acid_major values (4, 'D');
+update orc_acid_major set value = "DD" where id = 4;
+DELETE FROM orc_acid_major WHERE id = 3;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java
new file mode 100644
index 00000000000000..915a1a410d1cc5
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java
@@ -0,0 +1,453 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +package org.apache.doris.datasource.hive; + +import org.apache.doris.backup.Status; +import org.apache.doris.common.util.LocationPath; +import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo; +import org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheValue; +import org.apache.doris.fs.FileSystem; +import org.apache.doris.fs.remote.RemoteFile; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; +import lombok.ToString; +import org.apache.hadoop.hive.common.ValidReadTxnList; +import org.apache.hadoop.hive.common.ValidReaderWriteIdList; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidWriteIdList; +import org.apache.hadoop.hive.common.ValidWriteIdList.RangeResponse; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class AcidUtil { + private static final Logger LOG = LogManager.getLogger(AcidUtil.class); + + public static final String VALID_TXNS_KEY = "hive.txn.valid.txns"; + public static final String VALID_WRITEIDS_KEY = "hive.txn.valid.writeids"; + + private static final String HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX = "bucket_"; + private static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length"; + + // An `_orc_acid_version` file is written to each base/delta/delete_delta dir written by a full acid write + // or compaction. This is the primary mechanism for versioning acid data. + // Each individual ORC file written stores the current version as a user property in ORC footer. All data files + // produced by Acid write should have this (starting with Hive 3.0), including those written by compactor.This is + // more for sanity checking in case someone moved the files around or something like that. + // In hive, methods for getting/reading the version from files were moved to test which is the only place they are + // used (after HIVE-23506), in order to keep devs out of temptation, since they access the FileSystem which + // is expensive. + // After `HIVE-23825: Create a flag to turn off _orc_acid_version file creation`, introduce variables to + // control whether to generate `_orc_acid_version` file. So don't need check this file exist. 
+ private static final String HIVE_ORC_ACID_VERSION_FILE = "_orc_acid_version"; + + @Getter + @ToString + private static class ParsedBase { + private final long writeId; + private final long visibilityId; + + public ParsedBase(long writeId, long visibilityId) { + this.writeId = writeId; + this.visibilityId = visibilityId; + } + } + + private static ParsedBase parseBase(String name) { + //format1 : base_writeId + //format2 : base_writeId_visibilityId detail: https://issues.apache.org/jira/browse/HIVE-20823 + name = name.substring("base_".length()); + int index = name.indexOf("_v"); + if (index == -1) { + return new ParsedBase(Long.parseLong(name), 0); + } + return new ParsedBase( + Long.parseLong(name.substring(0, index)), + Long.parseLong(name.substring(index + 2))); + } + + @Getter + @ToString + @EqualsAndHashCode + private static class ParsedDelta implements Comparable { + private final long min; + private final long max; + private final String path; + private final int statementId; + private final boolean deleteDelta; + private final long visibilityId; + + public ParsedDelta(long min, long max, @NonNull String path, int statementId, + boolean deleteDelta, long visibilityId) { + this.min = min; + this.max = max; + this.path = path; + this.statementId = statementId; + this.deleteDelta = deleteDelta; + this.visibilityId = visibilityId; + } + + /* + * Smaller minWID orders first; + * If minWID is the same, larger maxWID orders first; + * Otherwise, sort by stmtID; files w/o stmtID orders first. + * + * Compactions (Major/Minor) merge deltas/bases but delete of old files + * happens in a different process; thus it's possible to have bases/deltas with + * overlapping writeId boundaries. The sort order helps figure out the "best" set of files + * to use to get data. + * This sorts "wider" delta before "narrower" i.e. delta_5_20 sorts before delta_5_10 (and delta_11_20) + */ + @Override + public int compareTo(ParsedDelta other) { + return min != other.min ? Long.compare(min, other.min) : + other.max != max ? Long.compare(other.max, max) : + statementId != other.statementId + ? Integer.compare(statementId, other.statementId) : + path.compareTo(other.path); + } + } + + + private static boolean isValidMetaDataFile(FileSystem fileSystem, String baseDir) + throws IOException { + String fileLocation = baseDir + "_metadata_acid"; + Status status = fileSystem.exists(fileLocation); + if (status != Status.OK) { + return false; + } + //In order to save the cost of reading the file content, we only check whether the file exists. 
+ // File Contents: {"thisFileVersion":"0","dataFormat":"compacted"} + // + // Map metadata; + // try (var in = read(fileLocation)) { + // metadata = new ObjectMapper().readValue(in, new TypeReference<>() {}); + // } + // catch (IOException e) { + // throw new IOException(String.format("Failed to read %s: %s", fileLocation, e.getMessage()), e); + // } + // + // String version = metadata.get("thisFileVersion"); + // if (!"0".equals(version)) { + // throw new IOException("Unexpected ACID metadata version: " + version); + // } + // + // String format = metadata.get("dataFormat"); + // if (!"compacted".equals(format)) { + // throw new IOException("Unexpected value for ACID dataFormat: " + format); + // } + return true; + } + + private static boolean isValidBase(FileSystem fileSystem, String baseDir, + ParsedBase base, ValidWriteIdList writeIdList) throws IOException { + if (base.writeId == Long.MIN_VALUE) { + //Ref: https://issues.apache.org/jira/browse/HIVE-13369 + //such base is created by 1st compaction in case of non-acid to acid table conversion.(you + //will get dir: `base_-9223372036854775808`) + //By definition there are no open txns with id < 1. + //After this: https://issues.apache.org/jira/browse/HIVE-18192, txns(global transaction ID) => writeId. + return true; + } + + // hive 4 : just check "_v" suffix, before hive 4 : check `_metadata_acid` file in baseDir. + if ((base.visibilityId > 0) || isValidMetaDataFile(fileSystem, baseDir)) { + return writeIdList.isValidBase(base.writeId); + } + + // if here, it's a result of IOW + return writeIdList.isWriteIdValid(base.writeId); + } + + private static ParsedDelta parseDelta(String fileName, String deltaPrefix, String path) { + // format1: delta_min_max_statementId_visibilityId, delete_delta_min_max_statementId_visibilityId + // _visibilityId maybe not exists. 
+ // detail: https://issues.apache.org/jira/browse/HIVE-20823 + // format2: delta_min_max_visibilityId, delete_delta_min_visibilityId + // when minor compaction runs, we collapse per statement delta files inside a single + // transaction so we no longer need a statementId in the file name + + // String fileName = fileName.substring(name.lastIndexOf('/') + 1); + // checkArgument(fileName.startsWith(deltaPrefix), "File does not start with '%s': %s", deltaPrefix, path); + + long visibilityId = 0; + int visibilityIdx = fileName.indexOf("_v"); + if (visibilityIdx != -1) { + visibilityId = Long.parseLong(fileName.substring(visibilityIdx + 2)); + fileName = fileName.substring(0, visibilityIdx); + } + + boolean deleteDelta = deltaPrefix.equals("delete_delta_"); + + String rest = fileName.substring(deltaPrefix.length()); + int split = rest.indexOf('_'); + int split2 = rest.indexOf('_', split + 1); + long min = Long.parseLong(rest.substring(0, split)); + + if (split2 == -1) { + long max = Long.parseLong(rest.substring(split + 1)); + return new ParsedDelta(min, max, path, -1, deleteDelta, visibilityId); + } + + long max = Long.parseLong(rest.substring(split + 1, split2)); + int statementId = Integer.parseInt(rest.substring(split2 + 1)); + return new ParsedDelta(min, max, path, statementId, deleteDelta, visibilityId); + } + + public interface FileFilter { + public boolean accept(String fileName); + } + + public static final class FullAcidFileFilter implements FileFilter { + @Override + public boolean accept(String fileName) { + return fileName.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX) + && !fileName.endsWith(DELTA_SIDE_FILE_SUFFIX); + } + } + + public static final class InsertOnlyFileFilter implements FileFilter { + @Override + public boolean accept(String fileName) { + return true; + } + } + + //Since the hive3 library cannot read the hive4 transaction table normally, and there are many problems + // when using the Hive 4 library directly, this method is implemented. + //Ref: hive/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java#getAcidState + public static FileCacheValue getAcidState(FileSystem fileSystem, HivePartition partition, + Map txnValidIds, Map catalogProps, boolean isFullAcid) throws Exception { + + // Ref: https://issues.apache.org/jira/browse/HIVE-18192 + // Readers should use the combination of ValidTxnList and ValidWriteIdList(Table) for snapshot isolation. + // ValidReadTxnList implements ValidTxnList + // ValidReaderWriteIdList implements ValidWriteIdList + ValidTxnList validTxnList = null; + if (txnValidIds.containsKey(VALID_TXNS_KEY)) { + validTxnList = new ValidReadTxnList(); + validTxnList.readFromString( + txnValidIds.get(VALID_TXNS_KEY) + ); + } else { + throw new RuntimeException("Miss ValidTxnList"); + } + + ValidWriteIdList validWriteIdList = null; + if (txnValidIds.containsKey(VALID_WRITEIDS_KEY)) { + validWriteIdList = new ValidReaderWriteIdList(); + validWriteIdList.readFromString( + txnValidIds.get(VALID_WRITEIDS_KEY) + ); + } else { + throw new RuntimeException("Miss ValidWriteIdList"); + } + + String partitionPath = partition.getPath(); + //hdfs://xxxxx/user/hive/warehouse/username/data_id=200103 + + List lsPartitionPath = new ArrayList<>(); + Status status = fileSystem.globList(partitionPath + "/*", lsPartitionPath); + // List all files and folders, without recursion. 
+ // FileStatus[] lsPartitionPath = null; + // fileSystem.listFileStatuses(partitionPath,lsPartitionPath); + if (status != Status.OK) { + throw new IOException(status.toString()); + } + + String oldestBase = null; + long oldestBaseWriteId = Long.MAX_VALUE; + String bestBasePath = null; + long bestBaseWriteId = 0; + boolean haveOriginalFiles = false; + List workingDeltas = new ArrayList<>(); + + for (RemoteFile remotePath : lsPartitionPath) { + if (remotePath.isDirectory()) { + String dirName = remotePath.getName(); //dirName: base_xxx,delta_xxx,... + String dirPath = partitionPath + "/" + dirName; + + if (dirName.startsWith("base_")) { + ParsedBase base = parseBase(dirName); + if (!validTxnList.isTxnValid(base.visibilityId)) { + //checks visibilityTxnId to see if it is committed in current snapshot. + continue; + } + + long writeId = base.writeId; + if (oldestBaseWriteId > writeId) { + oldestBase = dirPath; + oldestBaseWriteId = writeId; + } + + if (((bestBasePath == null) || (bestBaseWriteId < writeId)) + && isValidBase(fileSystem, dirPath, base, validWriteIdList)) { + //IOW will generator a base_N/ directory: https://issues.apache.org/jira/browse/HIVE-14988 + //So maybe need consider: https://issues.apache.org/jira/browse/HIVE-25777 + + bestBasePath = dirPath; + bestBaseWriteId = writeId; + } + } else if (dirName.startsWith("delta_") || dirName.startsWith("delete_delta_")) { + String deltaPrefix = dirName.startsWith("delta_") ? "delta_" : "delete_delta_"; + ParsedDelta delta = parseDelta(dirName, deltaPrefix, dirPath); + + if (!validTxnList.isTxnValid(delta.visibilityId)) { + continue; + } + + // No need check (validWriteIdList.isWriteIdRangeAborted(min,max) != RangeResponse.ALL) + // It is a subset of (validWriteIdList.isWriteIdRangeValid(min, max) != RangeResponse.NONE) + if (validWriteIdList.isWriteIdRangeValid(delta.min, delta.max) != RangeResponse.NONE) { + workingDeltas.add(delta); + } + } else { + //Sometimes hive will generate temporary directories(`.hive-staging_hive_xxx` ), + // which do not need to be read. + LOG.warn("Read Hive Acid Table ignore the contents of this folder:" + dirName); + } + } else { + haveOriginalFiles = true; + } + } + + if (bestBasePath == null && haveOriginalFiles) { + // ALTER TABLE nonAcidTbl SET TBLPROPERTIES ('transactional'='true'); + throw new UnsupportedOperationException("For no acid table convert to acid, please COMPACT 'major'."); + } + + if ((oldestBase != null) && (bestBasePath == null)) { + /* + * If here, it means there was a base_x (> 1 perhaps) but none were suitable for given + * {@link writeIdList}. Note that 'original' files are logically a base_Long.MIN_VALUE and thus + * cannot have any data for an open txn. We could check {@link deltas} has files to cover + * [1,n] w/o gaps but this would almost never happen... + * + * We only throw for base_x produced by Compactor since that base erases all history and + * cannot be used for a client that has a snapshot in which something inside this base is + * open. (Nor can we ignore this base of course) But base_x which is a result of IOW, + * contains all history so we treat it just like delta wrt visibility. Imagine, IOW which + * aborts. It creates a base_x, which can and should just be ignored.*/ + long[] exceptions = validWriteIdList.getInvalidWriteIds(); + String minOpenWriteId = ((exceptions != null) + && (exceptions.length > 0)) ? String.valueOf(exceptions[0]) : "x"; + throw new IOException( + String.format("Not enough history available for ({},{}). 
Oldest available base: {}", + validWriteIdList.getHighWatermark(), minOpenWriteId, oldestBase)); + } + + workingDeltas.sort(null); + + List deltas = new ArrayList<>(); + long current = bestBaseWriteId; + int lastStatementId = -1; + ParsedDelta prev = null; + // find need read delta/delete_delta file. + for (ParsedDelta next : workingDeltas) { + if (next.max > current) { + if (validWriteIdList.isWriteIdRangeValid(current + 1, next.max) != RangeResponse.NONE) { + deltas.add(next); + current = next.max; + lastStatementId = next.statementId; + prev = next; + } + } else if ((next.max == current) && (lastStatementId >= 0)) { + //make sure to get all deltas within a single transaction; multi-statement txn + //generate multiple delta files with the same txnId range + //of course, if maxWriteId has already been minor compacted, + // all per statement deltas are obsolete + + deltas.add(next); + prev = next; + } else if ((prev != null) + && (next.max == prev.max) + && (next.min == prev.min) + && (next.statementId == prev.statementId)) { + // The 'next' parsedDelta may have everything equal to the 'prev' parsedDelta, except + // the path. This may happen when we have split update and we have two types of delta + // directories- 'delta_x_y' and 'delete_delta_x_y' for the SAME txn range. + + // Also note that any delete_deltas in between a given delta_x_y range would be made + // obsolete. For example, a delta_30_50 would make delete_delta_40_40 obsolete. + // This is valid because minor compaction always compacts the normal deltas and the delete + // deltas for the same range. That is, if we had 3 directories, delta_30_30, + // delete_delta_40_40 and delta_50_50, then running minor compaction would produce + // delta_30_50 and delete_delta_30_50. + deltas.add(next); + prev = next; + } + } + + FileCacheValue fileCacheValue = new FileCacheValue(); + List deleteDeltas = new ArrayList<>(); + + FileFilter fileFilter = isFullAcid ? 
new FullAcidFileFilter() : new InsertOnlyFileFilter(); + + // delta directories + for (ParsedDelta delta : deltas) { + String location = delta.getPath(); + + List remoteFiles = new ArrayList<>(); + status = fileSystem.listFiles(location, false, remoteFiles); + if (status.ok()) { + if (delta.isDeleteDelta()) { + List deleteDeltaFileNames = remoteFiles.stream() + .map(RemoteFile::getName).filter(fileFilter::accept) + .collect(Collectors.toList()); + deleteDeltas.add(new DeleteDeltaInfo(location, deleteDeltaFileNames)); + continue; + } + remoteFiles.stream().filter(f -> fileFilter.accept(f.getName())).forEach(file -> { + LocationPath path = new LocationPath(file.getPath().toString(), catalogProps); + fileCacheValue.addFile(file, path); + }); + } else { + throw new RuntimeException(status.getErrMsg()); + } + } + + // base + if (bestBasePath != null) { + List remoteFiles = new ArrayList<>(); + status = fileSystem.listFiles(bestBasePath, false, remoteFiles); + if (status.ok()) { + remoteFiles.stream().filter(f -> fileFilter.accept(f.getName())) + .forEach(file -> { + LocationPath path = new LocationPath(file.getPath().toString(), catalogProps); + fileCacheValue.addFile(file, path); + }); + } else { + throw new RuntimeException(status.getErrMsg()); + } + } + + if (isFullAcid) { + fileCacheValue.setAcidInfo(new AcidInfo(partition.getPath(), deleteDeltas)); + } else if (!deleteDeltas.isEmpty()) { + throw new RuntimeException("No Hive Full Acid Table have delete_delta_* Dir."); + } + return fileCacheValue; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java index a5e0eefb3483aa..fcaefecf41b156 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java @@ -23,7 +23,6 @@ import org.apache.doris.datasource.TableMetadata; import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException; -import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; @@ -82,7 +81,7 @@ NotificationEventResponse getNextNotification(long lastEventId, void commitTxn(long txnId); - ValidWriteIdList getValidWriteIds(String fullTableName, long currentTransactionId); + Map getValidWriteIds(String fullTableName, long currentTransactionId); void acquireSharedLock(String queryId, long txnId, String user, TableName tblName, List partitionNames, long timeoutMs); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index 71c7308b079866..b554f508103992 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -30,6 +30,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.SchemaCacheValue; @@ -359,19 +360,24 @@ public Map getNameToPartitionItems() { } public boolean 
isHiveTransactionalTable() { - return dlaType == DLAType.HIVE && AcidUtils.isTransactionalTable(remoteTable) - && isSupportedTransactionalFileFormat(); + return dlaType == DLAType.HIVE && AcidUtils.isTransactionalTable(remoteTable); } - private boolean isSupportedTransactionalFileFormat() { + private boolean isSupportedFullAcidTransactionalFileFormat() { // Sometimes we meet "transactional" = "true" but format is parquet, which is not supported. // So we need to check the input format for transactional table. String inputFormatName = remoteTable.getSd().getInputFormat(); return inputFormatName != null && SUPPORTED_HIVE_TRANSACTIONAL_FILE_FORMATS.contains(inputFormatName); } - public boolean isFullAcidTable() { - return dlaType == DLAType.HIVE && AcidUtils.isFullAcidTable(remoteTable); + public boolean isFullAcidTable() throws UserException { + if (dlaType == DLAType.HIVE && AcidUtils.isFullAcidTable(remoteTable)) { + if (!isSupportedFullAcidTransactionalFileFormat()) { + throw new UserException("This table is full Acid Table, but no Orc Format."); + } + return true; + } + return false; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index ea42dfa2f52a01..258c936e44821b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -32,12 +32,12 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.UserException; import org.apache.doris.common.security.authentication.AuthenticationConfig; +import org.apache.doris.common.security.authentication.HadoopAuthenticator; import org.apache.doris.common.util.CacheBulkLoader; import org.apache.doris.common.util.LocationPath; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.CacheException; import org.apache.doris.datasource.ExternalMetaCacheMgr; -import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo; import org.apache.doris.datasource.property.PropertyConverter; import org.apache.doris.fs.FileSystemCache; import org.apache.doris.fs.remote.RemoteFile; @@ -55,7 +55,6 @@ import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.base.Strings; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import com.google.common.collect.Iterables; @@ -70,21 +69,16 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.utils.FileUtils; -import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.net.URI; -import java.security.PrivilegedExceptionAction; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -104,10 +98,6 @@ public class HiveMetaStoreCache { private static final Logger LOG = 
LogManager.getLogger(HiveMetaStoreCache.class); public static final String HIVE_DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__"; - // After hive 3, transactional table's will have file '_orc_acid_version' with value >= '2'. - public static final String HIVE_ORC_ACID_VERSION_FILE = "_orc_acid_version"; - - private static final String HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX = "bucket_"; private final HMSExternalCatalog catalog; private JobConf jobConf; @@ -742,121 +732,32 @@ public LoadingCache getPartitionCache() { return partitionCache; } - public List getFilesByTransaction(List partitions, ValidWriteIdList validWriteIds, - boolean isFullAcid, boolean skipCheckingAcidVersionFile, long tableId, String bindBrokerName) { + public List getFilesByTransaction(List partitions, Map txnValidIds, + boolean isFullAcid, String bindBrokerName) { List fileCacheValues = Lists.newArrayList(); - String remoteUser = jobConf.get(AuthenticationConfig.HADOOP_USER_NAME); try { - for (HivePartition partition : partitions) { - FileCacheValue fileCacheValue = new FileCacheValue(); - AcidUtils.Directory directory; - if (!Strings.isNullOrEmpty(remoteUser)) { - UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser); - directory = ugi.doAs((PrivilegedExceptionAction) () -> AcidUtils.getAcidState( - new Path(partition.getPath()), jobConf, validWriteIds, false, true)); - } else { - directory = AcidUtils.getAcidState(new Path(partition.getPath()), jobConf, validWriteIds, false, - true); - } - if (directory == null) { - return Collections.emptyList(); - } - if (!directory.getOriginalFiles().isEmpty()) { - throw new Exception("Original non-ACID files in transactional tables are not supported"); - } - - if (isFullAcid) { - int acidVersion = 2; - /** - * From Hive version >= 3.0, delta/base files will always have file '_orc_acid_version' - * with value >= '2'. - */ - Path baseOrDeltaPath = directory.getBaseDirectory() != null ? directory.getBaseDirectory() : - !directory.getCurrentDirectories().isEmpty() ? 
directory.getCurrentDirectories().get(0) - .getPath() : null; - if (baseOrDeltaPath == null) { - return Collections.emptyList(); - } - if (!skipCheckingAcidVersionFile) { - String acidVersionPath = new Path(baseOrDeltaPath, "_orc_acid_version").toUri().toString(); - RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( - new FileSystemCache.FileSystemCacheKey( - LocationPath.getFSIdentity(baseOrDeltaPath.toUri().toString(), - bindBrokerName), - catalog.getCatalogProperty().getProperties(), - bindBrokerName, jobConf)); - Status status = fs.exists(acidVersionPath); - if (status != Status.OK) { - if (status.getErrCode() == ErrCode.NOT_FOUND) { - acidVersion = 0; - } else { - throw new Exception(String.format("Failed to check remote path {} exists.", - acidVersionPath)); - } - } - if (acidVersion == 0 && !directory.getCurrentDirectories().isEmpty()) { - throw new Exception( - "Hive 2.x versioned full-acid tables need to run major compaction."); - } - } - } - - // delta directories - List deleteDeltas = new ArrayList<>(); - for (AcidUtils.ParsedDelta delta : directory.getCurrentDirectories()) { - String location = delta.getPath().toString(); - RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( - new FileSystemCache.FileSystemCacheKey( - LocationPath.getFSIdentity(location, bindBrokerName), - catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf)); - List remoteFiles = new ArrayList<>(); - Status status = fs.listFiles(location, false, remoteFiles); - if (status.ok()) { - if (delta.isDeleteDelta()) { - List deleteDeltaFileNames = remoteFiles.stream().map(f -> f.getName()).filter( - name -> name.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)) - .collect(Collectors.toList()); - deleteDeltas.add(new DeleteDeltaInfo(location, deleteDeltaFileNames)); - continue; - } - remoteFiles.stream().filter( - f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)).forEach(file -> { - LocationPath path = new LocationPath(file.getPath().toString(), - catalog.getProperties()); - fileCacheValue.addFile(file, path); - }); - } else { - throw new RuntimeException(status.getErrMsg()); - } - } + if (partitions.isEmpty()) { + return fileCacheValues; + } - // base - if (directory.getBaseDirectory() != null) { - String location = directory.getBaseDirectory().toString(); - RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( - new FileSystemCache.FileSystemCacheKey( - LocationPath.getFSIdentity(location, bindBrokerName), - catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf)); - List remoteFiles = new ArrayList<>(); - Status status = fs.listFiles(location, false, remoteFiles); - if (status.ok()) { - remoteFiles.stream().filter( - f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)) - .forEach(file -> { - LocationPath path = new LocationPath(file.getPath().toString(), - catalog.getProperties()); - fileCacheValue.addFile(file, path); - }); - } else { - throw new RuntimeException(status.getErrMsg()); - } - } - fileCacheValue.setAcidInfo(new AcidInfo(partition.getPath(), deleteDeltas)); - fileCacheValues.add(fileCacheValue); + for (HivePartition partition : partitions) { + //Get filesystem multiple times, Reason: https://github.com/apache/doris/pull/23409. 
+ RemoteFileSystem fileSystem = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( + new FileSystemCache.FileSystemCacheKey( + LocationPath.getFSIdentity(partition.getPath(), bindBrokerName), + catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf)); + + AuthenticationConfig authenticationConfig = AuthenticationConfig.getKerberosConfig(jobConf); + HadoopAuthenticator hadoopAuthenticator = + HadoopAuthenticator.getHadoopAuthenticator(authenticationConfig); + + fileCacheValues.add( + hadoopAuthenticator.doAs(() -> AcidUtil.getAcidState( + fileSystem, partition, txnValidIds, catalog.getProperties(), isFullAcid)) + ); } } catch (Exception e) { - throw new CacheException("failed to get input splits for write ids %s in catalog %s", e, - validWriteIds.toString(), catalog.getName()); + throw new CacheException("Failed to get input splits %s", e, txnValidIds.toString()); } return fileCacheValues; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java index a660cb148ac069..b44ffe00390536 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java @@ -46,6 +46,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -179,6 +180,25 @@ public boolean createTable(CreateTableStmt stmt) throws UserException { props.put("owner", ConnectContext.get().getUserIdentity().getUser()); } } + + if (props.containsKey("transactional") && props.get("transactional").equalsIgnoreCase("true")) { + throw new UserException("Not support create hive transactional table."); + /* + CREATE TABLE trans6( + `col1` int, + `col2` int + ) ENGINE=hive + PROPERTIES ( + 'file_format'='orc', + 'compression'='zlib', + 'bucketing_version'='2', + 'transactional'='true', + 'transactional_properties'='default' + ); + In hive, this table only can insert not update(not report error,but not actually updated). + */ + } + String fileFormat = props.getOrDefault(FILE_FORMAT_KEY, Config.hive_default_file_format); Map ddlProps = new HashMap<>(); for (Map.Entry entry : props.entrySet()) { @@ -273,6 +293,10 @@ public void dropTable(DropTableStmt stmt) throws DdlException { ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_TABLE, tblName, dbName); } } + if (AcidUtils.isTransactionalTable(client.getTable(dbName, tblName))) { + throw new DdlException("Not support drop hive transactional table."); + } + try { client.dropTable(dbName, tblName); db.setUnInitialized(true); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java index a88b1136369e97..7d62af29611447 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java @@ -21,9 +21,9 @@ import org.apache.doris.common.UserException; import com.google.common.collect.Lists; -import org.apache.hadoop.hive.common.ValidWriteIdList; import java.util.List; +import java.util.Map; /** * HiveTransaction is used to save info of a hive transaction. 
@@ -40,7 +40,7 @@ public class HiveTransaction { private long txnId; private List partitionNames = Lists.newArrayList(); - ValidWriteIdList validWriteIdList = null; + Map txnValidIds = null; public HiveTransaction(String queryId, String user, HMSExternalTable hiveTable, boolean isFullAcid) { this.queryId = queryId; @@ -61,14 +61,14 @@ public boolean isFullAcid() { return isFullAcid; } - public ValidWriteIdList getValidWriteIds(HMSCachedClient client) { - if (validWriteIdList == null) { + public Map getValidWriteIds(HMSCachedClient client) { + if (txnValidIds == null) { TableName tableName = new TableName(hiveTable.getCatalog().getName(), hiveTable.getDbName(), hiveTable.getName()); client.acquireSharedLock(queryId, txnId, user, tableName, partitionNames, 5000); - validWriteIdList = client.getValidWriteIds(tableName.getDb() + "." + tableName.getTbl(), txnId); + txnValidIds = client.getValidWriteIds(tableName.getDb() + "." + tableName.getTbl(), txnId); } - return validWriteIdList; + return txnValidIds; } public void begin() throws UserException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java index 0cdb3e469c2951..672afa96208d65 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java @@ -32,7 +32,6 @@ import com.google.common.collect.ImmutableList.Builder; import com.google.common.collect.ImmutableMap; import org.apache.commons.lang3.NotImplementedException; -import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; @@ -523,7 +522,7 @@ public void commitTxn(long txnId) { } @Override - public ValidWriteIdList getValidWriteIds(String fullTableName, long currentTransactionId) { + public Map getValidWriteIds(String fullTableName, long currentTransactionId) { throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient."); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java index 56b69dc71e2b03..9137c977f179c9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java @@ -31,6 +31,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.common.ValidReaderWriteIdList; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidTxnWriteIdList; @@ -564,7 +565,8 @@ public void acquireSharedLock(String queryId, long txnId, String user, TableName } @Override - public ValidWriteIdList getValidWriteIds(String fullTableName, long currentTransactionId) { + public Map getValidWriteIds(String fullTableName, long currentTransactionId) { + Map conf = new HashMap<>(); try (ThriftHMSClient client = getClient()) { try { return ugiDoAs(() -> { @@ -581,7 +583,10 @@ public ValidWriteIdList getValidWriteIds(String fullTableName, long 
currentTrans ValidTxnWriteIdList validTxnWriteIdList = TxnUtils.createValidTxnWriteIdList(currentTransactionId, tableValidWriteIdsList); ValidWriteIdList writeIdList = validTxnWriteIdList.getTableValidWriteIdList(fullTableName); - return writeIdList; + + conf.put(AcidUtil.VALID_TXNS_KEY, validTransactions.writeToString()); + conf.put(AcidUtil.VALID_WRITEIDS_KEY, writeIdList.writeToString()); + return conf; }); } catch (Exception e) { client.setThrowable(e); @@ -591,7 +596,14 @@ public ValidWriteIdList getValidWriteIds(String fullTableName, long currentTrans // Ignore this exception when the version of hive is not compatible with these apis. // Currently, the workaround is using a max watermark. LOG.warn("failed to get valid write ids for {}, transaction {}", fullTableName, currentTransactionId, e); - return new ValidReaderWriteIdList(fullTableName, new long[0], new BitSet(), Long.MAX_VALUE); + + ValidTxnList validTransactions = new ValidReadTxnList( + new long[0], new BitSet(), Long.MAX_VALUE, Long.MAX_VALUE); + ValidWriteIdList writeIdList = new ValidReaderWriteIdList( + fullTableName, new long[0], new BitSet(), Long.MAX_VALUE); + conf.put(AcidUtil.VALID_TXNS_KEY, validTransactions.writeToString()); + conf.put(AcidUtil.VALID_WRITEIDS_KEY, writeIdList.writeToString()); + return conf; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java index 3a8ab722fb68bf..dbbf2154251f14 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java @@ -58,7 +58,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import lombok.Setter; -import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.logging.log4j.LogManager; @@ -267,7 +266,14 @@ private void getFileSplitByPartitions(HiveMetaStoreCache cache, List allFiles, String bindBrokerName, int numBackends) throws IOException, UserException { List fileCaches; if (hiveTransaction != null) { - fileCaches = getFileSplitByTransaction(cache, partitions, bindBrokerName); + try { + fileCaches = getFileSplitByTransaction(cache, partitions, bindBrokerName); + } catch (Exception e) { + // Release shared load (getValidWriteIds acquire Lock). + // If no exception is throw, the lock will be released when `finalizeQuery()`. 
+ Env.getCurrentHiveTransactionMgr().deregister(hiveTransaction.getQueryId()); + throw e; + } } else { boolean withCache = Config.max_external_file_cache_num > 0; fileCaches = cache.getFilesByPartitions(partitions, withCache, partitions.size() > 1, bindBrokerName); @@ -367,10 +373,10 @@ private List getFileSplitByTransaction(HiveMetaStoreCache cache, } hiveTransaction.addPartition(partition.getPartitionName(hmsTable.getPartitionColumns())); } - ValidWriteIdList validWriteIds = hiveTransaction.getValidWriteIds( + Map txnValidIds = hiveTransaction.getValidWriteIds( ((HMSExternalCatalog) hmsTable.getCatalog()).getClient()); - return cache.getFilesByTransaction(partitions, validWriteIds, - hiveTransaction.isFullAcid(), skipCheckingAcidVersionFile, hmsTable.getId(), bindBrokerName); + + return cache.getFilesByTransaction(partitions, txnValidIds, hiveTransaction.isFullAcid(), bindBrokerName); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 96d5d56a7e10ff..1293e0d0adbec6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -24,6 +24,7 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; import org.apache.doris.common.profile.ProfileManager.ProfileType; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.datasource.hive.HMSExternalTable; @@ -312,6 +313,10 @@ private ExecutorFactory selectInsertExecutorFactory( } else if (physicalSink instanceof PhysicalHiveTableSink) { boolean emptyInsert = childIsEmptyRelation(physicalSink); HMSExternalTable hiveExternalTable = (HMSExternalTable) targetTableIf; + if (hiveExternalTable.isHiveTransactionalTable()) { + throw new UserException("Not supported insert into hive transactional table."); + } + return ExecutorFactory.from( planner, dataSink, diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java index 12e66398210b22..5d4474d77ec21e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java @@ -28,7 +28,6 @@ import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException; import com.google.common.collect.ImmutableList; -import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; @@ -202,7 +201,7 @@ public void commitTxn(long txnId) { } @Override - public ValidWriteIdList getValidWriteIds(String fullTableName, long currentTransactionId) { + public Map getValidWriteIds(String fullTableName, long currentTransactionId) { return null; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java new file mode 100644 index 00000000000000..a54084e9b45b29 --- /dev/null +++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java @@ -0,0 +1,595 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.hive; + +import org.apache.doris.common.info.SimpleTableInfo; +import org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheValue; +import org.apache.doris.fs.LocalDfsFileSystem; + +import org.apache.hadoop.hive.common.ValidReadTxnList; +import org.apache.hadoop.hive.common.ValidReaderWriteIdList; +import org.junit.Assert; +import org.junit.Test; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.BitSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + + +public class HiveAcidTest { + + @Test + public void testOriginalDeltas() throws Exception { + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/000000_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/000001_1"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/000002_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/random"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/_done"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/subdir/000000_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_025_025"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_029_029"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_025_030"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_050_100"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_101_101"); + + Map txnValidIds = new HashMap<>(); + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); + txnValidIds.put( + AcidUtil.VALID_WRITEIDS_KEY, + new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":").writeToString()); + + HivePartition partition = new HivePartition(new SimpleTableInfo("", "tbl"), + false, "", "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), new HashMap<>()); + try { + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>(), true); + } catch (UnsupportedOperationException e) { + Assert.assertTrue(e.getMessage().contains("For no acid table convert to acid, please COMPACT 'major'.")); + } + } + + + @Test + public void testObsoleteOriginals() throws Exception 
{ + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_10/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_5/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/000000_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/000001_1"); + + Map txnValidIds = new HashMap<>(); + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); + txnValidIds.put( + AcidUtil.VALID_WRITEIDS_KEY, + new ValidReaderWriteIdList("tbl:150:" + Long.MAX_VALUE + ":").writeToString()); + + HivePartition partition = new HivePartition( + new SimpleTableInfo("", "tbl"), + false, + "", + "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), + new HashMap<>()); + try { + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>(), true); + } catch (UnsupportedOperationException e) { + Assert.assertTrue(e.getMessage().contains("For no acid table convert to acid, please COMPACT 'major'.")); + } + } + + + @Test + public void testOverlapingDelta() throws Exception { + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_0000063_63/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_000062_62/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_00061_61/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_40_60/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_0060_60/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_052_55/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_50/bucket_0"); + + Map txnValidIds = new HashMap<>(); + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); + txnValidIds.put( + AcidUtil.VALID_WRITEIDS_KEY, + new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":").writeToString()); + + HivePartition partition = new HivePartition( + new SimpleTableInfo("", "tbl"), + false, + "", + "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), + new HashMap<>()); + + FileCacheValue fileCacheValue = + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>(), true); + + List readFile = + fileCacheValue.getFiles().stream().map(x -> x.path.toString()).collect(Collectors.toList()); + + + List resultReadFile = Arrays.asList( + "file:" + tempPath.toAbsolutePath() + "/base_50/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_40_60/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_00061_61/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_000062_62/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_0000063_63/bucket_0" + ); + Assert.assertTrue(resultReadFile.containsAll(readFile) && readFile.containsAll(resultReadFile)); + } + + + @Test + public void testOverlapingDelta2() throws Exception { + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + + 
localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_0000063_63_0/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_000062_62_0/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_000062_62_3/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_00061_61_0/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_40_60/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_0060_60_1/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_0060_60_4/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_0060_60_7/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_052_55/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_058_58/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_50/bucket_0"); + + Map txnValidIds = new HashMap<>(); + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); + txnValidIds.put( + AcidUtil.VALID_WRITEIDS_KEY, + new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":").writeToString()); + + HivePartition partition = new HivePartition( + new SimpleTableInfo("", "tbl"), + false, + "", + "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), + new HashMap<>()); + + FileCacheValue fileCacheValue = + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>(), true); + + + List readFile = + fileCacheValue.getFiles().stream().map(x -> x.path.toString()).collect(Collectors.toList()); + + + List resultReadFile = Arrays.asList( + "file:" + tempPath.toAbsolutePath() + "/base_50/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_40_60/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_00061_61_0/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_000062_62_0/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_000062_62_3/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_0000063_63_0/bucket_0" + + ); + + Assert.assertTrue(resultReadFile.containsAll(readFile) && readFile.containsAll(resultReadFile)); + } + + + @Test + public void deltasWithOpenTxnInRead() throws Exception { + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_1_1/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_2_5/bucket_0"); + + Map txnValidIds = new HashMap<>(); + + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new long[] {50}, new BitSet(), 1000, 55).writeToString()); + txnValidIds.put( + AcidUtil.VALID_WRITEIDS_KEY, + new ValidReaderWriteIdList("tbl:100:4:4").writeToString()); + + HivePartition partition = new HivePartition( + new SimpleTableInfo("", "tbl"), + false, + "", + "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), + new HashMap<>()); + + + FileCacheValue fileCacheValue = + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>(), true); + + + List readFile = + fileCacheValue.getFiles().stream().map(x -> x.path.toString()).collect(Collectors.toList()); + + + List resultReadFile = Arrays.asList( + 
"file:" + tempPath.toAbsolutePath() + "/delta_1_1/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_2_5/bucket_0" + ); + + Assert.assertTrue(resultReadFile.containsAll(readFile) && readFile.containsAll(resultReadFile)); + + } + + + @Test + public void deltasWithOpenTxnInRead2() throws Exception { + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_1_1/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_2_5/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_4_4_1/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_4_4_3/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_101_101_1/bucket_0"); + + Map txnValidIds = new HashMap<>(); + + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new long[] {50}, new BitSet(), 1000, 55).writeToString()); + txnValidIds.put( + AcidUtil.VALID_WRITEIDS_KEY, + new ValidReaderWriteIdList("tbl:100:4:4").writeToString()); + + HivePartition partition = new HivePartition( + new SimpleTableInfo("", "tbl"), + false, + "", + "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), + new HashMap<>()); + + FileCacheValue fileCacheValue = + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>(), true); + + + List readFile = + fileCacheValue.getFiles().stream().map(x -> x.path.toString()).collect(Collectors.toList()); + + + List resultReadFile = Arrays.asList( + "file:" + tempPath.toAbsolutePath() + "/delta_1_1/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_2_5/bucket_0" + ); + + Assert.assertTrue(resultReadFile.containsAll(readFile) && readFile.containsAll(resultReadFile)); + } + + @Test + public void testBaseWithDeleteDeltas() throws Exception { + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_5/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_10/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_49/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_025_025/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_029_029/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delete_delta_029_029/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_025_030/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delete_delta_025_030/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_050_105/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delete_delta_050_105/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delete_delta_110_110/bucket_0"); + + Map txnValidIds = new HashMap<>(); + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); + txnValidIds.put( + AcidUtil.VALID_WRITEIDS_KEY, + new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":").writeToString()); + + // Map tableProps = new 
HashMap<>(); + // tableProps.put(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, + // AcidUtils.AcidOperationalProperties.getDefault().toString()); + + HivePartition partition = new HivePartition( + new SimpleTableInfo("", "tbl"), + false, + "", + "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), + new HashMap<>()); + + FileCacheValue fileCacheValue = + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>(), true); + + + List<String> readFile = + fileCacheValue.getFiles().stream().map(x -> x.path.toString()).collect(Collectors.toList()); + + + List<String> resultReadFile = Arrays.asList( + "file:" + tempPath.toAbsolutePath() + "/base_49/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_050_105/bucket_0" + ); + + Assert.assertTrue(resultReadFile.containsAll(readFile) && readFile.containsAll(resultReadFile)); + + List<String> resultDelta = Arrays.asList( + "file://" + tempPath.toAbsolutePath() + "/delete_delta_050_105/bucket_0" + ); + + List<String> deltaFiles = new ArrayList<>(); + fileCacheValue.getAcidInfo().getDeleteDeltas().forEach( + deltaInfo -> { + String loc = deltaInfo.getDirectoryLocation(); + deltaInfo.getFileNames().forEach( + fileName -> deltaFiles.add(loc + "/" + fileName) + ); + } + ); + + Assert.assertTrue(resultDelta.containsAll(deltaFiles) && deltaFiles.containsAll(resultDelta)); + } + + + @Test + public void testOverlapingDeltaAndDeleteDelta() throws Exception { + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_0000063_63/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_000062_62/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_00061_61/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delete_delta_00064_64/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_40_60/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delete_delta_40_60/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_0060_60/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_052_55/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delete_delta_052_55/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_50/bucket_0"); + + Map<String, String> txnValidIds = new HashMap<>(); + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); + txnValidIds.put( + AcidUtil.VALID_WRITEIDS_KEY, + new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":").writeToString()); + + + Map<String, String> tableProps = new HashMap<>(); + // tableProps.put(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, + // AcidUtils.AcidOperationalProperties.getDefault().toString()); + + HivePartition partition = new HivePartition( + new SimpleTableInfo("", "tbl"), + false, + "", + "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), + tableProps); + + FileCacheValue fileCacheValue = + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>(), true); + + + + List<String> readFile = + fileCacheValue.getFiles().stream().map(x -> x.path.toString()).collect(Collectors.toList()); + + + List<String> resultReadFile = Arrays.asList( + "file:" + 
tempPath.toAbsolutePath() + "/base_50/bucket_0", + + "file:" + tempPath.toAbsolutePath() + "/delta_40_60/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_00061_61/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_000062_62/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_0000063_63/bucket_0" + + ); + + Assert.assertTrue(resultReadFile.containsAll(readFile) && readFile.containsAll(resultReadFile)); + + List<String> resultDelta = Arrays.asList( + + "file://" + tempPath.toAbsolutePath() + "/delete_delta_40_60/bucket_0", + "file://" + tempPath.toAbsolutePath() + "/delete_delta_00064_64/bucket_0" + ); + + List<String> deltaFiles = new ArrayList<>(); + fileCacheValue.getAcidInfo().getDeleteDeltas().forEach( + deltaInfo -> { + String loc = deltaInfo.getDirectoryLocation(); + deltaInfo.getFileNames().forEach( + fileName -> deltaFiles.add(loc + "/" + fileName) + ); + } + ); + + Assert.assertTrue(resultDelta.containsAll(deltaFiles) && deltaFiles.containsAll(resultDelta)); + } + + @Test + public void testMinorCompactedDeltaMakesInBetweenDelteDeltaObsolete() throws Exception { + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + + + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_40_60/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delete_delta_50_50/bucket_0"); + + Map<String, String> txnValidIds = new HashMap<>(); + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); + txnValidIds.put( + AcidUtil.VALID_WRITEIDS_KEY, + new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":").writeToString()); + + Map<String, String> tableProps = new HashMap<>(); + HivePartition partition = new HivePartition( + new SimpleTableInfo("", "tbl"), + false, + "", + "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), + tableProps); + + FileCacheValue fileCacheValue = + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>(), true); + + List<String> readFile = + fileCacheValue.getFiles().stream().map(x -> x.path.toString()).collect(Collectors.toList()); + + + List<String> resultReadFile = Arrays.asList( + "file:" + tempPath.toAbsolutePath() + "/delta_40_60/bucket_0" + ); + + Assert.assertTrue(resultReadFile.containsAll(readFile) && readFile.containsAll(resultReadFile)); + } + + @Test + public void deleteDeltasWithOpenTxnInRead() throws Exception { + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_1_1/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_2_5/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delete_delta_2_5/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delete_delta_3_3/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_4_4_1/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_4_4_3/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_101_101_1/bucket_0"); + + + Map<String, String> txnValidIds = new HashMap<>(); + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new long[] {50}, new BitSet(), 1000, 55).writeToString()); + txnValidIds.put( + AcidUtil.VALID_WRITEIDS_KEY, + new 
ValidReaderWriteIdList("tbl:100:4:4").writeToString()); + + + Map tableProps = new HashMap<>(); + HivePartition partition = new HivePartition( + new SimpleTableInfo("", "tbl"), + false, + "", + "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), + tableProps); + + FileCacheValue fileCacheValue = + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>(), true); + + + List readFile = + fileCacheValue.getFiles().stream().map(x -> x.path.toString()).collect(Collectors.toList()); + + + List resultReadFile = Arrays.asList( + "file:" + tempPath.toAbsolutePath() + "/delta_1_1/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_2_5/bucket_0" + ); + + Assert.assertTrue(resultReadFile.containsAll(readFile) && readFile.containsAll(resultReadFile)); + List resultDelta = Arrays.asList( + "file://" + tempPath.toAbsolutePath() + "/delete_delta_2_5/bucket_0" + ); + + List deltaFiles = new ArrayList<>(); + fileCacheValue.getAcidInfo().getDeleteDeltas().forEach( + deltaInfo -> { + String loc = deltaInfo.getDirectoryLocation(); + deltaInfo.getFileNames().forEach( + fileName -> deltaFiles.add(loc + "/" + fileName) + ); + } + ); + + Assert.assertTrue(resultDelta.containsAll(deltaFiles) && deltaFiles.containsAll(resultDelta)); + } + + @Test + public void testBaseDeltas() throws Exception { + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_5/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_10/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_49/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_025_025/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_029_029/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_025_030/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_050_105/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_90_120/bucket_0"); + + Map txnValidIds = new HashMap<>(); + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); + txnValidIds.put( + AcidUtil.VALID_WRITEIDS_KEY, + new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":").writeToString()); + + Map tableProps = new HashMap<>(); + + HivePartition partition = new HivePartition( + new SimpleTableInfo("", "tbl"), + false, + "", + "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), + tableProps); + + FileCacheValue fileCacheValue = + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>(), true); + + List readFile = + fileCacheValue.getFiles().stream().map(x -> x.path.toString()).collect(Collectors.toList()); + + + + List resultReadFile = Arrays.asList( + "file:" + tempPath.toAbsolutePath() + "/base_49/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_050_105/bucket_0" + ); + Assert.assertTrue(resultReadFile.containsAll(readFile) && readFile.containsAll(resultReadFile)); + } +} diff --git a/regression-test/data/external_table_p0/hive/test_transactional_hive.out b/regression-test/data/external_table_p0/hive/test_transactional_hive.out index 2ad0446ccb7e25..060fa8c048e5a0 100644 --- 
a/regression-test/data/external_table_p0/hive/test_transactional_hive.out +++ b/regression-test/data/external_table_p0/hive/test_transactional_hive.out @@ -76,3 +76,49 @@ F -- !q05 -- 0 + +-- !2 -- +1 A 101 +2 BB 102 +3 CC 101 +4 D 102 + +-- !3 -- +1 A 101 +3 CC 101 + +-- !4 -- +2 BB 102 +4 D 102 + +-- !5 -- +1 A 101 +2 BB 102 + +-- !6 -- +4 D 102 + +-- !7 -- +1 A +2 BB +4 DD + +-- !10 -- +1 A +2 BB + +-- !11 -- +4 DD + +-- !12 -- +1 A +2 BB +4 DD + +-- !15 -- +1 A +2 BB + +-- !16 -- +4 DD + diff --git a/regression-test/data/external_table_p2/hive/test_hive_translation_insert_only.out b/regression-test/data/external_table_p2/hive/test_hive_translation_insert_only.out new file mode 100644 index 00000000000000..e4bdb3fe32d44b --- /dev/null +++ b/regression-test/data/external_table_p2/hive/test_hive_translation_insert_only.out @@ -0,0 +1,21 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !1 -- +1 A +2 B +3 C +4 D + +-- !2 -- +1 A +2 B +3 C +4 D +5 E + +-- !3 -- +1 A +2 B +3 C +4 D +5 E + diff --git a/regression-test/suites/external_table_p0/hive/test_transactional_hive.groovy b/regression-test/suites/external_table_p0/hive/test_transactional_hive.groovy index dbe20395ec95ec..4f7008ec1726fa 100644 --- a/regression-test/suites/external_table_p0/hive/test_transactional_hive.groovy +++ b/regression-test/suites/external_table_p0/hive/test_transactional_hive.groovy @@ -52,6 +52,70 @@ suite("test_transactional_hive", "p0,external,hive,external_docker,external_dock select count(*) from orc_full_acid_par_empty; """ } + + def test_acid = { + + sql """set enable_fallback_to_original_planner=false;""" + try { + sql """ select * from orc_to_acid_tb """ + }catch( Exception e) { + assertTrue(e.getMessage().contains("For no acid table convert to acid, please COMPACT")); + } + + qt_2 """ select * from orc_to_acid_compacted_tb order by id """ + qt_3 """ select * from orc_to_acid_compacted_tb where part_col=101 order by id """ + qt_4 """ select * from orc_to_acid_compacted_tb where part_col=102 order by id """ + qt_5 """ select * from orc_to_acid_compacted_tb where id < 3 order by id """ + qt_6 """ select * from orc_to_acid_compacted_tb where id > 3 order by id """ + + + qt_7 """ select * from orc_acid_minor order by id """ + qt_10 """ select * from orc_acid_minor where id < 3 order by id """ + qt_11 """ select * from orc_acid_minor where id > 3 order by id """ + + + qt_12 """ select * from orc_acid_major order by id """ + qt_15 """ select * from orc_acid_major where id < 3 order by id """ + qt_16 """ select * from orc_acid_major where id > 3 order by id """ + } + + def test_acid_write = { + sql """set enable_fallback_to_original_planner=false;""" + + + + try { + sql """ + CREATE TABLE acid_tb ( + `col1` BOOLEAN COMMENT 'col1', + `col2` INT COMMENT 'col2' + ) ENGINE=hive + PROPERTIES ( + 'file_format'='orc', + 'compression'='zlib', + 'bucketing_version'='2', + 'transactional'='true', + 'transactional_properties'='default' + ); + """ + }catch( Exception e) { + assertTrue(e.getMessage().contains("Not support create hive transactional table.")); + } + try { + sql """ insert into orc_acid_major(id,value) values(1,"a1"); """ + }catch (Exception e) { + assertTrue(e.getMessage().contains("Not supported insert into hive transactional table.")); + } + + try { + sql """ drop table orc_acid_major; """ + } catch (Exception e) { + assertTrue(e.getMessage().contains("Not support drop hive transactional table.")); + + } + } + + String enabled = 
context.config.otherConfigs.get("enableHiveTest") if (enabled == null || !enabled.equalsIgnoreCase("true")) { logger.info("diable Hive test.") @@ -60,6 +124,7 @@ suite("test_transactional_hive", "p0,external,hive,external_docker,external_dock for (String hivePrefix : ["hive3"]) { try { + String hdfs_port = context.config.otherConfigs.get(hivePrefix + "HdfsPort") String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort") String catalog_name = "test_transactional_${hivePrefix}" String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") @@ -68,6 +133,7 @@ suite("test_transactional_hive", "p0,external,hive,external_docker,external_dock sql """create catalog if not exists ${catalog_name} properties ( "type"="hms", 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}' + ,'fs.defaultFS' = 'hdfs://${externalEnvIp}:${hdfs_port}' );""" sql """use `${catalog_name}`.`default`""" @@ -79,6 +145,9 @@ suite("test_transactional_hive", "p0,external,hive,external_docker,external_dock q01() q01_par() + test_acid() + test_acid_write() + sql """drop catalog if exists ${catalog_name}""" } finally { } diff --git a/regression-test/suites/external_table_p2/hive/test_hive_translation_insert_only.groovy b/regression-test/suites/external_table_p2/hive/test_hive_translation_insert_only.groovy new file mode 100644 index 00000000000000..62a9727ed86e06 --- /dev/null +++ b/regression-test/suites/external_table_p2/hive/test_hive_translation_insert_only.groovy @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_hive_translation_insert_only", "p2,external,hive,external_remote,external_remote_hive") { + + String enabled = context.config.otherConfigs.get("enableExternalHudiTest") + //hudi hive use same catalog in p2. 
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable test") + } + + String props = context.config.otherConfigs.get("hudiEmrCatalog") + String hms_catalog_name = "test_hive_translation_insert_only" + + sql """drop catalog if exists ${hms_catalog_name};""" + sql """ + CREATE CATALOG IF NOT EXISTS ${hms_catalog_name} + PROPERTIES ( + ${props} + ,'hive.version' = '3.1.3' + ); + """ + + logger.info("catalog " + hms_catalog_name + " created") + sql """switch ${hms_catalog_name};""" + logger.info("switched to catalog " + hms_catalog_name) + sql """ use regression;""" + + qt_1 """ select * from text_insert_only order by id """ + qt_2 """ select * from parquet_insert_only_major order by id """ + qt_3 """ select * from orc_insert_only_minor order by id """ + + sql """drop catalog ${hms_catalog_name};""" +} + + +/* +SET hive.support.concurrency=true; +SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; + + +create table text_insert_only (id INT, value STRING) +ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' +TBLPROPERTIES ('transactional' = 'true', +'transactional_properties'='insert_only'); +insert into text_insert_only values (1, 'A'); +insert into text_insert_only values (2, 'B'); +insert into text_insert_only values (3, 'C'); +Load data xxx (4,D) + + +create table parquet_insert_only_major (id INT, value STRING) +CLUSTERED BY (id) INTO 3 BUCKETS +STORED AS parquet +TBLPROPERTIES ('transactional' = 'true', +'transactional_properties'='insert_only'); +insert into parquet_insert_only_major values (1, 'A'); +insert into parquet_insert_only_major values (2, 'B'); +insert into parquet_insert_only_major values (3, 'C'); +ALTER TABLE parquet_insert_only_major COMPACT 'major'; +insert into parquet_insert_only_major values (4, 'D'); +insert into parquet_insert_only_major values (5, 'E'); + + +create table orc_insert_only_minor (id INT, value STRING) +CLUSTERED BY (id) INTO 3 BUCKETS +stored as orc +TBLPROPERTIES ('transactional' = 'true', +'transactional_properties'='insert_only'); +insert into orc_insert_only_minor values (1, 'A'); +insert into orc_insert_only_minor values (2, 'B'); +insert into orc_insert_only_minor values (3, 'C'); +ALTER TABLE orc_insert_only_minor COMPACT 'minor'; +insert into orc_insert_only_minor values (4, 'D'); +insert into orc_insert_only_minor values (5, 'E'); + +*/
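For orientation, the directory-selection rule that the AcidUtilTest cases above assert can be summarized as: pick the newest base_N whose write id is at or below the reader's high watermark, then walk the deltas in order and keep only those that extend the covered write-id range with ids the reader is still allowed to see. The sketch below is a minimal, self-contained illustration of that rule only; the class AcidDirSketch and method visibleDirs are made-up names, not Doris's AcidUtil or Hive's AcidUtils, and it assumes no open or aborted transactions, ignores delete_delta directories, and skips the bucket-file handling done in the BE reader.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class AcidDirSketch {

    // "base_50" -> 50
    private static long parseBase(String dir) {
        return Long.parseLong(dir.substring("base_".length()));
    }

    // "delta_40_60" or "delta_0060_60_4" (with statement id) -> {40, 60} / {60, 60}
    private static long[] parseDelta(String dir) {
        String[] parts = dir.substring("delta_".length()).split("_");
        return new long[] {Long.parseLong(parts[0]), Long.parseLong(parts[1])};
    }

    // Directories a reader would scan, assuming no open or aborted write ids.
    static List<String> visibleDirs(List<String> dirs, long highWatermark) {
        // 1. The newest base at or below the high watermark wins; everything it covers is obsolete.
        long bestBase = -1;
        String bestBaseDir = null;
        for (String d : dirs) {
            if (d.startsWith("base_") && parseBase(d) <= highWatermark && parseBase(d) > bestBase) {
                bestBase = parseBase(d);
                bestBaseDir = d;
            }
        }
        List<String> result = new ArrayList<>();
        if (bestBaseDir != null) {
            result.add(bestBaseDir);
        }
        // 2. Walk deltas sorted by (min asc, max desc); keep one only if it extends the
        //    covered write-id range and the newly covered ids are still below the watermark.
        List<String> deltas = new ArrayList<>();
        for (String d : dirs) {
            if (d.startsWith("delta_")) {
                deltas.add(d);
            }
        }
        deltas.sort(Comparator.comparingLong((String d) -> parseDelta(d)[0])
                .thenComparing(Comparator.comparingLong((String d) -> parseDelta(d)[1]).reversed()));
        long covered = bestBase;
        for (String d : deltas) {
            long max = parseDelta(d)[1];
            if (max > covered && covered + 1 <= highWatermark) {
                result.add(d);
                covered = max;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Same layout as testBaseDeltas above, read at write-id high watermark 100:
        // base_49 survives, delta_050_105 still carries readable ids (50..100), and
        // delta_90_120 only adds ids above the watermark once 105 is covered, so it is skipped.
        List<String> dirs = List.of("base_5", "base_10", "base_49",
                "delta_025_025", "delta_029_029", "delta_025_030",
                "delta_050_105", "delta_90_120");
        System.out.println(visibleDirs(dirs, 100));   // prints [base_49, delta_050_105]
    }
}

In the real code path, AcidUtil.getAcidState additionally consults the serialized ValidTxnList/ValidWriteIdList strings (the txnValidIds map in the tests) to drop directories written by open or aborted transactions, and records delete_delta directories in AcidInfo so the BE reader can apply row-level deletes.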