Skip to content

Commit

Permalink
add audit alerts in query level monitor
Browse files Browse the repository at this point in the history
Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep committed Jul 8, 2023
1 parent f6a892b commit bd22aba
Show file tree
Hide file tree
Showing 4 changed files with 201 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,10 @@ class AlertService(
schemaVersion = IndexUtils.alertIndexSchemaVersion,
)
} else {
val alertState = if (alertError == null) Alert.State.ACTIVE else Alert.State.ERROR
val alertState = if (workflorwRunContext?.muteDelegateMonitorActions == true) {
Alert.State.AUDIT
} else if (alertError == null) Alert.State.ACTIVE
else Alert.State.ERROR
Alert(
monitor = ctx.monitor, trigger = ctx.trigger, startTime = currentTime,
lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
Expand Down Expand Up @@ -263,7 +266,6 @@ class AlertService(
)
}


fun updateActionResultsForBucketLevelAlert(
currentAlert: Alert,
actionResults: Map<String, ActionRunResult>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import org.opensearch.alerting.script.ChainedAlertTriggerExecutionContext
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.isDocLevelMonitor
import org.opensearch.alerting.util.isQueryLevelMonitor
import org.opensearch.client.Client
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentParserUtils
Expand Down Expand Up @@ -327,7 +326,7 @@ object CompositeWorkflowRunner : WorkflowRunner() {
fun getDelegateMonitorAlertIndex(
dataSources: DataSources,
workflow: Workflow,
isAlertHistoryEnabled: Boolean
isAlertHistoryEnabled: Boolean,
): String {
return if (workflow.triggers.isNotEmpty()) {
if (isAlertHistoryEnabled) {
Expand Down
205 changes: 194 additions & 11 deletions alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
package org.opensearch.alerting

import org.junit.Assert
import org.mockito.Mockito
import org.mockito.Mockito.mock
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.indices.alias.Alias
import org.opensearch.action.admin.indices.close.CloseIndexRequest
Expand Down Expand Up @@ -40,6 +38,8 @@ import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.action.AcknowledgeAlertRequest
import org.opensearch.commons.alerting.action.AcknowledgeAlertResponse
import org.opensearch.commons.alerting.action.AcknowledgeChainedAlertRequest
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.DeleteMonitorRequest
import org.opensearch.commons.alerting.action.GetAlertsRequest
Expand All @@ -62,7 +62,6 @@ import org.opensearch.commons.alerting.model.SearchInput
import org.opensearch.commons.alerting.model.Table
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.index.mapper.MapperService
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.MatchQueryBuilder
import org.opensearch.index.query.QueryBuilders
import org.opensearch.index.query.TermQueryBuilder
Expand Down Expand Up @@ -3344,8 +3343,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
(
monitorRunResults.inputResults.results[0]["hits"]
as kotlin.collections.Map<String, Any>
)
["total"] as kotlin.collections.Map<String, Any>
)["total"] as kotlin.collections.Map<String, Any>
)["value"]
assertEquals(2, totalHits)
@Suppress("UNCHECKED_CAST")
Expand All @@ -3354,8 +3352,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
(
monitorRunResults.inputResults.results[0]["hits"]
as kotlin.collections.Map<String, Any>
)
["hits"] as List<kotlin.collections.Map<String, String>>
)["hits"] as List<kotlin.collections.Map<String, String>>
)
.map { it["_id"]!! }
assertEquals(listOf("5", "6"), docIds.sorted())
Expand Down Expand Up @@ -3466,8 +3463,6 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
val alert = Alert.parse(xcp, hit.id, hit.version)
alert
}


}

private fun assertAlerts(
Expand Down Expand Up @@ -3510,10 +3505,44 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
assertEquals(alertSize, acknowledgeAlertResponse.acknowledged.size)
}

/**
 * Acknowledges the given chained alerts twice and checks both responses.
 *
 * The first request sends the ids of [alerts]; all of them must be reported as acknowledged and the
 * acknowledged count must equal [alertSize]. The second request sends the same ids plus the unknown id
 * `"dummy"`; nothing may be acknowledged, `"dummy"` must be reported as missing and the original ids
 * must be reported as failed.
 *
 * @param alerts chained alerts whose ids are submitted for acknowledgement
 * @param workflowId workflow id sent with each acknowledge request
 * @param alertSize expected size of the first response's `acknowledged` list
 */
private fun verifyAcknowledgeChainedAlerts(
    alerts: List<Alert>,
    workflowId: String,
    alertSize: Int,
) {
    val chainedAlertIds = alerts.map { it.id }

    // First request: every submitted id is expected back in `acknowledged`.
    val firstAck = ackChainedAlerts(chainedAlertIds, workflowId)
    val acknowledgedIds = firstAck.acknowledged.map { it.id }
    assertTrue(acknowledgedIds.containsAll(chainedAlertIds))
    assertEquals(alertSize, firstAck.acknowledged.size)

    // Second request: the same ids again, plus one id that does not exist.
    val unknownId = "dummy"
    val redundantAck = ackChainedAlerts(chainedAlertIds + unknownId, workflowId)
    Assert.assertTrue(redundantAck.acknowledged.isEmpty())
    Assert.assertTrue(redundantAck.missing.contains(unknownId))
    val failedIds = redundantAck.failed.map { it.id }
    Assert.assertTrue(failedIds.containsAll(chainedAlertIds))
}

/**
 * Executes [AlertingActions.ACKNOWLEDGE_CHAINED_ALERTS_ACTION_TYPE] through the test client and
 * returns the response obtained by calling `get()` on the execution result.
 *
 * @param alertIds ids of the chained alerts to acknowledge
 * @param workflowId id of the workflow passed to the request
 * @return the acknowledge response for the request
 */
private fun ackChainedAlerts(alertIds: List<String>, workflowId: String): AcknowledgeAlertResponse {
    val request = AcknowledgeChainedAlertRequest(workflowId, alertIds)
    val pendingResponse = client().execute(AlertingActions.ACKNOWLEDGE_CHAINED_ALERTS_ACTION_TYPE, request)
    return pendingResponse.get()
}

/**
 * Asserts that every alert in [alerts] is in [Alert.State.AUDIT] and that an acknowledge request for
 * those alerts acknowledges none of them: the response must have empty `acknowledged` and `failed`
 * lists and must report every submitted id as missing.
 *
 * @param monitorId id of the monitor passed to the acknowledge request
 * @param alerts alerts expected to be in the AUDIT state
 */
private fun assertAuditStateAlerts(
    monitorId: String,
    alerts: List<Alert>,
) {
    // JUnit's contract is assertEquals(expected, actual); keeping that order makes failure output accurate.
    alerts.forEach { Assert.assertEquals(Alert.State.AUDIT, it.state) }

    val alertIds = alerts.map { it.id }
    val ack = client().execute(
        AlertingActions.ACKNOWLEDGE_ALERTS_ACTION_TYPE,
        AcknowledgeAlertRequest(monitorId, alertIds, WriteRequest.RefreshPolicy.IMMEDIATE)
    ).get()

    Assert.assertTrue(ack.acknowledged.isEmpty())
    Assert.assertTrue(ack.missing.containsAll(alertIds))
    Assert.assertTrue(ack.failed.isEmpty())
}

fun `test execute workflow with bucket-level and doc-level chained monitors`() {
Expand Down Expand Up @@ -3603,6 +3632,158 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
assertEquals(2, executeResult!!.workflowRunResult.monitorRunResults.size)
}

/**
 * Exercises chained alert triggers over two document-level monitors that share custom alert and
 * alert-history indices but use separate custom findings indices.
 *
 * Flow:
 * 1. Create both monitors and a workflow with an AND trigger (`1And2`) and a NOT/OR trigger
 *    (`Not1OrNot2`).
 * 2. Execute the workflow before any of the test documents below are indexed: only the NOT/OR trigger
 *    is expected to fire and to produce a single chained alert.
 * 3. Index three documents and execute again: only the AND trigger is expected to fire.
 * 4. Verify the delegate monitors' audit-state alerts in the alerts history index, their findings, and
 *    acknowledgement of the chained alert.
 */
fun `test chained alerts for AND OR and NOT conditions with custom alerts indices`() {
    val docQuery1 = DocLevelQuery(query = "test_field_1:\"us-west-2\"", name = "3")
    val docLevelInput1 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1))
    val trigger1 = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
    val customFindingsIndex1 = "custom_findings_index"
    val customFindingsIndexPattern1 = "custom_findings_index-1"
    val customAlertsIndex = "custom_alerts_index"
    val customAlertsHistoryIndex = "custom_alerts_history_index"
    val customAlertsHistoryIndexPattern = "<custom_alerts_history_index-{now/d}-1>"
    val monitor1 = randomDocumentLevelMonitor(
        inputs = listOf(docLevelInput1),
        triggers = listOf(trigger1),
        dataSources = DataSources(
            findingsIndex = customFindingsIndex1,
            findingsIndexPattern = customFindingsIndexPattern1,
            alertsIndex = customAlertsIndex,
            alertsHistoryIndex = customAlertsHistoryIndex,
            alertsHistoryIndexPattern = customAlertsHistoryIndexPattern
        )
    )
    val monitorResponse = createMonitor(monitor1)!!

    val docQuery2 = DocLevelQuery(query = "source.ip.v6.v2:16645", name = "4")
    val docLevelInput2 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery2))
    val trigger2 = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
    val customFindingsIndex2 = "custom_findings_index_2"
    val customFindingsIndexPattern2 = "custom_findings_index-2"
    // Same alert and alert-history indices as monitor1; only the findings index differs.
    val monitor2 = randomDocumentLevelMonitor(
        inputs = listOf(docLevelInput2),
        triggers = listOf(trigger2),
        dataSources = DataSources(
            findingsIndex = customFindingsIndex2,
            findingsIndexPattern = customFindingsIndexPattern2,
            alertsIndex = customAlertsIndex,
            alertsHistoryIndex = customAlertsHistoryIndex,
            alertsHistoryIndexPattern = customAlertsHistoryIndexPattern
        )
    )

    val monitorResponse2 = createMonitor(monitor2)!!
    val andTrigger = randomChainedAlertTrigger(
        name = "1And2",
        condition = Script("monitor[id=${monitorResponse.id}] && monitor[id=${monitorResponse2.id}]")
    )
    val notTrigger = randomChainedAlertTrigger(
        name = "Not1OrNot2",
        condition = Script("!monitor[id=${monitorResponse.id}] || !monitor[id=${monitorResponse2.id}]")
    )
    val workflow = randomWorkflow(
        monitorIds = listOf(monitorResponse.id, monitorResponse2.id),
        triggers = listOf(andTrigger, notTrigger)
    )
    val workflowResponse = upsertWorkflow(workflow)!!
    val workflowById = searchWorkflow(workflowResponse.id)
    assertNotNull(workflowById)
    val workflowId = workflowResponse.id

    // First execution: runs before the test documents below are indexed.
    var executeWorkflowResponse = executeWorkflow(workflowById, workflowId, false)!!
    var triggerResults = executeWorkflowResponse.workflowRunResult.triggerResults
    Assert.assertEquals(2, triggerResults.size)
    Assert.assertTrue(triggerResults.containsKey(andTrigger.id))
    Assert.assertTrue(triggerResults.containsKey(notTrigger.id))
    var andTriggerResult = triggerResults[andTrigger.id]
    var notTriggerResult = triggerResults[notTrigger.id]
    Assert.assertTrue(notTriggerResult!!.triggered)
    Assert.assertFalse(andTriggerResult!!.triggered)
    var chainedAlerts = searchChainedAlerts(
        executeWorkflowResponse.workflowRunResult.executionId,
        workflowId,
        monitor1.dataSources.alertsIndex,
    )
    Assert.assertEquals(1, chainedAlerts.size)
    Assert.assertEquals(executeWorkflowResponse.workflowRunResult.executionId, chainedAlerts[0].executionId)
    Assert.assertEquals("", chainedAlerts[0].monitorId)
    Assert.assertEquals(notTrigger.id, chainedAlerts[0].triggerId)

    var testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS))
    // Matches monitor1
    val testDoc1 = """{
"message" : "This is an error from IAD region",
"source.ip.v6.v2" : 16644,
"test_strict_date_time" : "$testTime",
"test_field_1" : "us-west-2"
}"""
    indexDoc(index, "1", testDoc1)

    testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS))
    // Matches monitor1 and monitor2
    val testDoc2 = """{
"message" : "This is an error from IAD region",
"source.ip.v6.v2" : 16645,
"test_strict_date_time" : "$testTime",
"test_field_1" : "us-west-2"
}"""
    indexDoc(index, "2", testDoc2)

    testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS))
    // Doesn't match
    // NOTE(review): this document carries the same source.ip.v6.v2 value as doc 2, which monitor2's
    // query targets, while the findings assertion for monitor2 below expects only doc 2 - confirm the
    // intended match set.
    val testDoc3 = """{
"message" : "This is an error from IAD region",
"source.ip.v6.v2" : 16645,
"test_strict_date_time" : "$testTime",
"test_field_1" : "us-east-1"
}"""
    indexDoc(index, "3", testDoc3)

    // Second execution: runs after the three documents above have been indexed.
    executeWorkflowResponse = executeWorkflow(workflowById, workflowId, false)!!
    triggerResults = executeWorkflowResponse.workflowRunResult.triggerResults
    Assert.assertEquals(2, triggerResults.size)
    Assert.assertTrue(triggerResults.containsKey(andTrigger.id))
    Assert.assertTrue(triggerResults.containsKey(notTrigger.id))
    andTriggerResult = triggerResults[andTrigger.id]
    notTriggerResult = triggerResults[notTrigger.id]
    Assert.assertFalse(notTriggerResult!!.triggered)
    Assert.assertTrue(andTriggerResult!!.triggered)
    chainedAlerts = searchChainedAlerts(
        executeWorkflowResponse.workflowRunResult.executionId,
        workflowId,
        monitor1.dataSources.alertsIndex,
    )
    val numChainedAlerts = 1
    Assert.assertEquals(numChainedAlerts, chainedAlerts.size)
    Assert.assertEquals(executeWorkflowResponse.workflowRunResult.executionId, chainedAlerts[0].executionId)
    Assert.assertEquals("", chainedAlerts[0].monitorId)
    Assert.assertEquals(andTrigger.id, chainedAlerts[0].triggerId)
    val monitorsRunResults = executeWorkflowResponse.workflowRunResult.monitorRunResults
    assertEquals(2, monitorsRunResults.size)

    assertEquals(monitor1.name, monitorsRunResults[0].monitorName)
    assertEquals(1, monitorsRunResults[0].triggerResults.size)

    Assert.assertEquals(monitor2.name, monitorsRunResults[1].monitorName)
    Assert.assertEquals(1, monitorsRunResults[1].triggerResults.size)

    // With alert history enabled, the delegate monitor alert index must resolve to the history index.
    Assert.assertEquals(
        monitor1.dataSources.alertsHistoryIndex,
        CompositeWorkflowRunner.getDelegateMonitorAlertIndex(
            dataSources = monitor1.dataSources,
            workflow = workflow,
            isAlertHistoryEnabled = true,
        )
    )
    val alerts = getAuditStateAlerts(
        monitorId = monitorResponse.id, executionId = executeWorkflowResponse.workflowRunResult.executionId,
        alertsIndex = monitor1.dataSources.alertsHistoryIndex
    )
    assertAuditStateAlerts(monitorResponse.id, alerts)
    assertFindings(monitorResponse.id, customFindingsIndex1, 2, 2, listOf("1", "2"))

    val alerts1 = getAuditStateAlerts(
        monitorId = monitorResponse2.id, executionId = executeWorkflowResponse.workflowRunResult.executionId,
        alertsIndex = monitor2.dataSources.alertsHistoryIndex
    )
    assertAuditStateAlerts(monitorResponse2.id, alerts1)
    assertFindings(monitorResponse2.id, customFindingsIndex2, 1, 1, listOf("2"))
    verifyAcknowledgeChainedAlerts(chainedAlerts, workflowId, numChainedAlerts)
}

fun `test chained alerts for AND OR and NOT conditions`() {
val docQuery1 = DocLevelQuery(query = "test_field_1:\"us-west-2\"", name = "3")
val docLevelInput1 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1))
Expand Down Expand Up @@ -3666,6 +3847,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
monitor1.dataSources.alertsIndex,
)
Assert.assertTrue(chainedAlerts.size == 1)
verifyAcknowledgeChainedAlerts(chainedAlerts, workflowId, 1)
Assert.assertTrue(chainedAlerts[0].executionId == executeWorkflowResponse.workflowRunResult.executionId)
Assert.assertTrue(chainedAlerts[0].monitorId == "")
Assert.assertTrue(chainedAlerts[0].triggerId == notTrigger.id)
Expand Down Expand Up @@ -3733,15 +3915,16 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
monitorId = monitorResponse.id, executionId = executeWorkflowResponse.workflowRunResult.executionId,
alertsIndex = monitor1.dataSources.alertsHistoryIndex
)
assertAuditStateAlerts(alerts)
assertAuditStateAlerts(monitorResponse.id, alerts)
assertFindings(monitorResponse.id, customFindingsIndex1, 2, 2, listOf("1", "2"))

val alerts1 = getAuditStateAlerts(
monitorId = monitorResponse2.id, executionId = executeWorkflowResponse.workflowRunResult.executionId,
alertsIndex = monitor2.dataSources.alertsHistoryIndex
)
assertAuditStateAlerts(alerts1)
assertAuditStateAlerts(monitorResponse2.id, alerts1)
assertFindings(monitorResponse2.id, customFindingsIndex2, 1, 1, listOf("2"))
verifyAcknowledgeChainedAlerts(chainedAlerts, workflowId, 1)
}

private fun getDelegateMonitorMetadataId(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1004,4 +1004,6 @@ class WorkflowRestApiIT : AlertingRestTestCase() {
assertEquals(RestStatus.NOT_FOUND, e.response.restStatus())
}
}


}

0 comments on commit bd22aba

Please sign in to comment.