Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds chained alerts #976

Merged
merged 29 commits into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
fb4084c
chained alert triggers
eirsep Jul 5, 2023
0b58afb
converge all single node test cases
eirsep Jul 6, 2023
322248f
add license headers to files
eirsep Jul 6, 2023
d31eed1
fix workflow not found issue
eirsep Jul 7, 2023
da09726
added audit state alerts for doc level monitors
eirsep Jul 7, 2023
1aa5d2e
add audit alerts in query level monitor
eirsep Jul 8, 2023
c7c34c1
temp: upload custom built common utils jar
eirsep Jul 9, 2023
1ef52d1
fix get monitor response parsing to include associated_workflows
eirsep Jul 10, 2023
ef76f3d
add query level monitor audit alerts tests
eirsep Jul 10, 2023
8fd8e3e
add audit alerts in bucket level monitor
eirsep Jul 10, 2023
e1de5cd
fix workflow tests
eirsep Jul 10, 2023
493b1cd
alerting
eirsep Jul 10, 2023
5b430d6
verify bucket monitor audit alerts and chained alerts in workflow
eirsep Jul 10, 2023
b179cb9
make execution id mandatory
eirsep Jul 10, 2023
0620bed
revert mapping update in run job method
eirsep Jul 10, 2023
ecdf928
minor fixes in chained alert trigger result
eirsep Jul 10, 2023
de0a37f
fix chained alert triggers tests
eirsep Jul 10, 2023
9110056
fix acknowledge chained alert bug
eirsep Jul 10, 2023
5230fcc
revert get alerts change
eirsep Jul 11, 2023
1b99ff6
refactor and remove transport actions being invoked in other transpor…
eirsep Jul 11, 2023
9853ef7
add license header
eirsep Jul 11, 2023
643c14e
scheduled job mapping schema
eirsep Jul 11, 2023
b2af473
fix ktlint and revert gradle dev set up chanegs
eirsep Jul 11, 2023
54b9469
fix post delete method and refactor alert mover to add class level lo…
eirsep Jul 11, 2023
50fcc3f
fix test - pass workflow id in get alerts
eirsep Jul 11, 2023
a6b0ba9
remove monitor empty filter in get alerts api as there is dedicated a…
eirsep Jul 11, 2023
b2da49b
fix check for workflow id is empty or null in get alerts action
eirsep Jul 11, 2023
00bead0
fix alert mover method delegate monitor parsing logic
eirsep Jul 11, 2023
f509a74
remove common utils jar from repo
eirsep Jul 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 98 additions & 35 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,19 @@ import org.opensearch.alerting.core.schedule.JobScheduler
import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings
import org.opensearch.alerting.core.settings.ScheduledJobSettings
import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction
import org.opensearch.alerting.resthandler.RestAcknowledgeChainedAlertAction
import org.opensearch.alerting.resthandler.RestDeleteMonitorAction
import org.opensearch.alerting.resthandler.RestDeleteWorkflowAction
import org.opensearch.alerting.resthandler.RestExecuteMonitorAction
import org.opensearch.alerting.resthandler.RestExecuteWorkflowAction
import org.opensearch.alerting.resthandler.RestGetAlertsAction
import org.opensearch.alerting.resthandler.RestGetDestinationsAction
import org.opensearch.alerting.resthandler.RestGetEmailAccountAction
import org.opensearch.alerting.resthandler.RestGetEmailGroupAction
import org.opensearch.alerting.resthandler.RestGetFindingsAction
import org.opensearch.alerting.resthandler.RestGetMonitorAction
import org.opensearch.alerting.resthandler.RestGetWorkflowAction
import org.opensearch.alerting.resthandler.RestGetWorkflowAlertsAction
import org.opensearch.alerting.resthandler.RestIndexMonitorAction
import org.opensearch.alerting.resthandler.RestIndexWorkflowAction
import org.opensearch.alerting.resthandler.RestSearchEmailAccountAction
Expand All @@ -48,6 +51,7 @@ import org.opensearch.alerting.settings.DestinationSettings
import org.opensearch.alerting.settings.LegacyOpenDistroAlertingSettings
import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings
import org.opensearch.alerting.transport.TransportAcknowledgeAlertAction
import org.opensearch.alerting.transport.TransportAcknowledgeChainedAlertAction
import org.opensearch.alerting.transport.TransportDeleteMonitorAction
import org.opensearch.alerting.transport.TransportDeleteWorkflowAction
import org.opensearch.alerting.transport.TransportExecuteMonitorAction
Expand All @@ -59,14 +63,14 @@ import org.opensearch.alerting.transport.TransportGetEmailGroupAction
import org.opensearch.alerting.transport.TransportGetFindingsSearchAction
import org.opensearch.alerting.transport.TransportGetMonitorAction
import org.opensearch.alerting.transport.TransportGetWorkflowAction
import org.opensearch.alerting.transport.TransportGetWorkflowAlertsAction
import org.opensearch.alerting.transport.TransportIndexMonitorAction
import org.opensearch.alerting.transport.TransportIndexWorkflowAction
import org.opensearch.alerting.transport.TransportSearchEmailAccountAction
import org.opensearch.alerting.transport.TransportSearchEmailGroupAction
import org.opensearch.alerting.transport.TransportSearchMonitorAction
import org.opensearch.alerting.util.DocLevelMonitorQueries
import org.opensearch.alerting.util.destinationmigration.DestinationMigrationCoordinator
import org.opensearch.alerting.workflow.WorkflowRunnerService
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.node.DiscoveryNodes
Expand All @@ -81,6 +85,7 @@ import org.opensearch.common.settings.SettingsFilter
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder
import org.opensearch.commons.alerting.model.BucketLevelTrigger
import org.opensearch.commons.alerting.model.ChainedAlertTrigger
import org.opensearch.commons.alerting.model.ClusterMetricsInput
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocumentLevelTrigger
Expand Down Expand Up @@ -137,11 +142,11 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
@JvmField val LEGACY_OPENDISTRO_EMAIL_ACCOUNT_BASE_URI = "$LEGACY_OPENDISTRO_DESTINATION_BASE_URI/email_accounts"
@JvmField val LEGACY_OPENDISTRO_EMAIL_GROUP_BASE_URI = "$LEGACY_OPENDISTRO_DESTINATION_BASE_URI/email_groups"
@JvmField val FINDING_BASE_URI = "/_plugins/_alerting/findings"
@JvmField val ALERTING_JOB_TYPES = listOf("monitor")

@JvmField val ALERTING_JOB_TYPES = listOf("monitor", "workflow")
}

lateinit var runner: MonitorRunnerService
lateinit var workflowRunner: WorkflowRunnerService
lateinit var scheduler: JobScheduler
lateinit var sweeper: JobSweeper
lateinit var scheduledJobIndices: ScheduledJobIndices
Expand All @@ -167,14 +172,17 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
RestIndexWorkflowAction(),
RestSearchMonitorAction(settings, clusterService),
RestExecuteMonitorAction(),
RestExecuteWorkflowAction(),
RestAcknowledgeAlertAction(),
RestAcknowledgeChainedAlertAction(),
RestScheduledJobStatsHandler("_alerting"),
RestSearchEmailAccountAction(),
RestGetEmailAccountAction(),
RestSearchEmailGroupAction(),
RestGetEmailGroupAction(),
RestGetDestinationsAction(),
RestGetAlertsAction(),
RestGetWorkflowAlertsAction(),
RestGetFindingsAction(),
RestGetWorkflowAction(),
RestDeleteWorkflowAction()
Expand All @@ -190,12 +198,16 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ActionPlugin.ActionHandler(SearchMonitorAction.INSTANCE, TransportSearchMonitorAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.DELETE_MONITOR_ACTION_TYPE, TransportDeleteMonitorAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.ACKNOWLEDGE_ALERTS_ACTION_TYPE, TransportAcknowledgeAlertAction::class.java),
ActionPlugin.ActionHandler(
AlertingActions.ACKNOWLEDGE_CHAINED_ALERTS_ACTION_TYPE, TransportAcknowledgeChainedAlertAction::class.java
),
ActionPlugin.ActionHandler(GetEmailAccountAction.INSTANCE, TransportGetEmailAccountAction::class.java),
ActionPlugin.ActionHandler(SearchEmailAccountAction.INSTANCE, TransportSearchEmailAccountAction::class.java),
ActionPlugin.ActionHandler(GetEmailGroupAction.INSTANCE, TransportGetEmailGroupAction::class.java),
ActionPlugin.ActionHandler(SearchEmailGroupAction.INSTANCE, TransportSearchEmailGroupAction::class.java),
ActionPlugin.ActionHandler(GetDestinationsAction.INSTANCE, TransportGetDestinationsAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_ALERTS_ACTION_TYPE, TransportGetAlertsAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_WORKFLOW_ALERTS_ACTION_TYPE, TransportGetWorkflowAlertsAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_FINDINGS_ACTION_TYPE, TransportGetFindingsSearchAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_WORKFLOW_ACTION_TYPE, TransportGetWorkflowAction::class.java),
Expand All @@ -213,6 +225,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
BucketLevelTrigger.XCONTENT_REGISTRY,
ClusterMetricsInput.XCONTENT_REGISTRY,
DocumentLevelTrigger.XCONTENT_REGISTRY,
ChainedAlertTrigger.XCONTENT_REGISTRY,
Workflow.XCONTENT_REGISTRY
)
}
Expand Down Expand Up @@ -246,21 +259,6 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerTriggerService(TriggerService(scriptService))
.registerAlertService(AlertService(client, xContentRegistry, alertIndices))
.registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService))
.registerConsumers()
.registerDestinationSettings()
workflowRunner = WorkflowRunnerService
.registerClusterService(clusterService)
.registerClient(client)
.registerNamedXContentRegistry(xContentRegistry)
.registerScriptService(scriptService)
.registerIndexNameExpressionResolver(indexNameExpressionResolver)
.registerSettings(settings)
.registerThreadPool(threadPool)
.registerAlertIndices(alertIndices)
.registerInputService(InputService(client, scriptService, namedWriteableRegistry, xContentRegistry, clusterService, settings))
.registerTriggerService(TriggerService(scriptService))
.registerAlertService(AlertService(client, xContentRegistry, alertIndices))
.registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService))
.registerWorkflowService(WorkflowService(client, xContentRegistry))
.registerConsumers()
.registerDestinationSettings()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {
periodStart: Instant,
periodEnd: Instant,
dryrun: Boolean,
workflowRunContext: WorkflowRunContext?
workflowRunContext: WorkflowRunContext?,
executionId: String
): MonitorRunResult<BucketLevelTriggerRunResult> {
val roles = MonitorRunnerService.getRolesForMonitor(monitor)
logger.debug("Running monitor: ${monitor.name} with roles: $roles Thread: ${Thread.currentThread().name}")
Expand All @@ -77,7 +78,7 @@ object BucketLevelMonitorRunner : MonitorRunner() {
if (monitor.dataSources.findingsEnabled == true) {
monitorCtx.alertIndices!!.createOrUpdateInitialFindingHistoryIndex(monitor.dataSources)
}
monitorCtx.alertService!!.loadCurrentAlertsForBucketLevelMonitor(monitor)
monitorCtx.alertService!!.loadCurrentAlertsForBucketLevelMonitor(monitor, workflowRunContext)
} catch (e: Exception) {
// We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts
val id = if (monitor.id.trim().isEmpty()) "_na_" else monitor.id
Expand Down Expand Up @@ -158,15 +159,21 @@ object BucketLevelMonitorRunner : MonitorRunner() {
periodStart,
periodEnd,
!dryrun && monitor.id != Monitor.NO_ID,
workflowRunContext
executionId
)
} else {
emptyList()
}
// TODO: Should triggerResult's aggregationResultBucket be a list? If not, getCategorizedAlertsForBucketLevelMonitor can
// be refactored to use a map instead
val categorizedAlerts = monitorCtx.alertService!!.getCategorizedAlertsForBucketLevelMonitor(
monitor, trigger, currentAlertsForTrigger, triggerResult.aggregationResultBuckets.values.toList(), findings
monitor,
trigger,
currentAlertsForTrigger,
triggerResult.aggregationResultBuckets.values.toList(),
findings,
executionId,
workflowRunContext
).toMutableMap()
val dedupedAlerts = categorizedAlerts.getOrDefault(AlertCategory.DEDUPED, emptyList())
var newAlerts = categorizedAlerts.getOrDefault(AlertCategory.NEW, emptyList())
Expand All @@ -182,7 +189,11 @@ object BucketLevelMonitorRunner : MonitorRunner() {
*/
if (!dryrun && monitor.id != Monitor.NO_ID) {
monitorCtx.alertService!!.saveAlerts(
monitor.dataSources, dedupedAlerts, monitorCtx.retryPolicy!!, allowUpdatingAcknowledgedAlert = true
monitor.dataSources,
dedupedAlerts,
monitorCtx.retryPolicy!!,
allowUpdatingAcknowledgedAlert = true,
monitor.id
)
newAlerts = monitorCtx.alertService!!.saveNewAlerts(monitor.dataSources, newAlerts, monitorCtx.retryPolicy!!)
}
Expand Down Expand Up @@ -318,14 +329,16 @@ object BucketLevelMonitorRunner : MonitorRunner() {
// ACKNOWLEDGED Alerts should not be saved here since actions are not executed for them.
if (!dryrun && monitor.id != Monitor.NO_ID) {
monitorCtx.alertService!!.saveAlerts(
monitor.dataSources, updatedAlerts, monitorCtx.retryPolicy!!, allowUpdatingAcknowledgedAlert = false
monitor.dataSources, updatedAlerts, monitorCtx.retryPolicy!!, allowUpdatingAcknowledgedAlert = false,
routingId = monitor.id
)
// Save any COMPLETED Alerts that were not covered in updatedAlerts
monitorCtx.alertService!!.saveAlerts(
monitor.dataSources,
completedAlertsToUpdate.toList(),
monitorCtx.retryPolicy!!,
allowUpdatingAcknowledgedAlert = false
allowUpdatingAcknowledgedAlert = false,
monitor.id
)
}
}
Expand All @@ -340,7 +353,7 @@ object BucketLevelMonitorRunner : MonitorRunner() {
periodStart: Instant,
periodEnd: Instant,
shouldCreateFinding: Boolean,
workflowRunContext: WorkflowRunContext? = null
executionId: String,
): List<String> {
monitor.inputs.forEach { input ->
if (input is SearchInput) {
Expand Down Expand Up @@ -397,7 +410,7 @@ object BucketLevelMonitorRunner : MonitorRunner() {
sr.source().query(queryBuilder)
}
val searchResponse: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(sr, it) }
return createFindingPerIndex(searchResponse, monitor, monitorCtx, shouldCreateFinding, workflowRunContext?.executionId)
return createFindingPerIndex(searchResponse, monitor, monitorCtx, shouldCreateFinding, executionId)
} else {
logger.error("Couldn't resolve groupBy field. Not generating bucket level monitor findings for monitor %${monitor.id}")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
periodStart: Instant,
periodEnd: Instant,
dryrun: Boolean,
workflowRunContext: WorkflowRunContext?
workflowRunContext: WorkflowRunContext?,
executionId: String
): MonitorRunResult<DocumentLevelTriggerRunResult> {
logger.debug("Document-level-monitor is running ...")
val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID
Expand Down Expand Up @@ -239,7 +240,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
docsToQueries,
queryToDocIds,
dryrun,
workflowRunContext?.executionId
executionId = executionId,
workflowRunContext = workflowRunContext
)
}
}
Expand All @@ -251,7 +253,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorCtx.alertService!!.upsertMonitorErrorAlert(
monitor = monitor,
errorMessage = errorMessage,
executionId = workflowRunContext?.executionId,
executionId = executionId,
workflowRunContext
)
} else {
Expand All @@ -268,7 +270,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
return monitorResult.copy(triggerResults = triggerResults)
} catch (e: Exception) {
val errorMessage = ExceptionsHelper.detailedMessage(e)
monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor, errorMessage, workflowRunContext?.executionId, workflowRunContext)
monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor, errorMessage, executionId, workflowRunContext)
logger.error("Failed running Document-level-monitor ${monitor.name}", e)
val alertingException = AlertingException(
errorMessage,
Expand Down Expand Up @@ -317,7 +319,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
docsToQueries: Map<String, List<String>>,
queryToDocIds: Map<DocLevelQuery, Set<String>>,
dryrun: Boolean,
workflowExecutionId: String? = null
workflowRunContext: WorkflowRunContext?,
executionId: String
): DocumentLevelTriggerRunResult {
val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger)
val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, queryToDocIds)
Expand All @@ -334,7 +337,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
triggeredQueries,
it.key,
!dryrun && monitor.id != Monitor.NO_ID,
workflowExecutionId
executionId
)
findings.add(findingId)

Expand All @@ -355,7 +358,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
listOf(it.first),
listOf(it.second),
triggerCtx,
monitorResult.alertError() ?: triggerResult.alertError()
monitorResult.alertError() ?: triggerResult.alertError(),
executionId = executionId,
workflorwRunContext = workflowRunContext
)
alerts.add(alert)
}
Expand Down Expand Up @@ -395,7 +400,14 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
alert.copy(actionExecutionResults = actionExecutionResults)
}

monitorCtx.retryPolicy?.let { monitorCtx.alertService!!.saveAlerts(monitor.dataSources, updatedAlerts, it) }
monitorCtx.retryPolicy?.let {
monitorCtx.alertService!!.saveAlerts(
monitor.dataSources,
updatedAlerts,
it,
routingId = monitor.id
)
}
}
return triggerResult
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ abstract class MonitorRunner {
periodStart: Instant,
periodEnd: Instant,
dryRun: Boolean,
workflowRunContext: WorkflowRunContext? = null
workflowRunContext: WorkflowRunContext? = null,
executionId: String
): MonitorRunResult<*>

suspend fun runAction(
Expand Down
Loading
Loading