diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
index 176ffb6b6..cebfc420c 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
@@ -210,7 +210,13 @@ class AlertService(
     ): Alert {
         val currentTime = Instant.now()
 
-        val alertState = if (alertError == null) Alert.State.ACTIVE else Alert.State.ERROR
+        val alertState = if (workflorwRunContext?.muteDelegateMonitorActions == true) {
+            Alert.State.AUDIT
+        } else if (alertError == null) {
+            Alert.State.ACTIVE
+        } else {
+            Alert.State.ERROR
+        }
         return Alert(
             id = UUID.randomUUID().toString(), monitor = ctx.monitor, trigger = ctx.trigger, startTime = currentTime,
             lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
@@ -227,12 +233,15 @@ class AlertService(
         workflowRunContext: WorkflowRunContext?
     ): Alert {
         val currentTime = Instant.now()
+        val alertState = if (workflowRunContext?.muteDelegateMonitorActions == true) {
+            Alert.State.AUDIT
+        } else {
+            Alert.State.ERROR
+        }
         return Alert(
             id = id, monitor = monitor, trigger = NoOpTrigger(), startTime = currentTime,
-            lastNotificationTime = currentTime, state = Alert.State.ERROR, errorMessage = alertError?.message,
-            schemaVersion = IndexUtils.alertIndexSchemaVersion,
-            workflowId = workflowRunContext?.workflowId ?: "",
-            executionId = executionId ?: ""
+            lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
+            schemaVersion = IndexUtils.alertIndexSchemaVersion, executionId = executionId, workflowId = workflowRunContext?.workflowId ?: ""
         )
     }
 
@@ -566,11 +575,12 @@ class AlertService(
             // In the rare event that a user acknowledges an alert between when it's read and when it's written
             // back we're ok if that acknowledgement is lost. It's easier to get the user to retry than for the runner to
             // spend time reloading the alert and writing it back.
+            val routingId = if (routing.isNullOrEmpty()) alert.monitorId else routing
             when (alert.state) {
                 Alert.State.ACTIVE, Alert.State.ERROR -> {
                     listOf<DocWriteRequest<*>>(
                         IndexRequest(alertsIndex)
-                            .routing(if (routing.isNullOrEmpty()) alert.monitorId else routing)
+                            .routing(routingId)
                             .source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
                             .id(if (alert.id != Alert.NO_ID) alert.id else null)
                     )
@@ -581,7 +591,7 @@ class AlertService(
                     if (allowUpdatingAcknowledgedAlert) {
                         listOf<DocWriteRequest<*>>(
                             IndexRequest(alertsIndex)
-                                .routing(if (routing.isNullOrEmpty()) alert.monitorId else routing)
+                                .routing(routingId)
                                 .source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
                                 .id(if (alert.id != Alert.NO_ID) alert.id else null)
                         )
@@ -590,9 +600,12 @@ class AlertService(
                     }
                 }
                 Alert.State.AUDIT -> {
+                    val index = if (alertIndices.isAlertHistoryEnabled()) {
+                        dataSources.alertsHistoryIndex
+                    } else dataSources.alertsIndex
                     listOf<DocWriteRequest<*>>(
-                        IndexRequest(alertsIndex)
-                            .routing(alert.monitorId)
+                        IndexRequest(index)
+                            .routing(routingId)
                             .source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
                             .id(if (alert.id != Alert.NO_ID) alert.id else null)
                     )
@@ -603,11 +616,11 @@ class AlertService(
                 Alert.State.COMPLETED -> {
                     listOfNotNull<DocWriteRequest<*>>(
                         DeleteRequest(alertsIndex, alert.id)
-                            .routing(alert.monitorId),
+                            .routing(routingId),
                         // Only add completed alert to history index if history is enabled
                         if (alertIndices.isAlertHistoryEnabled()) {
                             IndexRequest(alertsHistoryIndex)
-                                .routing(if (routing.isNullOrEmpty()) alert.monitorId else routing)
+                                .routing(routingId)
                                 .source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
                                 .id(alert.id)
                         } else null
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt
index af6ce12b4..03edbfb5e 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt
@@ -13,6 +13,7 @@ import org.opensearch.alerting.DocumentLevelMonitorRunner
 import org.opensearch.alerting.MonitorRunnerExecutionContext
 import org.opensearch.alerting.QueryLevelMonitorRunner
 import org.opensearch.alerting.WorkflowMetadataService
+import org.opensearch.alerting.alerts.AlertIndices
 import org.opensearch.alerting.model.ChainedAlertTriggerRunResult
 import org.opensearch.alerting.model.MonitorRunResult
 import org.opensearch.alerting.model.WorkflowRunResult
@@ -21,6 +22,7 @@ import org.opensearch.alerting.script.ChainedAlertTriggerExecutionContext
 import org.opensearch.alerting.util.AlertingException
 import org.opensearch.alerting.util.isDocLevelMonitor
 import org.opensearch.alerting.util.isQueryLevelMonitor
+import org.opensearch.client.Client
 import org.opensearch.common.xcontent.LoggingDeprecationHandler
 import org.opensearch.common.xcontent.XContentHelper
 import org.opensearch.common.xcontent.XContentParserUtils
@@ -41,6 +43,7 @@ import java.time.ZoneOffset
 import java.util.UUID
 
 object CompositeWorkflowRunner : WorkflowRunner() {
+
     private val logger = LogManager.getLogger(javaClass)
 
     override suspend fun runWorkflow(
@@ -113,7 +116,8 @@ object CompositeWorkflowRunner : WorkflowRunner() {
                     workflowMetadataId = workflowMetadata.id,
                     chainedMonitorId = delegate.chainedMonitorFindings?.monitorId,
                     executionId = executionId,
-                    matchingDocIdsPerIndex = indexToDocIds
+                    matchingDocIdsPerIndex = indexToDocIds,
+                    muteDelegateMonitorActions = workflow.triggers.isNotEmpty() // todo add flag at transport layer for sap to consume
                 )
                 try {
                     dataSources = delegateMonitor.dataSources
@@ -123,7 +127,7 @@ object CompositeWorkflowRunner : WorkflowRunner() {
                 } catch (ex: Exception) {
                     logger.error("Error executing workflow delegate monitor ${delegate.monitorId}", ex)
                     lastErrorDelegateRun = AlertingException.wrap(ex)
-                    continue
+                    break
                 }
             }
             logger.debug("Workflow ${workflow.id} delegate monitors in execution $executionId completed")
@@ -148,7 +152,7 @@ object CompositeWorkflowRunner : WorkflowRunner() {
         if (dataSources != null) {
             try {
                 monitorCtx.alertIndices!!.createOrUpdateAlertIndex(dataSources)
-                val monitorIdToAlertIdsMap = fetchAlertsGeneratedInCurrentExecution(dataSources, executionId, monitorCtx)
+                val monitorIdToAlertIdsMap = fetchAlertsGeneratedInCurrentExecution(dataSources, executionId, monitorCtx, workflow)
                 for (trigger in workflow.triggers) {
                     val caTrigger = trigger as ChainedAlertTrigger
                     val triggerCtx = ChainedAlertTriggerExecutionContext(
@@ -282,13 +286,16 @@ object CompositeWorkflowRunner : WorkflowRunner() {
         dataSources: DataSources,
         executionId: String,
         monitorCtx: MonitorRunnerExecutionContext,
+        workflow: Workflow,
     ): MutableMap<String, MutableSet<String>> {
         try {
-            val searchRequest = SearchRequest(dataSources.alertsIndex)
+            val searchRequest =
+                SearchRequest(getDelegateMonitorAlertIndex(dataSources, workflow, monitorCtx.alertIndices!!.isAlertHistoryEnabled()))
             val queryBuilder = QueryBuilders.boolQuery()
-            queryBuilder.filter(QueryBuilders.termQuery("execution_id", executionId))
-            queryBuilder.filter(QueryBuilders.termQuery("state", Alert.State.ACTIVE.name))
+            queryBuilder.must(QueryBuilders.termQuery("execution_id", executionId))
+            // Match on the enum's name (a String), as the previous ACTIVE.name filter did, not on the enum object.
+            queryBuilder.must(QueryBuilders.termQuery("state", getDelegateMonitorAlertState(workflow).name))
             searchRequest.source().query(queryBuilder)
             val searchResponse: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(searchRequest, it) }
             val alerts = searchResponse.hits.map { hit ->
                 val xcp = XContentHelper.createParser(
@@ -307,10 +314,41 @@ object CompositeWorkflowRunner : WorkflowRunner() {
                     map[alert.monitorId] = mutableSetOf(alert.id)
                 }
             }
             return map
         } catch (e: Exception) {
             logger.error("failed to get alerts generated by delegate monitors in current execution $executionId", e)
             return mutableMapOf()
         }
     }
+
+    // todo use flag in workflow
+    /**
+     * Resolves the index that holds the alerts generated by the delegate monitors of [workflow].
+     *
+     * When the workflow has triggers and [isAlertHistoryEnabled] is true this is the alerts history
+     * index of [dataSources]; in every other case it is the regular alerts index.
+     */
+    fun getDelegateMonitorAlertIndex(
+        dataSources: DataSources,
+        workflow: Workflow,
+        isAlertHistoryEnabled: Boolean
+    ): String {
+        return if (workflow.triggers.isNotEmpty()) {
+            if (isAlertHistoryEnabled) {
+                dataSources.alertsHistoryIndex!!
+            } else dataSources.alertsIndex
+        } else dataSources.alertsIndex
+    }
+
+    /**
+     * Returns the state used to look up delegate monitor alerts of [workflow]:
+     * AUDIT when the workflow has triggers, ACTIVE otherwise.
+     */
+    fun getDelegateMonitorAlertState(
+        workflow: Workflow,
+    ): Alert.State {
+        return if (workflow.triggers.isNotEmpty()) {
+            Alert.State.AUDIT
+        } else Alert.State.ACTIVE
+    }
 }
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunContext.kt
index 60285e70b..0190b673b 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunContext.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunContext.kt
@@ -11,5 +11,6 @@ data class WorkflowRunContext(
     val workflowMetadataId: String,
     val chainedMonitorId: String?,
     val executionId: String,
-    val matchingDocIdsPerIndex: Map<String, List<String>>
+    val matchingDocIdsPerIndex: Map<String, List<String>>,
+    val muteDelegateMonitorActions: Boolean
 )
diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt
index c3a196573..80124efa0 100644
--- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt
+++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt
@@ -6,6 +6,8 @@
 package org.opensearch.alerting
 
 import org.junit.Assert
+import org.mockito.Mockito
+import org.mockito.Mockito.mock
 import org.opensearch.action.admin.cluster.state.ClusterStateRequest
 import org.opensearch.action.admin.indices.alias.Alias
 import org.opensearch.action.admin.indices.close.CloseIndexRequest
@@ -31,7 +33,11 @@ import org.opensearch.alerting.transport.AlertingSingleNodeTestCase
 import org.opensearch.alerting.util.AlertingException
 import org.opensearch.alerting.util.DocLevelMonitorQueries
 import org.opensearch.alerting.util.DocLevelMonitorQueries.Companion.INDEX_PATTERN_SUFFIX
+import org.opensearch.alerting.workflow.CompositeWorkflowRunner
 import org.opensearch.common.settings.Settings
+import org.opensearch.common.xcontent.LoggingDeprecationHandler
+import org.opensearch.common.xcontent.XContentHelper
+import org.opensearch.common.xcontent.XContentParserUtils
 import org.opensearch.common.xcontent.XContentType
 import org.opensearch.commons.alerting.action.AcknowledgeAlertRequest
 import org.opensearch.commons.alerting.action.AlertingActions
@@ -54,9 +60,12 @@ import org.opensearch.commons.alerting.model.ScheduledJob.Companion.DOC_LEVEL_QU
 import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
 import org.opensearch.commons.alerting.model.SearchInput
 import org.opensearch.commons.alerting.model.Table
+import org.opensearch.core.xcontent.XContentParser
 import org.opensearch.index.mapper.MapperService
+import org.opensearch.index.query.BoolQueryBuilder
 import org.opensearch.index.query.MatchQueryBuilder
 import org.opensearch.index.query.QueryBuilders
+import org.opensearch.index.query.TermQueryBuilder
 import org.opensearch.rest.RestRequest
 import org.opensearch.rest.RestStatus
 import org.opensearch.script.Script
@@ -3336,7 +3345,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
                     monitorRunResults.inputResults.results[0]["hits"]
                         as kotlin.collections.Map<String, Any>
                     )
-                    ["total"] as kotlin.collections.Map<String, Any>
+                ["total"] as kotlin.collections.Map<String, Any>
                 )["value"]
         assertEquals(2, totalHits)
         @Suppress("UNCHECKED_CAST")
@@ -3346,7 +3355,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
                     monitorRunResults.inputResults.results[0]["hits"]
                         as kotlin.collections.Map<String, Any>
                     )
-                    ["hits"] as List<kotlin.collections.Map<String, String>>
+                ["hits"] as List<kotlin.collections.Map<String, String>>
                 )
                 .map { it["_id"]!! }
         assertEquals(listOf("5", "6"), docIds.sorted())
@@ -3436,6 +3445,29 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
         assertTrue("Findings saved for test monitor", relatedDocIds.containsAll(findingDocIds))
     }
 
+    private fun getAuditStateAlerts(
+        alertsIndex: String? = AlertIndices.ALERT_INDEX,
+        monitorId: String,
+        executionId: String? = null,
+    ): List<Alert> {
+        val searchRequest = SearchRequest(alertsIndex)
+        val boolQueryBuilder = QueryBuilders.boolQuery()
+        boolQueryBuilder.must(TermQueryBuilder("monitor_id", monitorId))
+        if (!executionId.isNullOrEmpty())
+            boolQueryBuilder.must(TermQueryBuilder("execution_id", executionId))
+        searchRequest.source().query(boolQueryBuilder)
+        val searchResponse = client().search(searchRequest).get()
+        return searchResponse.hits.map { hit ->
+            val xcp = XContentHelper.createParser(
+                xContentRegistry(), LoggingDeprecationHandler.INSTANCE,
+                hit.sourceRef, XContentType.JSON
+            )
+            XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
+            val alert = Alert.parse(xcp, hit.id, hit.version)
+            alert
+        }
+    }
+
     private fun assertAlerts(
         monitorId: String,
         alertsIndex: String? = AlertIndices.ALERT_INDEX,
@@ -3476,6 +3508,12 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
         assertEquals(alertSize, acknowledgeAlertResponse.acknowledged.size)
     }
 
+    private fun assertAuditStateAlerts(
+        alerts: List<Alert>,
+    ) {
+        alerts.forEach { Assert.assertEquals(Alert.State.AUDIT, it.state) }
+    }
+
     fun `test execute workflow with bucket-level and doc-level chained monitors`() {
         createTestIndex(TEST_HR_INDEX)
 
@@ -3595,9 +3633,11 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
         val monitorResponse2 = createMonitor(monitor2)!!
 
         val andTrigger = randomChainedAlertTrigger(
+            name = "1And2",
             condition = Script("monitor[id=${monitorResponse.id}] && monitor[id=${monitorResponse2.id}]")
         )
         val notTrigger = randomChainedAlertTrigger(
+            name = "Not1OrNot2",
             condition = Script("!monitor[id=${monitorResponse.id}] || !monitor[id=${monitorResponse2.id}]")
         )
         var workflow = randomWorkflow(
@@ -3683,16 +3723,22 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
         Assert.assertEquals(monitor2.name, monitorsRunResults[1].monitorName)
         Assert.assertEquals(1, monitorsRunResults[1].triggerResults.size)
 
-        val getAlertsResponse = assertAlerts(
-            monitorResponse.id, executionId = executeWorkflowResponse.workflowRunResult.executionId, alertSize = 2
+        Assert.assertEquals(
+            monitor1.dataSources.alertsHistoryIndex,
+            CompositeWorkflowRunner.getDelegateMonitorAlertIndex(dataSources = monitor1.dataSources, workflow, true)
         )
-        assertAcknowledges(getAlertsResponse.alerts, monitorResponse.id, 2)
+        val alerts = getAuditStateAlerts(
+            monitorId = monitorResponse.id, executionId = executeWorkflowResponse.workflowRunResult.executionId,
+            alertsIndex = monitor1.dataSources.alertsHistoryIndex
+        )
+        assertAuditStateAlerts(alerts)
         assertFindings(monitorResponse.id, customFindingsIndex1, 2, 2, listOf("1", "2"))
 
-        val getAlertsResponse2 = assertAlerts(
-            monitorId = monitorResponse2.id, executionId = executeWorkflowResponse.workflowRunResult.executionId, alertSize = 1
+        val alerts1 = getAuditStateAlerts(
+            monitorId = monitorResponse2.id, executionId = executeWorkflowResponse.workflowRunResult.executionId,
+            alertsIndex = monitor2.dataSources.alertsHistoryIndex
         )
-        assertAcknowledges(getAlertsResponse2.alerts, monitorResponse2.id, 1)
+        assertAuditStateAlerts(alerts1)
         assertFindings(monitorResponse2.id, customFindingsIndex2, 1, 1, listOf("2"))
     }
 
diff --git a/build-tools/opensearchplugin-coverage.gradle b/build-tools/opensearchplugin-coverage.gradle
index f87be687a..d582d1453 100644
--- a/build-tools/opensearchplugin-coverage.gradle
+++ b/build-tools/opensearchplugin-coverage.gradle
@@ -62,7 +62,3 @@ jacocoTestReport {
         xml.required = true // for coverlay
     }
 }
-
-project.gradle.projectsEvaluated {
-    jacocoTestReport.dependsOn integTest
-}