From b6faf901f516c480e54e67dee38ac6ecfba5c9c7 Mon Sep 17 00:00:00 2001 From: Jackie Han Date: Mon, 10 Jun 2024 10:16:58 -0700 Subject: [PATCH] add custom result index lifecycle management Signed-off-by: Jackie Han --- .../timeseries/indices/IndexManagement.java | 97 +++++++++-- .../opensearch/ad/indices/RolloverTests.java | 156 +++++++++++++++++- 2 files changed, 235 insertions(+), 18 deletions(-) diff --git a/src/main/java/org/opensearch/timeseries/indices/IndexManagement.java b/src/main/java/org/opensearch/timeseries/indices/IndexManagement.java index 9d2076488..1c3f30c77 100644 --- a/src/main/java/org/opensearch/timeseries/indices/IndexManagement.java +++ b/src/main/java/org/opensearch/timeseries/indices/IndexManagement.java @@ -12,6 +12,7 @@ package org.opensearch.timeseries.indices; import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; +import static org.opensearch.forecast.constant.ForecastCommonName.CUSTOM_RESULT_INDEX_PREFIX; import static org.opensearch.timeseries.util.RestHandlerUtils.createXContentParserFromRegistry; import java.io.IOException; @@ -51,6 +52,7 @@ import org.opensearch.action.search.SearchRequest; import org.opensearch.action.support.GroupedActionListener; import org.opensearch.action.support.IndicesOptions; +import org.opensearch.ad.constant.ADCommonName; import org.opensearch.client.AdminClient; import org.opensearch.client.Client; import org.opensearch.cluster.LocalNodeClusterManagerListener; @@ -66,6 +68,8 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.Strings; import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.core.xcontent.XContentParser.Token; @@ -298,7 +302,8 @@ protected void deleteOldHistoryIndices(String indexPattern, TimeValue historyRet long latest = Long.MIN_VALUE; for (IndexMetadata indexMetaData : clusterStateResponse.getState().metadata().indices().values()) { long creationTime = indexMetaData.getCreationDate(); - if ((Instant.now().toEpochMilli() - creationTime) > historyRetentionPeriod.millis()) { + long indexAgeMillis = Instant.now().toEpochMilli() - creationTime; + if (indexAgeMillis > historyRetentionPeriod.millis()) { String indexName = indexMetaData.getIndex().getName(); candidates.add(indexName); if (latest < creationTime) { @@ -1100,6 +1105,7 @@ protected void rescheduleRollover() { if (scheduledRollover != null) { scheduledRollover.cancel(); } + scheduledRollover = threadPool .scheduleWithFixedDelay(() -> rolloverAndDeleteHistoryIndex(), historyRolloverPeriod, executorName()); } @@ -1234,35 +1240,94 @@ protected void rolloverAndDeleteHistoryIndex( String rolloverIndexPattern, IndexType resultIndex ) { - if (!doesDefaultResultIndexExist()) { - return; + // perform rollover and delete on default result index + if (doesDefaultResultIndexExist()) { + RolloverRequest defaultResultIndexRolloverRequest = buildRolloverRequest(resultIndexAlias, rolloverIndexPattern); + defaultResultIndexRolloverRequest.addMaxIndexDocsCondition(historyMaxDocs * getNumberOfPrimaryShards()); + proceedWithDefaultRolloverAndDelete(resultIndexAlias, defaultResultIndexRolloverRequest, allResultIndicesPattern, resultIndex); } + // get config files that have custom result index alias to perform rollover on + getConfigsWithCustomResultIndexAlias(ActionListener.wrap(candidateResultAliases -> { + if (candidateResultAliases == null || candidateResultAliases.isEmpty()) { + logger.info("Candidate custom result indices are empty."); + return; + } - // We have to pass null for newIndexName in order to get Elastic to increment the index count. - RolloverRequest rollOverRequest = new RolloverRequest(resultIndexAlias, null); + // perform rollover and delete on found custom result index alias + candidateResultAliases.forEach(config -> handleCustomResultIndex(config, resultIndex)); + }, e -> { logger.error("Failed to get configs with custom result index alias.", e); })); + } + + private void handleCustomResultIndex(Config config, IndexType resultIndex) { + RolloverRequest rolloverRequest = buildRolloverRequest( + config.getCustomResultIndexOrAlias(), + getCustomResultIndexPattern(config.getCustomResultIndexOrAlias()) + ); + + // add rollover conditions if found in config + if (config.getCustomResultIndexMinAge() != null) { + rolloverRequest.addMaxIndexAgeCondition(TimeValue.timeValueDays(config.getCustomResultIndexMinAge())); + } + if (config.getCustomResultIndexMinSize() != null) { + rolloverRequest.addMaxIndexSizeCondition(new ByteSizeValue(config.getCustomResultIndexMinSize(), ByteSizeUnit.MB)); + } + + // perform rollover and delete on custom result index alias + proceedWithRolloverAndDelete( + config.getCustomResultIndexOrAlias(), + rolloverRequest, + getAllCustomResultIndexPattern(config.getCustomResultIndexOrAlias()), + resultIndex, + config.getCustomResultIndexTTL() + ); + } + + private void proceedWithDefaultRolloverAndDelete( + String resultIndexAlias, + RolloverRequest rolloverRequest, + String allResultIndicesPattern, + IndexType resultIndex + ) { + proceedWithRolloverAndDelete(resultIndexAlias, rolloverRequest, allResultIndicesPattern, resultIndex, null); + } + private RolloverRequest buildRolloverRequest(String resultIndexAlias, String rolloverIndexPattern) { + RolloverRequest rollOverRequest = new RolloverRequest(resultIndexAlias, null); CreateIndexRequest createRequest = rollOverRequest.getCreateIndexRequest(); createRequest.index(rolloverIndexPattern).mapping(resultMapping, XContentType.JSON); - choosePrimaryShards(createRequest, true); - rollOverRequest.addMaxIndexDocsCondition(historyMaxDocs * getNumberOfPrimaryShards()); + return rollOverRequest; + } + + private void proceedWithRolloverAndDelete( + String resultIndexAlias, + RolloverRequest rollOverRequest, + String allResultIndicesPattern, + IndexType resultIndex, + Integer customResultIndexTtl + ) { adminClient.indices().rolloverIndex(rollOverRequest, ActionListener.wrap(response -> { if (!response.isRolledOver()) { logger.warn("{} not rolled over. Conditions were: {}", resultIndexAlias, response.getConditionStatus()); } else { - IndexState indexStatetate = indexStates.computeIfAbsent(resultIndex, k -> new IndexState(k.getMapping())); - indexStatetate.mappingUpToDate = true; + IndexState indexState = indexStates.computeIfAbsent(resultIndex, k -> new IndexState(k.getMapping())); + indexState.mappingUpToDate = true; logger.info("{} rolled over. Conditions were: {}", resultIndexAlias, response.getConditionStatus()); - deleteOldHistoryIndices(allResultIndicesPattern, historyRetentionPeriod); + if (resultIndexAlias.startsWith(ADCommonName.CUSTOM_RESULT_INDEX_PREFIX) + || resultIndexAlias.startsWith(CUSTOM_RESULT_INDEX_PREFIX)) { + // handle custom result index deletion + if (customResultIndexTtl != null) { + deleteOldHistoryIndices(allResultIndicesPattern, TimeValue.timeValueHours(customResultIndexTtl * 24)); + + } + } else { + // handle default result index deletion + deleteOldHistoryIndices(allResultIndicesPattern, historyRetentionPeriod); + } } - }, exception -> { - // e.g., we may roll over too often. Since the index pattern is opensearch-ad-plugin-result-d-history-{now/d}-000001, - // we cannot roll over twice in the same day as the index with the same name exists. We will get - // resource_already_exists_exception. - logger.error("Fail to roll over result index", exception); - })); + }, exception -> { logger.error("Fail to roll over result index", exception); })); } protected void initResultIndexDirectly( diff --git a/src/test/java/org/opensearch/ad/indices/RolloverTests.java b/src/test/java/org/opensearch/ad/indices/RolloverTests.java index e021f5976..2d847d266 100644 --- a/src/test/java/org/opensearch/ad/indices/RolloverTests.java +++ b/src/test/java/org/opensearch/ad/indices/RolloverTests.java @@ -18,22 +18,31 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.opensearch.timeseries.TestHelpers.createSearchResponse; +import java.io.IOException; import java.time.Instant; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.Map; +import org.apache.lucene.search.TotalHits; +import org.opensearch.Version; import org.opensearch.action.admin.cluster.state.ClusterStateRequest; import org.opensearch.action.admin.cluster.state.ClusterStateResponse; import org.opensearch.action.admin.indices.create.CreateIndexRequest; import org.opensearch.action.admin.indices.rollover.Condition; +import org.opensearch.action.admin.indices.rollover.MaxAgeCondition; import org.opensearch.action.admin.indices.rollover.MaxDocsCondition; +import org.opensearch.action.admin.indices.rollover.MaxSizeCondition; import org.opensearch.action.admin.indices.rollover.RolloverRequest; import org.opensearch.action.admin.indices.rollover.RolloverResponse; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.search.ShardSearchFailure; import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.ad.constant.ADCommonName; +import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.settings.AnomalyDetectorSettings; import org.opensearch.client.AdminClient; import org.opensearch.client.Client; @@ -41,14 +50,24 @@ import org.opensearch.client.IndicesAdminClient; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.AliasMetadata; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.search.SearchHit; +import org.opensearch.search.SearchHits; +import org.opensearch.search.aggregations.InternalAggregations; +import org.opensearch.search.internal.InternalSearchResponse; import org.opensearch.threadpool.ThreadPool; import org.opensearch.timeseries.AbstractTimeSeriesTest; +import org.opensearch.timeseries.TestHelpers; import org.opensearch.timeseries.settings.TimeSeriesSettings; import org.opensearch.timeseries.util.DiscoveryNodeFilterer; @@ -56,16 +75,18 @@ public class RolloverTests extends AbstractTimeSeriesTest { private ADIndexManagement adIndices; private IndicesAdminClient indicesClient; private ClusterAdminClient clusterAdminClient; + private Client client; private ClusterName clusterName; private ClusterState clusterState; private ClusterService clusterService; + private NamedXContentRegistry namedXContentRegistry; private long defaultMaxDocs; private int numberOfNodes; @Override public void setUp() throws Exception { super.setUp(); - Client client = mock(Client.class); + client = mock(Client.class); indicesClient = mock(IndicesAdminClient.class); AdminClient adminClient = mock(AdminClient.class); clusterService = mock(ClusterService.class); @@ -98,6 +119,8 @@ public void setUp() throws Exception { numberOfNodes = 2; when(nodeFilter.getNumberOfEligibleDataNodes()).thenReturn(numberOfNodes); + namedXContentRegistry = TestHelpers.xContentRegistry(); + adIndices = new ADIndexManagement( client, clusterService, @@ -105,7 +128,7 @@ public void setUp() throws Exception { settings, nodeFilter, TimeSeriesSettings.MAX_UPDATE_RETRY_TIMES, - NamedXContentRegistry.EMPTY + namedXContentRegistry ); clusterAdminClient = mock(ClusterAdminClient.class); @@ -248,4 +271,133 @@ public void testRetryingDelete() { // 1 group delete, 1 separate retry for each index to delete verify(indicesClient, times(2)).delete(any(), any()); } + + public void testNoCustomResultIndexFound_RolloverDefaultResultIndex_shouldSucceed() { + setUpRolloverSuccess(); + setUpGetConfigs_withNoCustomResultIndexAlias(); + + adIndices.rolloverAndDeleteHistoryIndex(); + verify(indicesClient, times(1)).rolloverIndex(any(), any()); + verify(client, times(1)).search(any(), any()); + } + + public void testCustomResultIndexFound_RolloverCustomResultIndex_withConditions_shouldSucceed() throws IOException { + setUpGetConfigs_withCustomResultIndexAlias(); + + adIndices.rolloverAndDeleteHistoryIndex(); + + verify(indicesClient, times(1)).rolloverIndex(any(), any()); + verify(client, times(1)).search(any(), any()); + } + + private void setUpGetConfigs_withNoCustomResultIndexAlias() { + Metadata.Builder metaBuilder = Metadata + .builder() + .put(indexMeta(".opendistro-anomaly-detectors", 1L, ADCommonName.ANOMALY_RESULT_INDEX_ALIAS), true); + clusterState = ClusterState.builder(clusterName).metadata(metaBuilder.build()).build(); + when(clusterService.state()).thenReturn(clusterState); + + String detectorString = "{\"name\":\"AhtYYGWTgqkzairTchcs\",\"description\":\"iIiAVPMyFgnFlEniLbMyfJxyoGvJAl\"," + + "\"time_field\":\"HmdFH\",\"indices\":[\"ffsBF\"],\"filter_query\":{\"bool\":{\"filter\":[{\"exists\":" + + "{\"field\":\"value\",\"boost\":1}}],\"adjust_pure_negative\":true,\"boost\":1}},\"window_delay\":" + + "{\"period\":{\"interval\":2,\"unit\":\"Minutes\"}},\"shingle_size\":8,\"schema_version\":-512063255," + + "\"feature_attributes\":[{\"feature_id\":\"OTYJs\",\"feature_name\":\"eYYCM\",\"feature_enabled\":false," + + "\"aggregation_query\":{\"XzewX\":{\"value_count\":{\"field\":\"ok\"}}}}],\"recency_emphasis\":3342," + + "\"history\":62,\"last_update_time\":1717192049845,\"category_field\":[\"Tcqcb\"],\"customResultIndexOrAlias\":" + + "\"\",\"imputation_option\":{\"method\":\"FIXED_VALUES\",\"defaultFill\"" + + ":[],\"integerSensitive\":false},\"suggested_seasonality\":64,\"detection_interval\":{\"period\":" + + "{\"interval\":5,\"unit\":\"Minutes\"}},\"detector_type\":\"MULTI_ENTITY\",\"rules\":[]}"; + + doAnswer(invocation -> { + ActionListener listener = invocation.getArgument(1); + SearchHit config = SearchHit.fromXContent(TestHelpers.parser(detectorString)); + SearchHits searchHits = new SearchHits(new SearchHit[] { config }, new TotalHits(1, TotalHits.Relation.EQUAL_TO), Float.NaN); + InternalSearchResponse response = new InternalSearchResponse( + searchHits, + InternalAggregations.EMPTY, + null, + null, + false, + null, + 1 + ); + SearchResponse searchResponse = new SearchResponse( + response, + null, + 1, + 1, + 0, + 100, + ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY + ); + listener.onResponse(searchResponse); + return null; + }).when(client).search(any(), any()); + } + + private void setUpRolloverSuccessForCustomIndex() { + doAnswer(invocation -> { + RolloverRequest request = invocation.getArgument(0); + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocation.getArgument(1); + + assertEquals("opensearch-ad-plugin-result-", request.indices()[0]); + Map> conditions = request.getConditions(); + assertEquals(2, conditions.size()); + assertEquals(new MaxAgeCondition(TimeValue.timeValueDays(7)), conditions.get(MaxAgeCondition.NAME)); + assertEquals(new MaxSizeCondition(new ByteSizeValue(51200, ByteSizeUnit.MB)), conditions.get(MaxSizeCondition.NAME)); + + CreateIndexRequest createIndexRequest = request.getCreateIndexRequest(); + assertEquals("", createIndexRequest.index()); + assertTrue(createIndexRequest.mappings().contains("data_start_time")); + listener.onResponse(new RolloverResponse(null, null, Collections.emptyMap(), request.isDryRun(), true, true, true)); + return null; + }).when(indicesClient).rolloverIndex(any(), any()); + } + + private void setUpGetConfigs_withCustomResultIndexAlias() throws IOException { + IndexMetadata defaultResultIndex = IndexMetadata + .builder(".opendistro-anomaly-detectors") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + IndexMetadata customResultIndex = IndexMetadata + .builder("opensearch-ad-plugin-result-test") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0) + .putAlias(AliasMetadata.builder(ADCommonName.CUSTOM_RESULT_INDEX_PREFIX).writeIndex(true).build()) + .build(); + + clusterState = ClusterState + .builder(ClusterState.EMPTY_STATE) + .metadata(Metadata.builder().put(defaultResultIndex, false).put(customResultIndex, false).build()) + .build(); + + when(clusterService.state()).thenReturn(clusterState); + + String detectorStringWithCustomResultIndex = + "{\"name\":\"todagtCMkwpcaedpyYUM\",\"description\":\"ClrcaMpuLfeDSlVduRcKlqPZyqWDBf\"," + + "\"time_field\":\"dJRwh\",\"indices\":[\"eIrgWMqAED\"],\"feature_attributes\":[{\"feature_id\":\"lxYRN\"," + + "\"feature_name\":\"eqSeU\",\"feature_enabled\":true,\"aggregation_query\":{\"aa\":{\"value_count\":{\"field\":\"ok\"}}}}]," + + "\"detection_interval\":{\"period\":{\"interval\":425,\"unit\":\"Minutes\"}}," + + "\"window_delay\":{\"period\":{\"interval\":973,\"unit\":\"Minutes\"}},\"shingle_size\":4,\"schema_version\":-1203962153," + + "\"ui_metadata\":{\"JbAaV\":{\"feature_id\":\"rIFjS\",\"feature_name\":\"QXCmS\",\"feature_enabled\":false," + + "\"aggregation_query\":{\"aa\":{\"value_count\":{\"field\":\"ok\"}}}}},\"last_update_time\":1568396089028," + + "\"result_index\":\"opensearch-ad-plugin-result-\",\"result_index_min_size\":51200,\"result_index_min_age\":7}"; + + AnomalyDetector parsedDetector = AnomalyDetector + .parse(TestHelpers.parser(detectorStringWithCustomResultIndex), "id", 1L, null, null); + + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[1]; + setUpRolloverSuccessForCustomIndex(); + listener.onResponse(createSearchResponse(parsedDetector)); + return null; + }).when(client).search(any(), any()); + + } }