From 2a523cff718c6ca78c86c99106cae2fc31cdbe7e Mon Sep 17 00:00:00 2001 From: daidai Date: Mon, 16 Dec 2024 15:22:37 +0800 Subject: [PATCH] add ut && regression test --- .../table/transactional_hive_reader.cpp | 2 +- .../create_preinstalled_scripts/run25.hql | 52 ++ .../doris/datasource/hive/AcidUtil.java | 13 +- .../datasource/hive/HiveMetaStoreCache.java | 11 +- .../datasource/hive/HiveMetadataOps.java | 2 +- .../insert/InsertIntoTableCommand.java | 2 +- .../doris/datasource/hive/HiveAcidTest.java | 595 ++++++++++++++++++ .../hive/test_transactional_hive.out | 46 ++ .../hive/test_transactional_hive.groovy | 67 ++ 9 files changed, 775 insertions(+), 15 deletions(-) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.cpp b/be/src/vec/exec/format/table/transactional_hive_reader.cpp index 96ef015993ac5cd..c230df8ee75ff1e 100644 --- a/be/src/vec/exec/format/table/transactional_hive_reader.cpp +++ b/be/src/vec/exec/format/table/transactional_hive_reader.cpp @@ -137,7 +137,7 @@ Status TransactionalHiveReader::init_row_filters(const TFileRangeDesc& range, } auto iter = std::find(delete_delta_file_names.begin(), delete_delta_file_names.end(), remove_bucket_attemptId(file_name)); - if (iter == delete_delta.file_names.end()) { + if (iter == delete_delta_file_names.end()) { continue; } auto delete_file = fmt::format("{}/{}", delete_delta.directory_location, diff --git a/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run25.hql b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run25.hql index 814df4cdc5ff90b..da6400bdff02208 100755 --- a/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run25.hql +++ b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run25.hql @@ -41,3 +41,55 @@ insert into orc_full_acid_par PARTITION(part_col=20230102) values (6, 'F'); update orc_full_acid_par set value = 'BB' where id = 2; + + + + +create table orc_to_acid_tb (id INT, value STRING) +PARTITIONED BY (part_col INT) +CLUSTERED BY (id) INTO 3 BUCKETS +STORED AS ORC; +INSERT INTO TABLE orc_to_acid_tb PARTITION (part_col=101) VALUES (1, 'A'), (3, 'C'); +INSERT INTO TABLE orc_to_acid_tb PARTITION (part_col=102) VALUES (2, 'B'); +ALTER TABLE orc_to_acid_tb SET TBLPROPERTIES ('transactional'='true'); + + +create table orc_to_acid_compacted_tb (id INT, value STRING) +PARTITIONED BY (part_col INT) +CLUSTERED BY (id) INTO 3 BUCKETS +STORED AS ORC; +INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col=101) VALUES (1, 'A'), (3, 'C'); +INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col=102) VALUES (2, 'B'); +ALTER TABLE orc_to_acid_compacted_tb SET TBLPROPERTIES ('transactional'='true'); +ALTER TABLE orc_to_acid_compacted_tb COMPACT 'major'; +INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col=102) VALUES (4, 'D'); +update orc_to_acid_compacted_tb set value = "CC" where id = 3; +update orc_to_acid_compacted_tb set value = "BB" where id = 2; + + +create table orc_acid_minor (id INT, value STRING) +CLUSTERED BY (id) INTO 3 BUCKETS +STORED AS ORC +TBLPROPERTIES ('transactional' = 'true'); +insert into orc_acid_minor values (1, 'A'); +insert into orc_acid_minor values (2, 'B'); +insert into orc_acid_minor values (3, 'C'); +update orc_acid_minor set value = "BB" where id = 2; +ALTER TABLE orc_acid_minor COMPACT 'minor'; +insert into orc_acid_minor values (4, 'D'); +update orc_acid_minor 
set value = "DD" where id = 4; +DELETE FROM orc_acid_minor WHERE id = 3; + + +create table orc_acid_major (id INT, value STRING) +CLUSTERED BY (id) INTO 3 BUCKETS +STORED AS ORC +TBLPROPERTIES ('transactional' = 'true'); +insert into orc_acid_major values (1, 'A'); +insert into orc_acid_major values (2, 'B'); +insert into orc_acid_major values (3, 'C'); +update orc_acid_major set value = "BB" where id = 2; +ALTER TABLE orc_acid_major COMPACT 'major'; +insert into orc_acid_major values (4, 'D'); +update orc_acid_major set value = "DD" where id = 4; +DELETE FROM orc_acid_major WHERE id = 3; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java index 92493a14039940d..3e6509a7903b106 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java @@ -21,8 +21,8 @@ import org.apache.doris.common.util.LocationPath; import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo; import org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheValue; +import org.apache.doris.fs.FileSystem; import org.apache.doris.fs.remote.RemoteFile; -import org.apache.doris.fs.remote.RemoteFileSystem; import lombok.EqualsAndHashCode; import lombok.Getter; @@ -64,7 +64,6 @@ public class AcidUtil { @Getter @ToString - @EqualsAndHashCode private static class ParsedBase { private final long writeId; private final long visibilityId; @@ -131,7 +130,7 @@ public int compareTo(ParsedDelta other) { } - private static boolean isValidMetaDataFile(RemoteFileSystem fileSystem, String baseDir) + private static boolean isValidMetaDataFile(FileSystem fileSystem, String baseDir) throws IOException { String fileLocation = baseDir + "_metadata_acid"; Status status = fileSystem.exists(fileLocation); @@ -161,7 +160,7 @@ private static boolean isValidMetaDataFile(RemoteFileSystem fileSystem, String b return true; } - private static boolean isValidBase(RemoteFileSystem remoteFileSystem, String baseDir, + private static boolean isValidBase(FileSystem fileSystem, String baseDir, ParsedBase base, ValidWriteIdList writeIdList) throws IOException { if (base.writeId == Long.MIN_VALUE) { //Ref: https://issues.apache.org/jira/browse/HIVE-13369 @@ -173,7 +172,7 @@ private static boolean isValidBase(RemoteFileSystem remoteFileSystem, String bas } // hive 4 : just check "_v" suffix, before hive 4 : check `_metadata_acid` file in baseDir. - if ((base.visibilityId > 0) || isValidMetaDataFile(remoteFileSystem, baseDir)) { + if ((base.visibilityId > 0) || isValidMetaDataFile(fileSystem, baseDir)) { return writeIdList.isValidBase(base.writeId); } @@ -208,7 +207,7 @@ private static ParsedDelta parseDelta(String fileName, String deltaPrefix, Strin if (split2 == -1) { long max = Long.parseLong(rest.substring(split + 1)); - return new ParsedDelta(min, max, fileName, -1, deleteDelta, visibilityId); + return new ParsedDelta(min, max, path, -1, deleteDelta, visibilityId); } long max = Long.parseLong(rest.substring(split + 1, split2)); @@ -219,7 +218,7 @@ private static ParsedDelta parseDelta(String fileName, String deltaPrefix, Strin //Since the Hive 3 library cannot read Hive 4 transactional tables correctly, and using the Hive 4 // library directly causes many compatibility problems, this method re-implements the directory resolution. 
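// A rough sketch of the directory names resolved here, following Hive's AcidUtils layout (exact suffixes vary by Hive version, so treat this as an assumption rather than a spec):
//   000000_N                                                - original file; readable only after a major compaction rewrites it into a base
//   base_<writeId>[_v<visibilityTxnId>]                     - result of a major compaction
//   delta_<minWriteId>_<maxWriteId>[_<statementId>]         - insert/update delta
//   delete_delta_<minWriteId>_<maxWriteId>[_<statementId>]  - delete delta
// Only the newest valid base plus the still-valid deltas above it are handed to the reader.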
//Ref: hive/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java#getAcidState - public static FileCacheValue getAcidState(RemoteFileSystem fileSystem, HivePartition partition, + public static FileCacheValue getAcidState(FileSystem fileSystem, HivePartition partition, Map<String, String> txnValidIds, Map<String, String> catalogProps) throws Exception { // Ref: https://issues.apache.org/jira/browse/HIVE-18192 diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index 455f0b5cf78706c..b6cc42b1199a841 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -867,12 +867,13 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions return fileCacheValues; } - RemoteFileSystem fileSystem = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( - new FileSystemCache.FileSystemCacheKey( - LocationPath.getFSIdentity(partitions.get(0).getPath(), bindBrokerName), - catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf)); - for (HivePartition partition : partitions) { + //Get the filesystem separately for each partition; for the reason, see https://github.com/apache/doris/pull/23409. + RemoteFileSystem fileSystem = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( + new FileSystemCache.FileSystemCacheKey( + LocationPath.getFSIdentity(partition.getPath(), bindBrokerName), + catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf)); + if (!Strings.isNullOrEmpty(remoteUser)) { UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser); fileCacheValues.add( diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java index 4a11cf9b01fce90..b44ffe00390536f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java @@ -181,7 +181,7 @@ public boolean createTable(CreateTableStmt stmt) throws UserException { } } - if (props.containsKey("transactional") && props.get("transactional").equals("true")) { + if (props.containsKey("transactional") && props.get("transactional").equalsIgnoreCase("true")) { throw new UserException("Not support create hive transactional table."); /* CREATE TABLE trans6( diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 5810a07bb197c19..dac2c63745a9352 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -271,7 +271,7 @@ private ExecutorFactory selectInsertExecutorFactory( boolean emptyInsert = childIsEmptyRelation(physicalSink); HMSExternalTable hiveExternalTable = (HMSExternalTable) targetTableIf; if (hiveExternalTable.isHiveTransactionalTable()) { - throw new UserException("Not supported insert into hive table"); + throw new UserException("Not supported insert into hive transactional table."); } return ExecutorFactory.from( diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java new file mode 100644 index 000000000000000..fc4ca299166f025 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java @@ -0,0 +1,595 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.hive; + +import org.apache.doris.common.info.SimpleTableInfo; +import org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheValue; +import org.apache.doris.fs.LocalDfsFileSystem; + +import org.apache.hadoop.hive.common.ValidReadTxnList; +import org.apache.hadoop.hive.common.ValidReaderWriteIdList; +import org.junit.Assert; +import org.junit.Test; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.BitSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + + +public class HiveAcidTest { + + @Test + public void testOriginalDeltas() throws Exception { + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/000000_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/000001_1"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/000002_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/random"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/_done"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/subdir/000000_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_025_025"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_029_029"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_025_030"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_050_100"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_101_101"); + + Map txnValidIds = new HashMap<>(); + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); + txnValidIds.put( + AcidUtil.VALID_WRITEIDS_KEY, + new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":").writeToString()); + + HivePartition partition = new HivePartition(new SimpleTableInfo("", "tbl"), + false, "", "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), new HashMap<>()); + try { + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + Assert.fail("Expected UnsupportedOperationException was not thrown"); + } catch (UnsupportedOperationException e) { 
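+ // Only original (pre-ACID) files and deltas exist here, with no base_N produced by a major compaction, so the reader must refuse the table rather than return incomplete results (message asserted below).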
Assert.assertTrue(e.getMessage().contains("For no acid table convert to acid, please COMPACT 'major'.")); + } + } + + + @Test + public void testObsoleteOriginals() throws Exception { + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_10/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_5/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/000000_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/000001_1"); + + Map txnValidIds = new HashMap<>(); + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); + txnValidIds.put( + AcidUtil.VALID_WRITEIDS_KEY, + new ValidReaderWriteIdList("tbl:150:" + Long.MAX_VALUE + ":").writeToString()); + + HivePartition partition = new HivePartition( + new SimpleTableInfo("", "tbl"), + false, + "", + "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), + new HashMap<>()); + try { + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + Assert.fail("Expected UnsupportedOperationException was not thrown"); + } catch (UnsupportedOperationException e) { + Assert.assertTrue(e.getMessage().contains("For no acid table convert to acid, please COMPACT 'major'.")); + } + } + + + @Test + public void testOverlapingDelta() throws Exception { + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_0000063_63/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_000062_62/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_00061_61/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_40_60/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_0060_60/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_052_55/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_50/bucket_0"); + + Map txnValidIds = new HashMap<>(); + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); + txnValidIds.put( + AcidUtil.VALID_WRITEIDS_KEY, + new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":").writeToString()); + + HivePartition partition = new HivePartition( + new SimpleTableInfo("", "tbl"), + false, + "", + "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), + new HashMap<>()); + + FileCacheValue fileCacheValue = + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + + List readFile = + fileCacheValue.getFiles().stream().map(x -> x.path.toString()).collect(Collectors.toList()); + + + List resultReadFile = Arrays.asList( + "file:" + tempPath.toAbsolutePath() + "/base_50/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_40_60/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_00061_61/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_000062_62/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_0000063_63/bucket_0" + ); + Assert.assertTrue(resultReadFile.containsAll(readFile) && readFile.containsAll(resultReadFile)); + } + + + @Test + public void 
testOverlapingDelta2() throws Exception { + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_0000063_63_0/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_000062_62_0/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_000062_62_3/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_00061_61_0/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_40_60/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_0060_60_1/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_0060_60_4/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_0060_60_7/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_052_55/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_058_58/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_50/bucket_0"); + + Map txnValidIds = new HashMap<>(); + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); + txnValidIds.put( + AcidUtil.VALID_WRITEIDS_KEY, + new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":").writeToString()); + + HivePartition partition = new HivePartition( + new SimpleTableInfo("", "tbl"), + false, + "", + "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), + new HashMap<>()); + + FileCacheValue fileCacheValue = + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + + + List readFile = + fileCacheValue.getFiles().stream().map(x -> x.path.toString()).collect(Collectors.toList()); + + + List resultReadFile = Arrays.asList( + "file:" + tempPath.toAbsolutePath() + "/base_50/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_40_60/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_00061_61_0/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_000062_62_0/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_000062_62_3/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_0000063_63_0/bucket_0" + + ); + + Assert.assertTrue(resultReadFile.containsAll(readFile) && readFile.containsAll(resultReadFile)); + } + + + @Test + public void deltasWithOpenTxnInRead() throws Exception { + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_1_1/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_2_5/bucket_0"); + + Map txnValidIds = new HashMap<>(); + + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new long[] {50}, new BitSet(), 1000, 55).writeToString()); + txnValidIds.put( + AcidUtil.VALID_WRITEIDS_KEY, + new ValidReaderWriteIdList("tbl:100:4:4").writeToString()); + + HivePartition partition = new HivePartition( + new SimpleTableInfo("", "tbl"), + false, + "", + "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), + new HashMap<>()); + + + FileCacheValue fileCacheValue = + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + + + 
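+ // "tbl:100:4:4" is the ValidReaderWriteIdList format: high-water mark 100 with writeId 4 still open. As in Hive's AcidUtils, the open id hides rows at scan time but does not drop the enclosing delta_2_5 directory from the selection below.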
List readFile = + fileCacheValue.getFiles().stream().map(x -> x.path.toString()).collect(Collectors.toList()); + + + List resultReadFile = Arrays.asList( + "file:" + tempPath.toAbsolutePath() + "/delta_1_1/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_2_5/bucket_0" + ); + + Assert.assertTrue(resultReadFile.containsAll(readFile) && readFile.containsAll(resultReadFile)); + + } + + + @Test + public void deltasWithOpenTxnInRead2() throws Exception { + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_1_1/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_2_5/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_4_4_1/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_4_4_3/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_101_101_1/bucket_0"); + + Map txnValidIds = new HashMap<>(); + + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new long[] {50}, new BitSet(), 1000, 55).writeToString()); + txnValidIds.put( + AcidUtil.VALID_WRITEIDS_KEY, + new ValidReaderWriteIdList("tbl:100:4:4").writeToString()); + + HivePartition partition = new HivePartition( + new SimpleTableInfo("", "tbl"), + false, + "", + "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), + new HashMap<>()); + + FileCacheValue fileCacheValue = + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + + + List readFile = + fileCacheValue.getFiles().stream().map(x -> x.path.toString()).collect(Collectors.toList()); + + + List resultReadFile = Arrays.asList( + "file:" + tempPath.toAbsolutePath() + "/delta_1_1/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_2_5/bucket_0" + ); + + Assert.assertTrue(resultReadFile.containsAll(readFile) && readFile.containsAll(resultReadFile)); + } + + @Test + public void testBaseWithDeleteDeltas() throws Exception { + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_5/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_10/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_49/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_025_025/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_029_029/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delete_delta_029_029/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_025_030/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delete_delta_025_030/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_050_105/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delete_delta_050_105/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delete_delta_110_110/bucket_0"); + + Map txnValidIds = new HashMap<>(); + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); + txnValidIds.put( + 
AcidUtil.VALID_WRITEIDS_KEY, + new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":").writeToString()); + + // Map tableProps = new HashMap<>(); + // tableProps.put(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, + // AcidUtils.AcidOperationalProperties.getDefault().toString()); + + HivePartition partition = new HivePartition( + new SimpleTableInfo("", "tbl"), + false, + "", + "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), + new HashMap<>()); + + FileCacheValue fileCacheValue = + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + + + List readFile = + fileCacheValue.getFiles().stream().map(x -> x.path.toString()).collect(Collectors.toList()); + + + List resultReadFile = Arrays.asList( + "file:" + tempPath.toAbsolutePath() + "/base_49/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_050_105/bucket_0" + ); + + Assert.assertTrue(resultReadFile.containsAll(readFile) && readFile.containsAll(resultReadFile)); + + List resultDelta = Arrays.asList( + "file://" + tempPath.toAbsolutePath() + "/delete_delta_050_105/bucket_0" + ); + + List deltaFiles = new ArrayList<>(); + fileCacheValue.getAcidInfo().getDeleteDeltas().forEach( + deltaInfo -> { + String loc = deltaInfo.getDirectoryLocation(); + deltaInfo.getFileNames().forEach( + fileName -> deltaFiles.add(loc + "/" + fileName) + ); + } + ); + + Assert.assertTrue(resultDelta.containsAll(deltaFiles) && deltaFiles.containsAll(resultDelta)); + } + + + @Test + public void testOverlapingDeltaAndDeleteDelta() throws Exception { + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_0000063_63/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_000062_62/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_00061_61/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delete_delta_00064_64/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_40_60/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delete_delta_40_60/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_0060_60/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_052_55/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delete_delta_052_55/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_50/bucket_0"); + + Map txnValidIds = new HashMap<>(); + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); + txnValidIds.put( + AcidUtil.VALID_WRITEIDS_KEY, + new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":").writeToString()); + + + Map tableProps = new HashMap<>(); + // tableProps.put(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, + // AcidUtils.AcidOperationalProperties.getDefault().toString()); + + HivePartition partition = new HivePartition( + new SimpleTableInfo("", "tbl"), + false, + "", + "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), + tableProps); + + FileCacheValue fileCacheValue = + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + + + + List readFile = + 
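+ // Expected below: delete_delta_052_55 is obsolete because the compacted delta_40_60 covers it, while delete_delta_40_60 and delete_delta_00064_64 must survive for row-level filtering.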
fileCacheValue.getFiles().stream().map(x -> x.path.toString()).collect(Collectors.toList()); + + + List resultReadFile = Arrays.asList( + "file:" + tempPath.toAbsolutePath() + "/base_50/bucket_0", + + "file:" + tempPath.toAbsolutePath() + "/delta_40_60/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_00061_61/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_000062_62/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_0000063_63/bucket_0" + + ); + + Assert.assertTrue(resultReadFile.containsAll(readFile) && readFile.containsAll(resultReadFile)); + + List resultDelta = Arrays.asList( + + "file://" + tempPath.toAbsolutePath() + "/delete_delta_40_60/bucket_0", + "file://" + tempPath.toAbsolutePath() + "/delete_delta_00064_64/bucket_0" + ); + + List deltaFiles = new ArrayList<>(); + fileCacheValue.getAcidInfo().getDeleteDeltas().forEach( + deltaInfo -> { + String loc = deltaInfo.getDirectoryLocation(); + deltaInfo.getFileNames().forEach( + fileName -> deltaFiles.add(loc + "/" + fileName) + ); + } + ); + + Assert.assertTrue(resultDelta.containsAll(deltaFiles) && deltaFiles.containsAll(resultDelta)); + } + + @Test + public void testMinorCompactedDeltaMakesInBetweenDelteDeltaObsolete() throws Exception { + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + + + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_40_60/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delete_delta_50_50/bucket_0"); + + Map txnValidIds = new HashMap<>(); + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); + txnValidIds.put( + AcidUtil.VALID_WRITEIDS_KEY, + new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":").writeToString()); + + Map tableProps = new HashMap<>(); + HivePartition partition = new HivePartition( + new SimpleTableInfo("", "tbl"), + false, + "", + "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), + tableProps); + + FileCacheValue fileCacheValue = + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + + List readFile = + fileCacheValue.getFiles().stream().map(x -> x.path.toString()).collect(Collectors.toList()); + + + List resultReadFile = Arrays.asList( + "file:" + tempPath.toAbsolutePath() + "/delta_40_60/bucket_0" + ); + + Assert.assertTrue(resultReadFile.containsAll(readFile) && readFile.containsAll(resultReadFile)); + } + + @Test + public void deleteDeltasWithOpenTxnInRead() throws Exception { + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_1_1/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_2_5/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delete_delta_2_5/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delete_delta_3_3/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_4_4_1/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_4_4_3/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_101_101_1/bucket_0"); + + + Map txnValidIds = new HashMap<>(); + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new 
long[] {50}, new BitSet(), 1000, 55).writeToString()); + txnValidIds.put( + AcidUtil.VALID_WRITEIDS_KEY, + new ValidReaderWriteIdList("tbl:100:4:4").writeToString()); + + + Map tableProps = new HashMap<>(); + HivePartition partition = new HivePartition( + new SimpleTableInfo("", "tbl"), + false, + "", + "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), + tableProps); + + FileCacheValue fileCacheValue = + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + + + List readFile = + fileCacheValue.getFiles().stream().map(x -> x.path.toString()).collect(Collectors.toList()); + + + List resultReadFile = Arrays.asList( + "file:" + tempPath.toAbsolutePath() + "/delta_1_1/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_2_5/bucket_0" + ); + + Assert.assertTrue(resultReadFile.containsAll(readFile) && readFile.containsAll(resultReadFile)); + List resultDelta = Arrays.asList( + "file://" + tempPath.toAbsolutePath() + "/delete_delta_2_5/bucket_0" + ); + + List deltaFiles = new ArrayList<>(); + fileCacheValue.getAcidInfo().getDeleteDeltas().forEach( + deltaInfo -> { + String loc = deltaInfo.getDirectoryLocation(); + deltaInfo.getFileNames().forEach( + fileName -> deltaFiles.add(loc + "/" + fileName) + ); + } + ); + + Assert.assertTrue(resultDelta.containsAll(deltaFiles) && deltaFiles.containsAll(resultDelta)); + } + + @Test + public void testBaseDeltas() throws Exception { + LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem(); + Path tempPath = Files.createTempDirectory("tbl"); + + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_5/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_10/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/base_49/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_025_025/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_029_029/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_025_030/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_050_105/bucket_0"); + localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + "/delta_90_120/bucket_0"); + + Map txnValidIds = new HashMap<>(); + txnValidIds.put( + AcidUtil.VALID_TXNS_KEY, + new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); + txnValidIds.put( + AcidUtil.VALID_WRITEIDS_KEY, + new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":").writeToString()); + + Map tableProps = new HashMap<>(); + + HivePartition partition = new HivePartition( + new SimpleTableInfo("", "tbl"), + false, + "", + "file://" + tempPath.toAbsolutePath() + "", + new ArrayList<>(), + tableProps); + + FileCacheValue fileCacheValue = + AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, new HashMap<>()); + + List readFile = + fileCacheValue.getFiles().stream().map(x -> x.path.toString()).collect(Collectors.toList()); + + + + List resultReadFile = Arrays.asList( + "file:" + tempPath.toAbsolutePath() + "/base_49/bucket_0", + "file:" + tempPath.toAbsolutePath() + "/delta_050_105/bucket_0" + ); + Assert.assertTrue(resultReadFile.containsAll(readFile) && readFile.containsAll(resultReadFile)); + } +} diff --git a/regression-test/data/external_table_p0/hive/test_transactional_hive.out b/regression-test/data/external_table_p0/hive/test_transactional_hive.out index 
2ad0446ccb7e258..060fa8c048e5a05 100644 --- a/regression-test/data/external_table_p0/hive/test_transactional_hive.out +++ b/regression-test/data/external_table_p0/hive/test_transactional_hive.out @@ -76,3 +76,49 @@ F -- !q05 -- 0 + +-- !2 -- +1 A 101 +2 BB 102 +3 CC 101 +4 D 102 + +-- !3 -- +1 A 101 +3 CC 101 + +-- !4 -- +2 BB 102 +4 D 102 + +-- !5 -- +1 A 101 +2 BB 102 + +-- !6 -- +4 D 102 + +-- !7 -- +1 A +2 BB +4 DD + +-- !10 -- +1 A +2 BB + +-- !11 -- +4 DD + +-- !12 -- +1 A +2 BB +4 DD + +-- !15 -- +1 A +2 BB + +-- !16 -- +4 DD + diff --git a/regression-test/suites/external_table_p0/hive/test_transactional_hive.groovy b/regression-test/suites/external_table_p0/hive/test_transactional_hive.groovy index dbe20395ec95ecb..4cde3d06dcc6239 100644 --- a/regression-test/suites/external_table_p0/hive/test_transactional_hive.groovy +++ b/regression-test/suites/external_table_p0/hive/test_transactional_hive.groovy @@ -52,6 +52,68 @@ suite("test_transactional_hive", "p0,external,hive,external_docker,external_dock select count(*) from orc_full_acid_par_empty; """ } + + def test_acid = { + try { + sql """ select * from orc_to_acid_tb """ + } catch (Exception e) { + assertTrue(e.getMessage().contains("For no acid table convert to acid, please COMPACT")); + } + + qt_2 """ select * from orc_to_acid_compacted_tb order by id """ + qt_3 """ select * from orc_to_acid_compacted_tb where part_col=101 order by id """ + qt_4 """ select * from orc_to_acid_compacted_tb where part_col=102 order by id """ + qt_5 """ select * from orc_to_acid_compacted_tb where id < 3 order by id """ + qt_6 """ select * from orc_to_acid_compacted_tb where id > 3 order by id """ + + + qt_7 """ select * from orc_acid_minor order by id """ + qt_10 """ select * from orc_acid_minor where id < 3 order by id """ + qt_11 """ select * from orc_acid_minor where id > 3 order by id """ + + + qt_12 """ select * from orc_acid_major order by id """ + qt_15 """ select * from orc_acid_major where id < 3 order by id """ + qt_16 """ select * from orc_acid_major where id > 3 order by id """ + } + + def test_acid_write = { + sql """set enable_fallback_to_original_planner=false;""" + + + + try { + sql """ + CREATE TABLE acid_tb ( + `col1` BOOLEAN COMMENT 'col1', + `col2` INT COMMENT 'col2' + ) ENGINE=hive + PROPERTIES ( + 'file_format'='orc', + 'compression'='zlib', + 'bucketing_version'='2', + 'transactional'='true', + 'transactional_properties'='default' + ); + """ + } catch (Exception e) { + assertTrue(e.getMessage().contains("Not support create hive transactional table.")); + } + try { + sql """ insert into orc_acid_major(id,value) values(1,"a1"); """ + } catch (Exception e) { + assertTrue(e.getMessage().contains("Not supported insert into hive transactional table.")); + } + + try { + sql """ drop table orc_acid_major; """ + } catch (Exception e) { + assertTrue(e.getMessage().contains("Not support drop hive transactional table.")); + + } + } + + String enabled = context.config.otherConfigs.get("enableHiveTest") if (enabled == null || !enabled.equalsIgnoreCase("true")) { logger.info("disable Hive test.") @@ -60,6 +122,7 @@ for (String hivePrefix : ["hive3"]) { try { + String hdfs_port = context.config.otherConfigs.get(hivePrefix + "HdfsPort") String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort") String catalog_name = "test_transactional_${hivePrefix}" String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") @@ -68,6 +131,7 @@ 
suite("test_transactional_hive", "p0,external,hive,external_docker,external_dock sql """create catalog if not exists ${catalog_name} properties ( "type"="hms", 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}' + ,'fs.defaultFS' = 'hdfs://${externalEnvIp}:${hdfs_port}' );""" sql """use `${catalog_name}`.`default`""" @@ -79,6 +143,9 @@ suite("test_transactional_hive", "p0,external,hive,external_docker,external_dock q01() q01_par() + test_acid() + test_acid_write() + sql """drop catalog if exists ${catalog_name}""" } finally { }