Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

demo #5436

Closed
wants to merge 1 commit into from
Closed

demo #5436

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions .github/workflows/scripts/get-supported-version.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#!/bin/bash
# SPDX-License-Identifier: AGPL-3.0-only

set -o errexit
set -o nounset
set -o pipefail

IMAGES_NAME="$1"
tags=$(skopeo --override-os linux inspect docker://$IMAGES_NAME | jq -r '.RepoTags' | grep -Eo "[0-9]+\.[0-9]+\.[0-9]+" | sort -ur)
supported_versions=$(echo "$tags" | awk -F '.' '{print $1 "." $2}' | sort -ur | head -n 2)

previous_major=$(echo "$tags" | awk -F '.' '{print $1}' | head -n 1)
previous_major=$((previous_major - 1))

previous_major_tags=$(echo "$tags" | grep -Eo "${previous_major}\.[0-9]+\.[0-9]+")
if [ -z "$previous_major_tags" ]; then
echo "No previous minor version found"
else
previous_minor=$(echo "$previous_major_tags" | awk -F '.' '{print $1 "." $2}' | sort -ur | head -n 1)
echo "Previous minor version is $previous_minor"
fi
# Construct the pattern for the last minor version of the previous major version
# pattern="^$previous_major\.[0-9]+$"
# | awk -F '.' '{print $1 "." $2}'
# Extract the last minor version of the previous major version
# last_minor_of_previous_major=$(echo "$latest_versions" | grep -E "$pattern" | sort -u | tail -n 1)

# previous_major=$(echo "$current_major-1" | bc)


# latest_versions=$(printf "%s\n%s" "$latest_versions" "$previous_minor")
# Add prefix "release-" to each element
# latest_versions=$(echo "$latest_versions" | sed 's/^/release-/')

echo "$previous_minor"
15 changes: 9 additions & 6 deletions cmd/mimir/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ import (
"github.com/grafana/dskit/flagext"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/tracing"
"gopkg.in/yaml.v3"

"github.com/grafana/mimir/pkg/mimir"
util_log "github.com/grafana/mimir/pkg/util/log"
"github.com/grafana/mimir/pkg/util/trace"
"github.com/grafana/mimir/pkg/util/usage"
"github.com/grafana/mimir/pkg/util/version"
)
Expand Down Expand Up @@ -180,12 +180,15 @@ func main() {
}
}

// Setting the environment variable JAEGER_AGENT_HOST enables tracing.
if trace, err := tracing.NewFromEnv(name); err != nil {
level.Error(util_log.Logger).Log("msg", "Failed to setup tracing", "err", err.Error())
} else {
defer trace.Close()
if err := trace.InitTracingService(util_log.Logger); err != nil {
level.Error(util_log.Logger).Log("msg", "Failed to initialize tracing", "err", err.Error())
}
// Setting the environment variable JAEGER_AGENT_HOST enables tracing.
// if trace, err := tracing.NewFromEnv(name); err != nil {
// level.Error(util_log.Logger).Log("msg", "Failed to setup tracing", "err", err.Error())
// } else {
// defer trace.Close()
// }
}

// Initialise seed for randomness usage.
Expand Down
10 changes: 5 additions & 5 deletions pkg/alertmanager/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/ring/client"
"github.com/grafana/dskit/services"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/httpgrpc"
Expand All @@ -31,6 +30,7 @@ import (
"github.com/grafana/mimir/pkg/alertmanager/merger"
"github.com/grafana/mimir/pkg/util"
util_log "github.com/grafana/mimir/pkg/util/log"
"github.com/grafana/mimir/pkg/util/trace"
)

// Distributor forwards requests to individual alertmanagers.
Expand Down Expand Up @@ -175,8 +175,8 @@ func (d *Distributor) doQuorum(userID string, w http.ResponseWriter, r *http.Req
err = ring.DoBatch(r.Context(), RingOp, d.alertmanagerRing, []uint32{shardByUser(userID)}, func(am ring.InstanceDesc, _ []int) error {
// Use a background context to make sure all alertmanagers get the request even if we return early.
localCtx := user.InjectOrgID(context.Background(), userID)
sp, localCtx := opentracing.StartSpanFromContext(localCtx, "Distributor.doQuorum")
defer sp.Finish()
localCtx, sp := trace.GetTracer().Start(localCtx, "Distributor.doQuorum")
defer sp.End()

resp, err := d.doRequest(localCtx, am, &httpgrpc.HTTPRequest{
Method: r.Method,
Expand Down Expand Up @@ -243,8 +243,8 @@ func (d *Distributor) doUnary(userID string, w http.ResponseWriter, r *http.Requ
Headers: httpToHttpgrpcHeaders(r.Header),
}

sp, ctx := opentracing.StartSpanFromContext(r.Context(), "Distributor.doUnary")
defer sp.Finish()
ctx, sp := trace.GetTracer().Start(r.Context(), "Distributor.doUnary")
defer sp.End()
// Until we have a mechanism to combine the results from multiple alertmanagers,
// we forward the request to only only of the alertmanagers.
amDesc := replicationSet.Instances[rand.Intn(len(replicationSet.Instances))]
Expand Down
6 changes: 4 additions & 2 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/weaveworks/common/instrument"
"github.com/weaveworks/common/mtime"
"github.com/weaveworks/common/user"
"go.opentelemetry.io/otel/attribute"
"go.uber.org/atomic"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
Expand All @@ -48,6 +49,7 @@ import (
util_math "github.com/grafana/mimir/pkg/util/math"
"github.com/grafana/mimir/pkg/util/pool"
"github.com/grafana/mimir/pkg/util/push"
"github.com/grafana/mimir/pkg/util/trace"
"github.com/grafana/mimir/pkg/util/validation"
)

Expand Down Expand Up @@ -707,9 +709,9 @@ func (d *Distributor) prePushHaDedupeMiddleware(next push.Func) push.Func {
// Make a copy of these, since they may be retained as labels on our metrics, e.g. dedupedSamples.
cluster, replica = copyString(cluster), copyString(replica)

span := opentracing.SpanFromContext(ctx)
ctx, span := trace.GetTracer().Start(ctx, "distributor.prePushHaDedupeMiddleware")
if span != nil {
span.SetTag("cluster", cluster)
span.SetAttributes("cluster", cluster, attribute.String("cluster", cluster))
span.SetTag("replica", replica)
}

Expand Down
76 changes: 76 additions & 0 deletions pkg/util/trace/openTelemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package trace

import (
"context"
"net/http"

"github.com/go-kit/log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
)

type OpenTelemetry struct {
serviceName string
tracerProvider *tracesdk.TracerProvider
logger log.Logger
}

type OpentelemetrySpan struct {
span trace.Span
}

func (ote *OpenTelemetry) Inject(ctx context.Context, header http.Header, _ Span) {
otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(header))
}

func (ote *OpenTelemetry) Start(ctx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, Span) {
ctx, span := ote.tracerProvider.Tracer("").Start(ctx, spanName)
opentelemetrySpan := OpentelemetrySpan{
span: span,
}
return ctx, opentelemetrySpan
}

func (ote *OpenTelemetry) initTracerProvider() error {
// here we need to Integrate the PR in weaveworks/common here: https://github.com/weaveworks/common/pull/291/files
// or just copy this code for now and wait for people to review it
panic("implement me")
}

func (s OpentelemetrySpan) AddEvents(keys []string, values []EventValue) {
for i, v := range values {
// TODO: add support for other types
if v.Num != 0 {
s.span.AddEvent(keys[i], trace.WithAttributes(attribute.Key(keys[i]).String(v.Str)))
}
if v.Str != "" {
s.span.AddEvent(keys[i], trace.WithAttributes(attribute.Key(keys[i]).Int64(v.Num)))
}
}
}

func (s OpentelemetrySpan) End() {
s.span.End()
}

func (s OpentelemetrySpan) RecordError(err error, options ...trace.EventOption) {
for _, o := range options {
s.span.RecordError(err, o)
}
}

func (s OpentelemetrySpan) SetAttributes(key string, value interface{}, kv attribute.KeyValue) {
s.span.SetAttributes(kv)
}

func (s OpentelemetrySpan) SetName(name string) {
s.span.SetName(name)
}

func (s OpentelemetrySpan) SetStatus(code codes.Code, description string) {
s.span.SetStatus(code, description)
}
91 changes: 91 additions & 0 deletions pkg/util/trace/openTracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package trace

import (
"context"
"net/http"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
otl "github.com/opentracing/opentracing-go/log"
"github.com/pkg/errors"
"github.com/weaveworks/common/tracing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

type OpenTracing struct {
serviceName string
logger log.Logger
}

type OpenTracingSpan struct {
span opentracing.Span
}

func (ot *OpenTracing) initTracerProvider() error {
if trace, err := tracing.NewFromEnv(ot.serviceName); err != nil {
return errors.Wrap(err, "failed to setup opentracing tracer")
} else {
defer trace.Close()
}
return nil
}

func (ts *OpenTracing) Inject(ctx context.Context, header http.Header, span Span) {
opentracingSpan, ok := span.(OpenTracingSpan)
if !ok {
level.Error(ts.logger).Log("msg", "Failed to cast opentracing span")
}
err := opentracing.GlobalTracer().Inject(
opentracingSpan.span.Context(),
opentracing.HTTPHeaders,
opentracing.HTTPHeadersCarrier(header))

if err != nil {
level.Error(ts.logger).Log("msg", "Failed to inject span context instance", "err", err)
}
}

func (ts *OpenTracing) Start(ctx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, Span) {
span, ctx := opentracing.StartSpanFromContext(ctx, spanName)
opentracingSpan := OpenTracingSpan{span: span}
return ctx, opentracingSpan
}

func (s OpenTracingSpan) AddEvents(keys []string, values []EventValue) {
fields := []otl.Field{}
for i, v := range values {
if v.Str != "" {
field := otl.String(keys[i], v.Str)
fields = append(fields, field)
}
if v.Num != 0 {
field := otl.Int64(keys[i], v.Num)
fields = append(fields, field)
}
}
s.span.LogFields(fields...)
}

func (s OpenTracingSpan) End() {
s.span.Finish()
}

func (s OpenTracingSpan) RecordError(err error, options ...trace.EventOption) {
ext.Error.Set(s.span, true)
}

func (s OpenTracingSpan) SetAttributes(key string, value interface{}, kv attribute.KeyValue) {
s.span.SetTag(key, value)
}

func (s OpenTracingSpan) SetName(name string) {
s.span.SetOperationName(name)
}

func (s OpenTracingSpan) SetStatus(code codes.Code, description string) {
ext.Error.Set(s.span, true)
}
Loading
Loading