Skip to content

Commit

Permalink
Add DefaultCachingHostAddressProvider
Browse files Browse the repository at this point in the history
Allows connector to use it's own host addresses for split
scheduling when caching is not enabled
  • Loading branch information
raunaqmorarka committed Feb 13, 2024
1 parent 5aa2e12 commit e722f8b
Show file tree
Hide file tree
Showing 8 changed files with 16 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import io.trino.filesystem.cache.CacheKeyProvider;
import io.trino.filesystem.cache.CachingHostAddressProvider;
import io.trino.filesystem.cache.DefaultCacheKeyProvider;
import io.trino.filesystem.cache.NoneCachingHostAddressProvider;
import io.trino.filesystem.cache.DefaultCachingHostAddressProvider;
import io.trino.filesystem.cache.TrinoFileSystemCache;
import io.trino.filesystem.gcs.GcsFileSystemFactory;
import io.trino.filesystem.gcs.GcsFileSystemModule;
Expand Down Expand Up @@ -101,7 +101,7 @@ protected void setup(Binder binder)
factories.addBinding("gs").to(GcsFileSystemFactory.class);
}

newOptionalBinder(binder, CachingHostAddressProvider.class).setDefault().to(NoneCachingHostAddressProvider.class).in(Scopes.SINGLETON);
newOptionalBinder(binder, CachingHostAddressProvider.class).setDefault().to(DefaultCachingHostAddressProvider.class).in(Scopes.SINGLETON);
newOptionalBinder(binder, CacheKeyProvider.class).setDefault().to(DefaultCacheKeyProvider.class).in(Scopes.SINGLETON);
newMapBinder(binder, FileSystemConfig.CacheType.class, TrinoFileSystemCache.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ public interface CachingHostAddressProvider
/**
* Returns a lists of hosts which are preferred to cache the split with the given path.
*/
List<HostAddress> getHosts(String splitPath);
List<HostAddress> getHosts(String splitPath, List<HostAddress> defaultAddresses);
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public ConsistentHashingHostAddressProvider(NodeManager nodeManager, ConsistentH
}

@Override
public List<HostAddress> getHosts(String splitPath)
public List<HostAddress> getHosts(String splitPath, List<HostAddress> defaultAddresses)
{
return consistentHashRing.locate(splitPath, replicationFactor)
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,16 @@
*/
package io.trino.filesystem.cache;

import com.google.common.collect.ImmutableList;
import io.trino.spi.HostAddress;

import java.util.List;

public class NoneCachingHostAddressProvider
public class DefaultCachingHostAddressProvider
implements CachingHostAddressProvider
{
@Override
public List<HostAddress> getHosts(String splitPath)
public List<HostAddress> getHosts(String splitPath, List<HostAddress> defaultAddresses)
{
return ImmutableList.of();
return defaultAddresses;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.filesystem.cache;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import io.trino.client.NodeVersion;
import io.trino.metadata.InternalNode;
Expand Down Expand Up @@ -85,7 +86,7 @@ private static void assertFairDistribution(CachingHostAddressProvider cachingHos
int n = 1000;
Map<String, Integer> counts = new HashMap<>();
for (int i = 0; i < n; i++) {
counts.merge(cachingHostAddressProvider.getHosts(String.valueOf(i)).get(0).getHostText(), 1, Math::addExact);
counts.merge(cachingHostAddressProvider.getHosts(String.valueOf(i), ImmutableList.of()).get(0).getHostText(), 1, Math::addExact);
}
assertThat(nodeNames.stream().map(m -> m.getHostAndPort().getHostText()).collect(Collectors.toSet())).isEqualTo(counts.keySet());
counts.values().forEach(c -> assertThat(abs(c - n / nodeNames.size()) < 0.1 * n).isTrue());
Expand All @@ -105,7 +106,7 @@ private Map<String, Set<Integer>> getDistribution(ConsistentHashingHostAddressPr
int n = 1000;
Map<String, Set<Integer>> distribution = new HashMap<>();
for (int i = 0; i < n; i++) {
String host = provider.getHosts(String.valueOf(i)).get(0).getHostText();
String host = provider.getHosts(String.valueOf(i), ImmutableList.of()).get(0).getHostText();
distribution.computeIfAbsent(host, (k) -> new HashSet<>()).add(i);
}
return distribution;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ private List<DeltaLakeSplit> splitsForFile(
addFileEntry.getStats().flatMap(DeltaLakeFileStatistics::getNumRecords),
addFileEntry.getModificationTime(),
addFileEntry.getDeletionVector(),
cachingHostAddressProvider.getHosts(splitPath),
cachingHostAddressProvider.getHosts(splitPath, ImmutableList.of()),
SplitWeight.standard(),
statisticsPredicate,
partitionKeys));
Expand All @@ -359,7 +359,7 @@ private List<DeltaLakeSplit> splitsForFile(
Optional.empty(),
addFileEntry.getModificationTime(),
addFileEntry.getDeletionVector(),
cachingHostAddressProvider.getHosts(splitPath),
cachingHostAddressProvider.getHosts(splitPath, ImmutableList.of()),
SplitWeight.fromProportion(clamp((double) splitSize / maxSplitSize, minimumAssignedSplitWeight, 1.0)),
statisticsPredicate,
partitionKeys));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import io.opentelemetry.api.trace.Tracer;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.cache.CachingHostAddressProvider;
import io.trino.filesystem.cache.NoneCachingHostAddressProvider;
import io.trino.filesystem.cache.DefaultCachingHostAddressProvider;
import io.trino.filesystem.hdfs.HdfsFileSystemFactory;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.hdfs.TrinoHdfsFileSystemStats;
Expand Down Expand Up @@ -207,7 +207,7 @@ public void setUp()
binder.bind(HdfsEnvironment.class).toInstance(HDFS_ENVIRONMENT);
binder.bind(TrinoHdfsFileSystemStats.class).toInstance(HDFS_FILE_SYSTEM_STATS);
binder.bind(TrinoFileSystemFactory.class).to(HdfsFileSystemFactory.class).in(Scopes.SINGLETON);
binder.bind(CachingHostAddressProvider.class).to(NoneCachingHostAddressProvider.class).in(Scopes.SINGLETON);
binder.bind(CachingHostAddressProvider.class).to(DefaultCachingHostAddressProvider.class).in(Scopes.SINGLETON);
},
new AbstractModule()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import io.airlift.json.JsonCodecFactory;
import io.airlift.units.DataSize;
import io.trino.filesystem.Location;
import io.trino.filesystem.cache.NoneCachingHostAddressProvider;
import io.trino.filesystem.cache.DefaultCachingHostAddressProvider;
import io.trino.filesystem.hdfs.HdfsFileSystemFactory;
import io.trino.filesystem.memory.MemoryFileSystemFactory;
import io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess;
Expand Down Expand Up @@ -241,7 +241,7 @@ public Stream<AddFileEntry> getActiveFiles(
deltaLakeConfig,
HDFS_FILE_SYSTEM_FACTORY,
deltaLakeTransactionManager,
new NoneCachingHostAddressProvider());
new DefaultCachingHostAddressProvider());
}

private AddFileEntry addFileEntryOfSize(long fileSize)
Expand Down

0 comments on commit e722f8b

Please sign in to comment.