From 5aa2e123f386dce3e049e812f92de5203e220e3d Mon Sep 17 00:00:00 2001 From: Raunaq Morarka Date: Tue, 13 Feb 2024 09:48:38 +0530 Subject: [PATCH 1/3] Separate coordinator and worker bindings for alluxio cache --- ...AlluxioCoordinatorNoOpFileSystemCache.java | 66 +++++++++++++++++++ .../alluxio/AlluxioFileSystemCacheModule.java | 16 ++++- .../filesystem/manager/FileSystemModule.java | 2 +- ...stDeltaLakeAlluxioCacheFileOperations.java | 3 +- 4 files changed, 83 insertions(+), 4 deletions(-) create mode 100644 lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioCoordinatorNoOpFileSystemCache.java diff --git a/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioCoordinatorNoOpFileSystemCache.java b/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioCoordinatorNoOpFileSystemCache.java new file mode 100644 index 000000000000..eb5ad992d060 --- /dev/null +++ b/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioCoordinatorNoOpFileSystemCache.java @@ -0,0 +1,66 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.alluxio; + +import alluxio.client.file.cache.CacheManager; +import alluxio.conf.AlluxioProperties; +import alluxio.conf.InstancedConfiguration; +import com.google.inject.Inject; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoInput; +import io.trino.filesystem.TrinoInputFile; +import io.trino.filesystem.TrinoInputStream; +import io.trino.filesystem.cache.TrinoFileSystemCache; + +import java.io.IOException; + +/** + * Used to skip caching data on coordinator while still registering alluxio metrics so + * that JMX queries for metrics can succeed on the coordinator + */ +public class AlluxioCoordinatorNoOpFileSystemCache + implements TrinoFileSystemCache +{ + @Inject + public AlluxioCoordinatorNoOpFileSystemCache() + { + try { + CacheManager cacheManager = CacheManager.Factory.create(new InstancedConfiguration(new AlluxioProperties())); + cacheManager.close(); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public TrinoInput cacheInput(TrinoInputFile delegate, String key) + throws IOException + { + return delegate.newInput(); + } + + @Override + public TrinoInputStream cacheStream(TrinoInputFile delegate, String key) + throws IOException + { + return delegate.newStream(); + } + + @Override + public void expire(Location source) + throws IOException + { + } +} diff --git a/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemCacheModule.java b/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemCacheModule.java index c0e1b6a3fd99..a28749fd2727 100644 --- a/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemCacheModule.java +++ b/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemCacheModule.java @@ -32,6 +32,13 @@ public class AlluxioFileSystemCacheModule extends AbstractConfigurationAwareModule { + private final boolean isCoordinator; + + public AlluxioFileSystemCacheModule(boolean isCoordinator) + { + this.isCoordinator = isCoordinator; + } + @Override protected void setup(Binder binder) { @@ -40,8 +47,13 @@ protected void setup(Binder binder) binder.bind(AlluxioCacheStats.class).in(SINGLETON); newExporter(binder).export(AlluxioCacheStats.class).as(generator -> generator.generatedNameOf(AlluxioCacheStats.class)); - binder.bind(TrinoFileSystemCache.class).to(AlluxioFileSystemCache.class).in(SINGLETON); - newOptionalBinder(binder, CachingHostAddressProvider.class).setBinding().to(ConsistentHashingHostAddressProvider.class).in(SINGLETON); + if (isCoordinator) { + binder.bind(TrinoFileSystemCache.class).to(AlluxioCoordinatorNoOpFileSystemCache.class).in(SINGLETON); + newOptionalBinder(binder, CachingHostAddressProvider.class).setBinding().to(ConsistentHashingHostAddressProvider.class).in(SINGLETON); + } + else { + binder.bind(TrinoFileSystemCache.class).to(AlluxioFileSystemCache.class).in(SINGLETON); + } Properties metricProps = new Properties(); metricProps.put("sink.jmx.class", "alluxio.metrics.sink.JmxSink"); diff --git a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java index f545aff40466..f64dc437c18d 100644 --- a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java +++ b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java @@ -110,7 +110,7 @@ protected void setup(Binder binder) install(conditionalModule( FileSystemConfig.class, cache -> cache.getCacheType() == FileSystemConfig.CacheType.ALLUXIO, - new AlluxioFileSystemCacheModule())); + new AlluxioFileSystemCacheModule(nodeManager.getCurrentNode().isCoordinator()))); } @Provides diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheFileOperations.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheFileOperations.java index 766f315b177e..3973f168eb43 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheFileOperations.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheFileOperations.java @@ -71,9 +71,10 @@ protected DistributedQueryRunner createQueryRunner() .buildOrThrow(); DistributedQueryRunner queryRunner = DeltaLakeQueryRunner.builder(session) + .setCoordinatorProperties(ImmutableMap.of("node-scheduler.include-coordinator", "false")) .setDeltaProperties(deltaLakeProperties) .setCatalogName(DELTA_CATALOG) - .setNodeCount(1) + .setNodeCount(2) .build(); queryRunner.execute("CREATE SCHEMA " + session.getSchema().orElseThrow()); From e722f8bc83847c095314eff85e65072504542b84 Mon Sep 17 00:00:00 2001 From: Raunaq Morarka Date: Tue, 13 Feb 2024 11:09:35 +0530 Subject: [PATCH 2/3] Add DefaultCachingHostAddressProvider Allows connector to use it's own host addresses for split scheduling when caching is not enabled --- .../java/io/trino/filesystem/manager/FileSystemModule.java | 4 ++-- .../trino/filesystem/cache/CachingHostAddressProvider.java | 2 +- .../cache/ConsistentHashingHostAddressProvider.java | 2 +- ...rovider.java => DefaultCachingHostAddressProvider.java} | 7 +++---- .../TestConsistentHashingCacheHostAddressProvider.java | 5 +++-- .../io/trino/plugin/deltalake/DeltaLakeSplitManager.java | 4 ++-- .../io/trino/plugin/deltalake/TestDeltaLakeMetadata.java | 4 ++-- .../trino/plugin/deltalake/TestDeltaLakeSplitManager.java | 4 ++-- 8 files changed, 16 insertions(+), 16 deletions(-) rename lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/{NoneCachingHostAddressProvider.java => DefaultCachingHostAddressProvider.java} (80%) diff --git a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java index f64dc437c18d..6029e46f528e 100644 --- a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java +++ b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java @@ -29,7 +29,7 @@ import io.trino.filesystem.cache.CacheKeyProvider; import io.trino.filesystem.cache.CachingHostAddressProvider; import io.trino.filesystem.cache.DefaultCacheKeyProvider; -import io.trino.filesystem.cache.NoneCachingHostAddressProvider; +import io.trino.filesystem.cache.DefaultCachingHostAddressProvider; import io.trino.filesystem.cache.TrinoFileSystemCache; import io.trino.filesystem.gcs.GcsFileSystemFactory; import io.trino.filesystem.gcs.GcsFileSystemModule; @@ -101,7 +101,7 @@ protected void setup(Binder binder) factories.addBinding("gs").to(GcsFileSystemFactory.class); } - newOptionalBinder(binder, CachingHostAddressProvider.class).setDefault().to(NoneCachingHostAddressProvider.class).in(Scopes.SINGLETON); + newOptionalBinder(binder, CachingHostAddressProvider.class).setDefault().to(DefaultCachingHostAddressProvider.class).in(Scopes.SINGLETON); newOptionalBinder(binder, CacheKeyProvider.class).setDefault().to(DefaultCacheKeyProvider.class).in(Scopes.SINGLETON); newMapBinder(binder, FileSystemConfig.CacheType.class, TrinoFileSystemCache.class); diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CachingHostAddressProvider.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CachingHostAddressProvider.java index 18f01a2ba8ad..c35c4049999a 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CachingHostAddressProvider.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CachingHostAddressProvider.java @@ -22,5 +22,5 @@ public interface CachingHostAddressProvider /** * Returns a lists of hosts which are preferred to cache the split with the given path. */ - List getHosts(String splitPath); + List getHosts(String splitPath, List defaultAddresses); } diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/ConsistentHashingHostAddressProvider.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/ConsistentHashingHostAddressProvider.java index eeb0a9723db7..9cb237fc112c 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/ConsistentHashingHostAddressProvider.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/ConsistentHashingHostAddressProvider.java @@ -61,7 +61,7 @@ public ConsistentHashingHostAddressProvider(NodeManager nodeManager, ConsistentH } @Override - public List getHosts(String splitPath) + public List getHosts(String splitPath, List defaultAddresses) { return consistentHashRing.locate(splitPath, replicationFactor) .stream() diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/NoneCachingHostAddressProvider.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/DefaultCachingHostAddressProvider.java similarity index 80% rename from lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/NoneCachingHostAddressProvider.java rename to lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/DefaultCachingHostAddressProvider.java index 349fc5b7694b..8ed93e70caea 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/NoneCachingHostAddressProvider.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/DefaultCachingHostAddressProvider.java @@ -13,17 +13,16 @@ */ package io.trino.filesystem.cache; -import com.google.common.collect.ImmutableList; import io.trino.spi.HostAddress; import java.util.List; -public class NoneCachingHostAddressProvider +public class DefaultCachingHostAddressProvider implements CachingHostAddressProvider { @Override - public List getHosts(String splitPath) + public List getHosts(String splitPath, List defaultAddresses) { - return ImmutableList.of(); + return defaultAddresses; } } diff --git a/lib/trino-filesystem/src/test/java/io/trino/filesystem/cache/TestConsistentHashingCacheHostAddressProvider.java b/lib/trino-filesystem/src/test/java/io/trino/filesystem/cache/TestConsistentHashingCacheHostAddressProvider.java index 3c8ef89d6478..041950f7959f 100644 --- a/lib/trino-filesystem/src/test/java/io/trino/filesystem/cache/TestConsistentHashingCacheHostAddressProvider.java +++ b/lib/trino-filesystem/src/test/java/io/trino/filesystem/cache/TestConsistentHashingCacheHostAddressProvider.java @@ -13,6 +13,7 @@ */ package io.trino.filesystem.cache; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; import io.trino.client.NodeVersion; import io.trino.metadata.InternalNode; @@ -85,7 +86,7 @@ private static void assertFairDistribution(CachingHostAddressProvider cachingHos int n = 1000; Map counts = new HashMap<>(); for (int i = 0; i < n; i++) { - counts.merge(cachingHostAddressProvider.getHosts(String.valueOf(i)).get(0).getHostText(), 1, Math::addExact); + counts.merge(cachingHostAddressProvider.getHosts(String.valueOf(i), ImmutableList.of()).get(0).getHostText(), 1, Math::addExact); } assertThat(nodeNames.stream().map(m -> m.getHostAndPort().getHostText()).collect(Collectors.toSet())).isEqualTo(counts.keySet()); counts.values().forEach(c -> assertThat(abs(c - n / nodeNames.size()) < 0.1 * n).isTrue()); @@ -105,7 +106,7 @@ private Map> getDistribution(ConsistentHashingHostAddressPr int n = 1000; Map> distribution = new HashMap<>(); for (int i = 0; i < n; i++) { - String host = provider.getHosts(String.valueOf(i)).get(0).getHostText(); + String host = provider.getHosts(String.valueOf(i), ImmutableList.of()).get(0).getHostText(); distribution.computeIfAbsent(host, (k) -> new HashSet<>()).add(i); } return distribution; diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java index 9350d0b14815..9a00e3707175 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java @@ -333,7 +333,7 @@ private List splitsForFile( addFileEntry.getStats().flatMap(DeltaLakeFileStatistics::getNumRecords), addFileEntry.getModificationTime(), addFileEntry.getDeletionVector(), - cachingHostAddressProvider.getHosts(splitPath), + cachingHostAddressProvider.getHosts(splitPath, ImmutableList.of()), SplitWeight.standard(), statisticsPredicate, partitionKeys)); @@ -359,7 +359,7 @@ private List splitsForFile( Optional.empty(), addFileEntry.getModificationTime(), addFileEntry.getDeletionVector(), - cachingHostAddressProvider.getHosts(splitPath), + cachingHostAddressProvider.getHosts(splitPath, ImmutableList.of()), SplitWeight.fromProportion(clamp((double) splitSize / maxSplitSize, minimumAssignedSplitWeight, 1.0)), statisticsPredicate, partitionKeys)); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java index 231cd4bdf220..fe65a14a693b 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java @@ -25,7 +25,7 @@ import io.opentelemetry.api.trace.Tracer; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.filesystem.cache.CachingHostAddressProvider; -import io.trino.filesystem.cache.NoneCachingHostAddressProvider; +import io.trino.filesystem.cache.DefaultCachingHostAddressProvider; import io.trino.filesystem.hdfs.HdfsFileSystemFactory; import io.trino.hdfs.HdfsEnvironment; import io.trino.hdfs.TrinoHdfsFileSystemStats; @@ -207,7 +207,7 @@ public void setUp() binder.bind(HdfsEnvironment.class).toInstance(HDFS_ENVIRONMENT); binder.bind(TrinoHdfsFileSystemStats.class).toInstance(HDFS_FILE_SYSTEM_STATS); binder.bind(TrinoFileSystemFactory.class).to(HdfsFileSystemFactory.class).in(Scopes.SINGLETON); - binder.bind(CachingHostAddressProvider.class).to(NoneCachingHostAddressProvider.class).in(Scopes.SINGLETON); + binder.bind(CachingHostAddressProvider.class).to(DefaultCachingHostAddressProvider.class).in(Scopes.SINGLETON); }, new AbstractModule() { diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java index 48931352867b..2a718c5db619 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java @@ -20,7 +20,7 @@ import io.airlift.json.JsonCodecFactory; import io.airlift.units.DataSize; import io.trino.filesystem.Location; -import io.trino.filesystem.cache.NoneCachingHostAddressProvider; +import io.trino.filesystem.cache.DefaultCachingHostAddressProvider; import io.trino.filesystem.hdfs.HdfsFileSystemFactory; import io.trino.filesystem.memory.MemoryFileSystemFactory; import io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess; @@ -241,7 +241,7 @@ public Stream getActiveFiles( deltaLakeConfig, HDFS_FILE_SYSTEM_FACTORY, deltaLakeTransactionManager, - new NoneCachingHostAddressProvider()); + new DefaultCachingHostAddressProvider()); } private AddFileEntry addFileEntryOfSize(long fileSize) From 9c476f79c936d98c54dbf14e19083f4312b814fa Mon Sep 17 00:00:00 2001 From: Raunaq Morarka Date: Mon, 12 Feb 2024 12:22:34 +0530 Subject: [PATCH 3/3] Support embedded alluxio cache in hive --- .../main/sphinx/connector/filesystem-cache.md | 1 + .../trino/plugin/hive/HiveSplitManager.java | 9 +- .../io/trino/plugin/hive/HiveSplitSource.java | 8 +- .../hive/TestBackgroundHiveSplitLoader.java | 2 + .../TestHiveAlluxioCacheFileOperations.java | 194 ++++++++++++++++++ .../plugin/hive/TestHiveSplitSource.java | 7 + .../io/trino/tests/product/TestGroups.java | 2 +- .../environment/EnvMultinodeHiveCaching.java | 53 +---- .../product/launcher/suite/suites/Suite5.java | 4 +- .../multinode-cached/hive-worker.properties | 8 - .../hive.properties} | 4 +- .../TestDeltaLakeAlluxioCaching.java | 4 +- .../product/hive/TestHiveAlluxioCaching.java | 91 ++++++++ .../tests/product/hive/TestHiveCaching.java | 121 ----------- .../product/hive/util/CachingTestUtils.java | 84 -------- .../util => utils}/CachingTestUtils.java | 2 +- 16 files changed, 325 insertions(+), 269 deletions(-) create mode 100644 plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveAlluxioCacheFileOperations.java delete mode 100644 testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-cached/hive-worker.properties rename testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/{multinode-cached/hive-coordinator.properties => multinode-hive-cached/hive.properties} (71%) create mode 100644 testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveAlluxioCaching.java delete mode 100644 testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveCaching.java delete mode 100644 testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/util/CachingTestUtils.java rename testing/trino-product-tests/src/main/java/io/trino/tests/product/{deltalake/util => utils}/CachingTestUtils.java (97%) diff --git a/docs/src/main/sphinx/connector/filesystem-cache.md b/docs/src/main/sphinx/connector/filesystem-cache.md index 97a9a2896f4f..8a8397036aba 100644 --- a/docs/src/main/sphinx/connector/filesystem-cache.md +++ b/docs/src/main/sphinx/connector/filesystem-cache.md @@ -12,6 +12,7 @@ source [Alluxio](https://github.com/Alluxio/alluxio) libraries with catalogs using the following connectors: * [](/connector/delta-lake) +* [](/connector/hive) (fs-cache-distributed)= ## Distributed caching diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java index 630caa38534b..51fd1466aa65 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java @@ -24,6 +24,7 @@ import io.airlift.stats.CounterStat; import io.airlift.units.DataSize; import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.filesystem.cache.CachingHostAddressProvider; import io.trino.plugin.hive.metastore.Column; import io.trino.plugin.hive.metastore.Partition; import io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore; @@ -114,6 +115,7 @@ public class HiveSplitManager private final boolean recursiveDfsWalkerEnabled; private final CounterStat highMemorySplitSourceCounter; private final TypeManager typeManager; + private final CachingHostAddressProvider cachingHostAddressProvider; private final int maxPartitionsPerScan; @Inject @@ -124,7 +126,8 @@ public HiveSplitManager( TrinoFileSystemFactory fileSystemFactory, ExecutorService executorService, VersionEmbedder versionEmbedder, - TypeManager typeManager) + TypeManager typeManager, + CachingHostAddressProvider cachingHostAddressProvider) { this( transactionManager, @@ -141,6 +144,7 @@ public HiveSplitManager( hiveConfig.getMaxSplitsPerSecond(), hiveConfig.getRecursiveDirWalkerEnabled(), typeManager, + cachingHostAddressProvider, hiveConfig.getMaxPartitionsPerScan()); } @@ -159,6 +163,7 @@ public HiveSplitManager( @Nullable Integer maxSplitsPerSecond, boolean recursiveDfsWalkerEnabled, TypeManager typeManager, + CachingHostAddressProvider cachingHostAddressProvider, int maxPartitionsPerScan) { this.transactionManager = requireNonNull(transactionManager, "transactionManager is null"); @@ -176,6 +181,7 @@ public HiveSplitManager( this.maxSplitsPerSecond = firstNonNull(maxSplitsPerSecond, Integer.MAX_VALUE); this.recursiveDfsWalkerEnabled = recursiveDfsWalkerEnabled; this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.cachingHostAddressProvider = requireNonNull(cachingHostAddressProvider, "cachingHostAddressProvider is null"); this.maxPartitionsPerScan = maxPartitionsPerScan; } @@ -275,6 +281,7 @@ public ConnectorSplitSource getSplits( hiveSplitLoader, executor, highMemorySplitSourceCounter, + cachingHostAddressProvider, hiveTable.isRecordScannedFiles()); hiveSplitLoader.start(splitSource); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java index 2da1750229f5..94a66bf09101 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java @@ -19,6 +19,7 @@ import io.airlift.log.Logger; import io.airlift.stats.CounterStat; import io.airlift.units.DataSize; +import io.trino.filesystem.cache.CachingHostAddressProvider; import io.trino.plugin.hive.InternalHiveSplit.InternalHiveBlock; import io.trino.plugin.hive.util.AsyncQueue; import io.trino.plugin.hive.util.AsyncQueue.BorrowResult; @@ -84,6 +85,7 @@ class HiveSplitSource private final CounterStat highMemorySplitSourceCounter; private final AtomicBoolean loggedHighMemoryWarning = new AtomicBoolean(); private final HiveSplitWeightProvider splitWeightProvider; + private final CachingHostAddressProvider cachingHostAddressProvider; private final boolean recordScannedFiles; private final ImmutableList.Builder scannedFilePaths = ImmutableList.builder(); @@ -98,6 +100,7 @@ private HiveSplitSource( HiveSplitLoader splitLoader, AtomicReference stateReference, CounterStat highMemorySplitSourceCounter, + CachingHostAddressProvider cachingHostAddressProvider, boolean recordScannedFiles) { requireNonNull(session, "session is null"); @@ -114,6 +117,7 @@ private HiveSplitSource( this.maxInitialSplitSize = getMaxInitialSplitSize(session); this.remainingInitialSplits = new AtomicInteger(maxInitialSplits); this.splitWeightProvider = isSizeBasedSplitWeightsEnabled(session) ? new SizeBasedSplitWeightProvider(getMinimumAssignedSplitWeight(session), maxSplitSize) : HiveSplitWeightProvider.uniformStandardWeightProvider(); + this.cachingHostAddressProvider = requireNonNull(cachingHostAddressProvider, "cachingHostAddressProvider is null"); this.recordScannedFiles = recordScannedFiles; } @@ -128,6 +132,7 @@ public static HiveSplitSource allAtOnce( HiveSplitLoader splitLoader, Executor executor, CounterStat highMemorySplitSourceCounter, + CachingHostAddressProvider cachingHostAddressProvider, boolean recordScannedFiles) { AtomicReference stateReference = new AtomicReference<>(State.initial()); @@ -168,6 +173,7 @@ public boolean isFinished() splitLoader, stateReference, highMemorySplitSourceCounter, + cachingHostAddressProvider, recordScannedFiles); } @@ -305,7 +311,7 @@ else if (maxSplitBytes * 2 >= remainingBlockBytes) { internalSplit.getFileModifiedTime(), internalSplit.getSchema(), internalSplit.getPartitionKeys(), - block.getAddresses(), + cachingHostAddressProvider.getHosts(internalSplit.getPath(), block.getAddresses()), internalSplit.getReadBucketNumber(), internalSplit.getTableBucketNumber(), internalSplit.isForceLocalScheduling(), diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java index 9a48ffb3e10e..666479f35d1b 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java @@ -29,6 +29,7 @@ import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.filesystem.TrinoInputFile; import io.trino.filesystem.TrinoOutputFile; +import io.trino.filesystem.cache.DefaultCachingHostAddressProvider; import io.trino.filesystem.memory.MemoryFileSystemFactory; import io.trino.plugin.hive.HiveColumnHandle.ColumnType; import io.trino.plugin.hive.fs.CachingDirectoryLister; @@ -1230,6 +1231,7 @@ private HiveSplitSource hiveSplitSource(HiveSplitLoader hiveSplitLoader) hiveSplitLoader, executor, new CounterStat(), + new DefaultCachingHostAddressProvider(), false); } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveAlluxioCacheFileOperations.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveAlluxioCacheFileOperations.java new file mode 100644 index 000000000000..c6dd7231c8d1 --- /dev/null +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveAlluxioCacheFileOperations.java @@ -0,0 +1,194 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive; + +import com.google.common.collect.HashMultiset; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMultiset; +import com.google.common.collect.Multiset; +import com.google.common.io.Closer; +import io.opentelemetry.api.common.Attributes; +import io.trino.filesystem.manager.FileSystemConfig; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.DistributedQueryRunner; +import org.intellij.lang.annotations.Language; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_LOCATION; +import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_POSITION; +import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_SIZE; +import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_WRITE_POSITION; +import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_WRITE_SIZE; +import static io.trino.plugin.hive.HiveQueryRunner.HIVE_CATALOG; +import static io.trino.testing.MultisetAssertions.assertMultisetsEqual; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toCollection; + +@Execution(ExecutionMode.SAME_THREAD) +public class TestHiveAlluxioCacheFileOperations + extends AbstractTestQueryFramework +{ + private final Closer closer = Closer.create(); + + @Override + protected DistributedQueryRunner createQueryRunner() + throws Exception + { + Path cacheDirectory = Files.createTempDirectory("cache"); + closer.register(() -> deleteRecursively(cacheDirectory, ALLOW_INSECURE)); + Path metastoreDirectory = Files.createTempDirectory(HIVE_CATALOG); + closer.register(() -> deleteRecursively(metastoreDirectory, ALLOW_INSECURE)); + + Map hiveProperties = ImmutableMap.builder() + .put("fs.cache", FileSystemConfig.CacheType.ALLUXIO.name()) + .put("fs.cache.directories", cacheDirectory.toAbsolutePath().toString()) + .put("fs.cache.max-sizes", "100MB") + .put("hive.metastore", "file") + .put("hive.metastore.catalog.dir", metastoreDirectory.toUri().toString()) + .buildOrThrow(); + + return HiveQueryRunner.builder() + .setCoordinatorProperties(ImmutableMap.of("node-scheduler.include-coordinator", "false")) + .setHiveProperties(hiveProperties) + .setNodeCount(2) + .build(); + } + + @AfterAll + public void destroy() + throws Exception + { + closer.close(); + } + + @Test + public void testCacheFileOperations() + { + assertUpdate("DROP TABLE IF EXISTS test_cache_file_operations"); + assertUpdate("CREATE TABLE test_cache_file_operations(data varchar, key varchar) WITH (partitioned_by=ARRAY['key'], format='parquet')"); + assertUpdate("INSERT INTO test_cache_file_operations VALUES ('1-abc', 'p1')", 1); + assertUpdate("INSERT INTO test_cache_file_operations VALUES ('2-xyz', 'p2')", 1); + assertFileSystemAccesses( + "SELECT * FROM test_cache_file_operations", + ImmutableMultiset.builder() + .add(new CacheOperation("Alluxio.readCached", "key=p1/", 0, 279)) + .add(new CacheOperation("Alluxio.readCached", "key=p2/", 0, 279)) + .add(new CacheOperation("Alluxio.readExternal", "key=p1/", 0, 279)) + .add(new CacheOperation("Alluxio.readExternal", "key=p2/", 0, 279)) + .add(new CacheOperation("Alluxio.writeCache", "key=p1/", 0, 279)) + .add(new CacheOperation("Alluxio.writeCache", "key=p2/", 0, 279)) + .build()); + assertFileSystemAccesses( + "SELECT * FROM test_cache_file_operations", + ImmutableMultiset.builder() + .add(new CacheOperation("Alluxio.readCached", "key=p1/", 0, 279)) + .add(new CacheOperation("Alluxio.readCached", "key=p2/", 0, 279)) + .build()); + assertUpdate("INSERT INTO test_cache_file_operations VALUES ('3-xyz', 'p3')", 1); + assertUpdate("INSERT INTO test_cache_file_operations VALUES ('4-xyz', 'p4')", 1); + assertUpdate("INSERT INTO test_cache_file_operations VALUES ('5-xyz', 'p5')", 1); + assertFileSystemAccesses( + "SELECT * FROM test_cache_file_operations", + ImmutableMultiset.builder() + .add(new CacheOperation("Alluxio.readCached", "key=p1/", 0, 279)) + .add(new CacheOperation("Alluxio.readCached", "key=p2/", 0, 279)) + .add(new CacheOperation("Alluxio.readCached", "key=p3/", 0, 279)) + .add(new CacheOperation("Alluxio.readCached", "key=p4/", 0, 279)) + .add(new CacheOperation("Alluxio.readCached", "key=p5/", 0, 279)) + .add(new CacheOperation("Alluxio.readExternal", "key=p3/", 0, 279)) + .add(new CacheOperation("Alluxio.readExternal", "key=p4/", 0, 279)) + .add(new CacheOperation("Alluxio.readExternal", "key=p5/", 0, 279)) + .add(new CacheOperation("Alluxio.writeCache", "key=p3/", 0, 279)) + .add(new CacheOperation("Alluxio.writeCache", "key=p4/", 0, 279)) + .add(new CacheOperation("Alluxio.writeCache", "key=p5/", 0, 279)) + .build()); + assertFileSystemAccesses( + "SELECT * FROM test_cache_file_operations", + ImmutableMultiset.builder() + .add(new CacheOperation("Alluxio.readCached", "key=p1/", 0, 279)) + .add(new CacheOperation("Alluxio.readCached", "key=p2/", 0, 279)) + .add(new CacheOperation("Alluxio.readCached", "key=p3/", 0, 279)) + .add(new CacheOperation("Alluxio.readCached", "key=p4/", 0, 279)) + .add(new CacheOperation("Alluxio.readCached", "key=p5/", 0, 279)) + .build()); + } + + private void assertFileSystemAccesses(@Language("SQL") String query, Multiset expectedCacheAccesses) + { + DistributedQueryRunner queryRunner = getDistributedQueryRunner(); + queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query); + assertMultisetsEqual(getCacheOperations(), expectedCacheAccesses); + } + + private Multiset getCacheOperations() + { + return getQueryRunner().getSpans().stream() + .filter(span -> span.getName().startsWith("Alluxio.")) + .filter(span -> !isTrinoSchemaOrPermissions(requireNonNull(span.getAttributes().get(CACHE_FILE_LOCATION)))) + .map(span -> CacheOperation.create(span.getName(), span.getAttributes())) + .collect(toCollection(HashMultiset::create)); + } + + private static final Pattern DATA_FILE_PATTERN = Pattern.compile(".*?/(?key=[^/]*/)?(?\\d{8}_\\d{6}_\\d{5}_\\w{5})_(?[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"); + + private record CacheOperation(String operationName, String fileId, long position, long length) + { + public static CacheOperation create(String operationName, Attributes attributes) + { + String path = requireNonNull(attributes.get(CACHE_FILE_LOCATION)); + String fileName = path.replaceFirst(".*/", ""); + + long position = switch (operationName) { + case "Alluxio.readCached" -> requireNonNull(attributes.get(CACHE_FILE_READ_POSITION)); + case "Alluxio.readExternal" -> requireNonNull(attributes.get(CACHE_FILE_READ_POSITION)); + case "Alluxio.writeCache" -> requireNonNull(attributes.get(CACHE_FILE_WRITE_POSITION)); + default -> throw new IllegalArgumentException("Unexpected operation name: " + operationName); + }; + + long length = switch (operationName) { + case "Alluxio.readCached" -> requireNonNull(attributes.get(CACHE_FILE_READ_SIZE)); + case "Alluxio.readExternal" -> requireNonNull(attributes.get(CACHE_FILE_READ_SIZE)); + case "Alluxio.writeCache" -> requireNonNull(attributes.get(CACHE_FILE_WRITE_SIZE)); + default -> throw new IllegalArgumentException("Unexpected operation name: " + operationName); + }; + + if (!path.contains("/.trino")) { + Matcher matcher = DATA_FILE_PATTERN.matcher(path); + if (matcher.matches()) { + return new CacheOperation(operationName, matcher.group("partition"), position, length); + } + } + else { + return new CacheOperation(operationName, fileName, position, length); + } + throw new IllegalArgumentException("File not recognized: " + path); + } + } + + private static boolean isTrinoSchemaOrPermissions(String path) + { + return path.endsWith(".trinoSchema") || path.contains(".trinoPermissions"); + } +} diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveSplitSource.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveSplitSource.java index acd4791ee713..89a60c4cc08b 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveSplitSource.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveSplitSource.java @@ -18,6 +18,7 @@ import com.google.common.util.concurrent.SettableFuture; import io.airlift.stats.CounterStat; import io.airlift.units.DataSize; +import io.trino.filesystem.cache.DefaultCachingHostAddressProvider; import io.trino.spi.connector.ConnectorSplit; import io.trino.spi.connector.ConnectorSplitSource; import org.junit.jupiter.api.Test; @@ -57,6 +58,7 @@ public void testOutstandingSplitCount() new TestingHiveSplitLoader(), Executors.newFixedThreadPool(5), new CounterStat(), + new DefaultCachingHostAddressProvider(), false); // add 10 splits @@ -92,6 +94,7 @@ public void testDynamicPartitionPruning() new TestingHiveSplitLoader(), Executors.newFixedThreadPool(5), new CounterStat(), + new DefaultCachingHostAddressProvider(), false); // add two splits, one of the splits is dynamically pruned @@ -119,6 +122,7 @@ public void testEvenlySizedSplitRemainder() new TestingHiveSplitLoader(), Executors.newSingleThreadExecutor(), new CounterStat(), + new DefaultCachingHostAddressProvider(), false); // One byte larger than the initial split max size @@ -147,6 +151,7 @@ public void testFail() new TestingHiveSplitLoader(), Executors.newFixedThreadPool(5), new CounterStat(), + new DefaultCachingHostAddressProvider(), false); // add some splits @@ -198,6 +203,7 @@ public void testReaderWaitsForSplits() new TestingHiveSplitLoader(), Executors.newFixedThreadPool(5), new CounterStat(), + new DefaultCachingHostAddressProvider(), false); SettableFuture splits = SettableFuture.create(); @@ -253,6 +259,7 @@ public void testOutstandingSplitSize() new TestingHiveSplitLoader(), Executors.newFixedThreadPool(5), new CounterStat(), + new DefaultCachingHostAddressProvider(), false); int testSplitSizeInBytes = new TestSplit(0).getEstimatedSizeInBytes(); diff --git a/testing/trino-product-tests-groups/src/main/java/io/trino/tests/product/TestGroups.java b/testing/trino-product-tests-groups/src/main/java/io/trino/tests/product/TestGroups.java index dfdbc6bd95a9..a7f5e7e0f2df 100644 --- a/testing/trino-product-tests-groups/src/main/java/io/trino/tests/product/TestGroups.java +++ b/testing/trino-product-tests-groups/src/main/java/io/trino/tests/product/TestGroups.java @@ -54,7 +54,7 @@ public final class TestGroups public static final String HIVE_COMPRESSION = "hive_compression"; public static final String HIVE_TRANSACTIONAL = "hive_transactional"; public static final String HIVE_VIEW_COMPATIBILITY = "hive_view_compatibility"; - public static final String HIVE_CACHING = "hive_caching"; + public static final String HIVE_ALLUXIO_CACHING = "hive_alluxio_caching"; public static final String HIVE_ICEBERG_REDIRECTIONS = "hive_iceberg_redirections"; public static final String HIVE_HUDI_REDIRECTIONS = "hive_hudi_redirections"; public static final String HIVE_KERBEROS = "hive_kerberos"; diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeHiveCaching.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeHiveCaching.java index a1dfdd133cd9..bee1facad5a1 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeHiveCaching.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeHiveCaching.java @@ -17,25 +17,14 @@ import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import io.trino.tests.product.launcher.docker.DockerFiles; -import io.trino.tests.product.launcher.env.Debug; import io.trino.tests.product.launcher.env.Environment; -import io.trino.tests.product.launcher.env.EnvironmentConfig; import io.trino.tests.product.launcher.env.EnvironmentProvider; -import io.trino.tests.product.launcher.env.ServerPackage; import io.trino.tests.product.launcher.env.common.Hadoop; -import io.trino.tests.product.launcher.env.common.Standard; +import io.trino.tests.product.launcher.env.common.StandardMultinode; import io.trino.tests.product.launcher.env.common.TestsEnvironment; -import io.trino.tests.product.launcher.env.jdk.JdkProvider; -import java.io.File; - -import static io.trino.tests.product.launcher.env.EnvironmentContainers.COORDINATOR; -import static io.trino.tests.product.launcher.env.EnvironmentContainers.worker; import static io.trino.tests.product.launcher.env.common.Hadoop.CONTAINER_TRINO_HIVE_PROPERTIES; -import static io.trino.tests.product.launcher.env.common.Standard.CONTAINER_TRINO_CONFIG_PROPERTIES; import static io.trino.tests.product.launcher.env.common.Standard.CONTAINER_TRINO_ETC; -import static io.trino.tests.product.launcher.env.common.Standard.CONTAINER_TRINO_JVM_CONFIG; -import static io.trino.tests.product.launcher.env.common.Standard.createTrinoContainer; import static java.util.Objects.requireNonNull; import static org.testcontainers.utility.MountableFile.forHostPath; @@ -48,52 +37,22 @@ public final class EnvMultinodeHiveCaching private final DockerFiles dockerFiles; private final DockerFiles.ResourceProvider configDir; - private final String imagesVersion; - private final JdkProvider jdkProvider; - private final File serverPackage; - private final boolean debug; - @Inject public EnvMultinodeHiveCaching( DockerFiles dockerFiles, - Standard standard, - Hadoop hadoop, - EnvironmentConfig environmentConfig, - @ServerPackage File serverPackage, - JdkProvider jdkProvider, - @Debug boolean debug) + StandardMultinode standardMultinode, + Hadoop hadoop) { - super(ImmutableList.of(standard, hadoop)); + super(ImmutableList.of(standardMultinode, hadoop)); this.dockerFiles = requireNonNull(dockerFiles, "dockerFiles is null"); this.configDir = dockerFiles.getDockerFilesHostDirectory("conf/environment"); - this.imagesVersion = environmentConfig.getImagesVersion(); - this.jdkProvider = requireNonNull(jdkProvider, "jdkProvider is null"); - this.serverPackage = requireNonNull(serverPackage, "serverPackage is null"); - this.debug = debug; } @Override public void extendEnvironment(Environment.Builder builder) { - builder.configureContainer(COORDINATOR, container -> container - .withCopyFileToContainer(forHostPath(configDir.getPath("multinode/multinode-master-jvm.config")), CONTAINER_TRINO_JVM_CONFIG) - .withCopyFileToContainer(forHostPath(dockerFiles.getDockerFilesHostPath("common/standard-multinode/multinode-master-config.properties")), CONTAINER_TRINO_CONFIG_PROPERTIES) - .withTmpFs(ImmutableMap.of("/tmp/cache", "rw"))); + builder.configureContainers(container -> container.withTmpFs(ImmutableMap.of("/tmp/cache", "rw"))); builder.addConnector("hive", forHostPath(dockerFiles.getDockerFilesHostPath("common/hadoop/hive.properties")), CONTAINER_TRINO_HIVE_NON_CACHED_PROPERTIES); - builder.addConnector("hive", forHostPath(configDir.getPath("multinode-cached/hive-coordinator.properties")), CONTAINER_TRINO_HIVE_PROPERTIES); - - createTrinoWorker(builder, 0); - createTrinoWorker(builder, 1); - } - - @SuppressWarnings("resource") - private void createTrinoWorker(Environment.Builder builder, int workerNumber) - { - builder.addContainer(createTrinoContainer(dockerFiles, serverPackage, jdkProvider, debug, "ghcr.io/trinodb/testing/centos7-oj17:" + imagesVersion, worker(workerNumber)) - .withCopyFileToContainer(forHostPath(configDir.getPath("multinode/multinode-worker-jvm.config")), CONTAINER_TRINO_JVM_CONFIG) - .withCopyFileToContainer(forHostPath(dockerFiles.getDockerFilesHostPath("common/standard-multinode/multinode-worker-config.properties")), CONTAINER_TRINO_CONFIG_PROPERTIES) - .withCopyFileToContainer(forHostPath(dockerFiles.getDockerFilesHostPath("common/hadoop/hive.properties")), CONTAINER_TRINO_HIVE_NON_CACHED_PROPERTIES) - .withCopyFileToContainer(forHostPath(configDir.getPath("multinode-cached/hive-worker.properties")), CONTAINER_TRINO_HIVE_PROPERTIES) - .withTmpFs(ImmutableMap.of("/tmp/cache", "rw"))); + builder.addConnector("hive", forHostPath(configDir.getPath("multinode-hive-cached/hive.properties")), CONTAINER_TRINO_HIVE_PROPERTIES); } } diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/Suite5.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/Suite5.java index 79f5365cfbe8..8bcbd2daa236 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/Suite5.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/Suite5.java @@ -27,7 +27,7 @@ import static io.trino.tests.product.TestGroups.AUTHORIZATION; import static io.trino.tests.product.TestGroups.CONFIGURED_FEATURES; import static io.trino.tests.product.TestGroups.HDFS_IMPERSONATION; -import static io.trino.tests.product.TestGroups.HIVE_CACHING; +import static io.trino.tests.product.TestGroups.HIVE_ALLUXIO_CACHING; import static io.trino.tests.product.TestGroups.HIVE_KERBEROS; import static io.trino.tests.product.TestGroups.ICEBERG; import static io.trino.tests.product.TestGroups.STORAGE_FORMATS; @@ -50,7 +50,7 @@ public List getTestRuns(EnvironmentConfig config) .withGroups(CONFIGURED_FEATURES, STORAGE_FORMATS, HDFS_IMPERSONATION, AUTHORIZATION) .build(), testOnEnvironment(EnvMultinodeHiveCaching.class) - .withGroups(CONFIGURED_FEATURES, HIVE_CACHING, STORAGE_FORMATS) + .withGroups(CONFIGURED_FEATURES, HIVE_ALLUXIO_CACHING) .withExcludedGroups(ICEBERG) .build()); } diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-cached/hive-worker.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-cached/hive-worker.properties deleted file mode 100644 index 4bee1ef360a7..000000000000 --- a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-cached/hive-worker.properties +++ /dev/null @@ -1,8 +0,0 @@ -connector.name=hive -hive.config.resources=/docker/presto-product-tests/conf/presto/etc/hive-default-fs-site.xml -hive.metastore.uri=thrift://hadoop-master:9083 -hive.allow-drop-table=true -hive.cache.enabled=true -hive.cache.location=/tmp/cache -hive.parquet.time-zone=UTC -hive.rcfile.time-zone=UTC diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-cached/hive-coordinator.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-hive-cached/hive.properties similarity index 71% rename from testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-cached/hive-coordinator.properties rename to testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-hive-cached/hive.properties index 8a5d028c9e34..df9d231296ee 100644 --- a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-cached/hive-coordinator.properties +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-hive-cached/hive.properties @@ -2,6 +2,8 @@ connector.name=hive hive.config.resources=/docker/presto-product-tests/conf/presto/etc/hive-default-fs-site.xml hive.metastore.uri=thrift://hadoop-master:9083 hive.allow-drop-table=true -hive.cache.enabled=true +fs.cache=ALLUXIO +fs.cache.directories=/tmp/cache/hive +fs.cache.max-disk-usage-percentages=90 hive.parquet.time-zone=UTC hive.rcfile.time-zone=UTC diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlluxioCaching.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlluxioCaching.java index a098fe8c334d..56dda5bfb502 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlluxioCaching.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlluxioCaching.java @@ -15,14 +15,14 @@ import io.airlift.units.Duration; import io.trino.tempto.ProductTest; -import io.trino.tests.product.deltalake.util.CachingTestUtils.CacheStats; +import io.trino.tests.product.utils.CachingTestUtils.CacheStats; import org.testng.annotations.Test; import static io.airlift.testing.Assertions.assertGreaterThan; import static io.airlift.testing.Assertions.assertGreaterThanOrEqual; import static io.trino.tests.product.TestGroups.DELTA_LAKE_ALLUXIO_CACHING; import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; -import static io.trino.tests.product.deltalake.util.CachingTestUtils.getCacheStats; +import static io.trino.tests.product.utils.CachingTestUtils.getCacheStats; import static io.trino.tests.product.utils.QueryAssertions.assertEventually; import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static java.util.concurrent.TimeUnit.SECONDS; diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveAlluxioCaching.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveAlluxioCaching.java new file mode 100644 index 000000000000..51f31fe0cb03 --- /dev/null +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveAlluxioCaching.java @@ -0,0 +1,91 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.tests.product.hive; + +import io.airlift.units.Duration; +import io.trino.tempto.ProductTest; +import io.trino.tests.product.utils.CachingTestUtils.CacheStats; +import org.testng.annotations.Test; + +import static io.airlift.testing.Assertions.assertGreaterThan; +import static io.airlift.testing.Assertions.assertGreaterThanOrEqual; +import static io.trino.tests.product.TestGroups.HIVE_ALLUXIO_CACHING; +import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; +import static io.trino.tests.product.utils.CachingTestUtils.getCacheStats; +import static io.trino.tests.product.utils.QueryAssertions.assertEventually; +import static io.trino.tests.product.utils.QueryExecutors.onTrino; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.testng.Assert.assertEquals; + +public class TestHiveAlluxioCaching + extends ProductTest +{ + @Test(groups = {HIVE_ALLUXIO_CACHING, PROFILE_SPECIFIC_TESTS}) + public void testReadFromCache() + { + testReadFromTable("table1"); + testReadFromTable("table2"); + } + + private void testReadFromTable(String tableNameSuffix) + { + String cachedTableName = "hive.default.test_cache_read" + tableNameSuffix; + String nonCachedTableName = "hivenoncached.default.test_cache_read" + tableNameSuffix; + + createTestTable(nonCachedTableName); + + CacheStats beforeCacheStats = getCacheStats("hive"); + long tableSize = (Long) onTrino().executeQuery("SELECT SUM(size) as size FROM (SELECT \"$path\", \"$file_size\" AS size FROM " + nonCachedTableName + " GROUP BY 1, 2)").getOnlyValue(); + + assertThat(onTrino().executeQuery("SELECT * FROM " + cachedTableName)) + .hasRowsCount(150000); + + assertEventually( + new Duration(20, SECONDS), + () -> { + // first query via caching catalog should fetch remote data + CacheStats afterQueryCacheStats = getCacheStats("hive"); + assertGreaterThanOrEqual(afterQueryCacheStats.cacheSpaceUsed(), beforeCacheStats.cacheSpaceUsed() + tableSize); + assertGreaterThan(afterQueryCacheStats.externalReads(), beforeCacheStats.externalReads()); + assertGreaterThanOrEqual(afterQueryCacheStats.cacheReads(), beforeCacheStats.cacheReads()); + }); + + assertEventually( + new Duration(10, SECONDS), + () -> { + CacheStats beforeQueryCacheStats = getCacheStats("hive"); + + assertThat(onTrino().executeQuery("SELECT * FROM " + cachedTableName)).hasAnyRows(); + + // query via caching catalog should read exclusively from cache + CacheStats afterQueryCacheStats = getCacheStats("hive"); + assertGreaterThan(afterQueryCacheStats.cacheReads(), beforeQueryCacheStats.cacheReads()); + assertEquals(afterQueryCacheStats.externalReads(), beforeQueryCacheStats.externalReads()); + assertEquals(afterQueryCacheStats.cacheSpaceUsed(), beforeQueryCacheStats.cacheSpaceUsed()); + }); + + onTrino().executeQuery("DROP TABLE " + nonCachedTableName); + } + + /** + * Creates a table which should contain multiple small ORC files + */ + private void createTestTable(String tableName) + { + onTrino().executeQuery("DROP TABLE IF EXISTS " + tableName); + onTrino().executeQuery("SET SESSION hive.target_max_file_size = '4MB'"); + onTrino().executeQuery("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.sf1.customer"); + } +} diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveCaching.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveCaching.java deleted file mode 100644 index 5f473544e059..000000000000 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveCaching.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.tests.product.hive; - -import io.airlift.units.Duration; -import io.trino.tempto.ProductTest; -import io.trino.tempto.assertions.QueryAssert.Row; -import io.trino.tests.product.hive.util.CachingTestUtils.CacheStats; -import org.testng.annotations.Test; - -import java.util.Collections; -import java.util.Random; - -import static io.airlift.testing.Assertions.assertGreaterThan; -import static io.airlift.testing.Assertions.assertGreaterThanOrEqual; -import static io.trino.tempto.assertions.QueryAssert.Row.row; -import static io.trino.tests.product.TestGroups.HIVE_CACHING; -import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; -import static io.trino.tests.product.hive.util.CachingTestUtils.getCacheStats; -import static io.trino.tests.product.utils.QueryAssertions.assertEventually; -import static io.trino.tests.product.utils.QueryExecutors.onTrino; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.assertj.core.api.Assertions.assertThat; -import static org.testng.Assert.assertEquals; - -public class TestHiveCaching - extends ProductTest -{ - private static final int NUMBER_OF_FILES = 5; - - @Test(groups = {HIVE_CACHING, PROFILE_SPECIFIC_TESTS}) - public void testReadFromCache() - { - testReadFromTable("table1"); - testReadFromTable("table2"); - } - - private void testReadFromTable(String tableNameSuffix) - { - String cachedTableName = "hive.default.test_cache_read" + tableNameSuffix; - String nonCachedTableName = "hivenoncached.default.test_cache_read" + tableNameSuffix; - - Row[] tableData = createTestTable(nonCachedTableName); - - CacheStats beforeCacheStats = getCacheStats(); - long initialRemoteReads = beforeCacheStats.getRemoteReads(); - long initialCachedReads = beforeCacheStats.getCachedReads(); - long initialNonLocalReads = beforeCacheStats.getNonLocalReads(); - long initialAsyncDownloadedMb = beforeCacheStats.getAsyncDownloadedMb(); - - assertThat(onTrino().executeQuery("SELECT * FROM " + cachedTableName)) - .containsExactlyInOrder(tableData); - - assertEventually( - new Duration(20, SECONDS), - () -> { - // first query via caching catalog should fetch remote data - CacheStats afterQueryCacheStats = getCacheStats(); - assertGreaterThanOrEqual(afterQueryCacheStats.getAsyncDownloadedMb(), initialAsyncDownloadedMb + 5); - assertGreaterThan(afterQueryCacheStats.getRemoteReads(), initialRemoteReads); - assertEquals(afterQueryCacheStats.getCachedReads(), initialCachedReads); - assertEquals(afterQueryCacheStats.getNonLocalReads(), initialNonLocalReads); - }); - - assertEventually( - new Duration(10, SECONDS), - () -> { - CacheStats beforeQueryCacheStats = getCacheStats(); - long beforeQueryCachedReads = beforeQueryCacheStats.getCachedReads(); - long beforeQueryRemoteReads = beforeQueryCacheStats.getRemoteReads(); - long beforeQueryNonLocalReads = beforeQueryCacheStats.getNonLocalReads(); - - assertThat(onTrino().executeQuery("SELECT * FROM " + cachedTableName)) - .containsExactlyInOrder(tableData); - - // query via caching catalog should read exclusively from cache - CacheStats afterQueryCacheStats = getCacheStats(); - assertGreaterThan(afterQueryCacheStats.getCachedReads(), beforeQueryCachedReads); - assertEquals(afterQueryCacheStats.getRemoteReads(), beforeQueryRemoteReads); - // all reads should be local as Trino would schedule splits on nodes with cached data - assertEquals(afterQueryCacheStats.getNonLocalReads(), beforeQueryNonLocalReads); - }); - - onTrino().executeQuery("DROP TABLE " + nonCachedTableName); - } - - /** - * Creates table with 5 text files that are larger than 1MB - */ - private Row[] createTestTable(String tableName) - { - StringBuilder randomDataBuilder = new StringBuilder(); - Random random = new Random(); - for (int i = 0; i < 500_000; ++i) { - randomDataBuilder.append(random.nextInt(10)); - } - String randomData = randomDataBuilder.toString(); - - onTrino().executeQuery("DROP TABLE IF EXISTS " + tableName); - onTrino().executeQuery("CREATE TABLE " + tableName + " (col varchar) WITH (format='TEXTFILE')"); - - for (int i = 0; i < NUMBER_OF_FILES; ++i) { - // use `format` to overcome SQL query length limit - onTrino().executeQuery("INSERT INTO " + tableName + " SELECT format('%1$s%1$s%1$s%1$s%1$s', '" + randomData + "')"); - } - - Row row = row(randomData.repeat(5)); - return Collections.nCopies(NUMBER_OF_FILES, row).toArray(new Row[0]); - } -} diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/util/CachingTestUtils.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/util/CachingTestUtils.java deleted file mode 100644 index 3fa52e870534..000000000000 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/util/CachingTestUtils.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.tests.product.hive.util; - -import io.trino.tempto.query.QueryResult; - -import static com.google.common.collect.Iterables.getOnlyElement; -import static io.trino.tests.product.utils.QueryExecutors.onTrino; - -public final class CachingTestUtils -{ - private CachingTestUtils() {} - - public static CacheStats getCacheStats() - { - QueryResult queryResult = onTrino().executeQuery("SELECT " + - " sum(Cached_rrc_requests) as cachedreads, " + - " sum(Remote_rrc_requests + Direct_rrc_requests) as remotereads, " + - " sum(Nonlocal_rrc_requests) as nonlocalreads " + - "FROM jmx.current.\"rubix:catalog=hive,type=detailed,name=stats\";"); - - long cachedReads = (Long) getOnlyElement(queryResult.rows()) - .get(queryResult.tryFindColumnIndex("cachedreads").get() - 1); - - long remoteReads = (Long) getOnlyElement(queryResult.rows()) - .get(queryResult.tryFindColumnIndex("remotereads").get() - 1); - - long nonLocalReads = (Long) getOnlyElement(queryResult.rows()) - .get(queryResult.tryFindColumnIndex("nonlocalreads").get() - 1); - - long asyncDownloadedMb = (Long) getOnlyElement(onTrino().executeQuery("SELECT sum(Count) FROM " + - "jmx.current.\"metrics:name=rubix.bookkeeper.count.async_downloaded_mb\"").rows()) - .get(0); - - return new CacheStats(cachedReads, remoteReads, nonLocalReads, asyncDownloadedMb); - } - - public static class CacheStats - { - private final long cachedReads; - private final long remoteReads; - private final long nonLocalReads; - private final long asyncDownloadedMb; - - public CacheStats(long cachedReads, long remoteReads, long nonLocalReads, long asyncDownloadedMb) - { - this.cachedReads = cachedReads; - this.remoteReads = remoteReads; - this.nonLocalReads = nonLocalReads; - this.asyncDownloadedMb = asyncDownloadedMb; - } - - public long getCachedReads() - { - return cachedReads; - } - - public long getRemoteReads() - { - return remoteReads; - } - - public long getNonLocalReads() - { - return nonLocalReads; - } - - public long getAsyncDownloadedMb() - { - return asyncDownloadedMb; - } - } -} diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/CachingTestUtils.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/utils/CachingTestUtils.java similarity index 97% rename from testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/CachingTestUtils.java rename to testing/trino-product-tests/src/main/java/io/trino/tests/product/utils/CachingTestUtils.java index 39e73ec24e83..a1ce1d3a8c95 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/CachingTestUtils.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/utils/CachingTestUtils.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.tests.product.deltalake.util; +package io.trino.tests.product.utils; import io.trino.tempto.query.QueryResult;