diff --git a/flow/alerting/alerting.go b/flow/alerting/alerting.go index d1394561fd..fa1ebf21f9 100644 --- a/flow/alerting/alerting.go +++ b/flow/alerting/alerting.go @@ -440,13 +440,13 @@ func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) } var tags []string if errors.Is(err, context.Canceled) { - tags = append(tags, "err:Canceled") + tags = append(tags, string(shared.ErrTypeCanceled)) } if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { - tags = append(tags, "err:EOF") + tags = append(tags, string(shared.ErrTypeEOF)) } if errors.Is(err, net.ErrClosed) { - tags = append(tags, "err:Closed") + tags = append(tags, string(shared.ErrTypeClosed)) } var pgErr *pgconn.PgError if errors.As(err, &pgErr) { @@ -454,7 +454,7 @@ func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) } var netErr *net.OpError if errors.As(err, &netErr) { - tags = append(tags, "err:Net") + tags = append(tags, string(shared.ErrTypeNet)) } a.sendTelemetryMessage(ctx, logger, flowName, errorWithStack, telemetry.ERROR, tags...) } diff --git a/flow/shared/err_types.go b/flow/shared/err_types.go new file mode 100644 index 0000000000..171b11ca6c --- /dev/null +++ b/flow/shared/err_types.go @@ -0,0 +1,24 @@ +package shared + +type ErrType string + +const ( + ErrTypeCanceled ErrType = "err:Canceled" + ErrTypeClosed ErrType = "err:Closed" + ErrTypeNet ErrType = "err:Net" + ErrTypeEOF ErrType = "err:EOF" +) + +func SkipSendingToIncidentIo(errTags []string) bool { + skipTags := map[string]struct{}{ + string(ErrTypeCanceled): {}, + string(ErrTypeClosed): {}, + string(ErrTypeNet): {}, + } + for _, tag := range errTags { + if _, ok := skipTags[tag]; ok { + return true + } + } + return false +} diff --git a/flow/shared/telemetry/incidentio_message_sender.go b/flow/shared/telemetry/incidentio_message_sender.go index ead24dd9a6..d66131f5d5 100644 --- a/flow/shared/telemetry/incidentio_message_sender.go +++ b/flow/shared/telemetry/incidentio_message_sender.go @@ -8,10 +8,12 @@ import ( "encoding/json" "fmt" "io" + "log/slog" "net/http" "strings" "time" + "github.com/PeerDB-io/peer-flow/shared" "go.temporal.io/sdk/activity" ) @@ -50,6 +52,15 @@ func (i *IncidentIoMessageSender) SendMessage( activityInfo = activity.GetInfo(ctx) } + if shared.SkipSendingToIncidentIo(attributes.Tags) { + logger := shared.LoggerFromCtx(ctx) + logger.Info("skipping incident.io alert", + slog.Any("attributes", attributes), + slog.String("subject", subject), + slog.String("body", body)) + return "", nil + } + deduplicationString := strings.Join([]string{ "deployID", attributes.DeploymentUID, "subject", subject,