From 2dec82f2f9f9a4095b21c0f43083d18bd84e72cc Mon Sep 17 00:00:00 2001 From: daidai Date: Tue, 3 Dec 2024 18:45:27 +0800 Subject: [PATCH] refactor code. --- .../doris/datasource/hive/AcidUtil.java | 429 ++++++++++++++++++ .../datasource/hive/HMSCachedClient.java | 6 +- .../datasource/hive/HiveMetaStoreCache.java | 213 ++------- .../datasource/hive/HiveTransaction.java | 17 +- .../hive/PostgreSQLJdbcHMSCachedClient.java | 9 +- .../hive/ThriftHMSCachedClient.java | 31 +- .../datasource/hive/source/HiveScanNode.java | 18 +- .../doris/datasource/TestHMSCachedClient.java | 9 +- .../doris/datasource/hive/HiveAcidTest.java | 2 + 9 files changed, 490 insertions(+), 244 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java new file mode 100644 index 000000000000000..92493a14039940d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java @@ -0,0 +1,429 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.hive; + +import org.apache.doris.backup.Status; +import org.apache.doris.common.util.LocationPath; +import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo; +import org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheValue; +import org.apache.doris.fs.remote.RemoteFile; +import org.apache.doris.fs.remote.RemoteFileSystem; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; +import lombok.ToString; +import org.apache.hadoop.hive.common.ValidReadTxnList; +import org.apache.hadoop.hive.common.ValidReaderWriteIdList; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidWriteIdList; +import org.apache.hadoop.hive.common.ValidWriteIdList.RangeResponse; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class AcidUtil { + private static final Logger LOG = LogManager.getLogger(AcidUtil.class); + + public static final String VALID_TXNS_KEY = "hive.txn.valid.txns"; + public static final String VALID_WRITEIDS_KEY = "hive.txn.valid.writeids"; + + private static final String HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX = "bucket_"; + + // An `_orc_acid_version` file is written to each base/delta/delete_delta dir written by a full acid write + // or compaction. This is the primary mechanism for versioning acid data. 
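+    // For reference, each of those base_/delta_/delete_delta_ directories keeps its data in bucket files next to
+    // this marker, e.g. base_5_v12/bucket_00000, delta_6_6_0/bucket_00000, delete_delta_7_7_0/bucket_00000.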
+ // Each individual ORC file written stores the current version as a user property in ORC footer. All data files + // produced by Acid write should have this (starting with Hive 3.0), including those written by compactor.This is + // more for sanity checking in case someone moved the files around or something like that. + // In hive, methods for getting/reading the version from files were moved to test which is the only place they are + // used (after HIVE-23506), in order to keep devs out of temptation, since they access the FileSystem which + // is expensive. + // After `HIVE-23825: Create a flag to turn off _orc_acid_version file creation`, introduce variables to + // control whether to generate `_orc_acid_version` file. So don't need check this file exist. + private static final String HIVE_ORC_ACID_VERSION_FILE = "_orc_acid_version"; + + @Getter + @ToString + @EqualsAndHashCode + private static class ParsedBase { + private final long writeId; + private final long visibilityId; + + public ParsedBase(long writeId, long visibilityId) { + this.writeId = writeId; + this.visibilityId = visibilityId; + } + } + + private static ParsedBase parseBase(String name) { + //format1 : base_writeId + //format2 : base_writeId_visibilityId detail: https://issues.apache.org/jira/browse/HIVE-20823 + name = name.substring("base_".length()); + int index = name.indexOf("_v"); + if (index == -1) { + return new ParsedBase(Long.parseLong(name), 0); + } + return new ParsedBase( + Long.parseLong(name.substring(0, index)), + Long.parseLong(name.substring(index + 2))); + } + + @Getter + @ToString + @EqualsAndHashCode + private static class ParsedDelta implements Comparable { + private final long min; + private final long max; + private final String path; + private final int statementId; + private final boolean deleteDelta; + private final long visibilityId; + + public ParsedDelta(long min, long max, @NonNull String path, int statementId, + boolean deleteDelta, long visibilityId) { + this.min = min; + this.max = max; + this.path = path; + this.statementId = statementId; + this.deleteDelta = deleteDelta; + this.visibilityId = visibilityId; + } + + /* + * Smaller minWID orders first; + * If minWID is the same, larger maxWID orders first; + * Otherwise, sort by stmtID; files w/o stmtID orders first. + * + * Compactions (Major/Minor) merge deltas/bases but delete of old files + * happens in a different process; thus it's possible to have bases/deltas with + * overlapping writeId boundaries. The sort order helps figure out the "best" set of files + * to use to get data. + * This sorts "wider" delta before "narrower" i.e. delta_5_20 sorts before delta_5_10 (and delta_11_20) + */ + @Override + public int compareTo(ParsedDelta other) { + return min != other.min ? Long.compare(min, other.min) : + other.max != max ? Long.compare(other.max, max) : + statementId != other.statementId + ? Integer.compare(statementId, other.statementId) : + path.compareTo(other.path); + } + } + + + private static boolean isValidMetaDataFile(RemoteFileSystem fileSystem, String baseDir) + throws IOException { + String fileLocation = baseDir + "_metadata_acid"; + Status status = fileSystem.exists(fileLocation); + if (status != Status.OK) { + return false; + } + //In order to save the cost of reading the file content, we only check whether the file exists. 
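+        // Its presence marks the base directory as produced by compaction; a base without it (and without a
+        // `_v` suffix in its name) is treated as the result of an insert-overwrite, see isValidBase().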
+ // File Contents: {"thisFileVersion":"0","dataFormat":"compacted"} + // + // Map metadata; + // try (var in = read(fileLocation)) { + // metadata = new ObjectMapper().readValue(in, new TypeReference<>() {}); + // } + // catch (IOException e) { + // throw new IOException(String.format("Failed to read %s: %s", fileLocation, e.getMessage()), e); + // } + // + // String version = metadata.get("thisFileVersion"); + // if (!"0".equals(version)) { + // throw new IOException("Unexpected ACID metadata version: " + version); + // } + // + // String format = metadata.get("dataFormat"); + // if (!"compacted".equals(format)) { + // throw new IOException("Unexpected value for ACID dataFormat: " + format); + // } + return true; + } + + private static boolean isValidBase(RemoteFileSystem remoteFileSystem, String baseDir, + ParsedBase base, ValidWriteIdList writeIdList) throws IOException { + if (base.writeId == Long.MIN_VALUE) { + //Ref: https://issues.apache.org/jira/browse/HIVE-13369 + //such base is created by 1st compaction in case of non-acid to acid table conversion.(you + //will get dir: `base_-9223372036854775808`) + //By definition there are no open txns with id < 1. + //After this: https://issues.apache.org/jira/browse/HIVE-18192, txns(global transaction ID) => writeId. + return true; + } + + // hive 4 : just check "_v" suffix, before hive 4 : check `_metadata_acid` file in baseDir. + if ((base.visibilityId > 0) || isValidMetaDataFile(remoteFileSystem, baseDir)) { + return writeIdList.isValidBase(base.writeId); + } + + // if here, it's a result of IOW + return writeIdList.isWriteIdValid(base.writeId); + } + + private static ParsedDelta parseDelta(String fileName, String deltaPrefix, String path) { + // format1: delta_min_max_statementId_visibilityId, delete_delta_min_max_statementId_visibilityId + // _visibilityId maybe not exists. + // detail: https://issues.apache.org/jira/browse/HIVE-20823 + // format2: delta_min_max_visibilityId, delete_delta_min_visibilityId + // when minor compaction runs, we collapse per statement delta files inside a single + // transaction so we no longer need a statementId in the file name + + // String fileName = fileName.substring(name.lastIndexOf('/') + 1); + // checkArgument(fileName.startsWith(deltaPrefix), "File does not start with '%s': %s", deltaPrefix, path); + + long visibilityId = 0; + int visibilityIdx = fileName.indexOf("_v"); + if (visibilityIdx != -1) { + visibilityId = Long.parseLong(fileName.substring(visibilityIdx + 2)); + fileName = fileName.substring(0, visibilityIdx); + } + + boolean deleteDelta = deltaPrefix.equals("delete_delta_"); + + String rest = fileName.substring(deltaPrefix.length()); + int split = rest.indexOf('_'); + int split2 = rest.indexOf('_', split + 1); + long min = Long.parseLong(rest.substring(0, split)); + + if (split2 == -1) { + long max = Long.parseLong(rest.substring(split + 1)); + return new ParsedDelta(min, max, fileName, -1, deleteDelta, visibilityId); + } + + long max = Long.parseLong(rest.substring(split + 1, split2)); + int statementId = Integer.parseInt(rest.substring(split2 + 1)); + return new ParsedDelta(min, max, path, statementId, deleteDelta, visibilityId); + } + + //Since the hive3 library cannot read the hive4 transaction table normally, and there are many problems + // when using the Hive 4 library directly, this method is implemented. 
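+    // It picks the newest base_N directory that is valid for the given snapshot, adds the delta/delete_delta
+    // directories still needed on top of it, and returns their bucket files as a FileCacheValue, with the
+    // delete_delta file names attached as AcidInfo so the scan can apply the deletes.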
+ //Ref: hive/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java#getAcidState + public static FileCacheValue getAcidState(RemoteFileSystem fileSystem, HivePartition partition, + Map txnValidIds, Map catalogProps) throws Exception { + + // Ref: https://issues.apache.org/jira/browse/HIVE-18192 + // Readers should use the combination of ValidTxnList and ValidWriteIdList(Table) for snapshot isolation. + // ValidReadTxnList implements ValidTxnList + // ValidReaderWriteIdList implements ValidWriteIdList + ValidTxnList validTxnList = null; + if (txnValidIds.containsKey(VALID_TXNS_KEY)) { + validTxnList = new ValidReadTxnList(); + validTxnList.readFromString( + txnValidIds.get(VALID_TXNS_KEY) + ); + } else { + throw new RuntimeException("Miss ValidTxnList"); + } + + ValidWriteIdList validWriteIdList = null; + if (txnValidIds.containsKey(VALID_WRITEIDS_KEY)) { + validWriteIdList = new ValidReaderWriteIdList(); + validWriteIdList.readFromString( + txnValidIds.get(VALID_WRITEIDS_KEY) + ); + } else { + throw new RuntimeException("Miss ValidWriteIdList"); + } + + String partitionPath = partition.getPath(); + //hdfs://xxxxx/user/hive/warehouse/username/data_id=200103 + + List lsPartitionPath = new ArrayList<>(); + Status status = fileSystem.globList(partitionPath + "/*", lsPartitionPath); + // List all files and folders, without recursion. + // FileStatus[] lsPartitionPath = null; + // fileSystem.listFileStatuses(partitionPath,lsPartitionPath); + if (status != Status.OK) { + throw new IOException(status.toString()); + } + + String oldestBase = null; + long oldestBaseWriteId = Long.MAX_VALUE; + String bestBasePath = null; + long bestBaseWriteId = 0; + boolean haveOriginalFiles = false; + List workingDeltas = new ArrayList<>(); + + for (RemoteFile remotePath : lsPartitionPath) { + if (remotePath.isDirectory()) { + String dirName = remotePath.getName(); //dirName: base_xxx,delta_xxx,... + String dirPath = partitionPath + "/" + dirName; + + if (dirName.startsWith("base_")) { + ParsedBase base = parseBase(dirName); + if (!validTxnList.isTxnValid(base.visibilityId)) { + //checks visibilityTxnId to see if it is committed in current snapshot. + continue; + } + + long writeId = base.writeId; + if (oldestBaseWriteId > writeId) { + oldestBase = dirPath; + oldestBaseWriteId = writeId; + } + + if (((bestBasePath == null) || (bestBaseWriteId < writeId)) + && isValidBase(fileSystem, dirPath, base, validWriteIdList)) { + //IOW will generator a base_N/ directory: https://issues.apache.org/jira/browse/HIVE-14988 + //So maybe need consider: https://issues.apache.org/jira/browse/HIVE-25777 + + bestBasePath = dirPath; + bestBaseWriteId = writeId; + } + } else if (dirName.startsWith("delta_") || dirName.startsWith("delete_delta_")) { + String deltaPrefix = dirName.startsWith("delta_") ? "delta_" : "delete_delta_"; + ParsedDelta delta = parseDelta(dirName, deltaPrefix, dirPath); + + if (!validTxnList.isTxnValid(delta.visibilityId)) { + continue; + } + + // No need check (validWriteIdList.isWriteIdRangeAborted(min,max) != RangeResponse.ALL) + // It is a subset of (validWriteIdList.isWriteIdRangeValid(min, max) != RangeResponse.NONE) + if (validWriteIdList.isWriteIdRangeValid(delta.min, delta.max) != RangeResponse.NONE) { + workingDeltas.add(delta); + } + } else { + //Sometimes hive will generate temporary directories(`.hive-staging_hive_xxx` ), + // which do not need to be read. 
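+                    // Any other directory that is not base_*/delta_*/delete_delta_* is skipped here as well.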
+ LOG.warn("Read Hive Acid Table ignore the contents of this folder:" + dirName); + } + } else { + haveOriginalFiles = true; + } + } + + if (bestBasePath == null && haveOriginalFiles) { + // ALTER TABLE nonAcidTbl SET TBLPROPERTIES ('transactional'='true'); + throw new UnsupportedOperationException("For no acid table convert to acid, please COMPACT 'major'."); + } + + if ((oldestBase != null) && (bestBasePath == null)) { + /* + * If here, it means there was a base_x (> 1 perhaps) but none were suitable for given + * {@link writeIdList}. Note that 'original' files are logically a base_Long.MIN_VALUE and thus + * cannot have any data for an open txn. We could check {@link deltas} has files to cover + * [1,n] w/o gaps but this would almost never happen... + * + * We only throw for base_x produced by Compactor since that base erases all history and + * cannot be used for a client that has a snapshot in which something inside this base is + * open. (Nor can we ignore this base of course) But base_x which is a result of IOW, + * contains all history so we treat it just like delta wrt visibility. Imagine, IOW which + * aborts. It creates a base_x, which can and should just be ignored.*/ + long[] exceptions = validWriteIdList.getInvalidWriteIds(); + String minOpenWriteId = ((exceptions != null) + && (exceptions.length > 0)) ? String.valueOf(exceptions[0]) : "x"; + throw new IOException( + String.format("Not enough history available for ({},{}). Oldest available base: {}", + validWriteIdList.getHighWatermark(), minOpenWriteId, oldestBase)); + } + + workingDeltas.sort(null); + + List deltas = new ArrayList<>(); + long current = bestBaseWriteId; + int lastStatementId = -1; + ParsedDelta prev = null; + // find need read delta/delete_delta file. + for (ParsedDelta next : workingDeltas) { + if (next.max > current) { + if (validWriteIdList.isWriteIdRangeValid(current + 1, next.max) != RangeResponse.NONE) { + deltas.add(next); + current = next.max; + lastStatementId = next.statementId; + prev = next; + } + } else if ((next.max == current) && (lastStatementId >= 0)) { + //make sure to get all deltas within a single transaction; multi-statement txn + //generate multiple delta files with the same txnId range + //of course, if maxWriteId has already been minor compacted, + // all per statement deltas are obsolete + + deltas.add(next); + prev = next; + } else if ((prev != null) + && (next.max == prev.max) + && (next.min == prev.min) + && (next.statementId == prev.statementId)) { + // The 'next' parsedDelta may have everything equal to the 'prev' parsedDelta, except + // the path. This may happen when we have split update and we have two types of delta + // directories- 'delta_x_y' and 'delete_delta_x_y' for the SAME txn range. + + // Also note that any delete_deltas in between a given delta_x_y range would be made + // obsolete. For example, a delta_30_50 would make delete_delta_40_40 obsolete. + // This is valid because minor compaction always compacts the normal deltas and the delete + // deltas for the same range. That is, if we had 3 directories, delta_30_30, + // delete_delta_40_40 and delta_50_50, then running minor compaction would produce + // delta_30_50 and delete_delta_30_50. 
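+                // Keep this directory too: it is the delta/delete_delta twin of 'prev' for the same write id range.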
+ deltas.add(next); + prev = next; + } + } + + FileCacheValue fileCacheValue = new FileCacheValue(); + List deleteDeltas = new ArrayList<>(); + + // delta directories + for (ParsedDelta delta : deltas) { + String location = delta.getPath(); + + List remoteFiles = new ArrayList<>(); + status = fileSystem.listFiles(location, false, remoteFiles); + if (status.ok()) { + if (delta.isDeleteDelta()) { + List deleteDeltaFileNames = remoteFiles.stream().map(f -> f.getName()).filter( + name -> name.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)) + .collect(Collectors.toList()); + deleteDeltas.add(new DeleteDeltaInfo(location, deleteDeltaFileNames)); + continue; + } + remoteFiles.stream().filter( + f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)).forEach(file -> { + LocationPath path = new LocationPath(file.getPath().toString(), catalogProps); + fileCacheValue.addFile(file, path); + }); + } else { + throw new RuntimeException(status.getErrMsg()); + } + } + + // base + if (bestBasePath != null) { + List remoteFiles = new ArrayList<>(); + status = fileSystem.listFiles(bestBasePath, false, remoteFiles); + if (status.ok()) { + remoteFiles.stream().filter( + f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)) + .forEach(file -> { + LocationPath path = new LocationPath(file.getPath().toString(), catalogProps); + fileCacheValue.addFile(file, path); + }); + } else { + throw new RuntimeException(status.getErrMsg()); + } + } + fileCacheValue.setAcidInfo(new AcidInfo(partition.getPath(), deleteDeltas)); + return fileCacheValue; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java index b104fbc2cdb4384..fcaefecf41b1567 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java @@ -23,8 +23,6 @@ import org.apache.doris.datasource.TableMetadata; import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException; -import org.apache.hadoop.hive.common.ValidTxnList; -import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; @@ -83,9 +81,7 @@ NotificationEventResponse getNextNotification(long lastEventId, void commitTxn(long txnId); - ValidWriteIdList getValidWriteIds(String fullTableName, long currentTransactionId); - - ValidTxnList getValidTxns(); + Map getValidWriteIds(String fullTableName, long currentTransactionId); void acquireSharedLock(String queryId, long txnId, String user, TableName tblName, List partitionNames, long timeoutMs); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index 709ead42452d7e9..455f0b5cf78706c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -31,12 +31,12 @@ import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.UserException; +import org.apache.doris.common.security.authentication.AuthenticationConfig; import org.apache.doris.common.util.CacheBulkLoader; import 
org.apache.doris.common.util.LocationPath; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.CacheException; import org.apache.doris.datasource.ExternalMetaCacheMgr; -import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo; import org.apache.doris.datasource.property.PropertyConverter; import org.apache.doris.fs.FileSystemCache; import org.apache.doris.fs.remote.RemoteFile; @@ -54,6 +54,7 @@ import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import com.google.common.collect.Iterables; @@ -72,22 +73,20 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidWriteIdList; -import org.apache.hadoop.hive.common.ValidWriteIdList.RangeResponse; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.utils.FileUtils; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.io.IOException; import java.net.URI; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -858,196 +857,36 @@ static ParsedDelta parseDelta(String fileName, String deltaPrefix, String path) return new ParsedDelta(min, max, path, statementId, deleteDelta, visibilityId); } - public List getFilesByTransaction(List partitions, ValidWriteIdList validWriteIds, - ValidTxnList validTxnList, - boolean isFullAcid, boolean skipCheckingAcidVersionFile, long tableId, String bindBrokerName) { + public List getFilesByTransaction(List partitions, Map txnValidIds, + boolean isFullAcid, String bindBrokerName) { List fileCacheValues = Lists.newArrayList(); - // String remoteUser = jobConf.get(AuthenticationConfig.HADOOP_USER_NAME); + String remoteUser = jobConf.get(AuthenticationConfig.HADOOP_USER_NAME); try { + if (partitions.isEmpty()) { + return fileCacheValues; + } - for (HivePartition partition : partitions) { - RemoteFileSystem fsPar = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( - new FileSystemCache.FileSystemCacheKey( - LocationPath.getFSIdentity(partition.getPath(), bindBrokerName), - catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf)); - - Set acidResult = new HashSet<>(); - fsPar.listDirectories(partition.getPath(), acidResult); - - - List originalFiles = new ArrayList<>(); - List workingDeltas = new ArrayList<>(); - String oldestBase = null; - long oldestBaseWriteId = Long.MAX_VALUE; - String bestBasePath = null; - long bestBaseWriteId = 0; - - for (String fileDirectory : acidResult) { - // checkArgument(fileDirectory.startsWith(partition.getPath()), - // "file '%s' does not start with directory '%s'", - // fileDirectory, partition.getPath()); - String suffix = fileDirectory.substring(partition.getPath().length() + 1); - int slash = suffix.indexOf('/'); - String name = (slash == -1) ? 
"" : suffix.substring(0, slash); - - - if (name.startsWith("base_")) { - ParsedBase base = parseBase(name); - if (!validTxnList.isTxnValid(base.visibilityId)) { - //checks visibilityTxnId to see if it is committed in current snapshot - continue; - } - - long writeId = base.writeId; - if (oldestBaseWriteId > writeId) { - oldestBase = fileDirectory; - oldestBaseWriteId = writeId; - } - - if (((bestBasePath == null) || (bestBaseWriteId < writeId)) - && isValidBase(base, validWriteIds)) { - - bestBasePath = fileDirectory; - bestBaseWriteId = writeId; - } - } else if (name.startsWith("delta_") || name.startsWith("delete_delta_")) { - String deltaPrefix = name.startsWith("delta_") ? "delta_" : "delete_delta_"; - ParsedDelta delta = parseDelta(name, deltaPrefix, fileDirectory); - if (validWriteIds.isWriteIdRangeValid(delta.min, delta.max) != RangeResponse.NONE) { - workingDeltas.add(delta); - } - } else { - originalFiles.add(fileDirectory); - } - } - - if (bestBasePath == null && !originalFiles.isEmpty()) { - throw new Exception("For no acid table convert to acid, please COMPACT 'major'."); - } - originalFiles.clear(); - - if ((oldestBase != null) && (bestBasePath == null)) { - /* - * If here, it means there was a base_x (> 1 perhaps) but none were suitable for given - * {@link writeIdList}. Note that 'original' files are logically a base_Long.MIN_VALUE and thus - * cannot have any data for an open txn. We could check {@link deltas} has files to cover - * [1,n] w/o gaps but this would almost never happen... - * - * We only throw for base_x produced by Compactor since that base erases all history and - * cannot be used for a client that has a snapshot in which something inside this base is - * open. (Nor can we ignore this base of course) But base_x which is a result of IOW, - * contains all history so we treat it just like delta wrt visibility. Imagine, IOW which - * aborts. It creates a base_x, which can and should just be ignored.*/ - - long[] exceptions = validWriteIds.getInvalidWriteIds(); - String minOpenWriteId = ((exceptions != null) - && (exceptions.length > 0)) ? String.valueOf(exceptions[0]) : "x"; - throw new IOException( - String.format("Not enough history available for ({},{}). Oldest available base: {}", - validWriteIds.getHighWatermark(), minOpenWriteId, oldestBase)); - } - - workingDeltas.sort(null); - - List deltas = new ArrayList<>(); - long current = bestBaseWriteId; - int lastStatementId = -1; - ParsedDelta prev = null; - // find need read delta/delete_delta file. - for (ParsedDelta next : workingDeltas) { - if (next.max > current) { - if (validWriteIds.isWriteIdRangeValid(current + 1, next.max) != RangeResponse.NONE) { - deltas.add(next); - current = next.max; - lastStatementId = next.statementId; - prev = next; - } - } else if ((next.max == current) && (lastStatementId >= 0)) { - //make sure to get all deltas within a single transaction; multi-statement txn - //generate multiple delta files with the same txnId range - //of course, if maxWriteId has already been minor compacted, - // all per statement deltas are obsolete - - deltas.add(next); - prev = next; - } else if ((prev != null) - && (next.max == prev.max) - && (next.min == prev.min) - && (next.statementId == prev.statementId)) { - // The 'next' parsedDelta may have everything equal to the 'prev' parsedDelta, except - // the path. This may happen when we have split update and we have two types of delta - // directories- 'delta_x_y' and 'delete_delta_x_y' for the SAME txn range. 
- - // Also note that any delete_deltas in between a given delta_x_y range would be made - // obsolete. For example, a delta_30_50 would make delete_delta_40_40 obsolete. - // This is valid because minor compaction always compacts the normal deltas and the delete - // deltas for the same range. That is, if we had 3 directories, delta_30_30, - // delete_delta_40_40 and delta_50_50, then running minor compaction would produce - // delta_30_50 and delete_delta_30_50. - deltas.add(next); - prev = next; - } - } - + RemoteFileSystem fileSystem = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( + new FileSystemCache.FileSystemCacheKey( + LocationPath.getFSIdentity(partitions.get(0).getPath(), bindBrokerName), + catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf)); - FileCacheValue fileCacheValue = new FileCacheValue(); - - // delta directories - List deleteDeltas = new ArrayList<>(); - for (ParsedDelta delta : deltas) { - String location = delta.getPath(); - RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( - new FileSystemCache.FileSystemCacheKey( - LocationPath.getFSIdentity(location, bindBrokerName), - catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf)); - List remoteFiles = new ArrayList<>(); - Status status = fs.listFiles(location, false, remoteFiles); - if (status.ok()) { - if (delta.isDeleteDelta()) { - List deleteDeltaFileNames = remoteFiles.stream().map(f -> f.getName()).filter( - name -> name.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)) - .collect(Collectors.toList()); - deleteDeltas.add(new DeleteDeltaInfo(location, deleteDeltaFileNames)); - continue; - } - remoteFiles.stream().filter( - f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)).forEach(file -> { - LocationPath path = new LocationPath(file.getPath().toString(), - catalog.getProperties()); - fileCacheValue.addFile(file, path); - }); - } else { - throw new RuntimeException(status.getErrMsg()); - } - } - - // base - if (bestBasePath != null) { - RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( - new FileSystemCache.FileSystemCacheKey( - LocationPath.getFSIdentity(bestBasePath, bindBrokerName), - catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf)); - List remoteFiles = new ArrayList<>(); - Status status = fs.listFiles(bestBasePath, false, remoteFiles); - if (status.ok()) { - remoteFiles.stream().filter( - f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)) - .forEach(file -> { - LocationPath path = new LocationPath(file.getPath().toString(), - catalog.getProperties()); - fileCacheValue.addFile(file, path); - }); - } else { - throw new RuntimeException(status.getErrMsg()); - } + for (HivePartition partition : partitions) { + if (!Strings.isNullOrEmpty(remoteUser)) { + UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser); + fileCacheValues.add( + ugi.doAs((PrivilegedExceptionAction) () -> AcidUtil.getAcidState( + fileSystem, partition, txnValidIds, catalog.getProperties())) + ); + } else { + fileCacheValues.add(AcidUtil.getAcidState( + fileSystem, partition, txnValidIds, catalog.getProperties()) + ); } - fileCacheValue.setAcidInfo(new AcidInfo(partition.getPath(), deleteDeltas)); - fileCacheValues.add(fileCacheValue); } } catch (Exception e) { - throw new CacheException("failed to get input splits for write ids %s in catalog %s", e, - validWriteIds.toString(), catalog.getName()); + throw new 
CacheException("Failed to get input splits %s", e, txnValidIds.toString()); } return fileCacheValues; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java index 8214ea13a92351c..7d62af296114473 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java @@ -21,10 +21,9 @@ import org.apache.doris.common.UserException; import com.google.common.collect.Lists; -import org.apache.hadoop.hive.common.ValidTxnList; -import org.apache.hadoop.hive.common.ValidWriteIdList; import java.util.List; +import java.util.Map; /** * HiveTransaction is used to save info of a hive transaction. @@ -41,7 +40,7 @@ public class HiveTransaction { private long txnId; private List partitionNames = Lists.newArrayList(); - ValidWriteIdList validWriteIdList = null; + Map txnValidIds = null; public HiveTransaction(String queryId, String user, HMSExternalTable hiveTable, boolean isFullAcid) { this.queryId = queryId; @@ -62,18 +61,14 @@ public boolean isFullAcid() { return isFullAcid; } - public ValidTxnList getValidTxns(HMSCachedClient client) { - return client.getValidTxns(); - } - - public ValidWriteIdList getValidWriteIds(HMSCachedClient client) { - if (validWriteIdList == null) { + public Map getValidWriteIds(HMSCachedClient client) { + if (txnValidIds == null) { TableName tableName = new TableName(hiveTable.getCatalog().getName(), hiveTable.getDbName(), hiveTable.getName()); client.acquireSharedLock(queryId, txnId, user, tableName, partitionNames, 5000); - validWriteIdList = client.getValidWriteIds(tableName.getDb() + "." + tableName.getTbl(), txnId); + txnValidIds = client.getValidWriteIds(tableName.getDb() + "." 
+ tableName.getTbl(), txnId); } - return validWriteIdList; + return txnValidIds; } public void begin() throws UserException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java index d9848a6e87d465d..672afa96208d652 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java @@ -32,8 +32,6 @@ import com.google.common.collect.ImmutableList.Builder; import com.google.common.collect.ImmutableMap; import org.apache.commons.lang3.NotImplementedException; -import org.apache.hadoop.hive.common.ValidTxnList; -import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; @@ -524,12 +522,7 @@ public void commitTxn(long txnId) { } @Override - public ValidWriteIdList getValidWriteIds(String fullTableName, long currentTransactionId) { - throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient."); - } - - @Override - public ValidTxnList getValidTxns() { + public Map getValidWriteIds(String fullTableName, long currentTransactionId) { throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient."); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java index 97ed2bc628cdb34..9137c977f179c9b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java @@ -31,6 +31,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.common.ValidReaderWriteIdList; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidTxnWriteIdList; @@ -564,7 +565,8 @@ public void acquireSharedLock(String queryId, long txnId, String user, TableName } @Override - public ValidWriteIdList getValidWriteIds(String fullTableName, long currentTransactionId) { + public Map getValidWriteIds(String fullTableName, long currentTransactionId) { + Map conf = new HashMap<>(); try (ThriftHMSClient client = getClient()) { try { return ugiDoAs(() -> { @@ -581,7 +583,10 @@ public ValidWriteIdList getValidWriteIds(String fullTableName, long currentTrans ValidTxnWriteIdList validTxnWriteIdList = TxnUtils.createValidTxnWriteIdList(currentTransactionId, tableValidWriteIdsList); ValidWriteIdList writeIdList = validTxnWriteIdList.getTableValidWriteIdList(fullTableName); - return writeIdList; + + conf.put(AcidUtil.VALID_TXNS_KEY, validTransactions.writeToString()); + conf.put(AcidUtil.VALID_WRITEIDS_KEY, writeIdList.writeToString()); + return conf; }); } catch (Exception e) { client.setThrowable(e); @@ -591,22 +596,14 @@ public ValidWriteIdList getValidWriteIds(String fullTableName, long currentTrans // Ignore this exception when the version of hive is not compatible with these apis. // Currently, the workaround is using a max watermark. 
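+            // i.e. fall back to a snapshot whose high watermark is Long.MAX_VALUE with no open or aborted ids,
+            // so every write id is treated as committed and visible.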
LOG.warn("failed to get valid write ids for {}, transaction {}", fullTableName, currentTransactionId, e); - return new ValidReaderWriteIdList(fullTableName, new long[0], new BitSet(), Long.MAX_VALUE); - } - } - @Override - public ValidTxnList getValidTxns() { - try (ThriftHMSClient client = getClient()) { - try { - return ugiDoAs(client.client::getValidTxns); - } catch (Exception e) { - client.setThrowable(e); - throw e; - } - } catch (Exception e) { - throw new HMSClientException("Catalog Get the transactions that " - + "are currently valid fail. Exception = {}", e); + ValidTxnList validTransactions = new ValidReadTxnList( + new long[0], new BitSet(), Long.MAX_VALUE, Long.MAX_VALUE); + ValidWriteIdList writeIdList = new ValidReaderWriteIdList( + fullTableName, new long[0], new BitSet(), Long.MAX_VALUE); + conf.put(AcidUtil.VALID_TXNS_KEY, validTransactions.writeToString()); + conf.put(AcidUtil.VALID_WRITEIDS_KEY, writeIdList.writeToString()); + return conf; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java index 60cf1861b56e483..9c26d8aa221deab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java @@ -55,8 +55,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import lombok.Setter; -import org.apache.hadoop.hive.common.ValidTxnList; -import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.logging.log4j.LogManager; @@ -267,7 +265,14 @@ private void getFileSplitByPartitions(HiveMetaStoreCache cache, List allFiles, String bindBrokerName) throws IOException { List fileCaches; if (hiveTransaction != null) { - fileCaches = getFileSplitByTransaction(cache, partitions, bindBrokerName); + try { + fileCaches = getFileSplitByTransaction(cache, partitions, bindBrokerName); + } catch (Exception e) { + // Release shared load (getValidWriteIds acquire Lock). + // If no exception is throw, the lock will be released when `finalizeQuery()`. 
+ Env.getCurrentHiveTransactionMgr().deregister(hiveTransaction.getQueryId()); + throw e; + } } else { boolean withCache = Config.max_external_file_cache_num > 0; fileCaches = cache.getFilesByPartitions(partitions, withCache, partitions.size() > 1, bindBrokerName); @@ -344,13 +349,10 @@ private List getFileSplitByTransaction(HiveMetaStoreCache cache, } hiveTransaction.addPartition(partition.getPartitionName(hmsTable.getPartitionColumns())); } - ValidWriteIdList validWriteIds = hiveTransaction.getValidWriteIds( - ((HMSExternalCatalog) hmsTable.getCatalog()).getClient()); - ValidTxnList validTxnList = hiveTransaction.getValidTxns( + Map txnValidIds = hiveTransaction.getValidWriteIds( ((HMSExternalCatalog) hmsTable.getCatalog()).getClient()); - return cache.getFilesByTransaction(partitions, validWriteIds, validTxnList, - hiveTransaction.isFullAcid(), skipCheckingAcidVersionFile, hmsTable.getId(), bindBrokerName); + return cache.getFilesByTransaction(partitions, txnValidIds, hiveTransaction.isFullAcid(), bindBrokerName); } @Override diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java index ea7c293f04b75d5..5d4474d77ec21ef 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java @@ -28,8 +28,6 @@ import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException; import com.google.common.collect.ImmutableList; -import org.apache.hadoop.hive.common.ValidTxnList; -import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; @@ -203,12 +201,7 @@ public void commitTxn(long txnId) { } @Override - public ValidWriteIdList getValidWriteIds(String fullTableName, long currentTransactionId) { - return null; - } - - @Override - public ValidTxnList getValidTxns() { + public Map getValidWriteIds(String fullTableName, long currentTransactionId) { return null; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java new file mode 100644 index 000000000000000..14605861bce680f --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java @@ -0,0 +1,2 @@ +package org.apache.doris.datasource.hive;public class HiveAcidTest { +}
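Note on the new test class: HiveAcidTest is added empty above. A minimal first test might look like the sketch below. It assumes a JUnit 4 style test (Assert.assertThrows requires JUnit 4.13+) and assumes the raw Map parameters of AcidUtil.getAcidState in this diff stand for Map<String, String>; treat it as a starting point rather than part of the patch.

package org.apache.doris.datasource.hive;

import org.junit.Assert;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;

public class HiveAcidTest {

    @Test
    public void testGetAcidStateRequiresTxnSnapshot() {
        // getAcidState validates the snapshot map before touching the file system or the partition,
        // so nulls are acceptable for those arguments in this negative test.
        Map<String, String> emptySnapshot = new HashMap<>();
        RuntimeException e = Assert.assertThrows(RuntimeException.class,
                () -> AcidUtil.getAcidState(null, null, emptySnapshot, new HashMap<>()));
        Assert.assertTrue(e.getMessage().contains("ValidTxnList"));
    }
}

Cases that exercise base_/delta_ selection would need a stubbed RemoteFileSystem, so they are left out of this sketch.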