Skip to content

Commit

Permalink
add audit alerts in bucket level monitor
Browse files Browse the repository at this point in the history
Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep committed Jul 10, 2023
1 parent ca71981 commit f851715
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 14 deletions.
33 changes: 23 additions & 10 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,11 @@ class AlertService(

private val logger = LogManager.getLogger(AlertService::class.java)

suspend fun loadCurrentAlertsForQueryLevelMonitor(monitor: Monitor): Map<Trigger, Alert?> {
suspend fun loadCurrentAlertsForQueryLevelMonitor(monitor: Monitor, workflowRunContext: WorkflowRunContext?): Map<Trigger, Alert?> {
val searchAlertsResponse: SearchResponse = searchAlerts(
monitor = monitor,
size = monitor.triggers.size * 2 // We expect there to be only a single in-progress alert so fetch 2 to check
size = monitor.triggers.size * 2, // We expect there to be only a single in-progress alert so fetch 2 to check
workflowRunContext
)

val foundAlerts = searchAlertsResponse.hits.map { Alert.parse(contentParser(it.sourceRef), it.id, it.version) }
Expand All @@ -102,11 +103,15 @@ class AlertService(
}
}

suspend fun loadCurrentAlertsForBucketLevelMonitor(monitor: Monitor): Map<Trigger, MutableMap<String, Alert>> {
suspend fun loadCurrentAlertsForBucketLevelMonitor(
monitor: Monitor,
workflowRunContext: WorkflowRunContext?,
): Map<Trigger, MutableMap<String, Alert>> {
val searchAlertsResponse: SearchResponse = searchAlerts(
monitor = monitor,
// TODO: This should be limited based on a circuit breaker that limits Alerts
size = MAX_BUCKET_LEVEL_MONITOR_ALERT_SEARCH_COUNT
size = MAX_BUCKET_LEVEL_MONITOR_ALERT_SEARCH_COUNT,
workflowRunContext = workflowRunContext
)

val foundAlerts = searchAlertsResponse.hits.map { Alert.parse(contentParser(it.sourceRef), it.id, it.version) }
Expand Down Expand Up @@ -335,9 +340,12 @@ class AlertService(
currentAlerts.remove(aggAlertBucket.getBucketKeysHash())
} else {
// New Alert
val alertState = if (workflorwRunContext?.auditDelegateMonitorAlerts == true) {
Alert.State.AUDIT
} else Alert.State.ACTIVE
val newAlert = Alert(
monitor = monitor, trigger = trigger, startTime = currentTime,
lastNotificationTime = currentTime, state = Alert.State.ACTIVE, errorMessage = null,
lastNotificationTime = currentTime, state = alertState, errorMessage = null,
errorHistory = mutableListOf(), actionExecutionResults = mutableListOf(),
schemaVersion = IndexUtils.alertIndexSchemaVersion, aggregationResultBucket = aggAlertBucket,
findingIds = findings, executionId = executionId, workflowId = workflorwRunContext?.workflowId ?: ""
Expand Down Expand Up @@ -658,13 +666,16 @@ class AlertService(
val savedAlerts = mutableListOf<Alert>()
var alertsBeingIndexed = alerts
var requestsToRetry: MutableList<IndexRequest> = alerts.map { alert ->
if (alert.state != Alert.State.ACTIVE) {
if (alert.state != Alert.State.ACTIVE && alert.state != Alert.State.AUDIT) {
throw IllegalStateException("Unexpected attempt to save new alert [$alert] with state [${alert.state}]")
}
if (alert.id != Alert.NO_ID) {
throw IllegalStateException("Unexpected attempt to save new alert [$alert] with an existing alert ID [${alert.id}]")
}
IndexRequest(dataSources.alertsIndex)
val alertIndex = if (alert.state == Alert.State.AUDIT && alertIndices.isAlertHistoryEnabled()) {
dataSources.alertsHistoryIndex
} else dataSources.alertsIndex
IndexRequest(alertIndex)
.routing(alert.monitorId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
}.toMutableList()
Expand Down Expand Up @@ -723,13 +734,15 @@ class AlertService(
* @param monitorId The Monitor to get Alerts for
* @param size The number of search hits (Alerts) to return
*/
private suspend fun searchAlerts(monitor: Monitor, size: Int): SearchResponse {
private suspend fun searchAlerts(monitor: Monitor, size: Int, workflowRunContext: WorkflowRunContext?): SearchResponse {
val monitorId = monitor.id
val alertIndex = monitor.dataSources.alertsIndex

val queryBuilder = QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitorId))

.must(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitorId))
if (workflowRunContext != null) {
queryBuilder.must(QueryBuilders.termQuery(Alert.WORKFLOW_ID_FIELD, workflowRunContext.workflowId))
}
val searchSourceBuilder = SearchSourceBuilder()
.size(size)
.query(queryBuilder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ object BucketLevelMonitorRunner : MonitorRunner() {
if (monitor.dataSources.findingsEnabled == true) {
monitorCtx.alertIndices!!.createOrUpdateInitialFindingHistoryIndex(monitor.dataSources)
}
monitorCtx.alertService!!.loadCurrentAlertsForBucketLevelMonitor(monitor)
monitorCtx.alertService!!.loadCurrentAlertsForBucketLevelMonitor(monitor, workflowRunContext)
} catch (e: Exception) {
// We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts
val id = if (monitor.id.trim().isEmpty()) "_na_" else monitor.id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
internal fun currentTime() = Instant.ofEpochMilli(monitorCtx.threadPool!!.absoluteTimeInMillis())

internal fun isActionActionable(action: Action, alert: Alert?): Boolean {
if (alert != null && alert.state == Alert.State.AUDIT)
return false
if (alert == null || action.throttle == null) {
return true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ object QueryLevelMonitorRunner : MonitorRunner() {
val currentAlerts = try {
monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources)
monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex(monitor.dataSources)
monitorCtx.alertService!!.loadCurrentAlertsForQueryLevelMonitor(monitor)
monitorCtx.alertService!!.loadCurrentAlertsForQueryLevelMonitor(monitor, workflowRunContext)
} catch (e: Exception) {
// We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts
val id = if (monitor.id.trim().isEmpty()) "_na_" else monitor.id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import org.opensearch.alerting.DocumentLevelMonitorRunner
import org.opensearch.alerting.MonitorRunnerExecutionContext
import org.opensearch.alerting.QueryLevelMonitorRunner
import org.opensearch.alerting.WorkflowMetadataService
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.model.ChainedAlertTriggerRunResult
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.WorkflowRunResult
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,6 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
.string()
.let { StringEntity(it, APPLICATION_JSON) }


val response = client().makeRequest(
"POST", "${AlertingPlugin.WORKFLOW_BASE_URI}/$workflowId/_acknowledge/alerts",
emptyMap(), request
Expand Down

0 comments on commit f851715

Please sign in to comment.