Skip to content

Commit

Permalink
use 'valid' tag for DefaultSamplingProbability, remove unnecessary 'o…
Browse files Browse the repository at this point in the history
…ptions' input from functions

Signed-off-by: adityachopra29 <[email protected]>
  • Loading branch information
adityachopra29 committed Jan 9, 2025
1 parent 50d49e9 commit f0bdc32
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 66 deletions.
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/all-in-one.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ extensions:
# We can either use file or adaptive sampling strategy in remote_sampling
file:
path:
default_sampling_probability: 1
reload_interval: 1s
default_sampling_probability: 0.001
# adaptive:
# sampling_store: some_store
# initial_sampling_probability: 0.1
Expand Down
13 changes: 4 additions & 9 deletions cmd/jaeger/internal/extension/remotesampling/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ import (
)

var (
errNoProvider = errors.New("no sampling strategy provider specified, expecting 'adaptive' or 'file'")
errMultipleProviders = errors.New("only one sampling strategy provider can be specified, 'adaptive' or 'file'")
errNegativeInterval = errors.New("reload interval must be a positive value, or zero to disable automatic reloading")
errInvalidDefaultProbability = errors.New("default sampling probability must be between 0 and 1")
errNoProvider = errors.New("no sampling strategy provider specified, expecting 'adaptive' or 'file'")
errMultipleProviders = errors.New("only one sampling strategy provider can be specified, 'adaptive' or 'file'")
errNegativeInterval = errors.New("reload interval must be a positive value, or zero to disable automatic reloading")
)

var (
Expand Down Expand Up @@ -52,7 +51,7 @@ type FileConfig struct {
// ReloadInterval is the time interval to check and reload sampling strategies file
ReloadInterval time.Duration `mapstructure:"reload_interval"`
// DefaultSamplingProbability is the sampling probability used by the Strategy Store for static sampling
DefaultSamplingProbability float64 `mapstructure:"default_sampling_probability"`
DefaultSamplingProbability float64 `mapstructure:"default_sampling_probability" valid:"range(0|1)"`
}

type AdaptiveConfig struct {
Expand Down Expand Up @@ -107,10 +106,6 @@ func (cfg *Config) Validate() error {
return errNegativeInterval
}

if cfg.File != nil && (cfg.File.DefaultSamplingProbability > 1 || cfg.File.DefaultSamplingProbability < 0) {
return errInvalidDefaultProbability
}

_, err := govalidator.ValidateStruct(cfg)
return err
}
4 changes: 2 additions & 2 deletions cmd/jaeger/internal/extension/remotesampling/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ func Test_Validate(t *testing.T) {
config: &Config{
File: &FileConfig{Path: "", DefaultSamplingProbability: -0.5},
},
expectedErr: "default sampling probability must be between 0 and 1",
expectedErr: "File.DefaultSamplingProbability: -0.5 does not validate as range(0|1)",
},
{
name: "File provider has default sampling probability greater than 1",
config: &Config{
File: &FileConfig{Path: "", DefaultSamplingProbability: 1.5},
},
expectedErr: "default sampling probability must be between 0 and 1",
expectedErr: "File.DefaultSamplingProbability: 1.5 does not validate as range(0|1)",
},
{
name: "Invalid Adaptive provider",
Expand Down
4 changes: 3 additions & 1 deletion cmd/jaeger/internal/extension/remotesampling/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.opentelemetry.io/collector/extension"

"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive"
"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/static"
"github.com/jaegertracing/jaeger/ports"
)

Expand Down Expand Up @@ -43,7 +44,8 @@ func createDefaultConfig() component.Config {
},
},
File: &FileConfig{
Path: "", // path needs to be specified
Path: "", // path needs to be specified
DefaultSamplingProbability: static.DefaultDefaultSamplingProbability,
},
Adaptive: &AdaptiveConfig{
SamplingStore: "", // storage name needs to be specified
Expand Down
2 changes: 1 addition & 1 deletion idl
6 changes: 3 additions & 3 deletions plugin/sampling/strategyprovider/static/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ const (
// only up to a fixed number of traces per second.
samplerTypeRateLimiting = "ratelimiting"

// defaultDefaultSamplingProbability is the default "defaultSamplingProbability"
// used by the Strategy Store in case no defaultSamplingProbability is defined is given
defaultDefaultSamplingProbability = 0.001
// DefaultDefaultSamplingProbability is the default value for "DefaultSamplingProbability"
// used by the Strategy Store in case no DefaultSamplingProbability is defined
DefaultDefaultSamplingProbability = 0.001
)

// defaultStrategy is the default sampling strategy the Strategy Store will return
Expand Down
4 changes: 2 additions & 2 deletions plugin/sampling/strategyprovider/static/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const (
samplingStrategiesFile = "sampling.strategies-file"
samplingStrategiesReloadInterval = "sampling.strategies-reload-interval"
samplingStrategiesBugfix5270 = "sampling.strategies.bugfix-5270"
samplingStrategiesDefaultSamplingProbability = "sampling.stategies-default-sampling-probability"
samplingStrategiesDefaultSamplingProbability = "sampling.default-sampling-probability"
)

// Options holds configuration for the static sampling strategy store.
Expand All @@ -37,7 +37,7 @@ func AddFlags(flagSet *flag.FlagSet) {
flagSet.Duration(samplingStrategiesReloadInterval, 0, "Reload interval to check and reload sampling strategies file. Zero value means no reloading")
flagSet.String(samplingStrategiesFile, "", "The path for the sampling strategies file in JSON format. See sampling documentation to see format of the file")
flagSet.Bool(samplingStrategiesBugfix5270, true, "Include default operation level strategies for Ratesampling type service level strategy. Cf. https://github.com/jaegertracing/jaeger/issues/5270")
flagSet.Float64(samplingStrategiesDefaultSamplingProbability, defaultDefaultSamplingProbability, "Sampling probability used by the Strategy Store for static sampling. Value must be between 0 and 1.")
flagSet.Float64(samplingStrategiesDefaultSamplingProbability, DefaultDefaultSamplingProbability, "Sampling probability used by the Strategy Store for static sampling. Value must be between 0 and 1.")
}

// InitFromViper initializes Options with properties from viper
Expand Down
51 changes: 25 additions & 26 deletions plugin/sampling/strategyprovider/static/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,13 @@ func NewProvider(options Options, logger *zap.Logger) (ss.Provider, error) {
h.logger.Warn("Default operations level strategies will not be included for Ratelimiting service strategies." +
"This behavior will be changed in future releases. " +
"Cf. https://github.com/jaegertracing/jaeger/issues/5270")
h.parseStrategies_deprecated(options, strategies)
h.parseStrategies_deprecated(strategies)
} else {
h.parseStrategies(options, strategies)
h.parseStrategies(strategies)
}

if options.ReloadInterval > 0 {
go h.autoUpdateStrategies(ctx, options.ReloadInterval, loadFn, options)
go h.autoUpdateStrategies(ctx, loadFn)
}
return h, nil
}
Expand Down Expand Up @@ -154,21 +154,21 @@ func (h *samplingProvider) samplingStrategyLoader(strategiesFile string) strateg
}
}

func (h *samplingProvider) autoUpdateStrategies(ctx context.Context, interval time.Duration, loader strategyLoader, options Options) {
func (h *samplingProvider) autoUpdateStrategies(ctx context.Context, loader strategyLoader) {
lastValue := string(nullJSON)
ticker := time.NewTicker(interval)
ticker := time.NewTicker(h.options.ReloadInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
lastValue = h.reloadSamplingStrategy(options, loader, lastValue)
lastValue = h.reloadSamplingStrategy(loader, lastValue)
case <-ctx.Done():
return
}
}
}

func (h *samplingProvider) reloadSamplingStrategy(options Options, loadFn strategyLoader, lastValue string) string {
func (h *samplingProvider) reloadSamplingStrategy(loadFn strategyLoader, lastValue string) string {
newValue, err := loadFn()
if err != nil {
h.logger.Error("failed to re-load sampling strategies", zap.Error(err))
Expand All @@ -177,19 +177,19 @@ func (h *samplingProvider) reloadSamplingStrategy(options Options, loadFn strate
if lastValue == string(newValue) {
return lastValue
}
if err := h.updateSamplingStrategy(options, newValue); err != nil {
if err := h.updateSamplingStrategy(newValue); err != nil {
h.logger.Error("failed to update sampling strategies", zap.Error(err))
return lastValue
}
return string(newValue)
}

func (h *samplingProvider) updateSamplingStrategy(options Options, dataBytes []byte) error {
func (h *samplingProvider) updateSamplingStrategy(dataBytes []byte) error {
var strategies strategies
if err := json.Unmarshal(dataBytes, &strategies); err != nil {
return fmt.Errorf("failed to unmarshal sampling strategies: %w", err)
}
h.parseStrategies(options, &strategies)
h.parseStrategies(&strategies)
h.logger.Info("Updated sampling strategies:" + string(dataBytes))
return nil
}
Expand All @@ -208,10 +208,10 @@ func loadStrategies(loadFn strategyLoader) (*strategies, error) {
return strategies, nil
}

func (h *samplingProvider) parseStrategies_deprecated(options Options, strategies *strategies) {
newStore := defaultStrategies(options.DefaultSamplingProbability)
func (h *samplingProvider) parseStrategies_deprecated(strategies *strategies) {
newStore := defaultStrategies(h.options.DefaultSamplingProbability)
if strategies.DefaultStrategy != nil {
newStore.defaultStrategy = h.parseServiceStrategies(options, strategies.DefaultStrategy)
newStore.defaultStrategy = h.parseServiceStrategies(strategies.DefaultStrategy)
}

merge := true
Expand All @@ -221,7 +221,7 @@ func (h *samplingProvider) parseStrategies_deprecated(options Options, strategie
}

for _, s := range strategies.ServiceStrategies {
newStore.serviceStrategies[s.Service] = h.parseServiceStrategies(options, s)
newStore.serviceStrategies[s.Service] = h.parseServiceStrategies(s)

// Merge with the default operation strategies, because only merging with
// the default strategy has no effect on service strategies (the default strategy
Expand All @@ -247,14 +247,14 @@ func (h *samplingProvider) parseStrategies_deprecated(options Options, strategie
h.storedStrategies.Store(newStore)
}

func (h *samplingProvider) parseStrategies(options Options, strategies *strategies) {
newStore := defaultStrategies(options.DefaultSamplingProbability)
func (h *samplingProvider) parseStrategies(strategies *strategies) {
newStore := defaultStrategies(h.options.DefaultSamplingProbability)
if strategies.DefaultStrategy != nil {
newStore.defaultStrategy = h.parseServiceStrategies(options, strategies.DefaultStrategy)
newStore.defaultStrategy = h.parseServiceStrategies(strategies.DefaultStrategy)
}

for _, s := range strategies.ServiceStrategies {
newStore.serviceStrategies[s.Service] = h.parseServiceStrategies(options, s)
newStore.serviceStrategies[s.Service] = h.parseServiceStrategies(s)

// Config for this service may not have per-operation strategies,
// but if the default strategy has them they should still apply.
Expand Down Expand Up @@ -302,19 +302,19 @@ func mergePerOperationSamplingStrategies(
return a
}

func (h *samplingProvider) parseServiceStrategies(options Options, strategy *serviceStrategy) *api_v2.SamplingStrategyResponse {
resp := h.parseStrategy(options, &strategy.strategy)
func (h *samplingProvider) parseServiceStrategies(strategy *serviceStrategy) *api_v2.SamplingStrategyResponse {
resp := h.parseStrategy(&strategy.strategy)
if len(strategy.OperationStrategies) == 0 {
return resp
}
opS := &api_v2.PerOperationSamplingStrategies{
DefaultSamplingProbability: options.DefaultSamplingProbability,
DefaultSamplingProbability: h.options.DefaultSamplingProbability,
}
if resp.StrategyType == api_v2.SamplingStrategyType_PROBABILISTIC {
opS.DefaultSamplingProbability = resp.ProbabilisticSampling.SamplingRate
}
for _, operationStrategy := range strategy.OperationStrategies {
s, ok := h.parseOperationStrategy(options, operationStrategy, opS)
s, ok := h.parseOperationStrategy(operationStrategy, opS)
if !ok {
continue
}
Expand All @@ -330,11 +330,10 @@ func (h *samplingProvider) parseServiceStrategies(options Options, strategy *ser
}

func (h *samplingProvider) parseOperationStrategy(
options Options,
strategy *operationStrategy,
parent *api_v2.PerOperationSamplingStrategies,
) (s *api_v2.SamplingStrategyResponse, ok bool) {
s = h.parseStrategy(options, &strategy.strategy)
s = h.parseStrategy(&strategy.strategy)
if s.StrategyType == api_v2.SamplingStrategyType_RATE_LIMITING {
// TODO OperationSamplingStrategy only supports probabilistic sampling
h.logger.Warn(
Expand All @@ -348,7 +347,7 @@ func (h *samplingProvider) parseOperationStrategy(
return s, true
}

func (h *samplingProvider) parseStrategy(options Options, strategy *strategy) *api_v2.SamplingStrategyResponse {
func (h *samplingProvider) parseStrategy(strategy *strategy) *api_v2.SamplingStrategyResponse {
switch strategy.Type {
case samplerTypeProbabilistic:
return &api_v2.SamplingStrategyResponse{
Expand All @@ -366,7 +365,7 @@ func (h *samplingProvider) parseStrategy(options Options, strategy *strategy) *a
}
default:
h.logger.Warn("Failed to parse sampling strategy", zap.Any("strategy", strategy))
return defaultStrategyResponse(options.DefaultSamplingProbability)
return defaultStrategyResponse(h.options.DefaultSamplingProbability)
}
}

Expand Down
Loading

0 comments on commit f0bdc32

Please sign in to comment.