added audit state alerts for doc level monitors
Signed-off-by: Surya Sashank Nistala <[email protected]>
eirsep committed Jul 8, 2023
1 parent 4fd662d commit f6a892b
Showing 5 changed files with 118 additions and 30 deletions.
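The change below makes a delegate monitor that runs inside a workflow write its alerts in the AUDIT state instead of ACTIVE or ERROR whenever the run context has muteDelegateMonitorActions set (which CompositeWorkflowRunner currently derives from the workflow having chained-alert triggers). The following is a self-contained sketch of that state-selection rule as read from the AlertService diff; AlertState, WorkflowContext, and chooseAlertState are illustrative stand-ins, not the plugin's actual types or functions.

// Illustrative sketch of the state-selection rule introduced in this commit.
// AlertState and WorkflowContext stand in for the plugin's Alert.State and
// WorkflowRunContext; the real logic lives in the AlertService changes below.
enum class AlertState { ACTIVE, ERROR, AUDIT }

data class WorkflowContext(val muteDelegateMonitorActions: Boolean)

fun chooseAlertState(workflowContext: WorkflowContext?, errorMessage: String?): AlertState =
    when {
        // Delegate monitor executed as part of a workflow: record an audit alert only.
        workflowContext?.muteDelegateMonitorActions == true -> AlertState.AUDIT
        // Standalone behavior is unchanged: ACTIVE on success, ERROR on failure.
        errorMessage == null -> AlertState.ACTIVE
        else -> AlertState.ERROR
    }

fun main() {
    println(chooseAlertState(WorkflowContext(muteDelegateMonitorActions = true), null)) // AUDIT
    println(chooseAlertState(null, null))                                               // ACTIVE
    println(chooseAlertState(null, "mapping failure"))                                  // ERROR
}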
35 changes: 24 additions & 11 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
@@ -210,7 +210,13 @@ class AlertService(
): Alert {
val currentTime = Instant.now()

- val alertState = if (alertError == null) Alert.State.ACTIVE else Alert.State.ERROR
+ val alertState = if (workflorwRunContext?.muteDelegateMonitorActions == true) {
+ Alert.State.AUDIT
+ } else if (alertError == null) {
+ Alert.State.ACTIVE
+ } else {
+ Alert.State.ERROR
+ }
return Alert(
id = UUID.randomUUID().toString(), monitor = ctx.monitor, trigger = ctx.trigger, startTime = currentTime,
lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
@@ -227,12 +233,15 @@ class AlertService(
workflowRunContext: WorkflowRunContext?
): Alert {
val currentTime = Instant.now()
+ val alertState = if (workflowRunContext?.muteDelegateMonitorActions == true) {
+ Alert.State.AUDIT
+ } else {
+ Alert.State.ERROR
+ }
return Alert(
id = id, monitor = monitor, trigger = NoOpTrigger(), startTime = currentTime,
- lastNotificationTime = currentTime, state = Alert.State.ERROR, errorMessage = alertError?.message,
- schemaVersion = IndexUtils.alertIndexSchemaVersion,
- workflowId = workflowRunContext?.workflowId ?: "",
- executionId = executionId ?: ""
+ lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
+ schemaVersion = IndexUtils.alertIndexSchemaVersion, executionId = executionId, workflowId = workflowRunContext?.workflowId ?: ""
)
}

@@ -566,11 +575,12 @@ class AlertService(
// In the rare event that a user acknowledges an alert between when it's read and when it's written
// back we're ok if that acknowledgement is lost. It's easier to get the user to retry than for the runner to
// spend time reloading the alert and writing it back.
+ val routingId = if (routing.isNullOrEmpty()) alert.monitorId else routing
when (alert.state) {
Alert.State.ACTIVE, Alert.State.ERROR -> {
listOf<DocWriteRequest<*>>(
IndexRequest(alertsIndex)
- .routing(if (routing.isNullOrEmpty()) alert.monitorId else routing)
+ .routing(routingId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(if (alert.id != Alert.NO_ID) alert.id else null)
)
@@ -581,7 +591,7 @@
if (allowUpdatingAcknowledgedAlert) {
listOf<DocWriteRequest<*>>(
IndexRequest(alertsIndex)
- .routing(if (routing.isNullOrEmpty()) alert.monitorId else routing)
+ .routing(routingId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(if (alert.id != Alert.NO_ID) alert.id else null)
)
@@ ... @@
}
}
Alert.State.AUDIT -> {
+ val index = if (alertIndices.isAlertHistoryEnabled()) {
+ dataSources.alertsHistoryIndex
+ } else dataSources.alertsIndex
listOf<DocWriteRequest<*>>(
- IndexRequest(alertsIndex)
- .routing(alert.monitorId)
+ IndexRequest(index)
+ .routing(routingId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(if (alert.id != Alert.NO_ID) alert.id else null)
)
@@ -603,11 +616,11 @@
Alert.State.COMPLETED -> {
listOfNotNull<DocWriteRequest<*>>(
DeleteRequest(alertsIndex, alert.id)
- .routing(alert.monitorId),
+ .routing(routingId),
// Only add completed alert to history index if history is enabled
if (alertIndices.isAlertHistoryEnabled()) {
IndexRequest(alertsHistoryIndex)
- .routing(if (routing.isNullOrEmpty()) alert.monitorId else routing)
+ .routing(routingId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(alert.id)
} else null
alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt
@@ -13,6 +13,7 @@ import org.opensearch.alerting.DocumentLevelMonitorRunner
import org.opensearch.alerting.MonitorRunnerExecutionContext
import org.opensearch.alerting.QueryLevelMonitorRunner
import org.opensearch.alerting.WorkflowMetadataService
+ import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.model.ChainedAlertTriggerRunResult
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.WorkflowRunResult
@@ -21,6 +22,7 @@ import org.opensearch.alerting.script.ChainedAlertTriggerExecutionContext
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.isDocLevelMonitor
import org.opensearch.alerting.util.isQueryLevelMonitor
+ import org.opensearch.client.Client
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentParserUtils
@@ -41,6 +43,7 @@ import java.time.ZoneOffset
import java.util.UUID

object CompositeWorkflowRunner : WorkflowRunner() {

private val logger = LogManager.getLogger(javaClass)

override suspend fun runWorkflow(
@@ -113,7 +116,8 @@ object CompositeWorkflowRunner {
workflowMetadataId = workflowMetadata.id,
chainedMonitorId = delegate.chainedMonitorFindings?.monitorId,
executionId = executionId,
- matchingDocIdsPerIndex = indexToDocIds
+ matchingDocIdsPerIndex = indexToDocIds,
+ muteDelegateMonitorActions = workflow.triggers.isNotEmpty() // todo add flag at transport layer for sap to consume
)
try {
dataSources = delegateMonitor.dataSources
@@ ... @@
} catch (ex: Exception) {
logger.error("Error executing workflow delegate monitor ${delegate.monitorId}", ex)
lastErrorDelegateRun = AlertingException.wrap(ex)
- continue
+ break
}
}
logger.debug("Workflow ${workflow.id} delegate monitors in execution $executionId completed")
@@ -148,7 +152,7 @@
if (dataSources != null) {
try {
monitorCtx.alertIndices!!.createOrUpdateAlertIndex(dataSources)
- val monitorIdToAlertIdsMap = fetchAlertsGeneratedInCurrentExecution(dataSources, executionId, monitorCtx)
+ val monitorIdToAlertIdsMap = fetchAlertsGeneratedInCurrentExecution(dataSources, executionId, monitorCtx, workflow)
for (trigger in workflow.triggers) {
val caTrigger = trigger as ChainedAlertTrigger
val triggerCtx = ChainedAlertTriggerExecutionContext(
@@ -282,13 +286,17 @@
dataSources: DataSources,
executionId: String,
monitorCtx: MonitorRunnerExecutionContext,
+ workflow: Workflow,
): MutableMap<String, MutableSet<String>> {
try {
- val searchRequest = SearchRequest(dataSources.alertsIndex)
+ val searchRequest =
+ SearchRequest(getDelegateMonitorAlertIndex(dataSources, workflow, monitorCtx.alertIndices!!.isAlertHistoryEnabled()))
val queryBuilder = QueryBuilders.boolQuery()
- queryBuilder.filter(QueryBuilders.termQuery("execution_id", executionId))
- queryBuilder.filter(QueryBuilders.termQuery("state", Alert.State.ACTIVE.name))
+ queryBuilder.must(QueryBuilders.termQuery("execution_id", executionId))
+ queryBuilder.must(QueryBuilders.termQuery("state", getDelegateMonitorAlertState(workflow)))
searchRequest.source().query(queryBuilder)
+ val searchResponse1: SearchResponse =
+ monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(SearchRequest(AlertIndices.ALL_ALERT_INDEX_PATTERN), it) }
val searchResponse: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(searchRequest, it) }
val alerts = searchResponse.hits.map { hit ->
val xcp = XContentHelper.createParser(
@@ -307,10 +315,32 @@ object CompositeWorkflowRunner : WorkflowRunner() {
map[alert.monitorId] = mutableSetOf(alert.id)
}
}
logger.error("ALL ALERT INDICES SEARCH: $searchResponse1")
return map
} catch (e: Exception) {
logger.error("failed to get alerts generated by delegate monitors in current execution $executionId", e)
return mutableMapOf()
}
}

+ // todo use flag in workflow
+ fun getDelegateMonitorAlertIndex(
+ dataSources: DataSources,
+ workflow: Workflow,
+ isAlertHistoryEnabled: Boolean
+ ): String {
+ return if (workflow.triggers.isNotEmpty()) {
+ if (isAlertHistoryEnabled) {
+ dataSources.alertsHistoryIndex!!
+ } else dataSources.alertsIndex
+ } else dataSources.alertsIndex
+ }
+
+ fun getDelegateMonitorAlertState(
+ workflow: Workflow,
+ ): Alert.State {
+ return if (workflow.triggers.isNotEmpty()) {
+ Alert.State.AUDIT
+ } else Alert.State.ACTIVE
+ }
}
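Taken together, the two new helpers above mean that when a workflow declares chained-alert triggers, the runner queries AUDIT-state alerts from the alerts history index (falling back to the regular alerts index when history is disabled), and otherwise keeps the previous behavior of reading ACTIVE alerts from the alerts index. The following is a small self-contained sketch of that decision table; the function, type, and index names are placeholders, not the plugin's constants.

// Placeholder decision table mirroring getDelegateMonitorAlertIndex and
// getDelegateMonitorAlertState above; index names are example values only.
data class DelegateAlertQuery(val index: String, val expectedState: String)

fun delegateAlertQuery(
    hasChainedAlertTriggers: Boolean,
    isAlertHistoryEnabled: Boolean,
    alertsIndex: String = "alerts-index",
    alertsHistoryIndex: String = "alerts-history-index"
): DelegateAlertQuery =
    if (hasChainedAlertTriggers) {
        // Audit alerts from delegate monitors are searched in the history index when enabled.
        DelegateAlertQuery(if (isAlertHistoryEnabled) alertsHistoryIndex else alertsIndex, "AUDIT")
    } else {
        DelegateAlertQuery(alertsIndex, "ACTIVE")
    }

fun main() {
    println(delegateAlertQuery(hasChainedAlertTriggers = true, isAlertHistoryEnabled = true))
    // DelegateAlertQuery(index=alerts-history-index, expectedState=AUDIT)
    println(delegateAlertQuery(hasChainedAlertTriggers = false, isAlertHistoryEnabled = true))
    // DelegateAlertQuery(index=alerts-index, expectedState=ACTIVE)
}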
alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunContext.kt
@@ -11,5 +11,6 @@ data class WorkflowRunContext(
val workflowMetadataId: String,
val chainedMonitorId: String?,
val executionId: String,
- val matchingDocIdsPerIndex: Map<String, List<String>>
+ val matchingDocIdsPerIndex: Map<String, List<String>>,
+ val muteDelegateMonitorActions: Boolean
)
alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt
@@ -6,6 +6,8 @@
package org.opensearch.alerting

import org.junit.Assert
+ import org.mockito.Mockito
+ import org.mockito.Mockito.mock
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.indices.alias.Alias
import org.opensearch.action.admin.indices.close.CloseIndexRequest
@@ -31,7 +33,11 @@ import org.opensearch.alerting.transport.AlertingSingleNodeTestCase
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.DocLevelMonitorQueries
import org.opensearch.alerting.util.DocLevelMonitorQueries.Companion.INDEX_PATTERN_SUFFIX
+ import org.opensearch.alerting.workflow.CompositeWorkflowRunner
import org.opensearch.common.settings.Settings
+ import org.opensearch.common.xcontent.LoggingDeprecationHandler
+ import org.opensearch.common.xcontent.XContentHelper
+ import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.action.AcknowledgeAlertRequest
import org.opensearch.commons.alerting.action.AlertingActions
@@ -54,9 +60,12 @@ import org.opensearch.commons.alerting.model.ScheduledJob.Companion.DOC_LEVEL_QU
import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import org.opensearch.commons.alerting.model.SearchInput
import org.opensearch.commons.alerting.model.Table
+ import org.opensearch.core.xcontent.XContentParser
import org.opensearch.index.mapper.MapperService
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.MatchQueryBuilder
+ import org.opensearch.index.query.QueryBuilders
+ import org.opensearch.index.query.TermQueryBuilder
import org.opensearch.rest.RestRequest
import org.opensearch.rest.RestStatus
import org.opensearch.script.Script
@@ -3336,7 +3345,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
monitorRunResults.inputResults.results[0]["hits"]
as kotlin.collections.Map<String, Any>
)
["total"] as kotlin.collections.Map<String, Any>
["total"] as kotlin.collections.Map<String, Any>
)["value"]
assertEquals(2, totalHits)
@Suppress("UNCHECKED_CAST")
@@ -3346,7 +3355,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
monitorRunResults.inputResults.results[0]["hits"]
as kotlin.collections.Map<String, Any>
)
["hits"] as List<kotlin.collections.Map<String, String>>
["hits"] as List<kotlin.collections.Map<String, String>>
)
.map { it["_id"]!! }
assertEquals(listOf("5", "6"), docIds.sorted())
@@ -3436,6 +3445,31 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
assertTrue("Findings saved for test monitor", relatedDocIds.containsAll(findingDocIds))
}

+ private fun getAuditStateAlerts(
+ alertsIndex: String? = AlertIndices.ALERT_INDEX,
+ monitorId: String,
+ executionId: String? = null,
+ ): List<Alert> {
+ val searchRequest = SearchRequest(alertsIndex)
+ val boolQueryBuilder = QueryBuilders.boolQuery()
+ boolQueryBuilder.must(TermQueryBuilder("monitor_id", monitorId))
+ if (executionId.isNullOrEmpty() == false)
+ boolQueryBuilder.must(TermQueryBuilder("execution_id", executionId))
+ searchRequest.source().query(boolQueryBuilder)
+ val searchResponse = client().search(searchRequest).get()
+ return searchResponse.hits.map { hit ->
+ val xcp = XContentHelper.createParser(
+ xContentRegistry(), LoggingDeprecationHandler.INSTANCE,
+ hit.sourceRef, XContentType.JSON
+ )
+ XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
+ val alert = Alert.parse(xcp, hit.id, hit.version)
+ alert
+ }
+
+
+ }

private fun assertAlerts(
monitorId: String,
alertsIndex: String? = AlertIndices.ALERT_INDEX,
@@ -3476,6 +3510,12 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
assertEquals(alertSize, acknowledgeAlertResponse.acknowledged.size)
}

+ private fun assertAuditStateAlerts(
+ alerts: List<Alert>,
+ ) {
+ alerts.forEach { Assert.assertEquals(it.state, Alert.State.AUDIT) }
+ }

fun `test execute workflow with bucket-level and doc-level chained monitors`() {
createTestIndex(TEST_HR_INDEX)

@@ -3595,9 +3635,11 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {

val monitorResponse2 = createMonitor(monitor2)!!
val andTrigger = randomChainedAlertTrigger(
name = "1And2",
condition = Script("monitor[id=${monitorResponse.id}] && monitor[id=${monitorResponse2.id}]")
)
val notTrigger = randomChainedAlertTrigger(
name = "Not1OrNot2",
condition = Script("!monitor[id=${monitorResponse.id}] || !monitor[id=${monitorResponse2.id}]")
)
var workflow = randomWorkflow(
@@ -3683,16 +3725,22 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
Assert.assertEquals(monitor2.name, monitorsRunResults[1].monitorName)
Assert.assertEquals(1, monitorsRunResults[1].triggerResults.size)

- val getAlertsResponse = assertAlerts(
- monitorResponse.id, executionId = executeWorkflowResponse.workflowRunResult.executionId, alertSize = 2
+ Assert.assertEquals(
+ monitor1.dataSources.alertsHistoryIndex,
+ CompositeWorkflowRunner.getDelegateMonitorAlertIndex(dataSources = monitor1.dataSources, workflow, true)
)
- assertAcknowledges(getAlertsResponse.alerts, monitorResponse.id, 2)
+ val alerts = getAuditStateAlerts(
+ monitorId = monitorResponse.id, executionId = executeWorkflowResponse.workflowRunResult.executionId,
+ alertsIndex = monitor1.dataSources.alertsHistoryIndex
+ )
+ assertAuditStateAlerts(alerts)
assertFindings(monitorResponse.id, customFindingsIndex1, 2, 2, listOf("1", "2"))

- val getAlertsResponse2 = assertAlerts(
- monitorId = monitorResponse2.id, executionId = executeWorkflowResponse.workflowRunResult.executionId, alertSize = 1
+ val alerts1 = getAuditStateAlerts(
+ monitorId = monitorResponse2.id, executionId = executeWorkflowResponse.workflowRunResult.executionId,
+ alertsIndex = monitor2.dataSources.alertsHistoryIndex
)
- assertAcknowledges(getAlertsResponse2.alerts, monitorResponse2.id, 1)
+ assertAuditStateAlerts(alerts1)
assertFindings(monitorResponse2.id, customFindingsIndex2, 1, 1, listOf("2"))
}

4 changes: 0 additions & 4 deletions build-tools/opensearchplugin-coverage.gradle
@@ -62,7 +62,3 @@ jacocoTestReport {
xml.required = true // for coverlay
}
}

- project.gradle.projectsEvaluated {
- jacocoTestReport.dependsOn integTest
- }
