From f851715d9b8c8a028ca5a12484a361321b48e05a Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Mon, 10 Jul 2023 03:51:41 -0700 Subject: [PATCH] add audit alerts in bucket level monitor Signed-off-by: Surya Sashank Nistala --- .../org/opensearch/alerting/AlertService.kt | 33 +++++++++++++------ .../alerting/BucketLevelMonitorRunner.kt | 2 +- .../alerting/MonitorRunnerService.kt | 2 ++ .../alerting/QueryLevelMonitorRunner.kt | 2 +- .../workflow/CompositeWorkflowRunner.kt | 1 - .../alerting/AlertingRestTestCase.kt | 1 - 6 files changed, 27 insertions(+), 14 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt index 2bdd7abf7..f1e5ad4a6 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt @@ -83,10 +83,11 @@ class AlertService( private val logger = LogManager.getLogger(AlertService::class.java) - suspend fun loadCurrentAlertsForQueryLevelMonitor(monitor: Monitor): Map { + suspend fun loadCurrentAlertsForQueryLevelMonitor(monitor: Monitor, workflowRunContext: WorkflowRunContext?): Map { val searchAlertsResponse: SearchResponse = searchAlerts( monitor = monitor, - size = monitor.triggers.size * 2 // We expect there to be only a single in-progress alert so fetch 2 to check + size = monitor.triggers.size * 2, // We expect there to be only a single in-progress alert so fetch 2 to check + workflowRunContext ) val foundAlerts = searchAlertsResponse.hits.map { Alert.parse(contentParser(it.sourceRef), it.id, it.version) } @@ -102,11 +103,15 @@ class AlertService( } } - suspend fun loadCurrentAlertsForBucketLevelMonitor(monitor: Monitor): Map> { + suspend fun loadCurrentAlertsForBucketLevelMonitor( + monitor: Monitor, + workflowRunContext: WorkflowRunContext?, + ): Map> { val searchAlertsResponse: SearchResponse = searchAlerts( monitor = monitor, // TODO: This should be limited based on a circuit breaker that limits Alerts - size = MAX_BUCKET_LEVEL_MONITOR_ALERT_SEARCH_COUNT + size = MAX_BUCKET_LEVEL_MONITOR_ALERT_SEARCH_COUNT, + workflowRunContext = workflowRunContext ) val foundAlerts = searchAlertsResponse.hits.map { Alert.parse(contentParser(it.sourceRef), it.id, it.version) } @@ -335,9 +340,12 @@ class AlertService( currentAlerts.remove(aggAlertBucket.getBucketKeysHash()) } else { // New Alert + val alertState = if (workflorwRunContext?.auditDelegateMonitorAlerts == true) { + Alert.State.AUDIT + } else Alert.State.ACTIVE val newAlert = Alert( monitor = monitor, trigger = trigger, startTime = currentTime, - lastNotificationTime = currentTime, state = Alert.State.ACTIVE, errorMessage = null, + lastNotificationTime = currentTime, state = alertState, errorMessage = null, errorHistory = mutableListOf(), actionExecutionResults = mutableListOf(), schemaVersion = IndexUtils.alertIndexSchemaVersion, aggregationResultBucket = aggAlertBucket, findingIds = findings, executionId = executionId, workflowId = workflorwRunContext?.workflowId ?: "" @@ -658,13 +666,16 @@ class AlertService( val savedAlerts = mutableListOf() var alertsBeingIndexed = alerts var requestsToRetry: MutableList = alerts.map { alert -> - if (alert.state != Alert.State.ACTIVE) { + if (alert.state != Alert.State.ACTIVE && alert.state != Alert.State.AUDIT) { throw IllegalStateException("Unexpected attempt to save new alert [$alert] with state [${alert.state}]") } if (alert.id != Alert.NO_ID) { throw IllegalStateException("Unexpected attempt to save new alert [$alert] with an existing alert ID [${alert.id}]") } - IndexRequest(dataSources.alertsIndex) + val alertIndex = if (alert.state == Alert.State.AUDIT && alertIndices.isAlertHistoryEnabled()) { + dataSources.alertsHistoryIndex + } else dataSources.alertsIndex + IndexRequest(alertIndex) .routing(alert.monitorId) .source(alert.toXContentWithUser(XContentFactory.jsonBuilder())) }.toMutableList() @@ -723,13 +734,15 @@ class AlertService( * @param monitorId The Monitor to get Alerts for * @param size The number of search hits (Alerts) to return */ - private suspend fun searchAlerts(monitor: Monitor, size: Int): SearchResponse { + private suspend fun searchAlerts(monitor: Monitor, size: Int, workflowRunContext: WorkflowRunContext?): SearchResponse { val monitorId = monitor.id val alertIndex = monitor.dataSources.alertsIndex val queryBuilder = QueryBuilders.boolQuery() - .filter(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitorId)) - + .must(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitorId)) + if (workflowRunContext != null) { + queryBuilder.must(QueryBuilders.termQuery(Alert.WORKFLOW_ID_FIELD, workflowRunContext.workflowId)) + } val searchSourceBuilder = SearchSourceBuilder() .size(size) .query(queryBuilder) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt index e02d18922..d181ddb5c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt @@ -77,7 +77,7 @@ object BucketLevelMonitorRunner : MonitorRunner() { if (monitor.dataSources.findingsEnabled == true) { monitorCtx.alertIndices!!.createOrUpdateInitialFindingHistoryIndex(monitor.dataSources) } - monitorCtx.alertService!!.loadCurrentAlertsForBucketLevelMonitor(monitor) + monitorCtx.alertService!!.loadCurrentAlertsForBucketLevelMonitor(monitor, workflowRunContext) } catch (e: Exception) { // We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts val id = if (monitor.id.trim().isEmpty()) "_na_" else monitor.id diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index 79bb13b0d..cea11d3b4 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -284,6 +284,8 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon internal fun currentTime() = Instant.ofEpochMilli(monitorCtx.threadPool!!.absoluteTimeInMillis()) internal fun isActionActionable(action: Action, alert: Alert?): Boolean { + if (alert != null && alert.state == Alert.State.AUDIT) + return false if (alert == null || action.throttle == null) { return true } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt index f7c26918b..1a9b9bb13 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt @@ -40,7 +40,7 @@ object QueryLevelMonitorRunner : MonitorRunner() { val currentAlerts = try { monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources) monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex(monitor.dataSources) - monitorCtx.alertService!!.loadCurrentAlertsForQueryLevelMonitor(monitor) + monitorCtx.alertService!!.loadCurrentAlertsForQueryLevelMonitor(monitor, workflowRunContext) } catch (e: Exception) { // We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts val id = if (monitor.id.trim().isEmpty()) "_na_" else monitor.id diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt index da20280ef..3f32b5133 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt @@ -13,7 +13,6 @@ import org.opensearch.alerting.DocumentLevelMonitorRunner import org.opensearch.alerting.MonitorRunnerExecutionContext import org.opensearch.alerting.QueryLevelMonitorRunner import org.opensearch.alerting.WorkflowMetadataService -import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.model.ChainedAlertTriggerRunResult import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.alerting.model.WorkflowRunResult diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index efff1ec47..47250f92b 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -744,7 +744,6 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { .string() .let { StringEntity(it, APPLICATION_JSON) } - val response = client().makeRequest( "POST", "${AlertingPlugin.WORKFLOW_BASE_URI}/$workflowId/_acknowledge/alerts", emptyMap(), request