Skip to content

Commit

Permalink
[fix](hive)fix hive insert only transaction table.(#45753)(#46385) (#…
Browse files Browse the repository at this point in the history
…46454)

### What problem does this PR solve?
Backport of #45753: fix reading Hive insert-only transactional tables.
Backport of #46385 and #45999: fix the unstable test case introduced by #45753.
  • Loading branch information
hubgeter authored Jan 7, 2025
1 parent d5bfe01 commit cda7023
Show file tree
Hide file tree
Showing 11 changed files with 371 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,56 @@ insert into orc_full_acid_par PARTITION(part_col=20230102) values
(6, 'F');

update orc_full_acid_par set value = 'BB' where id = 2;




-- orc_to_acid_tb: partitioned, bucketed ORC table that is loaded while it is
-- still a plain table and is switched to transactional only afterwards.
-- No compaction is requested for this table.
CREATE TABLE orc_to_acid_tb (
    id INT,
    value STRING
)
PARTITIONED BY (part_col INT)
CLUSTERED BY (id) INTO 3 BUCKETS
STORED AS ORC;

-- Rows written before the table becomes transactional.
INSERT INTO TABLE orc_to_acid_tb PARTITION (part_col = 101)
VALUES (1, 'A'), (3, 'C');
INSERT INTO TABLE orc_to_acid_tb PARTITION (part_col = 102)
VALUES (2, 'B');

-- Convert the already populated table in place.
ALTER TABLE orc_to_acid_tb SET TBLPROPERTIES ('transactional' = 'true');


-- orc_to_acid_compacted_tb: same starting point as a plain ORC table that is
-- converted to transactional, but here both partitions are major-compacted
-- after the conversion and then modified again by an insert and two updates.
CREATE TABLE orc_to_acid_compacted_tb (
    id INT,
    value STRING
)
PARTITIONED BY (part_col INT)
CLUSTERED BY (id) INTO 3 BUCKETS
STORED AS ORC;

-- Rows written before the table becomes transactional.
INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col = 101)
VALUES (1, 'A'), (3, 'C');
INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col = 102)
VALUES (2, 'B');

-- Convert in place, then major-compact each partition and block until done.
ALTER TABLE orc_to_acid_compacted_tb SET TBLPROPERTIES ('transactional' = 'true');
ALTER TABLE orc_to_acid_compacted_tb PARTITION (part_col = '101') COMPACT 'major' AND WAIT;
ALTER TABLE orc_to_acid_compacted_tb PARTITION (part_col = '102') COMPACT 'major' AND WAIT;

-- Changes made after compaction: one new row and two updated rows.
INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col = 102)
VALUES (4, 'D');
UPDATE orc_to_acid_compacted_tb SET value = 'CC' WHERE id = 3;
UPDATE orc_to_acid_compacted_tb SET value = 'BB' WHERE id = 2;


-- orc_acid_minor: unpartitioned, bucketed ORC table created as transactional
-- from the start. It receives inserts and an update, is minor-compacted, and
-- is then modified again by an insert, an update and a delete.
CREATE TABLE orc_acid_minor (
    id INT,
    value STRING
)
CLUSTERED BY (id) INTO 3 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');

-- Writes that happen before the compaction. Each row is its own statement.
INSERT INTO orc_acid_minor VALUES (1, 'A');
INSERT INTO orc_acid_minor VALUES (2, 'B');
INSERT INTO orc_acid_minor VALUES (3, 'C');
UPDATE orc_acid_minor SET value = 'BB' WHERE id = 2;

-- Minor compaction, blocking until it has finished.
ALTER TABLE orc_acid_minor COMPACT 'minor' AND WAIT;

-- Writes that happen after the compaction.
INSERT INTO orc_acid_minor VALUES (4, 'D');
UPDATE orc_acid_minor SET value = 'DD' WHERE id = 4;
DELETE FROM orc_acid_minor WHERE id = 3;


-- orc_acid_major: unpartitioned, bucketed ORC table created as transactional
-- from the start. It mirrors the orc_acid_minor fixture statement for
-- statement, except that the compaction in the middle is a MAJOR one.
create table orc_acid_major (id INT, value STRING)
CLUSTERED BY (id) INTO 3 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');

-- Writes that happen before the compaction. Each row is its own statement.
insert into orc_acid_major values (1, 'A');
insert into orc_acid_major values (2, 'B');
insert into orc_acid_major values (3, 'C');
update orc_acid_major set value = "BB" where id = 2;

-- Major compaction, blocking until it has finished. This previously asked
-- for a minor compaction, which made the table an exact duplicate of
-- orc_acid_minor and left the major path untested. The rows visible to
-- readers are the same for either compaction type.
ALTER TABLE orc_acid_major COMPACT 'major' and wait;

-- Writes that happen after the compaction.
insert into orc_acid_major values (4, 'D');
update orc_acid_major set value = "DD" where id = 4;
DELETE FROM orc_acid_major WHERE id = 3;
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
Expand Down Expand Up @@ -359,19 +360,24 @@ public Map<String, PartitionItem> getNameToPartitionItems() {
}

public boolean isHiveTransactionalTable() {
return dlaType == DLAType.HIVE && AcidUtils.isTransactionalTable(remoteTable)
&& isSupportedTransactionalFileFormat();
return dlaType == DLAType.HIVE && AcidUtils.isTransactionalTable(remoteTable);
}

private boolean isSupportedTransactionalFileFormat() {
/**
 * Checks whether the input format recorded in the remote table's storage descriptor is one
 * of the formats listed in {@code SUPPORTED_HIVE_TRANSACTIONAL_FILE_FORMATS}.
 *
 * @return {@code true} if an input format is set and is in the supported set,
 *         {@code false} if it is missing or not in the set
 */
private boolean isSupportedFullAcidTransactionalFileFormat() {
    // A table can carry "transactional" = "true" while being stored as parquet, which is
    // not supported, so the input format has to be verified explicitly.
    final String format = remoteTable.getSd().getInputFormat();
    if (format == null) {
        return false;
    }
    return SUPPORTED_HIVE_TRANSACTIONAL_FILE_FORMATS.contains(format);
}

public boolean isFullAcidTable() {
return dlaType == DLAType.HIVE && AcidUtils.isFullAcidTable(remoteTable);
/**
 * Tells whether this table is a Hive full ACID table.
 *
 * @return {@code true} if the table is a Hive table that {@code AcidUtils} reports as full
 *         ACID and its file format passes the supported-format check, {@code false} if it
 *         is not a Hive full ACID table at all
 * @throws UserException if the table is full ACID but its input format is not a supported
 *         one (judging by the message, presumably only ORC is supported; confirm against
 *         {@code SUPPORTED_HIVE_TRANSACTIONAL_FILE_FORMATS})
 */
public boolean isFullAcidTable() throws UserException {
    final boolean fullAcid = dlaType == DLAType.HIVE && AcidUtils.isFullAcidTable(remoteTable);
    if (!fullAcid) {
        return false;
    }
    if (isSupportedFullAcidTransactionalFileFormat()) {
        return true;
    }
    // Full ACID, but stored in a format that cannot be read as such.
    throw new UserException("This table is full Acid Table, but no Orc Format.");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,16 @@
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.security.authentication.HadoopAuthenticator;
import org.apache.doris.common.util.CacheBulkLoader;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CacheException;
import org.apache.doris.datasource.ExternalMetaCacheMgr;
import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
import org.apache.doris.datasource.hive.HiveUtil.ACIDFileFilter;
import org.apache.doris.datasource.hive.HiveUtil.FullAcidFileFilter;
import org.apache.doris.datasource.hive.HiveUtil.InsertOnlyACIDFileFilter;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.fs.FileSystemCache;
import org.apache.doris.fs.remote.RemoteFile;
Expand All @@ -55,7 +59,6 @@
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Iterables;
Expand All @@ -77,12 +80,10 @@
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -107,8 +108,6 @@ public class HiveMetaStoreCache {
// Since Hive 3, transactional tables have a file '_orc_acid_version' whose value is >= '2'.
public static final String HIVE_ORC_ACID_VERSION_FILE = "_orc_acid_version";

private static final String HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX = "bucket_";

private final HMSExternalCatalog catalog;
private JobConf jobConf;
private final ExecutorService refreshExecutor;
Expand Down Expand Up @@ -742,19 +741,16 @@ public LoadingCache<PartitionCacheKey, HivePartition> getPartitionCache() {
public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions, ValidWriteIdList validWriteIds,
boolean isFullAcid, boolean skipCheckingAcidVersionFile, long tableId, String bindBrokerName) {
List<FileCacheValue> fileCacheValues = Lists.newArrayList();
String remoteUser = jobConf.get(AuthenticationConfig.HADOOP_USER_NAME);
try {
for (HivePartition partition : partitions) {

AuthenticationConfig authenticationConfig = AuthenticationConfig.getKerberosConfig(jobConf);
HadoopAuthenticator hadoopAuthenticator =
HadoopAuthenticator.getHadoopAuthenticator(authenticationConfig);

FileCacheValue fileCacheValue = new FileCacheValue();
AcidUtils.Directory directory;
if (!Strings.isNullOrEmpty(remoteUser)) {
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser);
directory = ugi.doAs((PrivilegedExceptionAction<AcidUtils.Directory>) () -> AcidUtils.getAcidState(
new Path(partition.getPath()), jobConf, validWriteIds, false, true));
} else {
directory = AcidUtils.getAcidState(new Path(partition.getPath()), jobConf, validWriteIds, false,
true);
}
AcidUtils.Directory directory = hadoopAuthenticator.doAs(() -> AcidUtils.getAcidState(
new Path(partition.getPath()), jobConf, validWriteIds, false, true));
if (directory == null) {
return Collections.emptyList();
}
Expand All @@ -775,7 +771,8 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
return Collections.emptyList();
}
if (!skipCheckingAcidVersionFile) {
String acidVersionPath = new Path(baseOrDeltaPath, "_orc_acid_version").toUri().toString();
String acidVersionPath = new Path(
baseOrDeltaPath, HIVE_ORC_ACID_VERSION_FILE).toUri().toString();
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(
LocationPath.getFSIdentity(baseOrDeltaPath.toUri().toString(),
Expand All @@ -798,6 +795,8 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
}
}

ACIDFileFilter fileFilter = isFullAcid ? new FullAcidFileFilter() : new InsertOnlyACIDFileFilter();

// delta directories
List<DeleteDeltaInfo> deleteDeltas = new ArrayList<>();
for (AcidUtils.ParsedDelta delta : directory.getCurrentDirectories()) {
Expand All @@ -810,14 +809,14 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
Status status = fs.listFiles(location, false, remoteFiles);
if (status.ok()) {
if (delta.isDeleteDelta()) {
List<String> deleteDeltaFileNames = remoteFiles.stream().map(f -> f.getName()).filter(
name -> name.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
List<String> deleteDeltaFileNames = remoteFiles.stream()
.map(f -> f.getName()).filter(fileFilter::accept)
.collect(Collectors.toList());
deleteDeltas.add(new DeleteDeltaInfo(location, deleteDeltaFileNames));
continue;
}
remoteFiles.stream().filter(
f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)).forEach(file -> {
remoteFiles.stream().filter(f -> fileFilter.accept(f.getName()))
.forEach(file -> {
LocationPath path = new LocationPath(file.getPath().toString(),
catalog.getProperties());
fileCacheValue.addFile(file, path);
Expand All @@ -837,8 +836,7 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
List<RemoteFile> remoteFiles = new ArrayList<>();
Status status = fs.listFiles(location, false, remoteFiles);
if (status.ok()) {
remoteFiles.stream().filter(
f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
remoteFiles.stream().filter(f -> fileFilter.accept(f.getName()))
.forEach(file -> {
LocationPath path = new LocationPath(file.getPath().toString(),
catalog.getProperties());
Expand All @@ -848,7 +846,12 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
throw new RuntimeException(status.getErrMsg());
}
}
fileCacheValue.setAcidInfo(new AcidInfo(partition.getPath(), deleteDeltas));

if (isFullAcid) {
fileCacheValue.setAcidInfo(new AcidInfo(partition.getPath(), deleteDeltas));
} else if (!deleteDeltas.isEmpty()) {
throw new RuntimeException("No Hive Full Acid Table have delete_delta_* Dir.");
}
fileCacheValues.add(fileCacheValue);
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -179,6 +180,25 @@ public boolean createTable(CreateTableStmt stmt) throws UserException {
props.put("owner", ConnectContext.get().getUserIdentity().getUser());
}
}

if (props.containsKey("transactional") && props.get("transactional").equals("true")) {
throw new UserException("Not support create hive transactional table.");
/*
CREATE TABLE trans6(
`col1` int,
`col2` int
) ENGINE=hive
PROPERTIES (
'file_format'='orc',
'compression'='zlib',
'bucketing_version'='2',
'transactional'='true',
'transactional_properties'='default'
);
In Hive, such a table only supports inserts: an update reports no error, but no rows are actually updated.
*/
}

String fileFormat = props.getOrDefault(FILE_FORMAT_KEY, Config.hive_default_file_format);
Map<String, String> ddlProps = new HashMap<>();
for (Map.Entry<String, String> entry : props.entrySet()) {
Expand Down Expand Up @@ -273,6 +293,11 @@ public void dropTable(DropTableStmt stmt) throws DdlException {
ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_TABLE, tblName, dbName);
}
}

if (AcidUtils.isTransactionalTable(client.getTable(dbName, tblName))) {
throw new DdlException("Not support drop hive transactional table.");
}

try {
client.dropTable(dbName, tblName);
db.setUnInitialized(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ public final class HiveUtil {
public static final Set<String> SUPPORTED_TEXT_COMPRESSIONS =
ImmutableSet.of("plain", "gzip", "zstd", "bzip2", "lz4", "snappy");

public static final String HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX = "bucket_";
public static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length";

private HiveUtil() {
}

Expand Down Expand Up @@ -386,4 +389,23 @@ public static StorageDescriptor makeStorageDescriptorFromHivePartition(HiveParti

return sd;
}

/**
 * Name-based filter used to decide which files of a Hive transactional table are kept.
 */
public interface ACIDFileFilter {
    /**
     * @param fileName name of the candidate file
     * @return {@code true} to keep the file, {@code false} to drop it
     */
    boolean accept(String fileName);
}

/**
 * Filter for full ACID tables: keeps files whose name starts with
 * {@code HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX} and does not end with
 * {@code DELTA_SIDE_FILE_SUFFIX}.
 */
public static final class FullAcidFileFilter implements ACIDFileFilter {
    @Override
    public boolean accept(String fileName) {
        // Anything that is not a bucket file is dropped right away.
        if (!fileName.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)) {
            return false;
        }
        // Bucket-prefixed side files are dropped as well.
        final boolean isSideFile = fileName.endsWith(DELTA_SIDE_FILE_SUFFIX);
        return !isSideFile;
    }
}

/**
 * Filter for insert-only transactional tables: every file name is accepted.
 * NOTE(review): presumably data files of insert-only tables are not required to follow the
 * bucket file naming used by full ACID tables, hence no name check here; confirm.
 */
public static final class InsertOnlyACIDFileFilter implements ACIDFileFilter {
    @Override
    public boolean accept(String fileName) {
        // No name-based filtering for insert-only tables.
        return true;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,16 @@ private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartiti
List<Split> allFiles, String bindBrokerName, int numBackends) throws IOException, UserException {
List<FileCacheValue> fileCaches;
if (hiveTransaction != null) {
fileCaches = getFileSplitByTransaction(cache, partitions, bindBrokerName);
try {
fileCaches = getFileSplitByTransaction(cache, partitions, bindBrokerName);
} catch (Exception e) {
// Release the shared lock (acquired by getValidWriteIds).
// If no exception is thrown, the lock will be released in `finalizeQuery()`.
// TODO: merge HMSTransaction, HiveTransaction, HiveTransactionMgr and HiveTransactionManager,
// and redesign the logic of this code.
Env.getCurrentHiveTransactionMgr().deregister(hiveTransaction.getQueryId());
throw e;
}
} else {
boolean withCache = Config.max_external_file_cache_num > 0;
fileCaches = cache.getFilesByPartitions(partitions, withCache, partitions.size() > 1, bindBrokerName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,9 @@ private BuildInsertExecutorResult initPlanOnce(ConnectContext ctx,
} else if (physicalSink instanceof PhysicalHiveTableSink) {
boolean emptyInsert = childIsEmptyRelation(physicalSink);
HMSExternalTable hiveExternalTable = (HMSExternalTable) targetTableIf;
if (hiveExternalTable.isHiveTransactionalTable()) {
throw new AnalysisException("Not supported insert into hive transactional table.");
}
insertExecutor = new HiveInsertExecutor(ctx, hiveExternalTable, label, planner,
Optional.of(insertCtx.orElse((new HiveInsertCommandContext()))), emptyInsert);
// set hive query options
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,49 @@ F

-- !q05 --
0

-- !2 --
1 A 101
2 BB 102
3 CC 101
4 D 102

-- !3 --
1 A 101
3 CC 101

-- !4 --
2 BB 102
4 D 102

-- !5 --
1 A 101
2 BB 102

-- !6 --
4 D 102

-- !7 --
1 A
2 BB
4 DD

-- !10 --
1 A
2 BB

-- !11 --
4 DD

-- !12 --
1 A
2 BB
4 DD

-- !15 --
1 A
2 BB

-- !16 --
4 DD

Loading

0 comments on commit cda7023

Please sign in to comment.