diff --git a/src/main/java/org/opensearch/ad/indices/ADIndexManagement.java b/src/main/java/org/opensearch/ad/indices/ADIndexManagement.java index 918107073..ef534621d 100644 --- a/src/main/java/org/opensearch/ad/indices/ADIndexManagement.java +++ b/src/main/java/org/opensearch/ad/indices/ADIndexManagement.java @@ -19,6 +19,8 @@ import static org.opensearch.ad.settings.AnomalyDetectorSettings.ANOMALY_DETECTION_STATE_INDEX_MAPPING_FILE; import static org.opensearch.ad.settings.AnomalyDetectorSettings.ANOMALY_RESULTS_INDEX_MAPPING_FILE; import static org.opensearch.ad.settings.AnomalyDetectorSettings.CHECKPOINT_INDEX_MAPPING_FILE; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; +import static org.opensearch.indices.replication.common.ReplicationType.DOCUMENT; import java.io.IOException; import java.util.EnumMap; @@ -64,7 +66,7 @@ public class ADIndexManagement extends IndexManagement { * @param settings OS cluster setting * @param nodeFilter Used to filter eligible nodes to host AD indices * @param maxUpdateRunningTimes max number of retries to update index mapping and setting - * @throws IOException + * @throws IOException when failing to get mapping file */ public ADIndexManagement( Client client, @@ -195,7 +197,10 @@ public void initDefaultResultIndexDirectly(ActionListener a @Override public void initStateIndex(ActionListener actionListener) { try { - CreateIndexRequest request = new CreateIndexRequest(ADCommonName.DETECTION_STATE_INDEX) + // AD indices need RAW (e.g., we want users to be able to consume AD results as soon as possible and send out an alert if + // anomalies found). + Settings replicationSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, DOCUMENT.name()).build(); + CreateIndexRequest request = new CreateIndexRequest(ADCommonName.DETECTION_STATE_INDEX, replicationSettings) .mapping(getStateMappings(), XContentType.JSON) .settings(settings); adminClient.indices().create(request, markMappingUpToDate(ADIndex.STATE, actionListener)); @@ -219,7 +224,11 @@ public void initCheckpointIndex(ActionListener actionListen } catch (IOException e) { throw new EndRunException("", "Cannot find checkpoint mapping file", true); } - CreateIndexRequest request = new CreateIndexRequest(ADCommonName.CHECKPOINT_INDEX_NAME).mapping(mapping, XContentType.JSON); + // AD indices need RAW (e.g., we want users to be able to consume AD results as soon as possible and send out an alert if anomalies + // found). + Settings replicationSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, DOCUMENT.name()).build(); + CreateIndexRequest request = new CreateIndexRequest(ADCommonName.CHECKPOINT_INDEX_NAME, replicationSettings) + .mapping(mapping, XContentType.JSON); choosePrimaryShards(request, true); adminClient.indices().create(request, markMappingUpToDate(ADIndex.CHECKPOINT, actionListener)); } diff --git a/src/main/java/org/opensearch/forecast/indices/ForecastIndexManagement.java b/src/main/java/org/opensearch/forecast/indices/ForecastIndexManagement.java index 0cde7b28f..db8b40d42 100644 --- a/src/main/java/org/opensearch/forecast/indices/ForecastIndexManagement.java +++ b/src/main/java/org/opensearch/forecast/indices/ForecastIndexManagement.java @@ -11,6 +11,7 @@ package org.opensearch.forecast.indices; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; import static org.opensearch.forecast.constant.ForecastCommonName.DUMMY_FORECAST_RESULT_ID; import static org.opensearch.forecast.settings.ForecastSettings.FORECAST_CHECKPOINT_INDEX_MAPPING_FILE; import static org.opensearch.forecast.settings.ForecastSettings.FORECAST_MAX_PRIMARY_SHARDS; @@ -19,6 +20,7 @@ import static org.opensearch.forecast.settings.ForecastSettings.FORECAST_RESULT_HISTORY_RETENTION_PERIOD; import static org.opensearch.forecast.settings.ForecastSettings.FORECAST_RESULT_HISTORY_ROLLOVER_PERIOD; import static org.opensearch.forecast.settings.ForecastSettings.FORECAST_STATE_INDEX_MAPPING_FILE; +import static org.opensearch.indices.replication.common.ReplicationType.DOCUMENT; import java.io.IOException; import java.util.EnumMap; @@ -61,7 +63,7 @@ public class ForecastIndexManagement extends IndexManagement { * @param settings OS cluster setting * @param nodeFilter Used to filter eligible nodes to host forecast indices * @param maxUpdateRunningTimes max number of retries to update index mapping and setting - * @throws IOException + * @throws IOException when failing to get mapping file */ public ForecastIndexManagement( Client client, @@ -177,7 +179,8 @@ public boolean doesCheckpointIndexExist() { @Override public void initStateIndex(ActionListener actionListener) { try { - CreateIndexRequest request = new CreateIndexRequest(ForecastCommonName.FORECAST_STATE_INDEX) + Settings replicationSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, DOCUMENT.name()).build(); + CreateIndexRequest request = new CreateIndexRequest(ForecastCommonName.FORECAST_STATE_INDEX, replicationSettings) .mapping(getStateMappings(), XContentType.JSON) .settings(settings); adminClient.indices().create(request, markMappingUpToDate(ForecastIndex.STATE, actionListener)); @@ -201,7 +204,10 @@ public void initCheckpointIndex(ActionListener actionListen } catch (IOException e) { throw new EndRunException("", "Cannot find checkpoint mapping file", true); } - CreateIndexRequest request = new CreateIndexRequest(ForecastCommonName.FORECAST_CHECKPOINT_INDEX_NAME) + // forecast indices need RAW (e.g., we want users to be able to consume forecast results as soon as + // possible and send out an alert if a threshold is breached). + Settings replicationSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, DOCUMENT.name()).build(); + CreateIndexRequest request = new CreateIndexRequest(ForecastCommonName.FORECAST_CHECKPOINT_INDEX_NAME, replicationSettings) .mapping(mapping, XContentType.JSON); choosePrimaryShards(request, true); adminClient.indices().create(request, markMappingUpToDate(ForecastIndex.CHECKPOINT, actionListener)); diff --git a/src/main/java/org/opensearch/forecast/settings/ForecastSettings.java b/src/main/java/org/opensearch/forecast/settings/ForecastSettings.java index 58952ecd8..b43543cbf 100644 --- a/src/main/java/org/opensearch/forecast/settings/ForecastSettings.java +++ b/src/main/java/org/opensearch/forecast/settings/ForecastSettings.java @@ -111,7 +111,7 @@ public final class ForecastSettings { // max number of primary shards of a forecast index public static final Setting FORECAST_MAX_PRIMARY_SHARDS = Setting - .intSetting("plugins.forecast.max_primary_shards", 10, 0, 200, Setting.Property.NodeScope, Setting.Property.Dynamic); + .intSetting("plugins.forecast.max_primary_shards", 20, 0, 200, Setting.Property.NodeScope, Setting.Property.Dynamic); // ====================================== // Security diff --git a/src/main/java/org/opensearch/timeseries/indices/IndexManagement.java b/src/main/java/org/opensearch/timeseries/indices/IndexManagement.java index 5fefd6415..d342f3f85 100644 --- a/src/main/java/org/opensearch/timeseries/indices/IndexManagement.java +++ b/src/main/java/org/opensearch/timeseries/indices/IndexManagement.java @@ -11,6 +11,8 @@ package org.opensearch.timeseries.indices; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; +import static org.opensearch.indices.replication.common.ReplicationType.DOCUMENT; import static org.opensearch.timeseries.constant.CommonMessages.CAN_NOT_FIND_RESULT_INDEX; import java.io.IOException; @@ -426,7 +428,10 @@ public void initConfigIndexIfAbsent(ActionListener actionLi * @throws IOException IOException from {@link IndexManagement#getConfigMappings} */ public void initConfigIndex(ActionListener actionListener) throws IOException { - CreateIndexRequest request = new CreateIndexRequest(CommonName.CONFIG_INDEX) + // time series indices need RAW (e.g., we want users to be able to consume AD results as soon as possible + // and send out an alert if anomalies found). + Settings replicationSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, DOCUMENT.name()).build(); + CreateIndexRequest request = new CreateIndexRequest(CommonName.CONFIG_INDEX, replicationSettings) .mapping(getConfigMappings(), XContentType.JSON) .settings(settings); adminClient.indices().create(request, actionListener); @@ -477,7 +482,11 @@ public static String getJobMappings() throws IOException { */ public void initJobIndex(ActionListener actionListener) { try { - CreateIndexRequest request = new CreateIndexRequest(CommonName.JOB_INDEX).mapping(getJobMappings(), XContentType.JSON); + // time series indices need RAW (e.g., we want users to be able to consume AD results as soon as + // possible and send out an alert if anomalies found). + Settings replicationSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, DOCUMENT.name()).build(); + CreateIndexRequest request = new CreateIndexRequest(CommonName.JOB_INDEX, replicationSettings) + .mapping(getJobMappings(), XContentType.JSON); request .settings( Settings @@ -928,7 +937,10 @@ protected void rolloverAndDeleteHistoryIndex( CreateIndexRequest createRequest = rollOverRequest.getCreateIndexRequest(); - createRequest.index(rolloverIndexPattern).mapping(resultMapping, XContentType.JSON); + // time series indices need RAW (e.g., we want users to be able to consume AD results as soon as possible + // and send out an alert if anomalies found). + Settings replicationSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, DOCUMENT.name()).build(); + createRequest.index(rolloverIndexPattern).settings(replicationSettings).mapping(resultMapping, XContentType.JSON); choosePrimaryShards(createRequest, true); @@ -953,7 +965,10 @@ protected void initResultIndexDirectly( IndexType resultIndex, ActionListener actionListener ) { - CreateIndexRequest request = new CreateIndexRequest(resultIndexName).mapping(resultMapping, XContentType.JSON); + // time series indices need RAW (e.g., we want users to be able to consume AD results as soon as possible + // and send out an alert if anomalies found). + Settings replicationSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, DOCUMENT.name()).build(); + CreateIndexRequest request = new CreateIndexRequest(resultIndexName, replicationSettings).mapping(resultMapping, XContentType.JSON); if (alias != null) { request.alias(new Alias(alias)); }