Skip to content

Commit

Permalink
支持配置请求header
Browse files Browse the repository at this point in the history
  • Loading branch information
chenlinfeng committed Mar 22, 2024
1 parent 63c49af commit 4866dc6
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 20 deletions.
17 changes: 9 additions & 8 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ import (

// Config opentelemetry trpc plugin config
type Config struct {
Addr string `yaml:"addr"`
TenantID string `yaml:"tenant_id"`
Sampler SamplerConfig `yaml:"sampler"`
Metrics MetricsConfig `yaml:"metrics"`
Logs LogsConfig `yaml:"logs"`
Traces TracesConfig `yaml:"traces"`
Codes []*codes.Code `yaml:"codes"`
Attributes []*Attribute `yaml:"attributes"`
Addr string `yaml:"addr"`
TenantID string `yaml:"tenant_id"`
Sampler SamplerConfig `yaml:"sampler"`
Metrics MetricsConfig `yaml:"metrics"`
Logs LogsConfig `yaml:"logs"`
Traces TracesConfig `yaml:"traces"`
Codes []*codes.Code `yaml:"codes"`
Attributes []*Attribute `yaml:"attributes"`
Header map[string]string `yaml:"header"`
}

// TracesConfig traces config
Expand Down
25 changes: 19 additions & 6 deletions opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ func Start(ctx context.Context, spanName string, opts ...apitrace.SpanStartOptio

// WithSpan sets up a span with the given name and calls the supplied function.
func WithSpan(ctx context.Context, spanName string, fn func(ctx context.Context) error,
opts ...apitrace.SpanStartOption) error {
opts ...apitrace.SpanStartOption,
) error {
ctx, sp := globalTracer.Start(ctx, spanName, opts...)
defer sp.End()
return fn(ctx)
Expand All @@ -95,7 +96,7 @@ func newTraceHTTPExporter(addr string, o *setupOptions) (sdktrace.SpanExporter,
otlpTraceOpts := []otlptracehttp.Option{
otlptracehttp.WithEndpoint(addr),
otlptracehttp.WithCompression(otlptracehttp.GzipCompression),
otlptracehttp.WithHeaders(map[string]string{api.TenantHeaderKey: o.tenantID}),
otlptracehttp.WithHeaders(o.otlptraceHeader),
otlptracehttp.WithRetry(otlptracehttp.RetryConfig{
Enabled: true,
InitialInterval: retry.DefaultConfig.InitialInterval,
Expand Down Expand Up @@ -124,7 +125,7 @@ func newTraceGRPCExporter(addr string, o *setupOptions) (sdktrace.SpanExporter,
otlptracegrpc.WithInsecure(),
otlptracegrpc.WithEndpoint(addr),
otlptracegrpc.WithCompressor("gzip"),
otlptracegrpc.WithHeaders(map[string]string{api.TenantHeaderKey: o.tenantID}),
otlptracegrpc.WithHeaders(o.otlptraceHeader),
otlptracegrpc.WithDialOption(grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(MaxSendMessageSize))),
otlptracegrpc.WithRetry(otlptracegrpc.RetryConfig{
Enabled: true,
Expand Down Expand Up @@ -218,8 +219,8 @@ func newMetricHTTPExporter(addr string, o *setupOptions) (*sdkmetric.Exporter, e
otlpMetricOpts := []otlpmetrichttp.Option{
otlpmetrichttp.WithInsecure(),
otlpmetrichttp.WithEndpoint(addr),
otlpmetrichttp.WithHeaders(o.otlptraceHeader),
otlpmetrichttp.WithCompression(otlpmetrichttp.GzipCompression),
otlpmetrichttp.WithHeaders(map[string]string{api.TenantHeaderKey: o.tenantID}),
otlpmetrichttp.WithRetry(otlpmetrichttp.RetryConfig{
Enabled: true,
InitialInterval: retry.DefaultConfig.InitialInterval,
Expand All @@ -236,7 +237,7 @@ func newMetricGrpcExporter(addr string, o *setupOptions) (*sdkmetric.Exporter, e
otlpmetricgrpc.WithInsecure(),
otlpmetricgrpc.WithEndpoint(addr),
otlpmetricgrpc.WithCompressor("gzip"),
otlpmetricgrpc.WithHeaders(map[string]string{api.TenantHeaderKey: o.tenantID}),
otlpmetricgrpc.WithHeaders(o.otlptraceHeader),
otlpmetricgrpc.WithDialOption(grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(MaxSendMessageSize))),
otlpmetricgrpc.WithRetry(otlpmetricgrpc.RetryConfig{
Enabled: true,
Expand Down Expand Up @@ -274,7 +275,7 @@ func setupLog(addr string, o *setupOptions, kvs []attribute.KeyValue) (err error
ecosystemotlp.WithAddress(addr),
ecosystemotlp.WithTenantID(o.tenantID),
ecosystemotlp.WithCompressor("gzip"),
ecosystemotlp.WithHeaders(map[string]string{api.TenantHeaderKey: o.tenantID}),
ecosystemotlp.WithHeaders(o.otlptraceHeader),
ecosystemotlp.WithRetryConfig(retry.DefaultConfig),
)
if err != nil {
Expand Down Expand Up @@ -307,6 +308,7 @@ type setupOptions struct {
deferredSampler trace.DeferredSampler
batchSpanOption []trace.BatchSpanProcessorOption
idGenerator sdktrace.IDGenerator
otlptraceHeader map[string]string
}

func defaultSetupOptions() *setupOptions {
Expand All @@ -315,6 +317,7 @@ func defaultSetupOptions() *setupOptions {
sampler: sdktrace.AlwaysSample(),
logEnabled: false,
enabledLogLevel: apilog.InfoLevel,
otlptraceHeader: make(map[string]string),
deferredSampler: trace.NewDeferredSampler(trace.DeferredSampleConfig{}),
}
}
Expand Down Expand Up @@ -343,10 +346,20 @@ func WithServerOwner(owner string) SetupOption {
}
}

// WithHeader with server header
func WithHeader(header map[string]string) SetupOption {
return func(options *setupOptions) {
for k, v := range header {
options.otlptraceHeader[k] = v
}
}
}

// WithTenantID with tenant id
func WithTenantID(tenantID string) SetupOption {
return func(options *setupOptions) {
options.tenantID = tenantID
options.otlptraceHeader[api.TenantHeaderKey] = tenantID
}
}

Expand Down
17 changes: 11 additions & 6 deletions oteltrpc/oteltrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ func RegisterTextMapSupplier(carrier func(md codec.MetaData, msg codec.Msg) prop

var _ plugin.Factory = (*factory)(nil)

type factory struct {
}
type factory struct{}

func (f factory) Type() string {
return consts.PluginType
Expand All @@ -71,7 +70,8 @@ func (f factory) Type() string {
func packetSizeMetric() func(ctx context.Context, method string,
req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
return func(ctx context.Context, method string, req, reply interface{},
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption,
) error {
switch req := req.(type) {
case proto.Message:
prometheus.ObserveExportSpansBytes(proto.Size(req))
Expand All @@ -89,7 +89,8 @@ var DefaultRecoveryHandler = func(ctx context.Context, panicErr interface{}) err

func recovery() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{},
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) (err error) {
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption,
) (err error) {
defer func() {
if rec := recover(); rec != nil {
buf := make([]byte, 2048)
Expand Down Expand Up @@ -149,6 +150,7 @@ func (f factory) Setup(name string, configDec plugin.Decoder) error {
admin.HandleFunc("/debug/tracez", zpage.GetZPageHandlerFunc())
}
err = opentelemetry.Setup(cfg.Addr,
opentelemetry.WithHeader(cfg.Header),
opentelemetry.WithTenantID(cfg.TenantID),
opentelemetry.WithSampler(DefaultSampler),
opentelemetry.WithDeferredSampler(DeferredSampler),
Expand Down Expand Up @@ -239,6 +241,7 @@ func buildBatchSpanProcessorOptions(c config.TraceExporterOption) (options []eco
}
return
}

func setupCodes(cfg *config.Config, configurator remote.Configurator) {
var c []*codes.Code
c = append(c, cfg.Codes...)
Expand Down Expand Up @@ -315,5 +318,7 @@ func Register() {
plugin.Register(consts.PluginName, &factory{})
}

var ServerFilter = filter.ServerChain{traces.ServerFilter(), prometheus.ServerFilter(), logs.LogRecoveryFilter()}.Filter
var ClientFilter = filter.ClientChain{traces.ClientFilter(), prometheus.ClientFilter()}.Filter
var (
ServerFilter = filter.ServerChain{traces.ServerFilter(), prometheus.ServerFilter(), logs.LogRecoveryFilter()}.Filter
ClientFilter = filter.ClientChain{traces.ClientFilter(), prometheus.ClientFilter()}.Filter
)

0 comments on commit 4866dc6

Please sign in to comment.