From ef3d5672cc7ffc74304ae0232bf65acb6395f31f Mon Sep 17 00:00:00 2001 From: cgivre Date: Wed, 24 Jul 2024 14:40:07 -0400 Subject: [PATCH] Initial Experiments --- contrib/storage-splunk/pom.xml | 6 +++++ .../drill/exec/store/splunk/SplunkSchema.java | 25 +++++++++++++++++-- 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/contrib/storage-splunk/pom.xml b/contrib/storage-splunk/pom.xml index 9a68a6ffa0f..5e9b8f1de20 100644 --- a/contrib/storage-splunk/pom.xml +++ b/contrib/storage-splunk/pom.xml @@ -58,6 +58,12 @@ + + com.github.ben-manes.caffeine + caffeine + 3.1.8 + + org.apache.drill.exec diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java index c8cecab5d02..6afd6cd436b 100644 --- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java +++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java @@ -18,6 +18,9 @@ package org.apache.drill.exec.store.splunk; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.collect.Sets; import com.splunk.IndexCollection; import org.apache.calcite.schema.Table; import org.apache.drill.common.exceptions.UserException; @@ -28,7 +31,6 @@ import org.apache.drill.exec.planner.logical.ModifyTableEntry; import org.apache.drill.exec.store.AbstractSchema; import org.apache.drill.exec.store.StorageStrategy; -import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +40,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; public class SplunkSchema extends AbstractSchema { private final static Logger logger = LoggerFactory.getLogger(SplunkSchema.class); @@ -45,12 +48,18 @@ public class SplunkSchema extends AbstractSchema { private final 
Map<String, DynamicDrillTable> activeTables = new HashMap<>(); private final SplunkStoragePlugin plugin; private final String queryUserName; + private final Cache<String, Set<String>> cache; public SplunkSchema(SplunkStoragePlugin plugin, String queryUserName) { super(Collections.emptyList(), plugin.getName()); this.plugin = plugin; this.queryUserName = queryUserName; + // TODO Add Configuration Parameters for the schema cache + this.cache = Caffeine.newBuilder() + .expireAfterAccess(90, TimeUnit.MINUTES) + .maximumSize(100) + .build(); registerIndexes(); } @@ -148,8 +157,20 @@ private void registerIndexes() { registerTable(SPL_TABLE_NAME, new DynamicDrillTable(plugin, plugin.getName(), new SplunkScanSpec(plugin.getName(), SPL_TABLE_NAME, plugin.getConfig(), queryUserName))); + Set<String> indexList; // Retrieve and add all other Splunk indexes - for (String indexName : connection.getIndexes().keySet()) { + // First check the cache to see if we have a list of indexes. + String nameKey = queryUserName + "-" + plugin.getName(); + indexList = cache.getIfPresent(nameKey); + + // If the index list is not in the cache, query Splunk, retrieve the index list and add it to the cache. + if (indexList == null) { + logger.debug("Index list not in Splunk schema cache. Retrieving from Splunk."); + indexList = connection.getIndexes().keySet(); + cache.put(nameKey, indexList); + } + + for (String indexName : indexList) { logger.debug("Registering {}", indexName); registerTable(indexName, new DynamicDrillTable(plugin, plugin.getName(), new SplunkScanSpec(plugin.getName(), indexName, plugin.getConfig(), queryUserName)));