From a5ac3e7ce782ccd60464abfccab01a4d89422234 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Sw=C3=A4rd?= Date: Fri, 25 Aug 2023 20:01:10 +0200 Subject: [PATCH] Add tracing to policies service. (#7116) --- services/policies/pkg/command/server.go | 16 ++- services/policies/pkg/config/config.go | 1 + .../pkg/config/defaults/defaultconfig.go | 12 +++ services/policies/pkg/config/tracing.go | 21 ++++ .../policies/pkg/service/event/service.go | 101 ++++++++++-------- 5 files changed, 107 insertions(+), 44 deletions(-) create mode 100644 services/policies/pkg/config/tracing.go diff --git a/services/policies/pkg/command/server.go b/services/policies/pkg/command/server.go index 8644d11f701..ec749c24cf3 100644 --- a/services/policies/pkg/command/server.go +++ b/services/policies/pkg/command/server.go @@ -12,6 +12,7 @@ import ( "github.com/owncloud/ocis/v2/ocis-pkg/log" "github.com/owncloud/ocis/v2/ocis-pkg/service/debug" "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc" + "github.com/owncloud/ocis/v2/ocis-pkg/tracing" "github.com/owncloud/ocis/v2/ocis-pkg/version" svcProtogen "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/policies/v0" "github.com/owncloud/ocis/v2/services/policies/pkg/config" @@ -50,13 +51,23 @@ func Server(cfg *config.Config) *cli.Command { ) defer cancel() + traceProvider, err := tracing.GetServiceTraceProvider(cfg.Tracing, cfg.Service.Name) + if err != nil { + return err + } + e, err := opa.NewOPA(cfg.Engine.Timeout, logger, cfg.Engine) if err != nil { return err } { - grpcClient, err := grpc.NewClient(grpc.GetClientOptions(cfg.GRPCClientTLS)...) + grpcClient, err := grpc.NewClient( + append( + grpc.GetClientOptions(cfg.GRPCClientTLS), + grpc.WithTraceProvider(traceProvider), + )..., + ) if err != nil { return err } @@ -74,6 +85,7 @@ func Server(cfg *config.Config) *cli.Command { grpc.Address(cfg.GRPC.Addr), grpc.Namespace(cfg.GRPC.Namespace), grpc.Version(version.GetString()), + grpc.TraceProvider(traceProvider), ) if err != nil { return err @@ -103,7 +115,7 @@ func Server(cfg *config.Config) *cli.Command { return err } - eventSvc, err := svcEvent.New(bus, logger, e, cfg.Postprocessing.Query) + eventSvc, err := svcEvent.New(ctx, bus, logger, traceProvider, e, cfg.Postprocessing.Query) if err != nil { return err } diff --git a/services/policies/pkg/config/config.go b/services/policies/pkg/config/config.go index 11011376c4a..ae3dcdd2cfe 100644 --- a/services/policies/pkg/config/config.go +++ b/services/policies/pkg/config/config.go @@ -22,6 +22,7 @@ type Config struct { Log *Log `yaml:"log"` Engine Engine `yaml:"engine"` Postprocessing Postprocessing `yaml:"postprocessing"` + Tracing *Tracing `yaml:"tracing"` } // Service defines the available service configuration. diff --git a/services/policies/pkg/config/defaults/defaultconfig.go b/services/policies/pkg/config/defaults/defaultconfig.go index 272809b628c..cfeed8eced4 100644 --- a/services/policies/pkg/config/defaults/defaultconfig.go +++ b/services/policies/pkg/config/defaults/defaultconfig.go @@ -82,6 +82,18 @@ func EnsureDefaults(cfg *config.Config) { if cfg.GRPC.TLS == nil && cfg.Commons != nil { cfg.GRPC.TLS = structs.CopyOrZeroValue(cfg.Commons.GRPCServiceTLS) } + + // provide with defaults for shared tracing, since we need a valid destination address for "envdecode". + if cfg.Tracing == nil && cfg.Commons != nil && cfg.Commons.Tracing != nil { + cfg.Tracing = &config.Tracing{ + Enabled: cfg.Commons.Tracing.Enabled, + Type: cfg.Commons.Tracing.Type, + Endpoint: cfg.Commons.Tracing.Endpoint, + Collector: cfg.Commons.Tracing.Collector, + } + } else if cfg.Tracing == nil { + cfg.Tracing = &config.Tracing{} + } } func Sanitize(_ *config.Config) {} diff --git a/services/policies/pkg/config/tracing.go b/services/policies/pkg/config/tracing.go new file mode 100644 index 00000000000..e95795564e1 --- /dev/null +++ b/services/policies/pkg/config/tracing.go @@ -0,0 +1,21 @@ +package config + +import "github.com/owncloud/ocis/v2/ocis-pkg/tracing" + +// Tracing defines the available tracing configuration. +type Tracing struct { + Enabled bool `yaml:"enabled" env:"OCIS_TRACING_ENABLED;POLICIES_TRACING_ENABLED" desc:"Activates tracing."` + Type string `yaml:"type" env:"OCIS_TRACING_TYPE;POLICIES_TRACING_TYPE" desc:"The type of tracing. Defaults to '', which is the same as 'jaeger'. Allowed tracing types are 'jaeger' and '' as of now."` + Endpoint string `yaml:"endpoint" env:"OCIS_TRACING_ENDPOINT;POLICIES_TRACING_ENDPOINT" desc:"The endpoint of the tracing agent."` + Collector string `yaml:"collector" env:"OCIS_TRACING_COLLECTOR;POLICIES_TRACING_COLLECTOR" desc:"The HTTP endpoint for sending spans directly to a collector, i.e. http://jaeger-collector:14268/api/traces. Only used if the tracing endpoint is unset."` +} + +// Convert Tracing to the tracing package's Config struct. +func (t Tracing) Convert() tracing.Config { + return tracing.Config{ + Enabled: t.Enabled, + Type: t.Type, + Endpoint: t.Endpoint, + Collector: t.Collector, + } +} diff --git a/services/policies/pkg/service/event/service.go b/services/policies/pkg/service/event/service.go index a1dba4590a2..756b360a900 100644 --- a/services/policies/pkg/service/event/service.go +++ b/services/policies/pkg/service/event/service.go @@ -6,21 +6,26 @@ import ( "github.com/cs3org/reva/v2/pkg/events" "github.com/owncloud/ocis/v2/ocis-pkg/log" "github.com/owncloud/ocis/v2/services/policies/pkg/engine" + "go.opentelemetry.io/otel/trace" ) // Service defines the service handlers. type Service struct { + ctx context.Context query string log log.Logger stream events.Stream engine engine.Engine + tp trace.TracerProvider } // New returns a service implementation for Service. -func New(stream events.Stream, logger log.Logger, engine engine.Engine, query string) (Service, error) { +func New(ctx context.Context, stream events.Stream, logger log.Logger, tp trace.TracerProvider, engine engine.Engine, query string) (Service, error) { svc := Service{ + ctx: ctx, log: logger, query: query, + tp: tp, engine: engine, stream: stream, } @@ -36,53 +41,65 @@ func (s Service) Run() error { } for e := range ch { - switch ev := e.Event.(type) { - case events.StartPostprocessingStep: - if ev.StepToStart != events.PPStepPolicies { - continue + err := s.processEvent(e) + if err != nil { + return err + } + } + + return nil +} + +func (s Service) processEvent(e events.Event) error { + ctx := e.GetTraceContext(s.ctx) + ctx, span := s.tp.Tracer("policies").Start(ctx, "processEvent") + defer span.End() + + switch ev := e.Event.(type) { + case events.StartPostprocessingStep: + if ev.StepToStart != events.PPStepPolicies { + return nil + } + + outcome := events.PPOutcomeContinue + + if s.query != "" { + env := engine.Environment{ + Stage: engine.StagePP, + Resource: engine.Resource{ + Name: ev.Filename, + URL: ev.URL, + Size: ev.Filesize, + }, + } + + if ev.ExecutingUser != nil { + env.User = *ev.ExecutingUser + } + + if ev.ResourceID != nil { + env.Resource.ID = *ev.ResourceID } - outcome := events.PPOutcomeContinue - - if s.query != "" { - env := engine.Environment{ - Stage: engine.StagePP, - Resource: engine.Resource{ - Name: ev.Filename, - URL: ev.URL, - Size: ev.Filesize, - }, - } - - if ev.ExecutingUser != nil { - env.User = *ev.ExecutingUser - } - - if ev.ResourceID != nil { - env.Resource.ID = *ev.ResourceID - } - - result, err := s.engine.Evaluate(context.TODO(), s.query, env) - if err != nil { - s.log.Error().Err(err).Msg("unable evaluate policy") - } - - if !result { - outcome = events.PPOutcomeDelete - } + result, err := s.engine.Evaluate(context.TODO(), s.query, env) + if err != nil { + s.log.Error().Err(err).Msg("unable evaluate policy") } - if err := events.Publish(context.Background(), s.stream, events.PostprocessingStepFinished{ - Outcome: outcome, - UploadID: ev.UploadID, - ExecutingUser: ev.ExecutingUser, - Filename: ev.Filename, - FinishedStep: ev.StepToStart, - }); err != nil { - return err + if !result { + outcome = events.PPOutcomeDelete } } - } + if err := events.Publish(ctx, s.stream, events.PostprocessingStepFinished{ + Outcome: outcome, + UploadID: ev.UploadID, + ExecutingUser: ev.ExecutingUser, + Filename: ev.Filename, + FinishedStep: ev.StepToStart, + }); err != nil { + return err + } + } return nil }