Skip to content

Commit

Permalink
add custom result index lifecycle management
Browse files Browse the repository at this point in the history
Signed-off-by: Jackie Han <[email protected]>
  • Loading branch information
jackiehanyang committed Jun 10, 2024
1 parent 4d400e3 commit b6faf90
Show file tree
Hide file tree
Showing 2 changed files with 235 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package org.opensearch.timeseries.indices;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.forecast.constant.ForecastCommonName.CUSTOM_RESULT_INDEX_PREFIX;
import static org.opensearch.timeseries.util.RestHandlerUtils.createXContentParserFromRegistry;

import java.io.IOException;
Expand Down Expand Up @@ -51,6 +52,7 @@
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.cluster.LocalNodeClusterManagerListener;
Expand All @@ -66,6 +68,8 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParser.Token;
Expand Down Expand Up @@ -298,7 +302,8 @@ protected void deleteOldHistoryIndices(String indexPattern, TimeValue historyRet
long latest = Long.MIN_VALUE;
for (IndexMetadata indexMetaData : clusterStateResponse.getState().metadata().indices().values()) {
long creationTime = indexMetaData.getCreationDate();
if ((Instant.now().toEpochMilli() - creationTime) > historyRetentionPeriod.millis()) {
long indexAgeMillis = Instant.now().toEpochMilli() - creationTime;
if (indexAgeMillis > historyRetentionPeriod.millis()) {
String indexName = indexMetaData.getIndex().getName();
candidates.add(indexName);
if (latest < creationTime) {
Expand Down Expand Up @@ -1100,6 +1105,7 @@ protected void rescheduleRollover() {
if (scheduledRollover != null) {
scheduledRollover.cancel();
}

scheduledRollover = threadPool
.scheduleWithFixedDelay(() -> rolloverAndDeleteHistoryIndex(), historyRolloverPeriod, executorName());
}
Expand Down Expand Up @@ -1234,35 +1240,94 @@ protected void rolloverAndDeleteHistoryIndex(
String rolloverIndexPattern,
IndexType resultIndex
) {
if (!doesDefaultResultIndexExist()) {
return;
// perform rollover and delete on default result index
if (doesDefaultResultIndexExist()) {
RolloverRequest defaultResultIndexRolloverRequest = buildRolloverRequest(resultIndexAlias, rolloverIndexPattern);
defaultResultIndexRolloverRequest.addMaxIndexDocsCondition(historyMaxDocs * getNumberOfPrimaryShards());
proceedWithDefaultRolloverAndDelete(resultIndexAlias, defaultResultIndexRolloverRequest, allResultIndicesPattern, resultIndex);
}
// get config files that have custom result index alias to perform rollover on
getConfigsWithCustomResultIndexAlias(ActionListener.wrap(candidateResultAliases -> {
if (candidateResultAliases == null || candidateResultAliases.isEmpty()) {
logger.info("Candidate custom result indices are empty.");
return;
}

// We have to pass null for newIndexName in order to get Elastic to increment the index count.
RolloverRequest rollOverRequest = new RolloverRequest(resultIndexAlias, null);
// perform rollover and delete on found custom result index alias
candidateResultAliases.forEach(config -> handleCustomResultIndex(config, resultIndex));
}, e -> { logger.error("Failed to get configs with custom result index alias.", e); }));
}

private void handleCustomResultIndex(Config config, IndexType resultIndex) {
RolloverRequest rolloverRequest = buildRolloverRequest(
config.getCustomResultIndexOrAlias(),
getCustomResultIndexPattern(config.getCustomResultIndexOrAlias())
);

// add rollover conditions if found in config
if (config.getCustomResultIndexMinAge() != null) {
rolloverRequest.addMaxIndexAgeCondition(TimeValue.timeValueDays(config.getCustomResultIndexMinAge()));
}
if (config.getCustomResultIndexMinSize() != null) {
rolloverRequest.addMaxIndexSizeCondition(new ByteSizeValue(config.getCustomResultIndexMinSize(), ByteSizeUnit.MB));
}

// perform rollover and delete on custom result index alias
proceedWithRolloverAndDelete(
config.getCustomResultIndexOrAlias(),
rolloverRequest,
getAllCustomResultIndexPattern(config.getCustomResultIndexOrAlias()),
resultIndex,
config.getCustomResultIndexTTL()
);
}

private void proceedWithDefaultRolloverAndDelete(
String resultIndexAlias,
RolloverRequest rolloverRequest,
String allResultIndicesPattern,
IndexType resultIndex
) {
proceedWithRolloverAndDelete(resultIndexAlias, rolloverRequest, allResultIndicesPattern, resultIndex, null);
}

private RolloverRequest buildRolloverRequest(String resultIndexAlias, String rolloverIndexPattern) {
RolloverRequest rollOverRequest = new RolloverRequest(resultIndexAlias, null);
CreateIndexRequest createRequest = rollOverRequest.getCreateIndexRequest();

createRequest.index(rolloverIndexPattern).mapping(resultMapping, XContentType.JSON);

choosePrimaryShards(createRequest, true);

rollOverRequest.addMaxIndexDocsCondition(historyMaxDocs * getNumberOfPrimaryShards());
return rollOverRequest;
}

private void proceedWithRolloverAndDelete(
String resultIndexAlias,
RolloverRequest rollOverRequest,
String allResultIndicesPattern,
IndexType resultIndex,
Integer customResultIndexTtl
) {
adminClient.indices().rolloverIndex(rollOverRequest, ActionListener.wrap(response -> {
if (!response.isRolledOver()) {
logger.warn("{} not rolled over. Conditions were: {}", resultIndexAlias, response.getConditionStatus());
} else {
IndexState indexStatetate = indexStates.computeIfAbsent(resultIndex, k -> new IndexState(k.getMapping()));
indexStatetate.mappingUpToDate = true;
IndexState indexState = indexStates.computeIfAbsent(resultIndex, k -> new IndexState(k.getMapping()));
indexState.mappingUpToDate = true;
logger.info("{} rolled over. Conditions were: {}", resultIndexAlias, response.getConditionStatus());
deleteOldHistoryIndices(allResultIndicesPattern, historyRetentionPeriod);
if (resultIndexAlias.startsWith(ADCommonName.CUSTOM_RESULT_INDEX_PREFIX)
|| resultIndexAlias.startsWith(CUSTOM_RESULT_INDEX_PREFIX)) {
// handle custom result index deletion
if (customResultIndexTtl != null) {
deleteOldHistoryIndices(allResultIndicesPattern, TimeValue.timeValueHours(customResultIndexTtl * 24));

}
} else {
// handle default result index deletion
deleteOldHistoryIndices(allResultIndicesPattern, historyRetentionPeriod);
}
}
}, exception -> {
// e.g., we may roll over too often. Since the index pattern is opensearch-ad-plugin-result-d-history-{now/d}-000001,
// we cannot roll over twice in the same day as the index with the same name exists. We will get
// resource_already_exists_exception.
logger.error("Fail to roll over result index", exception);
}));
}, exception -> { logger.error("Fail to roll over result index", exception); }));
}

protected void initResultIndexDirectly(
Expand Down
156 changes: 154 additions & 2 deletions src/test/java/org/opensearch/ad/indices/RolloverTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,54 +18,75 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.timeseries.TestHelpers.createSearchResponse;

import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;

import org.apache.lucene.search.TotalHits;
import org.opensearch.Version;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.rollover.Condition;
import org.opensearch.action.admin.indices.rollover.MaxAgeCondition;
import org.opensearch.action.admin.indices.rollover.MaxDocsCondition;
import org.opensearch.action.admin.indices.rollover.MaxSizeCondition;
import org.opensearch.action.admin.indices.rollover.RolloverRequest;
import org.opensearch.action.admin.indices.rollover.RolloverResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.ShardSearchFailure;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.client.ClusterAdminClient;
import org.opensearch.client.IndicesAdminClient;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.AliasMetadata;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.internal.InternalSearchResponse;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.AbstractTimeSeriesTest;
import org.opensearch.timeseries.TestHelpers;
import org.opensearch.timeseries.settings.TimeSeriesSettings;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;

public class RolloverTests extends AbstractTimeSeriesTest {
private ADIndexManagement adIndices;
private IndicesAdminClient indicesClient;
private ClusterAdminClient clusterAdminClient;
private Client client;
private ClusterName clusterName;
private ClusterState clusterState;
private ClusterService clusterService;
private NamedXContentRegistry namedXContentRegistry;
private long defaultMaxDocs;
private int numberOfNodes;

@Override
public void setUp() throws Exception {
super.setUp();
Client client = mock(Client.class);
client = mock(Client.class);
indicesClient = mock(IndicesAdminClient.class);
AdminClient adminClient = mock(AdminClient.class);
clusterService = mock(ClusterService.class);
Expand Down Expand Up @@ -98,14 +119,16 @@ public void setUp() throws Exception {
numberOfNodes = 2;
when(nodeFilter.getNumberOfEligibleDataNodes()).thenReturn(numberOfNodes);

namedXContentRegistry = TestHelpers.xContentRegistry();

adIndices = new ADIndexManagement(
client,
clusterService,
threadPool,
settings,
nodeFilter,
TimeSeriesSettings.MAX_UPDATE_RETRY_TIMES,
NamedXContentRegistry.EMPTY
namedXContentRegistry
);

clusterAdminClient = mock(ClusterAdminClient.class);
Expand Down Expand Up @@ -248,4 +271,133 @@ public void testRetryingDelete() {
// 1 group delete, 1 separate retry for each index to delete
verify(indicesClient, times(2)).delete(any(), any());
}

public void testNoCustomResultIndexFound_RolloverDefaultResultIndex_shouldSucceed() {
setUpRolloverSuccess();
setUpGetConfigs_withNoCustomResultIndexAlias();

adIndices.rolloverAndDeleteHistoryIndex();
verify(indicesClient, times(1)).rolloverIndex(any(), any());
verify(client, times(1)).search(any(), any());
}

public void testCustomResultIndexFound_RolloverCustomResultIndex_withConditions_shouldSucceed() throws IOException {
setUpGetConfigs_withCustomResultIndexAlias();

adIndices.rolloverAndDeleteHistoryIndex();

verify(indicesClient, times(1)).rolloverIndex(any(), any());
verify(client, times(1)).search(any(), any());
}

private void setUpGetConfigs_withNoCustomResultIndexAlias() {
Metadata.Builder metaBuilder = Metadata
.builder()
.put(indexMeta(".opendistro-anomaly-detectors", 1L, ADCommonName.ANOMALY_RESULT_INDEX_ALIAS), true);
clusterState = ClusterState.builder(clusterName).metadata(metaBuilder.build()).build();
when(clusterService.state()).thenReturn(clusterState);

String detectorString = "{\"name\":\"AhtYYGWTgqkzairTchcs\",\"description\":\"iIiAVPMyFgnFlEniLbMyfJxyoGvJAl\","
+ "\"time_field\":\"HmdFH\",\"indices\":[\"ffsBF\"],\"filter_query\":{\"bool\":{\"filter\":[{\"exists\":"
+ "{\"field\":\"value\",\"boost\":1}}],\"adjust_pure_negative\":true,\"boost\":1}},\"window_delay\":"
+ "{\"period\":{\"interval\":2,\"unit\":\"Minutes\"}},\"shingle_size\":8,\"schema_version\":-512063255,"
+ "\"feature_attributes\":[{\"feature_id\":\"OTYJs\",\"feature_name\":\"eYYCM\",\"feature_enabled\":false,"
+ "\"aggregation_query\":{\"XzewX\":{\"value_count\":{\"field\":\"ok\"}}}}],\"recency_emphasis\":3342,"
+ "\"history\":62,\"last_update_time\":1717192049845,\"category_field\":[\"Tcqcb\"],\"customResultIndexOrAlias\":"
+ "\"\",\"imputation_option\":{\"method\":\"FIXED_VALUES\",\"defaultFill\""
+ ":[],\"integerSensitive\":false},\"suggested_seasonality\":64,\"detection_interval\":{\"period\":"
+ "{\"interval\":5,\"unit\":\"Minutes\"}},\"detector_type\":\"MULTI_ENTITY\",\"rules\":[]}";

doAnswer(invocation -> {
ActionListener<SearchResponse> listener = invocation.getArgument(1);
SearchHit config = SearchHit.fromXContent(TestHelpers.parser(detectorString));
SearchHits searchHits = new SearchHits(new SearchHit[] { config }, new TotalHits(1, TotalHits.Relation.EQUAL_TO), Float.NaN);
InternalSearchResponse response = new InternalSearchResponse(
searchHits,
InternalAggregations.EMPTY,
null,
null,
false,
null,
1
);
SearchResponse searchResponse = new SearchResponse(
response,
null,
1,
1,
0,
100,
ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY
);
listener.onResponse(searchResponse);
return null;
}).when(client).search(any(), any());
}

private void setUpRolloverSuccessForCustomIndex() {
doAnswer(invocation -> {
RolloverRequest request = invocation.getArgument(0);
@SuppressWarnings("unchecked")
ActionListener<RolloverResponse> listener = (ActionListener<RolloverResponse>) invocation.getArgument(1);

assertEquals("opensearch-ad-plugin-result-", request.indices()[0]);
Map<String, Condition<?>> conditions = request.getConditions();
assertEquals(2, conditions.size());
assertEquals(new MaxAgeCondition(TimeValue.timeValueDays(7)), conditions.get(MaxAgeCondition.NAME));
assertEquals(new MaxSizeCondition(new ByteSizeValue(51200, ByteSizeUnit.MB)), conditions.get(MaxSizeCondition.NAME));

CreateIndexRequest createIndexRequest = request.getCreateIndexRequest();
assertEquals("<opensearch-ad-plugin-result--history-{now/d}-1>", createIndexRequest.index());
assertTrue(createIndexRequest.mappings().contains("data_start_time"));
listener.onResponse(new RolloverResponse(null, null, Collections.emptyMap(), request.isDryRun(), true, true, true));
return null;
}).when(indicesClient).rolloverIndex(any(), any());
}

private void setUpGetConfigs_withCustomResultIndexAlias() throws IOException {
IndexMetadata defaultResultIndex = IndexMetadata
.builder(".opendistro-anomaly-detectors")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(0)
.build();
IndexMetadata customResultIndex = IndexMetadata
.builder("opensearch-ad-plugin-result-test")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(0)
.putAlias(AliasMetadata.builder(ADCommonName.CUSTOM_RESULT_INDEX_PREFIX).writeIndex(true).build())
.build();

clusterState = ClusterState
.builder(ClusterState.EMPTY_STATE)
.metadata(Metadata.builder().put(defaultResultIndex, false).put(customResultIndex, false).build())
.build();

when(clusterService.state()).thenReturn(clusterState);

String detectorStringWithCustomResultIndex =
"{\"name\":\"todagtCMkwpcaedpyYUM\",\"description\":\"ClrcaMpuLfeDSlVduRcKlqPZyqWDBf\","
+ "\"time_field\":\"dJRwh\",\"indices\":[\"eIrgWMqAED\"],\"feature_attributes\":[{\"feature_id\":\"lxYRN\","
+ "\"feature_name\":\"eqSeU\",\"feature_enabled\":true,\"aggregation_query\":{\"aa\":{\"value_count\":{\"field\":\"ok\"}}}}],"
+ "\"detection_interval\":{\"period\":{\"interval\":425,\"unit\":\"Minutes\"}},"
+ "\"window_delay\":{\"period\":{\"interval\":973,\"unit\":\"Minutes\"}},\"shingle_size\":4,\"schema_version\":-1203962153,"
+ "\"ui_metadata\":{\"JbAaV\":{\"feature_id\":\"rIFjS\",\"feature_name\":\"QXCmS\",\"feature_enabled\":false,"
+ "\"aggregation_query\":{\"aa\":{\"value_count\":{\"field\":\"ok\"}}}}},\"last_update_time\":1568396089028,"
+ "\"result_index\":\"opensearch-ad-plugin-result-\",\"result_index_min_size\":51200,\"result_index_min_age\":7}";

AnomalyDetector parsedDetector = AnomalyDetector
.parse(TestHelpers.parser(detectorStringWithCustomResultIndex), "id", 1L, null, null);

doAnswer(invocation -> {
Object[] args = invocation.getArguments();
ActionListener<SearchResponse> listener = (ActionListener<SearchResponse>) args[1];
setUpRolloverSuccessForCustomIndex();
listener.onResponse(createSearchResponse(parsedDetector));
return null;
}).when(client).search(any(), any());

}
}

0 comments on commit b6faf90

Please sign in to comment.