From 49b8ae1c5f16497f5eb1f77a0acbb7b92953252e Mon Sep 17 00:00:00 2001 From: cgivre Date: Wed, 24 Jul 2024 14:40:07 -0400 Subject: [PATCH 01/14] Initial Experiments --- contrib/storage-splunk/pom.xml | 6 +++++ .../drill/exec/store/splunk/SplunkSchema.java | 25 +++++++++++++++++-- 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/contrib/storage-splunk/pom.xml b/contrib/storage-splunk/pom.xml index 9a68a6ffa0f..5e9b8f1de20 100644 --- a/contrib/storage-splunk/pom.xml +++ b/contrib/storage-splunk/pom.xml @@ -58,6 +58,12 @@ + + com.github.ben-manes.caffeine + caffeine + 3.1.8 + + org.apache.drill.exec diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java index c8cecab5d02..6afd6cd436b 100644 --- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java +++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java @@ -18,6 +18,9 @@ package org.apache.drill.exec.store.splunk; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.collect.Sets; import com.splunk.IndexCollection; import org.apache.calcite.schema.Table; import org.apache.drill.common.exceptions.UserException; @@ -28,7 +31,6 @@ import org.apache.drill.exec.planner.logical.ModifyTableEntry; import org.apache.drill.exec.store.AbstractSchema; import org.apache.drill.exec.store.StorageStrategy; -import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +40,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; public class SplunkSchema extends AbstractSchema { private final static Logger logger = LoggerFactory.getLogger(SplunkSchema.class); @@ -45,12 +48,18 @@ public class SplunkSchema extends AbstractSchema { private final Map activeTables = new HashMap<>(); private final SplunkStoragePlugin plugin; private final String queryUserName; + private final Cache> cache; public SplunkSchema(SplunkStoragePlugin plugin, String queryUserName) { super(Collections.emptyList(), plugin.getName()); this.plugin = plugin; this.queryUserName = queryUserName; + // TODO Add Configuration Parameters for the schema cache + this.cache = Caffeine.newBuilder() + .expireAfterAccess(90, TimeUnit.MINUTES) + .maximumSize(100) + .build(); registerIndexes(); } @@ -148,8 +157,20 @@ private void registerIndexes() { registerTable(SPL_TABLE_NAME, new DynamicDrillTable(plugin, plugin.getName(), new SplunkScanSpec(plugin.getName(), SPL_TABLE_NAME, plugin.getConfig(), queryUserName))); + Set indexList; // Retrieve and add all other Splunk indexes - for (String indexName : connection.getIndexes().keySet()) { + // First check the cache to see if we have a list of indexes. + String nameKey = queryUserName + "-" + plugin.getName(); + indexList = cache.getIfPresent(nameKey); + + // If the index list is not in the cache, query Splunk, retrieve the index list and add it to the cache. + if (indexList == null) { + logger.debug("Index list not in Splunk schema cache. 
Retrieving from Splunk."); + indexList = connection.getIndexes().keySet(); + cache.put(nameKey, indexList); + } + + for (String indexName : indexList) { logger.debug("Registering {}", indexName); registerTable(indexName, new DynamicDrillTable(plugin, plugin.getName(), new SplunkScanSpec(plugin.getName(), indexName, plugin.getConfig(), queryUserName))); From 48eaf42d9af71d9e86d73ed9035bb69eee19cf7a Mon Sep 17 00:00:00 2001 From: cgivre Date: Wed, 24 Jul 2024 17:56:46 -0400 Subject: [PATCH 02/14] Downgrade Caffeine to support Java 8 --- contrib/storage-splunk/pom.xml | 2 +- .../java/org/apache/drill/exec/store/splunk/SplunkSchema.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/contrib/storage-splunk/pom.xml b/contrib/storage-splunk/pom.xml index 5e9b8f1de20..78cc8b2ce85 100644 --- a/contrib/storage-splunk/pom.xml +++ b/contrib/storage-splunk/pom.xml @@ -61,7 +61,7 @@ com.github.ben-manes.caffeine caffeine - 3.1.8 + 2.9.3 diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java index 6afd6cd436b..c51e725542f 100644 --- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java +++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java @@ -58,7 +58,7 @@ public SplunkSchema(SplunkStoragePlugin plugin, String queryUserName) { // TODO Add Configuration Parameters for the schema cache this.cache = Caffeine.newBuilder() .expireAfterAccess(90, TimeUnit.MINUTES) - .maximumSize(100) + .maximumSize(1000) .build(); registerIndexes(); From ad83b3425daee98eb589017b38678fc951513174 Mon Sep 17 00:00:00 2001 From: cgivre Date: Thu, 25 Jul 2024 09:35:33 -0400 Subject: [PATCH 03/14] Various fixes --- .../exec/store/splunk/SplunkBatchReader.java | 5 ++++- .../drill/exec/store/splunk/SplunkSchema.java | 17 ++++++++++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java index 8c3fb45bf6d..75cb2583402 100644 --- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java +++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java @@ -56,6 +56,7 @@ public class SplunkBatchReader implements ManagedReader { private static final List TS_COLS = new ArrayList<>(Arrays.asList("_indextime", "_time")); private static final String EARLIEST_TIME_COLUMN = "earliestTime"; private static final String LATEST_TIME_COLUMN = "latestTime"; + private static final int MAX_COLUMNS = 1024; private final SplunkPluginConfig config; private final SplunkSubScan subScan; @@ -88,6 +89,8 @@ public SplunkBatchReader(SplunkPluginConfig config, SplunkSubScan subScan) { RowListProcessor rowProcessor = new RowListProcessor(); csvSettings.setProcessor(rowProcessor); csvSettings.setMaxCharsPerColumn(ValueVector.MAX_BUFFER_SIZE); + // Splunk can produce a lot of columns. The default maximum is 512. 
+    csvSettings.setMaxColumns(MAX_COLUMNS);
   }

   @Override
@@ -174,7 +177,7 @@ private TupleMetadata buildSchema() {
         }
       }
     }
-    logger.debug("Time to build schmea: {} milliseconds", timer.elapsed().getNano() / 100000);
+    logger.debug("Time to build schema: {} milliseconds", timer.elapsed().getNano() / 100000);
     return builder.buildSchema();
   }

diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java
index c51e725542f..97761f88a2b 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java
@@ -103,6 +103,8 @@ public CreateTableEntry createNewTable(String tableName, List<String> partitionC
         .message(plugin.getName() + " is not writable.")
         .build(logger);
     }
+    // Clear the index cache.
+    cache.invalidate(getNameForCache());

     return new CreateTableEntry() {
       @Override
@@ -131,6 +133,11 @@ public void dropTable(String indexName) {
     // Drop the index
     indexes.remove(indexName);
+
+    // Update the cache
+    String cacheKey = getNameForCache();
+    cache.invalidate(cacheKey);
+    cache.put(cacheKey, indexes.keySet());
   }

   @Override
@@ -148,6 +155,14 @@ public String getTypeName() {
     return SplunkPluginConfig.NAME;
   }

+  /**
+   * Returns the key used for the schema cache.
+   * @return A String combining the queryUserName and the plugin name
+   */
+  private String getNameForCache() {
+    return queryUserName + "-" + plugin.getName();
+  }
+
   private void registerIndexes() {
     // Verify that the connection is successful. If not, don't register any indexes,
     // and throw an exception.
@@ -160,7 +175,7 @@ private void registerIndexes() {
     Set<String> indexList;
     // Retrieve and add all other Splunk indexes
     // First check the cache to see if we have a list of indexes.
-    String nameKey = queryUserName + "-" + plugin.getName();
+    String nameKey = getNameForCache();
     indexList = cache.getIfPresent(nameKey);

     // If the index list is not in the cache, query Splunk, retrieve the index list and add it to the cache.
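For reviewers unfamiliar with Caffeine, here is a minimal, self-contained sketch of the get-or-load pattern that patches 01-03 introduce. The class name, method name, and `Supplier` stand-in are illustrative only; the key format and the 90-minute/1000-entry settings mirror the code above (and are made configurable in the next patch):

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class IndexCacheSketch {
  // Mirrors the cache built in SplunkSchema's constructor.
  private final Cache<String, Set<String>> cache = Caffeine.newBuilder()
      .expireAfterAccess(90, TimeUnit.MINUTES) // entries idle for 90 minutes are evicted
      .maximumSize(1000)                       // at most 1000 user/plugin keys
      .build();

  // splunkLookup stands in for connection.getIndexes().keySet() in registerIndexes().
  public Set<String> indexesFor(String queryUserName, String pluginName,
                                Supplier<Set<String>> splunkLookup) {
    String key = queryUserName + "-" + pluginName; // one cache entry per user per plugin
    Set<String> indexes = cache.getIfPresent(key);
    if (indexes == null) {
      // Cache miss: pay the Splunk round trip once, then reuse the result.
      indexes = splunkLookup.get();
      cache.put(key, indexes);
    }
    return indexes;
  }
}
```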
From f7486a8575c46baa42dd7359fc85c995b50e47f9 Mon Sep 17 00:00:00 2001 From: Charles Givre Date: Fri, 26 Jul 2024 13:42:08 -0400 Subject: [PATCH 04/14] Added config options --- .../exec/store/splunk/SplunkBatchReader.java | 6 +- .../exec/store/splunk/SplunkPluginConfig.java | 72 ++++++++++++++----- .../drill/exec/store/splunk/SplunkSchema.java | 5 +- .../store/splunk/SplunkConnectionTest.java | 2 +- .../exec/store/splunk/SplunkTestSuite.java | 6 +- 5 files changed, 64 insertions(+), 27 deletions(-) diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java index 75cb2583402..d280e17e720 100644 --- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java +++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java @@ -56,8 +56,6 @@ public class SplunkBatchReader implements ManagedReader { private static final List TS_COLS = new ArrayList<>(Arrays.asList("_indextime", "_time")); private static final String EARLIEST_TIME_COLUMN = "earliestTime"; private static final String LATEST_TIME_COLUMN = "latestTime"; - private static final int MAX_COLUMNS = 1024; - private final SplunkPluginConfig config; private final SplunkSubScan subScan; private final List projectedColumns; @@ -89,8 +87,8 @@ public SplunkBatchReader(SplunkPluginConfig config, SplunkSubScan subScan) { RowListProcessor rowProcessor = new RowListProcessor(); csvSettings.setProcessor(rowProcessor); csvSettings.setMaxCharsPerColumn(ValueVector.MAX_BUFFER_SIZE); - // Splunk can produce a lot of columns. The default maximum is 512. - csvSettings.setMaxColumns(MAX_COLUMNS); + // Splunk can produce a lot of columns. The SDK default maximum is 512. 
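+    // univocity-parsers rejects any row wider than this limit, so the cap is read
+    // from the plugin config rather than hard-coded.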
+ csvSettings.setMaxColumns(config.getMaxColumns()); } @Override diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java index 7845abeac5e..7c5f5b29676 100644 --- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java +++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java @@ -40,6 +40,9 @@ public class SplunkPluginConfig extends StoragePluginConfig { public static final String NAME = "splunk"; public static final int DISABLED_RECONNECT_RETRIES = 1; public static final int DEFAULT_WRITER_BATCH_SIZE = 1000; + public static final int DEFAULT_MAX_READER_COLUMNS = 1024; + public static final int DEFAULT_MAX_CACHE_SIZE = 10000; + public static final int DEFAULT_CACHE_EXPIRATION = 1024; private final String scheme; private final String hostname; @@ -55,6 +58,9 @@ public class SplunkPluginConfig extends StoragePluginConfig { private final Integer reconnectRetries; private final boolean writable; private final Integer writerBatchSize; + private final Integer maxColumns; + private final Integer maxCacheSize; + private final Integer cacheExpiration; @JsonCreator public SplunkPluginConfig(@JsonProperty("username") String username, @@ -74,7 +80,11 @@ public SplunkPluginConfig(@JsonProperty("username") String username, @JsonProperty("reconnectRetries") Integer reconnectRetries, @JsonProperty("authMode") String authMode, @JsonProperty("writable") boolean writable, - @JsonProperty("writableBatchSize") Integer writerBatchSize) { + @JsonProperty("writableBatchSize") Integer writerBatchSize, + @JsonProperty("maxColumns") Integer maxColumns, + @JsonProperty("maxCacheSize") Integer maxCacheSize, + @JsonProperty("cacheExpiration") Integer cacheExpiration + ) { super(CredentialProviderUtils.getCredentialsProvider(username, password, credentialsProvider), credentialsProvider == null, AuthMode.parseOrDefault(authMode, AuthMode.SHARED_USER)); this.scheme = scheme; @@ -91,6 +101,9 @@ public SplunkPluginConfig(@JsonProperty("username") String username, this.latestTime = latestTime == null ? "now" : latestTime; this.reconnectRetries = reconnectRetries; this.writerBatchSize = writerBatchSize; + this.maxColumns = maxColumns; + this.maxCacheSize = maxCacheSize; + this.cacheExpiration = cacheExpiration; } private SplunkPluginConfig(SplunkPluginConfig that, CredentialsProvider credentialsProvider) { @@ -109,6 +122,9 @@ private SplunkPluginConfig(SplunkPluginConfig that, CredentialsProvider credenti this.latestTime = that.latestTime; this.reconnectRetries = that.reconnectRetries; this.writerBatchSize = that.writerBatchSize; + this.maxColumns = that.maxColumns; + this.maxCacheSize = that.maxCacheSize; + this.cacheExpiration = that.cacheExpiration; } /** @@ -225,6 +241,21 @@ public int getWriterBatchSize() { return writerBatchSize != null ? writerBatchSize : DEFAULT_WRITER_BATCH_SIZE; } + @JsonProperty("maxColumns") + public int getMaxColumns() { + return maxColumns != null ? maxColumns : DEFAULT_MAX_READER_COLUMNS; + } + + @JsonProperty("maxCacheSize") + public int getMaxCacheSize() { + return maxCacheSize != null ? maxCacheSize : DEFAULT_MAX_CACHE_SIZE; + } + + @JsonProperty("cacheExpiration") + public int getCacheExpiration() { + return cacheExpiration != null ? 
cacheExpiration : DEFAULT_CACHE_EXPIRATION; + } + private static CredentialsProvider getCredentialsProvider(CredentialsProvider credentialsProvider) { return credentialsProvider != null ? credentialsProvider : PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER; } @@ -250,26 +281,32 @@ public boolean equals(Object that) { Objects.equals(validateHostname, thatConfig.validateHostname) && Objects.equals(earliestTime, thatConfig.earliestTime) && Objects.equals(latestTime, thatConfig.latestTime) && - Objects.equals(authMode, thatConfig.authMode); + Objects.equals(authMode, thatConfig.authMode) && + Objects.equals(maxCacheSize, thatConfig.maxCacheSize) && + Objects.equals(maxColumns, thatConfig.maxColumns) && + Objects.equals(cacheExpiration, thatConfig.cacheExpiration); } @Override public int hashCode() { return Objects.hash( - credentialsProvider, - scheme, - hostname, - port, - app, - owner, - token, - cookie, - writable, - validateCertificates, - validateHostname, - earliestTime, - latestTime, - authMode + credentialsProvider, + scheme, + hostname, + port, + app, + owner, + token, + cookie, + writable, + validateCertificates, + validateHostname, + earliestTime, + latestTime, + authMode, + cacheExpiration, + maxCacheSize, + maxColumns ); } @@ -290,6 +327,9 @@ public String toString() { .field("earliestTime", earliestTime) .field("latestTime", latestTime) .field("Authentication Mode", authMode) + .field("maxColumns", maxColumns) + .field("maxCacheSize", maxCacheSize) + .field("cacheExpiration", cacheExpiration) .toString(); } diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java index 97761f88a2b..81eee1bb615 100644 --- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java +++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java @@ -55,10 +55,9 @@ public SplunkSchema(SplunkStoragePlugin plugin, String queryUserName) { this.plugin = plugin; this.queryUserName = queryUserName; - // TODO Add Configuration Parameters for the schema cache this.cache = Caffeine.newBuilder() - .expireAfterAccess(90, TimeUnit.MINUTES) - .maximumSize(1000) + .expireAfterAccess(plugin.getConfig().getCacheExpiration(), TimeUnit.MINUTES) + .maximumSize(plugin.getConfig().getMaxCacheSize()) .build(); registerIndexes(); diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java index f4884ece45e..d9da2f6e3e0 100644 --- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java +++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java @@ -59,7 +59,7 @@ public void testConnectionFail() { SPLUNK_STORAGE_PLUGIN_CONFIG.getLatestTime(), null, SPLUNK_STORAGE_PLUGIN_CONFIG.getReconnectRetries(), - StoragePluginConfig.AuthMode.SHARED_USER.name(), true, null + StoragePluginConfig.AuthMode.SHARED_USER.name(), true, null, null, null, null ); SplunkConnection sc = new SplunkConnection(invalidSplunkConfig, null); sc.connect(); diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java index 5cd228d70ec..49a1baa758d 100644 --- 
a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java +++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java @@ -70,7 +70,7 @@ public class SplunkTestSuite extends ClusterTest { private static AtomicInteger initCount = new AtomicInteger(0); @ClassRule public static GenericContainer splunk = new GenericContainer<>( - DockerImageName.parse("splunk/splunk:9.0.2") + DockerImageName.parse("splunk/splunk:9.2") ) .withExposedPorts(8089, 8089) .withEnv("SPLUNK_START_ARGS", "--accept-license") @@ -98,7 +98,7 @@ public static void initSplunk() throws Exception { "1", "now", null, 4, - StoragePluginConfig.AuthMode.SHARED_USER.name(), true, null + StoragePluginConfig.AuthMode.SHARED_USER.name(), true, null, null, null, null ); SPLUNK_STORAGE_PLUGIN_CONFIG.setEnabled(true); pluginRegistry.put(SplunkPluginConfig.NAME, SPLUNK_STORAGE_PLUGIN_CONFIG); @@ -120,7 +120,7 @@ public static void initSplunk() throws Exception { "1", "now", credentialsProvider, 4, - AuthMode.USER_TRANSLATION.name(), true, null + AuthMode.USER_TRANSLATION.name(), true, null, null, null, null ); SPLUNK_STORAGE_PLUGIN_CONFIG_WITH_USER_TRANSLATION.setEnabled(true); pluginRegistry.put("ut_splunk", SPLUNK_STORAGE_PLUGIN_CONFIG_WITH_USER_TRANSLATION); From 59feb620d32c00ed58b7e833b814546f5e8dd0c2 Mon Sep 17 00:00:00 2001 From: cgivre Date: Mon, 29 Jul 2024 10:41:56 -0400 Subject: [PATCH 05/14] Updated docs and unit tests --- contrib/storage-splunk/README.md | 19 ++++++--- .../drill/exec/store/splunk/SplunkSchema.java | 41 +++++++++++++------ .../store/splunk/SplunkConnectionTest.java | 7 +++- .../exec/store/splunk/SplunkIndexesTest.java | 9 ++-- .../exec/store/splunk/SplunkPluginTest.java | 9 ++-- 5 files changed, 58 insertions(+), 27 deletions(-) diff --git a/contrib/storage-splunk/README.md b/contrib/storage-splunk/README.md index 08e3d65de80..188c7247d83 100644 --- a/contrib/storage-splunk/README.md +++ b/contrib/storage-splunk/README.md @@ -4,7 +4,7 @@ This plugin enables Drill to query Splunk. ## Configuration | Option | Default | Description | Since | -|-----------------------| --------- | --------------------------------------------------------------- | ----- | +|-----------------------| --------- | --------------------------------------------------------------- |-------| | type | (none) | Set to "splunk" to use this plugin | 1.19 | | username | null | Splunk username to be used by Drill | 1.19 | | password | null | Splunk password to be used by Drill | 1.19 | @@ -13,12 +13,15 @@ This plugin enables Drill to query Splunk. 
 | port | 8089 | TCP port over which Drill will connect to Splunk | 1.19 |
 | earliestTime | null | Global earliest record timestamp default | 1.19 |
 | latestTime | null | Global latest record timestamp default | 1.19 |
-| app | null | The application context of the service[^1] | 2.0 |
-| owner | null | The owner context of the service[^1] | 2.0 |
-| token | null | A Splunk authentication token to use for the session[^2] | 2.0 |
-| cookie | null | A valid login cookie | 2.0 |
-| validateCertificates | true | Whether the Splunk client will validates the server's SSL cert | 2.0 |
+| app | null | The application context of the service[^1] | 1.21 |
+| owner | null | The owner context of the service[^1] | 1.21 |
+| token | null | A Splunk authentication token to use for the session[^2] | 1.21 |
+| cookie | null | A valid login cookie | 1.21 |
+| validateCertificates | true | Whether the Splunk client will validate the server's SSL cert | 1.21 |
 | validateHostname | true | Whether the Splunk client will validate the server's host name | 1.22 |
+| maxColumns | 1024 | The maximum number of columns Drill will accept from Splunk | 1.22 |
+| maxCacheSize | 10000 | The maximum number of entries in Splunk's schema cache. | 1.22 |
+| cacheExpiration | 1024 | The number of minutes for Drill to persist the schema cache. | 1.22 |

 [^1]: See [this Splunk documentation](https://docs.splunk.com/Documentation/Splunk/latest/Admin/Apparchitectureandobjectownership) for more information.
 [^2]: See [this Splunk documentation](https://docs.splunk.com/Documentation/Splunk/latest/Security/CreateAuthTokens) for more information.
@@ -46,6 +49,10 @@ To bypass it by Drill please specify "reconnectRetries": 3. It allows you to ret
 ### User Translation
 The Splunk plugin supports user translation. Simply set the `authMode` parameter to `USER_TRANSLATION` and use either the plain or vault credential provider for credentials.

+## Schema Caching
+For every query that Drill sends to Splunk, Drill first has to pull schema information from Splunk. If you have a lot of indexes, this extra round trip can slow query planning. To reduce planning time, you can configure Drill to cache the index names so that it does not need to make additional calls to Splunk.
+
+There are two configuration parameters for the schema caching: `maxCacheSize` and `cacheExpiration`. The `maxCacheSize` defaults to 10,000 entries and the `cacheExpiration` defaults to 1024 minutes. To disable schema caching, simply set the `cacheExpiration` parameter to a value less than zero.
 ## Understanding Splunk's Data Model
 Splunk's primary use case is analyzing event logs with a timestamp. As such, data is indexed by the timestamp, with the most recent data being indexed first.
By default, Splunk diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java index 81eee1bb615..9db846a5cf7 100644 --- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java +++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java @@ -49,16 +49,23 @@ public class SplunkSchema extends AbstractSchema { private final SplunkStoragePlugin plugin; private final String queryUserName; private final Cache> cache; + private final boolean useCache; public SplunkSchema(SplunkStoragePlugin plugin, String queryUserName) { super(Collections.emptyList(), plugin.getName()); this.plugin = plugin; this.queryUserName = queryUserName; - - this.cache = Caffeine.newBuilder() - .expireAfterAccess(plugin.getConfig().getCacheExpiration(), TimeUnit.MINUTES) - .maximumSize(plugin.getConfig().getMaxCacheSize()) - .build(); + this.useCache = plugin.getConfig().getCacheExpiration() >= 0; + + if (useCache) { + logger.debug("Using splunk schema cache for {}", plugin.getName()); + this.cache = Caffeine.newBuilder() + .expireAfterAccess(plugin.getConfig().getCacheExpiration(), TimeUnit.MINUTES) + .maximumSize(plugin.getConfig().getMaxCacheSize()) + .build(); + } else { + this.cache = null; + } registerIndexes(); } @@ -103,7 +110,9 @@ public CreateTableEntry createNewTable(String tableName, List partitionC .build(logger); } // Clear the index cache. - cache.invalidate(getNameForCache()); + if (useCache) { + cache.invalidate(getNameForCache()); + } return new CreateTableEntry() { @Override @@ -133,10 +142,12 @@ public void dropTable(String indexName) { // Drop the index indexes.remove(indexName); - // Update the cache - String cacheKey = getNameForCache(); - cache.invalidate(cacheKey); - cache.put(cacheKey, indexes.keySet()); + if (useCache) { + // Update the cache + String cacheKey = getNameForCache(); + cache.invalidate(cacheKey); + cache.put(cacheKey, indexes.keySet()); + } } @Override @@ -171,17 +182,21 @@ private void registerIndexes() { registerTable(SPL_TABLE_NAME, new DynamicDrillTable(plugin, plugin.getName(), new SplunkScanSpec(plugin.getName(), SPL_TABLE_NAME, plugin.getConfig(), queryUserName))); - Set indexList; + Set indexList = null; // Retrieve and add all other Splunk indexes // First check the cache to see if we have a list of indexes. String nameKey = getNameForCache(); - indexList = cache.getIfPresent(nameKey); + if (useCache) { + indexList = cache.getIfPresent(nameKey); + } // If the index list is not in the cache, query Splunk, retrieve the index list and add it to the cache. if (indexList == null) { logger.debug("Index list not in Splunk schema cache. 
Retrieving from Splunk."); indexList = connection.getIndexes().keySet(); - cache.put(nameKey, indexList); + if (useCache) { + cache.put(nameKey, indexList); + } } for (String indexName : indexList) { diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java index d9da2f6e3e0..5636249433f 100644 --- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java +++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java @@ -73,11 +73,14 @@ public void testConnectionFail() { public void testGetIndexes() { SplunkConnection sc = new SplunkConnection(SPLUNK_STORAGE_PLUGIN_CONFIG, null); EntityCollection indexes = sc.getIndexes(); - assertEquals(10, indexes.size()); + assertEquals(13, indexes.size()); List expectedIndexNames = new ArrayList<>(); expectedIndexNames.add("_audit"); expectedIndexNames.add("_configtracker"); + expectedIndexNames.add("_dsappevent"); + expectedIndexNames.add("_dsclient"); + expectedIndexNames.add("_dsphonehome"); expectedIndexNames.add("_internal"); expectedIndexNames.add("_introspection"); expectedIndexNames.add("_telemetry"); @@ -92,7 +95,7 @@ public void testGetIndexes() { indexNames.add(index.getName()); } - assertEquals(indexNames, expectedIndexNames); + assertEquals(expectedIndexNames, indexNames); } } diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkIndexesTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkIndexesTest.java index fcdce8d7787..470f3960cba 100644 --- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkIndexesTest.java +++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkIndexesTest.java @@ -43,14 +43,17 @@ public void testGetSplunkIndexes() throws Exception { RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) .addRow("splunk", "summary") .addRow("splunk", "splunklogger") + .addRow("splunk", "_dsclient") .addRow("splunk", "_configtracker") .addRow("splunk", "_thefishbucket") - .addRow("splunk", "_audit") - .addRow("splunk", "_internal") - .addRow("splunk", "_introspection") .addRow("splunk", "main") .addRow("splunk", "history") + .addRow("splunk", "_dsphonehome") .addRow("splunk", "spl") + .addRow("splunk", "_audit") + .addRow("splunk", "_internal") + .addRow("splunk", "_dsappevent") + .addRow("splunk", "_introspection") .addRow("splunk", "_telemetry") .build(); diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java index 2109de28de1..ebbff43b5b9 100644 --- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java +++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java @@ -77,14 +77,17 @@ public void verifyIndexes() throws Exception { RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) .addRow("splunk", "summary") .addRow("splunk", "splunklogger") + .addRow("splunk", "_dsclient") .addRow("splunk", "_configtracker") .addRow("splunk", "_thefishbucket") - .addRow("splunk", "_audit") - .addRow("splunk", "_internal") - .addRow("splunk", "_introspection") .addRow("splunk", "main") .addRow("splunk", "history") + 
.addRow("splunk", "_dsphonehome") .addRow("splunk", "spl") + .addRow("splunk", "_audit") + .addRow("splunk", "_internal") + .addRow("splunk", "_dsappevent") + .addRow("splunk", "_introspection") .addRow("splunk", "_telemetry") .build(); From 6f5fe3edd6de43b564a35dc8d1b168001585a920 Mon Sep 17 00:00:00 2001 From: cgivre Date: Mon, 29 Jul 2024 12:25:17 -0400 Subject: [PATCH 06/14] Added SPL to query plan --- .../exec/store/splunk/SplunkSubScan.java | 68 ++++++++++++++++++- 1 file changed, 67 insertions(+), 1 deletion(-) diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSubScan.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSubScan.java index 951e05bb178..5c9ac44e460 100644 --- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSubScan.java +++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSubScan.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.collect.ImmutableSet; import org.apache.drill.common.PlanStringBuilder; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.physical.base.AbstractBase; @@ -29,7 +30,6 @@ import org.apache.drill.exec.physical.base.PhysicalVisitor; import org.apache.drill.exec.physical.base.SubScan; import org.apache.drill.exec.store.base.filter.ExprNode; -import com.google.common.collect.ImmutableSet; import java.util.Iterator; import java.util.List; @@ -38,6 +38,8 @@ @JsonTypeName("splunk-sub-scan") public class SplunkSubScan extends AbstractBase implements SubScan { + private static final String EARLIEST_TIME_COLUMN = "earliestTime"; + private static final String LATEST_TIME_COLUMN = "latestTime"; private final SplunkPluginConfig config; private final SplunkScanSpec splunkScanSpec; @@ -115,9 +117,73 @@ public String toString() { .field("columns", columns) .field("filters", filters) .field("maxRecords", maxRecords) + .field("spl", generateQuery()) .toString(); } + /** + * Generates the query which will be sent to Splunk. This method exists for debugging purposes so + * that the actual SPL will be recorded in the query plan. + */ + private String generateQuery() { + String earliestTime = null; + String latestTime = null; + Map filters = getFilters(); + + // Splunk searches perform best when they are time bound. This allows the user to set + // default time boundaries in the config. 
These will be overwritten in filter pushdowns + if (filters != null && filters.containsKey(EARLIEST_TIME_COLUMN)) { + earliestTime = filters.get(EARLIEST_TIME_COLUMN).value.value.toString(); + + // Remove from map + filters.remove(EARLIEST_TIME_COLUMN); + } + + if (filters != null && filters.containsKey(LATEST_TIME_COLUMN)) { + latestTime = filters.get(LATEST_TIME_COLUMN).value.value.toString(); + + // Remove from map so they are not pushed down into the query + filters.remove(LATEST_TIME_COLUMN); + } + + if (earliestTime == null) { + earliestTime = config.getEarliestTime(); + } + + if (latestTime == null) { + latestTime = config.getLatestTime(); + } + + // Special case: If the user wishes to send arbitrary SPL to Splunk, the user can use the "SPL" + // Index and spl filter + if (splunkScanSpec.getIndexName().equalsIgnoreCase("spl")) { + return filters.get("spl").value.value.toString(); + } + + SplunkQueryBuilder builder = new SplunkQueryBuilder(splunkScanSpec.getIndexName()); + + // Set the sourcetype + if (filters != null && filters.containsKey("sourcetype")) { + String sourcetype = filters.get("sourcetype").value.value.toString(); + builder.addSourceType(sourcetype); + filters.remove("sourcetype"); + } + + // Add projected columns, skipping star and specials. + for (SchemaPath projectedColumn: columns) { + builder.addField(projectedColumn.getAsUnescapedPath()); + } + + // Apply filters + builder.addFilters(filters); + + // Apply limits + if (getMaxRecords() > 0) { + builder.addLimit(getMaxRecords()); + } + return builder.build(); + } + @Override public int hashCode() { return Objects.hash(config, splunkScanSpec, columns, filters, maxRecords); From 673990500c3ef8c6a12c030cecb2fdcaf0324bd6 Mon Sep 17 00:00:00 2001 From: cgivre Date: Mon, 29 Jul 2024 12:51:33 -0400 Subject: [PATCH 07/14] Moved SPL to group scan --- .../exec/store/splunk/SplunkBatchReader.java | 14 ++-- .../exec/store/splunk/SplunkGroupScan.java | 65 ++++++++++++++++++ .../exec/store/splunk/SplunkSubScan.java | 67 ------------------- .../drill/exec/store/splunk/SplunkUtils.java | 3 + 4 files changed, 74 insertions(+), 75 deletions(-) diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java index d280e17e720..a02f5d09271 100644 --- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java +++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java @@ -54,8 +54,6 @@ public class SplunkBatchReader implements ManagedReader { private static final Logger logger = LoggerFactory.getLogger(SplunkBatchReader.class); private static final List INT_COLS = new ArrayList<>(Arrays.asList("date_hour", "date_mday", "date_minute", "date_second", "date_year", "linecount")); private static final List TS_COLS = new ArrayList<>(Arrays.asList("_indextime", "_time")); - private static final String EARLIEST_TIME_COLUMN = "earliestTime"; - private static final String LATEST_TIME_COLUMN = "latestTime"; private final SplunkPluginConfig config; private final SplunkSubScan subScan; private final List projectedColumns; @@ -242,18 +240,18 @@ private String buildQueryString () { // Splunk searches perform best when they are time bound. This allows the user to set // default time boundaries in the config. 
These will be overwritten in filter pushdowns - if (filters != null && filters.containsKey(EARLIEST_TIME_COLUMN)) { - earliestTime = filters.get(EARLIEST_TIME_COLUMN).value.value.toString(); + if (filters != null && filters.containsKey(SplunkUtils.EARLIEST_TIME_COLUMN)) { + earliestTime = filters.get(SplunkUtils.EARLIEST_TIME_COLUMN).value.value.toString(); // Remove from map - filters.remove(EARLIEST_TIME_COLUMN); + filters.remove(SplunkUtils.EARLIEST_TIME_COLUMN); } - if (filters != null && filters.containsKey(LATEST_TIME_COLUMN)) { - latestTime = filters.get(LATEST_TIME_COLUMN).value.value.toString(); + if (filters != null && filters.containsKey(SplunkUtils.LATEST_TIME_COLUMN)) { + latestTime = filters.get(SplunkUtils.LATEST_TIME_COLUMN).value.value.toString(); // Remove from map so they are not pushed down into the query - filters.remove(LATEST_TIME_COLUMN); + filters.remove(SplunkUtils.LATEST_TIME_COLUMN); } if (earliestTime == null) { diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java index 9ab7de6d471..cb8de9a0c98 100644 --- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java +++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java @@ -309,6 +309,70 @@ public GroupScan clone(List columns) { return new SplunkGroupScan(this, columns); } + /** + * Generates the query which will be sent to Splunk. This method exists for debugging purposes so + * that the actual SPL will be recorded in the query plan. + */ + private String generateQuery() { + String earliestTime = null; + String latestTime = null; + + // Splunk searches perform best when they are time bound. This allows the user to set + // default time boundaries in the config. These will be overwritten in filter pushdowns + if (filters != null && filters.containsKey(SplunkUtils.EARLIEST_TIME_COLUMN)) { + earliestTime = filters.get(SplunkUtils.EARLIEST_TIME_COLUMN).value.value.toString(); + + // Remove from map + filters.remove(SplunkUtils.EARLIEST_TIME_COLUMN); + } + + if (filters != null && filters.containsKey(SplunkUtils.LATEST_TIME_COLUMN)) { + latestTime = filters.get(SplunkUtils.LATEST_TIME_COLUMN).value.value.toString(); + + // Remove from map so they are not pushed down into the query + filters.remove(SplunkUtils.LATEST_TIME_COLUMN); + } + + if (earliestTime == null) { + earliestTime = config.getEarliestTime(); + } + + if (latestTime == null) { + latestTime = config.getLatestTime(); + } + + // Special case: If the user wishes to send arbitrary SPL to Splunk, the user can use the "SPL" + // Index and spl filter + if (splunkScanSpec.getIndexName().equalsIgnoreCase("spl")) { + if (filters != null && filters.containsKey("spl")) { + return filters.get("spl").value.value.toString(); + } + } + + SplunkQueryBuilder builder = new SplunkQueryBuilder(splunkScanSpec.getIndexName()); + + // Set the sourcetype + if (filters != null && filters.containsKey("sourcetype")) { + String sourcetype = filters.get("sourcetype").value.value.toString(); + builder.addSourceType(sourcetype); + filters.remove("sourcetype"); + } + + // Add projected columns, skipping star and specials. 
+ for (SchemaPath projectedColumn: columns) { + builder.addField(projectedColumn.getAsUnescapedPath()); + } + + // Apply filters + builder.addFilters(filters); + + // Apply limits + if (maxRecords > 0) { + builder.addLimit(maxRecords); + } + return builder.build(); + } + @Override public int hashCode() { @@ -344,6 +408,7 @@ public String toString() { .field("scan spec", splunkScanSpec) .field("columns", columns) .field("maxRecords", maxRecords) + .field("spl", generateQuery()) .toString(); } } diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSubScan.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSubScan.java index 5c9ac44e460..37c9fd2d541 100644 --- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSubScan.java +++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSubScan.java @@ -38,9 +38,6 @@ @JsonTypeName("splunk-sub-scan") public class SplunkSubScan extends AbstractBase implements SubScan { - private static final String EARLIEST_TIME_COLUMN = "earliestTime"; - private static final String LATEST_TIME_COLUMN = "latestTime"; - private final SplunkPluginConfig config; private final SplunkScanSpec splunkScanSpec; private final List columns; @@ -117,73 +114,9 @@ public String toString() { .field("columns", columns) .field("filters", filters) .field("maxRecords", maxRecords) - .field("spl", generateQuery()) .toString(); } - /** - * Generates the query which will be sent to Splunk. This method exists for debugging purposes so - * that the actual SPL will be recorded in the query plan. - */ - private String generateQuery() { - String earliestTime = null; - String latestTime = null; - Map filters = getFilters(); - - // Splunk searches perform best when they are time bound. This allows the user to set - // default time boundaries in the config. These will be overwritten in filter pushdowns - if (filters != null && filters.containsKey(EARLIEST_TIME_COLUMN)) { - earliestTime = filters.get(EARLIEST_TIME_COLUMN).value.value.toString(); - - // Remove from map - filters.remove(EARLIEST_TIME_COLUMN); - } - - if (filters != null && filters.containsKey(LATEST_TIME_COLUMN)) { - latestTime = filters.get(LATEST_TIME_COLUMN).value.value.toString(); - - // Remove from map so they are not pushed down into the query - filters.remove(LATEST_TIME_COLUMN); - } - - if (earliestTime == null) { - earliestTime = config.getEarliestTime(); - } - - if (latestTime == null) { - latestTime = config.getLatestTime(); - } - - // Special case: If the user wishes to send arbitrary SPL to Splunk, the user can use the "SPL" - // Index and spl filter - if (splunkScanSpec.getIndexName().equalsIgnoreCase("spl")) { - return filters.get("spl").value.value.toString(); - } - - SplunkQueryBuilder builder = new SplunkQueryBuilder(splunkScanSpec.getIndexName()); - - // Set the sourcetype - if (filters != null && filters.containsKey("sourcetype")) { - String sourcetype = filters.get("sourcetype").value.value.toString(); - builder.addSourceType(sourcetype); - filters.remove("sourcetype"); - } - - // Add projected columns, skipping star and specials. 
- for (SchemaPath projectedColumn: columns) { - builder.addField(projectedColumn.getAsUnescapedPath()); - } - - // Apply filters - builder.addFilters(filters); - - // Apply limits - if (getMaxRecords() > 0) { - builder.addLimit(getMaxRecords()); - } - return builder.build(); - } - @Override public int hashCode() { return Objects.hash(config, splunkScanSpec, columns, filters, maxRecords); diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkUtils.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkUtils.java index 4b856182f5d..e0471436fc2 100644 --- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkUtils.java +++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkUtils.java @@ -19,6 +19,9 @@ package org.apache.drill.exec.store.splunk; public class SplunkUtils { + public static final String EARLIEST_TIME_COLUMN = "earliestTime"; + public static final String LATEST_TIME_COLUMN = "latestTime"; + /** * These are special fields that alter the queries sent to Splunk. */ From e38ad9c35979896f9b867801453d28965c3a0a08 Mon Sep 17 00:00:00 2001 From: cgivre Date: Mon, 29 Jul 2024 15:05:47 -0400 Subject: [PATCH 08/14] Formatting and minor bug fix --- .../apache/drill/exec/store/splunk/SplunkGroupScan.java | 7 ------- .../apache/drill/exec/store/splunk/SplunkPluginConfig.java | 6 +++--- .../drill/exec/store/splunk/SplunkLimitPushDownTest.java | 1 + 3 files changed, 4 insertions(+), 10 deletions(-) diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java index cb8de9a0c98..53596a2b549 100644 --- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java +++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java @@ -321,16 +321,10 @@ private String generateQuery() { // default time boundaries in the config. These will be overwritten in filter pushdowns if (filters != null && filters.containsKey(SplunkUtils.EARLIEST_TIME_COLUMN)) { earliestTime = filters.get(SplunkUtils.EARLIEST_TIME_COLUMN).value.value.toString(); - - // Remove from map - filters.remove(SplunkUtils.EARLIEST_TIME_COLUMN); } if (filters != null && filters.containsKey(SplunkUtils.LATEST_TIME_COLUMN)) { latestTime = filters.get(SplunkUtils.LATEST_TIME_COLUMN).value.value.toString(); - - // Remove from map so they are not pushed down into the query - filters.remove(SplunkUtils.LATEST_TIME_COLUMN); } if (earliestTime == null) { @@ -355,7 +349,6 @@ private String generateQuery() { if (filters != null && filters.containsKey("sourcetype")) { String sourcetype = filters.get("sourcetype").value.value.toString(); builder.addSourceType(sourcetype); - filters.remove("sourcetype"); } // Add projected columns, skipping star and specials. 
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java index 7c5f5b29676..15f8b0d5792 100644 --- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java +++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java @@ -327,9 +327,9 @@ public String toString() { .field("earliestTime", earliestTime) .field("latestTime", latestTime) .field("Authentication Mode", authMode) - .field("maxColumns", maxColumns) - .field("maxCacheSize", maxCacheSize) - .field("cacheExpiration", cacheExpiration) + .field("maxColumns", maxColumns) + .field("maxCacheSize", maxCacheSize) + .field("cacheExpiration", cacheExpiration) .toString(); } diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkLimitPushDownTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkLimitPushDownTest.java index 813c665239d..f5200b98c8b 100644 --- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkLimitPushDownTest.java +++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkLimitPushDownTest.java @@ -64,6 +64,7 @@ public void testLimitWithFilter() throws Exception { .sql(sql) .planMatcher() .include("Limit", "maxRecords=4") + .include("spl", "search index=_audit rating=52.17 | fields rating | head 5 | table rating") .match(); } } From 992e53d7520dc5965cbf71342e53896820a24a2e Mon Sep 17 00:00:00 2001 From: cgivre Date: Mon, 12 Aug 2024 12:46:13 -0400 Subject: [PATCH 09/14] Bump Splunk test to 9.3 --- .../org/apache/drill/exec/store/splunk/SplunkTestSuite.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java index 49a1baa758d..7825f8af58e 100644 --- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java +++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java @@ -70,7 +70,7 @@ public class SplunkTestSuite extends ClusterTest { private static AtomicInteger initCount = new AtomicInteger(0); @ClassRule public static GenericContainer splunk = new GenericContainer<>( - DockerImageName.parse("splunk/splunk:9.2") + DockerImageName.parse("splunk/splunk:9.3") ) .withExposedPorts(8089, 8089) .withEnv("SPLUNK_START_ARGS", "--accept-license") From c5687ad07128f5883c6de0ce200c78627567641d Mon Sep 17 00:00:00 2001 From: cgivre Date: Mon, 12 Aug 2024 12:55:17 -0400 Subject: [PATCH 10/14] Fix unit itest --- .../org/apache/drill/exec/store/splunk/SplunkWriterTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkWriterTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkWriterTest.java index f5d399cd231..239a7930db2 100644 --- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkWriterTest.java +++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkWriterTest.java @@ -76,10 +76,10 @@ public void testBasicCTAS() throws Exception { .buildSchema(); RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) - 
.addRow("198.35.2.120", "ACCESSORIES") + .addRow("198.35.2.120", "STRATEGY") .addRow("198.35.2.120", null) .addRow("198.35.2.120", null) - .addRow("198.35.2.120", "STRATEGY") + .addRow("198.35.2.120", "ACCESSORIES") .addRow("198.35.2.120", "NULL") .build(); RowSetUtilities.verify(expected, results); From 238fe48be627da60ff2ce822eeb519230163f812 Mon Sep 17 00:00:00 2001 From: cgivre Date: Mon, 12 Aug 2024 16:43:30 -0400 Subject: [PATCH 11/14] Hopefully fixed UT --- .../drill/exec/store/splunk/SplunkTestSuite.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java index 7825f8af58e..dc434c8f06a 100644 --- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java +++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java @@ -88,6 +88,17 @@ public static void initSplunk() throws Exception { startCluster(builder); splunk.start(); + splunk.execInContainer("if ! sudo grep -q 'minFileSize' /opt/splunk/etc/system/local/server.conf; then " + + "sudo chmod a+w /opt/splunk/etc/system/local/server.conf; " + + "sudo echo \"# disk usage processor settings\" >> /opt/splunk/etc/system/local/server.conf; " + + "sudo echo \"[diskUsage]\" >> /opt/splunk/etc/system/local/server.conf; " + + "sudo echo \"minFreeSpace = 2000\" >> /opt/splunk/etc/system/local/server.conf; " + + "sudo echo \"pollingFrequency = 100000\" >> /opt/splunk/etc/system/local/server.conf; " + + "sudo echo \"pollingTimerFrequency = 10\" >> /opt/splunk/etc/system/local/server.conf; " + + "sudo chmod 600 /opt/splunk/etc/system/local/server.conf; " + + "sudo /opt/splunk/bin/splunk restart; " + + "fi"); + String hostname = splunk.getHost(); Integer port = splunk.getFirstMappedPort(); StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage(); From 42e0768853a76f14d32cf7e6da1e569239be7d25 Mon Sep 17 00:00:00 2001 From: cgivre Date: Tue, 13 Aug 2024 10:29:04 -0400 Subject: [PATCH 12/14] Clear swap space --- .github/workflows/ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 39c3fc0aa9e..8de4b0ce001 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -67,6 +67,7 @@ jobs: - name: Remove swap space run : | sudo sh -c " + free -h swapoff /tmp/swapfile rm /tmp/swapfile " From 1d3a872dd31802f0198faaec61f75ff941072cc5 Mon Sep 17 00:00:00 2001 From: cgivre Date: Thu, 10 Oct 2024 16:25:51 -0400 Subject: [PATCH 13/14] Addressed review comments --- contrib/storage-splunk/pom.xml | 2 +- .../org/apache/drill/exec/store/splunk/SplunkSchema.java | 7 ++++--- metastore/iceberg-metastore/pom.xml | 4 ---- pom.xml | 1 + 4 files changed, 6 insertions(+), 8 deletions(-) diff --git a/contrib/storage-splunk/pom.xml b/contrib/storage-splunk/pom.xml index 78cc8b2ce85..165828aba06 100644 --- a/contrib/storage-splunk/pom.xml +++ b/contrib/storage-splunk/pom.xml @@ -61,7 +61,7 @@ com.github.ben-manes.caffeine caffeine - 2.9.3 + ${caffeine.version} diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java index 9db846a5cf7..dea9992916a 100644 --- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java +++ 
b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java
@@ -58,7 +58,7 @@ public SplunkSchema(SplunkStoragePlugin plugin, String queryUserName) {
     this.useCache = plugin.getConfig().getCacheExpiration() >= 0;

     if (useCache) {
-      logger.debug("Using splunk schema cache for {}", plugin.getName());
+      logger.info("Using splunk schema cache for {}", plugin.getName());
       this.cache = Caffeine.newBuilder()
         .expireAfterAccess(plugin.getConfig().getCacheExpiration(), TimeUnit.MINUTES)
         .maximumSize(plugin.getConfig().getMaxCacheSize())
         .build();
@@ -101,8 +101,9 @@ private DynamicDrillTable registerTable(String name, DynamicDrillTable table) {
   }

   @Override
-  public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns,
-                                         StorageStrategy strategy) {
+  public CreateTableEntry createNewTable(String tableName,
+                                         List<String> partitionColumns,
+                                         StorageStrategy strategy) {
     if (plugin.getConfig().isWritable() == null || (! plugin.getConfig().isWritable())) {
       throw UserException
         .dataWriteError()
diff --git a/metastore/iceberg-metastore/pom.xml b/metastore/iceberg-metastore/pom.xml
index 15a337121f9..fe28d07bec5 100644
--- a/metastore/iceberg-metastore/pom.xml
+++ b/metastore/iceberg-metastore/pom.xml
@@ -30,10 +30,6 @@
   drill-iceberg-metastore
   Drill : Metastore : Iceberg

-  <properties>
-    <caffeine.version>2.7.0</caffeine.version>
-  </properties>
-
   org.apache.drill
diff --git a/pom.xml b/pom.xml
index 460376d6536..48c9a13eadf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,6 +55,7 @@
     1.23.0
     1.11.4
     1.78.1
+    <caffeine.version>2.9.3</caffeine.version>
     org.apache.calcite
     1.34.0
     2.6

From 413e4ce0d108037ea73c227124b113ecfb8ec08e Mon Sep 17 00:00:00 2001
From: cgivre
Date: Mon, 11 Nov 2024 17:46:11 -0500
Subject: [PATCH 14/14] Minor code cleanup

---
 .../java/org/apache/drill/exec/store/splunk/SplunkSchema.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java
index dea9992916a..0e2760594d9 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java
@@ -43,7 +43,7 @@ import java.util.concurrent.TimeUnit;

 public class SplunkSchema extends AbstractSchema {
-  private final static Logger logger = LoggerFactory.getLogger(SplunkSchema.class);
+  private static final Logger logger = LoggerFactory.getLogger(SplunkSchema.class);
   private static final String SPL_TABLE_NAME = "spl";
   private final Map<String, DynamicDrillTable> activeTables = new HashMap<>();
   private final SplunkStoragePlugin plugin;
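As a closing note on the SPL that patches 06 and 07 record in the query plan, here is a hedged sketch of the calls `generateQuery()` makes through the plugin's existing `SplunkQueryBuilder`. The builder methods are the ones visible in the `SplunkGroupScan` diff; the commented output comes from the plan string asserted in `SplunkLimitPushDownTest`, and the index, field, and limit values are illustrative:

```java
// Sketch only: SplunkQueryBuilder is the plugin's existing helper class.
SplunkQueryBuilder builder = new SplunkQueryBuilder("_audit"); // index from the scan spec
builder.addField("rating"); // projected column
builder.addLimit(5);        // pushed-down LIMIT
String spl = builder.build();
// With the rating=52.17 filter also pushed down, SplunkLimitPushDownTest
// expects the plan to record:
//   search index=_audit rating=52.17 | fields rating | head 5 | table rating
```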