Skip to content

Commit

Permalink
add basic success/failure metrics for ingestion processing
Browse files Browse the repository at this point in the history
  • Loading branch information
ltucker committed Feb 5, 2025
1 parent 184d282 commit 5d8bc59
Show file tree
Hide file tree
Showing 14 changed files with 125 additions and 35 deletions.
17 changes: 10 additions & 7 deletions diode-server/cmd/ingester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@ import (

"github.com/getsentry/sentry-go"
"github.com/kelseyhightower/envconfig"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"

"github.com/netboxlabs/diode/diode-server/ingester"
"github.com/netboxlabs/diode/diode-server/server"
"github.com/netboxlabs/diode/diode-server/telemetry"
)

const (
applicationName = "diode-ingester"
applicationName = "com.netboxlabs.diode.ingester"
metricStartup = "startup_count"
)

func main() {
Expand All @@ -31,15 +33,16 @@ func main() {
cfg.Telemetry.ServiceName = applicationName
}

// Initialize telemetry
shutdown, err := telemetry.Setup(ctx, cfg.Telemetry)
meter := otel.GetMeterProvider().Meter(applicationName)
startupCounter, err := meter.Int64Counter(metricStartup,
metric.WithDescription("Number of times the ingester service has started"))
if err != nil {
s.Logger().Error("failed to initialize telemetry", "error", err)
s.Logger().Error("failed to create startup metric", "error", err)
os.Exit(1)
}
defer shutdown(ctx)
startupCounter.Add(ctx, 1)

ingesterComponent, err := ingester.New(ctx, s.Logger(), cfg)
ingesterComponent, err := ingester.New(ctx, s.Logger(), cfg, meter)
if err != nil {
s.Logger().Error("failed to instantiate ingester component", "error", err)
os.Exit(1)
Expand Down
18 changes: 16 additions & 2 deletions diode-server/cmd/reconciler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
_ "github.com/jackc/pgx/v5/stdlib" // pgx to database/sql compatibility
"github.com/kelseyhightower/envconfig"
"github.com/pressly/goose/v3"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"

"github.com/netboxlabs/diode/diode-server/dbstore/postgres"
"github.com/netboxlabs/diode/diode-server/migrator"
Expand All @@ -21,7 +23,9 @@ import (
)

const (
applicationName = "diode-reconciler"
applicationName = "com.netboxlabs.diode.reconciler"
ingestionProcessorMeterName = "com.netboxlabs.diode.reconciler.ingestion-processor"
metricStartup = "startup_count"
)

func main() {
Expand All @@ -45,6 +49,14 @@ func main() {
os.Exit(1)
}
defer shutdown(ctx)

Check failure on line 51 in diode-server/cmd/reconciler/main.go

View workflow job for this annotation

GitHub Actions / golangci

Error return value is not checked (errcheck)
appMeter := otel.GetMeterProvider().Meter(applicationName)
startupCounter, err := appMeter.Int64Counter(metricStartup,
metric.WithDescription("Number of times the reconciler service has started"))
if err != nil {
s.Logger().Error("failed to create startup metric", "error", err)
os.Exit(1)
}
startupCounter.Add(ctx, 1)

dbURL := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", cfg.PostgresHost, cfg.PostgresPort, cfg.PostgresUser, cfg.PostgresPassword, cfg.PostgresDBName)

Expand All @@ -68,8 +80,10 @@ func main() {
if err != nil {
s.Logger().Error("failed to create netbox diode plugin client", "error", err)
}

ops := reconciler.NewOps(repository, nbClient, s.Logger())
ingestionProcessor, err := reconciler.NewIngestionProcessor(ctx, s.Logger(), ops)
ingestionMeter := otel.GetMeterProvider().Meter(ingestionProcessorMeterName)
ingestionProcessor, err := reconciler.NewIngestionProcessor(ctx, s.Logger(), ops, ingestionMeter)
if err != nil {
s.Logger().Error("failed to instantiate ingestion processor", "error", err)
os.Exit(1)
Expand Down
3 changes: 2 additions & 1 deletion diode-server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ require (
github.com/pressly/goose/v3 v3.23.0
github.com/redis/go-redis/v9 v9.5.1
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.59.0
go.opentelemetry.io/otel v1.34.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.34.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0
go.opentelemetry.io/otel/exporters/prometheus v0.56.0
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.34.0
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.34.0
go.opentelemetry.io/otel/metric v1.34.0
go.opentelemetry.io/otel/sdk v1.34.0
go.opentelemetry.io/otel/sdk/metric v1.34.0
golang.org/x/time v0.5.0
Expand Down Expand Up @@ -64,7 +66,6 @@ require (
github.com/yuin/gopher-lua v1.1.1 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0 // indirect
go.opentelemetry.io/otel/metric v1.34.0 // indirect
go.opentelemetry.io/otel/trace v1.34.0 // indirect
go.opentelemetry.io/proto/otlp v1.5.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions diode-server/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.59.0 h1:rgMkmiGfix9vFJDcDi1PK8WEQP4FLQwLDfhp5ZLpFeE=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.59.0/go.mod h1:ijPqXp5P6IRRByFVVg9DY8P5HkxkHE5ARIa+86aXPf4=
go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY=
go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.34.0 h1:ajl4QczuJVA2TU9W9AGw++86Xga/RKt//16z/yxPgdk=
Expand Down
13 changes: 5 additions & 8 deletions diode-server/ingester/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (

"github.com/netboxlabs/diode/diode-server/gen/diode/v1/diodepb"
"github.com/netboxlabs/diode/diode-server/sentry"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)
Expand All @@ -27,9 +26,9 @@ const (
streamID = "diode.v1.ingest-stream"

// Metric names
metricIngestTotal = "diode.ingester.ingest.total"
metricIngestSuccess = "diode.ingester.ingest.success"
metricIngestFailure = "diode.ingester.ingest.failure"
metricIngestTotal = "ingest.total"
metricIngestSuccess = "ingest.success"
metricIngestFailure = "ingest.failure"
)

var (
Expand Down Expand Up @@ -58,7 +57,7 @@ type Component struct {
}

// New creates a new ingester component
func New(ctx context.Context, logger *slog.Logger, cfg Config) (*Component, error) {
func New(ctx context.Context, logger *slog.Logger, cfg Config, meter metric.Meter) (*Component, error) {
grpcListener, err := net.Listen("tcp", fmt.Sprintf(":%d", cfg.GRPCPort))
if err != nil {
return nil, fmt.Errorf("failed to listen on port %d: %v", cfg.GRPCPort, err)
Expand All @@ -83,16 +82,14 @@ func New(ctx context.Context, logger *slog.Logger, cfg Config) (*Component, erro
auth := newAuthUnaryInterceptor(apiKeys)
grpcServer := grpc.NewServer(grpc.ChainUnaryInterceptor(auth))

meter := otel.GetMeterProvider().Meter("diode/ingester")

ingestTotal, err := meter.Int64Counter(metricIngestTotal,
metric.WithDescription("Total number of ingest requests received"))
if err != nil {
return nil, fmt.Errorf("failed to create total counter: %v", err)
}

ingestSuccess, err := meter.Int64Counter(metricIngestSuccess,
metric.WithDescription("Number of successful ingest requests"))
metric.WithDescription("Number of successfully processed ingest requests"))
if err != nil {
return nil, fmt.Errorf("failed to create success counter: %v", err)
}
Expand Down
8 changes: 6 additions & 2 deletions diode-server/ingester/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/alicebob/miniredis/v2"
"github.com/kelseyhightower/envconfig"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/test/bufconn"
Expand Down Expand Up @@ -100,7 +101,9 @@ func startTestComponent(ctx context.Context, t *testing.T) (*ingester.Component,
require.NoError(t, err)

logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false}))
component, err := ingester.New(ctx, logger, cfg)

meter := otel.GetMeterProvider().Meter("test.ingester")
component, err := ingester.New(ctx, logger, cfg, meter)
require.NoError(t, err)

pb.RegisterIngesterServiceServer(s, component)
Expand Down Expand Up @@ -143,7 +146,8 @@ func TestNewComponent(t *testing.T) {
err := envconfig.Process("", &cfg)
require.NoError(t, err)

component, err := ingester.New(ctx, logger, cfg)
meter := otel.GetMeterProvider().Meter("test.ingester")
component, err := ingester.New(ctx, logger, cfg, meter)

require.NoError(t, err)
require.NotNil(t, component)
Expand Down
2 changes: 1 addition & 1 deletion diode-server/netboxdiodeplugin/mocks/netboxapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 55 additions & 8 deletions diode-server/reconciler/ingestion_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/netboxlabs/diode/diode-server/netboxdiodeplugin"
"github.com/netboxlabs/diode/diode-server/reconciler/changeset"
"github.com/netboxlabs/diode/diode-server/sentry"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

const (
Expand All @@ -32,6 +34,11 @@ const (

// RedisConsumerGroupExistsErrMsg is the error message returned by the redis client when the consumer group already exists
RedisConsumerGroupExistsErrMsg = "BUSYGROUP Consumer Group name already exists"

// Metric names
metricHandleMessageTotal = "handle_message.total"
metricHandleMessageSuccess = "handle_message.success"
metricHandleMessageFailure = "handle_message.failure"
)

// RedisClient is an interface that represents the methods used from redis.Client
Expand All @@ -56,6 +63,11 @@ type IngestionProcessor struct {
redisClient RedisClient
redisStreamClient RedisClient
ops IngestionProcessorOps

// Metrics
handleMessageTotalCounter metric.Int64Counter
handleMessageSuccessCounter metric.Int64Counter
handleMessageFailureCounter metric.Int64Counter
}

// IngestionLogToProcess represents an ingestion log to process
Expand All @@ -74,10 +86,25 @@ type IngestionProcessorOps interface {
}

// NewIngestionProcessor creates a new ingestion processor
func NewIngestionProcessor(ctx context.Context, logger *slog.Logger, ops IngestionProcessorOps) (*IngestionProcessor, error) {
func NewIngestionProcessor(ctx context.Context, logger *slog.Logger, ops IngestionProcessorOps, meter metric.Meter) (*IngestionProcessor, error) {
var cfg Config
envconfig.MustProcess("", &cfg)

handleMessageTotalCounter, err := meter.Int64Counter(metricHandleMessageTotal)
if err != nil {
return nil, fmt.Errorf("failed to create handle message total counter: %v", err)
}

handleMessageSuccessCounter, err := meter.Int64Counter(metricHandleMessageSuccess)
if err != nil {
return nil, fmt.Errorf("failed to create handle message success counter: %v", err)
}

handleMessageFailureCounter, err := meter.Int64Counter(metricHandleMessageFailure)
if err != nil {
return nil, fmt.Errorf("failed to create handle message failure counter: %v", err)
}

redisClient := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%s", cfg.RedisHost, cfg.RedisPort),
Password: cfg.RedisPassword,
Expand All @@ -104,12 +131,15 @@ func NewIngestionProcessor(ctx context.Context, logger *slog.Logger, ops Ingesti
}

component := &IngestionProcessor{
Config: cfg,
logger: logger,
hostname: hostname,
redisClient: redisClient,
redisStreamClient: redisStreamClient,
ops: ops,
Config: cfg,
logger: logger,
hostname: hostname,
redisClient: redisClient,
redisStreamClient: redisStreamClient,
ops: ops,
handleMessageTotalCounter: handleMessageTotalCounter,
handleMessageSuccessCounter: handleMessageSuccessCounter,
handleMessageFailureCounter: handleMessageFailureCounter,
}

return component, nil
Expand Down Expand Up @@ -169,13 +199,28 @@ func (p *IngestionProcessor) consumeIngestionStream(ctx context.Context, stream,
}

func (p *IngestionProcessor) handleStreamMessage(ctx context.Context, msg redis.XMessage) error {
p.logger.Debug("received stream message", "message", msg.Values, "id", msg.ID)
// Create attributes for metrics
attrs := []attribute.KeyValue{
attribute.String("hostname", p.hostname),
}

// Record total process attempt
p.handleMessageTotalCounter.Add(ctx, 1, metric.WithAttributes(attrs...))

ingestReq := &diodepb.IngestRequest{}
if err := proto.Unmarshal([]byte(msg.Values["request"].(string)), ingestReq); err != nil {
p.handleMessageFailureCounter.Add(ctx, 1, metric.WithAttributes(attrs...))
return err
}

// Add request-specific attributes
attrs = append(attrs,
attribute.String("sdk_name", ingestReq.SdkName),
attribute.String("sdk_version", ingestReq.SdkVersion),
attribute.String("producer_app_name", ingestReq.ProducerAppName),
attribute.String("producer_app_version", ingestReq.ProducerAppVersion),
)

errs := make([]error, 0)

ingestionTs, err := strconv.Atoi(msg.Values["ingestion_ts"].(string))
Expand Down Expand Up @@ -211,6 +256,7 @@ func (p *IngestionProcessor) handleStreamMessage(ctx context.Context, msg redis.
p.redisStreamClient.XAck(ctx, redisStreamID, redisConsumerGroup, msg.ID)

if len(errs) > 0 {
p.handleMessageFailureCounter.Add(ctx, 1, metric.WithAttributes(attrs...))
errsStr := make([]string, 0)
for _, err := range errs {
errsStr = append(errsStr, err.Error())
Expand All @@ -224,6 +270,7 @@ func (p *IngestionProcessor) handleStreamMessage(ctx context.Context, msg redis.
}
sentry.CaptureError(fmt.Errorf("failed to handle ingest request: %v", errs), nil, "Ingestion request", contextMap)
} else {
p.handleMessageSuccessCounter.Add(ctx, 1, metric.WithAttributes(attrs...))
p.redisStreamClient.XDel(ctx, redisStreamID, msg.ID)
}

Expand Down
15 changes: 15 additions & 0 deletions diode-server/reconciler/ingestion_processor_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"google.golang.org/protobuf/proto"

"github.com/netboxlabs/diode/diode-server/gen/diode/v1/diodepb"
Expand All @@ -30,6 +31,17 @@ import (
// int32Ptr returns a pointer to a fresh int32 holding the value of i.
func int32Ptr(i int32) *int32 {
	v := i
	return &v
}
// strPtr returns a pointer to a fresh string holding the value of s.
func strPtr(s string) *string {
	v := s
	return &v
}

// addMeters populates the three handle-message counters on p using a meter
// obtained from the global OpenTelemetry meter provider.
//
// Tests in this file build an IngestionProcessor as a struct literal rather
// than through NewIngestionProcessor, so the counter fields would otherwise be
// left unset; call this helper before exercising code that records metrics.
// The test fails immediately if any counter cannot be created.
func addMeters(t *testing.T, p *IngestionProcessor) {
	// Mark as a helper so a failing require is reported at the caller's line.
	t.Helper()

	meter := otel.GetMeterProvider().Meter("test.ingestion-processor")

	var err error
	p.handleMessageTotalCounter, err = meter.Int64Counter("handle_message.total")
	require.NoError(t, err)
	p.handleMessageSuccessCounter, err = meter.Int64Counter("handle_message.success")
	require.NoError(t, err)
	p.handleMessageFailureCounter, err = meter.Int64Counter("handle_message.failure")
	require.NoError(t, err)
}

func TestHandleStreamMessage(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -165,6 +177,7 @@ func TestHandleStreamMessage(t *testing.T) {
},
ops: NewOps(mockRepository, mockNbClient, logger),
}
addMeters(t, p)

request := redis.XMessage{}
if tt.validMsg {
Expand Down Expand Up @@ -281,6 +294,7 @@ func TestConsumeIngestionStream(t *testing.T) {
ReconcilerRateLimiterBurst: 1,
},
}
addMeters(t, p)

err := p.consumeIngestionStream(ctx, "test-stream", "test-group", "test-consumer")

Expand Down Expand Up @@ -492,6 +506,7 @@ func TestIngestionProcessor_GenerateAndApplyChangeSet(t *testing.T) {
},
ops: NewOps(mockRepository, mockNbClient, logger),
}
addMeters(t, p)

ingestionLogID := int32(1)

Expand Down
Loading

0 comments on commit 5d8bc59

Please sign in to comment.