From 692482c8b644fd308ffe266d974efd1cd785b73a Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Thu, 23 Jan 2025 17:36:38 -0800 Subject: [PATCH] Move exporter config to query insights level Signed-off-by: Chenyang Ji --- .../plugin/insights/QueryInsightsPlugin.java | 4 +- .../insights/core/exporter/DebugExporter.java | 6 + .../core/exporter/LocalIndexExporter.java | 9 +- .../core/exporter/QueryInsightsExporter.java | 2 + .../QueryInsightsExporterFactory.java | 47 +++--- .../insights/core/exporter/SinkType.java | 6 +- .../core/reader/LocalIndexReader.java | 14 +- .../core/reader/QueryInsightsReader.java | 2 + .../reader/QueryInsightsReaderFactory.java | 62 ++++---- .../core/service/QueryInsightsService.java | 140 +++++++++++++----- .../core/service/TopQueriesService.java | 120 +-------------- .../settings/QueryInsightsSettings.java | 60 ++------ .../insights/QueryInsightsPluginTests.java | 4 +- .../insights/QueryInsightsTestUtils.java | 4 +- .../exporter/LocalIndexExporterTests.java | 2 +- .../QueryInsightsExporterFactoryTests.java | 18 +-- .../core/reader/LocalIndexReaderTests.java | 2 +- .../QueryInsightsReaderFactoryTests.java | 4 +- .../service/QueryInsightsServiceTests.java | 58 ++++++++ .../core/service/TopQueriesServiceTests.java | 42 ------ 20 files changed, 282 insertions(+), 324 deletions(-) diff --git a/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java b/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java index 13d62151..d912ccf2 100644 --- a/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java +++ b/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java @@ -131,21 +131,19 @@ public List> getSettings() { QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED, QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE, QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE, - QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS, QueryInsightsSettings.TOP_N_CPU_QUERIES_ENABLED, QueryInsightsSettings.TOP_N_CPU_QUERIES_SIZE, QueryInsightsSettings.TOP_N_CPU_QUERIES_WINDOW_SIZE, - QueryInsightsSettings.TOP_N_CPU_EXPORTER_SETTINGS, QueryInsightsSettings.TOP_N_MEMORY_QUERIES_ENABLED, QueryInsightsSettings.TOP_N_MEMORY_QUERIES_SIZE, QueryInsightsSettings.TOP_N_MEMORY_QUERIES_WINDOW_SIZE, - QueryInsightsSettings.TOP_N_MEMORY_EXPORTER_SETTINGS, QueryInsightsSettings.TOP_N_QUERIES_GROUP_BY, QueryInsightsSettings.TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N, QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_NAME, QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_TYPE, QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING, QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER, + QueryInsightsSettings.TOP_N_EXPORTER_TYPE, QueryCategorizationSettings.SEARCH_QUERY_FIELD_TYPE_CACHE_SIZE_KEY ); } diff --git a/src/main/java/org/opensearch/plugin/insights/core/exporter/DebugExporter.java b/src/main/java/org/opensearch/plugin/insights/core/exporter/DebugExporter.java index d63f19cb..2fbfa5b9 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/exporter/DebugExporter.java +++ b/src/main/java/org/opensearch/plugin/insights/core/exporter/DebugExporter.java @@ -21,12 +21,18 @@ public final class DebugExporter implements QueryInsightsExporter { * Logger of the debug exporter */ private final Logger logger = LogManager.getLogger(); + private static final String EXPORTER_ID = "debug_exporter"; /** * Constructor of DebugExporter */ private DebugExporter() {} + @Override + public String getId() { + return EXPORTER_ID; + } + private static class InstanceHolder { private static final DebugExporter INSTANCE = new DebugExporter(); } diff --git a/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java b/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java index a5085fe1..cdc462fd 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java +++ b/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java @@ -46,6 +46,7 @@ public final class LocalIndexExporter implements QueryInsightsExporter { private final Client client; private DateTimeFormatter indexPattern; private int deleteAfter; + private final String id; /** * Constructor of LocalIndexExporter @@ -53,10 +54,16 @@ public final class LocalIndexExporter implements QueryInsightsExporter { * @param client OS client * @param indexPattern the pattern of index to export to */ - public LocalIndexExporter(final Client client, final DateTimeFormatter indexPattern) { + public LocalIndexExporter(final Client client, final DateTimeFormatter indexPattern, final String id) { this.indexPattern = indexPattern; this.client = client; this.deleteAfter = DEFAULT_DELETE_AFTER_VALUE; + this.id = id; + } + + @Override + public String getId() { + return id; } /** diff --git a/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporter.java b/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporter.java index d50f23c3..b9ff0ab6 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporter.java +++ b/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporter.java @@ -22,4 +22,6 @@ public interface QueryInsightsExporter extends Closeable { * @param records list of {@link SearchQueryRecord} */ void export(final List records); + + String getId(); } diff --git a/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java b/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java index 3a2c2fa7..4c4a6cae 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java +++ b/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java @@ -8,18 +8,14 @@ package org.opensearch.plugin.insights.core.exporter; -import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_QUERIES_EXPORTER_TYPE; -import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORTER_TYPE; - import java.io.IOException; import java.time.format.DateTimeFormatter; -import java.util.HashSet; +import java.util.HashMap; import java.util.Locale; -import java.util.Set; +import java.util.Map; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.client.Client; -import org.opensearch.common.settings.Settings; import org.opensearch.plugin.insights.core.metrics.OperationalMetric; import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter; @@ -32,7 +28,7 @@ public class QueryInsightsExporterFactory { */ private final Logger logger = LogManager.getLogger(); final private Client client; - final private Set exporters; + final private Map exporters; /** * Constructor of QueryInsightsExporterFactory @@ -41,32 +37,26 @@ public class QueryInsightsExporterFactory { */ public QueryInsightsExporterFactory(final Client client) { this.client = client; - this.exporters = new HashSet<>(); + this.exporters = new HashMap<>(); } /** * Validate exporter sink config * - * @param settings exporter sink config {@link Settings} + * @param exporterType exporter sink type * @throws IllegalArgumentException if provided exporter sink config settings are invalid */ - public void validateExporterConfig(final Settings settings) throws IllegalArgumentException { + public void validateExporterType(final String exporterType) throws IllegalArgumentException { // Disable exporter if the EXPORTER_TYPE setting is null - if (settings.get(EXPORTER_TYPE) == null) { + if (exporterType == null) { return; } - SinkType type; try { - type = SinkType.parse(settings.get(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE)); + SinkType.parse(exporterType); } catch (IllegalArgumentException e) { OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.INVALID_EXPORTER_TYPE_FAILURES); throw new IllegalArgumentException( - String.format( - Locale.ROOT, - "Invalid exporter type [%s], type should be one of %s", - settings.get(EXPORTER_TYPE), - SinkType.allSinkTypes() - ) + String.format(Locale.ROOT, "Invalid exporter type [%s], type should be one of %s", exporterType, SinkType.allSinkTypes()) ); } } @@ -78,10 +68,10 @@ public void validateExporterConfig(final Settings settings) throws IllegalArgume * @param indexPattern the index pattern if creating a index exporter * @return QueryInsightsExporter the created exporter sink */ - public QueryInsightsExporter createExporter(SinkType type, String indexPattern) { + public QueryInsightsExporter createExporter(String id, SinkType type, String indexPattern) { if (SinkType.LOCAL_INDEX.equals(type)) { - QueryInsightsExporter exporter = new LocalIndexExporter(client, DateTimeFormatter.ofPattern(indexPattern, Locale.ROOT)); - this.exporters.add(exporter); + QueryInsightsExporter exporter = new LocalIndexExporter(client, DateTimeFormatter.ofPattern(indexPattern, Locale.ROOT), id); + this.exporters.put(id, exporter); return exporter; } return DebugExporter.getInstance(); @@ -101,6 +91,15 @@ public QueryInsightsExporter updateExporter(QueryInsightsExporter exporter, Stri return exporter; } + /** + * Get a exporter by id + * @param id The id of the exporter + * @return QueryInsightsReader the Reader + */ + public QueryInsightsExporter getExporter(String id) { + return this.exporters.get(id); + } + /** * Close an exporter * @@ -110,7 +109,7 @@ public QueryInsightsExporter updateExporter(QueryInsightsExporter exporter, Stri public void closeExporter(QueryInsightsExporter exporter) throws IOException { if (exporter != null) { exporter.close(); - this.exporters.remove(exporter); + this.exporters.remove(exporter.getId()); } } @@ -119,7 +118,7 @@ public void closeExporter(QueryInsightsExporter exporter) throws IOException { * */ public void closeAllExporters() { - for (QueryInsightsExporter exporter : exporters) { + for (QueryInsightsExporter exporter : exporters.values()) { try { closeExporter(exporter); } catch (IOException e) { diff --git a/src/main/java/org/opensearch/plugin/insights/core/exporter/SinkType.java b/src/main/java/org/opensearch/plugin/insights/core/exporter/SinkType.java index c90c9c76..ded7cc35 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/exporter/SinkType.java +++ b/src/main/java/org/opensearch/plugin/insights/core/exporter/SinkType.java @@ -17,6 +17,8 @@ * Type of supported sinks */ public enum SinkType { + /** no exporter */ + NONE("none"), /** debug exporter */ DEBUG("debug"), /** local index exporter */ @@ -60,7 +62,9 @@ public static Set allSinkTypes() { public static SinkType getSinkTypeFromExporter(QueryInsightsExporter exporter) { if (exporter.getClass().equals(LocalIndexExporter.class)) { return SinkType.LOCAL_INDEX; + } else if (exporter.getClass().equals(DebugExporter.class)) { + return SinkType.DEBUG; } - return SinkType.DEBUG; + return SinkType.NONE; } } diff --git a/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java b/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java index 6969b021..7abc9b7a 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java +++ b/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java @@ -46,6 +46,7 @@ public final class LocalIndexReader implements QueryInsightsReader { private final Client client; private DateTimeFormatter indexPattern; private final NamedXContentRegistry namedXContentRegistry; + private final String id; /** * Constructor of LocalIndexReader @@ -54,12 +55,23 @@ public final class LocalIndexReader implements QueryInsightsReader { * @param indexPattern the pattern of index to read from * @param namedXContentRegistry for parsing purposes */ - public LocalIndexReader(final Client client, final DateTimeFormatter indexPattern, final NamedXContentRegistry namedXContentRegistry) { + public LocalIndexReader( + final Client client, + final DateTimeFormatter indexPattern, + final NamedXContentRegistry namedXContentRegistry, + final String id + ) { this.indexPattern = indexPattern; this.client = client; + this.id = id; this.namedXContentRegistry = namedXContentRegistry; } + @Override + public String getId() { + return id; + } + /** * Getter of indexPattern * diff --git a/src/main/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReader.java b/src/main/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReader.java index 6e96a96e..2cc7434d 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReader.java +++ b/src/main/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReader.java @@ -25,4 +25,6 @@ public interface QueryInsightsReader extends Closeable { * @return List of SearchQueryRecord */ List read(final String from, final String to, final String id); + + String getId(); } diff --git a/src/main/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReaderFactory.java b/src/main/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReaderFactory.java index 0f3c2701..aa7fff4d 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReaderFactory.java +++ b/src/main/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReaderFactory.java @@ -10,9 +10,9 @@ import java.io.IOException; import java.time.format.DateTimeFormatter; -import java.util.HashSet; +import java.util.HashMap; import java.util.Locale; -import java.util.Set; +import java.util.Map; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.client.Client; @@ -27,7 +27,7 @@ public class QueryInsightsReaderFactory { */ private final Logger logger = LogManager.getLogger(); final private Client client; - final private Set Readers; + final private Map readers; /** * Constructor of QueryInsightsReaderFactory @@ -36,7 +36,7 @@ public class QueryInsightsReaderFactory { */ public QueryInsightsReaderFactory(final Client client) { this.client = client; - this.Readers = new HashSet<>(); + this.readers = new HashMap<>(); } /** @@ -46,39 +46,49 @@ public QueryInsightsReaderFactory(final Client client) { * @param namedXContentRegistry for parsing purposes * @return QueryInsightsReader the created Reader */ - public QueryInsightsReader createReader(String indexPattern, NamedXContentRegistry namedXContentRegistry) { - QueryInsightsReader Reader = new LocalIndexReader( + public QueryInsightsReader createReader(String id, String indexPattern, NamedXContentRegistry namedXContentRegistry) { + QueryInsightsReader reader = new LocalIndexReader( client, DateTimeFormatter.ofPattern(indexPattern, Locale.ROOT), - namedXContentRegistry + namedXContentRegistry, + id ); - this.Readers.add(Reader); - return Reader; + this.readers.put(id, reader); + return reader; } /** - * Update a Reader based on provided parameters + * Update a reader based on provided parameters * - * @param Reader The Reader to update - * @param indexPattern the index pattern if creating an index Reader - * @return QueryInsightsReader the updated Reader sink + * @param reader The reader to update + * @param indexPattern the index pattern if creating an index reader + * @return QueryInsightsReader the updated reader sink */ - public QueryInsightsReader updateReader(QueryInsightsReader Reader, String indexPattern) { - if (Reader.getClass() == LocalIndexReader.class) { - ((LocalIndexReader) Reader).setIndexPattern(DateTimeFormatter.ofPattern(indexPattern, Locale.ROOT)); + public QueryInsightsReader updateReader(QueryInsightsReader reader, String indexPattern) { + if (reader.getClass() == LocalIndexReader.class) { + ((LocalIndexReader) reader).setIndexPattern(DateTimeFormatter.ofPattern(indexPattern, Locale.ROOT)); } - return Reader; + return reader; + } + + /** + * Get a reader by id + * @param id The id of the reader + * @return QueryInsightsReader the Reader + */ + public QueryInsightsReader getReader(String id) { + return this.readers.get(id); } /** - * Close a Reader + * Close a reader * - * @param Reader the Reader to close + * @param reader the Reader to close */ - public void closeReader(QueryInsightsReader Reader) throws IOException { - if (Reader != null) { - Reader.close(); - this.Readers.remove(Reader); + public void closeReader(QueryInsightsReader reader) throws IOException { + if (reader != null) { + reader.close(); + this.readers.remove(reader.getId()); } } @@ -87,11 +97,11 @@ public void closeReader(QueryInsightsReader Reader) throws IOException { * */ public void closeAllReaders() { - for (QueryInsightsReader Reader : Readers) { + for (QueryInsightsReader reader : readers.values()) { try { - closeReader(Reader); + closeReader(reader); } catch (IOException e) { - logger.error("Fail to close query insights Reader, error: ", e); + logger.error("Fail to close query insights reader, error: ", e); } } } diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java b/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java index 808e4ff5..3d63e41f 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java @@ -8,10 +8,15 @@ package org.opensearch.plugin.insights.core.service; +import static org.opensearch.plugin.insights.core.service.TopQueriesService.TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID; +import static org.opensearch.plugin.insights.core.service.TopQueriesService.TOP_QUERIES_LOCAL_INDEX_READER_ID; +import static org.opensearch.plugin.insights.core.service.TopQueriesService.deleteSingleIndex; +import static org.opensearch.plugin.insights.core.service.TopQueriesService.isTopQueriesIndex; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_GROUPING_TYPE; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_QUERIES_INDEX_PATTERN; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER; -import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getExporterSettings; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_EXPORTER_TYPE; import java.io.IOException; import java.util.ArrayList; @@ -31,12 +36,15 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; -import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.plugin.insights.core.exporter.LocalIndexExporter; +import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter; import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory; +import org.opensearch.plugin.insights.core.exporter.SinkType; import org.opensearch.plugin.insights.core.metrics.OperationalMetric; import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter; +import org.opensearch.plugin.insights.core.reader.QueryInsightsReader; import org.opensearch.plugin.insights.core.reader.QueryInsightsReaderFactory; import org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator; import org.opensearch.plugin.insights.core.service.categorizer.SearchQueryCategorizer; @@ -87,12 +95,12 @@ public class QueryInsightsService extends AbstractLifecycleComponent { protected volatile List scheduledFutures; /** - * Query Insights exporter factory + * Factory for validating and creating exporters */ final QueryInsightsExporterFactory queryInsightsExporterFactory; /** - * Query Insights reader factory + * Factory for validating and creating readers */ final QueryInsightsReaderFactory queryInsightsReaderFactory; @@ -112,6 +120,8 @@ public class QueryInsightsService extends AbstractLifecycleComponent { */ private QueryShapeGenerator queryShapeGenerator; + private final Client client; + /** * Constructor of the QueryInsightsService * @@ -136,6 +146,7 @@ public QueryInsightsService( this.queryInsightsExporterFactory = new QueryInsightsExporterFactory(client); this.queryInsightsReaderFactory = new QueryInsightsReaderFactory(client); this.namedXContentRegistry = namedXContentRegistry; + this.client = client; // initialize top n queries services and configurations consumers topQueriesServices = new HashMap<>(); for (MetricType metricType : MetricType.allMetricTypes()) { @@ -145,20 +156,18 @@ public QueryInsightsService( new TopQueriesService(client, metricType, threadPool, queryInsightsExporterFactory, queryInsightsReaderFactory) ); } - for (MetricType type : MetricType.allMetricTypes()) { - clusterService.getClusterSettings() - .addSettingsUpdateConsumer( - getExporterSettings(type), - (settings -> setExporterAndReader(type, settings, clusterService.state().metadata().indices())), - (settings -> validateExporterAndReaderConfig(type, settings)) - ); - clusterService.getClusterSettings() - .addSettingsUpdateConsumer( - TOP_N_EXPORTER_DELETE_AFTER, - (settings -> setExporterDeleteAfterAndDelete(type, settings)), - (TopQueriesService::validateExporterDeleteAfter) - ); - } + clusterService.getClusterSettings() + .addSettingsUpdateConsumer( + TOP_N_EXPORTER_TYPE, + (v -> setExporterAndReader(SinkType.parse(v), clusterService.state().metadata().indices())), + (this::validateExporterType) + ); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer( + TOP_N_EXPORTER_DELETE_AFTER, + (this::setExporterDeleteAfterAndDelete), + (TopQueriesService::validateExporterDeleteAfter) + ); this.searchQueryCategorizer = SearchQueryCategorizer.getInstance(metricsRegistry); this.enableSearchQueryMetricsFeature(false); @@ -407,28 +416,67 @@ public void setTopNSize(final MetricType type, final int topNSize) { /** * Set the exporter and reader config for a metricType * - * @param type {@link MetricType} - * @param settings exporter and reader settings + * @param sinkType {@link SinkType} + * @param indexMetadataMap index metadata map in the current cluster */ - private void setExporterAndReader(final MetricType type, final Settings settings, final Map indexMetadataMap) { - if (topQueriesServices.containsKey(type)) { - TopQueriesService tqs = topQueriesServices.get(type); - tqs.setExporter(settings, indexMetadataMap); - tqs.setReader(settings, namedXContentRegistry); + private void setExporterAndReader(final SinkType sinkType, final Map indexMetadataMap) { + final QueryInsightsExporter topQueriesExporter = queryInsightsExporterFactory.getExporter(TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID); + + // This method is invoked when sink type is changed + // Clear local indices if exporter is of type LocalIndexExporter + if (topQueriesExporter != null && topQueriesExporter.getClass() == LocalIndexExporter.class) { + deleteAllTopNIndices(client, indexMetadataMap); + } + + if (sinkType != null) { + if (topQueriesExporter != null && sinkType == SinkType.getSinkTypeFromExporter(topQueriesExporter)) { + // this won't happen since we disallowed users to change index patterns. + // But leaving the hook here since we will add support for more sinks and configurations in the future. + queryInsightsExporterFactory.updateExporter(topQueriesExporter, DEFAULT_TOP_N_QUERIES_INDEX_PATTERN); + } else { + try { + queryInsightsExporterFactory.closeExporter(topQueriesExporter); + } catch (IOException e) { + OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.EXPORTER_FAIL_TO_CLOSE_EXCEPTION); + logger.error("Fail to close the current exporter when updating exporter, error: ", e); + } + // this is a new exporter, create it for all underlying services. + queryInsightsExporterFactory.createExporter( + TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID, + sinkType, + DEFAULT_TOP_N_QUERIES_INDEX_PATTERN + ); + } + } else { + // Disable exporter if exporter type is set to null + try { + queryInsightsExporterFactory.closeExporter(topQueriesExporter); + } catch (IOException e) { + OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.EXPORTER_FAIL_TO_CLOSE_EXCEPTION); + logger.error("Fail to close the current exporter when disabling exporter, error: ", e); + } } + + // set up reader for top n queries service + final QueryInsightsReader reader = queryInsightsReaderFactory.createReader( + TOP_QUERIES_LOCAL_INDEX_READER_ID, + DEFAULT_TOP_N_QUERIES_INDEX_PATTERN, + namedXContentRegistry + ); + queryInsightsReaderFactory.updateReader(reader, DEFAULT_TOP_N_QUERIES_INDEX_PATTERN); } /** * Set the exporter delete after, then delete expired Top N indices * - * @param type {@link MetricType} * @param deleteAfter the number of days after which Top N local indices should be deleted */ - private void setExporterDeleteAfterAndDelete(final MetricType type, final int deleteAfter) { - if (topQueriesServices.containsKey(type)) { - topQueriesServices.get(type).setExporterDeleteAfter(deleteAfter); - deleteExpiredTopNIndices(); + private void setExporterDeleteAfterAndDelete(final int deleteAfter) { + final QueryInsightsExporter topQueriesExporter = queryInsightsExporterFactory.getExporter(TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID); + if (topQueriesExporter != null && topQueriesExporter.getClass() == LocalIndexExporter.class) { + ((LocalIndexExporter) topQueriesExporter).setDeleteAfter(deleteAfter); } + deleteExpiredTopNIndices(); } /** @@ -440,16 +488,12 @@ public SearchQueryCategorizer getSearchQueryCategorizer() { } /** - * Validate the exporter and reader config for a metricType + * Validate the exporter type config * - * @param type {@link MetricType} - * @param settings exporter and reader settings + * @param exporterType exporter type */ - public void validateExporterAndReaderConfig(final MetricType type, final Settings settings) { - if (topQueriesServices.containsKey(type)) { - TopQueriesService tqs = topQueriesServices.get(type); - tqs.validateExporterAndReaderConfig(settings); - } + public void validateExporterType(final String exporterType) { + queryInsightsExporterFactory.validateExporterType(exporterType); } @Override @@ -519,11 +563,27 @@ public QueryInsightsHealthStats getHealthStats() { * Delete Top N local indices older than the configured data retention period */ private void deleteExpiredTopNIndices() { - for (MetricType metricType : MetricType.allMetricTypes()) { - topQueriesServices.get(metricType).deleteExpiredTopNIndices(clusterService.state().metadata().indices()); + final QueryInsightsExporter topQueriesExporter = queryInsightsExporterFactory.getExporter(TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID); + if (topQueriesExporter != null && topQueriesExporter.getClass() == LocalIndexExporter.class) { + threadPool.executor(QUERY_INSIGHTS_EXECUTOR) + .execute( + () -> ((LocalIndexExporter) topQueriesExporter).deleteExpiredTopNIndices(clusterService.state().metadata().indices()) + ); } } + /** + * Deletes all Top N local indices + * + * @param indexMetadataMap Map of index name {@link String} to {@link IndexMetadata} + */ + void deleteAllTopNIndices(final Client client, final Map indexMetadataMap) { + indexMetadataMap.entrySet() + .stream() + .filter(entry -> isTopQueriesIndex(entry.getKey())) + .forEach(entry -> deleteSingleIndex(entry.getKey(), client)); + } + /** * Set query shape generator */ diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java b/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java index b99767c3..f802213d 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java @@ -8,9 +8,6 @@ package org.opensearch.plugin.insights.core.service; -import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_QUERIES_INDEX_PATTERN; -import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_QUERIES_EXPORTER_TYPE; -import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORTER_TYPE; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.MAX_DELETE_AFTER_VALUE; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.MIN_DELETE_AFTER_VALUE; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR; @@ -40,15 +37,10 @@ import org.apache.logging.log4j.Logger; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.client.Client; -import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; -import org.opensearch.core.xcontent.NamedXContentRegistry; -import org.opensearch.plugin.insights.core.exporter.LocalIndexExporter; import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter; import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory; -import org.opensearch.plugin.insights.core.exporter.SinkType; import org.opensearch.plugin.insights.core.metrics.OperationalMetric; import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter; import org.opensearch.plugin.insights.core.reader.QueryInsightsReader; @@ -70,6 +62,8 @@ * with high latency or resource usage */ public class TopQueriesService { + public static final String TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID = "top_queries_local_index_exporter"; + public static final String TOP_QUERIES_LOCAL_INDEX_READER_ID = "top_queries_local_index_reader"; private static final String METRIC_TYPE_TAG = "metric_type"; private static final String GROUPBY_TAG = "groupby"; @@ -122,12 +116,6 @@ public class TopQueriesService { */ private final ThreadPool threadPool; - /** - * Exporter for exporting top queries data - */ - private QueryInsightsExporter exporter; - private QueryInsightsReader reader; - private final QueryGrouper queryGrouper; TopQueriesService( @@ -135,7 +123,7 @@ public class TopQueriesService { final MetricType metricType, final ThreadPool threadPool, final QueryInsightsExporterFactory queryInsightsExporterFactory, - QueryInsightsReaderFactory queryInsightsReaderFactory + final QueryInsightsReaderFactory queryInsightsReaderFactory ) { this.enabled = false; this.client = client; @@ -146,8 +134,6 @@ public class TopQueriesService { this.topNSize = QueryInsightsSettings.DEFAULT_TOP_N_SIZE; this.windowSize = QueryInsightsSettings.DEFAULT_WINDOW_SIZE; this.windowStart = -1L; - this.exporter = null; - this.reader = null; topQueriesStore = new PriorityBlockingQueue<>(topNSize, (a, b) -> SearchQueryRecord.compare(a, b, metricType)); topQueriesCurrentSnapshot = new AtomicReference<>(new ArrayList<>()); topQueriesHistorySnapshot = new AtomicReference<>(new ArrayList<>()); @@ -269,66 +255,6 @@ public void validateWindowSize(final TimeValue windowSize) { } } - /** - * Set up the top queries exporter based on provided settings - * - * @param settings exporter config {@link Settings} - */ - public void setExporter(final Settings settings, final Map indexMetadataMap) { - // This method is invoked when sink type is changed - // Clear local indices if exporter is of type LocalIndexExporter - if (exporter != null && exporter.getClass() == LocalIndexExporter.class) { - deleteAllTopNIndices(indexMetadataMap); - } - - if (settings.get(EXPORTER_TYPE) != null) { - SinkType expectedType = SinkType.parse(settings.get(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE)); - if (exporter != null && expectedType == SinkType.getSinkTypeFromExporter(exporter)) { - queryInsightsExporterFactory.updateExporter(exporter, DEFAULT_TOP_N_QUERIES_INDEX_PATTERN); - } else { - try { - queryInsightsExporterFactory.closeExporter(this.exporter); - } catch (IOException e) { - OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.EXPORTER_FAIL_TO_CLOSE_EXCEPTION); - logger.error("Fail to close the current exporter when updating exporter, error: ", e); - } - this.exporter = queryInsightsExporterFactory.createExporter( - SinkType.parse(settings.get(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE)), - DEFAULT_TOP_N_QUERIES_INDEX_PATTERN - ); - } - } else { - // Disable exporter if exporter type is set to null - try { - queryInsightsExporterFactory.closeExporter(this.exporter); - this.exporter = null; - } catch (IOException e) { - OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.EXPORTER_FAIL_TO_CLOSE_EXCEPTION); - logger.error("Fail to close the current exporter when disabling exporter, error: ", e); - } - } - } - - /** - * Set up the top queries reader based on provided settings - * - * @param settings reader config {@link Settings} - * @param namedXContentRegistry NamedXContentRegistry for parsing purposes - */ - public void setReader(final Settings settings, final NamedXContentRegistry namedXContentRegistry) { - this.reader = queryInsightsReaderFactory.createReader(DEFAULT_TOP_N_QUERIES_INDEX_PATTERN, namedXContentRegistry); - queryInsightsReaderFactory.updateReader(reader, DEFAULT_TOP_N_QUERIES_INDEX_PATTERN); - } - - /** - * Validate provided settings for top queries exporter and reader - * - * @param settings settings exporter/reader config {@link Settings} - */ - public void validateExporterAndReaderConfig(Settings settings) { - queryInsightsExporterFactory.validateExporterConfig(settings); - } - /** * Lambda function to mark if a record is internal */ @@ -426,6 +352,7 @@ public List getTopQueriesRecordsFromIndex(final String from, } final List queries = new ArrayList<>(); + final QueryInsightsReader reader = queryInsightsReaderFactory.getReader(TOP_QUERIES_LOCAL_INDEX_READER_ID); if (reader != null) { try { final ZonedDateTime start = ZonedDateTime.parse(from); @@ -515,6 +442,7 @@ private void rotateWindowIfNecessary(final long newWindowStart) { topQueriesCurrentSnapshot.set(new ArrayList<>()); windowStart = newWindowStart; // export to the configured sink + QueryInsightsExporter exporter = queryInsightsExporterFactory.getExporter(TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID); if (exporter != null) { threadPool.executor(QUERY_INSIGHTS_EXECUTOR).execute(() -> exporter.export(history)); } @@ -548,10 +476,7 @@ public List getTopQueriesCurrentSnapshot() { * Close the top n queries service * @throws IOException exception */ - public void close() throws IOException { - queryInsightsExporterFactory.closeExporter(this.exporter); - queryInsightsReaderFactory.closeReader(this.reader); - } + public void close() throws IOException {} /** * Drain internal stores. @@ -591,39 +516,6 @@ static void validateExporterDeleteAfter(final int deleteAfter) { } } - /** - * Set exporter delete after if exporter is a {@link LocalIndexExporter} - * - * @param deleteAfter the number of days after which Top N local indices should be deleted - */ - void setExporterDeleteAfter(final int deleteAfter) { - if (exporter != null && exporter.getClass() == LocalIndexExporter.class) { - ((LocalIndexExporter) exporter).setDeleteAfter(deleteAfter); - } - } - - /** - * Delete Top N local indices older than the configured data retention period - */ - void deleteExpiredTopNIndices(final Map indexMetadataMap) { - if (exporter != null && exporter.getClass() == LocalIndexExporter.class) { - threadPool.executor(QUERY_INSIGHTS_EXECUTOR) - .execute(() -> ((LocalIndexExporter) exporter).deleteExpiredTopNIndices(indexMetadataMap)); - } - } - - /** - * Deletes all Top N local indices - * - * @param indexMetadataMap Map of index name {@link String} to {@link IndexMetadata} - */ - void deleteAllTopNIndices(final Map indexMetadataMap) { - indexMetadataMap.entrySet() - .stream() - .filter(entry -> isTopQueriesIndex(entry.getKey())) - .forEach(entry -> deleteSingleIndex(entry.getKey(), client)); - } - /** * Deletes the specified index and logs any failure that occurs during the operation. * diff --git a/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java b/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java index 204f5f33..ecff6a9b 100644 --- a/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java +++ b/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java @@ -13,7 +13,6 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import org.opensearch.common.settings.Setting; -import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.plugin.insights.core.exporter.SinkType; import org.opensearch.plugin.insights.rules.model.GroupingType; @@ -222,15 +221,7 @@ public class QueryInsightsSettings { /** * Settings and defaults for top queries exporters */ - private static final String TOP_N_LATENCY_QUERIES_EXPORTER_PREFIX = TOP_N_LATENCY_QUERIES_PREFIX + ".exporter."; - /** - * Prefix for top n queries by cpu exporters - */ - private static final String TOP_N_CPU_QUERIES_EXPORTER_PREFIX = TOP_N_CPU_QUERIES_PREFIX + ".exporter."; - /** - * Prefix for top n queries by memory exporters - */ - private static final String TOP_N_MEMORY_QUERIES_EXPORTER_PREFIX = TOP_N_MEMORY_QUERIES_PREFIX + ".exporter."; + private static final String TOP_N_QUERIES_EXPORTER_PREFIX = TOP_N_QUERIES_SETTING_PREFIX + ".exporter"; /** * Default index pattern of top n queries */ @@ -238,7 +229,7 @@ public class QueryInsightsSettings { /** * Default exporter type of top queries */ - public static final String DEFAULT_TOP_QUERIES_EXPORTER_TYPE = SinkType.LOCAL_INDEX.toString(); + public static final String DEFAULT_TOP_QUERIES_EXPORTER_TYPE = SinkType.NONE.toString(); /** * Default Top N local indices retention period in days */ @@ -259,37 +250,20 @@ public class QueryInsightsSettings { * and it applies to exporters of all metric types */ public static final Setting TOP_N_EXPORTER_DELETE_AFTER = Setting.intSetting( - TOP_N_QUERIES_SETTING_PREFIX + ".delete_after_days", + TOP_N_QUERIES_EXPORTER_PREFIX + ".delete_after_days", DEFAULT_DELETE_AFTER_VALUE, Setting.Property.Dynamic, Setting.Property.NodeScope ); /** - * Settings for the exporter of top latency queries - */ - public static final Setting TOP_N_LATENCY_EXPORTER_SETTINGS = Setting.groupSetting( - TOP_N_LATENCY_QUERIES_EXPORTER_PREFIX, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - /** - * Settings for the exporter of top cpu queries - */ - public static final Setting TOP_N_CPU_EXPORTER_SETTINGS = Setting.groupSetting( - TOP_N_CPU_QUERIES_EXPORTER_PREFIX, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - /** - * Settings for the exporter of top cpu queries + * Settings for the top n queries exporter */ - public static final Setting TOP_N_MEMORY_EXPORTER_SETTINGS = Setting.groupSetting( - TOP_N_MEMORY_QUERIES_EXPORTER_PREFIX, - Setting.Property.Dynamic, - Setting.Property.NodeScope + public static final Setting TOP_N_EXPORTER_TYPE = Setting.simpleString( + TOP_N_QUERIES_EXPORTER_PREFIX + ".type", + DEFAULT_TOP_QUERIES_EXPORTER_TYPE, + Setting.Property.NodeScope, + Setting.Property.Dynamic ); /** @@ -340,22 +314,6 @@ public static Setting getTopNWindowSizeSetting(MetricType type) { } } - /** - * Get the exporter settings based on type - * @param type MetricType - * @return exporter setting - */ - public static Setting getExporterSettings(MetricType type) { - switch (type) { - case CPU: - return TOP_N_CPU_EXPORTER_SETTINGS; - case MEMORY: - return TOP_N_MEMORY_EXPORTER_SETTINGS; - default: - return TOP_N_LATENCY_EXPORTER_SETTINGS; - } - } - /** * Default constructor */ diff --git a/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java b/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java index 8d8dad0b..dacf64e7 100644 --- a/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java +++ b/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java @@ -68,21 +68,19 @@ public void testGetSettings() { QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED, QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE, QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE, - QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS, QueryInsightsSettings.TOP_N_CPU_QUERIES_ENABLED, QueryInsightsSettings.TOP_N_CPU_QUERIES_SIZE, QueryInsightsSettings.TOP_N_CPU_QUERIES_WINDOW_SIZE, - QueryInsightsSettings.TOP_N_CPU_EXPORTER_SETTINGS, QueryInsightsSettings.TOP_N_MEMORY_QUERIES_ENABLED, QueryInsightsSettings.TOP_N_MEMORY_QUERIES_SIZE, QueryInsightsSettings.TOP_N_MEMORY_QUERIES_WINDOW_SIZE, - QueryInsightsSettings.TOP_N_MEMORY_EXPORTER_SETTINGS, QueryInsightsSettings.TOP_N_QUERIES_GROUP_BY, QueryInsightsSettings.TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N, QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_NAME, QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_TYPE, QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING, QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER, + QueryInsightsSettings.TOP_N_EXPORTER_TYPE, QueryCategorizationSettings.SEARCH_QUERY_FIELD_TYPE_CACHE_SIZE_KEY ), queryInsightsPlugin.getSettings() diff --git a/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java b/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java index 886238f9..18f570e6 100644 --- a/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java +++ b/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java @@ -342,15 +342,13 @@ public static void registerAllQueryInsightsSettings(ClusterSettings clusterSetti clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); - clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_CPU_QUERIES_ENABLED); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_CPU_QUERIES_SIZE); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_CPU_QUERIES_WINDOW_SIZE); - clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_CPU_EXPORTER_SETTINGS); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_MEMORY_QUERIES_ENABLED); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_MEMORY_QUERIES_SIZE); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_MEMORY_QUERIES_WINDOW_SIZE); - clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_MEMORY_EXPORTER_SETTINGS); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_EXPORTER_TYPE); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_QUERIES_GROUP_BY); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_NAME); diff --git a/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java b/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java index 7e5db95c..00ff6d34 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java @@ -54,7 +54,7 @@ public class LocalIndexExporterTests extends OpenSearchTestCase { @Before public void setup() { - localIndexExporter = new LocalIndexExporter(client, format); + localIndexExporter = new LocalIndexExporter(client, format, "id"); when(client.admin()).thenReturn(adminClient); when(adminClient.indices()).thenReturn(indicesAdminClient); diff --git a/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactoryTests.java b/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactoryTests.java index 99c96280..da250353 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactoryTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactoryTests.java @@ -44,28 +44,24 @@ public void setup() { } public void testValidateConfigWhenResetExporter() { - Settings.Builder settingsBuilder = Settings.builder(); - // empty settings - Settings settings = settingsBuilder.build(); try { - queryInsightsExporterFactory.validateExporterConfig(settings); + // empty settings + queryInsightsExporterFactory.validateExporterType(null); } catch (Exception e) { fail("No exception should be thrown when setting is null"); } } public void testInvalidExporterTypeConfig() { - Settings.Builder settingsBuilder = Settings.builder(); - Settings settings = settingsBuilder.put(EXPORTER_TYPE, "some_invalid_type").build(); - assertThrows(IllegalArgumentException.class, () -> { queryInsightsExporterFactory.validateExporterConfig(settings); }); + assertThrows(IllegalArgumentException.class, () -> { queryInsightsExporterFactory.validateExporterType("some_invalid_type"); }); } public void testCreateAndCloseExporter() { - QueryInsightsExporter exporter1 = queryInsightsExporterFactory.createExporter(SinkType.LOCAL_INDEX, format); + QueryInsightsExporter exporter1 = queryInsightsExporterFactory.createExporter("id", SinkType.LOCAL_INDEX, format); assertTrue(exporter1 instanceof LocalIndexExporter); - QueryInsightsExporter exporter2 = queryInsightsExporterFactory.createExporter(SinkType.DEBUG, format); + QueryInsightsExporter exporter2 = queryInsightsExporterFactory.createExporter("id", SinkType.DEBUG, format); assertTrue(exporter2 instanceof DebugExporter); - QueryInsightsExporter exporter3 = queryInsightsExporterFactory.createExporter(SinkType.DEBUG, format); + QueryInsightsExporter exporter3 = queryInsightsExporterFactory.createExporter("id", SinkType.DEBUG, format); assertTrue(exporter3 instanceof DebugExporter); try { queryInsightsExporterFactory.closeExporter(exporter1); @@ -77,7 +73,7 @@ public void testCreateAndCloseExporter() { } public void testUpdateExporter() { - LocalIndexExporter exporter = new LocalIndexExporter(client, DateTimeFormatter.ofPattern(format, Locale.ROOT)); + LocalIndexExporter exporter = new LocalIndexExporter(client, DateTimeFormatter.ofPattern(format, Locale.ROOT), "id"); queryInsightsExporterFactory.updateExporter(exporter, "yyyy-MM-dd-HH"); assertEquals(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH", Locale.ROOT).toString(), exporter.getIndexPattern().toString()); } diff --git a/src/test/java/org/opensearch/plugin/insights/core/reader/LocalIndexReaderTests.java b/src/test/java/org/opensearch/plugin/insights/core/reader/LocalIndexReaderTests.java index 63a9ed0a..f9332edd 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/reader/LocalIndexReaderTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/reader/LocalIndexReaderTests.java @@ -48,7 +48,7 @@ public class LocalIndexReaderTests extends OpenSearchTestCase { @Before public void setup() { - localIndexReader = new LocalIndexReader(client, format, namedXContentRegistry); + localIndexReader = new LocalIndexReader(client, format, namedXContentRegistry, "id"); } @SuppressWarnings("unchecked") diff --git a/src/test/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReaderFactoryTests.java b/src/test/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReaderFactoryTests.java index 74adcac2..5d74076d 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReaderFactoryTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReaderFactoryTests.java @@ -45,7 +45,7 @@ public void setup() { } public void testCreateAndCloseReader() { - QueryInsightsReader reader1 = queryInsightsReaderFactory.createReader(format, namedXContentRegistry); + QueryInsightsReader reader1 = queryInsightsReaderFactory.createReader("id", format, namedXContentRegistry); assertTrue(reader1 instanceof LocalIndexReader); try { queryInsightsReaderFactory.closeReader(reader1); @@ -56,7 +56,7 @@ public void testCreateAndCloseReader() { } public void testUpdateReader() { - LocalIndexReader reader = new LocalIndexReader(client, DateTimeFormatter.ofPattern(format, Locale.ROOT), namedXContentRegistry); + LocalIndexReader reader = new LocalIndexReader(client, DateTimeFormatter.ofPattern(format, Locale.ROOT), namedXContentRegistry, "id"); queryInsightsReaderFactory.updateReader(reader, "yyyy-MM-dd-HH"); assertEquals(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH", Locale.ROOT).toString(), reader.getIndexPattern().toString()); } diff --git a/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java b/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java index 61631d43..ba59bec2 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java @@ -11,18 +11,29 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE; +import static org.opensearch.plugin.insights.core.exporter.LocalIndexExporter.generateLocalIndexDateHash; import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.ENTRY_COUNT; import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.EVICTIONS; import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.HIT_COUNT; import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.MISS_COUNT; import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.SIZE_IN_BYTES; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import org.junit.Before; +import org.opensearch.Version; +import org.opensearch.client.AdminClient; import org.opensearch.client.Client; +import org.opensearch.client.IndicesAdminClient; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -54,6 +65,8 @@ public class QueryInsightsServiceTests extends OpenSearchTestCase { private final NamedXContentRegistry namedXContentRegistry = mock(NamedXContentRegistry.class); private QueryInsightsService queryInsightsService; private QueryInsightsService queryInsightsServiceSpy; + private final AdminClient adminClient = mock(AdminClient.class); + private final IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class); @Before public void setup() { @@ -84,6 +97,9 @@ public void setup() { invocation -> mock(Counter.class) ); OperationalMetricsCounter.initialize("cluster", metricsRegistry); + + when(client.admin()).thenReturn(adminClient); + when(adminClient.indices()).thenReturn(indicesAdminClient); } @Override @@ -224,4 +240,46 @@ public void testGetHealthStats() { assertTrue(fieldTypeCacheStats.containsKey(HIT_COUNT)); assertTrue(fieldTypeCacheStats.containsKey(MISS_COUNT)); } + + public void testDeleteAllTopNIndices() { + // Create 9 top_queries-* indices + Map indexMetadataMap = new HashMap<>(); + for (int i = 1; i < 10; i++) { + String indexName = "top_queries-2024.01.0" + i + "-" + generateLocalIndexDateHash(); + long creationTime = Instant.now().minus(i, ChronoUnit.DAYS).toEpochMilli(); + + IndexMetadata indexMetadata = IndexMetadata.builder(indexName) + .settings( + Settings.builder() + .put("index.version.created", Version.CURRENT.id) + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put(SETTING_CREATION_DATE, creationTime) + ) + .build(); + indexMetadataMap.put(indexName, indexMetadata); + } + // Create 5 user indices + for (int i = 0; i < 5; i++) { + String indexName = "my_index-" + i; + long creationTime = Instant.now().minus(i, ChronoUnit.DAYS).toEpochMilli(); + + IndexMetadata indexMetadata = IndexMetadata.builder(indexName) + .settings( + Settings.builder() + .put("index.version.created", Version.CURRENT.id) + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put(SETTING_CREATION_DATE, creationTime) + ) + .build(); + indexMetadataMap.put(indexName, indexMetadata); + } + + queryInsightsService.deleteAllTopNIndices(client, indexMetadataMap); + // All 10 indices should be deleted + verify(client, times(9)).admin(); + verify(adminClient, times(9)).indices(); + verify(indicesAdminClient, times(9)).delete(any(), any()); + } } diff --git a/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java b/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java index 69bc3461..e1bf51c5 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java @@ -222,48 +222,6 @@ public void testValidateExporterDeleteAfter() { ); } - public void testDeleteAllTopNIndices() { - // Create 9 top_queries-* indices - Map indexMetadataMap = new HashMap<>(); - for (int i = 1; i < 10; i++) { - String indexName = "top_queries-2024.01.0" + i + "-" + generateLocalIndexDateHash(); - long creationTime = Instant.now().minus(i, ChronoUnit.DAYS).toEpochMilli(); - - IndexMetadata indexMetadata = IndexMetadata.builder(indexName) - .settings( - Settings.builder() - .put("index.version.created", Version.CURRENT.id) - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", 1) - .put(SETTING_CREATION_DATE, creationTime) - ) - .build(); - indexMetadataMap.put(indexName, indexMetadata); - } - // Create 5 user indices - for (int i = 0; i < 5; i++) { - String indexName = "my_index-" + i; - long creationTime = Instant.now().minus(i, ChronoUnit.DAYS).toEpochMilli(); - - IndexMetadata indexMetadata = IndexMetadata.builder(indexName) - .settings( - Settings.builder() - .put("index.version.created", Version.CURRENT.id) - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", 1) - .put(SETTING_CREATION_DATE, creationTime) - ) - .build(); - indexMetadataMap.put(indexName, indexMetadata); - } - - topQueriesService.deleteAllTopNIndices(indexMetadataMap); - // All 10 indices should be delete - verify(client, times(9)).admin(); - verify(adminClient, times(9)).indices(); - verify(indicesAdminClient, times(9)).delete(any(), any()); - } - public void testIsTopQueriesIndex() { assertTrue(isTopQueriesIndex("top_queries-2024.01.01-01234")); assertTrue(isTopQueriesIndex("top_queries-2025.12.12-99999"));