Skip to content

Commit

Permalink
OPA: Add tracing for outbound http calls (#3034)
Browse files Browse the repository at this point in the history
Signed-off-by: Magnus Jungsbluth <[email protected]>
  • Loading branch information
mjungsbluth authored Apr 26, 2024
1 parent a45de70 commit 429e8b5
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 35 deletions.
3 changes: 1 addition & 2 deletions filters/openpolicyagent/evaluation.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/open-policy-agent/opa/ast"
"github.com/open-policy-agent/opa/rego"
"github.com/open-policy-agent/opa/server"
"github.com/open-policy-agent/opa/tracing"
"github.com/opentracing/opentracing-go"
)

Expand Down Expand Up @@ -58,7 +57,7 @@ func (opa *OpenPolicyAgentInstance) Eval(ctx context.Context, req *ext_authz_v3.
return nil, err
}

err = envoyauth.Eval(ctx, opa, inputValue, result, rego.DistributedTracingOpts(tracing.Options{opa}))
err = envoyauth.Eval(ctx, opa, inputValue, result, rego.DistributedTracingOpts(opa.DistributedTracing()))
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/zalando/skipper/filters"
"github.com/zalando/skipper/metrics/metricstest"
"github.com/zalando/skipper/proxy/proxytest"
"github.com/zalando/skipper/tracing/tracingtest"

"github.com/zalando/skipper/filters/filtertest"
"github.com/zalando/skipper/filters/openpolicyagent"
Expand Down Expand Up @@ -359,7 +360,7 @@ func TestAuthorizeRequestFilter(t *testing.T) {
}
}`, opaControlPlane.URL(), ti.regoQuery))

opaFactory := openpolicyagent.NewOpenPolicyAgentRegistry()
opaFactory := openpolicyagent.NewOpenPolicyAgentRegistry(openpolicyagent.WithTracer(&tracingtest.Tracer{}))
ftSpec := NewOpaAuthorizeRequestSpec(opaFactory, openpolicyagent.WithConfigTemplate(config))
fr.Register(ftSpec)
ftSpec = NewOpaAuthorizeRequestWithBodySpec(opaFactory, openpolicyagent.WithConfigTemplate(config))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/zalando/skipper/eskip"
"github.com/zalando/skipper/filters"
"github.com/zalando/skipper/proxy/proxytest"
"github.com/zalando/skipper/tracing/tracingtest"

"github.com/zalando/skipper/filters/openpolicyagent"
)
Expand Down Expand Up @@ -239,7 +240,7 @@ func TestAuthorizeRequestFilter(t *testing.T) {
}
}`, opaControlPlane.URL(), ti.regoQuery))

opaFactory := openpolicyagent.NewOpenPolicyAgentRegistry()
opaFactory := openpolicyagent.NewOpenPolicyAgentRegistry(openpolicyagent.WithTracer(&tracingtest.Tracer{}))
ftSpec := NewOpaServeResponseSpec(opaFactory, openpolicyagent.WithConfigTemplate(config))
fr.Register(ftSpec)
ftSpec = NewOpaServeResponseWithReqBodySpec(opaFactory, openpolicyagent.WithConfigTemplate(config))
Expand Down
54 changes: 45 additions & 9 deletions filters/openpolicyagent/openpolicyagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ const (
DefaultMaxRequestBodySize = 1 << 20 // 1 MB
DefaultMaxMemoryBodyParsing = 100 * DefaultMaxRequestBodySize
defaultBodyBufferSize = 8192 * 1024

spanNameEval = "open-policy-agent"
)

type OpenPolicyAgentRegistry struct {
Expand All @@ -69,6 +71,8 @@ type OpenPolicyAgentRegistry struct {
maxMemoryBodyParsingSem *semaphore.Weighted
maxRequestBodyBytes int64
bodyReadBufferSize int64

tracer opentracing.Tracer
}

type OpenPolicyAgentFilter interface {
Expand Down Expand Up @@ -110,6 +114,13 @@ func WithCleanInterval(interval time.Duration) func(*OpenPolicyAgentRegistry) er
}
}

func WithTracer(tracer opentracing.Tracer) func(*OpenPolicyAgentRegistry) error {
return func(cfg *OpenPolicyAgentRegistry) error {
cfg.tracer = tracer
return nil
}
}

func NewOpenPolicyAgentRegistry(opts ...func(*OpenPolicyAgentRegistry) error) *OpenPolicyAgentRegistry {
registry := &OpenPolicyAgentRegistry{
reuseDuration: defaultReuseDuration,
Expand Down Expand Up @@ -395,6 +406,22 @@ func interpolateConfigTemplate(configTemplate []byte, bundleName string) ([]byte
return buf.Bytes(), nil
}

func buildTracingOptions(tracer opentracing.Tracer, bundleName string, manager *plugins.Manager) opatracing.Options {
return opatracing.NewOptions(WithTracingOptTracer(tracer), WithTracingOptBundleName(bundleName), WithTracingOptManager(manager))
}

func (registry *OpenPolicyAgentRegistry) withTracingOptions(bundleName string) func(*plugins.Manager) {
return func(m *plugins.Manager) {
options := buildTracingOptions(
registry.tracer,
bundleName,
m,
)

plugins.WithDistributedTracingOpts(options)(m)
}
}

// new returns a new OPA object.
func (registry *OpenPolicyAgentRegistry) new(store storage.Store, configBytes []byte, instanceConfig OpenPolicyAgentInstanceConfig, filterName string, bundleName string, maxBodyBytes int64, bodyReadBufferSize int64) (*OpenPolicyAgentInstance, error) {
id := uuid.New().String()
Expand All @@ -412,7 +439,8 @@ func (registry *OpenPolicyAgentRegistry) new(store storage.Store, configBytes []

var logger logging.Logger = &QuietLogger{target: logging.Get()}
logger = logger.WithFields(map[string]interface{}{"skipper-filter": filterName})
manager, err := plugins.New(configBytes, id, store, configLabelsInfo(*opaConfig), plugins.Logger(logger))
manager, err := plugins.New(configBytes, id, store, configLabelsInfo(*opaConfig), plugins.Logger(logger), registry.withTracingOptions(bundleName))

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -544,20 +572,28 @@ func (opa *OpenPolicyAgentInstance) EnvoyPluginConfig() envoy.PluginConfig {
return defaultConfig
}

func setSpanTags(span opentracing.Span, bundleName string, manager *plugins.Manager) {
if bundleName != "" {
span.SetTag("opa.bundle_name", bundleName)
}

if manager != nil {
for label, value := range manager.Labels() {
span.SetTag("opa.label."+label, value)
}
}
}

func (opa *OpenPolicyAgentInstance) startSpanFromContextWithTracer(tr opentracing.Tracer, parent opentracing.Span, ctx context.Context) (opentracing.Span, context.Context) {

var span opentracing.Span
if parent != nil {
span = tr.StartSpan("open-policy-agent", opentracing.ChildOf(parent.Context()))
span = tr.StartSpan(spanNameEval, opentracing.ChildOf(parent.Context()))
} else {
span = tracing.CreateSpan("open-policy-agent", ctx, tr)
span = tracing.CreateSpan(spanNameEval, ctx, tr)
}

span.SetTag("opa.bundle_name", opa.bundleName)

for label, value := range opa.manager.Labels() {
span.SetTag("opa.label."+label, value)
}
setSpanTags(span, opa.bundleName, opa.manager)

return span, opentracing.ContextWithSpan(ctx, span)
}
Expand Down Expand Up @@ -730,7 +766,7 @@ func (opa *OpenPolicyAgentInstance) Config() *config.Config { return opa.opaConf

// DistributedTracing is an implementation of the envoyauth.EvalContext interface
func (opa *OpenPolicyAgentInstance) DistributedTracing() opatracing.Options {
return opatracing.NewOptions(opa)
return buildTracingOptions(opa.registry.tracer, opa.bundleName, opa.manager)
}

// logging.Logger that does not pollute info with debug logs
Expand Down
71 changes: 66 additions & 5 deletions filters/openpolicyagent/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,15 @@ package openpolicyagent
import (
"net/http"

"github.com/open-policy-agent/opa/plugins"
opatracing "github.com/open-policy-agent/opa/tracing"
"github.com/opentracing/opentracing-go"
"github.com/zalando/skipper/logging"
"github.com/zalando/skipper/proxy"
)

const (
spanNameHttpOut = "open-policy-agent.http"
)

func init() {
Expand All @@ -14,15 +21,48 @@ func init() {
type tracingFactory struct{}

type transport struct {
opa *OpenPolicyAgentInstance
tracer opentracing.Tracer
bundleName string
manager *plugins.Manager

wrapped http.RoundTripper
}

func WithTracingOptTracer(tracer opentracing.Tracer) func(*transport) {
return func(t *transport) {
t.tracer = tracer
}
}

func WithTracingOptBundleName(bundleName string) func(*transport) {
return func(t *transport) {
t.bundleName = bundleName
}
}

func WithTracingOptManager(manager *plugins.Manager) func(*transport) {
return func(t *transport) {
t.manager = manager
}
}

func (*tracingFactory) NewTransport(tr http.RoundTripper, opts opatracing.Options) http.RoundTripper {
return &transport{
opa: opts[0].(*OpenPolicyAgentInstance),
log := &logging.DefaultLog{}

wrapper := &transport{
wrapped: tr,
}

for _, o := range opts {
opt, ok := o.(func(*transport))
if !ok {
log.Warnf("invalid type for OPA tracing option, expected func(*transport) got %T, tracing information might be incomplete", o)
} else {
opt(wrapper)
}
}

return wrapper
}

func (*tracingFactory) NewHandler(f http.Handler, label string, opts opatracing.Options) http.Handler {
Expand All @@ -32,15 +72,36 @@ func (*tracingFactory) NewHandler(f http.Handler, label string, opts opatracing.
func (tr *transport) RoundTrip(req *http.Request) (*http.Response, error) {
ctx := req.Context()
parentSpan := opentracing.SpanFromContext(ctx)
var span opentracing.Span

if parentSpan != nil {
span := parentSpan.Tracer().StartSpan("http.send", opentracing.ChildOf(parentSpan.Context()))
span = parentSpan.Tracer().StartSpan(spanNameHttpOut, opentracing.ChildOf(parentSpan.Context()))
} else if tr.tracer != nil {
span = tr.tracer.StartSpan(spanNameHttpOut)
}

if span != nil {
defer span.Finish()

span.SetTag(proxy.HTTPMethodTag, req.Method)
span.SetTag(proxy.HTTPUrlTag, req.URL.String())
span.SetTag(proxy.HostnameTag, req.Host)
span.SetTag(proxy.HTTPPathTag, req.URL.Path)
span.SetTag(proxy.ComponentTag, "skipper")
span.SetTag(proxy.SpanKindTag, proxy.SpanKindClient)

setSpanTags(span, tr.bundleName, tr.manager)
req = req.WithContext(opentracing.ContextWithSpan(ctx, span))

carrier := opentracing.HTTPHeadersCarrier(req.Header)
span.Tracer().Inject(span.Context(), opentracing.HTTPHeaders, carrier)
}

return tr.wrapped.RoundTrip(req)
resp, err := tr.wrapped.RoundTrip(req)
if err != nil && span != nil {
span.SetTag("error", true)
span.LogKV("event", "error", "message", err.Error())
}

return resp, err
}
Loading

0 comments on commit 429e8b5

Please sign in to comment.