Skip to content

Commit

Permalink
rebase from main
Browse files Browse the repository at this point in the history
Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo committed Aug 14, 2024
1 parent 9cb78cc commit 8356005
Show file tree
Hide file tree
Showing 17 changed files with 139 additions and 57 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,7 @@ List<String> jacocoExclusions = [
'org.opensearch.timeseries.transport.CronRequest',
'org.opensearch.ad.task.ADBatchTaskCache',
'org.opensearch.timeseries.ratelimit.RateLimitedRequestWorker',
'org.opensearch.timeseries.util.TimeUtil',
]


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ protected ADHCImputeNodeResponse nodeOperation(ADHCImputeNodeRequest nodeRequest
long executionEndTime = dataEndMillis + windowDelayMillis;
String taskId = nodeRequest.getRequest().getTaskId();
for (ModelState<ThresholdedRandomCutForest> modelState : cache.get().getAllModels(configId)) {
// execution end time (when job starts execution in this interval) > last used time => the model state is updated in
// execution end time (when job starts execution in this interval) >= last used time => the model state is updated in
// previous intervals
if (executionEndTime > modelState.getLastUsedTime().toEpochMilli()) {
if (executionEndTime >= modelState.getLastUsedTime().toEpochMilli()) {
double[] nanArray = new double[featureSize];
Arrays.fill(nanArray, Double.NaN);
adInferencer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,19 @@

import static java.util.Collections.unmodifiableList;
import static org.opensearch.ad.constant.ADCommonName.ANOMALY_RESULT_INDEX_ALIAS;
import static org.opensearch.ad.constant.ADCommonName.CHECKPOINT_INDEX_NAME;
import static org.opensearch.ad.constant.ADCommonName.DETECTION_STATE_INDEX;
import static org.opensearch.ad.indices.ADIndexManagement.ALL_AD_RESULTS_INDEX_PATTERN;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.AD_COOLDOWN_MINUTES;
import static org.opensearch.forecast.constant.ForecastCommonName.FORECAST_CHECKPOINT_INDEX_NAME;
import static org.opensearch.forecast.constant.ForecastCommonName.FORECAST_STATE_INDEX;
import static org.opensearch.timeseries.constant.CommonName.CONFIG_INDEX;
import static org.opensearch.timeseries.constant.CommonName.JOB_INDEX;

import java.security.AccessController;
import java.security.PrivilegedAction;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -253,6 +261,7 @@
import org.opensearch.forecast.transport.ValidateForecasterTransportAction;
import org.opensearch.forecast.transport.handler.ForecastIndexMemoryPressureAwareResultHandler;
import org.opensearch.forecast.transport.handler.ForecastSearchHandler;
import org.opensearch.indices.SystemIndexDescriptor;
import org.opensearch.jobscheduler.spi.JobSchedulerExtension;
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
Expand All @@ -261,6 +270,7 @@
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.ScriptPlugin;
import org.opensearch.plugins.SystemIndexPlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
Expand Down Expand Up @@ -317,7 +327,7 @@
/**
* Entry point of time series analytics plugin.
*/
public class TimeSeriesAnalyticsPlugin extends Plugin implements ActionPlugin, ScriptPlugin, JobSchedulerExtension {
public class TimeSeriesAnalyticsPlugin extends Plugin implements ActionPlugin, ScriptPlugin, SystemIndexPlugin, JobSchedulerExtension {

private static final Logger LOG = LogManager.getLogger(TimeSeriesAnalyticsPlugin.class);

Expand Down Expand Up @@ -1695,6 +1705,19 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
);
}

@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
    // Register every index this plugin owns as a system index so the cluster
    // shields them from direct end-user access. Order matches the original
    // registration order: config, AD indices, forecast indices, then the job index.
    String[][] indexToDescription = {
        { CONFIG_INDEX, "Time Series Analytics config index" },
        { ALL_AD_RESULTS_INDEX_PATTERN, "AD result index pattern" },
        { CHECKPOINT_INDEX_NAME, "AD Checkpoints index" },
        { DETECTION_STATE_INDEX, "AD State index" },
        { FORECAST_CHECKPOINT_INDEX_NAME, "Forecast Checkpoints index" },
        { FORECAST_STATE_INDEX, "Forecast state index" },
        { JOB_INDEX, "Time Series Analytics job index" }
    };
    List<SystemIndexDescriptor> descriptors = new ArrayList<>();
    for (String[] entry : indexToDescription) {
        descriptors.add(new SystemIndexDescriptor(entry[0], entry[1]));
    }
    return descriptors;
}

@Override
public String getJobType() {
return TIME_SERIES_JOB_TYPE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public static String getTooManyCategoricalFieldErr(int limit) {
+ " characters.";
public static final String INDEX_NOT_FOUND = "index does not exist";
public static final String FAIL_TO_GET_MAPPING_MSG = "Fail to get the index mapping of %s";
public static final String FAIL_TO_GET_CONFIG_MSG = "Fail to get config";

// ======================================
// Index message
Expand Down
35 changes: 27 additions & 8 deletions src/main/java/org/opensearch/timeseries/ml/Inferencer.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
import org.opensearch.timeseries.indices.TimeSeriesIndex;
import org.opensearch.timeseries.model.Config;
import org.opensearch.timeseries.model.IndexableResult;
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
import org.opensearch.timeseries.ratelimit.CheckpointWriteWorker;
import org.opensearch.timeseries.ratelimit.ColdStartWorker;
import org.opensearch.timeseries.ratelimit.FeatureRequest;
import org.opensearch.timeseries.ratelimit.RequestPriority;
import org.opensearch.timeseries.ratelimit.SaveResultStrategy;
import org.opensearch.timeseries.stats.Stats;
import org.opensearch.timeseries.util.TimeUtil;

import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;

Expand Down Expand Up @@ -83,31 +83,43 @@ public Inferencer(
* @return whether process succeeds or not
*/
public boolean process(Sample sample, ModelState<RCFModelType> modelState, Config config, String taskId) {
long expiryEpoch = TimeUtil.calculateTimeoutMillis(config, sample.getDataEndTime().toEpochMilli());
return processWithTimeout(sample, modelState, config, taskId, expiryEpoch);
long windowDelayMillis = config.getWindowDelay() == null
? 0
: ((IntervalTimeConfiguration) config.getWindowDelay()).toDuration().toMillis();
long curExecutionEnd = sample.getDataEndTime().toEpochMilli() + windowDelayMillis;
long nextExecutionEnd = curExecutionEnd + config.getIntervalInMilliseconds();

return processWithTimeout(sample, modelState, config, taskId, curExecutionEnd, nextExecutionEnd);
}

private boolean processWithTimeout(Sample sample, ModelState<RCFModelType> modelState, Config config, String taskId, long expiryEpoch) {
private boolean processWithTimeout(
Sample sample,
ModelState<RCFModelType> modelState,
Config config,
String taskId,
long curExecutionEnd,
long nextExecutionEnd
) {
String modelId = modelState.getModelId();
ReentrantLock lock = (ReentrantLock) modelLocks.computeIfAbsent(modelId, k -> new ReentrantLock());

if (lock.tryLock()) {
try {
tryProcess(sample, modelState, config, taskId);
tryProcess(sample, modelState, config, taskId, curExecutionEnd);
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
return true;
} else {
if (System.currentTimeMillis() >= expiryEpoch) {
if (System.currentTimeMillis() >= nextExecutionEnd) {
LOG.warn("Timeout reached, not retrying.");
} else {
// Schedule a retry in one second
threadPool
.schedule(
() -> processWithTimeout(sample, modelState, config, taskId, expiryEpoch),
() -> processWithTimeout(sample, modelState, config, taskId, curExecutionEnd, nextExecutionEnd),
new TimeValue(1, TimeUnit.SECONDS),
threadPoolName
);
Expand All @@ -117,7 +129,14 @@ private boolean processWithTimeout(Sample sample, ModelState<RCFModelType> model
}
}

private boolean tryProcess(Sample sample, ModelState<RCFModelType> modelState, Config config, String taskId) {
private boolean tryProcess(Sample sample, ModelState<RCFModelType> modelState, Config config, String taskId, long curExecutionEnd) {
// execution end time (when job starts execution in this interval) > last used time => the model state is updated in
// previous intervals
// This can happen while scheduled to wait some other threads have already scored the same interval (e.g., during tests
// when everything happens fast)
if (curExecutionEnd < modelState.getLastUsedTime().toEpochMilli()) {
return false;
}
String modelId = modelState.getModelId();
try {
RCFResultType result = modelManager.getResult(sample, modelState, modelId, config, taskId);
Expand Down
58 changes: 29 additions & 29 deletions src/main/java/org/opensearch/timeseries/model/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -219,18 +219,18 @@ protected Config(
}

if (imputationOption != null && imputationOption.getMethod() == ImputationMethod.FIXED_VALUES) {
Map<String, Double> defaultFill = imputationOption.getDefaultFill();
if (defaultFill.isEmpty()) {
issueType = ValidationIssueType.IMPUTATION;
errorMessage = "No given values for fixed value interpolation";
return;
}

// Calculate the number of enabled features
List<Feature> enabledFeatures = features == null
? null
: features.stream().filter(Feature::getEnabled).collect(Collectors.toList());

Map<String, Double> defaultFill = imputationOption.getDefaultFill();
if (defaultFill.isEmpty() && enabledFeatures.size() > 0) {
issueType = ValidationIssueType.IMPUTATION;
errorMessage = "No given values for fixed value imputation";
return;
}

// Check if the length of the defaultFill array matches the number of expected features
if (enabledFeatures == null || defaultFill.size() != enabledFeatures.size()) {
issueType = ValidationIssueType.IMPUTATION;
Expand Down Expand Up @@ -762,27 +762,27 @@ public static List<String> findRedundantNames(List<Feature> features) {
@Override
public String toString() {
return new ToStringBuilder(this)
.append("name", name)
.append("description", description)
.append("timeField", timeField)
.append("indices", indices)
.append("featureAttributes", featureAttributes)
.append("filterQuery", filterQuery)
.append("interval", interval)
.append("windowDelay", windowDelay)
.append("shingleSize", shingleSize)
.append("categoryFields", categoryFields)
.append("schemaVersion", schemaVersion)
.append("user", user)
.append("customResultIndex", customResultIndexOrAlias)
.append("imputationOption", imputationOption)
.append("recencyEmphasis", recencyEmphasis)
.append("seasonIntervals", seasonIntervals)
.append("historyIntervals", historyIntervals)
.append("customResultIndexMinSize", customResultIndexMinSize)
.append("customResultIndexMinAge", customResultIndexMinAge)
.append("customResultIndexTTL", customResultIndexTTL)
.append("flattenResultIndexMapping", flattenResultIndexMapping)
.toString();
.append("name", name)
.append("description", description)
.append("timeField", timeField)
.append("indices", indices)
.append("featureAttributes", featureAttributes)
.append("filterQuery", filterQuery)
.append("interval", interval)
.append("windowDelay", windowDelay)
.append("shingleSize", shingleSize)
.append("categoryFields", categoryFields)
.append("schemaVersion", schemaVersion)
.append("user", user)
.append("customResultIndex", customResultIndexOrAlias)
.append("imputationOption", imputationOption)
.append("recencyEmphasis", recencyEmphasis)
.append("seasonIntervals", seasonIntervals)
.append("historyIntervals", historyIntervals)
.append("customResultIndexMinSize", customResultIndexMinSize)
.append("customResultIndexMinAge", customResultIndexMinAge)
.append("customResultIndexTTL", customResultIndexTTL)
.append("flattenResultIndexMapping", flattenResultIndexMapping)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ protected void createConfig(boolean indexingDryRun, ActionListener<T> listener)
searchRequest,
ActionListener
.wrap(
response -> onSearchSingleStreamConfigResponse(response, indexingDryRun, listener),
response -> onSearchTotalConfigResponse(response, indexingDryRun, listener),
exception -> listener.onFailure(exception)
)
);
Expand All @@ -496,7 +496,7 @@ protected void createConfig(boolean indexingDryRun, ActionListener<T> listener)
}
}

protected void onSearchSingleStreamConfigResponse(SearchResponse response, boolean indexingDryRun, ActionListener<T> listener)
protected void onSearchTotalConfigResponse(SearchResponse response, boolean indexingDryRun, ActionListener<T> listener)
throws IOException {
if (response.getHits().getTotalHits().value >= getMaxSingleStreamConfigs()) {
String errorMsgSingleEntity = getExceedMaxSingleStreamConfigsErrorMsg(getMaxSingleStreamConfigs());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package org.opensearch.timeseries.transport;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.forecast.constant.ForecastCommonMessages.FAIL_TO_GET_FORECASTER;
import static org.opensearch.timeseries.constant.CommonMessages.FAIL_TO_GET_CONFIG_MSG;
import static org.opensearch.timeseries.util.ParseUtils.resolveUserAndExecute;
import static org.opensearch.timeseries.util.RestHandlerUtils.PROFILE;
import static org.opensearch.timeseries.util.RestHandlerUtils.wrapRestActionListener;
Expand Down Expand Up @@ -161,7 +161,7 @@ public void doExecute(Task task, ActionRequest request, ActionListener<GetConfig
GetConfigRequest getConfigRequest = GetConfigRequest.fromActionRequest(request);
String configID = getConfigRequest.getConfigID();
User user = ParseUtils.getUserContext(client);
ActionListener<GetConfigResponseType> listener = wrapRestActionListener(actionListener, FAIL_TO_GET_FORECASTER);
ActionListener<GetConfigResponseType> listener = wrapRestActionListener(actionListener, FAIL_TO_GET_CONFIG_MSG);
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
resolveUserAndExecute(
user,
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/mappings/anomaly-detection-state.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"dynamic": false,
"_meta": {
"schema_version": 3
"schema_version": 4
},
"properties": {
"schema_version": {
Expand Down
3 changes: 3 additions & 0 deletions src/main/resources/mappings/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,9 @@
}
}
}
},
"flatten_result_index_mapping": {
"type": "boolean"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,8 @@ public <Request extends ActionRequest, Response extends ActionResponse> void doE
null,
detector.getCustomResultIndexMinSize(),
detector.getCustomResultIndexMinAge(),
detector.getCustomResultIndexTTL()
detector.getCustomResultIndexTTL(),
false
);
try {
listener.onResponse((Response) TestHelpers.createGetResponse(clone, clone.getId(), CommonName.CONFIG_INDEX));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,5 +468,4 @@ private void verifyAnomalyDetectorCount(String uri, long expectedCount) throws E
Integer count = (Integer) responseMap.get("count");
assertEquals(expectedCount, (long) count);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public void testHCFixed() throws Exception {
}

public void testHCPrevious() throws Exception {
lastSeen.clear();
int numberOfEntities = 2;

AbstractSyntheticDataTest.MISSING_MODE mode = AbstractSyntheticDataTest.MISSING_MODE.NO_MISSING_DATA;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,33 @@ public void setUp() throws Exception {
inferencer
);

request = new FeatureRequest(Integer.MAX_VALUE, detectorId, RequestPriority.MEDIUM, new double[] { 0 }, 0, entity, null);
request2 = new FeatureRequest(Integer.MAX_VALUE, detectorId, RequestPriority.MEDIUM, new double[] { 0 }, 0, entity2, null);
request3 = new FeatureRequest(Integer.MAX_VALUE, detectorId, RequestPriority.MEDIUM, new double[] { 0 }, 0, entity3, null);
request = new FeatureRequest(
Integer.MAX_VALUE,
detectorId,
RequestPriority.MEDIUM,
new double[] { 0 },
System.currentTimeMillis(),
entity,
null
);
request2 = new FeatureRequest(
Integer.MAX_VALUE,
detectorId,
RequestPriority.MEDIUM,
new double[] { 0 },
System.currentTimeMillis(),
entity2,
null
);
request3 = new FeatureRequest(
Integer.MAX_VALUE,
detectorId,
RequestPriority.MEDIUM,
new double[] { 0 },
System.currentTimeMillis(),
entity3,
null
);
}

static class RegularSetUpConfig {
Expand Down
Loading

0 comments on commit 8356005

Please sign in to comment.