Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make all-in-one.yaml file independant of sampling-strategies.json file #6431

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
ee06229
Make all-in-one.yaml file independant of sampling-strategies.json file
adityachopra29 Dec 26, 2024
acd5500
add the errNoProvider test back again
adityachopra29 Dec 27, 2024
b57c01b
replace null with blank for sampling-strategy in all-in-one.yaml
adityachopra29 Dec 27, 2024
69ef912
Added back test for checking non-null provider
adityachopra29 Dec 27, 2024
9759455
set back the path to sampling strategies file in exetension test
adityachopra29 Dec 30, 2024
584db5c
restore idl
yurishkuro Dec 30, 2024
095434e
restore-test
yurishkuro Dec 30, 2024
8097990
make DefaultSamplingProbability parameterizable using flag
adityachopra29 Jan 5, 2025
d0a0d18
remove 'required' tag from Path variable in FileConfig
adityachopra29 Jan 5, 2025
bde692c
create default_sampling_probability config for file in yaml, add test…
adityachopra29 Jan 5, 2025
616448a
[refractor] Remove dependency on tlscfg.Options (#6478)
Saumya40-codes Jan 5, 2025
9b4467a
[v2][storage] Implement reverse adapter to translate v2 storage api t…
mahadzaryab1 Jan 5, 2025
ad19190
Refactor collector pipeline to allow v1/v2 data model (#6484)
yurishkuro Jan 5, 2025
58e65a7
Change collector's queue to use generics (#6486)
yurishkuro Jan 6, 2025
2593349
Return errors from span processor creation (#6488)
yurishkuro Jan 6, 2025
87dbb1c
fix(deps): update module golang.org/x/net to v0.34.0 (#6492)
renovate-bot Jan 6, 2025
00267fa
Upgrade storage integration tests: `DependencyReader` to v2 (#6477)
zzzk1 Jan 6, 2025
50d49e9
chore(deps): update alpine docker tag to v3.21.1 (#6496)
renovate-bot Jan 7, 2025
f0bdc32
use 'valid' tag for DefaultSamplingProbability, remove unnecessary 'o…
adityachopra29 Jan 7, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 38 additions & 31 deletions cmd/agent/app/reporter/grpc/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
yaml "gopkg.in/yaml.v3"

"github.com/jaegertracing/jaeger/internal/metricstest"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/discovery"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
Expand Down Expand Up @@ -201,7 +200,7 @@ func TestProxyClientTLS(t *testing.T) {
tests := []struct {
name string
clientTLS *configtls.ClientConfig
serverTLS tlscfg.Options
serverTLS configtls.ServerConfig
expectError bool
}{
{
Expand All @@ -215,10 +214,11 @@ func TestProxyClientTLS(t *testing.T) {
},
{
name: "should fail with TLS client to untrusted TLS server",
serverTLS: tlscfg.Options{
Enabled: true,
CertPath: testCertKeyLocation + "/example-server-cert.pem",
KeyPath: testCertKeyLocation + "/example-server-key.pem",
serverTLS: configtls.ServerConfig{
Config: configtls.Config{
CertFile: testCertKeyLocation + "/example-server-cert.pem",
KeyFile: testCertKeyLocation + "/example-server-key.pem",
},
},
clientTLS: &configtls.ClientConfig{
ServerName: "example.com",
Expand All @@ -227,10 +227,11 @@ func TestProxyClientTLS(t *testing.T) {
},
{
name: "should fail with TLS client to trusted TLS server with incorrect hostname",
serverTLS: tlscfg.Options{
Enabled: true,
CertPath: testCertKeyLocation + "/example-server-cert.pem",
KeyPath: testCertKeyLocation + "/example-server-key.pem",
serverTLS: configtls.ServerConfig{
Config: configtls.Config{
CertFile: testCertKeyLocation + "/example-server-cert.pem",
KeyFile: testCertKeyLocation + "/example-server-key.pem",
},
},
clientTLS: &configtls.ClientConfig{
Config: configtls.Config{
Expand All @@ -241,10 +242,11 @@ func TestProxyClientTLS(t *testing.T) {
},
{
name: "should pass with TLS client to trusted TLS server with correct hostname",
serverTLS: tlscfg.Options{
Enabled: true,
CertPath: testCertKeyLocation + "/example-server-cert.pem",
KeyPath: testCertKeyLocation + "/example-server-key.pem",
serverTLS: configtls.ServerConfig{
Config: configtls.Config{
CertFile: testCertKeyLocation + "/example-server-cert.pem",
KeyFile: testCertKeyLocation + "/example-server-key.pem",
},
},
clientTLS: &configtls.ClientConfig{
Config: configtls.Config{
Expand All @@ -256,11 +258,12 @@ func TestProxyClientTLS(t *testing.T) {
},
{
name: "should fail with TLS client without cert to trusted TLS server requiring cert",
serverTLS: tlscfg.Options{
Enabled: true,
CertPath: testCertKeyLocation + "/example-server-cert.pem",
KeyPath: testCertKeyLocation + "/example-server-key.pem",
ClientCAPath: testCertKeyLocation + "/example-CA-cert.pem",
serverTLS: configtls.ServerConfig{
ClientCAFile: testCertKeyLocation + "/example-CA-cert.pem",
Config: configtls.Config{
CertFile: testCertKeyLocation + "/example-server-cert.pem",
KeyFile: testCertKeyLocation + "/example-server-key.pem",
},
},
clientTLS: &configtls.ClientConfig{
Config: configtls.Config{
Expand All @@ -272,11 +275,12 @@ func TestProxyClientTLS(t *testing.T) {
},
{
name: "should fail with TLS client without cert to trusted TLS server requiring cert from a different CA",
serverTLS: tlscfg.Options{
Enabled: true,
CertPath: testCertKeyLocation + "/example-server-cert.pem",
KeyPath: testCertKeyLocation + "/example-server-key.pem",
ClientCAPath: testCertKeyLocation + "/wrong-CA-cert.pem", // NB: wrong CA
serverTLS: configtls.ServerConfig{
ClientCAFile: testCertKeyLocation + "/wrong-CA-cert.pem", // NB: wrong CA
Config: configtls.Config{
CertFile: testCertKeyLocation + "/example-server-cert.pem",
KeyFile: testCertKeyLocation + "/example-server-key.pem",
},
},
clientTLS: &configtls.ClientConfig{
Config: configtls.Config{
Expand All @@ -290,11 +294,12 @@ func TestProxyClientTLS(t *testing.T) {
},
{
name: "should pass with TLS client with cert to trusted TLS server requiring cert",
serverTLS: tlscfg.Options{
Enabled: true,
CertPath: testCertKeyLocation + "/example-server-cert.pem",
KeyPath: testCertKeyLocation + "/example-server-key.pem",
ClientCAPath: testCertKeyLocation + "/example-CA-cert.pem",
serverTLS: configtls.ServerConfig{
ClientCAFile: testCertKeyLocation + "/example-CA-cert.pem",
Config: configtls.Config{
CertFile: testCertKeyLocation + "/example-server-cert.pem",
KeyFile: testCertKeyLocation + "/example-server-key.pem",
},
},
clientTLS: &configtls.ClientConfig{
Config: configtls.Config{
Expand All @@ -314,11 +319,13 @@ func TestProxyClientTLS(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var opts []grpc.ServerOption
if test.serverTLS.Enabled {
tlsCfg, err := test.serverTLS.ToOtelServerConfig().LoadTLSConfig(ctx)

if test.serverTLS.CertFile != "" && test.serverTLS.KeyFile != "" {
tlsCfg, err := test.serverTLS.LoadTLSConfig(ctx)
require.NoError(t, err)
opts = []grpc.ServerOption{grpc.Creds(credentials.NewTLS(tlsCfg))}
}

spanHandler := &mockSpanHandler{}
s, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
api_v2.RegisterCollectorServiceServer(s, spanHandler)
Expand Down
7 changes: 3 additions & 4 deletions cmd/agent/app/reporter/grpc/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,12 @@ func (b *ConnBuilder) InitFromViper(v *viper.Viper) (*ConnBuilder, error) {
b.CollectorHostPorts = strings.Split(hostPorts, ",")
}
b.MaxRetry = v.GetUint(retryFlag)
tls, err := tlsFlagsConfig.InitFromViper(v)
tlsCfg, err := tlsFlagsConfig.InitFromViper(v)
if err != nil {
return b, fmt.Errorf("failed to process TLS options: %w", err)
}
if tls.Enabled {
tlsConf := tls.ToOtelClientConfig()
b.TLS = &tlsConf
if !tlsCfg.Insecure {
b.TLS = &tlsCfg
}
b.DiscoveryMinPeers = v.GetInt(discoveryMinPeers)
return b, nil
Expand Down
6 changes: 5 additions & 1 deletion cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,11 @@ func (c *Collector) Start(options *flags.CollectorOptions) error {
})
}

c.spanProcessor = handlerBuilder.BuildSpanProcessor(additionalProcessors...)
spanProcessor, err := handlerBuilder.BuildSpanProcessor(additionalProcessors...)
if err != nil {
return fmt.Errorf("could not create span processor: %w", err)
}
c.spanProcessor = spanProcessor
c.spanHandlers = handlerBuilder.BuildHandlers(c.spanProcessor)
grpcServer, err := server.StartGRPCServer(&server.GRPCServerParams{
Handler: c.spanHandlers.GRPCHandler,
Expand Down
7 changes: 6 additions & 1 deletion cmd/collector/app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,12 @@ func TestAggregator(t *testing.T) {
},
},
}
_, err := c.spanProcessor.ProcessSpans(spans, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat})
_, err := c.spanProcessor.ProcessSpans(processor.SpansV1{
Spans: spans,
Details: processor.Details{
SpanFormat: processor.JaegerSpanFormat,
},
})
require.NoError(t, err)
require.NoError(t, c.Close())

Expand Down
8 changes: 4 additions & 4 deletions cmd/collector/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,11 @@ func addGRPCFlags(flagSet *flag.FlagSet, cfg serverFlagsConfig, defaultHostPort
}

func initHTTPFromViper(v *viper.Viper, opts *confighttp.ServerConfig, cfg serverFlagsConfig) error {
tlsOpts, err := cfg.tls.InitFromViper(v)
tlsHTTPCfg, err := cfg.tls.InitFromViper(v)
if err != nil {
return fmt.Errorf("failed to parse HTTP TLS options: %w", err)
}
opts.TLSSetting = tlsOpts.ToOtelServerConfig()
opts.TLSSetting = tlsHTTPCfg
opts.Endpoint = ports.FormatHostPort(v.GetString(cfg.prefix + "." + flagSuffixHostPort))
opts.IdleTimeout = v.GetDuration(cfg.prefix + "." + flagSuffixHTTPIdleTimeout)
opts.ReadTimeout = v.GetDuration(cfg.prefix + "." + flagSuffixHTTPReadTimeout)
Expand All @@ -208,11 +208,11 @@ func initHTTPFromViper(v *viper.Viper, opts *confighttp.ServerConfig, cfg server
}

func initGRPCFromViper(v *viper.Viper, opts *configgrpc.ServerConfig, cfg serverFlagsConfig) error {
tlsOpts, err := cfg.tls.InitFromViper(v)
tlsGRPCCfg, err := cfg.tls.InitFromViper(v)
if err != nil {
return fmt.Errorf("failed to parse GRPC TLS options: %w", err)
}
opts.TLSSetting = tlsOpts.ToOtelServerConfig()
opts.TLSSetting = tlsGRPCCfg
opts.NetAddr.Endpoint = ports.FormatHostPort(v.GetString(cfg.prefix + "." + flagSuffixHostPort))
opts.MaxRecvMsgSizeMiB = v.GetInt(cfg.prefix+"."+flagSuffixGRPCMaxReceiveMessageLength) / (1024 * 1024)
opts.Keepalive = &configgrpc.KeepaliveServerConfig{
Expand Down
15 changes: 9 additions & 6 deletions cmd/collector/app/handler/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest)
type batchConsumer struct {
logger *zap.Logger
spanProcessor processor.SpanProcessor
spanOptions processor.SpansOptions
spanOptions processor.Details // common settings for all spans
tenancyMgr *tenancy.Manager
}

func newBatchConsumer(logger *zap.Logger, spanProcessor processor.SpanProcessor, transport processor.InboundTransport, spanFormat processor.SpanFormat, tenancyMgr *tenancy.Manager) batchConsumer {
return batchConsumer{
logger: logger,
spanProcessor: spanProcessor,
spanOptions: processor.SpansOptions{
spanOptions: processor.Details{
InboundTransport: transport,
SpanFormat: spanFormat,
},
Expand All @@ -75,10 +75,13 @@ func (c *batchConsumer) consume(ctx context.Context, batch *model.Batch) error {
span.Process = batch.Process
}
}
_, err = c.spanProcessor.ProcessSpans(batch.Spans, processor.SpansOptions{
InboundTransport: c.spanOptions.InboundTransport,
SpanFormat: c.spanOptions.SpanFormat,
Tenant: tenant,
_, err = c.spanProcessor.ProcessSpans(processor.SpansV1{
Spans: batch.Spans,
Details: processor.Details{
InboundTransport: c.spanOptions.InboundTransport,
SpanFormat: c.spanOptions.SpanFormat,
Tenant: tenant,
},
})
if err != nil {
if errors.Is(err, processor.ErrBusy) {
Expand Down
17 changes: 11 additions & 6 deletions cmd/collector/app/handler/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -33,17 +34,21 @@ type mockSpanProcessor struct {
spanFormat processor.SpanFormat
}

func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, opts processor.SpansOptions) ([]bool, error) {
func (p *mockSpanProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) {
p.mux.Lock()
defer p.mux.Unlock()
p.spans = append(p.spans, spans...)
oks := make([]bool, len(spans))
batch.GetSpans(func(spans []*model.Span) {
p.spans = append(p.spans, spans...)
}, func(_ ptrace.Traces) {
panic("not implemented")
})
oks := make([]bool, len(p.spans))
if p.tenants == nil {
p.tenants = make(map[string]bool)
}
p.tenants[opts.Tenant] = true
p.transport = opts.InboundTransport
p.spanFormat = opts.SpanFormat
p.tenants[batch.GetTenant()] = true
p.transport = batch.GetInboundTransport()
p.spanFormat = batch.GetSpanFormat()
return oks, p.expectedError
}

Expand Down
18 changes: 12 additions & 6 deletions cmd/collector/app/handler/thrift_span_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,12 @@ func (jbh *jaegerBatchesHandler) SubmitBatches(batches []*jaeger.Batch, options
mSpan := jConv.ToDomainSpan(span, batch.Process)
mSpans = append(mSpans, mSpan)
}
oks, err := jbh.modelProcessor.ProcessSpans(mSpans, processor.SpansOptions{
InboundTransport: options.InboundTransport,
SpanFormat: processor.JaegerSpanFormat,
oks, err := jbh.modelProcessor.ProcessSpans(processor.SpansV1{
Spans: mSpans,
Details: processor.Details{
InboundTransport: options.InboundTransport,
SpanFormat: processor.JaegerSpanFormat,
},
})
if err != nil {
jbh.logger.Error("Collector failed to process span batch", zap.Error(err))
Expand Down Expand Up @@ -105,9 +108,12 @@ func (h *zipkinSpanHandler) SubmitZipkinBatch(spans []*zipkincore.Span, options
convCount[i] = len(converted)
mSpans = append(mSpans, converted...)
}
bools, err := h.modelProcessor.ProcessSpans(mSpans, processor.SpansOptions{
InboundTransport: options.InboundTransport,
SpanFormat: processor.ZipkinSpanFormat,
bools, err := h.modelProcessor.ProcessSpans(processor.SpansV1{
Spans: mSpans,
Details: processor.Details{
InboundTransport: options.InboundTransport,
SpanFormat: processor.ZipkinSpanFormat,
},
})
if err != nil {
h.logger.Error("Collector failed to process Zipkin span batch", zap.Error(err))
Expand Down
14 changes: 11 additions & 3 deletions cmd/collector/app/handler/thrift_span_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
Expand Down Expand Up @@ -58,12 +59,19 @@ type shouldIErrorProcessor struct {

var errTestError = errors.New("Whoops")

func (s *shouldIErrorProcessor) ProcessSpans(mSpans []*model.Span, _ processor.SpansOptions) ([]bool, error) {
func (s *shouldIErrorProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) {
if s.shouldError {
return nil, errTestError
}
retMe := make([]bool, len(mSpans))
for i := range mSpans {
var spans []*model.Span
batch.GetSpans(func(sp []*model.Span) {
spans = sp
}, func(_ ptrace.Traces) {
panic("not implemented")
})

retMe := make([]bool, len(spans))
for i := range spans {
retMe[i] = true
}
return retMe, nil
Expand Down
3 changes: 2 additions & 1 deletion cmd/collector/app/model_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
package app

import (
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
)

// ProcessSpan processes a Domain Model Span
type ProcessSpan func(span *model.Span, tenant string)

// ProcessSpans processes a batch of Domain Model Spans
type ProcessSpans func(spans []*model.Span, tenant string)
type ProcessSpans func(spans processor.Batch)

// FilterSpan decides whether to allow or disallow a span
type FilterSpan func(span *model.Span) bool
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (options) apply(opts ...Option) options {
ret.hostMetrics = metrics.NullFactory
}
if ret.preProcessSpans == nil {
ret.preProcessSpans = func(_ []*model.Span, _ /* tenant */ string) {}
ret.preProcessSpans = func(_ processor.Batch) {}
}
if ret.sanitizer == nil {
ret.sanitizer = func(span *model.Span) *model.Span { return span }
Expand Down
4 changes: 2 additions & 2 deletions cmd/collector/app/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestAllOptionSet(t *testing.T) {
Options.ServiceMetrics(metrics.NullFactory),
Options.Logger(zap.NewNop()),
Options.NumWorkers(5),
Options.PreProcessSpans(func(_ []*model.Span, _ /* tenant */ string) {}),
Options.PreProcessSpans(func(_ processor.Batch) {}),
Options.Sanitizer(func(span *model.Span) *model.Span { return span }),
Options.QueueSize(10),
Options.DynQueueSizeWarmup(1000),
Expand All @@ -53,7 +53,7 @@ func TestNoOptionsSet(t *testing.T) {
assert.Nil(t, opts.collectorTags)
assert.False(t, opts.reportBusy)
assert.False(t, opts.blockingSubmit)
assert.NotPanics(t, func() { opts.preProcessSpans(nil, "") })
assert.NotPanics(t, func() { opts.preProcessSpans(processor.SpansV1{}) })
assert.NotPanics(t, func() { opts.preSave(nil, "") })
assert.True(t, opts.spanFilter(nil))
span := model.Span{}
Expand Down
Loading