diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index b9bad18036..e20cf55fc4 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -349,6 +349,16 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon activity.RecordHeartbeat(ctx, pushedRecordsWithCount) a.Alerter.LogFlowInfo(ctx, flowName, pushedRecordsWithCount) + if a.OtelManager != nil { + currentBatchID, err := a.OtelManager.GetOrInitInt64Gauge( + otel_metrics.BuildMetricName(otel_metrics.CurrentBatchIdGaugeName)) + if err != nil { + logger.Error("Failed to get current batch id gauge", slog.Any("error", err)) + } else { + currentBatchID.Record(ctx, res.CurrentSyncBatchID) + } + } + if err := a.applySchemaDeltas(ctx, config, options, res.TableSchemaDeltas); err != nil { return 0, err } @@ -666,6 +676,15 @@ func (a *FlowableActivity) normalizeLoop( } else if req.Done != nil { close(req.Done) } + if a.OtelManager != nil { + lastNormalizedBatchID, err := a.OtelManager.GetOrInitInt64Gauge( + otel_metrics.BuildMetricName(otel_metrics.LastNormalizedBatchIdGaugeName)) + if err != nil { + logger.Error("Failed to get normalized batch id gauge", slog.Any("error", err)) + } else { + lastNormalizedBatchID.Record(ctx, req.BatchID) + } + } break } case <-syncDone: diff --git a/flow/otel_metrics/otel_manager.go b/flow/otel_metrics/otel_manager.go index dc3deb4246..1681e5217f 100644 --- a/flow/otel_metrics/otel_manager.go +++ b/flow/otel_metrics/otel_manager.go @@ -18,15 +18,19 @@ import ( ) const ( - SlotLagGaugeName string = "cdc_slot_lag" - OpenConnectionsGaugeName string = "open_connections" - OpenReplicationConnectionsGaugeName string = "open_replication_connections" - IntervalSinceLastNormalizeGaugeName string = "interval_since_last_normalize" - FetchedBytesCounterName string = "fetched_bytes" + SlotLagGaugeName = "cdc_slot_lag" + CurrentBatchIdGaugeName = "current_batch_id" + LastNormalizedBatchIdGaugeName = "last_normalized_batch_id" + OpenConnectionsGaugeName = "open_connections" + OpenReplicationConnectionsGaugeName = "open_replication_connections" + IntervalSinceLastNormalizeGaugeName = "interval_since_last_normalize" + FetchedBytesCounterName = "fetched_bytes" ) type SlotMetricGauges struct { SlotLagGauge metric.Float64Gauge + CurrentBatchIdGauge metric.Int64Gauge + LastNormalizedBatchIdGauge metric.Int64Gauge OpenConnectionsGauge metric.Int64Gauge OpenReplicationConnectionsGauge metric.Int64Gauge IntervalSinceLastNormalizeGauge metric.Float64Gauge