Skip to content

Commit

Permalink
chained alert triggers
Browse files Browse the repository at this point in the history
Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep committed Jul 6, 2023
1 parent 62d2524 commit 769f8a5
Show file tree
Hide file tree
Showing 48 changed files with 4,812 additions and 497 deletions.
5 changes: 0 additions & 5 deletions alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -426,9 +426,4 @@ run {
useCluster testClusters.integTest
}

// Only apply jacoco test coverage if we are running a local single node cluster
if (!usingRemoteCluster && !usingMultiNode) {
apply from: '../build-tools/opensearchplugin-coverage.gradle'
}

apply from: '../build-tools/pkgbuild.gradle'
77 changes: 57 additions & 20 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.opensearchapi.firstFailureOrNull
import org.opensearch.alerting.opensearchapi.retry
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.ChainedAlertTriggerExecutionContext
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.MAX_SEARCH_SIZE
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.common.bytes.BytesReference
import org.opensearch.common.unit.TimeValue
Expand All @@ -46,6 +48,7 @@ import org.opensearch.commons.alerting.model.DataSources
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.NoOpTrigger
import org.opensearch.commons.alerting.model.Trigger
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.commons.alerting.model.action.AlertCategory
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.XContentParser
Expand Down Expand Up @@ -122,7 +125,9 @@ class AlertService(
fun composeQueryLevelAlert(
ctx: QueryLevelTriggerExecutionContext,
result: QueryLevelTriggerRunResult,
alertError: AlertError?
alertError: AlertError?,
executionId: String? = null,
workflorwRunContext: WorkflowRunContext?
): Alert? {
val currentTime = Instant.now()
val currentAlert = ctx.alert
Expand Down Expand Up @@ -163,26 +168,33 @@ class AlertService(
val updatedHistory = currentAlert?.errorHistory.update(alertError)
return if (alertError == null && !result.triggered) {
currentAlert?.copy(
state = Alert.State.COMPLETED, endTime = currentTime, errorMessage = null,
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults,
state = Alert.State.COMPLETED,
endTime = currentTime,
errorMessage = null,
errorHistory = updatedHistory,
actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion
)
} else if (alertError == null && currentAlert?.isAcknowledged() == true) {
null
} else if (currentAlert != null) {
val alertState = if (alertError == null) Alert.State.ACTIVE else Alert.State.ERROR
currentAlert.copy(
state = alertState, lastNotificationTime = currentTime, errorMessage = alertError?.message,
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion
state = alertState,
lastNotificationTime = currentTime,
errorMessage = alertError?.message,
errorHistory = updatedHistory,
actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion,
)
} else {
val alertState = if (alertError == null) Alert.State.ACTIVE else Alert.State.ERROR
Alert(
monitor = ctx.monitor, trigger = ctx.trigger, startTime = currentTime,
lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion
schemaVersion = IndexUtils.alertIndexSchemaVersion, executionId = executionId,
workflowId = workflorwRunContext?.workflowId ?: ""
)
}
}
Expand All @@ -192,28 +204,50 @@ class AlertService(
findings: List<String>,
relatedDocIds: List<String>,
ctx: DocumentLevelTriggerExecutionContext,
alertError: AlertError?
alertError: AlertError?,
workflowExecutionId: String? = null,
workflorwRunContext: WorkflowRunContext?
): Alert {
val currentTime = Instant.now()

val alertState = if (alertError == null) Alert.State.ACTIVE else Alert.State.ERROR
return Alert(
id = UUID.randomUUID().toString(), monitor = ctx.monitor, trigger = ctx.trigger, startTime = currentTime,
lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
schemaVersion = IndexUtils.alertIndexSchemaVersion, findingIds = findings, relatedDocIds = relatedDocIds
schemaVersion = IndexUtils.alertIndexSchemaVersion, findingIds = findings, relatedDocIds = relatedDocIds,
executionId = workflowExecutionId, workflowId = workflorwRunContext?.workflowId ?: ""
)
}

fun composeMonitorErrorAlert(
id: String,
monitor: Monitor,
alertError: AlertError
alertError: AlertError,
executionId: String?
): Alert {
val currentTime = Instant.now()
return Alert(
id = id, monitor = monitor, trigger = NoOpTrigger(), startTime = currentTime,
lastNotificationTime = currentTime, state = Alert.State.ERROR, errorMessage = alertError?.message,
schemaVersion = IndexUtils.alertIndexSchemaVersion
schemaVersion = IndexUtils.alertIndexSchemaVersion, executionId = executionId
)
}

fun composeChainedAlert(
ctx: ChainedAlertTriggerExecutionContext,
executionId: String,
workflow: Workflow,
associatedAlertIds: List<String>
): Alert {
return Alert(
startTime = Instant.now(),
lastNotificationTime = Instant.now(),
state = Alert.State.ACTIVE,
errorMessage = null, schemaVersion = -1,
chainedAlertTrigger = ctx.trigger,
executionId = executionId,
workflow = workflow,
associatedAlertIds = associatedAlertIds
)
}

Expand Down Expand Up @@ -268,7 +302,9 @@ class AlertService(
trigger: BucketLevelTrigger,
currentAlerts: MutableMap<String, Alert>,
aggResultBuckets: List<AggregationResultBucket>,
findings: List<String>
findings: List<String>,
executionId: String? = null,
workflorwRunContext: WorkflowRunContext?
): Map<AlertCategory, List<Alert>> {
val dedupedAlerts = mutableListOf<Alert>()
val newAlerts = mutableListOf<Alert>()
Expand All @@ -289,7 +325,7 @@ class AlertService(
lastNotificationTime = currentTime, state = Alert.State.ACTIVE, errorMessage = null,
errorHistory = mutableListOf(), actionExecutionResults = mutableListOf(),
schemaVersion = IndexUtils.alertIndexSchemaVersion, aggregationResultBucket = aggAlertBucket,
findingIds = findings
findingIds = findings, executionId = executionId, workflowId = workflorwRunContext?.workflowId ?: ""
)
newAlerts.add(newAlert)
}
Expand All @@ -311,7 +347,7 @@ class AlertService(
} ?: listOf()
}

suspend fun upsertMonitorErrorAlert(monitor: Monitor, errorMessage: String) {
suspend fun upsertMonitorErrorAlert(monitor: Monitor, errorMessage: String, executionId: String?) {
val newErrorAlertId = "$ERROR_ALERT_ID_PREFIX-${monitor.id}-${UUID.randomUUID()}"

val searchRequest = SearchRequest(monitor.dataSources.alertsIndex)
Expand All @@ -326,7 +362,7 @@ class AlertService(
)
val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) }

var alert = composeMonitorErrorAlert(newErrorAlertId, monitor, AlertError(Instant.now(), errorMessage))
var alert = composeMonitorErrorAlert(newErrorAlertId, monitor, AlertError(Instant.now(), errorMessage), executionId)

if (searchResponse.hits.totalHits.value > 0L) {
if (searchResponse.hits.totalHits.value > 1L) {
Expand Down Expand Up @@ -509,7 +545,8 @@ class AlertService(
dataSources: DataSources,
alerts: List<Alert>,
retryPolicy: BackoffPolicy,
allowUpdatingAcknowledgedAlert: Boolean = false
allowUpdatingAcknowledgedAlert: Boolean = false,
routing: String? = null // routing is mandatory and set as monitor id. for workflow chained alerts we pass workflow id as routing
) {
val alertsIndex = dataSources.alertsIndex
val alertsHistoryIndex = dataSources.alertsHistoryIndex
Expand All @@ -523,7 +560,7 @@ class AlertService(
Alert.State.ACTIVE, Alert.State.ERROR -> {
listOf<DocWriteRequest<*>>(
IndexRequest(alertsIndex)
.routing(alert.monitorId)
.routing(if (routing.isNullOrEmpty()) alert.monitorId else routing)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(if (alert.id != Alert.NO_ID) alert.id else null)
)
Expand All @@ -534,7 +571,7 @@ class AlertService(
if (allowUpdatingAcknowledgedAlert) {
listOf<DocWriteRequest<*>>(
IndexRequest(alertsIndex)
.routing(alert.monitorId)
.routing(if (routing.isNullOrEmpty()) alert.monitorId else routing)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(if (alert.id != Alert.NO_ID) alert.id else null)
)
Expand All @@ -560,7 +597,7 @@ class AlertService(
// Only add completed alert to history index if history is enabled
if (alertIndices.isAlertHistoryEnabled()) {
IndexRequest(alertsHistoryIndex)
.routing(alert.monitorId)
.routing(if (routing.isNullOrEmpty()) alert.monitorId else routing)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(alert.id)
} else null
Expand All @@ -572,7 +609,7 @@ class AlertService(
if (requestsToRetry.isEmpty()) return
// Retry Bulk requests if there was any 429 response
retryPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) {
val bulkRequest = BulkRequest().add(requestsToRetry)
val bulkRequest = BulkRequest().add(requestsToRetry).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
val bulkResponse: BulkResponse = client.suspendUntil { client.bulk(bulkRequest, it) }
val failedResponses = (bulkResponse.items ?: arrayOf()).filter { it.isFailed }
requestsToRetry = failedResponses.filter { it.status() == RestStatus.TOO_MANY_REQUESTS }
Expand Down
Loading

0 comments on commit 769f8a5

Please sign in to comment.