Skip to content

Commit

Permalink
feat: add synced_tables metric
Browse files Browse the repository at this point in the history
  • Loading branch information
iamKunalGupta committed Jan 17, 2025
1 parent d8ec405 commit b7d3797
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 1 deletion.
15 changes: 15 additions & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
lua "github.com/yuin/gopher-lua"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/log"
Expand Down Expand Up @@ -767,6 +768,20 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error {
return
}
slotMetricGauges.IntervalSinceLastNormalizeGauge = intervalSinceLastNormalizeGauge

syncedTablesGauge, err := a.OtelManager.GetOrInitInt64Gauge(
otel_metrics.BuildMetricName(otel_metrics.SyncedTablesGaugeName),
metric.WithDescription("Number of tables synced"),
)
if err != nil {
logger.Error("Failed to get synced tables gauge", slog.Any("error", err))
return
}
syncedTablesGauge.Record(ctx, int64(len(config.TableMappings)), metric.WithAttributeSet(attribute.NewSet(
attribute.String(otel_metrics.FlowNameKey, config.FlowJobName),
attribute.String(otel_metrics.PeerNameKey, peerName),
)))

}

Check failure on line 785 in flow/activities/flowable.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary trailing newline (whitespace)

if err := srcConn.HandleSlotInfo(ctx, a.Alerter, a.CatalogPool, &alerting.AlertKeys{
Expand Down
1 change: 0 additions & 1 deletion flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,6 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
attribute.String(otel_metrics.FlowNameKey, flowName),
)))
}

}

syncState.Store(shared.Ptr("updating schema"))
Expand Down
5 changes: 5 additions & 0 deletions flow/otel_metrics/otel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ const (
ErrorsEmittedCounterName = "errors_emitted"
// SyncRecordsSyncedGaugeName is the gauge name for the number of records synced for every Sync batch
SyncRecordsSyncedGaugeName = "sync_records_synced"
// SyncedTablesGaugeName is the gauge name for the number of tables being synced for a mirror
SyncedTablesGaugeName = "synced_tables"
// InstanceStatusGaugeName used for notifying the status of the instance, like if it is healthy/under maintenance etc.
InstanceStatusGaugeName = "instance_status"
)

type SlotMetricGauges struct {
Expand All @@ -40,6 +44,7 @@ type SlotMetricGauges struct {
OpenConnectionsGauge metric.Int64Gauge
OpenReplicationConnectionsGauge metric.Int64Gauge
IntervalSinceLastNormalizeGauge metric.Float64Gauge
InstanceStatusGauge metric.Int64Gauge
}

func BuildMetricName(baseName string) string {
Expand Down

0 comments on commit b7d3797

Please sign in to comment.