Support embedded alluxio cache in hive
raunaqmorarka committed Feb 13, 2024
1 parent e722f8b commit 9c476f7
Showing 16 changed files with 325 additions and 269 deletions.
1 change: 1 addition & 0 deletions docs/src/main/sphinx/connector/filesystem-cache.md
@@ -12,6 +12,7 @@ source [Alluxio](https://github.com/Alluxio/alluxio) libraries with catalogs
using the following connectors:

* [](/connector/delta-lake)
* [](/connector/hive)

(fs-cache-distributed)=
## Distributed caching
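The documentation change above only adds the Hive connector to the list of connectors that support the embedded Alluxio cache. For orientation, a Hive catalog configuration enabling the cache might look like the sketch below; the cache property names mirror the ones used in the new test added by this commit, while the metastore URI, cache directory, and size are placeholder values rather than part of the commit.

```properties
connector.name=hive
hive.metastore.uri=thrift://example.net:9083
# Embedded Alluxio file system cache (property names as used in the test in this commit)
fs.cache=ALLUXIO
fs.cache.directories=/mnt/trino-cache
fs.cache.max-sizes=100GB
```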
@@ -24,6 +24,7 @@
import io.airlift.stats.CounterStat;
import io.airlift.units.DataSize;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.cache.CachingHostAddressProvider;
import io.trino.plugin.hive.metastore.Column;
import io.trino.plugin.hive.metastore.Partition;
import io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore;
@@ -114,6 +115,7 @@ public class HiveSplitManager
private final boolean recursiveDfsWalkerEnabled;
private final CounterStat highMemorySplitSourceCounter;
private final TypeManager typeManager;
private final CachingHostAddressProvider cachingHostAddressProvider;
private final int maxPartitionsPerScan;

@Inject
@@ -124,7 +126,8 @@ public HiveSplitManager(
TrinoFileSystemFactory fileSystemFactory,
ExecutorService executorService,
VersionEmbedder versionEmbedder,
TypeManager typeManager)
TypeManager typeManager,
CachingHostAddressProvider cachingHostAddressProvider)
{
this(
transactionManager,
@@ -141,6 +144,7 @@ public HiveSplitManager(
hiveConfig.getMaxSplitsPerSecond(),
hiveConfig.getRecursiveDirWalkerEnabled(),
typeManager,
cachingHostAddressProvider,
hiveConfig.getMaxPartitionsPerScan());
}

@@ -159,6 +163,7 @@ public HiveSplitManager(
@Nullable Integer maxSplitsPerSecond,
boolean recursiveDfsWalkerEnabled,
TypeManager typeManager,
CachingHostAddressProvider cachingHostAddressProvider,
int maxPartitionsPerScan)
{
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
@@ -176,6 +181,7 @@ public HiveSplitManager(
this.maxSplitsPerSecond = firstNonNull(maxSplitsPerSecond, Integer.MAX_VALUE);
this.recursiveDfsWalkerEnabled = recursiveDfsWalkerEnabled;
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.cachingHostAddressProvider = requireNonNull(cachingHostAddressProvider, "cachingHostAddressProvider is null");
this.maxPartitionsPerScan = maxPartitionsPerScan;
}

@@ -275,6 +281,7 @@ public ConnectorSplitSource getSplits(
hiveSplitLoader,
executor,
highMemorySplitSourceCounter,
cachingHostAddressProvider,
hiveTable.isRecordScannedFiles());
hiveSplitLoader.start(splitSource);

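`HiveSplitManager` now receives a `CachingHostAddressProvider` through its `@Inject` constructor. The Guice wiring that satisfies this dependency is not part of the excerpt above; a hypothetical binding sketch, assuming the no-op `DefaultCachingHostAddressProvider` (used by the tests below) serves as the fallback when no cache is configured, could look like this:

```java
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;
import io.trino.filesystem.cache.CachingHostAddressProvider;
import io.trino.filesystem.cache.DefaultCachingHostAddressProvider;

// Hypothetical wiring sketch, not code from this commit: bind the
// pass-through provider unless a caching file system (such as the
// embedded Alluxio cache) installs its own implementation.
public class ExampleCachingHostAddressProviderModule
        implements Module
{
    @Override
    public void configure(Binder binder)
    {
        binder.bind(CachingHostAddressProvider.class)
                .to(DefaultCachingHostAddressProvider.class)
                .in(Scopes.SINGLETON);
    }
}
```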
@@ -19,6 +19,7 @@
import io.airlift.log.Logger;
import io.airlift.stats.CounterStat;
import io.airlift.units.DataSize;
import io.trino.filesystem.cache.CachingHostAddressProvider;
import io.trino.plugin.hive.InternalHiveSplit.InternalHiveBlock;
import io.trino.plugin.hive.util.AsyncQueue;
import io.trino.plugin.hive.util.AsyncQueue.BorrowResult;
@@ -84,6 +85,7 @@ class HiveSplitSource
private final CounterStat highMemorySplitSourceCounter;
private final AtomicBoolean loggedHighMemoryWarning = new AtomicBoolean();
private final HiveSplitWeightProvider splitWeightProvider;
private final CachingHostAddressProvider cachingHostAddressProvider;

private final boolean recordScannedFiles;
private final ImmutableList.Builder<Object> scannedFilePaths = ImmutableList.builder();
@@ -98,6 +100,7 @@ private HiveSplitSource(
HiveSplitLoader splitLoader,
AtomicReference<State> stateReference,
CounterStat highMemorySplitSourceCounter,
CachingHostAddressProvider cachingHostAddressProvider,
boolean recordScannedFiles)
{
requireNonNull(session, "session is null");
@@ -114,6 +117,7 @@ private HiveSplitSource(
this.maxInitialSplitSize = getMaxInitialSplitSize(session);
this.remainingInitialSplits = new AtomicInteger(maxInitialSplits);
this.splitWeightProvider = isSizeBasedSplitWeightsEnabled(session) ? new SizeBasedSplitWeightProvider(getMinimumAssignedSplitWeight(session), maxSplitSize) : HiveSplitWeightProvider.uniformStandardWeightProvider();
this.cachingHostAddressProvider = requireNonNull(cachingHostAddressProvider, "cachingHostAddressProvider is null");
this.recordScannedFiles = recordScannedFiles;
}

@@ -128,6 +132,7 @@ public static HiveSplitSource allAtOnce(
HiveSplitLoader splitLoader,
Executor executor,
CounterStat highMemorySplitSourceCounter,
CachingHostAddressProvider cachingHostAddressProvider,
boolean recordScannedFiles)
{
AtomicReference<State> stateReference = new AtomicReference<>(State.initial());
@@ -168,6 +173,7 @@ public boolean isFinished()
splitLoader,
stateReference,
highMemorySplitSourceCounter,
cachingHostAddressProvider,
recordScannedFiles);
}

@@ -305,7 +311,7 @@ else if (maxSplitBytes * 2 >= remainingBlockBytes) {
internalSplit.getFileModifiedTime(),
internalSplit.getSchema(),
internalSplit.getPartitionKeys(),
block.getAddresses(),
cachingHostAddressProvider.getHosts(internalSplit.getPath(), block.getAddresses()),
internalSplit.getReadBucketNumber(),
internalSplit.getTableBucketNumber(),
internalSplit.isForceLocalScheduling(),
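The substantive change in `HiveSplitSource` is that split host addresses now come from `cachingHostAddressProvider.getHosts(path, blockAddresses)` rather than directly from the file block. The intent is to let a caching file system steer splits for a given file toward consistent workers so their local cache segments are reused, while the default provider leaves the addresses untouched. A minimal pass-through sketch, assuming the `getHosts(String, List<HostAddress>)` signature implied by the call site:

```java
import io.trino.filesystem.cache.CachingHostAddressProvider;
import io.trino.spi.HostAddress;

import java.util.List;

// Sketch of a no-op provider: no cache affinity, so splits keep the
// addresses reported for the underlying file blocks.
// Assumed signature, matching the call site
// cachingHostAddressProvider.getHosts(internalSplit.getPath(), block.getAddresses()).
public class PassThroughCachingHostAddressProvider
        implements CachingHostAddressProvider
{
    @Override
    public List<HostAddress> getHosts(String path, List<HostAddress> defaultAddresses)
    {
        return defaultAddresses;
    }
}
```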
@@ -29,6 +29,7 @@
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.TrinoInputFile;
import io.trino.filesystem.TrinoOutputFile;
import io.trino.filesystem.cache.DefaultCachingHostAddressProvider;
import io.trino.filesystem.memory.MemoryFileSystemFactory;
import io.trino.plugin.hive.HiveColumnHandle.ColumnType;
import io.trino.plugin.hive.fs.CachingDirectoryLister;
@@ -1230,6 +1231,7 @@ private HiveSplitSource hiveSplitSource(HiveSplitLoader hiveSplitLoader)
hiveSplitLoader,
executor,
new CounterStat(),
new DefaultCachingHostAddressProvider(),
false);
}

@@ -0,0 +1,194 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive;

import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultiset;
import com.google.common.collect.Multiset;
import com.google.common.io.Closer;
import io.opentelemetry.api.common.Attributes;
import io.trino.filesystem.manager.FileSystemConfig;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.DistributedQueryRunner;
import org.intellij.lang.annotations.Language;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_LOCATION;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_POSITION;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_SIZE;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_WRITE_POSITION;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_WRITE_SIZE;
import static io.trino.plugin.hive.HiveQueryRunner.HIVE_CATALOG;
import static io.trino.testing.MultisetAssertions.assertMultisetsEqual;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toCollection;

@Execution(ExecutionMode.SAME_THREAD)
public class TestHiveAlluxioCacheFileOperations
extends AbstractTestQueryFramework
{
private final Closer closer = Closer.create();

@Override
protected DistributedQueryRunner createQueryRunner()
throws Exception
{
Path cacheDirectory = Files.createTempDirectory("cache");
closer.register(() -> deleteRecursively(cacheDirectory, ALLOW_INSECURE));
Path metastoreDirectory = Files.createTempDirectory(HIVE_CATALOG);
closer.register(() -> deleteRecursively(metastoreDirectory, ALLOW_INSECURE));

Map<String, String> hiveProperties = ImmutableMap.<String, String>builder()
.put("fs.cache", FileSystemConfig.CacheType.ALLUXIO.name())
.put("fs.cache.directories", cacheDirectory.toAbsolutePath().toString())
.put("fs.cache.max-sizes", "100MB")
.put("hive.metastore", "file")
.put("hive.metastore.catalog.dir", metastoreDirectory.toUri().toString())
.buildOrThrow();

return HiveQueryRunner.builder()
.setCoordinatorProperties(ImmutableMap.of("node-scheduler.include-coordinator", "false"))
.setHiveProperties(hiveProperties)
.setNodeCount(2)
.build();
}

@AfterAll
public void destroy()
throws Exception
{
closer.close();
}

@Test
public void testCacheFileOperations()
{
assertUpdate("DROP TABLE IF EXISTS test_cache_file_operations");
assertUpdate("CREATE TABLE test_cache_file_operations(data varchar, key varchar) WITH (partitioned_by=ARRAY['key'], format='parquet')");
assertUpdate("INSERT INTO test_cache_file_operations VALUES ('1-abc', 'p1')", 1);
assertUpdate("INSERT INTO test_cache_file_operations VALUES ('2-xyz', 'p2')", 1);
assertFileSystemAccesses(
"SELECT * FROM test_cache_file_operations",
ImmutableMultiset.<CacheOperation>builder()
.add(new CacheOperation("Alluxio.readCached", "key=p1/", 0, 279))
.add(new CacheOperation("Alluxio.readCached", "key=p2/", 0, 279))
.add(new CacheOperation("Alluxio.readExternal", "key=p1/", 0, 279))
.add(new CacheOperation("Alluxio.readExternal", "key=p2/", 0, 279))
.add(new CacheOperation("Alluxio.writeCache", "key=p1/", 0, 279))
.add(new CacheOperation("Alluxio.writeCache", "key=p2/", 0, 279))
.build());
assertFileSystemAccesses(
"SELECT * FROM test_cache_file_operations",
ImmutableMultiset.<CacheOperation>builder()
.add(new CacheOperation("Alluxio.readCached", "key=p1/", 0, 279))
.add(new CacheOperation("Alluxio.readCached", "key=p2/", 0, 279))
.build());
assertUpdate("INSERT INTO test_cache_file_operations VALUES ('3-xyz', 'p3')", 1);
assertUpdate("INSERT INTO test_cache_file_operations VALUES ('4-xyz', 'p4')", 1);
assertUpdate("INSERT INTO test_cache_file_operations VALUES ('5-xyz', 'p5')", 1);
assertFileSystemAccesses(
"SELECT * FROM test_cache_file_operations",
ImmutableMultiset.<CacheOperation>builder()
.add(new CacheOperation("Alluxio.readCached", "key=p1/", 0, 279))
.add(new CacheOperation("Alluxio.readCached", "key=p2/", 0, 279))
.add(new CacheOperation("Alluxio.readCached", "key=p3/", 0, 279))
.add(new CacheOperation("Alluxio.readCached", "key=p4/", 0, 279))
.add(new CacheOperation("Alluxio.readCached", "key=p5/", 0, 279))
.add(new CacheOperation("Alluxio.readExternal", "key=p3/", 0, 279))
.add(new CacheOperation("Alluxio.readExternal", "key=p4/", 0, 279))
.add(new CacheOperation("Alluxio.readExternal", "key=p5/", 0, 279))
.add(new CacheOperation("Alluxio.writeCache", "key=p3/", 0, 279))
.add(new CacheOperation("Alluxio.writeCache", "key=p4/", 0, 279))
.add(new CacheOperation("Alluxio.writeCache", "key=p5/", 0, 279))
.build());
assertFileSystemAccesses(
"SELECT * FROM test_cache_file_operations",
ImmutableMultiset.<CacheOperation>builder()
.add(new CacheOperation("Alluxio.readCached", "key=p1/", 0, 279))
.add(new CacheOperation("Alluxio.readCached", "key=p2/", 0, 279))
.add(new CacheOperation("Alluxio.readCached", "key=p3/", 0, 279))
.add(new CacheOperation("Alluxio.readCached", "key=p4/", 0, 279))
.add(new CacheOperation("Alluxio.readCached", "key=p5/", 0, 279))
.build());
}

private void assertFileSystemAccesses(@Language("SQL") String query, Multiset<CacheOperation> expectedCacheAccesses)
{
DistributedQueryRunner queryRunner = getDistributedQueryRunner();
queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query);
assertMultisetsEqual(getCacheOperations(), expectedCacheAccesses);
}

private Multiset<CacheOperation> getCacheOperations()
{
return getQueryRunner().getSpans().stream()
.filter(span -> span.getName().startsWith("Alluxio."))
.filter(span -> !isTrinoSchemaOrPermissions(requireNonNull(span.getAttributes().get(CACHE_FILE_LOCATION))))
.map(span -> CacheOperation.create(span.getName(), span.getAttributes()))
.collect(toCollection(HashMultiset::create));
}

private static final Pattern DATA_FILE_PATTERN = Pattern.compile(".*?/(?<partition>key=[^/]*/)?(?<queryId>\\d{8}_\\d{6}_\\d{5}_\\w{5})_(?<uuid>[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");

private record CacheOperation(String operationName, String fileId, long position, long length)
{
public static CacheOperation create(String operationName, Attributes attributes)
{
String path = requireNonNull(attributes.get(CACHE_FILE_LOCATION));
String fileName = path.replaceFirst(".*/", "");

long position = switch (operationName) {
case "Alluxio.readCached" -> requireNonNull(attributes.get(CACHE_FILE_READ_POSITION));
case "Alluxio.readExternal" -> requireNonNull(attributes.get(CACHE_FILE_READ_POSITION));
case "Alluxio.writeCache" -> requireNonNull(attributes.get(CACHE_FILE_WRITE_POSITION));
default -> throw new IllegalArgumentException("Unexpected operation name: " + operationName);
};

long length = switch (operationName) {
case "Alluxio.readCached" -> requireNonNull(attributes.get(CACHE_FILE_READ_SIZE));
case "Alluxio.readExternal" -> requireNonNull(attributes.get(CACHE_FILE_READ_SIZE));
case "Alluxio.writeCache" -> requireNonNull(attributes.get(CACHE_FILE_WRITE_SIZE));
default -> throw new IllegalArgumentException("Unexpected operation name: " + operationName);
};

if (!path.contains("/.trino")) {
Matcher matcher = DATA_FILE_PATTERN.matcher(path);
if (matcher.matches()) {
return new CacheOperation(operationName, matcher.group("partition"), position, length);
}
}
else {
return new CacheOperation(operationName, fileName, position, length);
}
throw new IllegalArgumentException("File not recognized: " + path);
}
}

private static boolean isTrinoSchemaOrPermissions(String path)
{
return path.endsWith(".trinoSchema") || path.contains(".trinoPermissions");
}
}
@@ -18,6 +18,7 @@
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.stats.CounterStat;
import io.airlift.units.DataSize;
import io.trino.filesystem.cache.DefaultCachingHostAddressProvider;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorSplitSource;
import org.junit.jupiter.api.Test;
@@ -57,6 +58,7 @@ public void testOutstandingSplitCount()
new TestingHiveSplitLoader(),
Executors.newFixedThreadPool(5),
new CounterStat(),
new DefaultCachingHostAddressProvider(),
false);

// add 10 splits
@@ -92,6 +94,7 @@ public void testDynamicPartitionPruning()
new TestingHiveSplitLoader(),
Executors.newFixedThreadPool(5),
new CounterStat(),
new DefaultCachingHostAddressProvider(),
false);

// add two splits, one of the splits is dynamically pruned
@@ -119,6 +122,7 @@ public void testEvenlySizedSplitRemainder()
new TestingHiveSplitLoader(),
Executors.newSingleThreadExecutor(),
new CounterStat(),
new DefaultCachingHostAddressProvider(),
false);

// One byte larger than the initial split max size
@@ -147,6 +151,7 @@ public void testFail()
new TestingHiveSplitLoader(),
Executors.newFixedThreadPool(5),
new CounterStat(),
new DefaultCachingHostAddressProvider(),
false);

// add some splits
@@ -198,6 +203,7 @@ public void testReaderWaitsForSplits()
new TestingHiveSplitLoader(),
Executors.newFixedThreadPool(5),
new CounterStat(),
new DefaultCachingHostAddressProvider(),
false);

SettableFuture<ConnectorSplit> splits = SettableFuture.create();
@@ -253,6 +259,7 @@ public void testOutstandingSplitSize()
new TestingHiveSplitLoader(),
Executors.newFixedThreadPool(5),
new CounterStat(),
new DefaultCachingHostAddressProvider(),
false);
int testSplitSizeInBytes = new TestSplit(0).getEstimatedSizeInBytes();

@@ -54,7 +54,7 @@ public final class TestGroups
public static final String HIVE_COMPRESSION = "hive_compression";
public static final String HIVE_TRANSACTIONAL = "hive_transactional";
public static final String HIVE_VIEW_COMPATIBILITY = "hive_view_compatibility";
public static final String HIVE_CACHING = "hive_caching";
public static final String HIVE_ALLUXIO_CACHING = "hive_alluxio_caching";
public static final String HIVE_ICEBERG_REDIRECTIONS = "hive_iceberg_redirections";
public static final String HIVE_HUDI_REDIRECTIONS = "hive_hudi_redirections";
public static final String HIVE_KERBEROS = "hive_kerberos";