Skip to content

Commit

Permalink
send flow name with batch id metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 16, 2025
1 parent b910baa commit f52382e
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/temporal"
Expand Down Expand Up @@ -315,7 +317,9 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
if err != nil {
logger.Error("Failed to get current batch id gauge", slog.Any("error", err))
} else {
currentBatchID.Record(ctx, res.CurrentSyncBatchID)
currentBatchID.Record(ctx, res.CurrentSyncBatchID, metric.WithAttributeSet(attribute.NewSet(
attribute.String(otel_metrics.FlowNameKey, flowName),
)))
}
}

Expand Down Expand Up @@ -670,7 +674,9 @@ func (a *FlowableActivity) normalizeLoop(
if err != nil {
logger.Error("Failed to get normalized batch id gauge", slog.Any("error", err))
} else {
lastNormalizedBatchID.Record(ctx, req.BatchID)
lastNormalizedBatchID.Record(ctx, req.BatchID, metric.WithAttributeSet(attribute.NewSet(
attribute.String(otel_metrics.FlowNameKey, config.FlowJobName),
)))
}
}
break
Expand Down

0 comments on commit f52382e

Please sign in to comment.