
Commit

refactor code: replace ValidWriteIdList/ValidTxnList with a Map<String, String> of valid write ids and move Hive ACID file resolution into AcidUtil.getAcidState.
hubgeter committed Dec 3, 2024
1 parent 12bfe89 commit 2dec82f
Showing 9 changed files with 490 additions and 244 deletions.

Large diffs are not rendered by default.

@@ -23,8 +23,6 @@
import org.apache.doris.datasource.TableMetadata;
import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;

import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
@@ -83,9 +81,7 @@ NotificationEventResponse getNextNotification(long lastEventId,

void commitTxn(long txnId);

ValidWriteIdList getValidWriteIds(String fullTableName, long currentTransactionId);

ValidTxnList getValidTxns();
Map<String, String> getValidWriteIds(String fullTableName, long currentTransactionId);

void acquireSharedLock(String queryId, long txnId, String user, TableName tblName,
List<String> partitionNames, long timeoutMs);
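The interface change above swaps Hive's ValidWriteIdList/ValidTxnList return types for a plain Map<String, String>, so callers of HMSCachedClient no longer need those Hive metastore classes on their classpath. Below is a minimal sketch of how such a map could round-trip the same information using Hive's string serialization helpers; the key names and the codec class itself are illustrative assumptions, not what this commit actually uses.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;

public final class TxnValidIdsCodec {
    // Illustrative key names only; the real implementation may use different ones.
    private static final String KEY_VALID_TXNS = "valid_txns";
    private static final String KEY_VALID_WRITE_IDS = "valid_write_ids";

    private TxnValidIdsCodec() {}

    // Pack both Hive objects into a plain string map so interface callers stay free of Hive types.
    public static Map<String, String> toMap(ValidTxnList validTxns, ValidWriteIdList validWriteIds) {
        Map<String, String> txnValidIds = new HashMap<>();
        txnValidIds.put(KEY_VALID_TXNS, validTxns.writeToString());
        txnValidIds.put(KEY_VALID_WRITE_IDS, validWriteIds.writeToString());
        return txnValidIds;
    }

    // Rebuild the Hive objects where directory filtering still needs them.
    public static ValidTxnList txnsFrom(Map<String, String> txnValidIds) {
        return new ValidReadTxnList(txnValidIds.get(KEY_VALID_TXNS));
    }

    public static ValidWriteIdList writeIdsFrom(Map<String, String> txnValidIds) {
        return new ValidReaderWriteIdList(txnValidIds.get(KEY_VALID_WRITE_IDS));
    }
}

Whatever the actual encoding is, the effect of the signature change is that only the ACID-aware code paths (such as AcidUtil.getAcidState in the later hunks) have to reconstruct the Hive objects.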
@@ -31,12 +31,12 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.util.CacheBulkLoader;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CacheException;
import org.apache.doris.datasource.ExternalMetaCacheMgr;
import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.fs.FileSystemCache;
import org.apache.doris.fs.remote.RemoteFile;
@@ -54,6 +54,7 @@
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Iterables;
@@ -72,22 +73,20 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.common.ValidWriteIdList.RangeResponse;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.utils.FileUtils;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -858,196 +857,36 @@ static ParsedDelta parseDelta(String fileName, String deltaPrefix, String path)
return new ParsedDelta(min, max, path, statementId, deleteDelta, visibilityId);
}

public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions, ValidWriteIdList validWriteIds,
ValidTxnList validTxnList,
boolean isFullAcid, boolean skipCheckingAcidVersionFile, long tableId, String bindBrokerName) {
public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions, Map<String, String> txnValidIds,
boolean isFullAcid, String bindBrokerName) {
List<FileCacheValue> fileCacheValues = Lists.newArrayList();
// String remoteUser = jobConf.get(AuthenticationConfig.HADOOP_USER_NAME);
String remoteUser = jobConf.get(AuthenticationConfig.HADOOP_USER_NAME);

try {
if (partitions.isEmpty()) {
return fileCacheValues;
}

for (HivePartition partition : partitions) {
RemoteFileSystem fsPar = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(
LocationPath.getFSIdentity(partition.getPath(), bindBrokerName),
catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf));

Set<String> acidResult = new HashSet<>();
fsPar.listDirectories(partition.getPath(), acidResult);


List<String> originalFiles = new ArrayList<>();
List<ParsedDelta> workingDeltas = new ArrayList<>();
String oldestBase = null;
long oldestBaseWriteId = Long.MAX_VALUE;
String bestBasePath = null;
long bestBaseWriteId = 0;

for (String fileDirectory : acidResult) {
// checkArgument(fileDirectory.startsWith(partition.getPath()),
// "file '%s' does not start with directory '%s'",
// fileDirectory, partition.getPath());
String suffix = fileDirectory.substring(partition.getPath().length() + 1);
int slash = suffix.indexOf('/');
String name = (slash == -1) ? "" : suffix.substring(0, slash);


if (name.startsWith("base_")) {
ParsedBase base = parseBase(name);
if (!validTxnList.isTxnValid(base.visibilityId)) {
//checks visibilityTxnId to see if it is committed in current snapshot
continue;
}

long writeId = base.writeId;
if (oldestBaseWriteId > writeId) {
oldestBase = fileDirectory;
oldestBaseWriteId = writeId;
}

if (((bestBasePath == null) || (bestBaseWriteId < writeId))
&& isValidBase(base, validWriteIds)) {

bestBasePath = fileDirectory;
bestBaseWriteId = writeId;
}
} else if (name.startsWith("delta_") || name.startsWith("delete_delta_")) {
String deltaPrefix = name.startsWith("delta_") ? "delta_" : "delete_delta_";
ParsedDelta delta = parseDelta(name, deltaPrefix, fileDirectory);
if (validWriteIds.isWriteIdRangeValid(delta.min, delta.max) != RangeResponse.NONE) {
workingDeltas.add(delta);
}
} else {
originalFiles.add(fileDirectory);
}
}

if (bestBasePath == null && !originalFiles.isEmpty()) {
throw new Exception("For no acid table convert to acid, please COMPACT 'major'.");
}
originalFiles.clear();

if ((oldestBase != null) && (bestBasePath == null)) {
/*
* If here, it means there was a base_x (> 1 perhaps) but none were suitable for given
* {@link writeIdList}. Note that 'original' files are logically a base_Long.MIN_VALUE and thus
* cannot have any data for an open txn. We could check {@link deltas} has files to cover
* [1,n] w/o gaps but this would almost never happen...
*
* We only throw for base_x produced by Compactor since that base erases all history and
* cannot be used for a client that has a snapshot in which something inside this base is
* open. (Nor can we ignore this base of course) But base_x which is a result of IOW,
* contains all history so we treat it just like delta wrt visibility. Imagine, IOW which
* aborts. It creates a base_x, which can and should just be ignored.*/

long[] exceptions = validWriteIds.getInvalidWriteIds();
String minOpenWriteId = ((exceptions != null)
&& (exceptions.length > 0)) ? String.valueOf(exceptions[0]) : "x";
throw new IOException(
String.format("Not enough history available for ({},{}). Oldest available base: {}",
validWriteIds.getHighWatermark(), minOpenWriteId, oldestBase));
}

workingDeltas.sort(null);

List<ParsedDelta> deltas = new ArrayList<>();
long current = bestBaseWriteId;
int lastStatementId = -1;
ParsedDelta prev = null;
// find need read delta/delete_delta file.
for (ParsedDelta next : workingDeltas) {
if (next.max > current) {
if (validWriteIds.isWriteIdRangeValid(current + 1, next.max) != RangeResponse.NONE) {
deltas.add(next);
current = next.max;
lastStatementId = next.statementId;
prev = next;
}
} else if ((next.max == current) && (lastStatementId >= 0)) {
//make sure to get all deltas within a single transaction; multi-statement txn
//generate multiple delta files with the same txnId range
//of course, if maxWriteId has already been minor compacted,
// all per statement deltas are obsolete

deltas.add(next);
prev = next;
} else if ((prev != null)
&& (next.max == prev.max)
&& (next.min == prev.min)
&& (next.statementId == prev.statementId)) {
// The 'next' parsedDelta may have everything equal to the 'prev' parsedDelta, except
// the path. This may happen when we have split update and we have two types of delta
// directories- 'delta_x_y' and 'delete_delta_x_y' for the SAME txn range.

// Also note that any delete_deltas in between a given delta_x_y range would be made
// obsolete. For example, a delta_30_50 would make delete_delta_40_40 obsolete.
// This is valid because minor compaction always compacts the normal deltas and the delete
// deltas for the same range. That is, if we had 3 directories, delta_30_30,
// delete_delta_40_40 and delta_50_50, then running minor compaction would produce
// delta_30_50 and delete_delta_30_50.
deltas.add(next);
prev = next;
}
}

RemoteFileSystem fileSystem = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(
LocationPath.getFSIdentity(partitions.get(0).getPath(), bindBrokerName),
catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf));

FileCacheValue fileCacheValue = new FileCacheValue();

// delta directories
List<DeleteDeltaInfo> deleteDeltas = new ArrayList<>();
for (ParsedDelta delta : deltas) {
String location = delta.getPath();
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(
LocationPath.getFSIdentity(location, bindBrokerName),
catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf));
List<RemoteFile> remoteFiles = new ArrayList<>();
Status status = fs.listFiles(location, false, remoteFiles);
if (status.ok()) {
if (delta.isDeleteDelta()) {
List<String> deleteDeltaFileNames = remoteFiles.stream().map(f -> f.getName()).filter(
name -> name.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
.collect(Collectors.toList());
deleteDeltas.add(new DeleteDeltaInfo(location, deleteDeltaFileNames));
continue;
}
remoteFiles.stream().filter(
f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)).forEach(file -> {
LocationPath path = new LocationPath(file.getPath().toString(),
catalog.getProperties());
fileCacheValue.addFile(file, path);
});
} else {
throw new RuntimeException(status.getErrMsg());
}
}

// base
if (bestBasePath != null) {
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(
LocationPath.getFSIdentity(bestBasePath, bindBrokerName),
catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf));
List<RemoteFile> remoteFiles = new ArrayList<>();
Status status = fs.listFiles(bestBasePath, false, remoteFiles);
if (status.ok()) {
remoteFiles.stream().filter(
f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
.forEach(file -> {
LocationPath path = new LocationPath(file.getPath().toString(),
catalog.getProperties());
fileCacheValue.addFile(file, path);
});
} else {
throw new RuntimeException(status.getErrMsg());
}
for (HivePartition partition : partitions) {
if (!Strings.isNullOrEmpty(remoteUser)) {
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser);
fileCacheValues.add(
ugi.doAs((PrivilegedExceptionAction<FileCacheValue>) () -> AcidUtil.getAcidState(
fileSystem, partition, txnValidIds, catalog.getProperties()))
);
} else {
fileCacheValues.add(AcidUtil.getAcidState(
fileSystem, partition, txnValidIds, catalog.getProperties())
);
}
fileCacheValue.setAcidInfo(new AcidInfo(partition.getPath(), deleteDeltas));
fileCacheValues.add(fileCacheValue);
}
} catch (Exception e) {
throw new CacheException("failed to get input splits for write ids %s in catalog %s", e,
validWriteIds.toString(), catalog.getName());
throw new CacheException("Failed to get input splits %s", e, txnValidIds.toString());
}
return fileCacheValues;
}
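The rewritten getFilesByTransaction above resolves each partition's ACID directory state through AcidUtil.getAcidState and, when AuthenticationConfig.HADOOP_USER_NAME is set on the job configuration, performs the call inside UserGroupInformation.doAs so the listing runs as that remote user. A minimal sketch of that impersonation pattern follows; the helper name and generic action are hypothetical, not part of this commit.

import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.security.UserGroupInformation;

public final class RemoteUserRunner {
    private RemoteUserRunner() {}

    // Run 'action' as 'remoteUser' when one is configured; otherwise run it directly.
    public static <T> T runAs(String remoteUser, PrivilegedExceptionAction<T> action) throws Exception {
        if (remoteUser == null || remoteUser.isEmpty()) {
            return action.run();
        }
        UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser);
        return ugi.doAs(action);
    }
}

With a helper like this, the per-partition branch would collapse to a single call such as runAs(remoteUser, () -> AcidUtil.getAcidState(fileSystem, partition, txnValidIds, catalog.getProperties())); the branching on Strings.isNullOrEmpty(remoteUser) in the diff is the inlined form of the same idea.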
@@ -21,10 +21,9 @@
import org.apache.doris.common.UserException;

import com.google.common.collect.Lists;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;

import java.util.List;
import java.util.Map;

/**
* HiveTransaction is used to save info of a hive transaction.
@@ -41,7 +40,7 @@ public class HiveTransaction {
private long txnId;
private List<String> partitionNames = Lists.newArrayList();

ValidWriteIdList validWriteIdList = null;
Map<String, String> txnValidIds = null;

public HiveTransaction(String queryId, String user, HMSExternalTable hiveTable, boolean isFullAcid) {
this.queryId = queryId;
@@ -62,18 +61,14 @@ public boolean isFullAcid() {
return isFullAcid;
}

public ValidTxnList getValidTxns(HMSCachedClient client) {
return client.getValidTxns();
}

public ValidWriteIdList getValidWriteIds(HMSCachedClient client) {
if (validWriteIdList == null) {
public Map<String, String> getValidWriteIds(HMSCachedClient client) {
if (txnValidIds == null) {
TableName tableName = new TableName(hiveTable.getCatalog().getName(), hiveTable.getDbName(),
hiveTable.getName());
client.acquireSharedLock(queryId, txnId, user, tableName, partitionNames, 5000);
validWriteIdList = client.getValidWriteIds(tableName.getDb() + "." + tableName.getTbl(), txnId);
txnValidIds = client.getValidWriteIds(tableName.getDb() + "." + tableName.getTbl(), txnId);
}
return validWriteIdList;
return txnValidIds;
}

public void begin() throws UserException {
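In HiveTransaction, the cached field is now the same Map<String, String>: the first getValidWriteIds call acquires a shared lock on the table through the client and stores the result, and later calls during the same query return the cached map. A hedged caller-side sketch; HiveTransaction, HMSCachedClient, and UserException are the types shown in the diff above, while the surrounding wiring is assumed.

// Hypothetical helper on the caller side, not code from this commit.
static Map<String, String> loadTxnValidIds(HiveTransaction txn, HMSCachedClient client) throws UserException {
    txn.begin();                                                     // open the hive transaction for this query
    Map<String, String> txnValidIds = txn.getValidWriteIds(client);  // first call locks and fetches, then caches
    // A second call on the same HiveTransaction returns the cached map without re-acquiring the lock.
    return txnValidIds;
}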
@@ -32,8 +32,6 @@
import com.google.common.collect.ImmutableList.Builder;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
@@ -524,12 +522,7 @@ public void commitTxn(long txnId) {
}

@Override
public ValidWriteIdList getValidWriteIds(String fullTableName, long currentTransactionId) {
throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient.");
}

@Override
public ValidTxnList getValidTxns() {
public Map<String, String> getValidWriteIds(String fullTableName, long currentTransactionId) {
throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient.");
}

