Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport 2.x Adds chained alerts (#976) #1007

Merged
merged 2 commits into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 88 additions & 31 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opensearch.alerting.opensearchapi.firstFailureOrNull
import org.opensearch.alerting.opensearchapi.retry
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.ChainedAlertTriggerExecutionContext
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.util.IndexUtils
Expand All @@ -47,6 +48,7 @@
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.NoOpTrigger
import org.opensearch.commons.alerting.model.Trigger
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.commons.alerting.model.action.AlertCategory
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.XContentParser
Expand Down Expand Up @@ -81,10 +83,11 @@

private val logger = LogManager.getLogger(AlertService::class.java)

suspend fun loadCurrentAlertsForQueryLevelMonitor(monitor: Monitor): Map<Trigger, Alert?> {
suspend fun loadCurrentAlertsForQueryLevelMonitor(monitor: Monitor, workflowRunContext: WorkflowRunContext?): Map<Trigger, Alert?> {
val searchAlertsResponse: SearchResponse = searchAlerts(
monitor = monitor,
size = monitor.triggers.size * 2 // We expect there to be only a single in-progress alert so fetch 2 to check
size = monitor.triggers.size * 2, // We expect there to be only a single in-progress alert so fetch 2 to check
workflowRunContext
)

val foundAlerts = searchAlertsResponse.hits.map { Alert.parse(contentParser(it.sourceRef), it.id, it.version) }
Expand All @@ -100,11 +103,15 @@
}
}

suspend fun loadCurrentAlertsForBucketLevelMonitor(monitor: Monitor): Map<Trigger, MutableMap<String, Alert>> {
suspend fun loadCurrentAlertsForBucketLevelMonitor(
monitor: Monitor,
workflowRunContext: WorkflowRunContext?,
): Map<Trigger, MutableMap<String, Alert>> {
val searchAlertsResponse: SearchResponse = searchAlerts(
monitor = monitor,
// TODO: This should be limited based on a circuit breaker that limits Alerts
size = MAX_BUCKET_LEVEL_MONITOR_ALERT_SEARCH_COUNT
size = MAX_BUCKET_LEVEL_MONITOR_ALERT_SEARCH_COUNT,
workflowRunContext = workflowRunContext
)

val foundAlerts = searchAlertsResponse.hits.map { Alert.parse(contentParser(it.sourceRef), it.id, it.version) }
Expand All @@ -123,7 +130,9 @@
fun composeQueryLevelAlert(
ctx: QueryLevelTriggerExecutionContext,
result: QueryLevelTriggerRunResult,
alertError: AlertError?
alertError: AlertError?,
executionId: String,
workflorwRunContext: WorkflowRunContext?
): Alert? {
val currentTime = Instant.now()
val currentAlert = ctx.alert
Expand Down Expand Up @@ -181,15 +190,19 @@
errorMessage = alertError?.message,
errorHistory = updatedHistory,
actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion
schemaVersion = IndexUtils.alertIndexSchemaVersion,
)
} else {
val alertState = if (alertError == null) Alert.State.ACTIVE else Alert.State.ERROR
val alertState = if (workflorwRunContext?.auditDelegateMonitorAlerts == true) {
Alert.State.AUDIT
} else if (alertError == null) Alert.State.ACTIVE
else Alert.State.ERROR
Alert(
monitor = ctx.monitor, trigger = ctx.trigger, startTime = currentTime,
lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion
schemaVersion = IndexUtils.alertIndexSchemaVersion, executionId = executionId,
workflowId = workflorwRunContext?.workflowId ?: ""
)
}
}
Expand All @@ -199,15 +212,24 @@
findings: List<String>,
relatedDocIds: List<String>,
ctx: DocumentLevelTriggerExecutionContext,
alertError: AlertError?
alertError: AlertError?,
executionId: String,
workflorwRunContext: WorkflowRunContext?
): Alert {
val currentTime = Instant.now()

val alertState = if (alertError == null) Alert.State.ACTIVE else Alert.State.ERROR
val alertState = if (workflorwRunContext?.auditDelegateMonitorAlerts == true) {
Alert.State.AUDIT
} else if (alertError == null) {
Alert.State.ACTIVE
} else {
Alert.State.ERROR

Check warning on line 226 in alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt

View check run for this annotation

Codecov / codecov/patch

alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt#L226

Added line #L226 was not covered by tests
}
return Alert(
id = UUID.randomUUID().toString(), monitor = ctx.monitor, trigger = ctx.trigger, startTime = currentTime,
lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
schemaVersion = IndexUtils.alertIndexSchemaVersion, findingIds = findings, relatedDocIds = relatedDocIds
schemaVersion = IndexUtils.alertIndexSchemaVersion, findingIds = findings, relatedDocIds = relatedDocIds,
executionId = executionId, workflowId = workflorwRunContext?.workflowId ?: ""
)
}

Expand All @@ -219,12 +241,33 @@
workflowRunContext: WorkflowRunContext?
): Alert {
val currentTime = Instant.now()
val alertState = if (workflowRunContext?.auditDelegateMonitorAlerts == true) {
Alert.State.AUDIT

Check warning on line 245 in alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt

View check run for this annotation

Codecov / codecov/patch

alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt#L245

Added line #L245 was not covered by tests
} else {
Alert.State.ERROR
}
return Alert(
id = id, monitor = monitor, trigger = NoOpTrigger(), startTime = currentTime,
lastNotificationTime = currentTime, state = Alert.State.ERROR, errorMessage = alertError?.message,
schemaVersion = IndexUtils.alertIndexSchemaVersion,
workflowId = workflowRunContext?.workflowId ?: "",
executionId = executionId ?: ""
lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
schemaVersion = IndexUtils.alertIndexSchemaVersion, executionId = executionId, workflowId = workflowRunContext?.workflowId ?: ""
)
}

/**
 * Composes a new chained alert for a workflow trigger.
 *
 * The alert is created in [Alert.State.ACTIVE] with no error message. It records the trigger taken
 * from [ctx], the given [executionId], the [workflow] and the supplied [associatedAlertIds].
 *
 * @param ctx execution context whose trigger is stored on the alert as its chained alert trigger
 * @param executionId identifier stored on the alert as its execution id
 * @param workflow workflow stored on the alert
 * @param associatedAlertIds ids stored on the alert as its associated alerts
 * @return the newly composed alert
 */
fun composeChainedAlert(
    ctx: ChainedAlertTriggerExecutionContext,
    executionId: String,
    workflow: Workflow,
    associatedAlertIds: List<String>
): Alert {
    // Read the clock once so startTime and lastNotificationTime carry the same instant,
    // matching how the other compose* functions in this file build a new alert.
    val currentTime = Instant.now()
    return Alert(
        startTime = currentTime,
        lastNotificationTime = currentTime,
        state = Alert.State.ACTIVE,
        errorMessage = null,
        // NOTE(review): sibling compose* functions pass IndexUtils.alertIndexSchemaVersion here;
        // -1 is kept unchanged — confirm whether it is intentional for chained alerts.
        schemaVersion = -1,
        chainedAlertTrigger = ctx.trigger,
        executionId = executionId,
        workflow = workflow,
        associatedAlertIds = associatedAlertIds
    )
}

Expand Down Expand Up @@ -279,7 +322,9 @@
trigger: BucketLevelTrigger,
currentAlerts: MutableMap<String, Alert>,
aggResultBuckets: List<AggregationResultBucket>,
findings: List<String>
findings: List<String>,
executionId: String,
workflorwRunContext: WorkflowRunContext?
): Map<AlertCategory, List<Alert>> {
val dedupedAlerts = mutableListOf<Alert>()
val newAlerts = mutableListOf<Alert>()
Expand All @@ -295,12 +340,15 @@
currentAlerts.remove(aggAlertBucket.getBucketKeysHash())
} else {
// New Alert
val alertState = if (workflorwRunContext?.auditDelegateMonitorAlerts == true) {
Alert.State.AUDIT
} else Alert.State.ACTIVE
val newAlert = Alert(
monitor = monitor, trigger = trigger, startTime = currentTime,
lastNotificationTime = currentTime, state = Alert.State.ACTIVE, errorMessage = null,
lastNotificationTime = currentTime, state = alertState, errorMessage = null,
errorHistory = mutableListOf(), actionExecutionResults = mutableListOf(),
schemaVersion = IndexUtils.alertIndexSchemaVersion, aggregationResultBucket = aggAlertBucket,
findingIds = findings
findingIds = findings, executionId = executionId, workflowId = workflorwRunContext?.workflowId ?: ""
)
newAlerts.add(newAlert)
}
Expand Down Expand Up @@ -528,7 +576,8 @@
dataSources: DataSources,
alerts: List<Alert>,
retryPolicy: BackoffPolicy,
allowUpdatingAcknowledgedAlert: Boolean = false
allowUpdatingAcknowledgedAlert: Boolean = false,
routingId: String // routing is mandatory and set as monitor id. for workflow chained alerts we pass workflow id as routing
) {
val alertsIndex = dataSources.alertsIndex
val alertsHistoryIndex = dataSources.alertsHistoryIndex
Expand All @@ -542,7 +591,7 @@
Alert.State.ACTIVE, Alert.State.ERROR -> {
listOf<DocWriteRequest<*>>(
IndexRequest(alertsIndex)
.routing(alert.monitorId)
.routing(routingId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(if (alert.id != Alert.NO_ID) alert.id else null)
)
Expand All @@ -553,7 +602,7 @@
if (allowUpdatingAcknowledgedAlert) {
listOf<DocWriteRequest<*>>(
IndexRequest(alertsIndex)
.routing(alert.monitorId)
.routing(routingId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(if (alert.id != Alert.NO_ID) alert.id else null)
)
Expand All @@ -562,9 +611,12 @@
}
}
Alert.State.AUDIT -> {
val index = if (alertIndices.isAlertHistoryEnabled()) {
dataSources.alertsHistoryIndex
} else dataSources.alertsIndex

Check warning on line 616 in alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt

View check run for this annotation

Codecov / codecov/patch

alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt#L616

Added line #L616 was not covered by tests
listOf<DocWriteRequest<*>>(
IndexRequest(alertsIndex)
.routing(alert.monitorId)
IndexRequest(index)
.routing(routingId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(if (alert.id != Alert.NO_ID) alert.id else null)
)
Expand All @@ -575,11 +627,11 @@
Alert.State.COMPLETED -> {
listOfNotNull<DocWriteRequest<*>>(
DeleteRequest(alertsIndex, alert.id)
.routing(alert.monitorId),
.routing(routingId),
// Only add completed alert to history index if history is enabled
if (alertIndices.isAlertHistoryEnabled()) {
IndexRequest(alertsHistoryIndex)
.routing(alert.monitorId)
.routing(routingId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(alert.id)
} else null
Expand All @@ -591,7 +643,7 @@
if (requestsToRetry.isEmpty()) return
// Retry Bulk requests if there was any 429 response
retryPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) {
val bulkRequest = BulkRequest().add(requestsToRetry)
val bulkRequest = BulkRequest().add(requestsToRetry).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
val bulkResponse: BulkResponse = client.suspendUntil { client.bulk(bulkRequest, it) }
val failedResponses = (bulkResponse.items ?: arrayOf()).filter { it.isFailed }
requestsToRetry = failedResponses.filter { it.status() == RestStatus.TOO_MANY_REQUESTS }
Expand All @@ -616,13 +668,16 @@
val savedAlerts = mutableListOf<Alert>()
var alertsBeingIndexed = alerts
var requestsToRetry: MutableList<IndexRequest> = alerts.map { alert ->
if (alert.state != Alert.State.ACTIVE) {
if (alert.state != Alert.State.ACTIVE && alert.state != Alert.State.AUDIT) {
throw IllegalStateException("Unexpected attempt to save new alert [$alert] with state [${alert.state}]")
}
if (alert.id != Alert.NO_ID) {
throw IllegalStateException("Unexpected attempt to save new alert [$alert] with an existing alert ID [${alert.id}]")
}
IndexRequest(dataSources.alertsIndex)
val alertIndex = if (alert.state == Alert.State.AUDIT && alertIndices.isAlertHistoryEnabled()) {
dataSources.alertsHistoryIndex
} else dataSources.alertsIndex
IndexRequest(alertIndex)
.routing(alert.monitorId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
}.toMutableList()
Expand Down Expand Up @@ -683,13 +738,15 @@
* @param monitor The Monitor to get Alerts for
* @param size The number of search hits (Alerts) to return
*/
private suspend fun searchAlerts(monitor: Monitor, size: Int): SearchResponse {
private suspend fun searchAlerts(monitor: Monitor, size: Int, workflowRunContext: WorkflowRunContext?): SearchResponse {
val monitorId = monitor.id
val alertIndex = monitor.dataSources.alertsIndex

val queryBuilder = QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitorId))

.must(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitorId))
if (workflowRunContext != null) {
queryBuilder.must(QueryBuilders.termQuery(Alert.WORKFLOW_ID_FIELD, workflowRunContext.workflowId))
}
val searchSourceBuilder = SearchSourceBuilder()
.size(size)
.query(queryBuilder)
Expand Down
Loading
Loading