From 869da2e5ba029e560b9881b861a39c07027e357e Mon Sep 17 00:00:00 2001 From: daidai Date: Fri, 15 Nov 2024 11:10:54 +0800 Subject: [PATCH 01/10] [feature](hive)support hive4 acid --- .../table/transactional_hive_reader.cpp | 31 +- .../datasource/hive/HMSCachedClient.java | 3 + .../datasource/hive/HiveMetaStoreCache.java | 311 ++++++++++++++---- .../datasource/hive/HiveTransaction.java | 5 + .../hive/PostgreSQLJdbcHMSCachedClient.java | 6 + .../hive/ThriftHMSCachedClient.java | 15 + .../datasource/hive/source/HiveScanNode.java | 8 +- .../doris/datasource/TestHMSCachedClient.java | 6 + 8 files changed, 320 insertions(+), 65 deletions(-) diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.cpp b/be/src/vec/exec/format/table/transactional_hive_reader.cpp index bc4262b7451d53..96ef015993ac5c 100644 --- a/be/src/vec/exec/format/table/transactional_hive_reader.cpp +++ b/be/src/vec/exec/format/table/transactional_hive_reader.cpp @@ -21,6 +21,7 @@ #include "transactional_hive_common.h" #include "vec/data_types/data_type_factory.hpp" #include "vec/exec/format/orc/vorc_reader.h" +#include namespace doris { @@ -108,15 +109,39 @@ Status TransactionalHiveReader::init_row_filters(const TFileRangeDesc& range, int64_t num_delete_files = 0; std::filesystem::path file_path(data_file_path); + + //See https://github.com/apache/hive/commit/ffee30e6267e85f00a22767262192abb9681cfb7#diff-5fe26c36b4e029dcd344fc5d484e7347R165 + // bucket_xxx_attemptId => bucket_xxx + // bucket_xxx => bucket_xxx + auto remove_bucket_attemptId = [](const std::string& str) { + re2::RE2 pattern("^bucket_\\d+_\\d+$"); + + if (re2::RE2::FullMatch(str, pattern)) { + size_t pos = str.rfind('_'); + if (pos != std::string::npos) { + return str.substr(0, pos); + } + } + return str; + }; + + SCOPED_TIMER(_transactional_orc_profile.delete_files_read_time); for (auto& delete_delta : range.table_format_params.transactional_hive_params.delete_deltas) { const std::string file_name = file_path.filename().string(); - auto iter = std::find(delete_delta.file_names.begin(), delete_delta.file_names.end(), - file_name); + + //need opt. 
+ std::vector<std::string> delete_delta_file_names; + for (const auto& x : delete_delta.file_names) { + delete_delta_file_names.emplace_back(remove_bucket_attemptId(x)); + } + auto iter = std::find(delete_delta_file_names.begin(), delete_delta_file_names.end(), + remove_bucket_attemptId(file_name)); if (iter == delete_delta_file_names.end()) { continue; } - auto delete_file = fmt::format("{}/{}", delete_delta.directory_location, file_name); + auto delete_file = fmt::format("{}/{}", delete_delta.directory_location, + delete_delta.file_names[iter - delete_delta_file_names.begin()]); TFileRangeDesc delete_range; // must use __set() method to make sure __isset is true diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java index a5e0eefb3483aa..b104fbc2cdb438 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java @@ -23,6 +23,7 @@ import org.apache.doris.datasource.TableMetadata; import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException; +import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; @@ -84,6 +85,8 @@ NotificationEventResponse getNextNotification(long lastEventId, ValidWriteIdList getValidWriteIds(String fullTableName, long currentTransactionId); + ValidTxnList getValidTxns(); + void acquireSharedLock(String queryId, long txnId, String user, TableName tblName, List<String> partitionNames, long timeoutMs); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index ea42dfa2f52a01..709ead42452d7e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -31,7 +31,6 @@ import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.UserException; -import org.apache.doris.common.security.authentication.AuthenticationConfig; import org.apache.doris.common.util.CacheBulkLoader; import org.apache.doris.common.util.LocationPath; import org.apache.doris.common.util.Util; @@ -55,7 +54,6 @@ import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.base.Strings; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import com.google.common.collect.Iterables; @@ -66,26 +64,30 @@ import com.google.common.collect.Streams; import com.google.common.collect.TreeRangeMap; import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; +import lombok.ToString; import org.apache.commons.lang3.math.NumberUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidWriteIdList; +import org.apache.hadoop.hive.common.ValidWriteIdList.RangeResponse; import org.apache.hadoop.hive.metastore.api.Partition; import 
org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.utils.FileUtils; -import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.IOException; import java.net.URI; -import java.security.PrivilegedExceptionAction; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -742,73 +744,263 @@ public LoadingCache getPartitionCache() { return partitionCache; } + + @Getter + @ToString + @EqualsAndHashCode + public static class ParsedBase { + private final long writeId; + private final long visibilityId; + + public ParsedBase(long writeId, long visibilityId) { + this.writeId = writeId; + this.visibilityId = visibilityId; + } + } + + static ParsedBase parseBase(String name) { + name = name.substring("base_".length()); + int index = name.indexOf("_v"); + if (index == -1) { + return new ParsedBase(Long.parseLong(name), 0); + } + return new ParsedBase( + Long.parseLong(name.substring(0, index)), + Long.parseLong(name.substring(index + 2))); + } + + @Getter + @ToString + @EqualsAndHashCode + public static class ParsedDelta implements Comparable { + private final long min; + private final long max; + private final String path; + private final int statementId; + private final boolean deleteDelta; + private final long visibilityId; + + public ParsedDelta(long min, long max, @NonNull String path, int statementId, + boolean deleteDelta, long visibilityId) { + this.min = min; + this.max = max; + this.path = path; + this.statementId = statementId; + this.deleteDelta = deleteDelta; + this.visibilityId = visibilityId; + } + + @Override + public int compareTo(ParsedDelta other) { + return Long.compare(min, other.min) != 0 ? Long.compare(min, other.min) : + Long.compare(other.max, max) != 0 ? Long.compare(other.max, max) : + Integer.compare(statementId, other.statementId) != 0 + ? Integer.compare(statementId, other.statementId) : + path.compareTo(other.path); + } + } + + private static boolean isValidBase(ParsedBase base, ValidWriteIdList writeIdList) { + if (base.writeId == Long.MIN_VALUE) { + return true; + } + + // hive 4 : just check "_v" suffix + // before hive 4 : check _metadata_acid in baseDir + if ((base.visibilityId > 0)) { + // || isCompacted(fileSystem, baseDir) need check + return writeIdList.isValidBase(base.writeId); + } + + return writeIdList.isWriteIdValid(base.writeId); + } + + static ParsedDelta parseDelta(String fileName, String deltaPrefix, String path) { + /* + format1: + delta_min_max_statementId_visibilityId + delete_delta_min_max_statementId_visibilityId + + _visibilityId maybe not exists. 
+ detail: https://issues.apache.org/jira/browse/HIVE-20823 + + format2: + delta_min_max_visibilityId + delete_delta_min_visibilityId + + when minor compaction runs, we collapse per statement delta files inside a single + transaction so we no longer need a statementId in the file name + */ + // String fileName = fileName.substring(name.lastIndexOf('/') + 1); + // checkArgument(fileName.startsWith(deltaPrefix), "File does not start with '%s': %s", deltaPrefix, path); + + long visibilityId = 0; + int visibilityIdx = fileName.indexOf("_v"); + if (visibilityIdx != -1) { + visibilityId = Long.parseLong(fileName.substring(visibilityIdx + 2)); + fileName = fileName.substring(0, visibilityIdx); + } + + boolean deleteDelta = deltaPrefix.equals("delete_delta_"); + + String rest = fileName.substring(deltaPrefix.length()); + int split = rest.indexOf('_'); + int split2 = rest.indexOf('_', split + 1); + long min = Long.parseLong(rest.substring(0, split)); + + if (split2 == -1) { + long max = Long.parseLong(rest.substring(split + 1)); + return new ParsedDelta(min, max, fileName, -1, deleteDelta, visibilityId); + } + + long max = Long.parseLong(rest.substring(split + 1, split2)); + int statementId = Integer.parseInt(rest.substring(split2 + 1)); + return new ParsedDelta(min, max, path, statementId, deleteDelta, visibilityId); + } + public List getFilesByTransaction(List partitions, ValidWriteIdList validWriteIds, + ValidTxnList validTxnList, boolean isFullAcid, boolean skipCheckingAcidVersionFile, long tableId, String bindBrokerName) { List fileCacheValues = Lists.newArrayList(); - String remoteUser = jobConf.get(AuthenticationConfig.HADOOP_USER_NAME); + // String remoteUser = jobConf.get(AuthenticationConfig.HADOOP_USER_NAME); + try { + for (HivePartition partition : partitions) { - FileCacheValue fileCacheValue = new FileCacheValue(); - AcidUtils.Directory directory; - if (!Strings.isNullOrEmpty(remoteUser)) { - UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser); - directory = ugi.doAs((PrivilegedExceptionAction) () -> AcidUtils.getAcidState( - new Path(partition.getPath()), jobConf, validWriteIds, false, true)); - } else { - directory = AcidUtils.getAcidState(new Path(partition.getPath()), jobConf, validWriteIds, false, - true); + RemoteFileSystem fsPar = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( + new FileSystemCache.FileSystemCacheKey( + LocationPath.getFSIdentity(partition.getPath(), bindBrokerName), + catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf)); + + Set acidResult = new HashSet<>(); + fsPar.listDirectories(partition.getPath(), acidResult); + + + List originalFiles = new ArrayList<>(); + List workingDeltas = new ArrayList<>(); + String oldestBase = null; + long oldestBaseWriteId = Long.MAX_VALUE; + String bestBasePath = null; + long bestBaseWriteId = 0; + + for (String fileDirectory : acidResult) { + // checkArgument(fileDirectory.startsWith(partition.getPath()), + // "file '%s' does not start with directory '%s'", + // fileDirectory, partition.getPath()); + String suffix = fileDirectory.substring(partition.getPath().length() + 1); + int slash = suffix.indexOf('/'); + String name = (slash == -1) ? 
"" : suffix.substring(0, slash); + + + if (name.startsWith("base_")) { + ParsedBase base = parseBase(name); + if (!validTxnList.isTxnValid(base.visibilityId)) { + //checks visibilityTxnId to see if it is committed in current snapshot + continue; + } + + long writeId = base.writeId; + if (oldestBaseWriteId > writeId) { + oldestBase = fileDirectory; + oldestBaseWriteId = writeId; + } + + if (((bestBasePath == null) || (bestBaseWriteId < writeId)) + && isValidBase(base, validWriteIds)) { + + bestBasePath = fileDirectory; + bestBaseWriteId = writeId; + } + } else if (name.startsWith("delta_") || name.startsWith("delete_delta_")) { + String deltaPrefix = name.startsWith("delta_") ? "delta_" : "delete_delta_"; + ParsedDelta delta = parseDelta(name, deltaPrefix, fileDirectory); + if (validWriteIds.isWriteIdRangeValid(delta.min, delta.max) != RangeResponse.NONE) { + workingDeltas.add(delta); + } + } else { + originalFiles.add(fileDirectory); + } } - if (directory == null) { - return Collections.emptyList(); + + if (bestBasePath == null && !originalFiles.isEmpty()) { + throw new Exception("For no acid table convert to acid, please COMPACT 'major'."); } - if (!directory.getOriginalFiles().isEmpty()) { - throw new Exception("Original non-ACID files in transactional tables are not supported"); + originalFiles.clear(); + + if ((oldestBase != null) && (bestBasePath == null)) { + /* + * If here, it means there was a base_x (> 1 perhaps) but none were suitable for given + * {@link writeIdList}. Note that 'original' files are logically a base_Long.MIN_VALUE and thus + * cannot have any data for an open txn. We could check {@link deltas} has files to cover + * [1,n] w/o gaps but this would almost never happen... + * + * We only throw for base_x produced by Compactor since that base erases all history and + * cannot be used for a client that has a snapshot in which something inside this base is + * open. (Nor can we ignore this base of course) But base_x which is a result of IOW, + * contains all history so we treat it just like delta wrt visibility. Imagine, IOW which + * aborts. It creates a base_x, which can and should just be ignored.*/ + + long[] exceptions = validWriteIds.getInvalidWriteIds(); + String minOpenWriteId = ((exceptions != null) + && (exceptions.length > 0)) ? String.valueOf(exceptions[0]) : "x"; + throw new IOException( + String.format("Not enough history available for ({},{}). Oldest available base: {}", + validWriteIds.getHighWatermark(), minOpenWriteId, oldestBase)); } - if (isFullAcid) { - int acidVersion = 2; - /** - * From Hive version >= 3.0, delta/base files will always have file '_orc_acid_version' - * with value >= '2'. - */ - Path baseOrDeltaPath = directory.getBaseDirectory() != null ? directory.getBaseDirectory() : - !directory.getCurrentDirectories().isEmpty() ? 
directory.getCurrentDirectories().get(0) - .getPath() : null; - if (baseOrDeltaPath == null) { - return Collections.emptyList(); - } - if (!skipCheckingAcidVersionFile) { - String acidVersionPath = new Path(baseOrDeltaPath, "_orc_acid_version").toUri().toString(); - RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( - new FileSystemCache.FileSystemCacheKey( - LocationPath.getFSIdentity(baseOrDeltaPath.toUri().toString(), - bindBrokerName), - catalog.getCatalogProperty().getProperties(), - bindBrokerName, jobConf)); - Status status = fs.exists(acidVersionPath); - if (status != Status.OK) { - if (status.getErrCode() == ErrCode.NOT_FOUND) { - acidVersion = 0; - } else { - throw new Exception(String.format("Failed to check remote path {} exists.", - acidVersionPath)); - } - } - if (acidVersion == 0 && !directory.getCurrentDirectories().isEmpty()) { - throw new Exception( - "Hive 2.x versioned full-acid tables need to run major compaction."); + workingDeltas.sort(null); + + List deltas = new ArrayList<>(); + long current = bestBaseWriteId; + int lastStatementId = -1; + ParsedDelta prev = null; + // find need read delta/delete_delta file. + for (ParsedDelta next : workingDeltas) { + if (next.max > current) { + if (validWriteIds.isWriteIdRangeValid(current + 1, next.max) != RangeResponse.NONE) { + deltas.add(next); + current = next.max; + lastStatementId = next.statementId; + prev = next; } + } else if ((next.max == current) && (lastStatementId >= 0)) { + //make sure to get all deltas within a single transaction; multi-statement txn + //generate multiple delta files with the same txnId range + //of course, if maxWriteId has already been minor compacted, + // all per statement deltas are obsolete + + deltas.add(next); + prev = next; + } else if ((prev != null) + && (next.max == prev.max) + && (next.min == prev.min) + && (next.statementId == prev.statementId)) { + // The 'next' parsedDelta may have everything equal to the 'prev' parsedDelta, except + // the path. This may happen when we have split update and we have two types of delta + // directories- 'delta_x_y' and 'delete_delta_x_y' for the SAME txn range. + + // Also note that any delete_deltas in between a given delta_x_y range would be made + // obsolete. For example, a delta_30_50 would make delete_delta_40_40 obsolete. + // This is valid because minor compaction always compacts the normal deltas and the delete + // deltas for the same range. That is, if we had 3 directories, delta_30_30, + // delete_delta_40_40 and delta_50_50, then running minor compaction would produce + // delta_30_50 and delete_delta_30_50. 
+ deltas.add(next); + prev = next; } } + + FileCacheValue fileCacheValue = new FileCacheValue(); + // delta directories List deleteDeltas = new ArrayList<>(); - for (AcidUtils.ParsedDelta delta : directory.getCurrentDirectories()) { - String location = delta.getPath().toString(); + for (ParsedDelta delta : deltas) { + String location = delta.getPath(); RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( new FileSystemCache.FileSystemCacheKey( LocationPath.getFSIdentity(location, bindBrokerName), - catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf)); + catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf)); List remoteFiles = new ArrayList<>(); Status status = fs.listFiles(location, false, remoteFiles); if (status.ok()) { @@ -831,14 +1023,13 @@ public List getFilesByTransaction(List partitions } // base - if (directory.getBaseDirectory() != null) { - String location = directory.getBaseDirectory().toString(); + if (bestBasePath != null) { RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( new FileSystemCache.FileSystemCacheKey( - LocationPath.getFSIdentity(location, bindBrokerName), - catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf)); + LocationPath.getFSIdentity(bestBasePath, bindBrokerName), + catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf)); List remoteFiles = new ArrayList<>(); - Status status = fs.listFiles(location, false, remoteFiles); + Status status = fs.listFiles(bestBasePath, false, remoteFiles); if (status.ok()) { remoteFiles.stream().filter( f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java index a88b1136369e97..8214ea13a92351 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java @@ -21,6 +21,7 @@ import org.apache.doris.common.UserException; import com.google.common.collect.Lists; +import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidWriteIdList; import java.util.List; @@ -61,6 +62,10 @@ public boolean isFullAcid() { return isFullAcid; } + public ValidTxnList getValidTxns(HMSCachedClient client) { + return client.getValidTxns(); + } + public ValidWriteIdList getValidWriteIds(HMSCachedClient client) { if (validWriteIdList == null) { TableName tableName = new TableName(hiveTable.getCatalog().getName(), hiveTable.getDbName(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java index 0cdb3e469c2951..d9848a6e87d465 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java @@ -32,6 +32,7 @@ import com.google.common.collect.ImmutableList.Builder; import com.google.common.collect.ImmutableMap; import org.apache.commons.lang3.NotImplementedException; +import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter; import 
org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; @@ -527,6 +528,11 @@ public ValidWriteIdList getValidWriteIds(String fullTableName, long currentTrans throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient."); } + @Override + public ValidTxnList getValidTxns() { + throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient."); + } + @Override public void acquireSharedLock(String queryId, long txnId, String user, TableName tblName, List partitionNames, long timeoutMs) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java index 56b69dc71e2b03..97ed2bc628cdb3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java @@ -595,6 +595,21 @@ public ValidWriteIdList getValidWriteIds(String fullTableName, long currentTrans } } + @Override + public ValidTxnList getValidTxns() { + try (ThriftHMSClient client = getClient()) { + try { + return ugiDoAs(client.client::getValidTxns); + } catch (Exception e) { + client.setThrowable(e); + throw e; + } + } catch (Exception e) { + throw new HMSClientException("Catalog Get the transactions that " + + "are currently valid fail. Exception = {}", e); + } + } + private LockResponse checkLock(long lockId) { try (ThriftHMSClient client = getClient()) { try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java index 3a8ab722fb68bf..70e00ba7d8c49c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java @@ -58,6 +58,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import lombok.Setter; +import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Table; @@ -369,8 +370,11 @@ private List getFileSplitByTransaction(HiveMetaStoreCache cache, } ValidWriteIdList validWriteIds = hiveTransaction.getValidWriteIds( ((HMSExternalCatalog) hmsTable.getCatalog()).getClient()); - return cache.getFilesByTransaction(partitions, validWriteIds, - hiveTransaction.isFullAcid(), skipCheckingAcidVersionFile, hmsTable.getId(), bindBrokerName); + ValidTxnList validTxnList = hiveTransaction.getValidTxns( + ((HMSExternalCatalog) hmsTable.getCatalog()).getClient()); + + return cache.getFilesByTransaction(partitions, validWriteIds, validTxnList, + hiveTransaction.isFullAcid(), skipCheckingAcidVersionFile, hmsTable.getId(), bindBrokerName); } @Override diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java index 12e66398210b22..ea7c293f04b75d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java @@ -28,6 +28,7 @@ import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException; import com.google.common.collect.ImmutableList; +import org.apache.hadoop.hive.common.ValidTxnList; import 
org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; @@ -206,6 +207,11 @@ public ValidWriteIdList getValidWriteIds(String fullTableName, long currentTrans return null; } + @Override + public ValidTxnList getValidTxns() { + return null; + } + @Override public void acquireSharedLock(String queryId, long txnId, String user, TableName tblName, List partitionNames, long timeoutMs) { From 4d2115feeb663ac53468c5d545a580f015dfb647 Mon Sep 17 00:00:00 2001 From: daidai Date: Tue, 26 Nov 2024 13:12:10 +0800 Subject: [PATCH 02/10] ban create insert drop translational tb. --- .../datasource/hive/HiveMetadataOps.java | 24 +++++++++++++++++++ .../insert/InsertIntoTableCommand.java | 5 ++++ 2 files changed, 29 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java index a660cb148ac069..4a11cf9b01fce9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java @@ -46,6 +46,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -179,6 +180,25 @@ public boolean createTable(CreateTableStmt stmt) throws UserException { props.put("owner", ConnectContext.get().getUserIdentity().getUser()); } } + + if (props.containsKey("transactional") && props.get("transactional").equals("true")) { + throw new UserException("Not support create hive transactional table."); + /* + CREATE TABLE trans6( + `col1` int, + `col2` int + ) ENGINE=hive + PROPERTIES ( + 'file_format'='orc', + 'compression'='zlib', + 'bucketing_version'='2', + 'transactional'='true', + 'transactional_properties'='default' + ); + In hive, this table only can insert not update(not report error,but not actually updated). 
+ */ + } + String fileFormat = props.getOrDefault(FILE_FORMAT_KEY, Config.hive_default_file_format); Map ddlProps = new HashMap<>(); for (Map.Entry entry : props.entrySet()) { @@ -273,6 +293,10 @@ public void dropTable(DropTableStmt stmt) throws DdlException { ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_TABLE, tblName, dbName); } } + if (AcidUtils.isTransactionalTable(client.getTable(dbName, tblName))) { + throw new DdlException("Not support drop hive transactional table."); + } + try { client.dropTable(dbName, tblName); db.setUnInitialized(true); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 96d5d56a7e10ff..b2dafbac5c179c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -24,6 +24,7 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; import org.apache.doris.common.profile.ProfileManager.ProfileType; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.datasource.hive.HMSExternalTable; @@ -312,6 +313,10 @@ private ExecutorFactory selectInsertExecutorFactory( } else if (physicalSink instanceof PhysicalHiveTableSink) { boolean emptyInsert = childIsEmptyRelation(physicalSink); HMSExternalTable hiveExternalTable = (HMSExternalTable) targetTableIf; + if (hiveExternalTable.isHiveTransactionalTable()) { + throw new UserException("Not supported insert into hive table"); + } + return ExecutorFactory.from( planner, dataSink, From da3c76609833b8c6477a20691c2d50abbd7b9de3 Mon Sep 17 00:00:00 2001 From: daidai Date: Tue, 3 Dec 2024 18:45:27 +0800 Subject: [PATCH 03/10] refactor code. --- .../doris/datasource/hive/AcidUtil.java | 429 ++++++++++++++++++ .../datasource/hive/HMSCachedClient.java | 6 +- .../datasource/hive/HiveMetaStoreCache.java | 213 ++------- .../datasource/hive/HiveTransaction.java | 17 +- .../hive/PostgreSQLJdbcHMSCachedClient.java | 9 +- .../hive/ThriftHMSCachedClient.java | 31 +- .../datasource/hive/source/HiveScanNode.java | 18 +- .../doris/datasource/TestHMSCachedClient.java | 9 +- .../doris/datasource/hive/HiveAcidTest.java | 2 + 9 files changed, 490 insertions(+), 244 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java new file mode 100644 index 00000000000000..92493a14039940 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java @@ -0,0 +1,429 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.hive; + +import org.apache.doris.backup.Status; +import org.apache.doris.common.util.LocationPath; +import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo; +import org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheValue; +import org.apache.doris.fs.remote.RemoteFile; +import org.apache.doris.fs.remote.RemoteFileSystem; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; +import lombok.ToString; +import org.apache.hadoop.hive.common.ValidReadTxnList; +import org.apache.hadoop.hive.common.ValidReaderWriteIdList; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidWriteIdList; +import org.apache.hadoop.hive.common.ValidWriteIdList.RangeResponse; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class AcidUtil { + private static final Logger LOG = LogManager.getLogger(AcidUtil.class); + + public static final String VALID_TXNS_KEY = "hive.txn.valid.txns"; + public static final String VALID_WRITEIDS_KEY = "hive.txn.valid.writeids"; + + private static final String HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX = "bucket_"; + + // An `_orc_acid_version` file is written to each base/delta/delete_delta dir written by a full acid write + // or compaction. This is the primary mechanism for versioning acid data. + // Each individual ORC file written stores the current version as a user property in ORC footer. All data files + // produced by Acid write should have this (starting with Hive 3.0), including those written by compactor.This is + // more for sanity checking in case someone moved the files around or something like that. + // In hive, methods for getting/reading the version from files were moved to test which is the only place they are + // used (after HIVE-23506), in order to keep devs out of temptation, since they access the FileSystem which + // is expensive. + // After `HIVE-23825: Create a flag to turn off _orc_acid_version file creation`, introduce variables to + // control whether to generate `_orc_acid_version` file. So don't need check this file exist. 
+ private static final String HIVE_ORC_ACID_VERSION_FILE = "_orc_acid_version"; + + @Getter + @ToString + @EqualsAndHashCode + private static class ParsedBase { + private final long writeId; + private final long visibilityId; + + public ParsedBase(long writeId, long visibilityId) { + this.writeId = writeId; + this.visibilityId = visibilityId; + } + } + + private static ParsedBase parseBase(String name) { + //format1 : base_writeId + //format2 : base_writeId_visibilityId detail: https://issues.apache.org/jira/browse/HIVE-20823 + name = name.substring("base_".length()); + int index = name.indexOf("_v"); + if (index == -1) { + return new ParsedBase(Long.parseLong(name), 0); + } + return new ParsedBase( + Long.parseLong(name.substring(0, index)), + Long.parseLong(name.substring(index + 2))); + } + + @Getter + @ToString + @EqualsAndHashCode + private static class ParsedDelta implements Comparable { + private final long min; + private final long max; + private final String path; + private final int statementId; + private final boolean deleteDelta; + private final long visibilityId; + + public ParsedDelta(long min, long max, @NonNull String path, int statementId, + boolean deleteDelta, long visibilityId) { + this.min = min; + this.max = max; + this.path = path; + this.statementId = statementId; + this.deleteDelta = deleteDelta; + this.visibilityId = visibilityId; + } + + /* + * Smaller minWID orders first; + * If minWID is the same, larger maxWID orders first; + * Otherwise, sort by stmtID; files w/o stmtID orders first. + * + * Compactions (Major/Minor) merge deltas/bases but delete of old files + * happens in a different process; thus it's possible to have bases/deltas with + * overlapping writeId boundaries. The sort order helps figure out the "best" set of files + * to use to get data. + * This sorts "wider" delta before "narrower" i.e. delta_5_20 sorts before delta_5_10 (and delta_11_20) + */ + @Override + public int compareTo(ParsedDelta other) { + return min != other.min ? Long.compare(min, other.min) : + other.max != max ? Long.compare(other.max, max) : + statementId != other.statementId + ? Integer.compare(statementId, other.statementId) : + path.compareTo(other.path); + } + } + + + private static boolean isValidMetaDataFile(RemoteFileSystem fileSystem, String baseDir) + throws IOException { + String fileLocation = baseDir + "_metadata_acid"; + Status status = fileSystem.exists(fileLocation); + if (status != Status.OK) { + return false; + } + //In order to save the cost of reading the file content, we only check whether the file exists. 
+ // File Contents: {"thisFileVersion":"0","dataFormat":"compacted"} + // + // Map metadata; + // try (var in = read(fileLocation)) { + // metadata = new ObjectMapper().readValue(in, new TypeReference<>() {}); + // } + // catch (IOException e) { + // throw new IOException(String.format("Failed to read %s: %s", fileLocation, e.getMessage()), e); + // } + // + // String version = metadata.get("thisFileVersion"); + // if (!"0".equals(version)) { + // throw new IOException("Unexpected ACID metadata version: " + version); + // } + // + // String format = metadata.get("dataFormat"); + // if (!"compacted".equals(format)) { + // throw new IOException("Unexpected value for ACID dataFormat: " + format); + // } + return true; + } + + private static boolean isValidBase(RemoteFileSystem remoteFileSystem, String baseDir, + ParsedBase base, ValidWriteIdList writeIdList) throws IOException { + if (base.writeId == Long.MIN_VALUE) { + //Ref: https://issues.apache.org/jira/browse/HIVE-13369 + //such base is created by 1st compaction in case of non-acid to acid table conversion.(you + //will get dir: `base_-9223372036854775808`) + //By definition there are no open txns with id < 1. + //After this: https://issues.apache.org/jira/browse/HIVE-18192, txns(global transaction ID) => writeId. + return true; + } + + // hive 4 : just check "_v" suffix, before hive 4 : check `_metadata_acid` file in baseDir. + if ((base.visibilityId > 0) || isValidMetaDataFile(remoteFileSystem, baseDir)) { + return writeIdList.isValidBase(base.writeId); + } + + // if here, it's a result of IOW + return writeIdList.isWriteIdValid(base.writeId); + } + + private static ParsedDelta parseDelta(String fileName, String deltaPrefix, String path) { + // format1: delta_min_max_statementId_visibilityId, delete_delta_min_max_statementId_visibilityId + // _visibilityId maybe not exists. + // detail: https://issues.apache.org/jira/browse/HIVE-20823 + // format2: delta_min_max_visibilityId, delete_delta_min_visibilityId + // when minor compaction runs, we collapse per statement delta files inside a single + // transaction so we no longer need a statementId in the file name + + // String fileName = fileName.substring(name.lastIndexOf('/') + 1); + // checkArgument(fileName.startsWith(deltaPrefix), "File does not start with '%s': %s", deltaPrefix, path); + + long visibilityId = 0; + int visibilityIdx = fileName.indexOf("_v"); + if (visibilityIdx != -1) { + visibilityId = Long.parseLong(fileName.substring(visibilityIdx + 2)); + fileName = fileName.substring(0, visibilityIdx); + } + + boolean deleteDelta = deltaPrefix.equals("delete_delta_"); + + String rest = fileName.substring(deltaPrefix.length()); + int split = rest.indexOf('_'); + int split2 = rest.indexOf('_', split + 1); + long min = Long.parseLong(rest.substring(0, split)); + + if (split2 == -1) { + long max = Long.parseLong(rest.substring(split + 1)); + return new ParsedDelta(min, max, fileName, -1, deleteDelta, visibilityId); + } + + long max = Long.parseLong(rest.substring(split + 1, split2)); + int statementId = Integer.parseInt(rest.substring(split2 + 1)); + return new ParsedDelta(min, max, path, statementId, deleteDelta, visibilityId); + } + + //Since the hive3 library cannot read the hive4 transaction table normally, and there are many problems + // when using the Hive 4 library directly, this method is implemented. 
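As a standalone illustration (not part of the patch), the directory-name conventions that parseBase and parseDelta above decode can be sketched as follows; the sample names and the expected values in the comments are illustrative only.

public class AcidDirNameExamples {
    public static void main(String[] args) {
        // base_<writeId> and base_<writeId>_v<visibilityTxnId>
        System.out.println(baseWriteId("base_0000016"));           // 16
        System.out.println(baseWriteId("base_0000016_v0000123"));  // 16 (visibility txn 123)

        // delta_<min>_<max>[_<stmtId>][_v<visibilityTxnId>], same for delete_delta_
        System.out.println(describeDelta("delta_0000005_0000010"));                      // min=5 max=10 stmtId=-1 visibilityTxn=0
        System.out.println(describeDelta("delta_0000005_0000005_0000"));                 // min=5 max=5 stmtId=0 visibilityTxn=0
        System.out.println(describeDelta("delete_delta_0000012_0000015_0001_v0000201")); // min=12 max=15 stmtId=1 visibilityTxn=201
    }

    static long baseWriteId(String name) {
        String rest = name.substring("base_".length());
        int v = rest.indexOf("_v");
        return Long.parseLong(v == -1 ? rest : rest.substring(0, v));
    }

    static String describeDelta(String name) {
        String prefix = name.startsWith("delete_delta_") ? "delete_delta_" : "delta_";
        String rest = name.substring(prefix.length());
        long visibilityTxn = 0;
        int v = rest.indexOf("_v");
        if (v != -1) {
            visibilityTxn = Long.parseLong(rest.substring(v + 2));
            rest = rest.substring(0, v);
        }
        String[] parts = rest.split("_");
        int stmtId = parts.length > 2 ? Integer.parseInt(parts[2]) : -1;
        return "min=" + Long.parseLong(parts[0]) + " max=" + Long.parseLong(parts[1])
                + " stmtId=" + stmtId + " visibilityTxn=" + visibilityTxn;
    }
}

A delta without a statement id (format2) is reported with stmtId=-1, matching the -1 that parseDelta stores for minor-compacted deltas.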
+ //Ref: hive/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java#getAcidState + public static FileCacheValue getAcidState(RemoteFileSystem fileSystem, HivePartition partition, + Map txnValidIds, Map catalogProps) throws Exception { + + // Ref: https://issues.apache.org/jira/browse/HIVE-18192 + // Readers should use the combination of ValidTxnList and ValidWriteIdList(Table) for snapshot isolation. + // ValidReadTxnList implements ValidTxnList + // ValidReaderWriteIdList implements ValidWriteIdList + ValidTxnList validTxnList = null; + if (txnValidIds.containsKey(VALID_TXNS_KEY)) { + validTxnList = new ValidReadTxnList(); + validTxnList.readFromString( + txnValidIds.get(VALID_TXNS_KEY) + ); + } else { + throw new RuntimeException("Miss ValidTxnList"); + } + + ValidWriteIdList validWriteIdList = null; + if (txnValidIds.containsKey(VALID_WRITEIDS_KEY)) { + validWriteIdList = new ValidReaderWriteIdList(); + validWriteIdList.readFromString( + txnValidIds.get(VALID_WRITEIDS_KEY) + ); + } else { + throw new RuntimeException("Miss ValidWriteIdList"); + } + + String partitionPath = partition.getPath(); + //hdfs://xxxxx/user/hive/warehouse/username/data_id=200103 + + List lsPartitionPath = new ArrayList<>(); + Status status = fileSystem.globList(partitionPath + "/*", lsPartitionPath); + // List all files and folders, without recursion. + // FileStatus[] lsPartitionPath = null; + // fileSystem.listFileStatuses(partitionPath,lsPartitionPath); + if (status != Status.OK) { + throw new IOException(status.toString()); + } + + String oldestBase = null; + long oldestBaseWriteId = Long.MAX_VALUE; + String bestBasePath = null; + long bestBaseWriteId = 0; + boolean haveOriginalFiles = false; + List workingDeltas = new ArrayList<>(); + + for (RemoteFile remotePath : lsPartitionPath) { + if (remotePath.isDirectory()) { + String dirName = remotePath.getName(); //dirName: base_xxx,delta_xxx,... + String dirPath = partitionPath + "/" + dirName; + + if (dirName.startsWith("base_")) { + ParsedBase base = parseBase(dirName); + if (!validTxnList.isTxnValid(base.visibilityId)) { + //checks visibilityTxnId to see if it is committed in current snapshot. + continue; + } + + long writeId = base.writeId; + if (oldestBaseWriteId > writeId) { + oldestBase = dirPath; + oldestBaseWriteId = writeId; + } + + if (((bestBasePath == null) || (bestBaseWriteId < writeId)) + && isValidBase(fileSystem, dirPath, base, validWriteIdList)) { + //IOW will generator a base_N/ directory: https://issues.apache.org/jira/browse/HIVE-14988 + //So maybe need consider: https://issues.apache.org/jira/browse/HIVE-25777 + + bestBasePath = dirPath; + bestBaseWriteId = writeId; + } + } else if (dirName.startsWith("delta_") || dirName.startsWith("delete_delta_")) { + String deltaPrefix = dirName.startsWith("delta_") ? "delta_" : "delete_delta_"; + ParsedDelta delta = parseDelta(dirName, deltaPrefix, dirPath); + + if (!validTxnList.isTxnValid(delta.visibilityId)) { + continue; + } + + // No need check (validWriteIdList.isWriteIdRangeAborted(min,max) != RangeResponse.ALL) + // It is a subset of (validWriteIdList.isWriteIdRangeValid(min, max) != RangeResponse.NONE) + if (validWriteIdList.isWriteIdRangeValid(delta.min, delta.max) != RangeResponse.NONE) { + workingDeltas.add(delta); + } + } else { + //Sometimes hive will generate temporary directories(`.hive-staging_hive_xxx` ), + // which do not need to be read. 
+ LOG.warn("Read Hive Acid Table ignore the contents of this folder:" + dirName); + } + } else { + haveOriginalFiles = true; + } + } + + if (bestBasePath == null && haveOriginalFiles) { + // ALTER TABLE nonAcidTbl SET TBLPROPERTIES ('transactional'='true'); + throw new UnsupportedOperationException("For no acid table convert to acid, please COMPACT 'major'."); + } + + if ((oldestBase != null) && (bestBasePath == null)) { + /* + * If here, it means there was a base_x (> 1 perhaps) but none were suitable for given + * {@link writeIdList}. Note that 'original' files are logically a base_Long.MIN_VALUE and thus + * cannot have any data for an open txn. We could check {@link deltas} has files to cover + * [1,n] w/o gaps but this would almost never happen... + * + * We only throw for base_x produced by Compactor since that base erases all history and + * cannot be used for a client that has a snapshot in which something inside this base is + * open. (Nor can we ignore this base of course) But base_x which is a result of IOW, + * contains all history so we treat it just like delta wrt visibility. Imagine, IOW which + * aborts. It creates a base_x, which can and should just be ignored.*/ + long[] exceptions = validWriteIdList.getInvalidWriteIds(); + String minOpenWriteId = ((exceptions != null) + && (exceptions.length > 0)) ? String.valueOf(exceptions[0]) : "x"; + throw new IOException( + String.format("Not enough history available for ({},{}). Oldest available base: {}", + validWriteIdList.getHighWatermark(), minOpenWriteId, oldestBase)); + } + + workingDeltas.sort(null); + + List deltas = new ArrayList<>(); + long current = bestBaseWriteId; + int lastStatementId = -1; + ParsedDelta prev = null; + // find need read delta/delete_delta file. + for (ParsedDelta next : workingDeltas) { + if (next.max > current) { + if (validWriteIdList.isWriteIdRangeValid(current + 1, next.max) != RangeResponse.NONE) { + deltas.add(next); + current = next.max; + lastStatementId = next.statementId; + prev = next; + } + } else if ((next.max == current) && (lastStatementId >= 0)) { + //make sure to get all deltas within a single transaction; multi-statement txn + //generate multiple delta files with the same txnId range + //of course, if maxWriteId has already been minor compacted, + // all per statement deltas are obsolete + + deltas.add(next); + prev = next; + } else if ((prev != null) + && (next.max == prev.max) + && (next.min == prev.min) + && (next.statementId == prev.statementId)) { + // The 'next' parsedDelta may have everything equal to the 'prev' parsedDelta, except + // the path. This may happen when we have split update and we have two types of delta + // directories- 'delta_x_y' and 'delete_delta_x_y' for the SAME txn range. + + // Also note that any delete_deltas in between a given delta_x_y range would be made + // obsolete. For example, a delta_30_50 would make delete_delta_40_40 obsolete. + // This is valid because minor compaction always compacts the normal deltas and the delete + // deltas for the same range. That is, if we had 3 directories, delta_30_30, + // delete_delta_40_40 and delta_50_50, then running minor compaction would produce + // delta_30_50 and delete_delta_30_50. 
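+            // Example: with bestBaseWriteId = 0 and the sorted working deltas
+            // delete_delta_1_10, delta_1_10, delta_11_20 (for the same range the delete delta
+            // sorts first because of the path tie-break in compareTo), the first entry advances
+            // 'current' to 10, the second is kept by this branch (same min/max/statementId as
+            // 'prev'), and the third advances 'current' to 20 on the next iteration.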
+ deltas.add(next); + prev = next; + } + } + + FileCacheValue fileCacheValue = new FileCacheValue(); + List deleteDeltas = new ArrayList<>(); + + // delta directories + for (ParsedDelta delta : deltas) { + String location = delta.getPath(); + + List remoteFiles = new ArrayList<>(); + status = fileSystem.listFiles(location, false, remoteFiles); + if (status.ok()) { + if (delta.isDeleteDelta()) { + List deleteDeltaFileNames = remoteFiles.stream().map(f -> f.getName()).filter( + name -> name.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)) + .collect(Collectors.toList()); + deleteDeltas.add(new DeleteDeltaInfo(location, deleteDeltaFileNames)); + continue; + } + remoteFiles.stream().filter( + f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)).forEach(file -> { + LocationPath path = new LocationPath(file.getPath().toString(), catalogProps); + fileCacheValue.addFile(file, path); + }); + } else { + throw new RuntimeException(status.getErrMsg()); + } + } + + // base + if (bestBasePath != null) { + List remoteFiles = new ArrayList<>(); + status = fileSystem.listFiles(bestBasePath, false, remoteFiles); + if (status.ok()) { + remoteFiles.stream().filter( + f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)) + .forEach(file -> { + LocationPath path = new LocationPath(file.getPath().toString(), catalogProps); + fileCacheValue.addFile(file, path); + }); + } else { + throw new RuntimeException(status.getErrMsg()); + } + } + fileCacheValue.setAcidInfo(new AcidInfo(partition.getPath(), deleteDeltas)); + return fileCacheValue; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java index b104fbc2cdb438..fcaefecf41b156 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java @@ -23,8 +23,6 @@ import org.apache.doris.datasource.TableMetadata; import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException; -import org.apache.hadoop.hive.common.ValidTxnList; -import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; @@ -83,9 +81,7 @@ NotificationEventResponse getNextNotification(long lastEventId, void commitTxn(long txnId); - ValidWriteIdList getValidWriteIds(String fullTableName, long currentTransactionId); - - ValidTxnList getValidTxns(); + Map getValidWriteIds(String fullTableName, long currentTransactionId); void acquireSharedLock(String queryId, long txnId, String user, TableName tblName, List partitionNames, long timeoutMs); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index 709ead42452d7e..455f0b5cf78706 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -31,12 +31,12 @@ import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.UserException; +import org.apache.doris.common.security.authentication.AuthenticationConfig; import org.apache.doris.common.util.CacheBulkLoader; import 
org.apache.doris.common.util.LocationPath; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.CacheException; import org.apache.doris.datasource.ExternalMetaCacheMgr; -import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo; import org.apache.doris.datasource.property.PropertyConverter; import org.apache.doris.fs.FileSystemCache; import org.apache.doris.fs.remote.RemoteFile; @@ -54,6 +54,7 @@ import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import com.google.common.collect.Iterables; @@ -72,22 +73,20 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidWriteIdList; -import org.apache.hadoop.hive.common.ValidWriteIdList.RangeResponse; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.utils.FileUtils; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.io.IOException; import java.net.URI; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -858,196 +857,36 @@ static ParsedDelta parseDelta(String fileName, String deltaPrefix, String path) return new ParsedDelta(min, max, path, statementId, deleteDelta, visibilityId); } - public List getFilesByTransaction(List partitions, ValidWriteIdList validWriteIds, - ValidTxnList validTxnList, - boolean isFullAcid, boolean skipCheckingAcidVersionFile, long tableId, String bindBrokerName) { + public List getFilesByTransaction(List partitions, Map txnValidIds, + boolean isFullAcid, String bindBrokerName) { List fileCacheValues = Lists.newArrayList(); - // String remoteUser = jobConf.get(AuthenticationConfig.HADOOP_USER_NAME); + String remoteUser = jobConf.get(AuthenticationConfig.HADOOP_USER_NAME); try { + if (partitions.isEmpty()) { + return fileCacheValues; + } - for (HivePartition partition : partitions) { - RemoteFileSystem fsPar = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( - new FileSystemCache.FileSystemCacheKey( - LocationPath.getFSIdentity(partition.getPath(), bindBrokerName), - catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf)); - - Set acidResult = new HashSet<>(); - fsPar.listDirectories(partition.getPath(), acidResult); - - - List originalFiles = new ArrayList<>(); - List workingDeltas = new ArrayList<>(); - String oldestBase = null; - long oldestBaseWriteId = Long.MAX_VALUE; - String bestBasePath = null; - long bestBaseWriteId = 0; - - for (String fileDirectory : acidResult) { - // checkArgument(fileDirectory.startsWith(partition.getPath()), - // "file '%s' does not start with directory '%s'", - // fileDirectory, partition.getPath()); - String suffix = fileDirectory.substring(partition.getPath().length() + 1); - int slash = suffix.indexOf('/'); - String name = (slash == -1) ? 
"" : suffix.substring(0, slash); - - - if (name.startsWith("base_")) { - ParsedBase base = parseBase(name); - if (!validTxnList.isTxnValid(base.visibilityId)) { - //checks visibilityTxnId to see if it is committed in current snapshot - continue; - } - - long writeId = base.writeId; - if (oldestBaseWriteId > writeId) { - oldestBase = fileDirectory; - oldestBaseWriteId = writeId; - } - - if (((bestBasePath == null) || (bestBaseWriteId < writeId)) - && isValidBase(base, validWriteIds)) { - - bestBasePath = fileDirectory; - bestBaseWriteId = writeId; - } - } else if (name.startsWith("delta_") || name.startsWith("delete_delta_")) { - String deltaPrefix = name.startsWith("delta_") ? "delta_" : "delete_delta_"; - ParsedDelta delta = parseDelta(name, deltaPrefix, fileDirectory); - if (validWriteIds.isWriteIdRangeValid(delta.min, delta.max) != RangeResponse.NONE) { - workingDeltas.add(delta); - } - } else { - originalFiles.add(fileDirectory); - } - } - - if (bestBasePath == null && !originalFiles.isEmpty()) { - throw new Exception("For no acid table convert to acid, please COMPACT 'major'."); - } - originalFiles.clear(); - - if ((oldestBase != null) && (bestBasePath == null)) { - /* - * If here, it means there was a base_x (> 1 perhaps) but none were suitable for given - * {@link writeIdList}. Note that 'original' files are logically a base_Long.MIN_VALUE and thus - * cannot have any data for an open txn. We could check {@link deltas} has files to cover - * [1,n] w/o gaps but this would almost never happen... - * - * We only throw for base_x produced by Compactor since that base erases all history and - * cannot be used for a client that has a snapshot in which something inside this base is - * open. (Nor can we ignore this base of course) But base_x which is a result of IOW, - * contains all history so we treat it just like delta wrt visibility. Imagine, IOW which - * aborts. It creates a base_x, which can and should just be ignored.*/ - - long[] exceptions = validWriteIds.getInvalidWriteIds(); - String minOpenWriteId = ((exceptions != null) - && (exceptions.length > 0)) ? String.valueOf(exceptions[0]) : "x"; - throw new IOException( - String.format("Not enough history available for ({},{}). Oldest available base: {}", - validWriteIds.getHighWatermark(), minOpenWriteId, oldestBase)); - } - - workingDeltas.sort(null); - - List deltas = new ArrayList<>(); - long current = bestBaseWriteId; - int lastStatementId = -1; - ParsedDelta prev = null; - // find need read delta/delete_delta file. - for (ParsedDelta next : workingDeltas) { - if (next.max > current) { - if (validWriteIds.isWriteIdRangeValid(current + 1, next.max) != RangeResponse.NONE) { - deltas.add(next); - current = next.max; - lastStatementId = next.statementId; - prev = next; - } - } else if ((next.max == current) && (lastStatementId >= 0)) { - //make sure to get all deltas within a single transaction; multi-statement txn - //generate multiple delta files with the same txnId range - //of course, if maxWriteId has already been minor compacted, - // all per statement deltas are obsolete - - deltas.add(next); - prev = next; - } else if ((prev != null) - && (next.max == prev.max) - && (next.min == prev.min) - && (next.statementId == prev.statementId)) { - // The 'next' parsedDelta may have everything equal to the 'prev' parsedDelta, except - // the path. This may happen when we have split update and we have two types of delta - // directories- 'delta_x_y' and 'delete_delta_x_y' for the SAME txn range. 
- - // Also note that any delete_deltas in between a given delta_x_y range would be made - // obsolete. For example, a delta_30_50 would make delete_delta_40_40 obsolete. - // This is valid because minor compaction always compacts the normal deltas and the delete - // deltas for the same range. That is, if we had 3 directories, delta_30_30, - // delete_delta_40_40 and delta_50_50, then running minor compaction would produce - // delta_30_50 and delete_delta_30_50. - deltas.add(next); - prev = next; - } - } - + RemoteFileSystem fileSystem = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( + new FileSystemCache.FileSystemCacheKey( + LocationPath.getFSIdentity(partitions.get(0).getPath(), bindBrokerName), + catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf)); - FileCacheValue fileCacheValue = new FileCacheValue(); - - // delta directories - List deleteDeltas = new ArrayList<>(); - for (ParsedDelta delta : deltas) { - String location = delta.getPath(); - RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( - new FileSystemCache.FileSystemCacheKey( - LocationPath.getFSIdentity(location, bindBrokerName), - catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf)); - List remoteFiles = new ArrayList<>(); - Status status = fs.listFiles(location, false, remoteFiles); - if (status.ok()) { - if (delta.isDeleteDelta()) { - List deleteDeltaFileNames = remoteFiles.stream().map(f -> f.getName()).filter( - name -> name.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)) - .collect(Collectors.toList()); - deleteDeltas.add(new DeleteDeltaInfo(location, deleteDeltaFileNames)); - continue; - } - remoteFiles.stream().filter( - f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)).forEach(file -> { - LocationPath path = new LocationPath(file.getPath().toString(), - catalog.getProperties()); - fileCacheValue.addFile(file, path); - }); - } else { - throw new RuntimeException(status.getErrMsg()); - } - } - - // base - if (bestBasePath != null) { - RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( - new FileSystemCache.FileSystemCacheKey( - LocationPath.getFSIdentity(bestBasePath, bindBrokerName), - catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf)); - List remoteFiles = new ArrayList<>(); - Status status = fs.listFiles(bestBasePath, false, remoteFiles); - if (status.ok()) { - remoteFiles.stream().filter( - f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)) - .forEach(file -> { - LocationPath path = new LocationPath(file.getPath().toString(), - catalog.getProperties()); - fileCacheValue.addFile(file, path); - }); - } else { - throw new RuntimeException(status.getErrMsg()); - } + for (HivePartition partition : partitions) { + if (!Strings.isNullOrEmpty(remoteUser)) { + UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser); + fileCacheValues.add( + ugi.doAs((PrivilegedExceptionAction) () -> AcidUtil.getAcidState( + fileSystem, partition, txnValidIds, catalog.getProperties())) + ); + } else { + fileCacheValues.add(AcidUtil.getAcidState( + fileSystem, partition, txnValidIds, catalog.getProperties()) + ); } - fileCacheValue.setAcidInfo(new AcidInfo(partition.getPath(), deleteDeltas)); - fileCacheValues.add(fileCacheValue); } } catch (Exception e) { - throw new CacheException("failed to get input splits for write ids %s in catalog %s", e, - validWriteIds.toString(), catalog.getName()); + throw new 
CacheException("Failed to get input splits %s", e, txnValidIds.toString()); } return fileCacheValues; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java index 8214ea13a92351..7d62af29611447 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java @@ -21,10 +21,9 @@ import org.apache.doris.common.UserException; import com.google.common.collect.Lists; -import org.apache.hadoop.hive.common.ValidTxnList; -import org.apache.hadoop.hive.common.ValidWriteIdList; import java.util.List; +import java.util.Map; /** * HiveTransaction is used to save info of a hive transaction. @@ -41,7 +40,7 @@ public class HiveTransaction { private long txnId; private List partitionNames = Lists.newArrayList(); - ValidWriteIdList validWriteIdList = null; + Map txnValidIds = null; public HiveTransaction(String queryId, String user, HMSExternalTable hiveTable, boolean isFullAcid) { this.queryId = queryId; @@ -62,18 +61,14 @@ public boolean isFullAcid() { return isFullAcid; } - public ValidTxnList getValidTxns(HMSCachedClient client) { - return client.getValidTxns(); - } - - public ValidWriteIdList getValidWriteIds(HMSCachedClient client) { - if (validWriteIdList == null) { + public Map getValidWriteIds(HMSCachedClient client) { + if (txnValidIds == null) { TableName tableName = new TableName(hiveTable.getCatalog().getName(), hiveTable.getDbName(), hiveTable.getName()); client.acquireSharedLock(queryId, txnId, user, tableName, partitionNames, 5000); - validWriteIdList = client.getValidWriteIds(tableName.getDb() + "." + tableName.getTbl(), txnId); + txnValidIds = client.getValidWriteIds(tableName.getDb() + "." 
+ tableName.getTbl(), txnId); } - return validWriteIdList; + return txnValidIds; } public void begin() throws UserException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java index d9848a6e87d465..672afa96208d65 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java @@ -32,8 +32,6 @@ import com.google.common.collect.ImmutableList.Builder; import com.google.common.collect.ImmutableMap; import org.apache.commons.lang3.NotImplementedException; -import org.apache.hadoop.hive.common.ValidTxnList; -import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; @@ -524,12 +522,7 @@ public void commitTxn(long txnId) { } @Override - public ValidWriteIdList getValidWriteIds(String fullTableName, long currentTransactionId) { - throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient."); - } - - @Override - public ValidTxnList getValidTxns() { + public Map getValidWriteIds(String fullTableName, long currentTransactionId) { throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient."); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java index 97ed2bc628cdb3..9137c977f179c9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java @@ -31,6 +31,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.common.ValidReaderWriteIdList; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidTxnWriteIdList; @@ -564,7 +565,8 @@ public void acquireSharedLock(String queryId, long txnId, String user, TableName } @Override - public ValidWriteIdList getValidWriteIds(String fullTableName, long currentTransactionId) { + public Map getValidWriteIds(String fullTableName, long currentTransactionId) { + Map conf = new HashMap<>(); try (ThriftHMSClient client = getClient()) { try { return ugiDoAs(() -> { @@ -581,7 +583,10 @@ public ValidWriteIdList getValidWriteIds(String fullTableName, long currentTrans ValidTxnWriteIdList validTxnWriteIdList = TxnUtils.createValidTxnWriteIdList(currentTransactionId, tableValidWriteIdsList); ValidWriteIdList writeIdList = validTxnWriteIdList.getTableValidWriteIdList(fullTableName); - return writeIdList; + + conf.put(AcidUtil.VALID_TXNS_KEY, validTransactions.writeToString()); + conf.put(AcidUtil.VALID_WRITEIDS_KEY, writeIdList.writeToString()); + return conf; }); } catch (Exception e) { client.setThrowable(e); @@ -591,22 +596,14 @@ public ValidWriteIdList getValidWriteIds(String fullTableName, long currentTrans // Ignore this exception when the version of hive is not compatible with these apis. // Currently, the workaround is using a max watermark. 
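// Here "max watermark" means the fallback below: build a ValidTxnList/ValidWriteIdList with an
// empty exception list and a Long.MAX_VALUE high watermark, so every transaction and write id is
// treated as committed and the reader simply sees all base/delta data (compatibility at the cost
// of snapshot isolation).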
LOG.warn("failed to get valid write ids for {}, transaction {}", fullTableName, currentTransactionId, e); - return new ValidReaderWriteIdList(fullTableName, new long[0], new BitSet(), Long.MAX_VALUE); - } - } - @Override - public ValidTxnList getValidTxns() { - try (ThriftHMSClient client = getClient()) { - try { - return ugiDoAs(client.client::getValidTxns); - } catch (Exception e) { - client.setThrowable(e); - throw e; - } - } catch (Exception e) { - throw new HMSClientException("Catalog Get the transactions that " - + "are currently valid fail. Exception = {}", e); + ValidTxnList validTransactions = new ValidReadTxnList( + new long[0], new BitSet(), Long.MAX_VALUE, Long.MAX_VALUE); + ValidWriteIdList writeIdList = new ValidReaderWriteIdList( + fullTableName, new long[0], new BitSet(), Long.MAX_VALUE); + conf.put(AcidUtil.VALID_TXNS_KEY, validTransactions.writeToString()); + conf.put(AcidUtil.VALID_WRITEIDS_KEY, writeIdList.writeToString()); + return conf; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java index 70e00ba7d8c49c..dbbf2154251f14 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java @@ -58,8 +58,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import lombok.Setter; -import org.apache.hadoop.hive.common.ValidTxnList; -import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.logging.log4j.LogManager; @@ -268,7 +266,14 @@ private void getFileSplitByPartitions(HiveMetaStoreCache cache, List allFiles, String bindBrokerName, int numBackends) throws IOException, UserException { List fileCaches; if (hiveTransaction != null) { - fileCaches = getFileSplitByTransaction(cache, partitions, bindBrokerName); + try { + fileCaches = getFileSplitByTransaction(cache, partitions, bindBrokerName); + } catch (Exception e) { + // Release shared load (getValidWriteIds acquire Lock). + // If no exception is throw, the lock will be released when `finalizeQuery()`. 
+ Env.getCurrentHiveTransactionMgr().deregister(hiveTransaction.getQueryId()); + throw e; + } } else { boolean withCache = Config.max_external_file_cache_num > 0; fileCaches = cache.getFilesByPartitions(partitions, withCache, partitions.size() > 1, bindBrokerName); @@ -368,13 +373,10 @@ private List getFileSplitByTransaction(HiveMetaStoreCache cache, } hiveTransaction.addPartition(partition.getPartitionName(hmsTable.getPartitionColumns())); } - ValidWriteIdList validWriteIds = hiveTransaction.getValidWriteIds( - ((HMSExternalCatalog) hmsTable.getCatalog()).getClient()); - ValidTxnList validTxnList = hiveTransaction.getValidTxns( + Map txnValidIds = hiveTransaction.getValidWriteIds( ((HMSExternalCatalog) hmsTable.getCatalog()).getClient()); - return cache.getFilesByTransaction(partitions, validWriteIds, validTxnList, - hiveTransaction.isFullAcid(), skipCheckingAcidVersionFile, hmsTable.getId(), bindBrokerName); + return cache.getFilesByTransaction(partitions, txnValidIds, hiveTransaction.isFullAcid(), bindBrokerName); } @Override diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java index ea7c293f04b75d..5d4474d77ec21e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java @@ -28,8 +28,6 @@ import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException; import com.google.common.collect.ImmutableList; -import org.apache.hadoop.hive.common.ValidTxnList; -import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; @@ -203,12 +201,7 @@ public void commitTxn(long txnId) { } @Override - public ValidWriteIdList getValidWriteIds(String fullTableName, long currentTransactionId) { - return null; - } - - @Override - public ValidTxnList getValidTxns() { + public Map getValidWriteIds(String fullTableName, long currentTransactionId) { return null; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java new file mode 100644 index 00000000000000..14605861bce680 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java @@ -0,0 +1,2 @@ +package org.apache.doris.datasource.hive;public class HiveAcidTest { +} From 7510fee0421d1c01e3d379957983fb4ded266fe7 Mon Sep 17 00:00:00 2001 From: daidai Date: Tue, 3 Dec 2024 18:57:38 +0800 Subject: [PATCH 04/10] rm ut --- .../java/org/apache/doris/datasource/hive/HiveAcidTest.java | 2 -- 1 file changed, 2 deletions(-) delete mode 100644 fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java deleted file mode 100644 index 14605861bce680..00000000000000 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java +++ /dev/null @@ -1,2 +0,0 @@ -package org.apache.doris.datasource.hive;public class HiveAcidTest { -} From 20d8a0e653ff358e6d04c242ab9805fe7a784028 Mon Sep 17 00:00:00 2001 From: daidai Date: Mon, 16 Dec 2024 15:22:37 +0800 Subject: [PATCH 05/10] add ut && regression test 
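The new HiveAcidTest cases build ACID directory layouts on a LocalDfsFileSystem and assert which
base/delta files AcidUtil.getAcidState selects for a given transaction snapshot. The pattern the
tests follow is roughly the sketch below (imports and helpers as in the test file added by this
patch); the ValidReaderWriteIdList string encodes the table name, the high watermark and any
open/aborted write ids:

    LocalDfsFileSystem localFs = new LocalDfsFileSystem();
    Path tempPath = Files.createTempDirectory("tbl");
    localFs.createFile("file://" + tempPath.toAbsolutePath() + "/base_50/bucket_0");
    localFs.createFile("file://" + tempPath.toAbsolutePath() + "/delta_40_60/bucket_0");
    localFs.createFile("file://" + tempPath.toAbsolutePath() + "/delta_052_55/bucket_0");
    Map<String, String> txnValidIds = new HashMap<>();
    txnValidIds.put(AcidUtil.VALID_TXNS_KEY,
            new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
    txnValidIds.put(AcidUtil.VALID_WRITEIDS_KEY,
            new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":").writeToString());
    HivePartition partition = new HivePartition(new SimpleTableInfo("", "tbl"), false, "",
            "file://" + tempPath.toAbsolutePath(), new ArrayList<>(), new HashMap<>());
    FileCacheValue state = AcidUtil.getAcidState(localFs, partition, txnValidIds, new HashMap<>());
    // expected read set: base_50/bucket_0 and delta_40_60/bucket_0; delta_052_55 is already
    // covered by the minor-compacted delta_40_60 and is skipped.

The regression suite exercises the same logic end to end against tables converted to ACID
(orc_to_acid_tb, orc_to_acid_compacted_tb) and against minor/major compacted tables
(orc_acid_minor, orc_acid_major).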
--- .../table/transactional_hive_reader.cpp | 2 +- .../create_preinstalled_scripts/run25.hql | 52 ++ .../doris/datasource/hive/AcidUtil.java | 13 +- .../datasource/hive/HiveMetaStoreCache.java | 11 +- .../datasource/hive/HiveMetadataOps.java | 2 +- .../insert/InsertIntoTableCommand.java | 2 +- .../doris/datasource/hive/HiveAcidTest.java | 595 ++++++++++++++++++ .../hive/test_transactional_hive.out | 46 ++ .../hive/test_transactional_hive.groovy | 67 ++ 9 files changed, 775 insertions(+), 15 deletions(-) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.cpp b/be/src/vec/exec/format/table/transactional_hive_reader.cpp index 96ef015993ac5c..c230df8ee75ff1 100644 --- a/be/src/vec/exec/format/table/transactional_hive_reader.cpp +++ b/be/src/vec/exec/format/table/transactional_hive_reader.cpp @@ -137,7 +137,7 @@ Status TransactionalHiveReader::init_row_filters(const TFileRangeDesc& range, } auto iter = std::find(delete_delta_file_names.begin(), delete_delta_file_names.end(), remove_bucket_attemptId(file_name)); - if (iter == delete_delta.file_names.end()) { + if (iter == delete_delta_file_names.end()) { continue; } auto delete_file = fmt::format("{}/{}", delete_delta.directory_location, diff --git a/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run25.hql b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run25.hql index 814df4cdc5ff90..da6400bdff0220 100755 --- a/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run25.hql +++ b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run25.hql @@ -41,3 +41,55 @@ insert into orc_full_acid_par PARTITION(part_col=20230102) values (6, 'F'); update orc_full_acid_par set value = 'BB' where id = 2; + + + + +create table orc_to_acid_tb (id INT, value STRING) +PARTITIONED BY (part_col INT) +CLUSTERED BY (id) INTO 3 BUCKETS +STORED AS ORC; +INSERT INTO TABLE orc_to_acid_tb PARTITION (part_col=101) VALUES (1, 'A'), (3, 'C'); +INSERT INTO TABLE orc_to_acid_tb PARTITION (part_col=102) VALUES (2, 'B'); +ALTER TABLE orc_to_acid_tb SET TBLPROPERTIES ('transactional'='true'); + + +create table orc_to_acid_compacted_tb (id INT, value STRING) +PARTITIONED BY (part_col INT) +CLUSTERED BY (id) INTO 3 BUCKETS +STORED AS ORC; +INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col=101) VALUES (1, 'A'), (3, 'C'); +INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col=102) VALUES (2, 'B'); +ALTER TABLE orc_to_acid_compacted_tb SET TBLPROPERTIES ('transactional'='true'); +ALTER TABLE orc_to_acid_compacted_tb COMPACT 'major'; +INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col=102) VALUES (4, 'D'); +update orc_to_acid_compacted_tb set value = "CC" where id = 3; +update orc_to_acid_compacted_tb set value = "BB" where id = 2; + + +create table orc_acid_minor (id INT, value STRING) +CLUSTERED BY (id) INTO 3 BUCKETS +STORED AS ORC +TBLPROPERTIES ('transactional' = 'true'); +insert into orc_acid_minor values (1, 'A'); +insert into orc_acid_minor values (2, 'B'); +insert into orc_acid_minor values (3, 'C'); +update orc_acid_minor set value = "BB" where id = 2; +ALTER TABLE orc_acid_minor COMPACT 'minor'; +insert into orc_acid_minor values (4, 'D'); +update orc_acid_minor set value = "DD" where id = 4; +DELETE FROM orc_acid_minor WHERE id = 3; + + +create table orc_acid_major (id INT, value STRING) +CLUSTERED BY (id) INTO 3 BUCKETS +STORED 
AS ORC +TBLPROPERTIES ('transactional' = 'true'); +insert into orc_acid_major values (1, 'A'); +insert into orc_acid_major values (2, 'B'); +insert into orc_acid_major values (3, 'C'); +update orc_acid_major set value = "BB" where id = 2; +ALTER TABLE orc_acid_major COMPACT 'minor'; +insert into orc_acid_major values (4, 'D'); +update orc_acid_major set value = "DD" where id = 4; +DELETE FROM orc_acid_major WHERE id = 3; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java index 92493a14039940..3e6509a7903b10 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java @@ -21,8 +21,8 @@ import org.apache.doris.common.util.LocationPath; import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo; import org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheValue; +import org.apache.doris.fs.FileSystem; import org.apache.doris.fs.remote.RemoteFile; -import org.apache.doris.fs.remote.RemoteFileSystem; import lombok.EqualsAndHashCode; import lombok.Getter; @@ -64,7 +64,6 @@ public class AcidUtil { @Getter @ToString - @EqualsAndHashCode private static class ParsedBase { private final long writeId; private final long visibilityId; @@ -131,7 +130,7 @@ public int compareTo(ParsedDelta other) { } - private static boolean isValidMetaDataFile(RemoteFileSystem fileSystem, String baseDir) + private static boolean isValidMetaDataFile(FileSystem fileSystem, String baseDir) throws IOException { String fileLocation = baseDir + "_metadata_acid"; Status status = fileSystem.exists(fileLocation); @@ -161,7 +160,7 @@ private static boolean isValidMetaDataFile(RemoteFileSystem fileSystem, String b return true; } - private static boolean isValidBase(RemoteFileSystem remoteFileSystem, String baseDir, + private static boolean isValidBase(FileSystem fileSystem, String baseDir, ParsedBase base, ValidWriteIdList writeIdList) throws IOException { if (base.writeId == Long.MIN_VALUE) { //Ref: https://issues.apache.org/jira/browse/HIVE-13369 @@ -173,7 +172,7 @@ private static boolean isValidBase(RemoteFileSystem remoteFileSystem, String bas } // hive 4 : just check "_v" suffix, before hive 4 : check `_metadata_acid` file in baseDir. - if ((base.visibilityId > 0) || isValidMetaDataFile(remoteFileSystem, baseDir)) { + if ((base.visibilityId > 0) || isValidMetaDataFile(fileSystem, baseDir)) { return writeIdList.isValidBase(base.writeId); } @@ -208,7 +207,7 @@ private static ParsedDelta parseDelta(String fileName, String deltaPrefix, Strin if (split2 == -1) { long max = Long.parseLong(rest.substring(split + 1)); - return new ParsedDelta(min, max, fileName, -1, deleteDelta, visibilityId); + return new ParsedDelta(min, max, path, -1, deleteDelta, visibilityId); } long max = Long.parseLong(rest.substring(split + 1, split2)); @@ -219,7 +218,7 @@ private static ParsedDelta parseDelta(String fileName, String deltaPrefix, Strin //Since the hive3 library cannot read the hive4 transaction table normally, and there are many problems // when using the Hive 4 library directly, this method is implemented. 
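// Directory-name recap for the state computation below (cf. parseBase/parseDelta above), e.g.:
//   base_50          -> compacted (or insert-overwrite) data covering write ids <= 50
//   base_50_v123     -> hive 4 style; the _v suffix carries the compactor's visibility txn id
//   delta_40_60      -> data written by write ids 40..60 (a minor-compacted range)
//   delta_62_62_0    -> a single write id 62 with statement id 0
//   delete_delta_x_y -> delete events for the same write id range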
//Ref: hive/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java#getAcidState - public static FileCacheValue getAcidState(RemoteFileSystem fileSystem, HivePartition partition, + public static FileCacheValue getAcidState(FileSystem fileSystem, HivePartition partition, Map txnValidIds, Map catalogProps) throws Exception { // Ref: https://issues.apache.org/jira/browse/HIVE-18192 diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index 455f0b5cf78706..b6cc42b1199a84 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -867,12 +867,13 @@ public List getFilesByTransaction(List partitions return fileCacheValues; } - RemoteFileSystem fileSystem = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( - new FileSystemCache.FileSystemCacheKey( - LocationPath.getFSIdentity(partitions.get(0).getPath(), bindBrokerName), - catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf)); - for (HivePartition partition : partitions) { + //Get filesystem multiple times, reason: https://github.com/apache/doris/pull/23409. + RemoteFileSystem fileSystem = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( + new FileSystemCache.FileSystemCacheKey( + LocationPath.getFSIdentity(partition.getPath(), bindBrokerName), + catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf)); + if (!Strings.isNullOrEmpty(remoteUser)) { UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser); fileCacheValues.add( diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java index 4a11cf9b01fce9..b44ffe00390536 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java @@ -181,7 +181,7 @@ public boolean createTable(CreateTableStmt stmt) throws UserException { } } - if (props.containsKey("transactional") && props.get("transactional").equals("true")) { + if (props.containsKey("transactional") && props.get("transactional").equalsIgnoreCase("true")) { throw new UserException("Not support create hive transactional table."); /* CREATE TABLE trans6( diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index b2dafbac5c179c..1293e0d0adbec6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -314,7 +314,7 @@ private ExecutorFactory selectInsertExecutorFactory( boolean emptyInsert = childIsEmptyRelation(physicalSink); HMSExternalTable hiveExternalTable = (HMSExternalTable) targetTableIf; if (hiveExternalTable.isHiveTransactionalTable()) { - throw new UserException("Not supported insert into hive table"); + throw new UserException("Not supported insert into hive transactional table."); } return ExecutorFactory.from( diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java new file mode 100644 index 00000000000000..fc4ca299166f02 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java @@ -0,0 +1,595 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.hive; + +import org.apache.doris.common.info.SimpleTableInfo; +import org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheValue; +import org.apache.doris.fs.LocalDfsFileSystem; + +import org.apache.hadoop.hive.common.ValidReadTxnList; +import org.apache.hadoop.hive.common.ValidReaderWriteIdList; +import org.junit.Assert; +import org.junit.Test; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.BitSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + + +public class HiveAcidTest { + + @Test + public void testOriginalDeltas() throws Exception { + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/000000_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/000001_1"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/000002_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/random"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/_done"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/subdir/000000_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_025_025"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_029_029"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_025_030"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_050_100"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_101_101"); + + Map txnValidIds = new HashMap<>(); + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); + txnValidIds.put( + AcidUtil.VALID_WRITEIDS_KEY, + new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":").writeToString()); + + HivePartition partition = new HivePartition(new SimpleTableInfo("", "tbl"), + false, "", "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), new HashMap<>()); + try { + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + } catch (UnsupportedOperationException e) { + 
Assert.assertTrue(e.getMessage().contains("For no acid table convert to acid, please COMPACT 'major'.")); + } + } + + + @Test + public void testObsoleteOriginals() throws Exception { + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_10/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_5/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/000000_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/000001_1"); + + Map txnValidIds = new HashMap<>(); + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); + txnValidIds.put( + AcidUtil.VALID_WRITEIDS_KEY, + new ValidReaderWriteIdList("tbl:150:" + Long.MAX_VALUE + ":").writeToString()); + + HivePartition partition = new HivePartition( + new SimpleTableInfo("", "tbl"), + false, + "", + "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), + new HashMap<>()); + try { + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + } catch (UnsupportedOperationException e) { + Assert.assertTrue(e.getMessage().contains("For no acid table convert to acid, please COMPACT 'major'.")); + } + } + + + @Test + public void testOverlapingDelta() throws Exception { + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_0000063_63/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_000062_62/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_00061_61/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_40_60/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_0060_60/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_052_55/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_50/bucket_0"); + + Map txnValidIds = new HashMap<>(); + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); + txnValidIds.put( + AcidUtil.VALID_WRITEIDS_KEY, + new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":").writeToString()); + + HivePartition partition = new HivePartition( + new SimpleTableInfo("", "tbl"), + false, + "", + "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), + new HashMap<>()); + + FileCacheValue fileCacheValue = + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + + List readFile = + fileCacheValue.getFiles().stream().map(x -> x.path.toString()).collect(Collectors.toList()); + + + List resultReadFile = Arrays.asList( + "file:" + tempPath.toAbsolutePath() + "/base_50/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_40_60/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_00061_61/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_000062_62/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_0000063_63/bucket_0" + ); + Assert.assertTrue(resultReadFile.containsAll(readFile) && readFile.containsAll(resultReadFile)); + } + + + @Test + public void 
testOverlapingDelta2() throws Exception { + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_0000063_63_0/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_000062_62_0/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_000062_62_3/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_00061_61_0/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_40_60/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_0060_60_1/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_0060_60_4/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_0060_60_7/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_052_55/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_058_58/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_50/bucket_0"); + + Map txnValidIds = new HashMap<>(); + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); + txnValidIds.put( + AcidUtil.VALID_WRITEIDS_KEY, + new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":").writeToString()); + + HivePartition partition = new HivePartition( + new SimpleTableInfo("", "tbl"), + false, + "", + "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), + new HashMap<>()); + + FileCacheValue fileCacheValue = + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + + + List readFile = + fileCacheValue.getFiles().stream().map(x -> x.path.toString()).collect(Collectors.toList()); + + + List resultReadFile = Arrays.asList( + "file:" + tempPath.toAbsolutePath() + "/base_50/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_40_60/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_00061_61_0/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_000062_62_0/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_000062_62_3/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_0000063_63_0/bucket_0" + + ); + + Assert.assertTrue(resultReadFile.containsAll(readFile) && readFile.containsAll(resultReadFile)); + } + + + @Test + public void deltasWithOpenTxnInRead() throws Exception { + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_1_1/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_2_5/bucket_0"); + + Map txnValidIds = new HashMap<>(); + + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new long[] {50}, new BitSet(), 1000, 55).writeToString()); + txnValidIds.put( + AcidUtil.VALID_WRITEIDS_KEY, + new ValidReaderWriteIdList("tbl:100:4:4").writeToString()); + + HivePartition partition = new HivePartition( + new SimpleTableInfo("", "tbl"), + false, + "", + "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), + new HashMap<>()); + + + FileCacheValue fileCacheValue = + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + + + 
List readFile = + fileCacheValue.getFiles().stream().map(x -> x.path.toString()).collect(Collectors.toList()); + + + List resultReadFile = Arrays.asList( + "file:" + tempPath.toAbsolutePath() + "/delta_1_1/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_2_5/bucket_0" + ); + + Assert.assertTrue(resultReadFile.containsAll(readFile) && readFile.containsAll(resultReadFile)); + + } + + + @Test + public void deltasWithOpenTxnInRead2() throws Exception { + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_1_1/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_2_5/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_4_4_1/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_4_4_3/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_101_101_1/bucket_0"); + + Map txnValidIds = new HashMap<>(); + + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new long[] {50}, new BitSet(), 1000, 55).writeToString()); + txnValidIds.put( + AcidUtil.VALID_WRITEIDS_KEY, + new ValidReaderWriteIdList("tbl:100:4:4").writeToString()); + + HivePartition partition = new HivePartition( + new SimpleTableInfo("", "tbl"), + false, + "", + "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), + new HashMap<>()); + + FileCacheValue fileCacheValue = + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + + + List readFile = + fileCacheValue.getFiles().stream().map(x -> x.path.toString()).collect(Collectors.toList()); + + + List resultReadFile = Arrays.asList( + "file:" + tempPath.toAbsolutePath() + "/delta_1_1/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_2_5/bucket_0" + ); + + Assert.assertTrue(resultReadFile.containsAll(readFile) && readFile.containsAll(resultReadFile)); + } + + @Test + public void testBaseWithDeleteDeltas() throws Exception { + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_5/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_10/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_49/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_025_025/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_029_029/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delete_delta_029_029/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_025_030/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delete_delta_025_030/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_050_105/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delete_delta_050_105/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delete_delta_110_110/bucket_0"); + + Map txnValidIds = new HashMap<>(); + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); + txnValidIds.put( + 
AcidUtil.VALID_WRITEIDS_KEY, + new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":").writeToString()); + + // Map tableProps = new HashMap<>(); + // tableProps.put(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, + // AcidUtils.AcidOperationalProperties.getDefault().toString()); + + HivePartition partition = new HivePartition( + new SimpleTableInfo("", "tbl"), + false, + "", + "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), + new HashMap<>()); + + FileCacheValue fileCacheValue = + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + + + List readFile = + fileCacheValue.getFiles().stream().map(x -> x.path.toString()).collect(Collectors.toList()); + + + List resultReadFile = Arrays.asList( + "file:" + tempPath.toAbsolutePath() + "/base_49/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_050_105/bucket_0" + ); + + Assert.assertTrue(resultReadFile.containsAll(readFile) && readFile.containsAll(resultReadFile)); + + List resultDelta = Arrays.asList( + "file://" + tempPath.toAbsolutePath() + "/delete_delta_050_105/bucket_0" + ); + + List deltaFiles = new ArrayList<>(); + fileCacheValue.getAcidInfo().getDeleteDeltas().forEach( + deltaInfo -> { + String loc = deltaInfo.getDirectoryLocation(); + deltaInfo.getFileNames().forEach( + fileName -> deltaFiles.add(loc + "/" + fileName) + ); + } + ); + + Assert.assertTrue(resultDelta.containsAll(deltaFiles) && deltaFiles.containsAll(resultDelta)); + } + + + @Test + public void testOverlapingDeltaAndDeleteDelta() throws Exception { + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_0000063_63/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_000062_62/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_00061_61/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delete_delta_00064_64/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_40_60/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delete_delta_40_60/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_0060_60/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_052_55/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delete_delta_052_55/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_50/bucket_0"); + + Map txnValidIds = new HashMap<>(); + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); + txnValidIds.put( + AcidUtil.VALID_WRITEIDS_KEY, + new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":").writeToString()); + + + Map tableProps = new HashMap<>(); + // tableProps.put(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, + // AcidUtils.AcidOperationalProperties.getDefault().toString()); + + HivePartition partition = new HivePartition( + new SimpleTableInfo("", "tbl"), + false, + "", + "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), + tableProps); + + FileCacheValue fileCacheValue = + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + + + + List readFile = + 
fileCacheValue.getFiles().stream().map(x -> x.path.toString()).collect(Collectors.toList()); + + + List resultReadFile = Arrays.asList( + "file:" + tempPath.toAbsolutePath() + "/base_50/bucket_0", + + "file:" + tempPath.toAbsolutePath() + "/delta_40_60/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_00061_61/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_000062_62/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_0000063_63/bucket_0" + + ); + + Assert.assertTrue(resultReadFile.containsAll(readFile) && readFile.containsAll(resultReadFile)); + + List resultDelta = Arrays.asList( + + "file://" + tempPath.toAbsolutePath() + "/delete_delta_40_60/bucket_0", + "file://" + tempPath.toAbsolutePath() + "/delete_delta_00064_64/bucket_0" + ); + + List deltaFiles = new ArrayList<>(); + fileCacheValue.getAcidInfo().getDeleteDeltas().forEach( + deltaInfo -> { + String loc = deltaInfo.getDirectoryLocation(); + deltaInfo.getFileNames().forEach( + fileName -> deltaFiles.add(loc + "/" + fileName) + ); + } + ); + + Assert.assertTrue(resultDelta.containsAll(deltaFiles) && deltaFiles.containsAll(resultDelta)); + } + + @Test + public void testMinorCompactedDeltaMakesInBetweenDelteDeltaObsolete() throws Exception { + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + + + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_40_60/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delete_delta_50_50/bucket_0"); + + Map txnValidIds = new HashMap<>(); + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); + txnValidIds.put( + AcidUtil.VALID_WRITEIDS_KEY, + new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":").writeToString()); + + Map tableProps = new HashMap<>(); + HivePartition partition = new HivePartition( + new SimpleTableInfo("", "tbl"), + false, + "", + "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), + tableProps); + + FileCacheValue fileCacheValue = + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + + List readFile = + fileCacheValue.getFiles().stream().map(x -> x.path.toString()).collect(Collectors.toList()); + + + List resultReadFile = Arrays.asList( + "file:" + tempPath.toAbsolutePath() + "/delta_40_60/bucket_0" + ); + + Assert.assertTrue(resultReadFile.containsAll(readFile) && readFile.containsAll(resultReadFile)); + } + + @Test + public void deleteDeltasWithOpenTxnInRead() throws Exception { + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_1_1/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_2_5/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delete_delta_2_5/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delete_delta_3_3/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_4_4_1/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_4_4_3/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_101_101_1/bucket_0"); + + + Map txnValidIds = new HashMap<>(); + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new 
long[] {50}, new BitSet(), 1000, 55).writeToString()); + txnValidIds.put( + AcidUtil.VALID_WRITEIDS_KEY, + new ValidReaderWriteIdList("tbl:100:4:4").writeToString()); + + + Map tableProps = new HashMap<>(); + HivePartition partition = new HivePartition( + new SimpleTableInfo("", "tbl"), + false, + "", + "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), + tableProps); + + FileCacheValue fileCacheValue = + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + + + List readFile = + fileCacheValue.getFiles().stream().map(x -> x.path.toString()).collect(Collectors.toList()); + + + List resultReadFile = Arrays.asList( + "file:" + tempPath.toAbsolutePath() + "/delta_1_1/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_2_5/bucket_0" + ); + + Assert.assertTrue(resultReadFile.containsAll(readFile) && readFile.containsAll(resultReadFile)); + List resultDelta = Arrays.asList( + "file://" + tempPath.toAbsolutePath() + "/delete_delta_2_5/bucket_0" + ); + + List deltaFiles = new ArrayList<>(); + fileCacheValue.getAcidInfo().getDeleteDeltas().forEach( + deltaInfo -> { + String loc = deltaInfo.getDirectoryLocation(); + deltaInfo.getFileNames().forEach( + fileName -> deltaFiles.add(loc + "/" + fileName) + ); + } + ); + + Assert.assertTrue(resultDelta.containsAll(deltaFiles) && deltaFiles.containsAll(resultDelta)); + } + + @Test + public void testBaseDeltas() throws Exception { + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_5/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_10/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_49/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_025_025/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_029_029/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_025_030/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_050_105/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_90_120/bucket_0"); + + Map txnValidIds = new HashMap<>(); + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); + txnValidIds.put( + AcidUtil.VALID_WRITEIDS_KEY, + new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":").writeToString()); + + Map tableProps = new HashMap<>(); + + HivePartition partition = new HivePartition( + new SimpleTableInfo("", "tbl"), + false, + "", + "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), + tableProps); + + FileCacheValue fileCacheValue = + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + + List readFile = + fileCacheValue.getFiles().stream().map(x -> x.path.toString()).collect(Collectors.toList()); + + + + List resultReadFile = Arrays.asList( + "file:" + tempPath.toAbsolutePath() + "/base_49/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_050_105/bucket_0" + ); + Assert.assertTrue(resultReadFile.containsAll(readFile) && readFile.containsAll(resultReadFile)); + } +} diff --git a/regression-test/data/external_table_p0/hive/test_transactional_hive.out b/regression-test/data/external_table_p0/hive/test_transactional_hive.out index 
2ad0446ccb7e25..060fa8c048e5a0 100644 --- a/regression-test/data/external_table_p0/hive/test_transactional_hive.out +++ b/regression-test/data/external_table_p0/hive/test_transactional_hive.out @@ -76,3 +76,49 @@ F -- !q05 -- 0 + +-- !2 -- +1 A 101 +2 BB 102 +3 CC 101 +4 D 102 + +-- !3 -- +1 A 101 +3 CC 101 + +-- !4 -- +2 BB 102 +4 D 102 + +-- !5 -- +1 A 101 +2 BB 102 + +-- !6 -- +4 D 102 + +-- !7 -- +1 A +2 BB +4 DD + +-- !10 -- +1 A +2 BB + +-- !11 -- +4 DD + +-- !12 -- +1 A +2 BB +4 DD + +-- !15 -- +1 A +2 BB + +-- !16 -- +4 DD + diff --git a/regression-test/suites/external_table_p0/hive/test_transactional_hive.groovy b/regression-test/suites/external_table_p0/hive/test_transactional_hive.groovy index dbe20395ec95ec..4cde3d06dcc623 100644 --- a/regression-test/suites/external_table_p0/hive/test_transactional_hive.groovy +++ b/regression-test/suites/external_table_p0/hive/test_transactional_hive.groovy @@ -52,6 +52,68 @@ suite("test_transactional_hive", "p0,external,hive,external_docker,external_dock select count(*) from orc_full_acid_par_empty; """ } + + def test_acid = { + try { + sql """ select * from orc_to_acid_tb """ + }catch( Exception e) { + assertTrue(e.getMessage().contains("For no acid table convert to acid, please COMPACT")); + } + + qt_2 """ select * from orc_to_acid_compacted_tb order by id """ + qt_3 """ select * from orc_to_acid_compacted_tb where part_col=101 order by id """ + qt_4 """ select * from orc_to_acid_compacted_tb where part_col=102 order by id """ + qt_5 """ select * from orc_to_acid_compacted_tb where id < 3 order by id """ + qt_6 """ select * from orc_to_acid_compacted_tb where id > 3 order by id """ + + + qt_7 """ select * from orc_acid_minor order by id """ + qt_10 """ select * from orc_acid_minor where id < 3 order by id """ + qt_11 """ select * from orc_acid_minor where id > 3 order by id """ + + + qt_12 """ select * from orc_acid_major order by id """ + qt_15 """ select * from orc_acid_major where id < 3 order by id """ + qt_16 """ select * from orc_acid_major where id > 3 order by id """ + } + + def test_acid_write = { + sql """set enable_fallback_to_original_planner=false;""" + + + + try { + sql """ + CREATE TABLE acid_tb ( + `col1` BOOLEAN COMMENT 'col1', + `col2` INT COMMENT 'col2' + ) ENGINE=hive + PROPERTIES ( + 'file_format'='orc', + 'compression'='zlib', + 'bucketing_version'='2', + 'transactional'='true', + 'transactional_properties'='default' + ); + """ + }catch( Exception e) { + assertTrue(e.getMessage().contains("Not support create hive transactional table.")); + } + try { + sql """ insert into orc_acid_major(id,value) values(1,"a1"); """ + }catch (Exception e) { + assertTrue(e.getMessage().contains("Not supported insert into hive transactional table.")); + } + + try { + sql """ drop table orc_acid_major; """ + } catch (Exception e) { + assertTrue(e.getMessage().contains("Not support drop hive transactional table.")); + + } + } + + String enabled = context.config.otherConfigs.get("enableHiveTest") if (enabled == null || !enabled.equalsIgnoreCase("true")) { logger.info("diable Hive test.") @@ -60,6 +122,7 @@ suite("test_transactional_hive", "p0,external,hive,external_docker,external_dock for (String hivePrefix : ["hive3"]) { try { + String hdfs_port = context.config.otherConfigs.get(hivePrefix + "HdfsPort") String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort") String catalog_name = "test_transactional_${hivePrefix}" String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") @@ -68,6 +131,7 @@ 
suite("test_transactional_hive", "p0,external,hive,external_docker,external_dock sql """create catalog if not exists ${catalog_name} properties ( "type"="hms", 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}' + ,'fs.defaultFS' = 'hdfs://${externalEnvIp}:${hdfs_port}' );""" sql """use `${catalog_name}`.`default`""" @@ -79,6 +143,9 @@ suite("test_transactional_hive", "p0,external,hive,external_docker,external_dock q01() q01_par() + test_acid() + test_acid_write() + sql """drop catalog if exists ${catalog_name}""" } finally { } From 201ec62080869536c8dc1dba8695121fd834cfee Mon Sep 17 00:00:00 2001 From: daidai Date: Mon, 16 Dec 2024 15:27:00 +0800 Subject: [PATCH 06/10] rm unused code. --- .../datasource/hive/HiveMetaStoreCache.java | 119 ------------------ 1 file changed, 119 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index b6cc42b1199a84..897fcb93acb513 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -65,15 +65,10 @@ import com.google.common.collect.Streams; import com.google.common.collect.TreeRangeMap; import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.NonNull; -import lombok.ToString; import org.apache.commons.lang3.math.NumberUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.utils.FileUtils; @@ -743,120 +738,6 @@ public LoadingCache getPartitionCache() { return partitionCache; } - - @Getter - @ToString - @EqualsAndHashCode - public static class ParsedBase { - private final long writeId; - private final long visibilityId; - - public ParsedBase(long writeId, long visibilityId) { - this.writeId = writeId; - this.visibilityId = visibilityId; - } - } - - static ParsedBase parseBase(String name) { - name = name.substring("base_".length()); - int index = name.indexOf("_v"); - if (index == -1) { - return new ParsedBase(Long.parseLong(name), 0); - } - return new ParsedBase( - Long.parseLong(name.substring(0, index)), - Long.parseLong(name.substring(index + 2))); - } - - @Getter - @ToString - @EqualsAndHashCode - public static class ParsedDelta implements Comparable { - private final long min; - private final long max; - private final String path; - private final int statementId; - private final boolean deleteDelta; - private final long visibilityId; - - public ParsedDelta(long min, long max, @NonNull String path, int statementId, - boolean deleteDelta, long visibilityId) { - this.min = min; - this.max = max; - this.path = path; - this.statementId = statementId; - this.deleteDelta = deleteDelta; - this.visibilityId = visibilityId; - } - - @Override - public int compareTo(ParsedDelta other) { - return Long.compare(min, other.min) != 0 ? Long.compare(min, other.min) : - Long.compare(other.max, max) != 0 ? Long.compare(other.max, max) : - Integer.compare(statementId, other.statementId) != 0 - ? 
Integer.compare(statementId, other.statementId) : - path.compareTo(other.path); - } - } - - private static boolean isValidBase(ParsedBase base, ValidWriteIdList writeIdList) { - if (base.writeId == Long.MIN_VALUE) { - return true; - } - - // hive 4 : just check "_v" suffix - // before hive 4 : check _metadata_acid in baseDir - if ((base.visibilityId > 0)) { - // || isCompacted(fileSystem, baseDir) need check - return writeIdList.isValidBase(base.writeId); - } - - return writeIdList.isWriteIdValid(base.writeId); - } - - static ParsedDelta parseDelta(String fileName, String deltaPrefix, String path) { - /* - format1: - delta_min_max_statementId_visibilityId - delete_delta_min_max_statementId_visibilityId - - _visibilityId maybe not exists. - detail: https://issues.apache.org/jira/browse/HIVE-20823 - - format2: - delta_min_max_visibilityId - delete_delta_min_visibilityId - - when minor compaction runs, we collapse per statement delta files inside a single - transaction so we no longer need a statementId in the file name - */ - // String fileName = fileName.substring(name.lastIndexOf('/') + 1); - // checkArgument(fileName.startsWith(deltaPrefix), "File does not start with '%s': %s", deltaPrefix, path); - - long visibilityId = 0; - int visibilityIdx = fileName.indexOf("_v"); - if (visibilityIdx != -1) { - visibilityId = Long.parseLong(fileName.substring(visibilityIdx + 2)); - fileName = fileName.substring(0, visibilityIdx); - } - - boolean deleteDelta = deltaPrefix.equals("delete_delta_"); - - String rest = fileName.substring(deltaPrefix.length()); - int split = rest.indexOf('_'); - int split2 = rest.indexOf('_', split + 1); - long min = Long.parseLong(rest.substring(0, split)); - - if (split2 == -1) { - long max = Long.parseLong(rest.substring(split + 1)); - return new ParsedDelta(min, max, fileName, -1, deleteDelta, visibilityId); - } - - long max = Long.parseLong(rest.substring(split + 1, split2)); - int statementId = Integer.parseInt(rest.substring(split2 + 1)); - return new ParsedDelta(min, max, path, statementId, deleteDelta, visibilityId); - } - public List getFilesByTransaction(List partitions, Map txnValidIds, boolean isFullAcid, String bindBrokerName) { List fileCacheValues = Lists.newArrayList(); From 65aaff2099be40d09e5f627a9daa37f7ce0bcad6 Mon Sep 17 00:00:00 2001 From: daidai Date: Mon, 16 Dec 2024 15:37:08 +0800 Subject: [PATCH 07/10] fix formatted --- .../exec/format/table/transactional_hive_reader.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.cpp b/be/src/vec/exec/format/table/transactional_hive_reader.cpp index c230df8ee75ff1..18642ab1218b4d 100644 --- a/be/src/vec/exec/format/table/transactional_hive_reader.cpp +++ b/be/src/vec/exec/format/table/transactional_hive_reader.cpp @@ -17,11 +17,12 @@ #include "transactional_hive_reader.h" +#include + #include "runtime/runtime_state.h" #include "transactional_hive_common.h" #include "vec/data_types/data_type_factory.hpp" #include "vec/exec/format/orc/vorc_reader.h" -#include namespace doris { @@ -109,7 +110,6 @@ Status TransactionalHiveReader::init_row_filters(const TFileRangeDesc& range, int64_t num_delete_files = 0; std::filesystem::path file_path(data_file_path); - //See https://github.com/apache/hive/commit/ffee30e6267e85f00a22767262192abb9681cfb7#diff-5fe26c36b4e029dcd344fc5d484e7347R165 // bucket_xxx_attemptId => bucket_xxx // bucket_xxx => bucket_xxx @@ -125,14 +125,13 @@ Status TransactionalHiveReader::init_row_filters(const 
TFileRangeDesc& range, return str; }; - SCOPED_TIMER(_transactional_orc_profile.delete_files_read_time); for (auto& delete_delta : range.table_format_params.transactional_hive_params.delete_deltas) { const std::string file_name = file_path.filename().string(); //need opt. std::vector delete_delta_file_names; - for (const auto& x : delete_delta.file_names){ + for (const auto& x : delete_delta.file_names) { delete_delta_file_names.emplace_back(remove_bucket_attemptId(x)); } auto iter = std::find(delete_delta_file_names.begin(), delete_delta_file_names.end(), @@ -140,8 +139,9 @@ Status TransactionalHiveReader::init_row_filters(const TFileRangeDesc& range, if (iter == delete_delta_file_names.end()) { continue; } - auto delete_file = fmt::format("{}/{}", delete_delta.directory_location, - delete_delta.file_names[iter-delete_delta_file_names.begin()]); + auto delete_file = + fmt::format("{}/{}", delete_delta.directory_location, + delete_delta.file_names[iter - delete_delta_file_names.begin()]); TFileRangeDesc delete_range; // must use __set() method to make sure __isset is true From 2ec6b6e76c7470c570fda3ff52613bfe6f67885f Mon Sep 17 00:00:00 2001 From: daidai Date: Thu, 19 Dec 2024 20:55:32 +0800 Subject: [PATCH 08/10] fix hive acid insert only. --- .../doris/datasource/hive/AcidUtil.java | 47 +++++++--- .../datasource/hive/HMSExternalTable.java | 16 ++-- .../datasource/hive/HiveMetaStoreCache.java | 31 +++---- .../doris/datasource/hive/HiveAcidTest.java | 22 ++--- .../test_hive_translation_insert_only.out | 21 +++++ .../test_hive_translation_insert_only.groovy | 86 +++++++++++++++++++ 6 files changed, 175 insertions(+), 48 deletions(-) create mode 100644 regression-test/data/external_table_p2/hive/test_hive_translation_insert_only.out create mode 100644 regression-test/suites/external_table_p2/hive/test_hive_translation_insert_only.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java index 3e6509a7903b10..915a1a410d1cc5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java @@ -49,6 +49,7 @@ public class AcidUtil { public static final String VALID_WRITEIDS_KEY = "hive.txn.valid.writeids"; private static final String HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX = "bucket_"; + private static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length"; // An `_orc_acid_version` file is written to each base/delta/delete_delta dir written by a full acid write // or compaction. This is the primary mechanism for versioning acid data. @@ -215,11 +216,30 @@ private static ParsedDelta parseDelta(String fileName, String deltaPrefix, Strin return new ParsedDelta(min, max, path, statementId, deleteDelta, visibilityId); } + public interface FileFilter { + public boolean accept(String fileName); + } + + public static final class FullAcidFileFilter implements FileFilter { + @Override + public boolean accept(String fileName) { + return fileName.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX) + && !fileName.endsWith(DELTA_SIDE_FILE_SUFFIX); + } + } + + public static final class InsertOnlyFileFilter implements FileFilter { + @Override + public boolean accept(String fileName) { + return true; + } + } + //Since the hive3 library cannot read the hive4 transaction table normally, and there are many problems // when using the Hive 4 library directly, this method is implemented. 
//Ref: hive/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java#getAcidState public static FileCacheValue getAcidState(FileSystem fileSystem, HivePartition partition, - Map txnValidIds, Map catalogProps) throws Exception { + Map txnValidIds, Map catalogProps, boolean isFullAcid) throws Exception { // Ref: https://issues.apache.org/jira/browse/HIVE-18192 // Readers should use the combination of ValidTxnList and ValidWriteIdList(Table) for snapshot isolation. @@ -383,6 +403,8 @@ && isValidBase(fileSystem, dirPath, base, validWriteIdList)) { FileCacheValue fileCacheValue = new FileCacheValue(); List deleteDeltas = new ArrayList<>(); + FileFilter fileFilter = isFullAcid ? new FullAcidFileFilter() : new InsertOnlyFileFilter(); + // delta directories for (ParsedDelta delta : deltas) { String location = delta.getPath(); @@ -391,17 +413,16 @@ && isValidBase(fileSystem, dirPath, base, validWriteIdList)) { status = fileSystem.listFiles(location, false, remoteFiles); if (status.ok()) { if (delta.isDeleteDelta()) { - List deleteDeltaFileNames = remoteFiles.stream().map(f -> f.getName()).filter( - name -> name.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)) + List deleteDeltaFileNames = remoteFiles.stream() + .map(RemoteFile::getName).filter(fileFilter::accept) .collect(Collectors.toList()); deleteDeltas.add(new DeleteDeltaInfo(location, deleteDeltaFileNames)); continue; } - remoteFiles.stream().filter( - f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)).forEach(file -> { - LocationPath path = new LocationPath(file.getPath().toString(), catalogProps); - fileCacheValue.addFile(file, path); - }); + remoteFiles.stream().filter(f -> fileFilter.accept(f.getName())).forEach(file -> { + LocationPath path = new LocationPath(file.getPath().toString(), catalogProps); + fileCacheValue.addFile(file, path); + }); } else { throw new RuntimeException(status.getErrMsg()); } @@ -412,8 +433,7 @@ && isValidBase(fileSystem, dirPath, base, validWriteIdList)) { List remoteFiles = new ArrayList<>(); status = fileSystem.listFiles(bestBasePath, false, remoteFiles); if (status.ok()) { - remoteFiles.stream().filter( - f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)) + remoteFiles.stream().filter(f -> fileFilter.accept(f.getName())) .forEach(file -> { LocationPath path = new LocationPath(file.getPath().toString(), catalogProps); fileCacheValue.addFile(file, path); @@ -422,7 +442,12 @@ && isValidBase(fileSystem, dirPath, base, validWriteIdList)) { throw new RuntimeException(status.getErrMsg()); } } - fileCacheValue.setAcidInfo(new AcidInfo(partition.getPath(), deleteDeltas)); + + if (isFullAcid) { + fileCacheValue.setAcidInfo(new AcidInfo(partition.getPath(), deleteDeltas)); + } else if (!deleteDeltas.isEmpty()) { + throw new RuntimeException("No Hive Full Acid Table have delete_delta_* Dir."); + } return fileCacheValue; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index 71c7308b079866..b554f508103992 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -30,6 +30,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; import 
org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.SchemaCacheValue; @@ -359,19 +360,24 @@ public Map getNameToPartitionItems() { } public boolean isHiveTransactionalTable() { - return dlaType == DLAType.HIVE && AcidUtils.isTransactionalTable(remoteTable) - && isSupportedTransactionalFileFormat(); + return dlaType == DLAType.HIVE && AcidUtils.isTransactionalTable(remoteTable); } - private boolean isSupportedTransactionalFileFormat() { + private boolean isSupportedFullAcidTransactionalFileFormat() { // Sometimes we meet "transactional" = "true" but format is parquet, which is not supported. // So we need to check the input format for transactional table. String inputFormatName = remoteTable.getSd().getInputFormat(); return inputFormatName != null && SUPPORTED_HIVE_TRANSACTIONAL_FILE_FORMATS.contains(inputFormatName); } - public boolean isFullAcidTable() { - return dlaType == DLAType.HIVE && AcidUtils.isFullAcidTable(remoteTable); + public boolean isFullAcidTable() throws UserException { + if (dlaType == DLAType.HIVE && AcidUtils.isFullAcidTable(remoteTable)) { + if (!isSupportedFullAcidTransactionalFileFormat()) { + throw new UserException("This table is full Acid Table, but no Orc Format."); + } + return true; + } + return false; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index 897fcb93acb513..258c936e44821b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -32,6 +32,7 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.UserException; import org.apache.doris.common.security.authentication.AuthenticationConfig; +import org.apache.doris.common.security.authentication.HadoopAuthenticator; import org.apache.doris.common.util.CacheBulkLoader; import org.apache.doris.common.util.LocationPath; import org.apache.doris.common.util.Util; @@ -54,7 +55,6 @@ import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.base.Strings; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import com.google.common.collect.Iterables; @@ -74,12 +74,10 @@ import org.apache.hadoop.hive.metastore.utils.FileUtils; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.net.URI; -import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -100,10 +98,6 @@ public class HiveMetaStoreCache { private static final Logger LOG = LogManager.getLogger(HiveMetaStoreCache.class); public static final String HIVE_DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__"; - // After hive 3, transactional table's will have file '_orc_acid_version' with value >= '2'. 
- public static final String HIVE_ORC_ACID_VERSION_FILE = "_orc_acid_version"; - - private static final String HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX = "bucket_"; private final HMSExternalCatalog catalog; private JobConf jobConf; @@ -741,31 +735,26 @@ public LoadingCache getPartitionCache() { public List getFilesByTransaction(List partitions, Map txnValidIds, boolean isFullAcid, String bindBrokerName) { List fileCacheValues = Lists.newArrayList(); - String remoteUser = jobConf.get(AuthenticationConfig.HADOOP_USER_NAME); - try { if (partitions.isEmpty()) { return fileCacheValues; } for (HivePartition partition : partitions) { - //Get filesystem multiple times, reason: https://github.com/apache/doris/pull/23409. + //Get filesystem multiple times, Reason: https://github.com/apache/doris/pull/23409. RemoteFileSystem fileSystem = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( new FileSystemCache.FileSystemCacheKey( LocationPath.getFSIdentity(partition.getPath(), bindBrokerName), catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf)); - if (!Strings.isNullOrEmpty(remoteUser)) { - UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser); - fileCacheValues.add( - ugi.doAs((PrivilegedExceptionAction) () -> AcidUtil.getAcidState( - fileSystem, partition, txnValidIds, catalog.getProperties())) - ); - } else { - fileCacheValues.add(AcidUtil.getAcidState( - fileSystem, partition, txnValidIds, catalog.getProperties()) - ); - } + AuthenticationConfig authenticationConfig = AuthenticationConfig.getKerberosConfig(jobConf); + HadoopAuthenticator hadoopAuthenticator = + HadoopAuthenticator.getHadoopAuthenticator(authenticationConfig); + + fileCacheValues.add( + hadoopAuthenticator.doAs(() -> AcidUtil.getAcidState( + fileSystem, partition, txnValidIds, catalog.getProperties(), isFullAcid)) + ); } } catch (Exception e) { throw new CacheException("Failed to get input splits %s", e, txnValidIds.toString()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java index fc4ca299166f02..a54084e9b45b29 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java @@ -67,7 +67,7 @@ public void testOriginalDeltas() throws Exception { false, "", "file://" + tempPath.toAbsolutePath() + "", new ArrayList<>(), new HashMap<>()); try { - AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>(), true); } catch (UnsupportedOperationException e) { Assert.assertTrue(e.getMessage().contains("For no acid table convert to acid, please COMPACT 'major'.")); } @@ -99,7 +99,7 @@ public void testObsoleteOriginals() throws Exception { new ArrayList<>(), new HashMap<>()); try { - AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>(), true); } catch (UnsupportedOperationException e) { Assert.assertTrue(e.getMessage().contains("For no acid table convert to acid, please COMPACT 'major'.")); } @@ -136,7 +136,7 @@ public void testOverlapingDelta() throws Exception { new HashMap<>()); FileCacheValue fileCacheValue = - AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + AcidUtil.getAcidState(localDFSFileSystem, 
partition, txnValidIds, new HashMap<>(), true); List readFile = fileCacheValue.getFiles().stream().map(x -> x.path.toString()).collect(Collectors.toList()); @@ -187,7 +187,7 @@ public void testOverlapingDelta2() throws Exception { new HashMap<>()); FileCacheValue fileCacheValue = - AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>(), true); List readFile = @@ -235,7 +235,7 @@ public void deltasWithOpenTxnInRead() throws Exception { FileCacheValue fileCacheValue = - AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>(), true); List readFile = @@ -281,7 +281,7 @@ public void deltasWithOpenTxnInRead2() throws Exception { new HashMap<>()); FileCacheValue fileCacheValue = - AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>(), true); List readFile = @@ -334,7 +334,7 @@ public void testBaseWithDeleteDeltas() throws Exception { new HashMap<>()); FileCacheValue fileCacheValue = - AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>(), true); List readFile = @@ -404,7 +404,7 @@ public void testOverlapingDeltaAndDeleteDelta() throws Exception { tableProps); FileCacheValue fileCacheValue = - AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>(), true); @@ -470,7 +470,7 @@ public void testMinorCompactedDeltaMakesInBetweenDelteDeltaObsolete() throws Exc tableProps); FileCacheValue fileCacheValue = - AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>(), true); List readFile = fileCacheValue.getFiles().stream().map(x -> x.path.toString()).collect(Collectors.toList()); @@ -516,7 +516,7 @@ public void deleteDeltasWithOpenTxnInRead() throws Exception { tableProps); FileCacheValue fileCacheValue = - AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>(), true); List readFile = @@ -579,7 +579,7 @@ public void testBaseDeltas() throws Exception { tableProps); FileCacheValue fileCacheValue = - AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>(), true); List readFile = fileCacheValue.getFiles().stream().map(x -> x.path.toString()).collect(Collectors.toList()); diff --git a/regression-test/data/external_table_p2/hive/test_hive_translation_insert_only.out b/regression-test/data/external_table_p2/hive/test_hive_translation_insert_only.out new file mode 100644 index 00000000000000..e4bdb3fe32d44b --- /dev/null +++ b/regression-test/data/external_table_p2/hive/test_hive_translation_insert_only.out @@ -0,0 +1,21 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !1 -- +1 A +2 B +3 C +4 D + +-- !2 -- +1 A +2 B +3 C +4 D +5 E + +-- !3 -- +1 A +2 B +3 C +4 D +5 E + diff --git a/regression-test/suites/external_table_p2/hive/test_hive_translation_insert_only.groovy b/regression-test/suites/external_table_p2/hive/test_hive_translation_insert_only.groovy new file mode 100644 index 00000000000000..a258d35c924e20 --- /dev/null +++ b/regression-test/suites/external_table_p2/hive/test_hive_translation_insert_only.groovy @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_hive_translation_insert_only", "p2,external,hive,external_remote,external_remote_hive") { + + if (false) { + String hms_catalog_name = "test_hive_translation_insert_only" + sql """drop catalog if exists ${hms_catalog_name};""" + sql """ + CREATE CATALOG IF NOT EXISTS ${hms_catalog_name} + PROPERTIES ( + 'hive.version' = '3.1.3', + 'type'='hms' + ); + """ + + logger.info("catalog " + hms_catalog_name + " created") + sql """switch ${hms_catalog_name};""" + logger.info("switched to catalog " + hms_catalog_name) + sql """ use regression;""" + + qt_1 """ select * from text_insert_only order by id """ + qt_2 """ select * from parquet_insert_only_major order by id """ + qt_3 """ select * from orc_insert_only_minor order by id """ + + + sql """drop catalog ${hms_catalog_name};""" + } +} + + +/* +SET hive.support.concurrency=true; +SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; + + +create table text_insert_only (id INT, value STRING) +ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' +TBLPROPERTIES ('transactional' = 'true', +'transactional_properties'='insert_only'); +insert into text_insert_only values (1, 'A'); +insert into text_insert_only values (2, 'B'); +insert into text_insert_only values (3, 'C'); +Load data xxx (4,D) + + +create table parquet_insert_only_major (id INT, value STRING) +CLUSTERED BY (id) INTO 3 BUCKETS +STORED AS parquet +TBLPROPERTIES ('transactional' = 'true', +'transactional_properties'='insert_only'); +insert into parquet_insert_only_major values (1, 'A'); +insert into parquet_insert_only_major values (2, 'B'); +insert into parquet_insert_only_major values (3, 'C'); +ALTER TABLE parquet_insert_only_major COMPACT 'major'; +insert into parquet_insert_only_major values (4, 'D'); +insert into parquet_insert_only_major values (5, 'E'); + + +create table orc_insert_only_minor (id INT, value STRING) +CLUSTERED BY (id) INTO 3 BUCKETS +stored as orc +TBLPROPERTIES ('transactional' = 'true', +'transactional_properties'='insert_only'); +insert into orc_insert_only_minor values (1, 'A'); +insert into orc_insert_only_minor values (2, 'B'); +insert into orc_insert_only_minor values (3, 'C'); +ALTER TABLE orc_insert_only_minor COMPACT 
'minor'; +insert into orc_insert_only_minor values (4, 'D'); +insert into orc_insert_only_minor values (5, 'E'); + +*/ From 3fe927dc0b2867c7c38982e5048d77d48e6e940c Mon Sep 17 00:00:00 2001 From: daidai Date: Sat, 21 Dec 2024 03:32:43 +0800 Subject: [PATCH 09/10] fix case --- .../test_hive_translation_insert_only.groovy | 49 ++++++++++--------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/regression-test/suites/external_table_p2/hive/test_hive_translation_insert_only.groovy b/regression-test/suites/external_table_p2/hive/test_hive_translation_insert_only.groovy index a258d35c924e20..62a9727ed86e06 100644 --- a/regression-test/suites/external_table_p2/hive/test_hive_translation_insert_only.groovy +++ b/regression-test/suites/external_table_p2/hive/test_hive_translation_insert_only.groovy @@ -17,29 +17,34 @@ suite("test_hive_translation_insert_only", "p2,external,hive,external_remote,external_remote_hive") { - if (false) { - String hms_catalog_name = "test_hive_translation_insert_only" - sql """drop catalog if exists ${hms_catalog_name};""" - sql """ - CREATE CATALOG IF NOT EXISTS ${hms_catalog_name} - PROPERTIES ( - 'hive.version' = '3.1.3', - 'type'='hms' - ); - """ - - logger.info("catalog " + hms_catalog_name + " created") - sql """switch ${hms_catalog_name};""" - logger.info("switched to catalog " + hms_catalog_name) - sql """ use regression;""" - - qt_1 """ select * from text_insert_only order by id """ - qt_2 """ select * from parquet_insert_only_major order by id """ - qt_3 """ select * from orc_insert_only_minor order by id """ - - - sql """drop catalog ${hms_catalog_name};""" + String enabled = context.config.otherConfigs.get("enableExternalHudiTest") + //hudi hive use same catalog in p2. + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable test") } + + String props = context.config.otherConfigs.get("hudiEmrCatalog") + String hms_catalog_name = "test_hive_translation_insert_only" + + sql """drop catalog if exists ${hms_catalog_name};""" + sql """ + CREATE CATALOG IF NOT EXISTS ${hms_catalog_name} + PROPERTIES ( + ${props} + ,'hive.version' = '3.1.3' + ); + """ + + logger.info("catalog " + hms_catalog_name + " created") + sql """switch ${hms_catalog_name};""" + logger.info("switched to catalog " + hms_catalog_name) + sql """ use regression;""" + + qt_1 """ select * from text_insert_only order by id """ + qt_2 """ select * from parquet_insert_only_major order by id """ + qt_3 """ select * from orc_insert_only_minor order by id """ + + sql """drop catalog ${hms_catalog_name};""" } From dbf176541b8ff117c7791ec0614c2cc723de53f7 Mon Sep 17 00:00:00 2001 From: daidai Date: Thu, 26 Dec 2024 15:22:24 +0800 Subject: [PATCH 10/10] fix case2. --- .../external_table_p0/hive/test_transactional_hive.groovy | 2 ++ 1 file changed, 2 insertions(+) diff --git a/regression-test/suites/external_table_p0/hive/test_transactional_hive.groovy b/regression-test/suites/external_table_p0/hive/test_transactional_hive.groovy index 4cde3d06dcc623..4f7008ec1726fa 100644 --- a/regression-test/suites/external_table_p0/hive/test_transactional_hive.groovy +++ b/regression-test/suites/external_table_p0/hive/test_transactional_hive.groovy @@ -54,6 +54,8 @@ suite("test_transactional_hive", "p0,external,hive,external_docker,external_dock } def test_acid = { + + sql """set enable_fallback_to_original_planner=false;""" try { sql """ select * from orc_to_acid_tb """ }catch( Exception e) {
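For reference, a simplified standalone sketch of the delta directory-name parsing that AcidUtil.parseDelta implements for names of the form delta_<min>_<max>[_<statementId>] / delete_delta_<min>_<max>[_<statementId>], with an optional _v<visibilityId> suffix added by Hive 4 compactions (see HIVE-20823 referenced above). The sample names and zero-padding in main() are illustrative assumptions, not output captured from these tests.

public final class DeltaNameSketch {
    // Parse delta_<min>_<max>[_<statementId>][_v<visibilityId>] (or the delete_delta_ variant).
    static String describe(String dirName) {
        boolean deleteDelta = dirName.startsWith("delete_delta_");
        String prefix = deleteDelta ? "delete_delta_" : "delta_";
        long visibilityId = -1;
        String rest = dirName;
        int vIdx = dirName.indexOf("_v");
        if (vIdx != -1) {
            visibilityId = Long.parseLong(dirName.substring(vIdx + 2));
            rest = dirName.substring(0, vIdx);
        }
        String[] parts = rest.substring(prefix.length()).split("_");
        long min = Long.parseLong(parts[0]);
        long max = Long.parseLong(parts[1]);
        // After minor compaction the per-statement component is dropped.
        Integer statementId = parts.length > 2 ? Integer.valueOf(parts[2]) : null;
        return String.format("deleteDelta=%s min=%d max=%d statementId=%s visibilityId=%d",
                deleteDelta, min, max, statementId, visibilityId);
    }

    public static void main(String[] args) {
        System.out.println(describe("delta_0000004_0000004_0000"));            // single-statement delta
        System.out.println(describe("delete_delta_0000001_0000003_v0000010")); // compacted range with Hive 4 _v suffix
    }
}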