Resource Management of Dataset Cache Layer (#417)
* Add task status QUEUED and submit cache tasks and query tasks to the task pool

* Add a max cache table size ratio to limit cache tasks

* Add query task timeout

* Add query memory limit

* Fix style check

* Add test

* Rename duckdb.max-query-timeout to duckdb.max-cache-query-timeout

* Implement cache retry when a cache task exceeds the memory limit

* Remove query memory limit

* Modify Cache Query Task exception

* Remove unused import

* Add more information to the error message
AShiou authored Jan 4, 2024
1 parent aaf01fd commit 9d33e7b
Showing 12 changed files with 540 additions and 50 deletions.
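
Note: only four of the twelve changed files are expanded below; in particular, the new DuckdbTaskManager is referenced throughout CacheManager but its source is not shown here. The following is a hedged sketch, not the actual implementation, of the behavior the commit message describes (a bounded task pool plus a cache-query timeout), inferred only from its call sites in CacheManager (addCacheTask, addCacheQueryTask, checkCacheMemoryLimit) and the new DuckDBConfig properties:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hedged sketch only: DuckdbTaskManager's source is not expanded in this view.
// Its shape is inferred from the call sites in CacheManager below and from the
// new DuckDBConfig properties; the real implementation may differ.
public class DuckdbTaskManagerSketch
{
    private final ExecutorService pool;
    private final long queryTimeoutSeconds;

    public DuckdbTaskManagerSketch(int maxConcurrentTasks, long maxCacheQueryTimeout)
    {
        // duckdb.max-concurrent-tasks bounds how many cache/query tasks run at once
        this.pool = Executors.newFixedThreadPool(maxConcurrentTasks);
        // duckdb.max-cache-query-timeout caps how long a cache query may run
        this.queryTimeoutSeconds = maxCacheQueryTimeout;
    }

    // Cache builds become futures on the shared pool; callers chain exceptionally() on them.
    public CompletableFuture<Void> addCacheTask(Runnable task)
    {
        return CompletableFuture.runAsync(task, pool);
    }

    // Cache queries block the caller but are cancelled after the configured timeout.
    public <T> T addCacheQueryTask(Callable<T> task)
    {
        Future<T> future = pool.submit(task);
        try {
            return future.get(queryTimeoutSeconds, TimeUnit.SECONDS);
        }
        catch (TimeoutException e) {
            future.cancel(true);
            throw new RuntimeException("Cache query timed out after " + queryTimeoutSeconds + " seconds", e);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```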
5 changes: 5 additions & 0 deletions accio-base/pom.xml
@@ -96,6 +96,11 @@
<artifactId>javax.inject</artifactId>
</dependency>

<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
accio-base/src/main/java/io/accio/base/client/duckdb/DuckDBConfig.java
@@ -17,11 +17,18 @@
import io.airlift.configuration.Config;
import io.airlift.units.DataSize;

import javax.validation.constraints.Max;
import javax.validation.constraints.Min;

public class DuckDBConfig
{
private DataSize memoryLimit = DataSize.of(Runtime.getRuntime().maxMemory() / 2, DataSize.Unit.BYTE);
private String homeDirectory;
private String tempDirectory = "/tmp/duck";
private int maxConcurrentTasks = 10;
private double maxCacheTableSizeRatio = 0.5;
private long maxCacheQueryTimeout = 20;
private long cacheTaskRetryDelay = 60;

public DataSize getMemoryLimit()
{
@@ -55,4 +62,50 @@ public void setTempDirectory(String tempDirectory)
{
this.tempDirectory = tempDirectory;
}

public int getMaxConcurrentTasks()
{
return maxConcurrentTasks;
}

@Config("duckdb.max-concurrent-tasks")
public void setMaxConcurrentTasks(int maxConcurrentTasks)
{
this.maxConcurrentTasks = maxConcurrentTasks;
}

@Min(0)
@Max(1)
public double getMaxCacheTableSizeRatio()
{
return maxCacheTableSizeRatio;
}

@Config("duckdb.max-cache-table-size-ratio")
public void setMaxCacheTableSizeRatio(double maxCacheTableSizeRatio)
{
this.maxCacheTableSizeRatio = maxCacheTableSizeRatio;
}

public long getMaxCacheQueryTimeout()
{
return maxCacheQueryTimeout;
}

@Config("duckdb.max-cache-query-timeout")
public void setMaxCacheQueryTimeout(long maxCacheQueryTimeout)
{
this.maxCacheQueryTimeout = maxCacheQueryTimeout;
}

public long getCacheTaskRetryDelay()
{
return cacheTaskRetryDelay;
}

@Config("duckdb.cache-task-retry-delay")
public void setCacheTaskRetryDelay(long cacheTaskRetryDelay)
{
this.cacheTaskRetryDelay = cacheTaskRetryDelay;
}
}
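
The four new knobs above are standard Airlift @Config properties. A minimal usage sketch with the defaults from this diff; in a real deployment they would be set as duckdb.* entries in the config file rather than via setters, and the two time values appear to be in seconds, judging by how cacheTaskRetryDelay is scheduled in CacheManager below:

```java
import io.accio.base.client.duckdb.DuckDBConfig;

// Minimal sketch: exercising the new DuckDBConfig knobs with the defaults
// shown in the diff above.
public class DuckDBConfigExample
{
    public static void main(String[] args)
    {
        DuckDBConfig config = new DuckDBConfig();
        config.setMaxConcurrentTasks(10);        // duckdb.max-concurrent-tasks
        config.setMaxCacheTableSizeRatio(0.5);   // duckdb.max-cache-table-size-ratio, validated to [0, 1]
        config.setMaxCacheQueryTimeout(20);      // duckdb.max-cache-query-timeout, seconds (assumed)
        config.setCacheTaskRetryDelay(60);       // duckdb.cache-task-retry-delay, seconds
        System.out.println("memory limit: " + config.getMemoryLimit());
    }
}
```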
51 changes: 51 additions & 0 deletions accio-base/src/main/java/io/accio/base/client/duckdb/DuckdbUtil.java
@@ -0,0 +1,51 @@
package io.accio.base.client.duckdb;

import io.accio.base.AccioException;
import io.airlift.units.DataSize;

import java.util.HashMap;
import java.util.Map;

import static io.accio.base.metadata.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static java.util.Locale.ENGLISH;

public final class DuckdbUtil
{
private static final Map<String, DataSize.Unit> UNIT_MAP = new HashMap<>();

// https://github.com/duckdb/duckdb/blob/4c7cb20474baa3a8ca1d5d8ceb22beae0e4c0e4c/src/common/string_util.cpp#L178
static {
UNIT_MAP.put("BYTE", DataSize.Unit.BYTE);
UNIT_MAP.put("BYTES", DataSize.Unit.BYTE);
UNIT_MAP.put("KB", DataSize.Unit.KILOBYTE);
UNIT_MAP.put("KIB", DataSize.Unit.KILOBYTE);
UNIT_MAP.put("MB", DataSize.Unit.MEGABYTE);
UNIT_MAP.put("MIB", DataSize.Unit.MEGABYTE);
UNIT_MAP.put("GB", DataSize.Unit.GIGABYTE);
UNIT_MAP.put("GIB", DataSize.Unit.GIGABYTE);
UNIT_MAP.put("TB", DataSize.Unit.TERABYTE);
UNIT_MAP.put("TIB", DataSize.Unit.TERABYTE);
UNIT_MAP.put("PB", DataSize.Unit.PETABYTE);
UNIT_MAP.put("PIB", DataSize.Unit.PETABYTE);
}

private DuckdbUtil() {}

public static DataSize convertDuckDBUnits(String valueWithUnit)
{
try {
String valueWithUnitUpperCase = valueWithUnit.toUpperCase(ENGLISH);
for (Map.Entry<String, DataSize.Unit> entry : UNIT_MAP.entrySet()) {
String unit = entry.getKey();
if (valueWithUnitUpperCase.endsWith(unit)) {
double value = Double.parseDouble(valueWithUnitUpperCase.substring(0, valueWithUnitUpperCase.length() - unit.length()).trim());
return DataSize.of((long) value, entry.getValue());
}
}
}
catch (Exception e) {
throw new AccioException(GENERIC_INTERNAL_ERROR, String.format("Failed to parse duckdb value %s", valueWithUnit), e);
}
throw new AccioException(GENERIC_INTERNAL_ERROR, String.format("Failed to parse duckdb value %s", valueWithUnit));
}
}
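
A short usage sketch for the helper above. One subtlety visible in the implementation: the parsed double is cast to long, so fractional sizes are truncated ("1.5GiB" becomes 1 gigabyte):

```java
import io.accio.base.client.duckdb.DuckdbUtil;
import io.airlift.units.DataSize;

// Usage sketch for convertDuckDBUnits, based on the implementation above.
public class DuckdbUtilExample
{
    public static void main(String[] args)
    {
        DataSize a = DuckdbUtil.convertDuckDBUnits("512KB");   // 512 kilobytes
        DataSize b = DuckdbUtil.convertDuckDBUnits("1.5GiB");  // truncated to 1 gigabyte: the double is cast to long
        System.out.println(a + " / " + b);
        try {
            DuckdbUtil.convertDuckDBUnits("oops");
        }
        catch (Exception e) {
            System.out.println(e.getMessage()); // Failed to parse duckdb value oops
        }
    }
}
```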
124 changes: 83 additions & 41 deletions accio-cache/src/main/java/io/accio/cache/CacheManager.java
@@ -22,6 +22,7 @@
import io.accio.base.ConnectorRecordIterator;
import io.accio.base.Parameter;
import io.accio.base.SessionContext;
import io.accio.base.client.duckdb.DuckDBConfig;
import io.accio.base.client.duckdb.DuckdbClient;
import io.accio.base.dto.CacheInfo;
import io.accio.base.sql.SqlConverter;
@@ -35,7 +36,6 @@
import javax.annotation.PreDestroy;
import javax.inject.Inject;

import java.sql.SQLException;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
@@ -50,11 +50,13 @@

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.accio.base.CatalogSchemaTableName.catalogSchemaTableName;
import static io.accio.base.metadata.StandardErrorCode.EXCEEDED_GLOBAL_MEMORY_LIMIT;
import static io.accio.base.metadata.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static io.accio.base.metadata.StandardErrorCode.GENERIC_USER_ERROR;
import static io.accio.cache.EventLogger.Level.ERROR;
import static io.accio.cache.EventLogger.Level.INFO;
import static io.accio.cache.TaskInfo.TaskStatus.DONE;
import static io.accio.cache.TaskInfo.TaskStatus.QUEUED;
import static io.accio.cache.TaskInfo.TaskStatus.RUNNING;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.concurrent.Threads.threadsNamed;
@@ -63,10 +65,10 @@
import static java.lang.System.currentTimeMillis;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;

public class CacheManager
@@ -82,11 +84,14 @@ public class CacheManager
private final ConcurrentLinkedQueue<PathInfo> tempFileLocations = new ConcurrentLinkedQueue<>();
private final CachedTableMapping cachedTableMapping;
private final ConcurrentMap<CatalogSchemaTableName, ScheduledFuture<?>> cacheScheduledFutures = new ConcurrentHashMap<>();
private final ConcurrentMap<CatalogSchemaTableName, ScheduledFuture<?>> retryScheduledFutures = new ConcurrentHashMap<>();
private final ScheduledThreadPoolExecutor refreshExecutor = new ScheduledThreadPoolExecutor(5, daemonThreadsNamed("cache-refresh-%s"));

private final ScheduledThreadPoolExecutor retryExecutor = new ScheduledThreadPoolExecutor(5, daemonThreadsNamed("cache-retry-%s"));
private final ExecutorService executorService = newCachedThreadPool(threadsNamed("cache-manager-%s"));
private final ConcurrentHashMap<CatalogSchemaTableName, Task> tasks = new ConcurrentHashMap<>();
private final EventLogger eventLogger;
private final DuckdbTaskManager duckdbTaskManager;
private final DuckDBConfig duckdbConfig;

@Inject
public CacheManager(
@@ -96,58 +101,83 @@ public CacheManager(
DuckdbClient duckdbClient,
CacheStorageConfig cacheStorageConfig,
CachedTableMapping cachedTableMapping,
EventLogger eventLogger)
EventLogger eventLogger,
DuckdbTaskManager duckdbTaskManager,
DuckDBConfig duckDBConfig)
{
this.sqlParser = new SqlParser();
this.sqlConverter = requireNonNull(sqlConverter, "sqlConverter is null");
this.cacheService = requireNonNull(cacheService, "cacheService is null");
this.extraRewriter = requireNonNull(extraRewriter, "extraRewriter is null");
this.duckdbClient = requireNonNull(duckdbClient, "duckdbClient is null");
this.duckdbTaskManager = requireNonNull(duckdbTaskManager, "duckdbTaskManager is null");
this.cacheStorageConfig = requireNonNull(cacheStorageConfig, "cacheStorageConfig is null");
this.cachedTableMapping = requireNonNull(cachedTableMapping, "cachedTableMapping is null");
this.eventLogger = requireNonNull(eventLogger, "eventLogger is null");
this.duckdbConfig = requireNonNull(duckDBConfig, "duckDBConfig is null");
refreshExecutor.setRemoveOnCancelPolicy(true);
}

private synchronized CompletableFuture<Void> refreshCache(AccioMDL mdl, CacheInfo cacheInfo)
private synchronized CompletableFuture<Void> refreshCache(AccioMDL mdl, CacheInfo cacheInfo, TaskInfo taskInfo)
{
CatalogSchemaTableName catalogSchemaTableName = catalogSchemaTableName(mdl.getCatalog(), mdl.getSchema(), cacheInfo.getName());
Optional<Task> taskOptional = Optional.ofNullable(tasks.get(catalogSchemaTableName));
if (taskOptional.isPresent() && taskOptional.get().getTaskInfo().inProgress()) {
throw new AccioException(GENERIC_USER_ERROR, format("cache is already running; catalogName: %s, schemaName: %s, tableName: %s", mdl.getCatalog(), mdl.getSchema(), cacheInfo.getName()));
}
removeCacheIfExist(catalogSchemaTableName);
return doCache(mdl, cacheInfo);
return doCache(mdl, cacheInfo, taskInfo);
}

private CompletableFuture<Void> handleCache(AccioMDL mdl, CacheInfo cacheInfo)
private CompletableFuture<Void> handleCache(AccioMDL mdl, CacheInfo cacheInfo, TaskInfo taskInfo)
{
return refreshCache(mdl, cacheInfo)
CatalogSchemaTableName catalogSchemaTableName = new CatalogSchemaTableName(mdl.getCatalog(), mdl.getSchema(), cacheInfo.getName());
String duckdbTableName = format("%s_%s", cacheInfo.getName(), randomUUID().toString().replace("-", ""));
long createTime = currentTimeMillis();
return refreshCache(mdl, cacheInfo, taskInfo)
.thenRun(() -> {
if (cacheInfo.getRefreshTime().toMillis() > 0) {
cacheScheduledFutures.put(
new CatalogSchemaTableName(mdl.getCatalog(), mdl.getSchema(), cacheInfo.getName()),
catalogSchemaTableName,
refreshExecutor.scheduleWithFixedDelay(
() -> createTask(mdl, cacheInfo).join(),
cacheInfo.getRefreshTime().toMillis(),
cacheInfo.getRefreshTime().toMillis(),
MILLISECONDS));
}
})
.exceptionally(e -> {
String errMsg = format("Failed to do cache for cacheInfo %s; caused by %s", cacheInfo.getName(), e.getMessage());
// If the cache fails because DuckDB doesn't have sufficient memory, we'll attempt to retry it later.
if (e.getCause() instanceof AccioException && EXCEEDED_GLOBAL_MEMORY_LIMIT.toErrorCode().equals(((AccioException) e.getCause()).getErrorCode())) {
retryScheduledFutures.put(
catalogSchemaTableName,
retryExecutor.schedule(
() -> createTask(mdl, cacheInfo).join(),
duckdbConfig.getCacheTaskRetryDelay(),
SECONDS));
errMsg += "; will retry after " + duckdbConfig.getCacheTaskRetryDelay() + " seconds";
}
duckdbClient.dropTableQuietly(duckdbTableName);
LOG.error(e, errMsg);
cachedTableMapping.putCachedTableMapping(catalogSchemaTableName, new CacheInfoPair(cacheInfo, Optional.empty(), Optional.of(errMsg), createTime));
return null;
});
}

public ConnectorRecordIterator query(String sql, List<Parameter> parameters)
throws SQLException
{
return DuckdbRecordIterator.of(duckdbClient, sql, parameters.stream().collect(toImmutableList()));
return duckdbTaskManager.addCacheQueryTask(() -> DuckdbRecordIterator.of(duckdbClient, sql, parameters.stream().collect(toImmutableList())));
}

private CompletableFuture<Void> doCache(AccioMDL mdl, CacheInfo cacheInfo)
private CompletableFuture<Void> doCache(AccioMDL mdl, CacheInfo cacheInfo, TaskInfo taskInfo)
{
CatalogSchemaTableName catalogSchemaTableName = new CatalogSchemaTableName(mdl.getCatalog(), mdl.getSchema(), cacheInfo.getName());
String duckdbTableName = format("%s_%s", cacheInfo.getName(), randomUUID().toString().replace("-", ""));
long createTime = currentTimeMillis();
return runAsync(() -> {
return duckdbTaskManager.addCacheTask(() -> {
duckdbTaskManager.checkCacheMemoryLimit();
taskInfo.setTaskStatus(RUNNING);
SessionContext sessionContext = SessionContext.builder()
.setCatalog(mdl.getCatalog())
.setSchema(mdl.getSchema())
Expand All @@ -161,12 +191,6 @@ private CompletableFuture<Void> doCache(AccioMDL mdl, CacheInfo cacheInfo)

createCache(mdl, cacheInfo, sessionContext, rewrittenStatement, duckdbTableName);
cachedTableMapping.putCachedTableMapping(catalogSchemaTableName, new CacheInfoPair(cacheInfo, duckdbTableName, createTime));
}).exceptionally(e -> {
duckdbClient.dropTableQuietly(duckdbTableName);
String errMsg = format("Failed to do cache for cacheInfo %s; caused by %s", cacheInfo.getName(), e.getMessage());
LOG.error(e, errMsg);
cachedTableMapping.putCachedTableMapping(catalogSchemaTableName, new CacheInfoPair(cacheInfo, Optional.empty(), Optional.of(errMsg), createTime));
return null;
});
}

@@ -211,6 +235,14 @@ public void removeCacheIfExist(String catalogName, String schemaName)
cacheScheduledFutures.remove(catalogSchemaTableName);
});

retryScheduledFutures.keySet().stream()
.filter(catalogSchemaTableName -> catalogSchemaTableName.getCatalogName().equals(catalogName)
&& catalogSchemaTableName.getSchemaTableName().getSchemaName().equals(schemaName))
.forEach(catalogSchemaTableName -> {
retryScheduledFutures.get(catalogSchemaTableName).cancel(true);
retryScheduledFutures.remove(catalogSchemaTableName);
});

cachedTableMapping.entrySet().stream()
.filter(entry -> entry.getKey().getCatalogName().equals(catalogName)
&& entry.getKey().getSchemaTableName().getSchemaName().equals(schemaName))
@@ -232,6 +264,11 @@ public void removeCacheIfExist(CatalogSchemaTableName catalogSchemaTableName)
cacheScheduledFutures.remove(catalogSchemaTableName);
}

if (retryScheduledFutures.containsKey(catalogSchemaTableName)) {
retryScheduledFutures.get(catalogSchemaTableName).cancel(true);
retryScheduledFutures.remove(catalogSchemaTableName);
}

Optional.ofNullable(cachedTableMapping.get(catalogSchemaTableName)).ifPresent(cacheInfoPair -> {
cacheInfoPair.getTableName().ifPresent(duckdbClient::dropTableQuietly);
cachedTableMapping.remove(catalogSchemaTableName);
@@ -248,6 +285,12 @@ public boolean cacheScheduledFutureExists(CatalogSchemaTableName catalogSchemaTableName)
return cacheScheduledFutures.containsKey(catalogSchemaTableName);
}

@VisibleForTesting
public boolean retryScheduledFutureExists(CatalogSchemaTableName catalogSchemaTableName)
{
return retryScheduledFutures.containsKey(catalogSchemaTableName);
}

@PreDestroy
public void stop()
{
@@ -298,9 +341,9 @@ public CompletableFuture<TaskInfo> createTask(AccioMDL mdl, CacheInfo cacheInfo)
{
return supplyAsync(() -> {
CatalogSchemaTableName catalogSchemaTableName = new CatalogSchemaTableName(mdl.getCatalog(), mdl.getSchema(), cacheInfo.getName());
TaskInfo taskInfo = new TaskInfo(mdl.getCatalog(), mdl.getSchema(), cacheInfo.getName(), RUNNING, Instant.now());
TaskInfo taskInfo = new TaskInfo(mdl.getCatalog(), mdl.getSchema(), cacheInfo.getName(), QUEUED, Instant.now());
// To fix a flaky test, we pass a value to tasks instead of a reference.
Task task = new Task(TaskInfo.copyFrom(taskInfo), handleCache(mdl, cacheInfo));
Task task = new Task(TaskInfo.copyFrom(taskInfo), mdl, cacheInfo);
tasks.put(catalogSchemaTableName, task);
return taskInfo;
});
@@ -355,28 +398,27 @@ private class Task
private final TaskInfo taskInfo;
private final CompletableFuture<?> completableFuture;

public Task(TaskInfo taskInfo, CompletableFuture<?> completableFuture)
public Task(TaskInfo taskInfo, AccioMDL mdl, CacheInfo cacheInfo)
{
this.taskInfo = taskInfo;
this.completableFuture =
completableFuture
.thenRun(() -> {
CacheInfoPair cacheInfoPair = cachedTableMapping.getCacheInfoPair(
taskInfo.getCatalogName(),
taskInfo.getSchemaName(), taskInfo.getTableName());
taskInfo.setCachedTable(new CachedTable(
cacheInfoPair.getCacheInfo().getName(),
cacheInfoPair.getErrorMessage(),
cacheInfoPair.getCacheInfo().getRefreshTime(),
Instant.ofEpochMilli(cacheInfoPair.getCreateTime())));
taskInfo.setTaskStatus(DONE);
if (cacheInfoPair.getErrorMessage().isPresent()) {
eventLogger.logEvent(ERROR, "CREATE_TASK", taskInfo);
}
else {
eventLogger.logEvent(INFO, "CREATE_TASK", taskInfo);
}
});
this.completableFuture = handleCache(mdl, cacheInfo, taskInfo)
.thenRun(() -> {
CacheInfoPair cacheInfoPair = cachedTableMapping.getCacheInfoPair(
taskInfo.getCatalogName(),
taskInfo.getSchemaName(), taskInfo.getTableName());
taskInfo.setCachedTable(new CachedTable(
cacheInfoPair.getCacheInfo().getName(),
cacheInfoPair.getErrorMessage(),
cacheInfoPair.getCacheInfo().getRefreshTime(),
Instant.ofEpochMilli(cacheInfoPair.getCreateTime())));
taskInfo.setTaskStatus(DONE);
if (cacheInfoPair.getErrorMessage().isPresent()) {
eventLogger.logEvent(ERROR, "CREATE_TASK", taskInfo);
}
else {
eventLogger.logEvent(INFO, "CREATE_TASK", taskInfo);
}
});
}

public TaskInfo getTaskInfo()
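
For readers who skip the unexpanded files, here is the commit's retry pattern distilled into a standalone sketch. buildCacheTable and isMemoryLimitError are hypothetical stand-ins; the real code keys retries off the EXCEEDED_GLOBAL_MEMORY_LIMIT error code and tracks one scheduled future per cached table:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative distillation of the exceptionally -> retryExecutor.schedule
// pattern in CacheManager above. buildCacheTable and isMemoryLimitError are
// hypothetical stand-ins, not APIs from this repository.
public class CacheRetrySketch
{
    private static final ScheduledExecutorService RETRY = Executors.newScheduledThreadPool(1);
    private static final long RETRY_DELAY_SECONDS = 60; // duckdb.cache-task-retry-delay

    public static CompletableFuture<Void> runWithRetry(Runnable buildCacheTable)
    {
        return CompletableFuture.runAsync(buildCacheTable)
                .exceptionally(e -> {
                    if (isMemoryLimitError(e)) {
                        // mirrors retryScheduledFutures: defer one re-run instead of failing permanently
                        RETRY.schedule(() -> runWithRetry(buildCacheTable).join(), RETRY_DELAY_SECONDS, TimeUnit.SECONDS);
                    }
                    return null;
                });
    }

    private static boolean isMemoryLimitError(Throwable e)
    {
        // stand-in for CacheManager's EXCEEDED_GLOBAL_MEMORY_LIMIT error-code check
        return e.getCause() != null && String.valueOf(e.getCause().getMessage()).contains("memory limit");
    }
}
```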
