Skip to content

Commit

Permalink
Merge branch 'master' into facilitate-OPA-decision-correlation-with-b…
Browse files Browse the repository at this point in the history
…usiness-flows
  • Loading branch information
JanardhanSharma authored Apr 26, 2024
2 parents 372e51e + f6bf033 commit 2b34b0f
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 18 deletions.
7 changes: 7 additions & 0 deletions filters/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,16 @@ type FilterContext interface {
Metrics() Metrics

// Allow filters to add Tags, Baggage to the trace or set the ComponentName.
//
// Deprecated: OpenTracing is deprecated, see https://github.com/zalando/skipper/issues/2104.
// Use opentracing.SpanFromContext(ctx.Request().Context()).Tracer() to get the proxy span Tracer.
Tracer() opentracing.Tracer

// Allow filters to create their own spans
//
// Deprecated: OpenTracing is deprecated, see https://github.com/zalando/skipper/issues/2104.
// Filter spans should be children of the proxy span,
// use opentracing.SpanFromContext(ctx.Request().Context()) to get the proxy span.
ParentSpan() opentracing.Span

// Returns a clone of the FilterContext including a brand new request object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ func TestAuthorizeRequestFilter(t *testing.T) {

opaFactory := openpolicyagent.NewOpenPolicyAgentRegistry(openpolicyagent.WithTracer(&tracingtest.Tracer{}))
ftSpec := NewOpaAuthorizeRequestSpec(opaFactory, opts...)

fr.Register(ftSpec)
ftSpec = NewOpaAuthorizeRequestWithBodySpec(opaFactory, opts...)
fr.Register(ftSpec)
Expand Down
2 changes: 1 addition & 1 deletion filters/openpolicyagent/openpolicyagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ func (opa *OpenPolicyAgentInstance) startSpanFromContextWithTracer(tr opentracin
}

func (opa *OpenPolicyAgentInstance) StartSpanFromFilterContext(fc filters.FilterContext) (opentracing.Span, context.Context) {
return opa.startSpanFromContextWithTracer(fc.Tracer(), fc.ParentSpan(), fc.Request().Context())
return opa.StartSpanFromContext(fc.Request().Context())
}

func (opa *OpenPolicyAgentInstance) StartSpanFromContext(ctx context.Context) (opentracing.Span, context.Context) {
Expand Down
2 changes: 1 addition & 1 deletion proxy/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func (c *context) Split() (filters.FilterContext, error) {
}

func (c *context) Loopback() {
loopSpan := c.Tracer().StartSpan(c.proxy.tracing.initialOperationName, opentracing.ChildOf(c.ParentSpan().Context()))
loopSpan := c.tracer.StartSpan(c.proxy.tracing.initialOperationName, opentracing.ChildOf(c.parentSpan.Context()))
defer loopSpan.Finish()
err := c.proxy.do(c, loopSpan)
if c.response != nil && c.response.Body != nil {
Expand Down
2 changes: 1 addition & 1 deletion proxy/healthy_endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (h *healthyEndpoints) filterHealthyEndpoints(ctx *context, endpoints []rout
filtered := make([]routing.LBEndpoint, 0, len(endpoints))
for _, e := range endpoints {
dropProbability := e.Metrics.HealthCheckDropProbability()
if p < dropProbability {
if dropProbability > 0.05 && p < dropProbability {
ctx.Logger().Infof("Dropping endpoint %q due to passive health check: p=%0.2f, dropProbability=%0.2f",
e.Host, p, dropProbability)
metrics.IncCounter("passive-health-check.endpoints.dropped")
Expand Down
16 changes: 8 additions & 8 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -960,8 +960,7 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co
}

proxySpanOpts := []ot.StartSpanOption{ot.Tags{
SpanKindTag: SpanKindClient,
SkipperRouteIDTag: ctx.route.Id,
SpanKindTag: SpanKindClient,
}}
if parentSpan := ot.SpanFromContext(req.Context()); parentSpan != nil {
proxySpanOpts = append(proxySpanOpts, ot.ChildOf(parentSpan.Context()))
Expand All @@ -970,7 +969,9 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co

u := cloneURL(req.URL)
u.RawQuery = ""
p.tracing.setTag(ctx.proxySpan, HTTPUrlTag, u.String())
p.tracing.
setTag(ctx.proxySpan, HTTPUrlTag, u.String()).
setTag(ctx.proxySpan, SkipperRouteIDTag, ctx.route.Id)
p.setCommonSpanInfo(u, req, ctx.proxySpan)

carrier := ot.HTTPHeadersCarrier(req.Header)
Expand Down Expand Up @@ -1188,14 +1189,13 @@ func (p *Proxy) do(ctx *context, parentSpan ot.Span) (err error) {
loopCTX := ctx.clone()

loopSpanOpts := []ot.StartSpanOption{ot.Tags{
SpanKindTag: SpanKindServer,
SkipperRouteIDTag: ctx.route.Id,
SpanKindTag: SpanKindServer,
}}
if parentSpan := ot.SpanFromContext(ctx.request.Context()); parentSpan != nil {
loopSpanOpts = append(loopSpanOpts, ot.ChildOf(parentSpan.Context()))
}
loopSpan := p.tracing.tracer.StartSpan("loopback", loopSpanOpts...)

p.tracing.setTag(loopSpan, SkipperRouteIDTag, ctx.route.Id)
p.setCommonSpanInfo(ctx.Request().URL, ctx.Request(), loopSpan)
ctx.parentSpan = loopSpan

Expand Down Expand Up @@ -1493,8 +1493,7 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var ctx *context

spanOpts := []ot.StartSpanOption{ot.Tags{
SpanKindTag: SpanKindServer,
HTTPRemoteIPTag: stripPort(r.RemoteAddr),
SpanKindTag: SpanKindServer,
}}
if wireContext, err := p.tracing.tracer.Extract(ot.HTTPHeaders, ot.HTTPHeadersCarrier(r.Header)); err == nil {
spanOpts = append(spanOpts, ext.RPCServerOption(wireContext))
Expand Down Expand Up @@ -1547,6 +1546,7 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r.URL.Path = rfc.PatchPath(r.URL.Path, r.URL.RawPath)
}

p.tracing.setTag(span, HTTPRemoteIPTag, stripPort(r.RemoteAddr))
p.setCommonSpanInfo(r.URL, r, span)
r = r.WithContext(ot.ContextWithSpan(r.Context(), span))
r = r.WithContext(routing.NewContext(r.Context()))
Expand Down
17 changes: 10 additions & 7 deletions ratelimit/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,9 @@ func (c *clusterLimitRedis) Allow(ctx context.Context, clearText string) bool {
failed := err != nil
if failed {
allow = !c.failClosed
setError(span)
c.logError("Failed to determine if operation is allowed: %v", err)
msgFmt := "Failed to determine if operation is allowed: %v"
setError(span, fmt.Sprintf(msgFmt, err))
c.logError(msgFmt, err)
}
if span != nil {
span.SetTag("allowed", allow)
Expand Down Expand Up @@ -207,9 +208,10 @@ func (c *clusterLimitRedis) Delta(clearText string) time.Duration {
return d
}

func setError(span opentracing.Span) {
func setError(span opentracing.Span, msg string) {
if span != nil {
ext.Error.Set(span, true)
span.LogKV("log", msg)
}
}

Expand All @@ -233,7 +235,7 @@ func (c *clusterLimitRedis) oldest(ctx context.Context, clearText string) (time.
res, err := c.ringClient.ZRangeByScoreWithScoresFirst(ctx, key, 0.0, float64(now.UnixNano()), 0, 1)

if err != nil {
setError(span)
setError(span, fmt.Sprintf("Failed to execute ZRangeByScoreWithScoresFirst: %v", err))
return time.Time{}, err
}

Expand All @@ -243,13 +245,14 @@ func (c *clusterLimitRedis) oldest(ctx context.Context, clearText string) (time.

s, ok := res.(string)
if !ok {
setError(span)
return time.Time{}, errors.New("failed to evaluate redis data")
msg := "failed to evaluate redis data"
setError(span, msg)
return time.Time{}, errors.New(msg)
}

oldest, err := strconv.ParseInt(s, 10, 64)
if err != nil {
setError(span)
setError(span, fmt.Sprintf("failed to convert value to int64: %v", err))
return time.Time{}, fmt.Errorf("failed to convert value to int64: %w", err)
}

Expand Down

0 comments on commit 2b34b0f

Please sign in to comment.