diff --git a/diode-server/cmd/ingester/main.go b/diode-server/cmd/ingester/main.go index 07ad686a..8d867963 100644 --- a/diode-server/cmd/ingester/main.go +++ b/diode-server/cmd/ingester/main.go @@ -5,18 +5,59 @@ import ( "os" "github.com/getsentry/sentry-go" + "github.com/kelseyhightower/envconfig" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" "github.com/netboxlabs/diode/diode-server/ingester" "github.com/netboxlabs/diode/diode-server/server" + "github.com/netboxlabs/diode/diode-server/telemetry" +) + +const ( + applicationName = "diode-ingester" // used by sentry + + // used by open telemetry metrics + telemetryServiceName = "netboxlabs/diode/ingester" + metricStartup = "netboxlabs/diode/ingester/startup_count" ) func main() { ctx := context.Background() - s := server.New(ctx, "diode-ingester") + s := server.New(ctx, applicationName) defer s.Recover(sentry.CurrentHub()) - ingesterComponent, err := ingester.New(ctx, s.Logger()) + // Load configuration + var cfg ingester.Config + envconfig.MustProcess("", &cfg) + + // Set default telemetry configuration if not provided + if cfg.Telemetry.ServiceName == "" { + cfg.Telemetry.ServiceName = telemetryServiceName + } + + shutdown, err := telemetry.Setup(ctx, cfg.Telemetry) + if err != nil { + s.Logger().Error("failed to initialize telemetry", "error", err) + os.Exit(1) + } + defer func() { + if err := shutdown(ctx); err != nil { + s.Logger().Error("failed to shutdown telemetry", "error", err) + } + }() + + meter := otel.GetMeterProvider().Meter(telemetryServiceName) + startupCounter, err := meter.Int64Counter(metricStartup, + metric.WithDescription("Number of times the ingester service has started")) + if err != nil { + s.Logger().Error("failed to create startup metric", "error", err) + os.Exit(1) + } + startupCounter.Add(ctx, 1) + + ingesterComponent, err := ingester.New(ctx, s.Logger(), cfg, meter) if err != nil { s.Logger().Error("failed to instantiate ingester component", "error", err) os.Exit(1) 
@@ -27,7 +68,7 @@ func main() { os.Exit(1) } - // TODO: instantiate prometheus server + telemetry.ServePrometheusMetricsIfNecessary(cfg.Telemetry, s.Logger()) if err := s.Run(); err != nil { s.Logger().Error("server failure", "serverName", s.Name(), "error", err) diff --git a/diode-server/cmd/reconciler/main.go b/diode-server/cmd/reconciler/main.go index f0009f6e..47e465c1 100644 --- a/diode-server/cmd/reconciler/main.go +++ b/diode-server/cmd/reconciler/main.go @@ -11,23 +11,61 @@ import ( _ "github.com/jackc/pgx/v5/stdlib" // pgx to database/sql compatibility "github.com/kelseyhightower/envconfig" "github.com/pressly/goose/v3" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" "github.com/netboxlabs/diode/diode-server/dbstore/postgres" "github.com/netboxlabs/diode/diode-server/migrator" "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin" "github.com/netboxlabs/diode/diode-server/reconciler" "github.com/netboxlabs/diode/diode-server/server" + "github.com/netboxlabs/diode/diode-server/telemetry" +) + +const ( + applicationName = "diode-reconciler" // used by sentry + + // used by open telemetry metrics + telemetryServiceName = "netboxlabs/diode/reconciler" + ingestionProcessorName = "netboxlabs/diode/reconciler/ingestion_processor" + metricStartup = "netboxlabs/diode/reconciler/startup_count" ) func main() { ctx := context.Background() - s := server.New(ctx, "diode-reconciler") + s := server.New(ctx, applicationName) defer s.Recover(sentry.CurrentHub()) var cfg reconciler.Config envconfig.MustProcess("", &cfg) + // Set default telemetry configuration if not provided + if cfg.Telemetry.ServiceName == "" { + cfg.Telemetry.ServiceName = telemetryServiceName + } + + // Initialize telemetry + shutdown, err := telemetry.Setup(ctx, cfg.Telemetry) + if err != nil { + s.Logger().Error("failed to initialize telemetry", "error", err) + os.Exit(1) + } + defer func() { + if err := shutdown(ctx); err != nil { + s.Logger().Error("failed to shutdown 
telemetry", "error", err) + } + }() + + appMeter := otel.GetMeterProvider().Meter(telemetryServiceName) + startupCounter, err := appMeter.Int64Counter(metricStartup, + metric.WithDescription("Number of times the reconciler service has started")) + if err != nil { + s.Logger().Error("failed to create startup metric", "error", err) + os.Exit(1) + } + startupCounter.Add(ctx, 1) + dbURL := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", cfg.PostgresHost, cfg.PostgresPort, cfg.PostgresUser, cfg.PostgresPassword, cfg.PostgresDBName) if cfg.MigrationEnabled { @@ -50,8 +88,15 @@ func main() { if err != nil { s.Logger().Error("failed to create netbox diode plugin client", "error", err) } + ops := reconciler.NewOps(repository, nbClient, s.Logger()) - ingestionProcessor, err := reconciler.NewIngestionProcessor(ctx, s.Logger(), ops) + ingestionMeter := otel.GetMeterProvider().Meter(ingestionProcessorName) + ingestionMetrics, err := reconciler.NewOtelIngestionProcessorMetrics(ingestionMeter, ingestionProcessorName) + if err != nil { + s.Logger().Error("failed to create ingestion processor metrics", "error", err) + os.Exit(1) + } + ingestionProcessor, err := reconciler.NewIngestionProcessor(ctx, s.Logger(), ops, ingestionMetrics) if err != nil { s.Logger().Error("failed to instantiate ingestion processor", "error", err) os.Exit(1) @@ -73,7 +118,7 @@ func main() { os.Exit(1) } - // TODO: instantiate prometheus server + telemetry.ServePrometheusMetricsIfNecessary(cfg.Telemetry, s.Logger()) if err := s.Run(); err != nil { s.Logger().Error("server failure", "serverName", s.Name(), "error", err) diff --git a/diode-server/docker/docker-compose.yaml b/diode-server/docker/docker-compose.yaml index 55e9e7d3..c039b193 100644 --- a/diode-server/docker/docker-compose.yaml +++ b/diode-server/docker/docker-compose.yaml @@ -9,6 +9,12 @@ services: upstream diode-reconciler { server diode-reconciler:8081; } + upstream diode-reconcilermetrics { + server 
diode-reconciler:9090; + } + upstream diode-ingestermetrics { + server diode-ingester:9090; + } server { listen 80; http2 on; @@ -22,6 +28,14 @@ services: rewrite /diode/(.*) /$$1 break; grpc_pass grpc://diode-reconciler; } + location /diode/reconciler/metrics { + rewrite /diode/reconciler/(.*) /$$1 break; + proxy_pass http://diode-reconcilermetrics; + } + location /diode/ingester/metrics { + rewrite /diode/ingester/(.*) /$$1 break; + proxy_pass http://diode-ingestermetrics; + } }' > /etc/nginx/conf.d/default.conf && nginx -g 'daemon off;'" restart: always @@ -43,6 +57,9 @@ services: - RECONCILER_GRPC_PORT=${RECONCILER_GRPC_PORT} - DIODE_API_KEY=${DIODE_API_KEY} - SENTRY_DSN=${SENTRY_DSN} + - TELEMETRY_METRICS_EXPORTER=${TELEMETRY_METRICS_EXPORTER} + - TELEMETRY_TRACES_EXPORTER=${TELEMETRY_TRACES_EXPORTER} + - TELEMETRY_ENVIRONMENT=${TELEMETRY_ENVIRONMENT} restart: always ports: [ ] depends_on: @@ -69,6 +86,9 @@ services: - POSTGRES_DB_NAME=${POSTGRES_DB_NAME} - POSTGRES_USER=${POSTGRES_USER} - POSTGRES_PASSWORD=${POSTGRES_PASSWORD} + - TELEMETRY_METRICS_EXPORTER=${TELEMETRY_METRICS_EXPORTER} + - TELEMETRY_TRACES_EXPORTER=${TELEMETRY_TRACES_EXPORTER} + - TELEMETRY_ENVIRONMENT=${TELEMETRY_ENVIRONMENT} restart: always ports: [ ] volumes: diff --git a/diode-server/docker/sample.env b/diode-server/docker/sample.env index 0ede75c1..59675c05 100644 --- a/diode-server/docker/sample.env +++ b/diode-server/docker/sample.env @@ -19,3 +19,6 @@ POSTGRES_PORT=5432 POSTGRES_DB_NAME=diode POSTGRES_USER=diode POSTGRES_PASSWORD=CHANGE.ME +TELEMETRY_ENVIRONMENT=dev +TELEMETRY_METRICS_EXPORTER=prometheus +TELEMETRY_TRACES_EXPORTER=none \ No newline at end of file diff --git a/diode-server/go.mod b/diode-server/go.mod index f758be80..befa78d3 100644 --- a/diode-server/go.mod +++ b/diode-server/go.mod @@ -5,7 +5,7 @@ go 1.23.4 require ( github.com/alicebob/miniredis/v2 v2.33.0 github.com/andybalholm/brotli v1.1.1 - github.com/envoyproxy/protoc-gen-validate v1.0.4 + 
github.com/envoyproxy/protoc-gen-validate v1.1.0 github.com/getsentry/sentry-go v0.27.0 github.com/google/uuid v1.6.0 github.com/gosimple/slug v1.14.0 @@ -17,43 +17,67 @@ require ( github.com/mitchellh/mapstructure v1.5.0 github.com/oklog/run v1.1.0 github.com/pressly/goose/v3 v3.23.0 + github.com/prometheus/client_golang v1.20.5 github.com/redis/go-redis/v9 v9.5.1 - github.com/stretchr/testify v1.9.0 + github.com/stretchr/testify v1.10.0 + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.59.0 + go.opentelemetry.io/otel v1.34.0 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.34.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0 + go.opentelemetry.io/otel/exporters/prometheus v0.56.0 + go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.34.0 + go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.34.0 + go.opentelemetry.io/otel/metric v1.34.0 + go.opentelemetry.io/otel/sdk v1.34.0 + go.opentelemetry.io/otel/sdk/metric v1.34.0 golang.org/x/time v0.5.0 - google.golang.org/grpc v1.64.1 - google.golang.org/protobuf v1.34.2 + google.golang.org/grpc v1.69.4 + google.golang.org/protobuf v1.36.3 modernc.org/sqlite v1.34.1 ) require ( github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect - github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/beorn7/perks v1.0.1 // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dustin/go-humanize v1.0.1 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/gosimple/unidecode v1.0.1 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 // indirect github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect 
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect - github.com/kr/text v0.2.0 // indirect + github.com/klauspost/compress v1.17.9 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mfridman/interpolate v0.0.2 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/ncruces/go-strftime v0.1.9 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.61.0 // indirect + github.com/prometheus/procfs v0.15.1 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect - github.com/rogpeppe/go-internal v1.13.1 // indirect github.com/sethvargo/go-retry v0.3.0 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/yuin/gopher-lua v1.1.1 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0 // indirect + go.opentelemetry.io/otel/trace v1.34.0 // indirect + go.opentelemetry.io/proto/otlp v1.5.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/crypto v0.31.0 // indirect - golang.org/x/net v0.33.0 // indirect + golang.org/x/crypto v0.32.0 // indirect + golang.org/x/net v0.34.0 // indirect golang.org/x/sync v0.10.0 // indirect - golang.org/x/sys v0.28.0 // indirect + golang.org/x/sys v0.29.0 // indirect golang.org/x/text v0.21.0 // indirect golang.org/x/tools v0.28.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240513163218-0867130af1f8 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect gopkg.in/yaml.v3 v3.0.1 // indirect modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 // indirect modernc.org/libc v1.55.3 // indirect diff --git a/diode-server/go.sum 
b/diode-server/go.sum index a1deba33..77e9a38d 100644 --- a/diode-server/go.sum +++ b/diode-server/go.sum @@ -4,13 +4,16 @@ github.com/alicebob/miniredis/v2 v2.33.0 h1:uvTF0EDeu9RLnUEG27Db5I68ESoIxTiXbNUi github.com/alicebob/miniredis/v2 v2.33.0/go.mod h1:MhP4a3EU7aENRi9aO+tHfTBZicLqQevyi/DJpoj6mi0= github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= -github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= -github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -18,12 +21,19 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r github.com/dgryski/go-rendezvous 
v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= -github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= -github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= +github.com/envoyproxy/protoc-gen-validate v1.1.0 h1:tntQDh69XqOCOZsDz0lVJQez/2L6Uu2PdjCQwWCJ3bM= +github.com/envoyproxy/protoc-gen-validate v1.1.0/go.mod h1:sXRDRVmzEbkM7CVcM06s9shE/m23dg3wzjl0UWqJ2q4= github.com/getsentry/sentry-go v0.27.0 h1:Pv98CIbtB3LkMWmXi4Joa5OOcwbmnX88sF5qbK3r3Ps= github.com/getsentry/sentry-go v0.27.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY= github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA= github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd h1:gbpYu9NMq8jhDVbvlGkMFWCjLFlqqEZjEmObmhUy6Vo= @@ -34,6 +44,8 @@ github.com/gosimple/slug v1.14.0 h1:RtTL/71mJNDfpUbCOmnf/XFkzKRtD6wL6Uy+3akm4Es= 
github.com/gosimple/slug v1.14.0/go.mod h1:UiRaFH+GEilHstLUmcBgWcI42viBN7mAb818JrYOeFQ= github.com/gosimple/unidecode v1.0.1 h1:hZzFTMMqSswvf0LBJZCZgThIZrpDHFXux9KeGmn6T/o= github.com/gosimple/unidecode v1.0.1/go.mod h1:CP0Cr1Y1kogOtx0bJblKzsVWrqYaqfNOnHzpgWw4Awc= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 h1:VNqngBF40hVlDloBruUehVYC3ArSgIyScOAyMRqBxRg= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1/go.mod h1:RBRO7fro65R6tjKzYgLAFo0t1QEXY1Dp+i/bvpRiqiQ= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/iancoleman/strcase v0.3.0 h1:nTXanmYxhfFAMjZL34Ov6gkzEsSJZ5DbhxWjvSASxEI= @@ -50,10 +62,14 @@ github.com/jinzhu/copier v0.4.0 h1:w3ciUoD19shMCRargcpm0cm91ytaBhDvuRpz1ODO/U8= github.com/jinzhu/copier v0.4.0/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg= github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8= github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= -github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= -github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/mattn/go-isatty v0.0.20 
h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mfridman/interpolate v0.0.2 h1:pnuTK7MQIxxFz1Gr+rjSIx9u7qVjf5VOoM/u6BbAxPY= @@ -62,6 +78,8 @@ github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4 github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA= @@ -74,6 +92,14 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pressly/goose/v3 v3.23.0 h1:57hqKos8izGek4v6D5+OXBa+Y4Rq8MU//+MmnevdpVA= github.com/pressly/goose/v3 v3.23.0/go.mod h1:rpx+D9GX/+stXmzKa+uh1DkjPnNVMdiOCV9iLdle4N8= +github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y= +github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= +github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= +github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= +github.com/prometheus/common v0.61.0 h1:3gv/GThfX0cV2lpO7gkTUwZru38mxevy90Bj8YFSRQQ= +github.com/prometheus/common v0.61.0/go.mod 
h1:zr29OCN/2BsJRaFwG8QOBr41D6kkchKbpeNH7pAjb/s= +github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= +github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8= github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= @@ -87,37 +113,69 @@ github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.59.0 h1:rgMkmiGfix9vFJDcDi1PK8WEQP4FLQwLDfhp5ZLpFeE= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.59.0/go.mod 
h1:ijPqXp5P6IRRByFVVg9DY8P5HkxkHE5ARIa+86aXPf4= +go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= +go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.34.0 h1:ajl4QczuJVA2TU9W9AGw++86Xga/RKt//16z/yxPgdk= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.34.0/go.mod h1:Vn3/rlOJ3ntf/Q3zAI0V5lDnTbHGaUsNUeF6nZmm7pA= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0 h1:OeNbIYk/2C15ckl7glBlOBp5+WlYsOElzTNmiPW/x60= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0/go.mod h1:7Bept48yIeqxP2OZ9/AqIpYS94h2or0aB4FypJTc8ZM= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0 h1:tgJ0uaNS4c98WRNUEx5U3aDlrDOI5Rs+1Vifcw4DJ8U= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0/go.mod h1:U7HYyW0zt/a9x5J1Kjs+r1f/d4ZHnYFclhYY2+YbeoE= +go.opentelemetry.io/otel/exporters/prometheus v0.56.0 h1:GnCIi0QyG0yy2MrJLzVrIM7laaJstj//flf1zEJCG+E= +go.opentelemetry.io/otel/exporters/prometheus v0.56.0/go.mod h1:JQcVZtbIIPM+7SWBB+T6FK+xunlyidwLp++fN0sUaOk= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.34.0 h1:czJDQwFrMbOr9Kk+BPo1y8WZIIFIK58SA1kykuVeiOU= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.34.0/go.mod h1:lT7bmsxOe58Tq+JIOkTQMCGXdu47oA+VJKLZHbaBKbs= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.34.0 h1:jBpDk4HAUsrnVO1FsfCfCOTEc/MkInJmvfCHYLFiT80= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.34.0/go.mod h1:H9LUIM1daaeZaz91vZcfeM0fejXPmgCYE8ZhzqfJuiU= +go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= +go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE= +go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A= +go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU= 
+go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk= +go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w= +go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k= +go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= +go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= +go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= -golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= -golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= +golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc= +golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc= golang.org/x/mod v0.22.0 h1:D4nJWe9zXqHOmWqj4VMOJhvzj7bEZg4wEYa759z1pH4= golang.org/x/mod v0.22.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= -golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= -golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= +golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= +golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= -golang.org/x/sys 
v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.28.0 h1:WuB6qZ4RPCQo5aP3WdKZS7i595EdWqWR8vqJTlwTVK8= golang.org/x/tools v0.28.0/go.mod h1:dcIOrVd3mfQKTgrDVQHqCPMWy6lnhfhtX3hLXYVLfRw= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240513163218-0867130af1f8 h1:mxSlqyb8ZAHsYDCfiXN1EDdNTdvjUJSLY+OnAUtYNYA= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240513163218-0867130af1f8/go.mod h1:I7Y+G38R2bu5j1aLzfFmQfTcU/WnFuqDwLZAbvKTKpM= -google.golang.org/grpc v1.64.1 h1:LKtvyfbX3UGVPFcGqJ9ItpVWW6oN/2XqTxfAnwRRXiA= -google.golang.org/grpc v1.64.1/go.mod h1:hiQF4LFZelK2WKaP6W0L92zGHtiQdZxk8CrSdvyjeP0= -google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= -google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f h1:gap6+3Gk41EItBuyi4XX/bp4oqJ3UwuIMl25yGinuAA= +google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:Ic02D47M+zbarjYYUlK57y316f2MoN0gjAwI3f2S95o= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f h1:OxYkA3wjPsZyBylwymxSHa7ViiW1Sml4ToBrncvFehI= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:+2Yz8+CLJbIfL9z73EW45avw8Lmge3xVElCP9zEKi50= +google.golang.org/grpc v1.69.4 h1:MF5TftSMkd8GLw/m0KM6V8CMOCY6NZ1NQDPGFgbTt4A= +google.golang.org/grpc v1.69.4/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= +google.golang.org/protobuf 
v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU= +google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/diode-server/ingester/component.go b/diode-server/ingester/component.go index 093eaedc..92ece4c6 100644 --- a/diode-server/ingester/component.go +++ b/diode-server/ingester/component.go @@ -10,8 +10,9 @@ import ( "slices" "time" - "github.com/kelseyhightower/envconfig" "github.com/redis/go-redis/v9" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "google.golang.org/grpc" "google.golang.org/grpc/metadata" "google.golang.org/grpc/reflection" @@ -19,6 +20,7 @@ import ( "github.com/netboxlabs/diode/diode-server/gen/diode/v1/diodepb" "github.com/netboxlabs/diode/diode-server/sentry" + "github.com/netboxlabs/diode/diode-server/telemetry" ) const ( @@ -43,13 +45,11 @@ type Component struct { grpcListener net.Listener grpcServer *grpc.Server redisStreamClient *redis.Client + metrics *Metrics } // New creates a new ingester component -func New(ctx context.Context, logger *slog.Logger) (*Component, error) { - var cfg Config - envconfig.MustProcess("", &cfg) - +func New(ctx context.Context, logger *slog.Logger, cfg Config, meter metric.Meter) (*Component, error) { grpcListener, err := net.Listen("tcp", fmt.Sprintf(":%d", cfg.GRPCPort)) if err != nil { return nil, fmt.Errorf("failed to listen on port %d: %v", cfg.GRPCPort, err) @@ -74,6 +74,11 @@ func New(ctx context.Context, logger *slog.Logger) (*Component, error) { auth := newAuthUnaryInterceptor(apiKeys) grpcServer := grpc.NewServer(grpc.ChainUnaryInterceptor(auth)) + metrics, err := NewMetrics(meter) + if err != nil { + return 
nil, fmt.Errorf("failed to create ingester metrics: %v", err) + } + component := &Component{ ctx: ctx, config: cfg, @@ -82,6 +87,7 @@ func New(ctx context.Context, logger *slog.Logger) (*Component, error) { grpcListener: grpcListener, grpcServer: grpcServer, redisStreamClient: redisStreamClient, + metrics: metrics, } diodepb.RegisterIngesterServiceServer(grpcServer, component) @@ -123,7 +129,20 @@ func (c *Component) Stop() error { // Ingest handles the ingest request func (c *Component) Ingest(ctx context.Context, in *diodepb.IngestRequest) (*diodepb.IngestResponse, error) { + // Create attributes for metrics + attrs := []attribute.KeyValue{ + attribute.String(telemetry.AttributeSDKName, in.SdkName), + attribute.String(telemetry.AttributeSDKVersion, in.SdkVersion), + attribute.String(telemetry.AttributeHostname, c.hostname), + attribute.String(telemetry.AttributeProducerAppName, in.ProducerAppName), + attribute.String(telemetry.AttributeProducerAppVersion, in.ProducerAppVersion), + attribute.String(telemetry.AttributeStream, in.Stream), + } + ctx = telemetry.ContextWithMetricAttributes(ctx, attrs...) 
+ if err := validateRequest(in); err != nil { + c.metrics.RecordIngestRequest(ctx, false) + tags := map[string]string{ "hostname": c.hostname, "sdk_name": in.SdkName, @@ -145,6 +164,7 @@ func (c *Component) Ingest(ctx context.Context, in *diodepb.IngestRequest) (*dio encodedRequest, err := proto.Marshal(in) if err != nil { + c.metrics.RecordIngestRequest(ctx, false) c.logger.Error("failed to marshal request", "error", err, "request", in) } @@ -164,7 +184,11 @@ func (c *Component) Ingest(ctx context.Context, in *diodepb.IngestRequest) (*dio Stream: streamID, Values: msg, }).Err(); err != nil { + c.metrics.RecordIngestRequest(ctx, false) c.logger.Error("failed to add element to the stream", "error", err, "streamID", streamID, "value", msg) + } else { + c.metrics.RecordIngestRequest(ctx, true) + c.metrics.RecordIngestEntities(ctx, int64(len(in.GetEntities()))) } return &diodepb.IngestResponse{Errors: errs}, nil diff --git a/diode-server/ingester/component_test.go b/diode-server/ingester/component_test.go index b09a7443..66a65d24 100644 --- a/diode-server/ingester/component_test.go +++ b/diode-server/ingester/component_test.go @@ -10,7 +10,9 @@ import ( "time" "github.com/alicebob/miniredis/v2" + "github.com/kelseyhightower/envconfig" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/test/bufconn" @@ -94,8 +96,14 @@ func startTestComponent(ctx context.Context, t *testing.T) (*ingester.Component, listener := bufconn.Listen(bufSize) s := grpc.NewServer() + var cfg ingester.Config + err := envconfig.Process("", &cfg) + require.NoError(t, err) + logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false})) - component, err := ingester.New(ctx, logger) + + meter := otel.GetMeterProvider().Meter("test.ingester") + component, err := ingester.New(ctx, logger, cfg, meter) require.NoError(t, err) 
pb.RegisterIngesterServiceServer(s, component) @@ -134,7 +142,12 @@ func TestNewComponent(t *testing.T) { _ = os.Setenv("GRPC_PORT", grpcPort) logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false})) - component, err := ingester.New(ctx, logger) + var cfg ingester.Config + err := envconfig.Process("", &cfg) + require.NoError(t, err) + + meter := otel.GetMeterProvider().Meter("test.ingester") + component, err := ingester.New(ctx, logger, cfg, meter) require.NoError(t, err) require.NotNil(t, component) diff --git a/diode-server/ingester/config.go b/diode-server/ingester/config.go index bfe3ef42..50f1277e 100644 --- a/diode-server/ingester/config.go +++ b/diode-server/ingester/config.go @@ -1,5 +1,7 @@ package ingester +import "github.com/netboxlabs/diode/diode-server/telemetry" + // Config is the configuration for the ingester service type Config struct { GRPCPort int `envconfig:"GRPC_PORT" default:"8081"` @@ -8,4 +10,6 @@ type Config struct { RedisPassword string `envconfig:"REDIS_PASSWORD" required:"true"` RedisStreamDB int `envconfig:"REDIS_STREAM_DB" default:"1"` DiodeAPIKey string `envconfig:"DIODE_API_KEY" required:"true"` + + Telemetry telemetry.Config `envconfig:"TELEMETRY"` } diff --git a/diode-server/ingester/metrics.go b/diode-server/ingester/metrics.go new file mode 100644 index 00000000..6ff3213d --- /dev/null +++ b/diode-server/ingester/metrics.go @@ -0,0 +1,56 @@ +package ingester + +import ( + "context" + "fmt" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + + "github.com/netboxlabs/diode/diode-server/telemetry" +) + +const ( + metricIngestRequest = "netboxlabs/diode/ingester/ingest_request_count" + metricIngestEntity = "netboxlabs/diode/ingester/ingest_entity_count" +) + +// Metrics is a struct that contains the metrics for the ingester. 
+type Metrics struct { + ingestRequest metric.Int64Counter + ingestEntity metric.Int64Counter +} + +// NewMetrics creates a new Metrics instance. +func NewMetrics(meter metric.Meter) (*Metrics, error) { + ingestRequest, err := meter.Int64Counter(metricIngestRequest, + metric.WithDescription("Total number of ingest requests handled")) + if err != nil { + return nil, fmt.Errorf("failed to create total counter: %v", err) + } + + ingestEntity, err := meter.Int64Counter(metricIngestEntity, + metric.WithDescription("Total number of entities ingested")) + if err != nil { + return nil, fmt.Errorf("failed to create total counter: %v", err) + } + + return &Metrics{ + ingestRequest: ingestRequest, + ingestEntity: ingestEntity, + }, nil +} + +// RecordIngestRequest records an ingest request. +func (m *Metrics) RecordIngestRequest(ctx context.Context, success bool) { + attrs := []attribute.KeyValue{ + attribute.Bool(telemetry.AttributeSuccess, success), + } + m.ingestRequest.Add(ctx, 1, telemetry.GatherOptions(ctx, attrs)...) +} + +// RecordIngestEntities records the number of entities ingested. +func (m *Metrics) RecordIngestEntities(ctx context.Context, count int64) { + attrs := []attribute.KeyValue{} + m.ingestEntity.Add(ctx, count, telemetry.GatherOptions(ctx, attrs)...) +} diff --git a/diode-server/netboxdiodeplugin/mocks/netboxapi.go b/diode-server/netboxdiodeplugin/mocks/netboxapi.go index 46b82906..4d5f7c2b 100644 --- a/diode-server/netboxdiodeplugin/mocks/netboxapi.go +++ b/diode-server/netboxdiodeplugin/mocks/netboxapi.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.51.0. DO NOT EDIT. +// Code generated by mockery v2.52.2. DO NOT EDIT. 
package mocks diff --git a/diode-server/reconciler/config.go b/diode-server/reconciler/config.go index b1c50936..09ce32f3 100644 --- a/diode-server/reconciler/config.go +++ b/diode-server/reconciler/config.go @@ -1,5 +1,7 @@ package reconciler +import "github.com/netboxlabs/diode/diode-server/telemetry" + // Config is the configuration for the reconciler service type Config struct { GRPCPort int `envconfig:"GRPC_PORT" default:"8081"` @@ -21,4 +23,6 @@ type Config struct { // API keys DiodeToNetBoxAPIKey string `envconfig:"DIODE_TO_NETBOX_API_KEY" required:"true"` NetBoxToDiodeAPIKey string `envconfig:"NETBOX_TO_DIODE_API_KEY" required:"true"` + + Telemetry telemetry.Config `envconfig:"TELEMETRY"` } diff --git a/diode-server/reconciler/ingestion_processor.go b/diode-server/reconciler/ingestion_processor.go index 34101027..fad9e30d 100644 --- a/diode-server/reconciler/ingestion_processor.go +++ b/diode-server/reconciler/ingestion_processor.go @@ -11,6 +11,7 @@ import ( "github.com/google/uuid" "github.com/kelseyhightower/envconfig" "github.com/redis/go-redis/v9" + "go.opentelemetry.io/otel/attribute" "golang.org/x/time/rate" "google.golang.org/protobuf/proto" @@ -20,6 +21,7 @@ import ( "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin" "github.com/netboxlabs/diode/diode-server/reconciler/changeset" "github.com/netboxlabs/diode/diode-server/sentry" + "github.com/netboxlabs/diode/diode-server/telemetry" ) const ( @@ -56,6 +58,7 @@ type IngestionProcessor struct { redisClient RedisClient redisStreamClient RedisClient ops IngestionProcessorOps + metrics IngestionProcessorMetrics } // IngestionLogToProcess represents an ingestion log to process @@ -73,8 +76,16 @@ type IngestionProcessorOps interface { ApplyChangeSet(ctx context.Context, ingestionLogID int32, ingestionLog *reconcilerpb.IngestionLog, changeSetID int32, changeSet *changeset.ChangeSet) error } +// IngestionProcessorMetrics represents the metrics collected by the ingestion processor +type
IngestionProcessorMetrics interface { + RecordHandleMessage(ctx context.Context, success bool) + RecordIngestionLogCreate(ctx context.Context, success bool) + RecordChangeSetCreate(ctx context.Context, success bool, changes int64) + RecordChangeSetApply(ctx context.Context, success bool, changes int64) +} + // NewIngestionProcessor creates a new ingestion processor -func NewIngestionProcessor(ctx context.Context, logger *slog.Logger, ops IngestionProcessorOps) (*IngestionProcessor, error) { +func NewIngestionProcessor(ctx context.Context, logger *slog.Logger, ops IngestionProcessorOps, metrics IngestionProcessorMetrics) (*IngestionProcessor, error) { var cfg Config envconfig.MustProcess("", &cfg) @@ -110,6 +121,7 @@ func NewIngestionProcessor(ctx context.Context, logger *slog.Logger, ops Ingesti redisClient: redisClient, redisStreamClient: redisStreamClient, ops: ops, + metrics: metrics, } return component, nil @@ -169,13 +181,27 @@ func (p *IngestionProcessor) consumeIngestionStream(ctx context.Context, stream, } func (p *IngestionProcessor) handleStreamMessage(ctx context.Context, msg redis.XMessage) error { - p.logger.Debug("received stream message", "message", msg.Values, "id", msg.ID) + // Create attributes for metrics + attrs := []attribute.KeyValue{ + attribute.String(telemetry.AttributeHostname, p.hostname), + } + ctx = telemetry.ContextWithMetricAttributes(ctx, attrs...) 
ingestReq := &diodepb.IngestRequest{} if err := proto.Unmarshal([]byte(msg.Values["request"].(string)), ingestReq); err != nil { + p.metrics.RecordHandleMessage(ctx, false) return err } + // Add request-specific attributes + attrs = append(attrs, + attribute.String(telemetry.AttributeSDKName, ingestReq.SdkName), + attribute.String(telemetry.AttributeSDKVersion, ingestReq.SdkVersion), + attribute.String(telemetry.AttributeProducerAppName, ingestReq.ProducerAppName), + attribute.String(telemetry.AttributeProducerAppVersion, ingestReq.ProducerAppVersion), + ) + ctx = telemetry.ContextWithMetricAttributes(ctx, attrs...) + errs := make([]error, 0) ingestionTs, err := strconv.Atoi(msg.Values["ingestion_ts"].(string)) @@ -223,8 +249,10 @@ func (p *IngestionProcessor) handleStreamMessage(ctx context.Context, msg redis. "hostname": p.hostname, } sentry.CaptureError(fmt.Errorf("failed to handle ingest request: %v", errs), nil, "Ingestion request", contextMap) + p.metrics.RecordHandleMessage(ctx, false) } else { p.redisStreamClient.XDel(ctx, redisStreamID, msg.ID) + p.metrics.RecordHandleMessage(ctx, true) } return nil @@ -261,6 +289,9 @@ func (p *IngestionProcessor) GenerateChangeSet(ctx context.Context, generateChan id, changeSet, err := p.ops.GenerateChangeSet(ctx, msg.ingestionLogID, msg.ingestionLog, "") if err != nil { p.logger.Error("error generating changeset", "error", err) + p.metrics.RecordChangeSetCreate(ctx, false, 0) + } else { + p.metrics.RecordChangeSetCreate(ctx, true, int64(len(changeSet.ChangeSet))) } if changeSet != nil && len(changeSet.ChangeSet) > 0 { @@ -305,6 +336,9 @@ func (p *IngestionProcessor) ApplyChangeSet(ctx context.Context, applyChan <-cha if err := p.ops.ApplyChangeSet(ctx, msg.ingestionLogID, msg.ingestionLog, msg.changeSetID, msg.changeSet); err != nil { p.logger.Error("error applying changeset", "error", err) + p.metrics.RecordChangeSetApply(ctx, false, 0) + } else { + p.metrics.RecordChangeSetApply(ctx, true, 
int64(len(msg.changeSet.ChangeSet))) } } } @@ -347,8 +381,11 @@ func (p *IngestionProcessor) CreateIngestionLogs(ctx context.Context, ingestReq id, err := p.ops.CreateIngestionLog(ctx, ingestionLog, nil) if err != nil { errs = append(errs, fmt.Errorf("failed to create ingestion log: %v", err)) + p.metrics.RecordIngestionLogCreate(ctx, false) continue } + + p.metrics.RecordIngestionLogCreate(ctx, true) p.logger.Debug("created ingestion log", "id", id, "externalID", ingestionLog.GetId()) generateIngestionLogChan <- IngestionLogToProcess{ diff --git a/diode-server/reconciler/ingestion_processor_internal_test.go b/diode-server/reconciler/ingestion_processor_internal_test.go index b381fc50..1b161495 100644 --- a/diode-server/reconciler/ingestion_processor_internal_test.go +++ b/diode-server/reconciler/ingestion_processor_internal_test.go @@ -152,6 +152,7 @@ func TestHandleStreamMessage(t *testing.T) { mockRedisStreamClient := new(mr.RedisClient) mockNbClient := new(mnp.NetBoxAPI) mockRepository := new(mr.Repository) + mockMetrics := new(mr.IngestionProcessorMetrics) logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false})) p := &IngestionProcessor{ @@ -163,7 +164,8 @@ func TestHandleStreamMessage(t *testing.T) { ReconcilerRateLimiterRPS: 20, ReconcilerRateLimiterBurst: 1, }, - ops: NewOps(mockRepository, mockNbClient, logger), + ops: NewOps(mockRepository, mockNbClient, logger), + metrics: mockMetrics, } request := redis.XMessage{} @@ -191,9 +193,9 @@ func TestHandleStreamMessage(t *testing.T) { } } if tt.reconcilerError { - mockNbClient.On("RetrieveObjectState", ctx, mock.Anything).Return(&netboxdiodeplugin.ObjectState{}, errors.New("prepare error")) + mockNbClient.On("RetrieveObjectState", mock.Anything, mock.Anything).Return(&netboxdiodeplugin.ObjectState{}, errors.New("prepare error")) } else { - mockNbClient.On("RetrieveObjectState", ctx, mock.Anything).Return(&netboxdiodeplugin.ObjectState{ + 
mockNbClient.On("RetrieveObjectState", mock.Anything, mock.Anything).Return(&netboxdiodeplugin.ObjectState{ ObjectType: "dcim.site", ObjectID: 0, ObjectChangeID: 0, @@ -202,12 +204,16 @@ func TestHandleStreamMessage(t *testing.T) { }, }, nil) } - mockNbClient.On("ApplyChangeSet", ctx, mock.Anything).Return(tt.changeSetResponse, tt.changeSetError) + mockNbClient.On("ApplyChangeSet", mock.Anything, mock.Anything).Return(tt.changeSetResponse, tt.changeSetError) if tt.entities[0].Entity != nil { - mockRepository.On("CreateIngestionLog", ctx, mock.Anything, mock.Anything).Return(int32Ptr(1), nil) + mockRepository.On("CreateIngestionLog", mock.Anything, mock.Anything, mock.Anything).Return(int32Ptr(1), nil) } - mockRedisStreamClient.On("XAck", ctx, mock.Anything, mock.Anything, mock.Anything).Return(redis.NewIntCmd(ctx)) - mockRedisStreamClient.On("XDel", ctx, mock.Anything, mock.Anything).Return(redis.NewIntCmd(ctx)) + mockRedisStreamClient.On("XAck", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(redis.NewIntCmd(ctx)) + mockRedisStreamClient.On("XDel", mock.Anything, mock.Anything, mock.Anything).Return(redis.NewIntCmd(ctx)) + mockMetrics.On("RecordHandleMessage", mock.Anything, mock.Anything).Return() + mockMetrics.On("RecordIngestionLogCreate", mock.Anything, mock.Anything).Return() + mockMetrics.On("RecordChangeSetCreate", mock.Anything, mock.Anything, mock.Anything).Return() + mockMetrics.On("RecordChangeSetApply", mock.Anything, mock.Anything, mock.Anything).Return() err := p.handleStreamMessage(ctx, request) if tt.expectedError { @@ -271,6 +277,11 @@ func TestConsumeIngestionStream(t *testing.T) { mockRedisClient.On("XReadGroup", mock.Anything, mock.Anything).Return(cmdSlice) } mockRedisClient.On("XGroupCreateMkStream", ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(status) + mockMetrics := new(mr.IngestionProcessorMetrics) + mockMetrics.On("RecordHandleMessage", mock.Anything, mock.Anything).Return() + 
mockMetrics.On("RecordIngestionLogCreate", mock.Anything, mock.Anything).Return() + mockMetrics.On("RecordChangeSetCreate", mock.Anything, mock.Anything, mock.Anything).Return() + mockMetrics.On("RecordChangeSetApply", mock.Anything, mock.Anything, mock.Anything).Return() p := &IngestionProcessor{ redisStreamClient: mockRedisClient, @@ -280,6 +291,7 @@ func TestConsumeIngestionStream(t *testing.T) { ReconcilerRateLimiterRPS: 20, ReconcilerRateLimiterBurst: 1, }, + metrics: mockMetrics, } err := p.consumeIngestionStream(ctx, "test-stream", "test-group", "test-consumer") @@ -480,6 +492,7 @@ func TestIngestionProcessor_GenerateAndApplyChangeSet(t *testing.T) { mockRedisClient := new(mr.RedisClient) mockNbClient := new(mnp.NetBoxAPI) mockRepository := new(mr.Repository) + mockMetrics := new(mr.IngestionProcessorMetrics) logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false})) p := &IngestionProcessor{ @@ -490,7 +503,8 @@ func TestIngestionProcessor_GenerateAndApplyChangeSet(t *testing.T) { ReconcilerRateLimiterRPS: 20, ReconcilerRateLimiterBurst: 1, }, - ops: NewOps(mockRepository, mockNbClient, logger), + ops: NewOps(mockRepository, mockNbClient, logger), + metrics: mockMetrics, } ingestionLogID := int32(1) @@ -502,6 +516,10 @@ func TestIngestionProcessor_GenerateAndApplyChangeSet(t *testing.T) { } mockRepository.On("UpdateIngestionLogStateWithError", ctx, ingestionLogID, tt.expectedStatus, mock.Anything).Return(nil) mockRepository.On("CreateChangeSet", ctx, mock.Anything, ingestionLogID).Return(int32Ptr(1), nil) + mockMetrics.On("RecordHandleMessage", mock.Anything, mock.Anything).Return() + mockMetrics.On("RecordIngestionLogCreate", mock.Anything, mock.Anything).Return() + mockMetrics.On("RecordChangeSetCreate", mock.Anything, mock.Anything, mock.Anything).Return() + mockMetrics.On("RecordChangeSetApply", mock.Anything, mock.Anything, mock.Anything).Return() bufCapacity := 1 diff --git 
a/diode-server/reconciler/ingestion_processor_metrics.go b/diode-server/reconciler/ingestion_processor_metrics.go new file mode 100644 index 00000000..a7207223 --- /dev/null +++ b/diode-server/reconciler/ingestion_processor_metrics.go @@ -0,0 +1,119 @@ +package reconciler + +import ( + "context" + "fmt" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + + "github.com/netboxlabs/diode/diode-server/telemetry" +) + +const ( + // Metric names + metricHandleMessage = "handle_message_count" + metricIngestionLogCreate = "ingestion_log_create_count" + metricChangeSetCreate = "change_set_create_count" + metricChangeSetApply = "change_set_apply_count" + metricChangeCreate = "change_create_count" + metricChangeApply = "change_apply_count" +) + +// OtelIngestionProcessorMetrics is a struct that contains the metrics for the ingestion processor. +type OtelIngestionProcessorMetrics struct { + // Metrics + handleMessage metric.Int64Counter + ingestionLogCreate metric.Int64Counter + changeSetCreate metric.Int64Counter + changeCreate metric.Int64Counter + changeSetApply metric.Int64Counter + changeApply metric.Int64Counter +} + +// NewOtelIngestionProcessorMetrics creates a new OtelIngestionProcessorMetrics instance. +// If a prefix is provided, it will be prepended to the metric names, separated by a /. 
+func NewOtelIngestionProcessorMetrics(meter metric.Meter, prefix string) (*OtelIngestionProcessorMetrics, error) { + if prefix != "" { + prefix += "/" + } + + handleMessage, err := meter.Int64Counter(prefix+metricHandleMessage, metric.WithDescription("Number of messages handled")) + if err != nil { + return nil, fmt.Errorf("failed to create handle message counter: %v", err) + } + + ingestionLogCreate, err := meter.Int64Counter(prefix+metricIngestionLogCreate, metric.WithDescription("Number of ingestion logs created")) + if err != nil { + return nil, fmt.Errorf("failed to create ingestion log create counter: %v", err) + } + + changeSetCreate, err := meter.Int64Counter(prefix+metricChangeSetCreate, metric.WithDescription("Number of change sets created")) + if err != nil { + return nil, fmt.Errorf("failed to create change set create counter: %v", err) + } + + changeSetApply, err := meter.Int64Counter(prefix+metricChangeSetApply, metric.WithDescription("Number of change sets applied")) + if err != nil { + return nil, fmt.Errorf("failed to create change set apply counter: %v", err) + } + + changeCreate, err := meter.Int64Counter(prefix+metricChangeCreate, metric.WithDescription("Number of changes created")) + if err != nil { + return nil, fmt.Errorf("failed to create change create counter: %v", err) + } + + changeApply, err := meter.Int64Counter(prefix+metricChangeApply, metric.WithDescription("Number of changes applied")) + if err != nil { + return nil, fmt.Errorf("failed to create change apply counter: %v", err) + } + + return &OtelIngestionProcessorMetrics{ + handleMessage: handleMessage, + ingestionLogCreate: ingestionLogCreate, + changeSetCreate: changeSetCreate, + changeCreate: changeCreate, + changeSetApply: changeSetApply, + changeApply: changeApply, + }, nil +} + +// RecordHandleMessage records a message being handled. 
+func (m *OtelIngestionProcessorMetrics) RecordHandleMessage(ctx context.Context, success bool) { + attrs := []attribute.KeyValue{ + attribute.Bool(telemetry.AttributeSuccess, success), + } + m.handleMessage.Add(ctx, 1, telemetry.GatherOptions(ctx, attrs)...) +} + +// RecordIngestionLogCreate records an ingestion log being created. +func (m *OtelIngestionProcessorMetrics) RecordIngestionLogCreate(ctx context.Context, success bool) { + attrs := []attribute.KeyValue{ + attribute.Bool(telemetry.AttributeSuccess, success), + } + m.ingestionLogCreate.Add(ctx, 1, telemetry.GatherOptions(ctx, attrs)...) +} + +// RecordChangeSetCreate records a change set being created. +func (m *OtelIngestionProcessorMetrics) RecordChangeSetCreate(ctx context.Context, success bool, changes int64) { + attrs := []attribute.KeyValue{ + attribute.Bool(telemetry.AttributeSuccess, success), + } + options := telemetry.GatherOptions(ctx, attrs) + m.changeSetCreate.Add(ctx, 1, options...) + if success { + m.changeCreate.Add(ctx, changes, options...) + } +} + +// RecordChangeSetApply records a change set being applied. +func (m *OtelIngestionProcessorMetrics) RecordChangeSetApply(ctx context.Context, success bool, changes int64) { + attrs := []attribute.KeyValue{ + attribute.Bool(telemetry.AttributeSuccess, success), + } + options := telemetry.GatherOptions(ctx, attrs) + m.changeSetApply.Add(ctx, 1, options...) + if success { + m.changeApply.Add(ctx, changes, options...) 
+ } +} diff --git a/diode-server/reconciler/ingestion_processor_test.go b/diode-server/reconciler/ingestion_processor_test.go index 86c3a69b..25657a77 100644 --- a/diode-server/reconciler/ingestion_processor_test.go +++ b/diode-server/reconciler/ingestion_processor_test.go @@ -38,7 +38,8 @@ func TestNewIngestionProcessor(t *testing.T) { logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false})) nbClient, err := netboxdiodeplugin.NewClient(logger, cfg.DiodeToNetBoxAPIKey) require.NoError(t, err) - processor, err := reconciler.NewIngestionProcessor(ctx, logger, reconciler.NewOps(mockRepository, nbClient, logger)) + metrics := mocks.NewIngestionProcessorMetrics(t) + processor, err := reconciler.NewIngestionProcessor(ctx, logger, reconciler.NewOps(mockRepository, nbClient, logger), metrics) require.NoError(t, err) require.NotNil(t, processor) @@ -63,7 +64,13 @@ func TestIngestionProcessorStart(t *testing.T) { nbClient, err := netboxdiodeplugin.NewClient(logger, cfg.DiodeToNetBoxAPIKey) require.NoError(t, err) - processor, err := reconciler.NewIngestionProcessor(ctx, logger, reconciler.NewOps(mockRepository, nbClient, logger)) + mockMetrics := new(mocks.IngestionProcessorMetrics) + mockMetrics.On("RecordHandleMessage", mock.Anything, mock.Anything).Return() + mockMetrics.On("RecordIngestionLogCreate", mock.Anything, mock.Anything).Return() + mockMetrics.On("RecordChangeSetCreate", mock.Anything, mock.Anything, mock.Anything).Return() + mockMetrics.On("RecordChangeSetApply", mock.Anything, mock.Anything, mock.Anything).Return() + + processor, err := reconciler.NewIngestionProcessor(ctx, logger, reconciler.NewOps(mockRepository, nbClient, logger), mockMetrics) require.NoError(t, err) require.NotNil(t, processor) @@ -233,9 +240,9 @@ func TestIngestionProcessorStart(t *testing.T) { // Wait server time.Sleep(50 * time.Millisecond) - mockRepository.On("CreateIngestionLog", ctx, mock.Anything, 
mock.Anything).Return(int32Ptr(1), nil) - mockRepository.On("UpdateIngestionLogStateWithError", ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil) - mockRepository.On("CreateChangeSet", ctx, mock.Anything, mock.Anything).Return(int32Ptr(1), nil) + mockRepository.On("CreateIngestionLog", mock.Anything, mock.Anything, mock.Anything).Return(int32Ptr(1), nil) + mockRepository.On("UpdateIngestionLogStateWithError", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + mockRepository.On("CreateChangeSet", mock.Anything, mock.Anything, mock.Anything).Return(int32Ptr(1), nil) redisClient := redis.NewClient(&redis.Options{ Addr: s.Addr(), DB: 1, diff --git a/diode-server/reconciler/mocks/ingestionprocessormetrics.go b/diode-server/reconciler/mocks/ingestionprocessormetrics.go new file mode 100644 index 00000000..43ea6ad5 --- /dev/null +++ b/diode-server/reconciler/mocks/ingestionprocessormetrics.go @@ -0,0 +1,174 @@ +// Code generated by mockery v2.52.2. DO NOT EDIT. 
+ +package mocks + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" +) + +// IngestionProcessorMetrics is an autogenerated mock type for the IngestionProcessorMetrics type +type IngestionProcessorMetrics struct { + mock.Mock +} + +type IngestionProcessorMetrics_Expecter struct { + mock *mock.Mock +} + +func (_m *IngestionProcessorMetrics) EXPECT() *IngestionProcessorMetrics_Expecter { + return &IngestionProcessorMetrics_Expecter{mock: &_m.Mock} +} + +// RecordChangeSetApply provides a mock function with given fields: ctx, success, changes +func (_m *IngestionProcessorMetrics) RecordChangeSetApply(ctx context.Context, success bool, changes int64) { + _m.Called(ctx, success, changes) +} + +// IngestionProcessorMetrics_RecordChangeSetApply_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RecordChangeSetApply' +type IngestionProcessorMetrics_RecordChangeSetApply_Call struct { + *mock.Call +} + +// RecordChangeSetApply is a helper method to define mock.On call +// - ctx context.Context +// - success bool +// - changes int64 +func (_e *IngestionProcessorMetrics_Expecter) RecordChangeSetApply(ctx interface{}, success interface{}, changes interface{}) *IngestionProcessorMetrics_RecordChangeSetApply_Call { + return &IngestionProcessorMetrics_RecordChangeSetApply_Call{Call: _e.mock.On("RecordChangeSetApply", ctx, success, changes)} +} + +func (_c *IngestionProcessorMetrics_RecordChangeSetApply_Call) Run(run func(ctx context.Context, success bool, changes int64)) *IngestionProcessorMetrics_RecordChangeSetApply_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(bool), args[2].(int64)) + }) + return _c +} + +func (_c *IngestionProcessorMetrics_RecordChangeSetApply_Call) Return() *IngestionProcessorMetrics_RecordChangeSetApply_Call { + _c.Call.Return() + return _c +} + +func (_c *IngestionProcessorMetrics_RecordChangeSetApply_Call) RunAndReturn(run func(context.Context, 
bool, int64)) *IngestionProcessorMetrics_RecordChangeSetApply_Call { + _c.Run(run) + return _c +} + +// RecordChangeSetCreate provides a mock function with given fields: ctx, success, changes +func (_m *IngestionProcessorMetrics) RecordChangeSetCreate(ctx context.Context, success bool, changes int64) { + _m.Called(ctx, success, changes) +} + +// IngestionProcessorMetrics_RecordChangeSetCreate_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RecordChangeSetCreate' +type IngestionProcessorMetrics_RecordChangeSetCreate_Call struct { + *mock.Call +} + +// RecordChangeSetCreate is a helper method to define mock.On call +// - ctx context.Context +// - success bool +// - changes int64 +func (_e *IngestionProcessorMetrics_Expecter) RecordChangeSetCreate(ctx interface{}, success interface{}, changes interface{}) *IngestionProcessorMetrics_RecordChangeSetCreate_Call { + return &IngestionProcessorMetrics_RecordChangeSetCreate_Call{Call: _e.mock.On("RecordChangeSetCreate", ctx, success, changes)} +} + +func (_c *IngestionProcessorMetrics_RecordChangeSetCreate_Call) Run(run func(ctx context.Context, success bool, changes int64)) *IngestionProcessorMetrics_RecordChangeSetCreate_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(bool), args[2].(int64)) + }) + return _c +} + +func (_c *IngestionProcessorMetrics_RecordChangeSetCreate_Call) Return() *IngestionProcessorMetrics_RecordChangeSetCreate_Call { + _c.Call.Return() + return _c +} + +func (_c *IngestionProcessorMetrics_RecordChangeSetCreate_Call) RunAndReturn(run func(context.Context, bool, int64)) *IngestionProcessorMetrics_RecordChangeSetCreate_Call { + _c.Run(run) + return _c +} + +// RecordHandleMessage provides a mock function with given fields: ctx, success +func (_m *IngestionProcessorMetrics) RecordHandleMessage(ctx context.Context, success bool) { + _m.Called(ctx, success) +} + +// IngestionProcessorMetrics_RecordHandleMessage_Call is 
a *mock.Call that shadows Run/Return methods with type explicit version for method 'RecordHandleMessage' +type IngestionProcessorMetrics_RecordHandleMessage_Call struct { + *mock.Call +} + +// RecordHandleMessage is a helper method to define mock.On call +// - ctx context.Context +// - success bool +func (_e *IngestionProcessorMetrics_Expecter) RecordHandleMessage(ctx interface{}, success interface{}) *IngestionProcessorMetrics_RecordHandleMessage_Call { + return &IngestionProcessorMetrics_RecordHandleMessage_Call{Call: _e.mock.On("RecordHandleMessage", ctx, success)} +} + +func (_c *IngestionProcessorMetrics_RecordHandleMessage_Call) Run(run func(ctx context.Context, success bool)) *IngestionProcessorMetrics_RecordHandleMessage_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(bool)) + }) + return _c +} + +func (_c *IngestionProcessorMetrics_RecordHandleMessage_Call) Return() *IngestionProcessorMetrics_RecordHandleMessage_Call { + _c.Call.Return() + return _c +} + +func (_c *IngestionProcessorMetrics_RecordHandleMessage_Call) RunAndReturn(run func(context.Context, bool)) *IngestionProcessorMetrics_RecordHandleMessage_Call { + _c.Run(run) + return _c +} + +// RecordIngestionLogCreate provides a mock function with given fields: ctx, success +func (_m *IngestionProcessorMetrics) RecordIngestionLogCreate(ctx context.Context, success bool) { + _m.Called(ctx, success) +} + +// IngestionProcessorMetrics_RecordIngestionLogCreate_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RecordIngestionLogCreate' +type IngestionProcessorMetrics_RecordIngestionLogCreate_Call struct { + *mock.Call +} + +// RecordIngestionLogCreate is a helper method to define mock.On call +// - ctx context.Context +// - success bool +func (_e *IngestionProcessorMetrics_Expecter) RecordIngestionLogCreate(ctx interface{}, success interface{}) *IngestionProcessorMetrics_RecordIngestionLogCreate_Call { + return 
&IngestionProcessorMetrics_RecordIngestionLogCreate_Call{Call: _e.mock.On("RecordIngestionLogCreate", ctx, success)} +} + +func (_c *IngestionProcessorMetrics_RecordIngestionLogCreate_Call) Run(run func(ctx context.Context, success bool)) *IngestionProcessorMetrics_RecordIngestionLogCreate_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(bool)) + }) + return _c +} + +func (_c *IngestionProcessorMetrics_RecordIngestionLogCreate_Call) Return() *IngestionProcessorMetrics_RecordIngestionLogCreate_Call { + _c.Call.Return() + return _c +} + +func (_c *IngestionProcessorMetrics_RecordIngestionLogCreate_Call) RunAndReturn(run func(context.Context, bool)) *IngestionProcessorMetrics_RecordIngestionLogCreate_Call { + _c.Run(run) + return _c +} + +// NewIngestionProcessorMetrics creates a new instance of IngestionProcessorMetrics. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewIngestionProcessorMetrics(t interface { + mock.TestingT + Cleanup(func()) +}) *IngestionProcessorMetrics { + mock := &IngestionProcessorMetrics{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/diode-server/reconciler/mocks/ingestionprocessorops.go b/diode-server/reconciler/mocks/ingestionprocessorops.go index 77c57e97..fe6d2c61 100644 --- a/diode-server/reconciler/mocks/ingestionprocessorops.go +++ b/diode-server/reconciler/mocks/ingestionprocessorops.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.51.0. DO NOT EDIT. +// Code generated by mockery v2.52.2. DO NOT EDIT. package mocks diff --git a/diode-server/reconciler/mocks/redisclient.go b/diode-server/reconciler/mocks/redisclient.go index 2816ddb9..773331e1 100644 --- a/diode-server/reconciler/mocks/redisclient.go +++ b/diode-server/reconciler/mocks/redisclient.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.51.0. 
// Config holds OpenTelemetry configuration settings.
//
// Each field is populated from the environment variable named in its
// envconfig tag. Note the environment variables may be prefixed with
// TELEMETRY_ based on the application's configuration structure.
type Config struct {
	// ServiceName is the name of the service being instrumented.
	ServiceName string `envconfig:"SERVICE_NAME"`

	// Environment represents the deployment environment (e.g., prod, staging, dev).
	// Defaults to "dev".
	Environment string `envconfig:"ENVIRONMENT" default:"dev"`

	// MetricsExporter represents the type of metrics exporter to use. The
	// values handled by Setup are "oltp", "console", "prometheus" and "none"
	// (the default).
	MetricsExporter string `envconfig:"METRICS_EXPORTER" default:"none"`

	// MetricsPort is the port to serve the metrics on if MetricsExporter is
	// "prometheus". Defaults to 9090.
	MetricsPort int `envconfig:"METRICS_PORT" default:"9090"`

	// TracesExporter represents the type of traces exporter to use. The values
	// handled by Setup are "oltp", "console" and "none" (the default).
	TracesExporter string `envconfig:"TRACES_EXPORTER" default:"none"`

	// Additional environment variables used internally by otel
	// can be found in the otel exporters documentation, e.g.
	// https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/
	// https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc
	// https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc
}

// Attribute names shared by the telemetry instrumentation.
const (
	// AttributeHostname is the attribute name for the hostname of the machine that the request is coming from.
	AttributeHostname = "hostname"
	// AttributeSDKName is the attribute name for the name of the SDK that is being used.
	AttributeSDKName = "sdk_name"
	// AttributeSDKVersion is the attribute name for the version of the SDK that is being used.
	AttributeSDKVersion = "sdk_version"
	// AttributeProducerAppName is the attribute name for the name of the producer application.
	AttributeProducerAppName = "producer_app_name"
	// AttributeProducerAppVersion is the attribute name for the version of the producer application.
	AttributeProducerAppVersion = "producer_app_version"
	// AttributeSuccess is the attribute name for a boolean that indicates if the request was successful.
	AttributeSuccess = "success"
	// AttributeStream is the attribute name for the stream that the request is coming from.
	AttributeStream = "stream"
	// AttributeState is the attribute name for the state of the request.
	AttributeState = "state"
)
a/diode-server/telemetry/prometheus.go b/diode-server/telemetry/prometheus.go new file mode 100644 index 00000000..b239b6ac --- /dev/null +++ b/diode-server/telemetry/prometheus.go @@ -0,0 +1,24 @@ +package telemetry + +import ( + "fmt" + "log/slog" + "net/http" + + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +// ServePrometheusMetricsIfNecessary serves the prometheus metrics if the metrics exporter is set to prometheus +func ServePrometheusMetricsIfNecessary(cfg Config, log *slog.Logger) { + if cfg.MetricsExporter == "prometheus" { + go ServePrometheusMetrics(cfg.MetricsPort, log) + } +} + +// ServePrometheusMetrics serves the prometheus metrics on the given port +func ServePrometheusMetrics(port int, log *slog.Logger) { + http.Handle("/metrics", promhttp.Handler()) + if err := http.ListenAndServe(fmt.Sprintf(":%d", port), nil); err != nil { + log.Error("failed to serve prometheus metrics", "error", err) + } +} diff --git a/diode-server/telemetry/setup.go b/diode-server/telemetry/setup.go new file mode 100644 index 00000000..cc63f5ef --- /dev/null +++ b/diode-server/telemetry/setup.go @@ -0,0 +1,121 @@ +package telemetry + +import ( + "context" + "fmt" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/exporters/prometheus" + "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" + "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" + "go.opentelemetry.io/otel/propagation" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.21.0" +) + +// Setup initializes OpenTelemetry with the provided configuration +// Returns a shutdown function to clean up resources that should be called +// when the application is shutting down. 
+// The OLTP exporters can be further configured using the environment variables +// specified in the OLTP documentation. +func Setup(ctx context.Context, cfg Config) (shutdown func(context.Context) error, err error) { + res, err := resource.New(ctx, + resource.WithAttributes( + semconv.ServiceName(cfg.ServiceName), + semconv.DeploymentEnvironment(cfg.Environment), + ), + ) + if err != nil { + return nil, fmt.Errorf("failed to create resource: %w", err) + } + + enableTraces := true + var traceExporter sdktrace.SpanExporter + switch cfg.TracesExporter { + case "console": + traceExporter, err = stdouttrace.New(stdouttrace.WithPrettyPrint()) + if err != nil { + return nil, fmt.Errorf("failed to create console trace exporter: %w", err) + } + case "oltp": + traceExporter, err = otlptracegrpc.New(ctx) + if err != nil { + return nil, fmt.Errorf("failed to create trace exporter: %w", err) + } + case "none": + enableTraces = false + default: + return nil, fmt.Errorf("unsupported traces exporter type: %s", cfg.TracesExporter) + } + + enableMetrics := true + var metricReader sdkmetric.Reader + switch cfg.MetricsExporter { + case "console": + metricExporter, err := stdoutmetric.New() + if err != nil { + return nil, fmt.Errorf("failed to create console metric exporter: %w", err) + } + metricReader = sdkmetric.NewPeriodicReader(metricExporter) + case "oltp": + metricExporter, err := otlpmetricgrpc.New(ctx) + if err != nil { + return nil, fmt.Errorf("failed to create metric exporter: %w", err) + } + metricReader = sdkmetric.NewPeriodicReader(metricExporter) + case "prometheus": + metricReader, err = prometheus.New() + if err != nil { + return nil, fmt.Errorf("failed to create prometheus metric exporter: %w", err) + } + case "none": + enableMetrics = false + default: + return nil, fmt.Errorf("unsupported metrics exporter type: %s", cfg.MetricsExporter) + } + + var tracerProvider *sdktrace.TracerProvider + var meterProvider *sdkmetric.MeterProvider + + if enableTraces { + 
tracerProvider = sdktrace.NewTracerProvider( + sdktrace.WithResource(res), + sdktrace.WithBatcher(traceExporter), + ) + otel.SetTracerProvider(tracerProvider) + } + + if enableMetrics { + meterProvider = sdkmetric.NewMeterProvider( + sdkmetric.WithResource(res), + sdkmetric.WithReader(metricReader), + ) + otel.SetMeterProvider(meterProvider) + } + + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + )) + + shutdown = func(ctx context.Context) error { + var shutdownErr error + if meterProvider != nil { + if err := meterProvider.Shutdown(ctx); err != nil { + shutdownErr = fmt.Errorf("failed to shutdown meter provider: %w", err) + } + } + if tracerProvider != nil { + if err := tracerProvider.Shutdown(ctx); err != nil { + shutdownErr = fmt.Errorf("failed to shutdown tracer provider: %w", err) + } + } + return shutdownErr + } + + return shutdown, nil +} diff --git a/diode-server/telemetry/util.go b/diode-server/telemetry/util.go new file mode 100644 index 00000000..2b32c13e --- /dev/null +++ b/diode-server/telemetry/util.go @@ -0,0 +1,30 @@ +package telemetry + +import ( + "context" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +// ContextWithMetricAttributes adds metric attributes to the context. +func ContextWithMetricAttributes(ctx context.Context, attrs ...attribute.KeyValue) context.Context { + existing := MetricAttributesFromContext(ctx) + return context.WithValue(ctx, metricAttributesKey{}, append(existing, attrs...)) +} + +// MetricAttributesFromContext returns the metric attributes from the context. 
+func MetricAttributesFromContext(ctx context.Context) []attribute.KeyValue { + if attrs, ok := ctx.Value(metricAttributesKey{}).([]attribute.KeyValue); ok { + return attrs + } + return []attribute.KeyValue{} +} + +type metricAttributesKey struct{} + +// GatherOptions collects metric atributes from the context and appends them to the options and attributes given +func GatherOptions(ctx context.Context, attrs []attribute.KeyValue, options ...metric.AddOption) []metric.AddOption { + return append(options, + metric.WithAttributes(append(attrs, MetricAttributesFromContext(ctx)...)...)) +}