Ingester: Support logging of samples EWMA CPU load is based on (#5508)
* Ingester: Support logging of samples EWMA CPU load is based on

To help with debugging, support logging the samples that the EWMA CPU load is
based on when enabling read path limiting. This logging is configurable, behind
an experimental flag.

---------

Signed-off-by: Arve Knudsen <[email protected]>
aknuds1 authored Jul 19, 2023
1 parent e8a18bb commit 59bd495
Showing 7 changed files with 130 additions and 13 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -15,9 +15,10 @@
   * `cortex_frontend_query_result_cache_requests_total{request_type="query_range|cardinality|label_names_and_values"}`
   * `cortex_frontend_query_result_cache_hits_total{request_type="query_range|cardinality|label_names_and_values"}`
 * [FEATURE] Added `-<prefix>.s3.list-objects-version` flag to configure the S3 list objects version.
-* [FEATURE] Ingester: Add optional CPU/memory utilization based read request limiting, considered experimental. Disabled by default, enable by configuring limits via both of the following flags: #5012 #5392 #5394 #5526
+* [FEATURE] Ingester: Add optional CPU/memory utilization based read request limiting, considered experimental. Disabled by default, enable by configuring limits via both of the following flags: #5012 #5392 #5394 #5526 #5508
   * `-ingester.read-path-cpu-utilization-limit`
   * `-ingester.read-path-memory-utilization-limit`
+  * `-ingester.log-utilization-based-limiter-cpu-samples`
 * [FEATURE] Ruler: Support filtering results from rule status endpoint by `file`, `rule_group` and `rule_name`. #5291
 * [FEATURE] Ingester: add experimental support for creating tokens by using spread minimizing strategy. This can be enabled with `-ingester.ring.token-generation-strategy: spread-minimizing` and `-ingester.ring.spread-minimizing-zones: <all available zones>`. In that case `-ingester.ring.tokens-file-path` must be empty. #5308 #5324
 * [FEATURE] Ingester: add experimental support to compact the TSDB Head when the number of in-memory series is equal or greater than `-blocks-storage.tsdb.early-head-compaction-min-in-memory-series`, and the ingester estimates that the per-tenant TSDB Head compaction will reduce in-memory series by at least `-blocks-storage.tsdb.early-head-compaction-min-estimated-series-reduction-percentage`. #5371
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -2729,6 +2729,17 @@
           "fieldFlag": "ingester.read-path-memory-utilization-limit",
           "fieldType": "int",
           "fieldCategory": "experimental"
+        },
+        {
+          "kind": "field",
+          "name": "log_utilization_based_limiter_cpu_samples",
+          "required": false,
+          "desc": "Enable logging of utilization based limiter CPU samples.",
+          "fieldValue": null,
+          "fieldDefaultValue": false,
+          "fieldFlag": "ingester.log-utilization-based-limiter-cpu-samples",
+          "fieldType": "boolean",
+          "fieldCategory": "experimental"
         }
       ],
       "fieldValue": null,
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -1247,6 +1247,8 @@ Usage of ./cmd/mimir/mimir:
     	Max series that this ingester can hold (across all tenants). Requests to create additional series will be rejected. 0 = unlimited.
   -ingester.instance-limits.max-tenants int
     	Max tenants that this ingester can hold. Requests from additional tenants will be rejected. 0 = unlimited.
+  -ingester.log-utilization-based-limiter-cpu-samples
+    	[experimental] Enable logging of utilization based limiter CPU samples.
   -ingester.max-global-exemplars-per-user int
     	[experimental] The maximum number of exemplars in memory, across the cluster. 0 to disable exemplars ingestion.
   -ingester.max-global-metadata-per-metric int
@@ -988,6 +988,10 @@ instance_limits:
 # request limiting. Use 0 to disable it.
 # CLI flag: -ingester.read-path-memory-utilization-limit
 [read_path_memory_utilization_limit: <int> | default = 0]
+
+# (experimental) Enable logging of utilization based limiter CPU samples.
+# CLI flag: -ingester.log-utilization-based-limiter-cpu-samples
+[log_utilization_based_limiter_cpu_samples: <boolean> | default = false]
 ```

 ### querier
9 changes: 6 additions & 3 deletions pkg/ingester/ingester.go
@@ -160,8 +160,9 @@ type Config struct {

 	IgnoreSeriesLimitForMetricNames string `yaml:"ignore_series_limit_for_metric_names" category:"advanced"`

-	ReadPathCPUUtilizationLimit    float64 `yaml:"read_path_cpu_utilization_limit" category:"experimental"`
-	ReadPathMemoryUtilizationLimit uint64  `yaml:"read_path_memory_utilization_limit" category:"experimental"`
+	ReadPathCPUUtilizationLimit          float64 `yaml:"read_path_cpu_utilization_limit" category:"experimental"`
+	ReadPathMemoryUtilizationLimit       uint64  `yaml:"read_path_memory_utilization_limit" category:"experimental"`
+	LogUtilizationBasedLimiterCPUSamples bool    `yaml:"log_utilization_based_limiter_cpu_samples" category:"experimental"`
 }

 // RegisterFlags adds the flags required to config this to the given FlagSet
@@ -177,6 +178,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
 	f.StringVar(&cfg.IgnoreSeriesLimitForMetricNames, "ingester.ignore-series-limit-for-metric-names", "", "Comma-separated list of metric names, for which the -ingester.max-global-series-per-metric limit will be ignored. Does not affect the -ingester.max-global-series-per-user limit.")
 	f.Float64Var(&cfg.ReadPathCPUUtilizationLimit, "ingester.read-path-cpu-utilization-limit", 0, "CPU utilization limit, as CPU cores, for CPU/memory utilization based read request limiting. Use 0 to disable it.")
 	f.Uint64Var(&cfg.ReadPathMemoryUtilizationLimit, "ingester.read-path-memory-utilization-limit", 0, "Memory limit, in bytes, for CPU/memory utilization based read request limiting. Use 0 to disable it.")
+	f.BoolVar(&cfg.LogUtilizationBasedLimiterCPUSamples, "ingester.log-utilization-based-limiter-cpu-samples", false, "Enable logging of utilization based limiter CPU samples.")
 }

 func (cfg *Config) Validate() error {
@@ -339,7 +341,8 @@ func New(cfg Config, limits *validation.Overrides, activeGroupsCleanupService *u

 	if cfg.ReadPathCPUUtilizationLimit > 0 || cfg.ReadPathMemoryUtilizationLimit > 0 {
 		i.utilizationBasedLimiter = limiter.NewUtilizationBasedLimiter(cfg.ReadPathCPUUtilizationLimit,
-			cfg.ReadPathMemoryUtilizationLimit, log.WithPrefix(logger, "context", "read path"),
+			cfg.ReadPathMemoryUtilizationLimit, cfg.LogUtilizationBasedLimiterCPUSamples,
+			log.WithPrefix(logger, "context", "read path"),
 			prometheus.WrapRegistererWithPrefix("cortex_ingester_", registerer))
 	}

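The flag wiring above can be tried in isolation. The following is a minimal sketch that uses only the standard library `flag` package; `limiterConfig`, `parseLimiterConfig` and `limiterEnabled` are hypothetical names introduced for illustration, and the help strings are shortened. It mirrors the condition in `New`: the limiter is only constructed when at least one of the two limits is greater than 0, so the logging flag presumably has no effect on its own.

```go
package main

import (
	"flag"
	"fmt"
)

// limiterConfig is a hypothetical stand-in for the three read path limiter
// fields of the ingester Config shown in the diff.
type limiterConfig struct {
	CPULimit      float64
	MemoryLimit   uint64
	LogCPUSamples bool
}

// registerFlags uses the flag names and defaults from the diff.
func (c *limiterConfig) registerFlags(f *flag.FlagSet) {
	f.Float64Var(&c.CPULimit, "ingester.read-path-cpu-utilization-limit", 0, "CPU utilization limit, as CPU cores. Use 0 to disable it.")
	f.Uint64Var(&c.MemoryLimit, "ingester.read-path-memory-utilization-limit", 0, "Memory limit, in bytes. Use 0 to disable it.")
	f.BoolVar(&c.LogCPUSamples, "ingester.log-utilization-based-limiter-cpu-samples", false, "Enable logging of utilization based limiter CPU samples.")
}

// parseLimiterConfig parses args into a fresh limiterConfig.
func parseLimiterConfig(args []string) (limiterConfig, error) {
	var cfg limiterConfig
	fs := flag.NewFlagSet("demo", flag.ContinueOnError)
	cfg.registerFlags(fs)
	err := fs.Parse(args)
	return cfg, err
}

// limiterEnabled mirrors the condition guarding limiter construction in New.
func (c limiterConfig) limiterEnabled() bool {
	return c.CPULimit > 0 || c.MemoryLimit > 0
}

func main() {
	cfg, err := parseLimiterConfig([]string{
		"-ingester.read-path-cpu-utilization-limit=2.5",
		"-ingester.log-utilization-based-limiter-cpu-samples",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.limiterEnabled(), cfg.LogCPUSamples) // true true
}
```
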
53 changes: 51 additions & 2 deletions pkg/util/limiter/utilization.go
@@ -5,6 +5,7 @@ package limiter
 import (
 	"context"
 	"fmt"
+	"strings"
 	"time"

 	"github.com/go-kit/log"
@@ -68,19 +69,27 @@ type UtilizationBasedLimiter struct {
 	limitingReason atomic.String
 	currCPUUtil    atomic.Float64
 	currMemoryUtil atomic.Uint64
+	// For logging of input to CPU load EWMA calculation, keep window of source samples
+	cpuSamples *cpuSampleBuffer
 }

 // NewUtilizationBasedLimiter returns a UtilizationBasedLimiter configured with cpuLimit and memoryLimit.
-func NewUtilizationBasedLimiter(cpuLimit float64, memoryLimit uint64, logger log.Logger, reg prometheus.Registerer) *UtilizationBasedLimiter {
+func NewUtilizationBasedLimiter(cpuLimit float64, memoryLimit uint64, logCPUSamples bool, logger log.Logger,
+	reg prometheus.Registerer) *UtilizationBasedLimiter {
 	// Calculate alpha for a minute long window
 	// https://github.com/VividCortex/ewma#choosing-alpha
 	alpha := 2 / (resourceUtilizationSlidingWindow.Seconds()/resourceUtilizationUpdateInterval.Seconds() + 1)
+	var cpuSamples *cpuSampleBuffer
+	if logCPUSamples {
+		cpuSamples = newCPUSampleBuffer(int(resourceUtilizationSlidingWindow.Seconds()))
+	}
 	l := &UtilizationBasedLimiter{
 		logger:      logger,
 		cpuLimit:    cpuLimit,
 		memoryLimit: memoryLimit,
 		// Use a minute long window, each sample being a second apart
 		cpuMovingAvg: math.NewEWMARate(alpha, resourceUtilizationUpdateInterval),
+		cpuSamples:   cpuSamples,
 	}
 	l.Service = services.NewTimerService(resourceUtilizationUpdateInterval, l.starting, l.update, nil)

@@ -150,6 +159,9 @@ func (l *UtilizationBasedLimiter) compute(nowFn func() time.Time) (currCPUUtil f
 		cpuUtil := (cpuTime - prevCPUTime) / now.Sub(prevUpdate).Seconds()
 		l.cpuMovingAvg.Add(int64(cpuUtil * 100))
 		l.cpuMovingAvg.Tick()
+		if l.cpuSamples != nil {
+			l.cpuSamples.Add(cpuUtil)
+		}
 	}

 	l.lastUpdate = now
@@ -180,7 +192,12 @@ func (l *UtilizationBasedLimiter) compute(nowFn func() time.Time) (currCPUUtil f
 	}

 	if enable {
-		level.Info(l.logger).Log("msg", "enabling resource utilization based limiting",
+		logger := l.logger
+		if l.cpuSamples != nil {
+			// Log also the CPU samples the CPU load EWMA is based on
+			logger = log.WithSuffix(logger, "source_samples", l.cpuSamples.String())
+		}
+		level.Info(logger).Log("msg", "enabling resource utilization based limiting",
 			"reason", reason, "memory_limit", formatMemoryLimit(l.memoryLimit), "memory_utilization", formatMemory(currMemoryUtil),
 			"cpu_limit", formatCPULimit(l.cpuLimit), "cpu_utilization", formatCPU(currCPUUtil))
 	} else {
@@ -216,3 +233,35 @@ func formatMemoryLimit(limit uint64) string {
 	}
 	return formatMemory(limit)
 }
+
+// cpuSampleBuffer is a circular buffer of CPU samples.
+type cpuSampleBuffer struct {
+	samples []float64
+	head    int
+}
+
+func newCPUSampleBuffer(size int) *cpuSampleBuffer {
+	return &cpuSampleBuffer{
+		samples: make([]float64, size),
+	}
+}
+
+// Add adds a sample to the buffer.
+func (b *cpuSampleBuffer) Add(sample float64) {
+	b.samples[b.head] = sample
+	b.head = (b.head + 1) % len(b.samples)
+}
+
+// String returns a comma-separated string representation of the buffer.
+func (b *cpuSampleBuffer) String() string {
+	var sb strings.Builder
+	for i := range b.samples {
+		s := b.samples[(b.head+i)%len(b.samples)]
+		sb.WriteString(fmt.Sprintf("%.2f", s))
+		if i < len(b.samples)-1 {
+			sb.WriteByte(',')
+		}
+	}
+
+	return sb.String()
+}
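The behaviour of the circular buffer added above can be checked with a small standalone program. This sketch copies the same logic into a renamed type, `sampleBuffer` (a hypothetical name, used only to keep the example self-contained), and uses a size of 3 instead of the 60 samples the limiter keeps. It shows two properties that follow from the code: samples are printed oldest first, and slots not yet written show up as `0.00`.

```go
package main

import (
	"fmt"
	"strings"
)

// sampleBuffer is a standalone copy of the circular buffer from the diff,
// renamed for this illustration.
type sampleBuffer struct {
	samples []float64
	head    int // index of the oldest sample, i.e. the next slot to overwrite
}

func newSampleBuffer(size int) *sampleBuffer {
	return &sampleBuffer{samples: make([]float64, size)}
}

// Add overwrites the oldest sample and advances head, wrapping around.
func (b *sampleBuffer) Add(sample float64) {
	b.samples[b.head] = sample
	b.head = (b.head + 1) % len(b.samples)
}

// String walks the buffer starting at head, so output is oldest first.
func (b *sampleBuffer) String() string {
	var sb strings.Builder
	for i := range b.samples {
		s := b.samples[(b.head+i)%len(b.samples)]
		fmt.Fprintf(&sb, "%.2f", s)
		if i < len(b.samples)-1 {
			sb.WriteByte(',')
		}
	}
	return sb.String()
}

func main() {
	b := newSampleBuffer(3)
	b.Add(1)
	fmt.Println(b.String()) // 0.00,0.00,1.00 (unwritten slots print as zero)

	for _, s := range []float64{2, 3, 4} {
		b.Add(s)
	}
	fmt.Println(b.String()) // 2.00,3.00,4.00 (sample 1 has been overwritten)
}
```
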
61 changes: 54 additions & 7 deletions pkg/util/limiter/utilization_test.go
@@ -4,7 +4,9 @@ package limiter

 import (
 	"bytes"
+	"fmt"
 	"math"
+	"strings"
 	"testing"
 	"time"

@@ -18,11 +20,11 @@ import (
 func TestUtilizationBasedLimiter(t *testing.T) {
 	const gigabyte = 1024 * 1024 * 1024

-	setup := func(t *testing.T, cpuLimit float64, memoryLimit uint64) (*UtilizationBasedLimiter,
+	setup := func(t *testing.T, cpuLimit float64, memoryLimit uint64, enableLogging bool) (*UtilizationBasedLimiter,
 		*fakeUtilizationScanner, prometheus.Gatherer) {
 		fakeScanner := &fakeUtilizationScanner{}
 		reg := prometheus.NewPedanticRegistry()
-		lim := NewUtilizationBasedLimiter(cpuLimit, memoryLimit, log.NewNopLogger(), reg)
+		lim := NewUtilizationBasedLimiter(cpuLimit, memoryLimit, enableLogging, log.NewNopLogger(), reg)
 		lim.utilizationScanner = fakeScanner
 		require.Empty(t, lim.LimitingReason(), "Limiting should initially be disabled")

@@ -35,7 +37,7 @@ func TestUtilizationBasedLimiter(t *testing.T) {
 	}

 	t.Run("CPU based limiting should be enabled if set to a value greater than 0", func(t *testing.T) {
-		lim, _, reg := setup(t, 0.11, gigabyte)
+		lim, _, reg := setup(t, 0.11, gigabyte, true)

 		// Warmup the CPU utilization.
 		for i := 0; i < int(resourceUtilizationSlidingWindow.Seconds()); i++ {
@@ -72,7 +74,7 @@ func TestUtilizationBasedLimiter(t *testing.T) {
 	})

 	t.Run("CPU based limiting should be disabled if set to 0", func(t *testing.T) {
-		lim, _, reg := setup(t, 0, gigabyte)
+		lim, _, reg := setup(t, 0, gigabyte, true)

 		// Warmup the CPU utilization.
 		for i := 0; i < int(resourceUtilizationSlidingWindow.Seconds()); i++ {
@@ -97,7 +99,7 @@ func TestUtilizationBasedLimiter(t *testing.T) {
 	})

 	t.Run("memory based limiting should be enabled if set to a value greater than 0", func(t *testing.T) {
-		lim, fakeScanner, reg := setup(t, 0.11, gigabyte)
+		lim, fakeScanner, reg := setup(t, 0.11, gigabyte, true)

 		// Compute the utilization a first time to warm up the limiter.
 		lim.compute(nowFn)
@@ -122,7 +124,7 @@ func TestUtilizationBasedLimiter(t *testing.T) {
 	})

 	t.Run("memory based limiting should be disabled if set to 0", func(t *testing.T) {
-		lim, fakeScanner, reg := setup(t, 0.11, 0)
+		lim, fakeScanner, reg := setup(t, 0.11, 0, true)

 		// Compute the utilization a first time to warm up the limiter.
 		lim.compute(nowFn)
@@ -141,6 +143,51 @@ func TestUtilizationBasedLimiter(t *testing.T) {
 				utilization_limiter_current_memory_usage_bytes 1.073741824e+09
 			`)))
 	})
+
+	t.Run("limiting should work without CPU samples logging", func(t *testing.T) {
+		lim, _, _ := setup(t, 0.11, gigabyte, false)
+
+		// Warmup the CPU utilization.
+		for i := 0; i < int(resourceUtilizationSlidingWindow.Seconds()); i++ {
+			lim.compute(nowFn)
+			tim = tim.Add(time.Second)
+		}
+
+		// The fake utilization scanner linearly increases CPU usage for a minute
+		for i := 0; i < 59; i++ {
+			lim.compute(nowFn)
+			tim = tim.Add(time.Second)
+			require.Empty(t, lim.LimitingReason(), "Limiting should be disabled")
+		}
+		lim.compute(nowFn)
+		tim = tim.Add(time.Second)
+		require.Equal(t, "cpu", lim.LimitingReason(), "Limiting should be enabled due to CPU")
+		require.Nil(t, lim.cpuSamples)
+	})
+
+	t.Run("the limiter should collect the last 60 CPU samples", func(t *testing.T) {
+		var instValues []float64
+		for i := 1; i <= 62; i++ {
+			instValues = append(instValues, float64(i))
+		}
+		scanner := &preRecordedUtilizationScanner{instantCPUValues: instValues}
+		lim := NewUtilizationBasedLimiter(1, 0, true, log.NewNopLogger(), prometheus.NewPedanticRegistry())
+		lim.utilizationScanner = scanner
+
+		for i, ts := 0, time.Now(); i < len(instValues); i++ {
+			lim.compute(func() time.Time {
+				return ts
+			})
+			ts = ts.Add(time.Second)
+		}
+
+		var sampleStrs []string
+		for i := 2; i < 62; i++ {
+			sampleStrs = append(sampleStrs, fmt.Sprintf("%.2f", instValues[i]))
+		}
+		exp := strings.Join(sampleStrs, ",")
+		assert.Equal(t, exp, lim.cpuSamples.String())
+	})
 }

 func TestFormatCPU(t *testing.T) {
@@ -240,7 +287,7 @@ func TestUtilizationBasedLimiter_CPUUtilizationSensitivity(t *testing.T) {
 		t.Run(testName, func(t *testing.T) {
 			scanner := &preRecordedUtilizationScanner{instantCPUValues: testData.instantCPUValues}

-			lim := NewUtilizationBasedLimiter(1, 0, log.NewNopLogger(), prometheus.NewPedanticRegistry())
+			lim := NewUtilizationBasedLimiter(1, 0, true, log.NewNopLogger(), prometheus.NewPedanticRegistry())
 			lim.utilizationScanner = scanner

 			minCPUUtilization := float64(math.MaxInt64)
