diff --git a/extension/memorylimiterextension/admission/README.md b/extension/memorylimiterextension/admission/README.md new file mode 100644 index 00000000000..e5ed0091a6a --- /dev/null +++ b/extension/memorylimiterextension/admission/README.md @@ -0,0 +1,7 @@ +# Admission Package + +The real implementation of this package currently resides in the +[Collector-Contrib/internal/otelarrow/admission2](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/internal/otelarrow/admission2) +package. It would be copied in as part of a series of changes +described in https://github.com/open-telemetry/opentelemetry-collector/issues/9591. + diff --git a/extension/memorylimiterextension/admission/controller.go b/extension/memorylimiterextension/admission/controller.go new file mode 100644 index 00000000000..4416ca3e25b --- /dev/null +++ b/extension/memorylimiterextension/admission/controller.go @@ -0,0 +1,56 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package admission // import "go.opentelemetry.io/collector/extension/memorylimiterextension/admission" + +import ( + "context" +) + +// Queue is a weighted admission queue interface. +type Queue interface { + // Acquire asks the controller to admit the caller. + // + // The weight parameter specifies how large an admission to make. + // For example, this might be the request size in bytes, used to differentiate + // between large and small requests. + // + // Acquire will return when one of the following events occurs: + // + // (1) admission is allowed, or + // (2) the provided ctx becomes canceled, or + // (3) there are so many existing waiters that the + // controller decides to reject this caller without + // admitting it. + // + // In case (1), the return value will be a non-nil + // ReleaseFunc. The caller must invoke it after it is finished + // with the resource being guarded by the admission + // controller. 
+ // + // In case (2), the return value will be a Canceled or + // DeadlineExceeded error. + // + // In case (3), the return value will be a ResourceExhausted + // error. + Acquire(ctx context.Context, weight uint64) (ReleaseFunc, error) +} + +// ReleaseFunc is returned by Acquire when the caller was admitted. +type ReleaseFunc func() + +type unboundedController struct{} + +var _ Queue = unboundedController{} + +// NewUnboundedQueue returns a no-op implementation of the Queue interface. +func NewUnboundedQueue() Queue { + return unboundedController{} +} + +func unboundedRelease() {} + +// Acquire implements Queue. +func (unboundedController) Acquire(_ context.Context, _ uint64) (ReleaseFunc, error) { + return unboundedRelease, nil +} diff --git a/internal/memorylimiter/config.go b/internal/memorylimiter/config.go index 76610e3c73e..65361c533ea 100644 --- a/internal/memorylimiter/config.go +++ b/internal/memorylimiter/config.go @@ -5,6 +5,7 @@ package memorylimiter // import "go.opentelemetry.io/collector/internal/memoryli import ( "errors" + "strings" "time" "go.opentelemetry.io/collector/component" @@ -14,6 +15,7 @@ var ( errCheckIntervalOutOfRange = errors.New("'check_interval' must be greater than zero") errInconsistentGCMinInterval = errors.New("'min_gc_interval_when_soft_limited' should be larger than 'min_gc_interval_when_hard_limited'") errLimitOutOfRange = errors.New("'limit_mib' or 'limit_percentage' must be greater than zero") + errAdmissionLimitOutOfRange = errors.New("'request_limit_mib' must be greater than zero") errSpikeLimitOutOfRange = errors.New("'spike_limit_mib' must be smaller than 'limit_mib'") errSpikeLimitPercentageOutOfRange = errors.New("'spike_limit_percentage' must be smaller than 'limit_percentage'") errLimitPercentageOutOfRange = errors.New( @@ -22,6 +24,31 @@ var ( // Config defines configuration for memory memoryLimiter processor. type Config struct { + // Model is one of "gcstats" or "admission". 
Use the + // corresponding `gcstats` or `admission` sub-configuration + // objects. + Model string `mapstructure:"model"` + + // GCStats contains settings that control a memory limiter + // based on garbage collector stats. This is the original + // model supported by this component, therefore the + // configuration is squashed. + // + // Note: can we un-squash this using a feature flag? + // + // Note: this is a breaking API change. Should we embed the + // GCStatsConfig object so that callers can continue to access + // GCStats config w/o adding `GCStats.` at every field reference? + GCStats GCStatsConfig `mapstructure:",squash"` + + // Admission contains settings that control a memory limiter + // based on an exact count of bytes pending and in the + // pipeline. + Admission AdmissionConfig `mapstructure:"admission"` +} + +// GCStatsConfig is the basis of a garbage-collector statistics-based memory limiter. +type GCStatsConfig struct { // CheckInterval is the time between measurements of memory usage for the // purposes of avoiding going over the limits. Defaults to zero, so no // checks will be performed. @@ -54,33 +81,68 @@ type Config struct { MemorySpikePercentage uint32 `mapstructure:"spike_limit_percentage"` } +// AdmissionConfig is the basis of a memory limiter that counts the +// number of bytes pending and in the pipeline. +type AdmissionConfig struct { + // RequestLimitMiB limits the amount of request data admitted for processing, measured by + // uncompressed request size. Request size is used to control how much traffic we admit + // for processing. This field must be greater than zero when the "admission" model is + // selected; Validate rejects a zero value in that case. + RequestLimitMiB uint64 `mapstructure:"request_limit_mib"` + + // WaitingLimitMiB is the limit on the amount of data waiting to be consumed. 
+ // This is a dimension of memory limiting to ensure waiters are not consuming an + // unexpectedly large amount of memory in receivers that use it. 
+ WaitingLimitMiB uint64 `mapstructure:"waiting_limit_mib"` +} + var _ component.Config = (*Config)(nil) func NewDefaultConfig() *Config { + // Note that users are required to configure the primary limit in + // all models. For GCStats, the critical config is MemoryLimitMiB, + // for Admission, the critical config is RequestLimitMiB. return &Config{ - MinGCIntervalWhenSoftLimited: 10 * time.Second, + Model: "gcstats", + GCStats: GCStatsConfig{ + MemoryLimitMiB: 0, + MinGCIntervalWhenSoftLimited: 10 * time.Second, + }, + Admission: AdmissionConfig{ + RequestLimitMiB: 0, + WaitingLimitMiB: 0, + }, } } // Validate checks if the processor configuration is valid func (cfg *Config) Validate() error { - if cfg.CheckInterval <= 0 { - return errCheckIntervalOutOfRange - } - if cfg.MinGCIntervalWhenSoftLimited < cfg.MinGCIntervalWhenHardLimited { - return errInconsistentGCMinInterval - } - if cfg.MemoryLimitMiB == 0 && cfg.MemoryLimitPercentage == 0 { - return errLimitOutOfRange - } - if cfg.MemoryLimitPercentage > 100 || cfg.MemorySpikePercentage > 100 { - return errLimitPercentageOutOfRange - } - if cfg.MemoryLimitMiB > 0 && cfg.MemoryLimitMiB <= cfg.MemorySpikeLimitMiB { - return errSpikeLimitOutOfRange - } - if cfg.MemoryLimitPercentage > 0 && cfg.MemoryLimitPercentage <= cfg.MemorySpikePercentage { - return errSpikeLimitPercentageOutOfRange + switch strings.ToLower(cfg.Model) { + // The default branch includes the case where Model is unset, + // for backwards compatibility. 
+ case "", "gcstats": + if cfg.GCStats.CheckInterval <= 0 { + return errCheckIntervalOutOfRange + } + if cfg.GCStats.MinGCIntervalWhenSoftLimited < cfg.GCStats.MinGCIntervalWhenHardLimited { + return errInconsistentGCMinInterval + } + if cfg.GCStats.MemoryLimitMiB == 0 && cfg.GCStats.MemoryLimitPercentage == 0 { + return errLimitOutOfRange + } + if cfg.GCStats.MemoryLimitPercentage > 100 || cfg.GCStats.MemorySpikePercentage > 100 { + return errLimitPercentageOutOfRange + } + if cfg.GCStats.MemoryLimitMiB > 0 && cfg.GCStats.MemoryLimitMiB <= cfg.GCStats.MemorySpikeLimitMiB { + return errSpikeLimitOutOfRange + } + if cfg.GCStats.MemoryLimitPercentage > 0 && cfg.GCStats.MemoryLimitPercentage <= cfg.GCStats.MemorySpikePercentage { + return errSpikeLimitPercentageOutOfRange + } + case "admission": + if cfg.Admission.RequestLimitMiB == 0 { + return errAdmissionLimitOutOfRange + } } return nil } diff --git a/internal/memorylimiter/config_test.go b/internal/memorylimiter/config_test.go index 5861ad683bb..cf063c8c689 100644 --- a/internal/memorylimiter/config_test.go +++ b/internal/memorylimiter/config_test.go @@ -21,9 +21,11 @@ func TestUnmarshalConfig(t *testing.T) { assert.NoError(t, cm.Unmarshal(&cfg)) assert.Equal(t, &Config{ - CheckInterval: 5 * time.Second, - MemoryLimitMiB: 4000, - MemorySpikeLimitMiB: 500, + GCStats: GCStatsConfig{ + CheckInterval: 5 * time.Second, + MemoryLimitMiB: 4000, + MemorySpikeLimitMiB: 500, + }, }, cfg) } @@ -36,65 +38,106 @@ func TestConfigValidate(t *testing.T) { { name: "valid", cfg: &Config{ - MemoryLimitMiB: 5722, - MemorySpikeLimitMiB: 1907, - CheckInterval: 100 * time.Millisecond, + GCStats: GCStatsConfig{ + MemoryLimitMiB: 5722, + MemorySpikeLimitMiB: 1907, + CheckInterval: 100 * time.Millisecond, + }, }, err: nil, }, { name: "zero check interval", cfg: &Config{ - CheckInterval: 0, + GCStats: GCStatsConfig{ + CheckInterval: 0, + }, }, err: errCheckIntervalOutOfRange, }, { name: "unset memory limit", cfg: &Config{ - CheckInterval: 1 
* time.Second, - MemoryLimitMiB: 0, - MemoryLimitPercentage: 0, + GCStats: GCStatsConfig{ + CheckInterval: 1 * time.Second, + MemoryLimitMiB: 0, + MemoryLimitPercentage: 0, + }, }, err: errLimitOutOfRange, }, { name: "invalid memory spike limit", cfg: &Config{ - CheckInterval: 1 * time.Second, - MemoryLimitMiB: 10, - MemorySpikeLimitMiB: 10, + GCStats: GCStatsConfig{ + CheckInterval: 1 * time.Second, + MemoryLimitMiB: 10, + MemorySpikeLimitMiB: 10, + }, }, err: errSpikeLimitOutOfRange, }, { name: "invalid memory percentage limit", cfg: &Config{ - CheckInterval: 1 * time.Second, - MemoryLimitPercentage: 101, + GCStats: GCStatsConfig{ + CheckInterval: 1 * time.Second, + MemoryLimitPercentage: 101, + }, }, err: errLimitPercentageOutOfRange, }, { name: "invalid memory spike percentage limit", cfg: &Config{ - CheckInterval: 1 * time.Second, - MemoryLimitPercentage: 50, - MemorySpikePercentage: 60, + GCStats: GCStatsConfig{ + CheckInterval: 1 * time.Second, + MemoryLimitPercentage: 50, + MemorySpikePercentage: 60, + }, }, err: errSpikeLimitPercentageOutOfRange, }, { name: "invalid gc intervals", cfg: &Config{ - CheckInterval: 100 * time.Millisecond, - MinGCIntervalWhenSoftLimited: 50 * time.Millisecond, - MinGCIntervalWhenHardLimited: 100 * time.Millisecond, - MemoryLimitMiB: 5722, - MemorySpikeLimitMiB: 1907, + GCStats: GCStatsConfig{ + CheckInterval: 100 * time.Millisecond, + MinGCIntervalWhenSoftLimited: 50 * time.Millisecond, + MinGCIntervalWhenHardLimited: 100 * time.Millisecond, + MemoryLimitMiB: 5722, + MemorySpikeLimitMiB: 1907, + }, }, err: errInconsistentGCMinInterval, }, + { + name: "missing request_limit", + cfg: &Config{ + Model: "admission", + Admission: AdmissionConfig{}, + }, + err: errAdmissionLimitOutOfRange, + }, + { + name: "valid request_limit without waiting_limit", + cfg: &Config{ + Model: "admission", + Admission: AdmissionConfig{ + RequestLimitMiB: 128, + }, + }, + }, + { + name: "valid request_limit with waiting_limit", + cfg: &Config{ + Model: 
"admission", + Admission: AdmissionConfig{ + RequestLimitMiB: 128, + WaitingLimitMiB: 32, + }, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/internal/memorylimiter/memorylimiter.go b/internal/memorylimiter/memorylimiter.go index 906e6217c7b..1eb20586530 100644 --- a/internal/memorylimiter/memorylimiter.go +++ b/internal/memorylimiter/memorylimiter.go @@ -74,14 +74,14 @@ func NewMemoryLimiter(cfg *Config, logger *zap.Logger) (*MemoryLimiter, error) { logger.Info("Memory limiter configured", zap.Uint64("limit_mib", usageChecker.memAllocLimit/mibBytes), zap.Uint64("spike_limit_mib", usageChecker.memSpikeLimit/mibBytes), - zap.Duration("check_interval", cfg.CheckInterval)) + zap.Duration("check_interval", cfg.GCStats.CheckInterval)) return &MemoryLimiter{ usageChecker: *usageChecker, - memCheckWait: cfg.CheckInterval, - ticker: time.NewTicker(cfg.CheckInterval), - minGCIntervalWhenSoftLimited: cfg.MinGCIntervalWhenSoftLimited, - minGCIntervalWhenHardLimited: cfg.MinGCIntervalWhenHardLimited, + memCheckWait: cfg.GCStats.CheckInterval, + ticker: time.NewTicker(cfg.GCStats.CheckInterval), + minGCIntervalWhenSoftLimited: cfg.GCStats.MinGCIntervalWhenSoftLimited, + minGCIntervalWhenHardLimited: cfg.GCStats.MinGCIntervalWhenHardLimited, lastGCDone: time.Now(), readMemStatsFn: ReadMemStatsFn, runGCFn: runtime.GC, @@ -136,9 +136,9 @@ func (ml *MemoryLimiter) MustRefuse() bool { } func getMemUsageChecker(cfg *Config, logger *zap.Logger) (*memUsageChecker, error) { - memAllocLimit := uint64(cfg.MemoryLimitMiB) * mibBytes - memSpikeLimit := uint64(cfg.MemorySpikeLimitMiB) * mibBytes - if cfg.MemoryLimitMiB != 0 { + memAllocLimit := uint64(cfg.GCStats.MemoryLimitMiB) * mibBytes + memSpikeLimit := uint64(cfg.GCStats.MemorySpikeLimitMiB) * mibBytes + if cfg.GCStats.MemoryLimitMiB != 0 { return newFixedMemUsageChecker(memAllocLimit, memSpikeLimit), nil } totalMemory, err := GetMemoryFn() @@ -147,10 +147,10 @@ func getMemUsageChecker(cfg 
*Config, logger *zap.Logger) (*memUsageChecker, erro } logger.Info("Using percentage memory limiter", zap.Uint64("total_memory_mib", totalMemory/mibBytes), - zap.Uint32("limit_percentage", cfg.MemoryLimitPercentage), - zap.Uint32("spike_limit_percentage", cfg.MemorySpikePercentage)) - return newPercentageMemUsageChecker(totalMemory, uint64(cfg.MemoryLimitPercentage), - uint64(cfg.MemorySpikePercentage)), nil + zap.Uint32("limit_percentage", cfg.GCStats.MemoryLimitPercentage), + zap.Uint32("spike_limit_percentage", cfg.GCStats.MemorySpikePercentage)) + return newPercentageMemUsageChecker(totalMemory, uint64(cfg.GCStats.MemoryLimitPercentage), + uint64(cfg.GCStats.MemorySpikePercentage)), nil } func (ml *MemoryLimiter) readMemStats() *runtime.MemStats { diff --git a/internal/memorylimiter/memorylimiter_test.go b/internal/memorylimiter/memorylimiter_test.go index 1f26d024cc8..b38b53e4d52 100644 --- a/internal/memorylimiter/memorylimiter_test.go +++ b/internal/memorylimiter/memorylimiter_test.go @@ -20,9 +20,11 @@ import ( func TestMemoryPressureResponse(t *testing.T) { var currentMemAlloc uint64 cfg := &Config{ - CheckInterval: 1 * time.Minute, - MemoryLimitMiB: 1024, - MemorySpikeLimitMiB: 0, + GCStats: GCStatsConfig{ + CheckInterval: 1 * time.Minute, + MemoryLimitMiB: 1024, + MemorySpikeLimitMiB: 0, + }, } ml, err := NewMemoryLimiter(cfg, zap.NewNop()) require.NoError(t, err) @@ -56,7 +58,12 @@ func TestMemoryPressureResponse(t *testing.T) { func TestGetDecision(t *testing.T) { t.Run("fixed_limit", func(t *testing.T) { - d, err := getMemUsageChecker(&Config{MemoryLimitMiB: 100, MemorySpikeLimitMiB: 20}, zap.NewNop()) + d, err := getMemUsageChecker(&Config{ + GCStats: GCStatsConfig{ + MemoryLimitMiB: 100, + MemorySpikeLimitMiB: 20, + }, + }, zap.NewNop()) require.NoError(t, err) assert.Equal(t, &memUsageChecker{ memAllocLimit: 100 * mibBytes, @@ -71,7 +78,12 @@ func TestGetDecision(t *testing.T) { return 100 * mibBytes, nil } t.Run("percentage_limit", func(t 
*testing.T) { - d, err := getMemUsageChecker(&Config{MemoryLimitPercentage: 50, MemorySpikePercentage: 10}, zap.NewNop()) + d, err := getMemUsageChecker(&Config{ + GCStats: GCStatsConfig{ + MemoryLimitPercentage: 50, + MemorySpikePercentage: 10, + }, + }, zap.NewNop()) require.NoError(t, err) assert.Equal(t, &memUsageChecker{ memAllocLimit: 50 * mibBytes, @@ -143,10 +155,12 @@ func TestCallGCWhenSoftLimit(t *testing.T) { { name: "GC when first soft limit and not immediately", mlCfg: &Config{ - CheckInterval: 1 * time.Minute, - MinGCIntervalWhenSoftLimited: 10 * time.Second, - MemoryLimitMiB: 50, - MemorySpikeLimitMiB: 10, + GCStats: GCStatsConfig{ + CheckInterval: 1 * time.Minute, + MinGCIntervalWhenSoftLimited: 10 * time.Second, + MemoryLimitMiB: 50, + MemorySpikeLimitMiB: 10, + }, }, memAllocMiB: [2]uint64{45, 45}, numGCs: 1, @@ -154,10 +168,12 @@ func TestCallGCWhenSoftLimit(t *testing.T) { { name: "GC always when soft limit min interval is 0", mlCfg: &Config{ - CheckInterval: 1 * time.Minute, - MinGCIntervalWhenSoftLimited: 0, - MemoryLimitMiB: 50, - MemorySpikeLimitMiB: 10, + GCStats: GCStatsConfig{ + CheckInterval: 1 * time.Minute, + MinGCIntervalWhenSoftLimited: 0, + MemoryLimitMiB: 50, + MemorySpikeLimitMiB: 10, + }, }, memAllocMiB: [2]uint64{45, 45}, numGCs: 2, @@ -165,10 +181,12 @@ func TestCallGCWhenSoftLimit(t *testing.T) { { name: "GC when first hard limit and not immediately", mlCfg: &Config{ - CheckInterval: 1 * time.Minute, - MinGCIntervalWhenHardLimited: 10 * time.Second, - MemoryLimitMiB: 50, - MemorySpikeLimitMiB: 10, + GCStats: GCStatsConfig{ + CheckInterval: 1 * time.Minute, + MinGCIntervalWhenHardLimited: 10 * time.Second, + MemoryLimitMiB: 50, + MemorySpikeLimitMiB: 10, + }, }, memAllocMiB: [2]uint64{55, 55}, numGCs: 1, @@ -176,10 +194,12 @@ func TestCallGCWhenSoftLimit(t *testing.T) { { name: "GC always when hard limit min interval is 0", mlCfg: &Config{ - CheckInterval: 1 * time.Minute, - MinGCIntervalWhenHardLimited: 0, - MemoryLimitMiB: 
50, - MemorySpikeLimitMiB: 10, + GCStats: GCStatsConfig{ + CheckInterval: 1 * time.Minute, + MinGCIntervalWhenHardLimited: 0, + MemoryLimitMiB: 50, + MemorySpikeLimitMiB: 10, + }, }, memAllocMiB: [2]uint64{55, 55}, numGCs: 2, @@ -187,11 +207,13 @@ func TestCallGCWhenSoftLimit(t *testing.T) { { name: "GC based on soft then based on hard limit", mlCfg: &Config{ - CheckInterval: 1 * time.Minute, - MinGCIntervalWhenSoftLimited: 10 * time.Second, - MinGCIntervalWhenHardLimited: 0, - MemoryLimitMiB: 50, - MemorySpikeLimitMiB: 10, + GCStats: GCStatsConfig{ + CheckInterval: 1 * time.Minute, + MinGCIntervalWhenSoftLimited: 10 * time.Second, + MinGCIntervalWhenHardLimited: 0, + MemoryLimitMiB: 50, + MemorySpikeLimitMiB: 10, + }, }, memAllocMiB: [2]uint64{45, 55}, numGCs: 2,