From cda702319f4b47582ce38d3b906aaf1e5675d05d Mon Sep 17 00:00:00 2001 From: daidai Date: Tue, 7 Jan 2025 10:11:53 +0800 Subject: [PATCH] [fix](hive)fix hive insert only translaction table.(#45753)(#46385) (#46454) ### What problem does this PR solve? bp #45753 : fix read hive insert only Transaction table. bp #46385 , #45999 : fix #45753 case unstable. --- .../create_preinstalled_scripts/run25.hql | 53 ++++++++++++ .../datasource/hive/HMSExternalTable.java | 16 ++-- .../datasource/hive/HiveMetaStoreCache.java | 49 ++++++----- .../datasource/hive/HiveMetadataOps.java | 25 ++++++ .../doris/datasource/hive/HiveUtil.java | 22 +++++ .../datasource/hive/source/HiveScanNode.java | 11 ++- .../insert/InsertIntoTableCommand.java | 3 + .../hive/test_transactional_hive.out | 46 ++++++++++ .../test_hive_translation_insert_only.out | 20 +++++ .../hive/test_transactional_hive.groovy | 70 +++++++++++++++ .../test_hive_translation_insert_only.groovy | 85 +++++++++++++++++++ 11 files changed, 371 insertions(+), 29 deletions(-) create mode 100644 regression-test/data/external_table_p2/hive/test_hive_translation_insert_only.out create mode 100644 regression-test/suites/external_table_p2/hive/test_hive_translation_insert_only.groovy diff --git a/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run25.hql b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run25.hql index 814df4cdc5ff90..66e73f51df8f4c 100755 --- a/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run25.hql +++ b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run25.hql @@ -41,3 +41,56 @@ insert into orc_full_acid_par PARTITION(part_col=20230102) values (6, 'F'); update orc_full_acid_par set value = 'BB' where id = 2; + + + + +create table orc_to_acid_tb (id INT, value STRING) +PARTITIONED BY (part_col INT) +CLUSTERED BY (id) INTO 3 BUCKETS +STORED AS ORC; +INSERT INTO TABLE orc_to_acid_tb PARTITION (part_col=101) VALUES (1, 'A'), (3, 'C'); +INSERT INTO TABLE orc_to_acid_tb PARTITION (part_col=102) VALUES (2, 'B'); +ALTER TABLE orc_to_acid_tb SET TBLPROPERTIES ('transactional'='true'); + + +create table orc_to_acid_compacted_tb (id INT, value STRING) +PARTITIONED BY (part_col INT) +CLUSTERED BY (id) INTO 3 BUCKETS +STORED AS ORC; +INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col=101) VALUES (1, 'A'), (3, 'C'); +INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col=102) VALUES (2, 'B'); +ALTER TABLE orc_to_acid_compacted_tb SET TBLPROPERTIES ('transactional'='true'); +ALTER TABLE orc_to_acid_compacted_tb partition(part_col='101') COMPACT 'major' and wait; +ALTER TABLE orc_to_acid_compacted_tb partition(part_col='102') COMPACT 'major' and wait; +INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col=102) VALUES (4, 'D'); +update orc_to_acid_compacted_tb set value = "CC" where id = 3; +update orc_to_acid_compacted_tb set value = "BB" where id = 2; + + +create table orc_acid_minor (id INT, value STRING) +CLUSTERED BY (id) INTO 3 BUCKETS +STORED AS ORC +TBLPROPERTIES ('transactional' = 'true'); +insert into orc_acid_minor values (1, 'A'); +insert into orc_acid_minor values (2, 'B'); +insert into orc_acid_minor values (3, 'C'); +update orc_acid_minor set value = "BB" where id = 2; +ALTER TABLE orc_acid_minor COMPACT 'minor' and wait; +insert into orc_acid_minor values (4, 'D'); +update orc_acid_minor set value = "DD" where id = 4; +DELETE FROM orc_acid_minor WHERE id = 3; + + +create table orc_acid_major (id INT, value STRING) +CLUSTERED BY (id) INTO 3 BUCKETS +STORED AS ORC +TBLPROPERTIES ('transactional' = 'true'); +insert into orc_acid_major values (1, 'A'); +insert into orc_acid_major values (2, 'B'); +insert into orc_acid_major values (3, 'C'); +update orc_acid_major set value = "BB" where id = 2; +ALTER TABLE orc_acid_major COMPACT 'minor' and wait; +insert into orc_acid_major values (4, 'D'); +update orc_acid_major set value = "DD" where id = 4; +DELETE FROM orc_acid_major WHERE id = 3; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index 71c7308b079866..b554f508103992 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -30,6 +30,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.SchemaCacheValue; @@ -359,19 +360,24 @@ public Map getNameToPartitionItems() { } public boolean isHiveTransactionalTable() { - return dlaType == DLAType.HIVE && AcidUtils.isTransactionalTable(remoteTable) - && isSupportedTransactionalFileFormat(); + return dlaType == DLAType.HIVE && AcidUtils.isTransactionalTable(remoteTable); } - private boolean isSupportedTransactionalFileFormat() { + private boolean isSupportedFullAcidTransactionalFileFormat() { // Sometimes we meet "transactional" = "true" but format is parquet, which is not supported. // So we need to check the input format for transactional table. String inputFormatName = remoteTable.getSd().getInputFormat(); return inputFormatName != null && SUPPORTED_HIVE_TRANSACTIONAL_FILE_FORMATS.contains(inputFormatName); } - public boolean isFullAcidTable() { - return dlaType == DLAType.HIVE && AcidUtils.isFullAcidTable(remoteTable); + public boolean isFullAcidTable() throws UserException { + if (dlaType == DLAType.HIVE && AcidUtils.isFullAcidTable(remoteTable)) { + if (!isSupportedFullAcidTransactionalFileFormat()) { + throw new UserException("This table is full Acid Table, but no Orc Format."); + } + return true; + } + return false; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index 90af6b3f394117..838005b47a9e3c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -32,12 +32,16 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.UserException; import org.apache.doris.common.security.authentication.AuthenticationConfig; +import org.apache.doris.common.security.authentication.HadoopAuthenticator; import org.apache.doris.common.util.CacheBulkLoader; import org.apache.doris.common.util.LocationPath; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.CacheException; import org.apache.doris.datasource.ExternalMetaCacheMgr; import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo; +import org.apache.doris.datasource.hive.HiveUtil.ACIDFileFilter; +import org.apache.doris.datasource.hive.HiveUtil.FullAcidFileFilter; +import org.apache.doris.datasource.hive.HiveUtil.InsertOnlyACIDFileFilter; import org.apache.doris.datasource.property.PropertyConverter; import org.apache.doris.fs.FileSystemCache; import org.apache.doris.fs.remote.RemoteFile; @@ -55,7 +59,6 @@ import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.base.Strings; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import com.google.common.collect.Iterables; @@ -77,12 +80,10 @@ import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.net.URI; -import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -107,8 +108,6 @@ public class HiveMetaStoreCache { // After hive 3, transactional table's will have file '_orc_acid_version' with value >= '2'. public static final String HIVE_ORC_ACID_VERSION_FILE = "_orc_acid_version"; - private static final String HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX = "bucket_"; - private final HMSExternalCatalog catalog; private JobConf jobConf; private final ExecutorService refreshExecutor; @@ -742,19 +741,16 @@ public LoadingCache getPartitionCache() { public List getFilesByTransaction(List partitions, ValidWriteIdList validWriteIds, boolean isFullAcid, boolean skipCheckingAcidVersionFile, long tableId, String bindBrokerName) { List fileCacheValues = Lists.newArrayList(); - String remoteUser = jobConf.get(AuthenticationConfig.HADOOP_USER_NAME); try { for (HivePartition partition : partitions) { + + AuthenticationConfig authenticationConfig = AuthenticationConfig.getKerberosConfig(jobConf); + HadoopAuthenticator hadoopAuthenticator = + HadoopAuthenticator.getHadoopAuthenticator(authenticationConfig); + FileCacheValue fileCacheValue = new FileCacheValue(); - AcidUtils.Directory directory; - if (!Strings.isNullOrEmpty(remoteUser)) { - UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser); - directory = ugi.doAs((PrivilegedExceptionAction) () -> AcidUtils.getAcidState( - new Path(partition.getPath()), jobConf, validWriteIds, false, true)); - } else { - directory = AcidUtils.getAcidState(new Path(partition.getPath()), jobConf, validWriteIds, false, - true); - } + AcidUtils.Directory directory = hadoopAuthenticator.doAs(() -> AcidUtils.getAcidState( + new Path(partition.getPath()), jobConf, validWriteIds, false, true)); if (directory == null) { return Collections.emptyList(); } @@ -775,7 +771,8 @@ public List getFilesByTransaction(List partitions return Collections.emptyList(); } if (!skipCheckingAcidVersionFile) { - String acidVersionPath = new Path(baseOrDeltaPath, "_orc_acid_version").toUri().toString(); + String acidVersionPath = new Path( + baseOrDeltaPath, HIVE_ORC_ACID_VERSION_FILE).toUri().toString(); RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( new FileSystemCache.FileSystemCacheKey( LocationPath.getFSIdentity(baseOrDeltaPath.toUri().toString(), @@ -798,6 +795,8 @@ public List getFilesByTransaction(List partitions } } + ACIDFileFilter fileFilter = isFullAcid ? new FullAcidFileFilter() : new InsertOnlyACIDFileFilter(); + // delta directories List deleteDeltas = new ArrayList<>(); for (AcidUtils.ParsedDelta delta : directory.getCurrentDirectories()) { @@ -810,14 +809,14 @@ public List getFilesByTransaction(List partitions Status status = fs.listFiles(location, false, remoteFiles); if (status.ok()) { if (delta.isDeleteDelta()) { - List deleteDeltaFileNames = remoteFiles.stream().map(f -> f.getName()).filter( - name -> name.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)) + List deleteDeltaFileNames = remoteFiles.stream() + .map(f -> f.getName()).filter(fileFilter::accept) .collect(Collectors.toList()); deleteDeltas.add(new DeleteDeltaInfo(location, deleteDeltaFileNames)); continue; } - remoteFiles.stream().filter( - f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)).forEach(file -> { + remoteFiles.stream().filter(f -> fileFilter.accept(f.getName())) + .forEach(file -> { LocationPath path = new LocationPath(file.getPath().toString(), catalog.getProperties()); fileCacheValue.addFile(file, path); @@ -837,8 +836,7 @@ public List getFilesByTransaction(List partitions List remoteFiles = new ArrayList<>(); Status status = fs.listFiles(location, false, remoteFiles); if (status.ok()) { - remoteFiles.stream().filter( - f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)) + remoteFiles.stream().filter(f -> fileFilter.accept(f.getName())) .forEach(file -> { LocationPath path = new LocationPath(file.getPath().toString(), catalog.getProperties()); @@ -848,7 +846,12 @@ public List getFilesByTransaction(List partitions throw new RuntimeException(status.getErrMsg()); } } - fileCacheValue.setAcidInfo(new AcidInfo(partition.getPath(), deleteDeltas)); + + if (isFullAcid) { + fileCacheValue.setAcidInfo(new AcidInfo(partition.getPath(), deleteDeltas)); + } else if (!deleteDeltas.isEmpty()) { + throw new RuntimeException("No Hive Full Acid Table have delete_delta_* Dir."); + } fileCacheValues.add(fileCacheValue); } } catch (Exception e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java index a660cb148ac069..c611462f10a90b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java @@ -46,6 +46,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -179,6 +180,25 @@ public boolean createTable(CreateTableStmt stmt) throws UserException { props.put("owner", ConnectContext.get().getUserIdentity().getUser()); } } + + if (props.containsKey("transactional") && props.get("transactional").equals("true")) { + throw new UserException("Not support create hive transactional table."); + /* + CREATE TABLE trans6( + `col1` int, + `col2` int + ) ENGINE=hive + PROPERTIES ( + 'file_format'='orc', + 'compression'='zlib', + 'bucketing_version'='2', + 'transactional'='true', + 'transactional_properties'='default' + ); + In hive, this table only can insert not update(not report error,but not actually updated). + */ + } + String fileFormat = props.getOrDefault(FILE_FORMAT_KEY, Config.hive_default_file_format); Map ddlProps = new HashMap<>(); for (Map.Entry entry : props.entrySet()) { @@ -273,6 +293,11 @@ public void dropTable(DropTableStmt stmt) throws DdlException { ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_TABLE, tblName, dbName); } } + + if (AcidUtils.isTransactionalTable(client.getTable(dbName, tblName))) { + throw new DdlException("Not support drop hive transactional table."); + } + try { client.dropTable(dbName, tblName); db.setUnInitialized(true); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java index ac7dcadbc265e7..c6acb1dea8b337 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java @@ -70,6 +70,9 @@ public final class HiveUtil { public static final Set SUPPORTED_TEXT_COMPRESSIONS = ImmutableSet.of("plain", "gzip", "zstd", "bzip2", "lz4", "snappy"); + public static final String HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX = "bucket_"; + public static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length"; + private HiveUtil() { } @@ -386,4 +389,23 @@ public static StorageDescriptor makeStorageDescriptorFromHivePartition(HiveParti return sd; } + + public interface ACIDFileFilter { + public boolean accept(String fileName); + } + + public static final class FullAcidFileFilter implements ACIDFileFilter { + @Override + public boolean accept(String fileName) { + return fileName.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX) + && !fileName.endsWith(DELTA_SIDE_FILE_SUFFIX); + } + } + + public static final class InsertOnlyACIDFileFilter implements ACIDFileFilter { + @Override + public boolean accept(String fileName) { + return true; + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java index 0c53f971ea4179..1e09fa6d909309 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java @@ -267,7 +267,16 @@ private void getFileSplitByPartitions(HiveMetaStoreCache cache, List allFiles, String bindBrokerName, int numBackends) throws IOException, UserException { List fileCaches; if (hiveTransaction != null) { - fileCaches = getFileSplitByTransaction(cache, partitions, bindBrokerName); + try { + fileCaches = getFileSplitByTransaction(cache, partitions, bindBrokerName); + } catch (Exception e) { + // Release shared load (getValidWriteIds acquire Lock). + // If no exception is throw, the lock will be released when `finalizeQuery()`. + // TODO: merge HMSTransaction,HiveTransaction, HiveTransactionMgr,HiveTransactionManager + // and redesign the logic of this code. + Env.getCurrentHiveTransactionMgr().deregister(hiveTransaction.getQueryId()); + throw e; + } } else { boolean withCache = Config.max_external_file_cache_num > 0; fileCaches = cache.getFilesByPartitions(partitions, withCache, partitions.size() > 1, bindBrokerName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 023b205ac53bab..9048f6c3a0305c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -263,6 +263,9 @@ private BuildInsertExecutorResult initPlanOnce(ConnectContext ctx, } else if (physicalSink instanceof PhysicalHiveTableSink) { boolean emptyInsert = childIsEmptyRelation(physicalSink); HMSExternalTable hiveExternalTable = (HMSExternalTable) targetTableIf; + if (hiveExternalTable.isHiveTransactionalTable()) { + throw new AnalysisException("Not supported insert into hive transactional table."); + } insertExecutor = new HiveInsertExecutor(ctx, hiveExternalTable, label, planner, Optional.of(insertCtx.orElse((new HiveInsertCommandContext()))), emptyInsert); // set hive query options diff --git a/regression-test/data/external_table_p0/hive/test_transactional_hive.out b/regression-test/data/external_table_p0/hive/test_transactional_hive.out index 2ad0446ccb7e25..060fa8c048e5a0 100644 --- a/regression-test/data/external_table_p0/hive/test_transactional_hive.out +++ b/regression-test/data/external_table_p0/hive/test_transactional_hive.out @@ -76,3 +76,49 @@ F -- !q05 -- 0 + +-- !2 -- +1 A 101 +2 BB 102 +3 CC 101 +4 D 102 + +-- !3 -- +1 A 101 +3 CC 101 + +-- !4 -- +2 BB 102 +4 D 102 + +-- !5 -- +1 A 101 +2 BB 102 + +-- !6 -- +4 D 102 + +-- !7 -- +1 A +2 BB +4 DD + +-- !10 -- +1 A +2 BB + +-- !11 -- +4 DD + +-- !12 -- +1 A +2 BB +4 DD + +-- !15 -- +1 A +2 BB + +-- !16 -- +4 DD + diff --git a/regression-test/data/external_table_p2/hive/test_hive_translation_insert_only.out b/regression-test/data/external_table_p2/hive/test_hive_translation_insert_only.out new file mode 100644 index 00000000000000..04fccc9d4c0a2d --- /dev/null +++ b/regression-test/data/external_table_p2/hive/test_hive_translation_insert_only.out @@ -0,0 +1,20 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !1 -- +1 A +2 B +3 C +4 D + +-- !2 -- +1 A +2 B +3 C +4 D +5 E + +-- !3 -- +1 A +2 B +3 C +4 D +5 E diff --git a/regression-test/suites/external_table_p0/hive/test_transactional_hive.groovy b/regression-test/suites/external_table_p0/hive/test_transactional_hive.groovy index dbe20395ec95ec..81f2358e9da8ff 100644 --- a/regression-test/suites/external_table_p0/hive/test_transactional_hive.groovy +++ b/regression-test/suites/external_table_p0/hive/test_transactional_hive.groovy @@ -52,6 +52,71 @@ suite("test_transactional_hive", "p0,external,hive,external_docker,external_dock select count(*) from orc_full_acid_par_empty; """ } + + + def test_acid = { + try { + sql """SET enable_fallback_to_original_planner=false;""" + sql """ select * from orc_to_acid_tb """ + }catch( Exception e) { + logger.info("e.getMessage()" + e.getMessage()) + assertTrue(e.getMessage().contains("Original non-ACID files in transactional tables are not supported")); + } + + qt_2 """ select * from orc_to_acid_compacted_tb order by id """ + qt_3 """ select * from orc_to_acid_compacted_tb where part_col=101 order by id """ + qt_4 """ select * from orc_to_acid_compacted_tb where part_col=102 order by id """ + qt_5 """ select * from orc_to_acid_compacted_tb where id < 3 order by id """ + qt_6 """ select * from orc_to_acid_compacted_tb where id > 3 order by id """ + + + qt_7 """ select * from orc_acid_minor order by id """ + qt_10 """ select * from orc_acid_minor where id < 3 order by id """ + qt_11 """ select * from orc_acid_minor where id > 3 order by id """ + + + qt_12 """ select * from orc_acid_major order by id """ + qt_15 """ select * from orc_acid_major where id < 3 order by id """ + qt_16 """ select * from orc_acid_major where id > 3 order by id """ + } + + def test_acid_write = { + sql """set enable_fallback_to_original_planner=false;""" + + + + try { + sql """ + CREATE TABLE acid_tb ( + `col1` BOOLEAN COMMENT 'col1', + `col2` INT COMMENT 'col2' + ) ENGINE=hive + PROPERTIES ( + 'file_format'='orc', + 'compression'='zlib', + 'bucketing_version'='2', + 'transactional'='true', + 'transactional_properties'='default' + ); + """ + }catch( Exception e) { + assertTrue(e.getMessage().contains("Not support create hive transactional table.")); + } + try { + sql """ insert into orc_acid_major(id,value) values(1,"a1"); """ + }catch (Exception e) { + assertTrue(e.getMessage().contains("Not supported insert into hive transactional table.")); + } + + try { + sql """ drop table orc_acid_major; """ + } catch (Exception e) { + assertTrue(e.getMessage().contains("Not support drop hive transactional table.")); + + } + } + + String enabled = context.config.otherConfigs.get("enableHiveTest") if (enabled == null || !enabled.equalsIgnoreCase("true")) { logger.info("diable Hive test.") @@ -60,6 +125,7 @@ suite("test_transactional_hive", "p0,external,hive,external_docker,external_dock for (String hivePrefix : ["hive3"]) { try { + String hdfs_port = context.config.otherConfigs.get(hivePrefix + "HdfsPort") String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort") String catalog_name = "test_transactional_${hivePrefix}" String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") @@ -68,6 +134,7 @@ suite("test_transactional_hive", "p0,external,hive,external_docker,external_dock sql """create catalog if not exists ${catalog_name} properties ( "type"="hms", 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}' + ,'fs.defaultFS' = 'hdfs://${externalEnvIp}:${hdfs_port}' );""" sql """use `${catalog_name}`.`default`""" @@ -79,6 +146,9 @@ suite("test_transactional_hive", "p0,external,hive,external_docker,external_dock q01() q01_par() + test_acid() + test_acid_write() + sql """drop catalog if exists ${catalog_name}""" } finally { } diff --git a/regression-test/suites/external_table_p2/hive/test_hive_translation_insert_only.groovy b/regression-test/suites/external_table_p2/hive/test_hive_translation_insert_only.groovy new file mode 100644 index 00000000000000..9b021e1dc81372 --- /dev/null +++ b/regression-test/suites/external_table_p2/hive/test_hive_translation_insert_only.groovy @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_hive_translation_insert_only", "p2,external,hive,external_remote,external_remote_hive") { + + String enabled = context.config.otherConfigs.get("enableExternalHudiTest") + //hudi hive use same catalog in p2. + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable test") + return; + } + + String props = context.config.otherConfigs.get("hudiEmrCatalog") + String hms_catalog_name = "test_hive_translation_insert_only" + + sql """drop catalog if exists ${hms_catalog_name};""" + sql """ + CREATE CATALOG IF NOT EXISTS ${hms_catalog_name} + PROPERTIES ( + ${props} + ,'hive.version' = '3.1.3' + ); + """ + + logger.info("catalog " + hms_catalog_name + " created") + sql """switch ${hms_catalog_name};""" + logger.info("switched to catalog " + hms_catalog_name) + sql """ use regression;""" + + qt_1 """ select * from text_insert_only order by id """ + qt_2 """ select * from parquet_insert_only_major order by id """ + qt_3 """ select * from orc_insert_only_minor order by id """ + + sql """drop catalog ${hms_catalog_name};""" +} + + +/* +SET hive.support.concurrency=true; +SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; +create table text_insert_only (id INT, value STRING) +ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' +TBLPROPERTIES ('transactional' = 'true', +'transactional_properties'='insert_only'); +insert into text_insert_only values (1, 'A'); +insert into text_insert_only values (2, 'B'); +insert into text_insert_only values (3, 'C'); +Load data xxx (4,D) +create table parquet_insert_only_major (id INT, value STRING) +CLUSTERED BY (id) INTO 3 BUCKETS +STORED AS parquet +TBLPROPERTIES ('transactional' = 'true', +'transactional_properties'='insert_only'); +insert into parquet_insert_only_major values (1, 'A'); +insert into parquet_insert_only_major values (2, 'B'); +insert into parquet_insert_only_major values (3, 'C'); +ALTER TABLE parquet_insert_only_major COMPACT 'major'; +insert into parquet_insert_only_major values (4, 'D'); +insert into parquet_insert_only_major values (5, 'E'); +create table orc_insert_only_minor (id INT, value STRING) +CLUSTERED BY (id) INTO 3 BUCKETS +stored as orc +TBLPROPERTIES ('transactional' = 'true', +'transactional_properties'='insert_only'); +insert into orc_insert_only_minor values (1, 'A'); +insert into orc_insert_only_minor values (2, 'B'); +insert into orc_insert_only_minor values (3, 'C'); +ALTER TABLE orc_insert_only_minor COMPACT 'minor'; +insert into orc_insert_only_minor values (4, 'D'); +insert into orc_insert_only_minor values (5, 'E'); +*/ \ No newline at end of file