From ae6499b0384bf51556a119e3458314536789229c Mon Sep 17 00:00:00 2001 From: Slawomir Pajak Date: Mon, 16 Sep 2024 11:47:57 +0200 Subject: [PATCH] Fix BigQuery case-insensitive-cache --- .../trino/plugin/bigquery/BigQueryClient.java | 92 +++++++++++-------- .../TestBigQueryCaseInsensitiveMapping.java | 1 - 2 files changed, 56 insertions(+), 37 deletions(-) diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java index a5a19c0169e03..8685c8e807b91 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java @@ -39,6 +39,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.UncheckedExecutionException; import io.airlift.log.Logger; import io.airlift.units.Duration; import io.trino.cache.EvictableCacheBuilder; @@ -49,7 +50,6 @@ import io.trino.spi.connector.TableNotFoundException; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -107,8 +107,8 @@ public class BigQueryClient private final ViewMaterializationCache materializationCache; private final boolean caseInsensitiveNameMatching; private final LoadingCache> remoteDatasetIdCache; - private final Cache> remoteDatasetCaseInsentiveCache; - private final Cache> remoteTableCaseInsentiveCache; + private final Cache remoteDatasetCaseInsentiveCache; + private final Cache remoteTableCaseInsentiveCache; private final Optional configProjectId; public BigQueryClient( @@ -162,23 +162,20 @@ public Optional toRemoteDataset(String projectId, String d return Optional.of(RemoteDatabaseObject.of(datasetName)); } - try { - Map datasetMap = remoteDatasetCaseInsentiveCache.get(projectId, 
() -> { - Map mapping = new HashMap<>(); - for (DatasetId dataset : datasetIds.get()) { - mapping.merge( - dataset.getDataset().toLowerCase(ENGLISH), - RemoteDatabaseObject.of(dataset.getDataset()), - (currentValue, collision) -> currentValue.registerCollision(collision.getOnlyRemoteName())); - } - return mapping; - }); - - return Optional.ofNullable(datasetMap.get(datasetName)); + DatasetId cacheKey = DatasetId.of(projectId, datasetName); + + Optional remoteDataSetFromCache = Optional.ofNullable(remoteDatasetCaseInsentiveCache.getIfPresent(cacheKey)); + if (remoteDataSetFromCache.isPresent()) { + return remoteDataSetFromCache; } - catch (ExecutionException e) { - return Optional.empty(); + + // Get all information from BigQuery and update cache from all fetched information + for (DatasetId datasetId : datasetIds.get()) { + DatasetId newCacheKey = datasetIdToLowerCase(datasetId); + RemoteDatabaseObject newValue = RemoteDatabaseObject.of(datasetId.getDataset()); + updateCache(remoteDatasetCaseInsentiveCache, newCacheKey, newValue); } + return Optional.ofNullable(remoteDatasetCaseInsentiveCache.getIfPresent(cacheKey)); } public Optional toRemoteTable(ConnectorSession session, String projectId, String remoteDatasetName, String tableName) @@ -202,23 +199,39 @@ private Optional toRemoteTable(String projectId, String re } TableId cacheKey = TableId.of(projectId, remoteDatasetName, tableName); - DatasetId datasetId = DatasetId.of(projectId, remoteDatasetName); + + Optional remoteTableFromCache = Optional.ofNullable(remoteTableCaseInsentiveCache.getIfPresent(cacheKey)); + if (remoteTableFromCache.isPresent()) { + return remoteTableFromCache; + } + + // Get all information from BigQuery and update cache from all fetched information + for (TableId table : tableIds.get()) { + TableId newCacheKey = tableIdToLowerCase(table); + RemoteDatabaseObject newValue = RemoteDatabaseObject.of(table.getTable()); + updateCache(remoteTableCaseInsentiveCache, newCacheKey, newValue); + } + + 
return Optional.ofNullable(remoteTableCaseInsentiveCache.getIfPresent(cacheKey)); + } + + private void updateCache(Cache caseInsensitiveCache, T newCacheKey, RemoteDatabaseObject newValue) + { try { - Map tableMap = remoteTableCaseInsentiveCache.get(datasetId, () -> { - Map mapping = new HashMap<>(); - for (TableId table : tableIds.get()) { - mapping.merge( - tableIdToLowerCase(table), - RemoteDatabaseObject.of(table.getTable()), - (currentValue, collision) -> currentValue.registerCollision(collision.getOnlyRemoteName())); - } - return mapping; - }); - - return Optional.ofNullable(tableMap.get(cacheKey)); + RemoteDatabaseObject currentCacheValue = caseInsensitiveCache.getIfPresent(newCacheKey); + if (currentCacheValue == null) { + caseInsensitiveCache.get(newCacheKey, () -> newValue); + } + else if (!currentCacheValue.remoteNames.contains(newValue.getOnlyRemoteName())) { + // Cache already has key, check if new value is already registered and update with collision if it's not + RemoteDatabaseObject mergedValue = currentCacheValue.registerCollision(newValue.getOnlyRemoteName()); + caseInsensitiveCache.invalidate(newCacheKey); + caseInsensitiveCache.get(newCacheKey, () -> mergedValue); + } } catch (ExecutionException e) { - return Optional.empty(); + // Loading cache value should never throw as the loader only returns an already-computed value + throw new UncheckedExecutionException(e); } } @@ -230,6 +243,13 @@ private static TableId tableIdToLowerCase(TableId tableId) tableId.getTable().toLowerCase(ENGLISH)); } + private static DatasetId datasetIdToLowerCase(DatasetId tableId) + { + return DatasetId.of( + tableId.getProject(), + tableId.getDataset().toLowerCase(ENGLISH)); + } + public DatasetInfo getDataset(DatasetId datasetId) { return bigQuery.getDataset(datasetId); @@ -350,7 +370,7 @@ public void createSchema(DatasetInfo datasetInfo) { bigQuery.create(datasetInfo); remoteDatasetIdCache.invalidate(datasetInfo.getDatasetId().getProject()); - 
remoteDatasetCaseInsentiveCache.invalidate(datasetInfo.getDatasetId().getProject()); + remoteDatasetCaseInsentiveCache.invalidate(datasetIdToLowerCase(datasetInfo.getDatasetId())); } public void dropSchema(DatasetId datasetId, boolean cascade) @@ -362,19 +382,19 @@ public void dropSchema(DatasetId datasetId, boolean cascade) bigQuery.delete(datasetId); } remoteDatasetIdCache.invalidate(datasetId.getProject()); - remoteDatasetCaseInsentiveCache.invalidate(datasetId.getProject()); + remoteDatasetCaseInsentiveCache.invalidate(datasetIdToLowerCase(datasetId)); } public void createTable(TableInfo tableInfo) { bigQuery.create(tableInfo); - remoteTableCaseInsentiveCache.invalidate(DatasetId.of(tableInfo.getTableId().getProject(), tableInfo.getTableId().getDataset())); + remoteTableCaseInsentiveCache.invalidate(tableIdToLowerCase(tableInfo.getTableId())); } public void dropTable(TableId tableId) { bigQuery.delete(tableId); - remoteTableCaseInsentiveCache.invalidate(DatasetId.of(tableId.getProject(), tableId.getDataset())); + remoteTableCaseInsentiveCache.invalidate(tableIdToLowerCase(tableId)); } Job create(JobInfo jobInfo) diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryCaseInsensitiveMapping.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryCaseInsensitiveMapping.java index 21fd4822d2e44..16343128de0df 100644 --- a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryCaseInsensitiveMapping.java +++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryCaseInsensitiveMapping.java @@ -46,7 +46,6 @@ protected QueryRunner createQueryRunner() .setConnectorProperties(ImmutableMap.builder() .put("bigquery.case-insensitive-name-matching", "true") .put("bigquery.case-insensitive-name-matching.cache-ttl", "1m") - .put("bigquery.service-cache-ttl", "0ms") // Disable service cache to focus on metadata cache .buildOrThrow()) .build(); }