diff --git a/alertobserver/alertobserver.go b/alertobserver/alertobserver.go new file mode 100644 index 0000000000..cf05d9210c --- /dev/null +++ b/alertobserver/alertobserver.go @@ -0,0 +1,36 @@ +// Copyright 2023 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package alertobserver + +import ( + "github.com/prometheus/alertmanager/types" +) + +const ( + EventAlertReceived string = "received" + EventAlertRejected string = "rejected" + EventAlertAddedToAggrGroup string = "addedAggrGroup" + EventAlertFailedAddToAggrGroup string = "failedAddAggrGroup" + EventAlertPipelineStart string = "pipelineStart" + EventAlertPipelinePassStage string = "pipelinePassStage" + EventAlertMuted string = "muted" + EventAlertSent string = "sent" + EventAlertSendFailed string = "sendFailed" +) + +type AlertEventMeta map[string]interface{} + +type LifeCycleObserver interface { + Observe(event string, alerts []*types.Alert, meta AlertEventMeta) +} diff --git a/alertobserver/testing.go b/alertobserver/testing.go new file mode 100644 index 0000000000..66f774fbb7 --- /dev/null +++ b/alertobserver/testing.go @@ -0,0 +1,46 @@ +// Copyright 2023 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package alertobserver + +import ( + "sync" + + "github.com/prometheus/alertmanager/types" +) + +type FakeLifeCycleObserver struct { + AlertsPerEvent map[string][]*types.Alert + PipelineStageAlerts map[string][]*types.Alert + MetaPerEvent map[string][]AlertEventMeta + Mtx sync.RWMutex +} + +func (o *FakeLifeCycleObserver) Observe(event string, alerts []*types.Alert, meta AlertEventMeta) { + o.Mtx.Lock() + defer o.Mtx.Unlock() + if event == EventAlertPipelinePassStage { + o.PipelineStageAlerts[meta["stageName"].(string)] = append(o.PipelineStageAlerts[meta["stageName"].(string)], alerts...) + } else { + o.AlertsPerEvent[event] = append(o.AlertsPerEvent[event], alerts...) 
+ } + o.MetaPerEvent[event] = append(o.MetaPerEvent[event], meta) +} + +func NewFakeLifeCycleObserver() *FakeLifeCycleObserver { + return &FakeLifeCycleObserver{ + PipelineStageAlerts: map[string][]*types.Alert{}, + AlertsPerEvent: map[string][]*types.Alert{}, + MetaPerEvent: map[string][]AlertEventMeta{}, + } +} diff --git a/api/api.go b/api/api.go index 1c03463b60..ee5ff024b8 100644 --- a/api/api.go +++ b/api/api.go @@ -27,6 +27,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/common/route" + "github.com/prometheus/alertmanager/alertobserver" apiv1 "github.com/prometheus/alertmanager/api/v1" apiv2 "github.com/prometheus/alertmanager/api/v2" "github.com/prometheus/alertmanager/cluster" @@ -81,6 +82,9 @@ type Options struct { GroupInfoFunc func(func(*dispatch.Route) bool) dispatch.AlertGroupInfos // APICallback define the callback function that each api call will perform before returned. APICallback callback.Callback + // AlertLCObserver is used to add hooks to the different alert life cycle events. + // If nil then no observer methods will be invoked in the life cycle events. + AlertLCObserver alertobserver.LifeCycleObserver } func (o Options) validate() error { @@ -124,6 +128,7 @@ func New(opts Options) (*API, error) { opts.Peer, log.With(l, "version", "v1"), opts.Registry, + opts.AlertLCObserver, ) v2, err := apiv2.NewAPI( @@ -136,6 +141,7 @@ func New(opts Options) (*API, error) { opts.Peer, log.With(l, "version", "v2"), opts.Registry, + opts.AlertLCObserver, ) if err != nil { return nil, err diff --git a/api/v1/api.go b/api/v1/api.go index 39018a7589..d8b4ce0a19 100644 --- a/api/v1/api.go +++ b/api/v1/api.go @@ -30,6 +30,7 @@ import ( "github.com/prometheus/common/route" "github.com/prometheus/common/version" + "github.com/prometheus/alertmanager/alertobserver" "github.com/prometheus/alertmanager/api/metrics" "github.com/prometheus/alertmanager/cluster" "github.com/prometheus/alertmanager/config" @@ -67,14 +68,15 @@ func setCORS(w http.ResponseWriter) { // API provides registration of handlers for API routes. 
type API struct { - alerts provider.Alerts - silences *silence.Silences - config *config.Config - route *dispatch.Route - uptime time.Time - peer cluster.ClusterPeer - logger log.Logger - m *metrics.Alerts + alerts provider.Alerts + silences *silence.Silences + config *config.Config + route *dispatch.Route + uptime time.Time + peer cluster.ClusterPeer + logger log.Logger + m *metrics.Alerts + alertLCObserver alertobserver.LifeCycleObserver getAlertStatus getAlertStatusFn @@ -91,19 +93,21 @@ func New( peer cluster.ClusterPeer, l log.Logger, r prometheus.Registerer, + o alertobserver.LifeCycleObserver, ) *API { if l == nil { l = log.NewNopLogger() } return &API{ - alerts: alerts, - silences: silences, - getAlertStatus: sf, - uptime: time.Now(), - peer: peer, - logger: l, - m: metrics.NewAlerts("v1", r), + alerts: alerts, + silences: silences, + getAlertStatus: sf, + uptime: time.Now(), + peer: peer, + logger: l, + m: metrics.NewAlerts("v1", r), + alertLCObserver: o, } } @@ -447,6 +451,10 @@ func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...* if err := a.Validate(); err != nil { validationErrs.Add(err) api.m.Invalid().Inc() + if api.alertLCObserver != nil { + m := alertobserver.AlertEventMeta{"msg": err.Error()} + api.alertLCObserver.Observe(alertobserver.EventAlertRejected, []*types.Alert{a}, m) + } continue } validAlerts = append(validAlerts, a) @@ -456,8 +464,15 @@ func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...* typ: errorInternal, err: err, }, nil) + if api.alertLCObserver != nil { + m := alertobserver.AlertEventMeta{"msg": err.Error()} + api.alertLCObserver.Observe(alertobserver.EventAlertRejected, validAlerts, m) + } return } + if api.alertLCObserver != nil { + api.alertLCObserver.Observe(alertobserver.EventAlertReceived, validAlerts, alertobserver.AlertEventMeta{}) + } if validationErrs.Len() > 0 { api.respondError(w, apiError{ diff --git a/api/v1/api_test.go b/api/v1/api_test.go index 84315ef394..8310c78164 100644 --- a/api/v1/api_test.go +++ b/api/v1/api_test.go @@ -28,6 +28,7 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "github.com/prometheus/alertmanager/alertobserver" "github.com/prometheus/alertmanager/config" "github.com/prometheus/alertmanager/dispatch" "github.com/prometheus/alertmanager/pkg/labels" @@ -134,7 +135,7 @@ func TestAddAlerts(t *testing.T) { } alertsProvider := newFakeAlerts([]*types.Alert{}, tc.err) - api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil) + api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil, nil) defaultGlobalConfig := config.DefaultGlobalConfig() route := config.Route{} api.Update(&config.Config{ @@ -153,6 +154,74 @@ func TestAddAlerts(t *testing.T) { body, _ := io.ReadAll(res.Body) require.Equal(t, tc.code, w.Code, fmt.Sprintf("test case: %d, StartsAt %v, EndsAt %v, Response: %s", i, tc.start, tc.end, string(body))) + + observer := alertobserver.NewFakeLifeCycleObserver() + api.alertLCObserver = observer + r, err = http.NewRequest("POST", "/api/v1/alerts", bytes.NewReader(b)) + w = httptest.NewRecorder() + if err != nil { + t.Errorf("Unexpected error %v", err) + } + api.addAlerts(w, r) + if tc.code == 200 { + require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertReceived][0].Fingerprint(), alerts[0].Fingerprint()) + } else { + require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertRejected][0].Fingerprint(), alerts[0].Fingerprint()) + } + } +} + +func 
TestAddAlertsWithAlertLCObserver(t *testing.T) { + now := func(offset int) time.Time { + return time.Now().Add(time.Duration(offset) * time.Second) + } + + for i, tc := range []struct { + start, end time.Time + err bool + code int + }{ + {time.Time{}, time.Time{}, false, 200}, + {now(1), now(0), false, 400}, + {now(0), time.Time{}, true, 500}, + } { + alerts := []model.Alert{{ + StartsAt: tc.start, + EndsAt: tc.end, + Labels: model.LabelSet{"label1": "test1"}, + Annotations: model.LabelSet{"annotation1": "some text"}, + }} + b, err := json.Marshal(&alerts) + if err != nil { + t.Errorf("Unexpected error %v", err) + } + + alertsProvider := newFakeAlerts([]*types.Alert{}, tc.err) + observer := alertobserver.NewFakeLifeCycleObserver() + api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil, observer) + defaultGlobalConfig := config.DefaultGlobalConfig() + route := config.Route{} + api.Update(&config.Config{ + Global: &defaultGlobalConfig, + Route: &route, + }) + + r, err := http.NewRequest("POST", "/api/v1/alerts", bytes.NewReader(b)) + w := httptest.NewRecorder() + if err != nil { + t.Errorf("Unexpected error %v", err) + } + + api.addAlerts(w, r) + res := w.Result() + body, _ := io.ReadAll(res.Body) + + require.Equal(t, tc.code, w.Code, fmt.Sprintf("test case: %d, StartsAt %v, EndsAt %v, Response: %s", i, tc.start, tc.end, string(body))) + if tc.code == 200 { + require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertReceived][0].Fingerprint(), alerts[0].Fingerprint()) + } else { + require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertRejected][0].Fingerprint(), alerts[0].Fingerprint()) + } } } @@ -267,7 +336,7 @@ func TestListAlerts(t *testing.T) { }, } { alertsProvider := newFakeAlerts(alerts, tc.err) - api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil) + api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil, nil) api.route = dispatch.NewRoute(&config.Route{Receiver: "def-receiver"}, nil) r, err := http.NewRequest("GET", "/api/v1/alerts", nil) diff --git a/api/v2/api.go b/api/v2/api.go index 6f3a8b9275..fb086ae821 100644 --- a/api/v2/api.go +++ b/api/v2/api.go @@ -37,6 +37,7 @@ import ( alertgroupinfolist_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/alertgroupinfolist" "github.com/prometheus/alertmanager/util/callback" + "github.com/prometheus/alertmanager/alertobserver" "github.com/prometheus/alertmanager/api/metrics" open_api_models "github.com/prometheus/alertmanager/api/v2/models" "github.com/prometheus/alertmanager/api/v2/restapi" @@ -76,8 +77,9 @@ type API struct { route *dispatch.Route setAlertStatus setAlertStatusFn - logger log.Logger - m *metrics.Alerts + logger log.Logger + m *metrics.Alerts + alertLCObserver alertobserver.LifeCycleObserver Handler http.Handler } @@ -100,6 +102,7 @@ func NewAPI( peer cluster.ClusterPeer, l log.Logger, r prometheus.Registerer, + o alertobserver.LifeCycleObserver, ) (*API, error) { if apiCallback == nil { apiCallback = callback.NoopAPICallback{} @@ -115,6 +118,7 @@ func NewAPI( logger: l, m: metrics.NewAlerts("v2", r), uptime: time.Now(), + alertLCObserver: o, } // Load embedded swagger file. @@ -402,12 +406,20 @@ func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware. 
if err := a.Validate(); err != nil { validationErrs.Add(err) api.m.Invalid().Inc() + if api.alertLCObserver != nil { + m := alertobserver.AlertEventMeta{"msg": err.Error()} + api.alertLCObserver.Observe(alertobserver.EventAlertRejected, []*types.Alert{a}, m) + } continue } validAlerts = append(validAlerts, a) } if err := api.alerts.Put(validAlerts...); err != nil { level.Error(logger).Log("msg", "Failed to create alerts", "err", err) + if api.alertLCObserver != nil { + m := alertobserver.AlertEventMeta{"msg": err.Error()} + api.alertLCObserver.Observe(alertobserver.EventAlertRejected, validAlerts, m) + } return alert_ops.NewPostAlertsInternalServerError().WithPayload(err.Error()) } @@ -415,6 +427,9 @@ func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware. level.Error(logger).Log("msg", "Failed to validate alerts", "err", validationErrs.Error()) return alert_ops.NewPostAlertsBadRequest().WithPayload(validationErrs.Error()) } + if api.alertLCObserver != nil { + api.alertLCObserver.Observe(alertobserver.EventAlertReceived, validAlerts, alertobserver.AlertEventMeta{}) + } return alert_ops.NewPostAlertsOK() } diff --git a/api/v2/api_test.go b/api/v2/api_test.go index 794a1219c1..c95da7360b 100644 --- a/api/v2/api_test.go +++ b/api/v2/api_test.go @@ -17,6 +17,7 @@ import ( "bytes" "encoding/json" "fmt" + "io" "net/http" "net/http/httptest" @@ -29,6 +30,8 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "github.com/prometheus/alertmanager/alertobserver" + "github.com/prometheus/alertmanager/api/metrics" alert_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/alert" alertgroup_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/alertgroup" alertgroupinfolist_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/alertgroupinfolist" @@ -1123,6 +1126,67 @@ func TestListAlertInfosHandler(t *testing.T) { } } +func TestPostAlertHandler(t *testing.T) { + now := time.Now() + for i, tc := range []struct { + start, end time.Time + err bool + code int + }{ + {time.Time{}, time.Time{}, false, 200}, + {now, time.Time{}, false, 200}, + {time.Time{}, now.Add(time.Duration(-1) * time.Second), false, 200}, + {time.Time{}, now, false, 200}, + {time.Time{}, now.Add(time.Duration(1) * time.Second), false, 200}, + {now.Add(time.Duration(-2) * time.Second), now.Add(time.Duration(-1) * time.Second), false, 200}, + {now.Add(time.Duration(1) * time.Second), now.Add(time.Duration(2) * time.Second), false, 200}, + {now.Add(time.Duration(1) * time.Second), now, false, 400}, + } { + alerts, alertsBytes := createAlert(t, tc.start, tc.end) + api := API{ + uptime: time.Now(), + alerts: newFakeAlerts([]*types.Alert{}), + logger: log.NewNopLogger(), + m: metrics.NewAlerts("v2", nil), + } + api.Update(&config.Config{ + Global: &config.GlobalConfig{ + ResolveTimeout: model.Duration(5), + }, + Route: &config.Route{}, + }, nil) + + r, err := http.NewRequest("POST", "/api/v2/alerts", bytes.NewReader(alertsBytes)) + require.NoError(t, err) + + w := httptest.NewRecorder() + p := runtime.TextProducer() + responder := api.postAlertsHandler(alert_ops.PostAlertsParams{ + HTTPRequest: r, + Alerts: alerts, + }) + responder.WriteResponse(w, p) + body, _ := io.ReadAll(w.Result().Body) + + require.Equal(t, tc.code, w.Code, fmt.Sprintf("test case: %d, response: %s", i, string(body))) + + observer := alertobserver.NewFakeLifeCycleObserver() + api.alertLCObserver = observer + r, err = http.NewRequest("POST", "/api/v2/alerts", 
bytes.NewReader(alertsBytes)) + require.NoError(t, err) + api.postAlertsHandler(alert_ops.PostAlertsParams{ + HTTPRequest: r, + Alerts: alerts, + }) + amAlert := OpenAPIAlertsToAlerts(alerts) + if tc.code == 200 { + require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertReceived][0].Fingerprint(), amAlert[0].Fingerprint()) + } else { + require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertRejected][0].Fingerprint(), amAlert[0].Fingerprint()) + } + } +} + type limitNumberOfAlertsReturnedCallback struct { limit int } diff --git a/api/v2/testing.go b/api/v2/testing.go index fbb38a9d53..0f3f74a554 100644 --- a/api/v2/testing.go +++ b/api/v2/testing.go @@ -137,3 +137,23 @@ func newGetAlertStatus(f *fakeAlerts) func(model.Fingerprint) types.AlertStatus return status } } + +func createAlert(t *testing.T, start, ends time.Time) (open_api_models.PostableAlerts, []byte) { + startsAt := strfmt.DateTime(start) + endsAt := strfmt.DateTime(ends) + + alert := open_api_models.PostableAlert{ + StartsAt: startsAt, + EndsAt: endsAt, + Annotations: open_api_models.LabelSet{"annotation1": "some text"}, + Alert: open_api_models.Alert{ + Labels: open_api_models.LabelSet{"label1": "test1"}, + GeneratorURL: "http://localhost:3000", + }, + } + alerts := open_api_models.PostableAlerts{} + alerts = append(alerts, &alert) + b, err := json.Marshal(alerts) + require.NoError(t, err) + return alerts, b +} diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go index 9b798d01db..dc762ae86b 100644 --- a/cmd/alertmanager/main.go +++ b/cmd/alertmanager/main.go @@ -492,6 +492,7 @@ func run() int { timeIntervals, notificationLog, pipelinePeer, + nil, ) configuredReceivers.Set(float64(len(activeReceivers))) configuredIntegrations.Set(float64(integrationsNum)) @@ -501,7 +502,7 @@ func run() int { silencer.Mutes(labels) }) - disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, nil, logger, dispMetrics) + disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, nil, logger, dispMetrics, nil) routes.Walk(func(r *dispatch.Route) { if r.RouteOpts.RepeatInterval > *retention { level.Warn(configLogger).Log( diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index 50dc8e04a2..53781e9108 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -26,6 +26,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + "github.com/prometheus/alertmanager/alertobserver" "github.com/prometheus/alertmanager/notify" "github.com/prometheus/alertmanager/provider" "github.com/prometheus/alertmanager/store" @@ -91,7 +92,8 @@ type Dispatcher struct { ctx context.Context cancel func() - logger log.Logger + logger log.Logger + alertLCObserver alertobserver.LifeCycleObserver } // Limits describes limits used by Dispatcher. 
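For illustration only, a minimal LifeCycleObserver implementation that consumes the hooks added in this patch might look like the following sketch; the countingObserver type and the example package are hypothetical and not part of the change, but they rely only on the interface and event constants introduced in alertobserver/alertobserver.go.

package example

import (
	"sync"

	"github.com/prometheus/alertmanager/alertobserver"
	"github.com/prometheus/alertmanager/types"
)

// countingObserver tallies how many alerts were observed per life cycle event.
type countingObserver struct {
	mtx    sync.Mutex
	counts map[string]int
}

func newCountingObserver() *countingObserver {
	return &countingObserver{counts: map[string]int{}}
}

// Observe implements alertobserver.LifeCycleObserver.
func (o *countingObserver) Observe(event string, alerts []*types.Alert, meta alertobserver.AlertEventMeta) {
	o.mtx.Lock()
	defer o.mtx.Unlock()
	o.counts[event] += len(alerts)
}

Such an observer would be set on api.Options.AlertLCObserver and passed as the new trailing argument of dispatch.NewDispatcher and (*notify.PipelineBuilder).New, the places where cmd/alertmanager/main.go currently passes nil.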
@@ -112,19 +114,21 @@ func NewDispatcher( lim Limits, l log.Logger, m *DispatcherMetrics, + o alertobserver.LifeCycleObserver, ) *Dispatcher { if lim == nil { lim = nilLimits{} } disp := &Dispatcher{ - alerts: ap, - stage: s, - route: r, - timeout: to, - logger: log.With(l, "component", "dispatcher"), - metrics: m, - limits: lim, + alerts: ap, + stage: s, + route: r, + timeout: to, + logger: log.With(l, "component", "dispatcher"), + metrics: m, + limits: lim, + alertLCObserver: o, } return disp } @@ -365,13 +369,25 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) { ag, ok := routeGroups[fp] if ok { ag.insert(alert) + if d.alertLCObserver != nil { + m := alertobserver.AlertEventMeta{ + "groupKey": ag.GroupKey(), + "routeId": ag.routeID, + "groupId": ag.GroupID(), + } + d.alertLCObserver.Observe(alertobserver.EventAlertAddedToAggrGroup, []*types.Alert{alert}, m) + } return } // If the group does not exist, create it. But check the limit first. if limit := d.limits.MaxNumberOfAggregationGroups(); limit > 0 && d.aggrGroupsNum >= limit { d.metrics.aggrGroupLimitReached.Inc() - level.Error(d.logger).Log("msg", "Too many aggregation groups, cannot create new group for alert", "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name()) + errMsg := "Too many aggregation groups, cannot create new group for alert" + level.Error(d.logger).Log("msg", errMsg, "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name()) + if d.alertLCObserver != nil { + d.alertLCObserver.Observe(alertobserver.EventAlertFailedAddToAggrGroup, []*types.Alert{alert}, alertobserver.AlertEventMeta{"msg": errMsg}) + } return } @@ -379,6 +395,14 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) { routeGroups[fp] = ag d.aggrGroupsNum++ d.metrics.aggrGroups.Inc() + if d.alertLCObserver != nil { + m := alertobserver.AlertEventMeta{ + "groupKey": ag.GroupKey(), + "routeId": ag.routeID, + "groupId": ag.GroupID(), + } + d.alertLCObserver.Observe(alertobserver.EventAlertAddedToAggrGroup, []*types.Alert{alert}, m) + } // Insert the 1st alert in the group before starting the group's run() // function, to make sure that when the run() will be executed the 1st @@ -495,6 +519,7 @@ func (ag *aggrGroup) run(nf notifyFunc) { // Populate context with information needed along the pipeline. 
ctx = notify.WithGroupKey(ctx, ag.GroupKey()) + ctx = notify.WithGroupId(ctx, ag.GroupID()) ctx = notify.WithGroupLabels(ctx, ag.labels) ctx = notify.WithReceiverName(ctx, ag.opts.Receiver) ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval) diff --git a/dispatch/dispatch_test.go b/dispatch/dispatch_test.go index 492449268d..1e160cc57f 100644 --- a/dispatch/dispatch_test.go +++ b/dispatch/dispatch_test.go @@ -28,6 +28,7 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "github.com/prometheus/alertmanager/alertobserver" "github.com/prometheus/alertmanager/config" "github.com/prometheus/alertmanager/notify" "github.com/prometheus/alertmanager/provider/mem" @@ -107,6 +108,9 @@ func TestAggrGroup(t *testing.T) { if _, ok := notify.GroupKey(ctx); !ok { t.Errorf("group key missing") } + if _, ok := notify.GroupId(ctx); !ok { + t.Errorf("group id missing") + } if lbls, ok := notify.GroupLabels(ctx); !ok || !reflect.DeepEqual(lbls, lset) { t.Errorf("wrong group labels: %q", lbls) } @@ -374,7 +378,7 @@ route: timeout := func(d time.Duration) time.Duration { return time.Duration(0) } recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)} - dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry())) + dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry()), nil) go dispatcher.Run() defer dispatcher.Stop() @@ -516,7 +520,7 @@ route: recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)} lim := limits{groups: 6} m := NewDispatcherMetrics(true, prometheus.NewRegistry()) - dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, lim, logger, m) + dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, lim, logger, m, nil) go dispatcher.Run() defer dispatcher.Stop() @@ -612,7 +616,7 @@ route: timeout := func(d time.Duration) time.Duration { return time.Duration(0) } recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)} - dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry())) + dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry()), nil) go dispatcher.Run() defer dispatcher.Stop() @@ -726,6 +730,74 @@ route: }, alertGroupInfos) } +func TestGroupsAlertLCObserver(t *testing.T) { + confData := `receivers: +- name: 'testing' + +route: + group_by: ['alertname'] + group_wait: 10ms + group_interval: 10ms + receiver: 'testing'` + conf, err := config.Load(confData) + if err != nil { + t.Fatal(err) + } + + logger := log.NewNopLogger() + route := NewRoute(conf.Route, nil) + marker := types.NewMarker(prometheus.NewRegistry()) + alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, nil) + if err != nil { + t.Fatal(err) + } + defer alerts.Close() + + timeout := func(d time.Duration) time.Duration { return time.Duration(0) } + recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)} + m := NewDispatcherMetrics(true, prometheus.NewRegistry()) + observer := alertobserver.NewFakeLifeCycleObserver() + lim := limits{groups: 1} + dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, lim, logger, m, observer) + go dispatcher.Run() + defer dispatcher.Stop() + + // Create 
alerts. The dispatcher will automatically create the groups. + alert1 := newAlert(model.LabelSet{"alertname": "OtherAlert", "cluster": "cc", "service": "dd"}) + alert2 := newAlert(model.LabelSet{"alertname": "YetAnotherAlert", "cluster": "cc", "service": "db"}) + err = alerts.Put(alert1) + if err != nil { + t.Fatal(err) + } + // Let alerts get processed. + for i := 0; len(recorder.Alerts()) != 1 && i < 10; i++ { + time.Sleep(200 * time.Millisecond) + } + err = alerts.Put(alert2) + if err != nil { + t.Fatal(err) + } + // Let the alert get processed. + for i := 0; testutil.ToFloat64(m.aggrGroupLimitReached) == 0 && i < 10; i++ { + time.Sleep(200 * time.Millisecond) + } + observer.Mtx.RLock() + defer observer.Mtx.RUnlock() + require.Equal(t, 1, len(recorder.Alerts())) + require.Equal(t, alert1.Fingerprint(), observer.AlertsPerEvent[alertobserver.EventAlertAddedToAggrGroup][0].Fingerprint()) + groupFp := getGroupLabels(alert1, route).Fingerprint() + group := dispatcher.aggrGroupsPerRoute[route][groupFp] + groupKey := group.GroupKey() + groupId := group.GroupID() + routeId := group.routeID + require.Equal(t, groupKey, observer.MetaPerEvent[alertobserver.EventAlertAddedToAggrGroup][0]["groupKey"].(string)) + require.Equal(t, groupId, observer.MetaPerEvent[alertobserver.EventAlertAddedToAggrGroup][0]["groupId"].(string)) + require.Equal(t, routeId, observer.MetaPerEvent[alertobserver.EventAlertAddedToAggrGroup][0]["routeId"].(string)) + + require.Equal(t, 1, len(observer.AlertsPerEvent[alertobserver.EventAlertFailedAddToAggrGroup])) + require.Equal(t, alert2.Fingerprint(), observer.AlertsPerEvent[alertobserver.EventAlertFailedAddToAggrGroup][0].Fingerprint()) +} + type recordStage struct { mtx sync.RWMutex alerts map[string]map[model.Fingerprint]*types.Alert @@ -790,7 +862,7 @@ func TestDispatcherRace(t *testing.T) { defer alerts.Close() timeout := func(d time.Duration) time.Duration { return time.Duration(0) } - dispatcher := NewDispatcher(alerts, nil, nil, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry())) + dispatcher := NewDispatcher(alerts, nil, nil, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry()), nil) go dispatcher.Run() dispatcher.Stop() } @@ -818,7 +890,7 @@ func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T) timeout := func(d time.Duration) time.Duration { return d } recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)} - dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry())) + dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry()), nil) go dispatcher.Run() defer dispatcher.Stop() diff --git a/notify/notify.go b/notify/notify.go index 0364a04909..f1853ac958 100644 --- a/notify/notify.go +++ b/notify/notify.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "sort" + "strings" "sync" "time" @@ -28,6 +29,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + "github.com/prometheus/alertmanager/alertobserver" "github.com/prometheus/alertmanager/inhibit" "github.com/prometheus/alertmanager/nflog" "github.com/prometheus/alertmanager/nflog/nflogpb" @@ -116,6 +118,7 @@ const ( keyNow keyMuteTimeIntervals keyActiveTimeIntervals + keyGroupId ) // WithReceiverName populates a context with a receiver name.
@@ -128,6 +131,11 @@ func WithGroupKey(ctx context.Context, s string) context.Context { return context.WithValue(ctx, keyGroupKey, s) } +// WithGroupId populates a context with a group id. +func WithGroupId(ctx context.Context, s string) context.Context { + return context.WithValue(ctx, keyGroupId, s) +} + // WithFiringAlerts populates a context with a slice of firing alerts. func WithFiringAlerts(ctx context.Context, alerts []uint64) context.Context { return context.WithValue(ctx, keyFiringAlerts, alerts) @@ -183,6 +191,13 @@ func GroupKey(ctx context.Context) (string, bool) { return v, ok } +// GroupId extracts a group id from the context. Iff none exists, the +// second argument is false. +func GroupId(ctx context.Context) (string, bool) { + v, ok := ctx.Value(keyGroupId).(string) + return v, ok +} + // GroupLabels extracts grouping label set from the context. Iff none exists, the // second argument is false. func GroupLabels(ctx context.Context) (model.LabelSet, bool) { @@ -329,18 +344,25 @@ func (pb *PipelineBuilder) New( times map[string][]timeinterval.TimeInterval, notificationLog NotificationLog, peer Peer, + o alertobserver.LifeCycleObserver, ) RoutingStage { - rs := make(RoutingStage, len(receivers)) + rs := RoutingStage{ + stages: make(map[string]Stage, len(receivers)), + alertLCObserver: o, + } ms := NewGossipSettleStage(peer) - is := NewMuteStage(inhibitor) + is := NewMuteStage(inhibitor, o) tas := NewTimeActiveStage(times) tms := NewTimeMuteStage(times) - ss := NewMuteStage(silencer) + ss := NewMuteStage(silencer, o) for name := range receivers { - st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics) - rs[name] = MultiStage{ms, is, tas, tms, ss, st} + st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics, o) + rs.stages[name] = MultiStage{ + alertLCObserver: o, + stages: []Stage{ms, is, tas, tms, ss, st}, + } } return rs } @@ -352,6 +374,7 @@ func createReceiverStage( wait func() time.Duration, notificationLog NotificationLog, metrics *Metrics, + o alertobserver.LifeCycleObserver, ) Stage { var fs FanoutStage for i := range integrations { @@ -360,20 +383,23 @@ func createReceiverStage( Integration: integrations[i].Name(), Idx: uint32(integrations[i].Index()), } - var s MultiStage + var s []Stage s = append(s, NewWaitStage(wait)) s = append(s, NewDedupStage(&integrations[i], notificationLog, recv)) - s = append(s, NewRetryStage(integrations[i], name, metrics)) + s = append(s, NewRetryStage(integrations[i], name, metrics, o)) s = append(s, NewSetNotifiesStage(notificationLog, recv)) - fs = append(fs, s) + fs = append(fs, MultiStage{stages: s, alertLCObserver: o}) } return fs } // RoutingStage executes the inner stages based on the receiver specified in // the context. -type RoutingStage map[string]Stage +type RoutingStage struct { + stages map[string]Stage + alertLCObserver alertobserver.LifeCycleObserver +} // Exec implements the Stage interface. func (rs RoutingStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { @@ -382,21 +408,28 @@ func (rs RoutingStage) Exec(ctx context.Context, l log.Logger, alerts ...*types. 
return ctx, nil, errors.New("receiver missing") } - s, ok := rs[receiver] + s, ok := rs.stages[receiver] if !ok { return ctx, nil, errors.New("stage for receiver missing") } + if rs.alertLCObserver != nil { + rs.alertLCObserver.Observe(alertobserver.EventAlertPipelineStart, alerts, alertobserver.AlertEventMeta{"ctx": ctx}) + } + return s.Exec(ctx, l, alerts...) } // A MultiStage executes a series of stages sequentially. -type MultiStage []Stage +type MultiStage struct { + stages []Stage + alertLCObserver alertobserver.LifeCycleObserver +} // Exec implements the Stage interface. func (ms MultiStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { var err error - for _, s := range ms { + for _, s := range ms.stages { if len(alerts) == 0 { return ctx, nil, nil } @@ -405,6 +438,10 @@ func (ms MultiStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Al if err != nil { return ctx, nil, err } + if ms.alertLCObserver != nil { + p := strings.Split(fmt.Sprintf("%T", s), ".") + ms.alertLCObserver.Observe(alertobserver.EventAlertPipelinePassStage, alerts, alertobserver.AlertEventMeta{"ctx": ctx, "stageName": p[len(p)-1]}) + } } return ctx, alerts, nil } @@ -458,25 +495,34 @@ func (n *GossipSettleStage) Exec(ctx context.Context, _ log.Logger, alerts ...*t // MuteStage filters alerts through a Muter. type MuteStage struct { - muter types.Muter + muter types.Muter + alertLCObserver alertobserver.LifeCycleObserver } // NewMuteStage return a new MuteStage. -func NewMuteStage(m types.Muter) *MuteStage { - return &MuteStage{muter: m} +func NewMuteStage(m types.Muter, o alertobserver.LifeCycleObserver) *MuteStage { + return &MuteStage{muter: m, alertLCObserver: o} } // Exec implements the Stage interface. func (n *MuteStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { - var filtered []*types.Alert + var ( + filtered []*types.Alert + muted []*types.Alert + ) for _, a := range alerts { // TODO(fabxc): increment total alerts counter. // Do not send the alert if muted. - if !n.muter.Mutes(a.Labels) { + if n.muter.Mutes(a.Labels) { + muted = append(muted, a) + } else { filtered = append(filtered, a) } // TODO(fabxc): increment muted alerts counter if muted. } + if n.alertLCObserver != nil { + n.alertLCObserver.Observe(alertobserver.EventAlertMuted, muted, alertobserver.AlertEventMeta{"ctx": ctx}) + } return ctx, filtered, nil } @@ -649,23 +695,25 @@ func (n *DedupStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Al // RetryStage notifies via passed integration with exponential backoff until it // succeeds. It aborts if the context is canceled or timed out. type RetryStage struct { - integration Integration - groupName string - metrics *Metrics + integration Integration + groupName string + metrics *Metrics + alertLCObserver alertobserver.LifeCycleObserver } // NewRetryStage returns a new instance of a RetryStage. 
-func NewRetryStage(i Integration, groupName string, metrics *Metrics) *RetryStage { +func NewRetryStage(i Integration, groupName string, metrics *Metrics, o alertobserver.LifeCycleObserver) *RetryStage { return &RetryStage{ - integration: i, - groupName: groupName, - metrics: metrics, + integration: i, + groupName: groupName, + metrics: metrics, + alertLCObserver: o, } } func (r RetryStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { r.metrics.numNotifications.WithLabelValues(r.integration.Name()).Inc() - ctx, alerts, err := r.exec(ctx, l, alerts...) + ctx, alerts, sent, err := r.exec(ctx, l, alerts...) failureReason := DefaultReason.String() if err != nil { @@ -673,11 +721,26 @@ func (r RetryStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Ale failureReason = e.Reason.String() } r.metrics.numTotalFailedNotifications.WithLabelValues(r.integration.Name(), failureReason).Inc() + if r.alertLCObserver != nil { + m := alertobserver.AlertEventMeta{ + "ctx": ctx, + "integration": r.integration.Name(), + "stageName": "RetryStage", + } + r.alertLCObserver.Observe(alertobserver.EventAlertSendFailed, sent, m) + } + } else if r.alertLCObserver != nil { + m := alertobserver.AlertEventMeta{ + "ctx": ctx, + "integration": r.integration.Name(), + "stageName": "RetryStage", + } + r.alertLCObserver.Observe(alertobserver.EventAlertSent, sent, m) } return ctx, alerts, err } -func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { +func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, []*types.Alert, error) { var sent []*types.Alert // If we shouldn't send notifications for resolved alerts, but there are only @@ -686,10 +749,10 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale if !r.integration.SendResolved() { firing, ok := FiringAlerts(ctx) if !ok { - return ctx, nil, errors.New("firing alerts missing") + return ctx, nil, nil, errors.New("firing alerts missing") } if len(firing) == 0 { - return ctx, alerts, nil + return ctx, alerts, sent, nil } for _, a := range alerts { if a.Status() != model.AlertResolved { @@ -725,7 +788,7 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale iErr = ctx.Err() } - return ctx, nil, errors.Wrapf(iErr, "%s/%s: notify retry canceled after %d attempts", r.groupName, r.integration.String(), i) + return ctx, nil, sent, errors.Wrapf(iErr, "%s/%s: notify retry canceled after %d attempts", r.groupName, r.integration.String(), i) default: } @@ -738,7 +801,7 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale if err != nil { r.metrics.numNotificationRequestsFailedTotal.WithLabelValues(r.integration.Name()).Inc() if !retry { - return ctx, alerts, errors.Wrapf(err, "%s/%s: notify retry canceled due to unrecoverable error after %d attempts", r.groupName, r.integration.String(), i) + return ctx, alerts, sent, errors.Wrapf(err, "%s/%s: notify retry canceled due to unrecoverable error after %d attempts", r.groupName, r.integration.String(), i) } if ctx.Err() == nil && (iErr == nil || err.Error() != iErr.Error()) { // Log the error if the context isn't done and the error isn't the same as before. 
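As an illustration of how a consumer might use the meta attached by RetryStage to the sent and send-failed events, the sketch below unpacks the "ctx", "integration" and "stageName" keys and recovers the group key from the stored context; handleNotifyEvent and its logging are hypothetical helpers, not part of this patch.

package example

import (
	"context"
	"log"

	"github.com/prometheus/alertmanager/alertobserver"
	"github.com/prometheus/alertmanager/notify"
	"github.com/prometheus/alertmanager/types"
)

// handleNotifyEvent could be called from an Observe implementation for
// alertobserver.EventAlertSent and alertobserver.EventAlertSendFailed.
func handleNotifyEvent(event string, alerts []*types.Alert, meta alertobserver.AlertEventMeta) {
	integration, _ := meta["integration"].(string)
	stage, _ := meta["stageName"].(string)
	var groupKey string
	if ctx, ok := meta["ctx"].(context.Context); ok {
		// The dispatcher sets the group key via notify.WithGroupKey before the pipeline runs.
		if gk, ok := notify.GroupKey(ctx); ok {
			groupKey = gk
		}
	}
	log.Printf("event=%s groupKey=%q integration=%s stage=%s alerts=%d",
		event, groupKey, integration, stage, len(alerts))
}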
@@ -755,14 +818,14 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale } lvl.Log("msg", "Notify success", "attempts", i) - return ctx, alerts, nil + return ctx, alerts, sent, nil } case <-ctx.Done(): if iErr == nil { iErr = ctx.Err() } - return ctx, nil, errors.Wrapf(iErr, "%s/%s: notify retry canceled after %d attempts", r.groupName, r.integration.String(), i) + return ctx, nil, sent, errors.Wrapf(iErr, "%s/%s: notify retry canceled after %d attempts", r.groupName, r.integration.String(), i) } } } diff --git a/notify/notify_test.go b/notify/notify_test.go index 996c132dd8..05fad62b01 100644 --- a/notify/notify_test.go +++ b/notify/notify_test.go @@ -17,6 +17,7 @@ import ( "context" "errors" "fmt" + "github.com/prometheus/alertmanager/alertobserver" "io" "reflect" "testing" @@ -305,7 +306,7 @@ func TestMultiStage(t *testing.T) { alerts3 = []*types.Alert{{}, {}, {}} ) - stage := MultiStage{ + stages := []Stage{ StageFunc(func(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { if !reflect.DeepEqual(alerts, alerts1) { t.Fatal("Input not equal to input of MultiStage") @@ -325,7 +326,9 @@ func TestMultiStage(t *testing.T) { return ctx, alerts3, nil }), } - + stage := MultiStage{ + stages: stages, + } _, alerts, err := stage.Exec(context.Background(), log.NewNopLogger(), alerts1...) if err != nil { t.Fatalf("Exec failed: %s", err) @@ -334,13 +337,28 @@ func TestMultiStage(t *testing.T) { if !reflect.DeepEqual(alerts, alerts3) { t.Fatal("Output of MultiStage is not equal to the output of the last stage") } + + // Rerun multistage but with alert life cycle observer + observer := alertobserver.NewFakeLifeCycleObserver() + ctx := WithGroupKey(context.Background(), "test") + stage.alertLCObserver = observer + _, _, err = stage.Exec(ctx, log.NewNopLogger(), alerts1...) + if err != nil { + t.Fatalf("Exec failed: %s", err) + } + + require.Equal(t, 1, len(observer.PipelineStageAlerts)) + require.Equal(t, 5, len(observer.PipelineStageAlerts["StageFunc"])) + metaCtx := observer.MetaPerEvent[alertobserver.EventAlertPipelinePassStage][0]["ctx"].(context.Context) + _, ok := GroupKey(metaCtx) + require.True(t, ok) } func TestMultiStageFailure(t *testing.T) { var ( ctx = context.Background() s1 = failStage{} - stage = MultiStage{s1} + stage = MultiStage{stages: []Stage{s1}} ) _, _, err := stage.Exec(ctx, log.NewNopLogger(), nil) @@ -355,7 +373,7 @@ func TestRoutingStage(t *testing.T) { alerts2 = []*types.Alert{{}, {}} ) - stage := RoutingStage{ + s := map[string]Stage{ "name": StageFunc(func(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { if !reflect.DeepEqual(alerts, alerts1) { t.Fatal("Input not equal to input of RoutingStage") @@ -364,6 +382,9 @@ func TestRoutingStage(t *testing.T) { }), "not": failStage{}, } + stage := RoutingStage{ + stages: s, + } ctx := WithReceiverName(context.Background(), "name") @@ -375,6 +396,20 @@ func TestRoutingStage(t *testing.T) { if !reflect.DeepEqual(alerts, alerts2) { t.Fatal("Output of RoutingStage is not equal to the output of the inner stage") } + + // Rerun RoutingStage but with alert life cycle observer + observer := alertobserver.NewFakeLifeCycleObserver() + stage.alertLCObserver = observer + _, _, err = stage.Exec(ctx, log.NewNopLogger(), alerts1...) 
+ if err != nil { + t.Fatalf("Exec failed: %s", err) + } + require.Equal(t, len(alerts1), len(observer.AlertsPerEvent[alertobserver.EventAlertPipelineStart])) + metaCtx := observer.MetaPerEvent[alertobserver.EventAlertPipelineStart][0]["ctx"].(context.Context) + + _, ok := ReceiverName(metaCtx) + require.True(t, ok) + } func TestRetryStageWithError(t *testing.T) { @@ -391,10 +426,7 @@ func TestRetryStageWithError(t *testing.T) { }), rs: sendResolved(false), } - r := RetryStage{ - integration: i, - metrics: NewMetrics(prometheus.NewRegistry()), - } + r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry()), nil) alerts := []*types.Alert{ { @@ -406,6 +438,7 @@ func TestRetryStageWithError(t *testing.T) { ctx := context.Background() ctx = WithFiringAlerts(ctx, []uint64{0}) + ctx = WithGroupKey(ctx, "test") // Notify with a recoverable error should retry and succeed. resctx, res, err := r.Exec(ctx, log.NewNopLogger(), alerts...) @@ -414,13 +447,40 @@ func TestRetryStageWithError(t *testing.T) { require.Equal(t, alerts, sent) require.NotNil(t, resctx) + // Rerun recoverable error but with alert life cycle observer + observer := alertobserver.NewFakeLifeCycleObserver() + r.alertLCObserver = observer + _, _, err = r.Exec(ctx, log.NewNopLogger(), alerts...) + require.Nil(t, err) + require.Equal(t, len(alerts), len(observer.AlertsPerEvent[alertobserver.EventAlertSent])) + meta := observer.MetaPerEvent[alertobserver.EventAlertSent][0] + require.Equal(t, "RetryStage", meta["stageName"].(string)) + require.Equal(t, i.Name(), meta["integration"].(string)) + metaCtx := meta["ctx"].(context.Context) + _, ok := GroupKey(metaCtx) + require.True(t, ok) + // Notify with an unrecoverable error should fail. sent = sent[:0] fail = true retry = false + r.alertLCObserver = nil resctx, _, err = r.Exec(ctx, log.NewNopLogger(), alerts...) require.NotNil(t, err) require.NotNil(t, resctx) + + // Rerun the unrecoverable error but with alert life cycle observer + fail = true + r.alertLCObserver = observer + _, _, err = r.Exec(ctx, log.NewNopLogger(), alerts...) 
+ require.NotNil(t, err) + require.Equal(t, len(alerts), len(observer.AlertsPerEvent[alertobserver.EventAlertSendFailed])) + meta = observer.MetaPerEvent[alertobserver.EventAlertSendFailed][0] + require.Equal(t, "RetryStage", meta["stageName"].(string)) + require.Equal(t, i.Name(), meta["integration"].(string)) + metaCtx = meta["ctx"].(context.Context) + _, ok = GroupKey(metaCtx) + require.True(t, ok) } func TestRetryStageWithErrorCode(t *testing.T) { @@ -447,10 +507,8 @@ func TestRetryStageWithErrorCode(t *testing.T) { }), rs: sendResolved(false), } - r := RetryStage{ - integration: i, - metrics: NewMetrics(prometheus.NewRegistry()), - } + + r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry()), nil) alerts := []*types.Alert{ { @@ -537,10 +595,7 @@ func TestRetryStageSendResolved(t *testing.T) { }), rs: sendResolved(true), } - r := RetryStage{ - integration: i, - metrics: NewMetrics(prometheus.NewRegistry()), - } + r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry()), nil) alerts := []*types.Alert{ { @@ -644,7 +699,7 @@ func TestMuteStage(t *testing.T) { return ok }) - stage := NewMuteStage(muter) + stage := NewMuteStage(muter, nil) in := []model.LabelSet{ {}, @@ -700,7 +755,7 @@ func TestMuteStageWithSilences(t *testing.T) { marker := types.NewMarker(prometheus.NewRegistry()) silencer := silence.NewSilencer(silences, marker, log.NewNopLogger()) - stage := NewMuteStage(silencer) + stage := NewMuteStage(silencer, nil) in := []model.LabelSet{ {}, @@ -778,6 +833,45 @@ func TestMuteStageWithSilences(t *testing.T) { } } +func TestMuteStageWithAlertObserver(t *testing.T) { + silences, err := silence.New(silence.Options{Retention: time.Hour}) + if err != nil { + t.Fatal(err) + } + _, err = silences.Set(&silencepb.Silence{ + EndsAt: utcNow().Add(time.Hour), + Matchers: []*silencepb.Matcher{{Name: "mute", Pattern: "me"}}, + }) + if err != nil { + t.Fatal(err) + } + + marker := types.NewMarker(prometheus.NewRegistry()) + silencer := silence.NewSilencer(silences, marker, log.NewNopLogger()) + observer := alertobserver.NewFakeLifeCycleObserver() + stage := NewMuteStage(silencer, observer) + + in := []model.LabelSet{ + {"test": "set"}, + {"mute": "me"}, + {"foo": "bar", "test": "set"}, + } + + var inAlerts []*types.Alert + for _, lset := range in { + inAlerts = append(inAlerts, &types.Alert{ + Alert: model.Alert{Labels: lset}, + }) + } + + _, _, err = stage.Exec(context.Background(), log.NewNopLogger(), inAlerts...) + if err != nil { + t.Fatalf("Exec failed: %s", err) + } + require.Equal(t, 1, len(observer.AlertsPerEvent[alertobserver.EventAlertMuted])) + require.Equal(t, inAlerts[1], observer.AlertsPerEvent[alertobserver.EventAlertMuted][0]) +} + func TestTimeMuteStage(t *testing.T) { // Route mutes alerts outside business hours in November, using the +1100 timezone. muteIn := `