diff --git a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java index f64dc437c18d..6029e46f528e 100644 --- a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java +++ b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java @@ -29,7 +29,7 @@ import io.trino.filesystem.cache.CacheKeyProvider; import io.trino.filesystem.cache.CachingHostAddressProvider; import io.trino.filesystem.cache.DefaultCacheKeyProvider; -import io.trino.filesystem.cache.NoneCachingHostAddressProvider; +import io.trino.filesystem.cache.DefaultCachingHostAddressProvider; import io.trino.filesystem.cache.TrinoFileSystemCache; import io.trino.filesystem.gcs.GcsFileSystemFactory; import io.trino.filesystem.gcs.GcsFileSystemModule; @@ -101,7 +101,7 @@ protected void setup(Binder binder) factories.addBinding("gs").to(GcsFileSystemFactory.class); } - newOptionalBinder(binder, CachingHostAddressProvider.class).setDefault().to(NoneCachingHostAddressProvider.class).in(Scopes.SINGLETON); + newOptionalBinder(binder, CachingHostAddressProvider.class).setDefault().to(DefaultCachingHostAddressProvider.class).in(Scopes.SINGLETON); newOptionalBinder(binder, CacheKeyProvider.class).setDefault().to(DefaultCacheKeyProvider.class).in(Scopes.SINGLETON); newMapBinder(binder, FileSystemConfig.CacheType.class, TrinoFileSystemCache.class); diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CachingHostAddressProvider.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CachingHostAddressProvider.java index 18f01a2ba8ad..c35c4049999a 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CachingHostAddressProvider.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CachingHostAddressProvider.java @@ -22,5 +22,5 @@ public interface CachingHostAddressProvider /** * Returns a lists of hosts which are preferred to cache the split with the given path. */ - List getHosts(String splitPath); + List getHosts(String splitPath, List defaultAddresses); } diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/ConsistentHashingHostAddressProvider.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/ConsistentHashingHostAddressProvider.java index eeb0a9723db7..9cb237fc112c 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/ConsistentHashingHostAddressProvider.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/ConsistentHashingHostAddressProvider.java @@ -61,7 +61,7 @@ public ConsistentHashingHostAddressProvider(NodeManager nodeManager, ConsistentH } @Override - public List getHosts(String splitPath) + public List getHosts(String splitPath, List defaultAddresses) { return consistentHashRing.locate(splitPath, replicationFactor) .stream() diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/NoneCachingHostAddressProvider.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/DefaultCachingHostAddressProvider.java similarity index 80% rename from lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/NoneCachingHostAddressProvider.java rename to lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/DefaultCachingHostAddressProvider.java index 349fc5b7694b..8ed93e70caea 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/NoneCachingHostAddressProvider.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/DefaultCachingHostAddressProvider.java @@ -13,17 +13,16 @@ */ package io.trino.filesystem.cache; -import com.google.common.collect.ImmutableList; import io.trino.spi.HostAddress; import java.util.List; -public class NoneCachingHostAddressProvider +public class DefaultCachingHostAddressProvider implements CachingHostAddressProvider { @Override - public List getHosts(String splitPath) + public List getHosts(String splitPath, List defaultAddresses) { - return ImmutableList.of(); + return defaultAddresses; } } diff --git a/lib/trino-filesystem/src/test/java/io/trino/filesystem/cache/TestConsistentHashingCacheHostAddressProvider.java b/lib/trino-filesystem/src/test/java/io/trino/filesystem/cache/TestConsistentHashingCacheHostAddressProvider.java index 3c8ef89d6478..041950f7959f 100644 --- a/lib/trino-filesystem/src/test/java/io/trino/filesystem/cache/TestConsistentHashingCacheHostAddressProvider.java +++ b/lib/trino-filesystem/src/test/java/io/trino/filesystem/cache/TestConsistentHashingCacheHostAddressProvider.java @@ -13,6 +13,7 @@ */ package io.trino.filesystem.cache; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; import io.trino.client.NodeVersion; import io.trino.metadata.InternalNode; @@ -85,7 +86,7 @@ private static void assertFairDistribution(CachingHostAddressProvider cachingHos int n = 1000; Map counts = new HashMap<>(); for (int i = 0; i < n; i++) { - counts.merge(cachingHostAddressProvider.getHosts(String.valueOf(i)).get(0).getHostText(), 1, Math::addExact); + counts.merge(cachingHostAddressProvider.getHosts(String.valueOf(i), ImmutableList.of()).get(0).getHostText(), 1, Math::addExact); } assertThat(nodeNames.stream().map(m -> m.getHostAndPort().getHostText()).collect(Collectors.toSet())).isEqualTo(counts.keySet()); counts.values().forEach(c -> assertThat(abs(c - n / nodeNames.size()) < 0.1 * n).isTrue()); @@ -105,7 +106,7 @@ private Map> getDistribution(ConsistentHashingHostAddressPr int n = 1000; Map> distribution = new HashMap<>(); for (int i = 0; i < n; i++) { - String host = provider.getHosts(String.valueOf(i)).get(0).getHostText(); + String host = provider.getHosts(String.valueOf(i), ImmutableList.of()).get(0).getHostText(); distribution.computeIfAbsent(host, (k) -> new HashSet<>()).add(i); } return distribution; diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java index 9350d0b14815..9a00e3707175 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java @@ -333,7 +333,7 @@ private List splitsForFile( addFileEntry.getStats().flatMap(DeltaLakeFileStatistics::getNumRecords), addFileEntry.getModificationTime(), addFileEntry.getDeletionVector(), - cachingHostAddressProvider.getHosts(splitPath), + cachingHostAddressProvider.getHosts(splitPath, ImmutableList.of()), SplitWeight.standard(), statisticsPredicate, partitionKeys)); @@ -359,7 +359,7 @@ private List splitsForFile( Optional.empty(), addFileEntry.getModificationTime(), addFileEntry.getDeletionVector(), - cachingHostAddressProvider.getHosts(splitPath), + cachingHostAddressProvider.getHosts(splitPath, ImmutableList.of()), SplitWeight.fromProportion(clamp((double) splitSize / maxSplitSize, minimumAssignedSplitWeight, 1.0)), statisticsPredicate, partitionKeys)); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java index 231cd4bdf220..fe65a14a693b 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java @@ -25,7 +25,7 @@ import io.opentelemetry.api.trace.Tracer; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.filesystem.cache.CachingHostAddressProvider; -import io.trino.filesystem.cache.NoneCachingHostAddressProvider; +import io.trino.filesystem.cache.DefaultCachingHostAddressProvider; import io.trino.filesystem.hdfs.HdfsFileSystemFactory; import io.trino.hdfs.HdfsEnvironment; import io.trino.hdfs.TrinoHdfsFileSystemStats; @@ -207,7 +207,7 @@ public void setUp() binder.bind(HdfsEnvironment.class).toInstance(HDFS_ENVIRONMENT); binder.bind(TrinoHdfsFileSystemStats.class).toInstance(HDFS_FILE_SYSTEM_STATS); binder.bind(TrinoFileSystemFactory.class).to(HdfsFileSystemFactory.class).in(Scopes.SINGLETON); - binder.bind(CachingHostAddressProvider.class).to(NoneCachingHostAddressProvider.class).in(Scopes.SINGLETON); + binder.bind(CachingHostAddressProvider.class).to(DefaultCachingHostAddressProvider.class).in(Scopes.SINGLETON); }, new AbstractModule() { diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java index 48931352867b..2a718c5db619 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java @@ -20,7 +20,7 @@ import io.airlift.json.JsonCodecFactory; import io.airlift.units.DataSize; import io.trino.filesystem.Location; -import io.trino.filesystem.cache.NoneCachingHostAddressProvider; +import io.trino.filesystem.cache.DefaultCachingHostAddressProvider; import io.trino.filesystem.hdfs.HdfsFileSystemFactory; import io.trino.filesystem.memory.MemoryFileSystemFactory; import io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess; @@ -241,7 +241,7 @@ public Stream getActiveFiles( deltaLakeConfig, HDFS_FILE_SYSTEM_FACTORY, deltaLakeTransactionManager, - new NoneCachingHostAddressProvider()); + new DefaultCachingHostAddressProvider()); } private AddFileEntry addFileEntryOfSize(long fileSize)