Skip to content

Commit

Permalink
Fix BigQuery case-insensitive-cache
Browse files Browse the repository at this point in the history
  • Loading branch information
pajaks committed Sep 18, 2024
1 parent 14c5169 commit cacc1ed
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.UncheckedExecutionException;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import io.trino.cache.EvictableCacheBuilder;
Expand All @@ -49,7 +50,6 @@
import io.trino.spi.connector.TableNotFoundException;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -107,8 +107,8 @@ public class BigQueryClient
private final ViewMaterializationCache materializationCache;
private final boolean caseInsensitiveNameMatching;
private final LoadingCache<String, List<DatasetId>> remoteDatasetIdCache;
private final Cache<String, Map<String, RemoteDatabaseObject>> remoteDatasetCaseInsentiveCache;
private final Cache<DatasetId, Map<TableId, RemoteDatabaseObject>> remoteTableCaseInsentiveCache;
private final Cache<String, RemoteDatabaseObject> remoteDatasetCaseInsentiveCache;
private final Cache<TableId, RemoteDatabaseObject> remoteTableCaseInsentiveCache;
private final Optional<String> configProjectId;

public BigQueryClient(
Expand Down Expand Up @@ -162,23 +162,18 @@ public Optional<RemoteDatabaseObject> toRemoteDataset(String projectId, String d
return Optional.of(RemoteDatabaseObject.of(datasetName));
}

try {
Map<String, RemoteDatabaseObject> datasetMap = remoteDatasetCaseInsentiveCache.get(projectId, () -> {
Map<String, RemoteDatabaseObject> mapping = new HashMap<>();
for (DatasetId dataset : datasetIds.get()) {
mapping.merge(
dataset.getDataset().toLowerCase(ENGLISH),
RemoteDatabaseObject.of(dataset.getDataset()),
(currentValue, collision) -> currentValue.registerCollision(collision.getOnlyRemoteName()));
}
return mapping;
});

return Optional.ofNullable(datasetMap.get(datasetName));
Optional<RemoteDatabaseObject> remoteDataSetFromCache = Optional.ofNullable(remoteDatasetCaseInsentiveCache.getIfPresent(datasetName));
if (remoteDataSetFromCache.isPresent()) {
return remoteDataSetFromCache;
}
catch (ExecutionException e) {
return Optional.empty();

// Get all information from BigQuery and update cache from all fetched information
for (DatasetId dataset : datasetIds.get()) {
String newCacheKey = dataset.getDataset().toLowerCase(ENGLISH);
RemoteDatabaseObject newValue = RemoteDatabaseObject.of(dataset.getDataset());
updateCache(remoteDatasetCaseInsentiveCache, newCacheKey, newValue);
}
return Optional.ofNullable(remoteDatasetCaseInsentiveCache.getIfPresent(datasetName));
}

public Optional<RemoteDatabaseObject> toRemoteTable(ConnectorSession session, String projectId, String remoteDatasetName, String tableName)
Expand All @@ -202,23 +197,39 @@ private Optional<RemoteDatabaseObject> toRemoteTable(String projectId, String re
}

TableId cacheKey = TableId.of(projectId, remoteDatasetName, tableName);
DatasetId datasetId = DatasetId.of(projectId, remoteDatasetName);

Optional<RemoteDatabaseObject> remoteTableFromCache = Optional.ofNullable(remoteTableCaseInsentiveCache.getIfPresent(cacheKey));
if (remoteTableFromCache.isPresent()) {
return remoteTableFromCache;
}

// Get all information from BigQuery and update cache from all fetched information
for (TableId table : tableIds.get()) {
TableId newCacheKey = tableIdToLowerCase(table);
RemoteDatabaseObject newValue = RemoteDatabaseObject.of(table.getTable());
updateCache(remoteTableCaseInsentiveCache, newCacheKey, newValue);
}

return Optional.ofNullable(remoteTableCaseInsentiveCache.getIfPresent(cacheKey));
}

private <T> void updateCache(Cache<T, RemoteDatabaseObject> caseInsensitiveCache, T newCacheKey, RemoteDatabaseObject newValue)
{
try {
Map<TableId, RemoteDatabaseObject> tableMap = remoteTableCaseInsentiveCache.get(datasetId, () -> {
Map<TableId, RemoteDatabaseObject> mapping = new HashMap<>();
for (TableId table : tableIds.get()) {
mapping.merge(
tableIdToLowerCase(table),
RemoteDatabaseObject.of(table.getTable()),
(currentValue, collision) -> currentValue.registerCollision(collision.getOnlyRemoteName()));
}
return mapping;
});

return Optional.ofNullable(tableMap.get(cacheKey));
RemoteDatabaseObject currentCacheValue = caseInsensitiveCache.getIfPresent(newCacheKey);
if (currentCacheValue == null) {
caseInsensitiveCache.get(newCacheKey, () -> newValue);
}
else if (!currentCacheValue.remoteNames.contains(newValue.getOnlyRemoteName())) {
// Cache already has key, check if new value is already registered and update with collision if it's not
RemoteDatabaseObject mergedValue = currentCacheValue.registerCollision(newValue.getOnlyRemoteName());
caseInsensitiveCache.invalidate(newCacheKey);
caseInsensitiveCache.get(newCacheKey, () -> mergedValue);
}
}
catch (ExecutionException e) {
return Optional.empty();
// Ignore errors while building cache
log.warn(e, "Error while loading value for key:" + newCacheKey);
}
}

Expand Down Expand Up @@ -350,7 +361,7 @@ public void createSchema(DatasetInfo datasetInfo)
{
bigQuery.create(datasetInfo);
remoteDatasetIdCache.invalidate(datasetInfo.getDatasetId().getProject());
remoteDatasetCaseInsentiveCache.invalidate(datasetInfo.getDatasetId().getProject());
remoteDatasetCaseInsentiveCache.invalidate(datasetInfo.getDatasetId().getDataset().toLowerCase(ENGLISH));
}

public void dropSchema(DatasetId datasetId, boolean cascade)
Expand All @@ -362,19 +373,19 @@ public void dropSchema(DatasetId datasetId, boolean cascade)
bigQuery.delete(datasetId);
}
remoteDatasetIdCache.invalidate(datasetId.getProject());
remoteDatasetCaseInsentiveCache.invalidate(datasetId.getProject());
remoteDatasetCaseInsentiveCache.invalidate(datasetId.getDataset().toLowerCase(ENGLISH));
}

public void createTable(TableInfo tableInfo)
{
bigQuery.create(tableInfo);
remoteTableCaseInsentiveCache.invalidate(DatasetId.of(tableInfo.getTableId().getProject(), tableInfo.getTableId().getDataset()));
remoteTableCaseInsentiveCache.invalidate(tableIdToLowerCase(tableInfo.getTableId()));
}

public void dropTable(TableId tableId)
{
bigQuery.delete(tableId);
remoteTableCaseInsentiveCache.invalidate(DatasetId.of(tableId.getProject(), tableId.getDataset()));
remoteTableCaseInsentiveCache.invalidate(tableIdToLowerCase(tableId));
}

Job create(JobInfo jobInfo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ protected QueryRunner createQueryRunner()
.setConnectorProperties(ImmutableMap.<String, String>builder()
.put("bigquery.case-insensitive-name-matching", "true")
.put("bigquery.case-insensitive-name-matching.cache-ttl", "1m")
.put("bigquery.service-cache-ttl", "0ms") // Disable service cache to focus on metadata cache
.buildOrThrow())
.build();
}
Expand Down

0 comments on commit cacc1ed

Please sign in to comment.