Skip to content

Commit

Permalink
Enforce DOCUMENT Replication for AD Indices and Adjust Primary Shards (
Browse files Browse the repository at this point in the history
…#948)

* Enforce DOCUMENT Replication for AD Indices and Adjust Primary Shards

In this PR, we temporarily enforce DOCUMENT replication for AD indices. This change is necessary due to the current limitation of SegRep, which doesn't support Get/MultiGet by ID. This measure will be in place until SegRep adds support for these operations.

This adjustment aligns with the modification made in the referenced PR: job-scheduler PR #417

Additionally, this PR increases the number of primary shards for forecasting indices from 10 to 20, recognizing the higher write intensity of these indices. For instance, when the horizon is 24, we are expected to save 25 documents: 24 for each forecast and 1 for the actual value. In contrast, for AD, we save one document per actual value.

Testing done:
1. gradle build passed

Signed-off-by: Kaituo Li <[email protected]>

* Fix comments

Signed-off-by: Kaituo Li <[email protected]>

---------

Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo authored Jul 11, 2023
1 parent a120382 commit bc16499
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 11 deletions.
15 changes: 12 additions & 3 deletions src/main/java/org/opensearch/ad/indices/ADIndexManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import static org.opensearch.ad.settings.AnomalyDetectorSettings.ANOMALY_DETECTION_STATE_INDEX_MAPPING_FILE;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.ANOMALY_RESULTS_INDEX_MAPPING_FILE;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.CHECKPOINT_INDEX_MAPPING_FILE;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.indices.replication.common.ReplicationType.DOCUMENT;

import java.io.IOException;
import java.util.EnumMap;
Expand Down Expand Up @@ -64,7 +66,7 @@ public class ADIndexManagement extends IndexManagement<ADIndex> {
* @param settings OS cluster setting
* @param nodeFilter Used to filter eligible nodes to host AD indices
* @param maxUpdateRunningTimes max number of retries to update index mapping and setting
* @throws IOException
* @throws IOException when failing to get mapping file
*/
public ADIndexManagement(
Client client,
Expand Down Expand Up @@ -195,7 +197,10 @@ public void initDefaultResultIndexDirectly(ActionListener<CreateIndexResponse> a
@Override
public void initStateIndex(ActionListener<CreateIndexResponse> actionListener) {
try {
CreateIndexRequest request = new CreateIndexRequest(ADCommonName.DETECTION_STATE_INDEX)
// AD indices need RAW (e.g., we want users to be able to consume AD results as soon as possible and send out an alert if
// anomalies found).
Settings replicationSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, DOCUMENT.name()).build();
CreateIndexRequest request = new CreateIndexRequest(ADCommonName.DETECTION_STATE_INDEX, replicationSettings)
.mapping(getStateMappings(), XContentType.JSON)
.settings(settings);
adminClient.indices().create(request, markMappingUpToDate(ADIndex.STATE, actionListener));
Expand All @@ -219,7 +224,11 @@ public void initCheckpointIndex(ActionListener<CreateIndexResponse> actionListen
} catch (IOException e) {
throw new EndRunException("", "Cannot find checkpoint mapping file", true);
}
CreateIndexRequest request = new CreateIndexRequest(ADCommonName.CHECKPOINT_INDEX_NAME).mapping(mapping, XContentType.JSON);
// AD indices need RAW (e.g., we want users to be able to consume AD results as soon as possible and send out an alert if anomalies
// found).
Settings replicationSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, DOCUMENT.name()).build();
CreateIndexRequest request = new CreateIndexRequest(ADCommonName.CHECKPOINT_INDEX_NAME, replicationSettings)
.mapping(mapping, XContentType.JSON);
choosePrimaryShards(request, true);
adminClient.indices().create(request, markMappingUpToDate(ADIndex.CHECKPOINT, actionListener));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

package org.opensearch.forecast.indices;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.forecast.constant.ForecastCommonName.DUMMY_FORECAST_RESULT_ID;
import static org.opensearch.forecast.settings.ForecastSettings.FORECAST_CHECKPOINT_INDEX_MAPPING_FILE;
import static org.opensearch.forecast.settings.ForecastSettings.FORECAST_MAX_PRIMARY_SHARDS;
Expand All @@ -19,6 +20,7 @@
import static org.opensearch.forecast.settings.ForecastSettings.FORECAST_RESULT_HISTORY_RETENTION_PERIOD;
import static org.opensearch.forecast.settings.ForecastSettings.FORECAST_RESULT_HISTORY_ROLLOVER_PERIOD;
import static org.opensearch.forecast.settings.ForecastSettings.FORECAST_STATE_INDEX_MAPPING_FILE;
import static org.opensearch.indices.replication.common.ReplicationType.DOCUMENT;

import java.io.IOException;
import java.util.EnumMap;
Expand Down Expand Up @@ -61,7 +63,7 @@ public class ForecastIndexManagement extends IndexManagement<ForecastIndex> {
* @param settings OS cluster setting
* @param nodeFilter Used to filter eligible nodes to host forecast indices
* @param maxUpdateRunningTimes max number of retries to update index mapping and setting
* @throws IOException
* @throws IOException when failing to get mapping file
*/
public ForecastIndexManagement(
Client client,
Expand Down Expand Up @@ -177,7 +179,8 @@ public boolean doesCheckpointIndexExist() {
@Override
public void initStateIndex(ActionListener<CreateIndexResponse> actionListener) {
try {
CreateIndexRequest request = new CreateIndexRequest(ForecastCommonName.FORECAST_STATE_INDEX)
Settings replicationSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, DOCUMENT.name()).build();
CreateIndexRequest request = new CreateIndexRequest(ForecastCommonName.FORECAST_STATE_INDEX, replicationSettings)
.mapping(getStateMappings(), XContentType.JSON)
.settings(settings);
adminClient.indices().create(request, markMappingUpToDate(ForecastIndex.STATE, actionListener));
Expand All @@ -201,7 +204,10 @@ public void initCheckpointIndex(ActionListener<CreateIndexResponse> actionListen
} catch (IOException e) {
throw new EndRunException("", "Cannot find checkpoint mapping file", true);
}
CreateIndexRequest request = new CreateIndexRequest(ForecastCommonName.FORECAST_CHECKPOINT_INDEX_NAME)
// forecast indices need RAW (e.g., we want users to be able to consume forecast results as soon as
// possible and send out an alert if a threshold is breached).
Settings replicationSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, DOCUMENT.name()).build();
CreateIndexRequest request = new CreateIndexRequest(ForecastCommonName.FORECAST_CHECKPOINT_INDEX_NAME, replicationSettings)
.mapping(mapping, XContentType.JSON);
choosePrimaryShards(request, true);
adminClient.indices().create(request, markMappingUpToDate(ForecastIndex.CHECKPOINT, actionListener));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public final class ForecastSettings {

// max number of primary shards of a forecast index
public static final Setting<Integer> FORECAST_MAX_PRIMARY_SHARDS = Setting
.intSetting("plugins.forecast.max_primary_shards", 10, 0, 200, Setting.Property.NodeScope, Setting.Property.Dynamic);
.intSetting("plugins.forecast.max_primary_shards", 20, 0, 200, Setting.Property.NodeScope, Setting.Property.Dynamic);

// ======================================
// Security
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

package org.opensearch.timeseries.indices;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.indices.replication.common.ReplicationType.DOCUMENT;
import static org.opensearch.timeseries.constant.CommonMessages.CAN_NOT_FIND_RESULT_INDEX;

import java.io.IOException;
Expand Down Expand Up @@ -426,7 +428,10 @@ public void initConfigIndexIfAbsent(ActionListener<CreateIndexResponse> actionLi
* @throws IOException IOException from {@link IndexManagement#getConfigMappings}
*/
public void initConfigIndex(ActionListener<CreateIndexResponse> actionListener) throws IOException {
CreateIndexRequest request = new CreateIndexRequest(CommonName.CONFIG_INDEX)
// time series indices need RAW (e.g., we want users to be able to consume AD results as soon as possible
// and send out an alert if anomalies found).
Settings replicationSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, DOCUMENT.name()).build();
CreateIndexRequest request = new CreateIndexRequest(CommonName.CONFIG_INDEX, replicationSettings)
.mapping(getConfigMappings(), XContentType.JSON)
.settings(settings);
adminClient.indices().create(request, actionListener);
Expand Down Expand Up @@ -477,7 +482,11 @@ public static String getJobMappings() throws IOException {
*/
public void initJobIndex(ActionListener<CreateIndexResponse> actionListener) {
try {
CreateIndexRequest request = new CreateIndexRequest(CommonName.JOB_INDEX).mapping(getJobMappings(), XContentType.JSON);
// time series indices need RAW (e.g., we want users to be able to consume AD results as soon as
// possible and send out an alert if anomalies found).
Settings replicationSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, DOCUMENT.name()).build();
CreateIndexRequest request = new CreateIndexRequest(CommonName.JOB_INDEX, replicationSettings)
.mapping(getJobMappings(), XContentType.JSON);
request
.settings(
Settings
Expand Down Expand Up @@ -928,7 +937,10 @@ protected void rolloverAndDeleteHistoryIndex(

CreateIndexRequest createRequest = rollOverRequest.getCreateIndexRequest();

createRequest.index(rolloverIndexPattern).mapping(resultMapping, XContentType.JSON);
// time series indices need RAW (e.g., we want users to be able to consume AD results as soon as possible
// and send out an alert if anomalies found).
Settings replicationSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, DOCUMENT.name()).build();
createRequest.index(rolloverIndexPattern).settings(replicationSettings).mapping(resultMapping, XContentType.JSON);

choosePrimaryShards(createRequest, true);

Expand All @@ -953,7 +965,10 @@ protected void initResultIndexDirectly(
IndexType resultIndex,
ActionListener<CreateIndexResponse> actionListener
) {
CreateIndexRequest request = new CreateIndexRequest(resultIndexName).mapping(resultMapping, XContentType.JSON);
// time series indices need RAW (e.g., we want users to be able to consume AD results as soon as possible
// and send out an alert if anomalies found).
Settings replicationSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, DOCUMENT.name()).build();
CreateIndexRequest request = new CreateIndexRequest(resultIndexName, replicationSettings).mapping(resultMapping, XContentType.JSON);
if (alias != null) {
request.alias(new Alias(alias));
}
Expand Down

0 comments on commit bc16499

Please sign in to comment.