Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT] Skeleton changes for new memory limiter models #12558

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions extension/memorylimiterextension/admission/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Admission Package

The real implementation of this package currently resides in the
[Collector-Contrib/internal/otelarrow/admission2](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/internal/otelarrow/admission2)
package. It would be copied in as part of a series of changes
described in https://github.com/open-telemetry/opentelemetry-collector/issues/9591.

56 changes: 56 additions & 0 deletions extension/memorylimiterextension/admission/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package admission // import "github.com/open-telemetry/opentelemetry-collector/internal/memorylimiter/admission"

import (
"context"
)

// Queue is a weighted admission queue interface.
type Queue interface {
// Acquire asks the controller to admit the caller.
//
// The weight parameter specifies how large of an admission to make.
// This might be used on the bytes of request (for example) to differentiate
// between large and small requests.
//
// Admit will return when one of the following events occurs:
//
// (1) admission is allowed, or
// (2) the provided ctx becomes canceled, or
// (3) there are so many existing waiters that the
// controller decides to reject this caller without
// admitting it.
//
// In case (1), the return value will be a non-nil
// ReleaseFunc. The caller must invoke it after it is finished
// with the resource being guarded by the admission
// controller.
//
// In case (2), the return value will be a Cancelled or
// DeadlineExceeded error.
//
// In case (3), the return value will be a ResourceExhausted
// error.
Acquire(ctx context.Context, weight uint64) (ReleaseFunc, error)
}

// ReleaseFunc is returned by Acquire when the Acquire() was admitted.
type ReleaseFunc func()

type unboundedController struct{}

var _ Queue = unboundedController{}

// NewUnboundedQueue returns a no-op implementation of the Queue interface.
func NewUnboundedQueue() Queue {
return unboundedController{}
}

func unboundedRelease() {}

// Acquire implements Queue.
func (unboundedController) Acquire(_ context.Context, _ uint64) (ReleaseFunc, error) {
return unboundedRelease, nil
}
98 changes: 80 additions & 18 deletions internal/memorylimiter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package memorylimiter // import "go.opentelemetry.io/collector/internal/memoryli

import (
"errors"
"strings"
"time"

"go.opentelemetry.io/collector/component"
Expand All @@ -14,6 +15,7 @@ var (
errCheckIntervalOutOfRange = errors.New("'check_interval' must be greater than zero")
errInconsistentGCMinInterval = errors.New("'min_gc_interval_when_soft_limited' should be larger than 'min_gc_interval_when_hard_limited'")
errLimitOutOfRange = errors.New("'limit_mib' or 'limit_percentage' must be greater than zero")
errAdmissionLimitOutOfRange = errors.New("'request_limit_mib' must be greater than zero")
errSpikeLimitOutOfRange = errors.New("'spike_limit_mib' must be smaller than 'limit_mib'")
errSpikeLimitPercentageOutOfRange = errors.New("'spike_limit_percentage' must be smaller than 'limit_percentage'")
errLimitPercentageOutOfRange = errors.New(
Expand All @@ -22,6 +24,31 @@ var (

// Config defines configuration for memory memoryLimiter processor.
type Config struct {
// Model is one of "gcstats" or "admission". Use the
// corresponding `gcstats` or `admission` sub-configuration
// objects.
Model string `mapstructure:"model"`

// GCStats contains settings that control a memory limiter
// based on garbage collector stats. This is the original
// model supported by this component, therefore the
// configuration is squashed.
//
// Note: can we un-squash this using a feature flag?
//
// Note: this is a breaking API change. Should we embed the
// GCStatsConfig object so that callers can continue to access
// GCStats config w/o adding `GCStats.` at every field reference?
GCStats GCStatsConfig `mapstructure:",squash"`

// Admission contains settings that control a memory limiter
// based on an exact count of bytes pending and in the
// pipeline.
Admission AdmissionConfig `mapstructure:"admission"`
}

// GCStatsConfig is the basis of a garbage-collector statistics-based memory limiter.
type GCStatsConfig struct {
// CheckInterval is the time between measurements of memory usage for the
// purposes of avoiding going over the limits. Defaults to zero, so no
// checks will be performed.
Expand Down Expand Up @@ -54,33 +81,68 @@ type Config struct {
MemorySpikePercentage uint32 `mapstructure:"spike_limit_percentage"`
}

// AdmissionConfig is the basis of a memory limiter that counts the
// number of bytes pending and in the pipeline.
type AdmissionConfig struct {
// RequestLimitMiB limits the number of requests that are received by the stream based on
// uncompressed request size. Request size is used to control how much traffic we admit
// for processing. When this field is zero, admission control is disabled meaning all
// requests will be immediately accepted.
RequestLimitMiB uint64 `mapstructure:"request_limit_mib"`

// WaitingLimitMiB is the limit on the amount of data waiting to be consumed.
// This is a dimension of memory limiting to ensure waiters are not consuming an
// unexpectedly large amount of memory in receivers that use it.
WaitingLimitMiB uint64 `mapstructure:"waiting_limit_mib"`
}

var _ component.Config = (*Config)(nil)

func NewDefaultConfig() *Config {
// Note that users are required to configure the primary limit in
// all models. For GCStats, the critical config is MemoryLimitMiB,
// for Admission, the critical config is RequestLimitMiB.
return &Config{
MinGCIntervalWhenSoftLimited: 10 * time.Second,
Model: "gcstats",
GCStats: GCStatsConfig{
MemoryLimitMiB: 0,
MinGCIntervalWhenSoftLimited: 10 * time.Second,
},
Admission: AdmissionConfig{
RequestLimitMiB: 0,
WaitingLimitMiB: 0,
},
}
}

// Validate checks if the processor configuration is valid
func (cfg *Config) Validate() error {
if cfg.CheckInterval <= 0 {
return errCheckIntervalOutOfRange
}
if cfg.MinGCIntervalWhenSoftLimited < cfg.MinGCIntervalWhenHardLimited {
return errInconsistentGCMinInterval
}
if cfg.MemoryLimitMiB == 0 && cfg.MemoryLimitPercentage == 0 {
return errLimitOutOfRange
}
if cfg.MemoryLimitPercentage > 100 || cfg.MemorySpikePercentage > 100 {
return errLimitPercentageOutOfRange
}
if cfg.MemoryLimitMiB > 0 && cfg.MemoryLimitMiB <= cfg.MemorySpikeLimitMiB {
return errSpikeLimitOutOfRange
}
if cfg.MemoryLimitPercentage > 0 && cfg.MemoryLimitPercentage <= cfg.MemorySpikePercentage {
return errSpikeLimitPercentageOutOfRange
switch strings.ToLower(cfg.Model) {
// The default branch includes the case where Model is unset,
// for backwards compatibility.
case "", "gcstats":
if cfg.GCStats.CheckInterval <= 0 {
return errCheckIntervalOutOfRange
}
if cfg.GCStats.MinGCIntervalWhenSoftLimited < cfg.GCStats.MinGCIntervalWhenHardLimited {
return errInconsistentGCMinInterval
}
if cfg.GCStats.MemoryLimitMiB == 0 && cfg.GCStats.MemoryLimitPercentage == 0 {
return errLimitOutOfRange
}
if cfg.GCStats.MemoryLimitPercentage > 100 || cfg.GCStats.MemorySpikePercentage > 100 {
return errLimitPercentageOutOfRange
}
if cfg.GCStats.MemoryLimitMiB > 0 && cfg.GCStats.MemoryLimitMiB <= cfg.GCStats.MemorySpikeLimitMiB {
return errSpikeLimitOutOfRange
}
if cfg.GCStats.MemoryLimitPercentage > 0 && cfg.GCStats.MemoryLimitPercentage <= cfg.GCStats.MemorySpikePercentage {
return errSpikeLimitPercentageOutOfRange
}
case "admission":
if cfg.Admission.RequestLimitMiB == 0 {
return errAdmissionLimitOutOfRange
}
}
return nil
}
89 changes: 66 additions & 23 deletions internal/memorylimiter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ func TestUnmarshalConfig(t *testing.T) {
assert.NoError(t, cm.Unmarshal(&cfg))
assert.Equal(t,
&Config{
CheckInterval: 5 * time.Second,
MemoryLimitMiB: 4000,
MemorySpikeLimitMiB: 500,
GCStats: GCStatsConfig{
CheckInterval: 5 * time.Second,
MemoryLimitMiB: 4000,
MemorySpikeLimitMiB: 500,
},
}, cfg)
}

Expand All @@ -36,65 +38,106 @@ func TestConfigValidate(t *testing.T) {
{
name: "valid",
cfg: &Config{
MemoryLimitMiB: 5722,
MemorySpikeLimitMiB: 1907,
CheckInterval: 100 * time.Millisecond,
GCStats: GCStatsConfig{
MemoryLimitMiB: 5722,
MemorySpikeLimitMiB: 1907,
CheckInterval: 100 * time.Millisecond,
},
},
err: nil,
},
{
name: "zero check interval",
cfg: &Config{
CheckInterval: 0,
GCStats: GCStatsConfig{
CheckInterval: 0,
},
},
err: errCheckIntervalOutOfRange,
},
{
name: "unset memory limit",
cfg: &Config{
CheckInterval: 1 * time.Second,
MemoryLimitMiB: 0,
MemoryLimitPercentage: 0,
GCStats: GCStatsConfig{
CheckInterval: 1 * time.Second,
MemoryLimitMiB: 0,
MemoryLimitPercentage: 0,
},
},
err: errLimitOutOfRange,
},
{
name: "invalid memory spike limit",
cfg: &Config{
CheckInterval: 1 * time.Second,
MemoryLimitMiB: 10,
MemorySpikeLimitMiB: 10,
GCStats: GCStatsConfig{
CheckInterval: 1 * time.Second,
MemoryLimitMiB: 10,
MemorySpikeLimitMiB: 10,
},
},
err: errSpikeLimitOutOfRange,
},
{
name: "invalid memory percentage limit",
cfg: &Config{
CheckInterval: 1 * time.Second,
MemoryLimitPercentage: 101,
GCStats: GCStatsConfig{
CheckInterval: 1 * time.Second,
MemoryLimitPercentage: 101,
},
},
err: errLimitPercentageOutOfRange,
},
{
name: "invalid memory spike percentage limit",
cfg: &Config{
CheckInterval: 1 * time.Second,
MemoryLimitPercentage: 50,
MemorySpikePercentage: 60,
GCStats: GCStatsConfig{
CheckInterval: 1 * time.Second,
MemoryLimitPercentage: 50,
MemorySpikePercentage: 60,
},
},
err: errSpikeLimitPercentageOutOfRange,
},
{
name: "invalid gc intervals",
cfg: &Config{
CheckInterval: 100 * time.Millisecond,
MinGCIntervalWhenSoftLimited: 50 * time.Millisecond,
MinGCIntervalWhenHardLimited: 100 * time.Millisecond,
MemoryLimitMiB: 5722,
MemorySpikeLimitMiB: 1907,
GCStats: GCStatsConfig{
CheckInterval: 100 * time.Millisecond,
MinGCIntervalWhenSoftLimited: 50 * time.Millisecond,
MinGCIntervalWhenHardLimited: 100 * time.Millisecond,
MemoryLimitMiB: 5722,
MemorySpikeLimitMiB: 1907,
},
},
err: errInconsistentGCMinInterval,
},
{
name: "missing request_limit",
cfg: &Config{
Model: "admission",
Admission: AdmissionConfig{},
},
err: errAdmissionLimitOutOfRange,
},
{
name: "valid request_limit without waiting_limit",
cfg: &Config{
Model: "admission",
Admission: AdmissionConfig{
RequestLimitMiB: 128,
},
},
},
{
name: "valid request_limit with waiting_limit",
cfg: &Config{
Model: "admission",
Admission: AdmissionConfig{
RequestLimitMiB: 128,
WaitingLimitMiB: 32,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
24 changes: 12 additions & 12 deletions internal/memorylimiter/memorylimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,14 @@ func NewMemoryLimiter(cfg *Config, logger *zap.Logger) (*MemoryLimiter, error) {
logger.Info("Memory limiter configured",
zap.Uint64("limit_mib", usageChecker.memAllocLimit/mibBytes),
zap.Uint64("spike_limit_mib", usageChecker.memSpikeLimit/mibBytes),
zap.Duration("check_interval", cfg.CheckInterval))
zap.Duration("check_interval", cfg.GCStats.CheckInterval))

return &MemoryLimiter{
usageChecker: *usageChecker,
memCheckWait: cfg.CheckInterval,
ticker: time.NewTicker(cfg.CheckInterval),
minGCIntervalWhenSoftLimited: cfg.MinGCIntervalWhenSoftLimited,
minGCIntervalWhenHardLimited: cfg.MinGCIntervalWhenHardLimited,
memCheckWait: cfg.GCStats.CheckInterval,
ticker: time.NewTicker(cfg.GCStats.CheckInterval),
minGCIntervalWhenSoftLimited: cfg.GCStats.MinGCIntervalWhenSoftLimited,
minGCIntervalWhenHardLimited: cfg.GCStats.MinGCIntervalWhenHardLimited,
lastGCDone: time.Now(),
readMemStatsFn: ReadMemStatsFn,
runGCFn: runtime.GC,
Expand Down Expand Up @@ -136,9 +136,9 @@ func (ml *MemoryLimiter) MustRefuse() bool {
}

func getMemUsageChecker(cfg *Config, logger *zap.Logger) (*memUsageChecker, error) {
memAllocLimit := uint64(cfg.MemoryLimitMiB) * mibBytes
memSpikeLimit := uint64(cfg.MemorySpikeLimitMiB) * mibBytes
if cfg.MemoryLimitMiB != 0 {
memAllocLimit := uint64(cfg.GCStats.MemoryLimitMiB) * mibBytes
memSpikeLimit := uint64(cfg.GCStats.MemorySpikeLimitMiB) * mibBytes
if cfg.GCStats.MemoryLimitMiB != 0 {
return newFixedMemUsageChecker(memAllocLimit, memSpikeLimit), nil
}
totalMemory, err := GetMemoryFn()
Expand All @@ -147,10 +147,10 @@ func getMemUsageChecker(cfg *Config, logger *zap.Logger) (*memUsageChecker, erro
}
logger.Info("Using percentage memory limiter",
zap.Uint64("total_memory_mib", totalMemory/mibBytes),
zap.Uint32("limit_percentage", cfg.MemoryLimitPercentage),
zap.Uint32("spike_limit_percentage", cfg.MemorySpikePercentage))
return newPercentageMemUsageChecker(totalMemory, uint64(cfg.MemoryLimitPercentage),
uint64(cfg.MemorySpikePercentage)), nil
zap.Uint32("limit_percentage", cfg.GCStats.MemoryLimitPercentage),
zap.Uint32("spike_limit_percentage", cfg.GCStats.MemorySpikePercentage))
return newPercentageMemUsageChecker(totalMemory, uint64(cfg.GCStats.MemoryLimitPercentage),
uint64(cfg.GCStats.MemorySpikePercentage)), nil
}

func (ml *MemoryLimiter) readMemStats() *runtime.MemStats {
Expand Down
Loading
Loading