Skip to content

Commit

Permalink
Change throttle limit distribution logic (#670)
Browse files Browse the repository at this point in the history
* Change throttle limit distribution logic

* Fix
  • Loading branch information
kirillov6 authored Sep 12, 2024
1 parent bbc5cd0 commit 8c767d0
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 52 deletions.
9 changes: 6 additions & 3 deletions plugin/action/throttle/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ It allows to distribute the `default_limit` between events by condition.
> All events for which the value in the `field` doesn't fall into any of the distributions:
> * fall into default distribution, if it exists
> * throttled, otherwise
> 3. **default distribution** can "steal" limit from other distributions after it has exhausted its.
> This is done in order to avoid reserving limits for explicitly defined distributions.
`LimitDistributionConfig` example:
```yaml
Expand All @@ -108,9 +110,10 @@ ratios:
values: ['warn', 'info']
```
For this config and the `default_limit=100`:
* events with `log.level=error` will have a limit of 50
* events with `log.level=warn` or `log.level=info` will have a limit of 30
* all other events will have a limit of 20
* events with `log.level=error` will be NO MORE than `50`
* events with `log.level=warn` or `log.level=info` will be NO MORE than `30`
* there will be AT LEAST `20` other events
(can be up to `100` if there are no events with `log.level=error/warn/info`)

<br>

Expand Down
2 changes: 1 addition & 1 deletion plugin/action/throttle/distribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (ld *limitDistributions) size() int {
return len(ld.distributions)
}

// get returns (index, distribution limit) by key or (-1, default distribution limit) otherwise
// getLimit returns (index, distribution limit) by key or (-1, default distribution limit) otherwise
func (ld *limitDistributions) getLimit(key string) (int, int64) {
if idx, ok := ld.idxByKey[key]; ok {
return idx, ld.distributions[idx].limit
Expand Down
89 changes: 65 additions & 24 deletions plugin/action/throttle/in_memory_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,20 +69,16 @@ func (l *inMemoryLimiter) isAllowed(event *pipeline.Event, ts time.Time) bool {
l.lock()
defer l.unlock()

id := l.rebuildBuckets(ts)
index := id - l.buckets.getMinID()

// If the limit is given with distribution, then distributed buckets are used
distrIdx := 0
distrFieldVal := ""
if l.limit.distributions.isEnabled() {
distrFieldVal = event.Root.Dig(l.limit.distributions.field...).AsString()
distrIdx, limit = l.limit.distributions.getLimit(distrFieldVal)

// The distribution index in the bucket matches the distribution value index in distributions,
// but is shifted by 1 because default distribution has index 0.
distrIdx++
distrFieldVal, distrIdx, limit = l.getDistrData(index, event)
}

id := l.rebuildBuckets(ts)
index := id - l.buckets.getMinID()
switch l.limit.kind {
case "", limitKindCount:
l.buckets.add(index, distrIdx, 1)
Expand All @@ -93,28 +89,73 @@ func (l *inMemoryLimiter) isAllowed(event *pipeline.Event, ts time.Time) bool {
}

isAllowed := l.buckets.get(index, distrIdx) <= limit

if !isAllowed && l.limit.distributions.isEnabled() {
l.metricLabelsBuf = l.metricLabelsBuf[:0]

l.metricLabelsBuf = append(l.metricLabelsBuf, distrFieldVal)
for _, lbl := range l.limitDistrMetrics.CustomLabels {
val := "not_set"
node := event.Root.Dig(lbl)
if node != nil {
val = node.AsString()
}
l.metricLabelsBuf = append(l.metricLabelsBuf, val)
l.updateDistrMetrics(distrFieldVal, event)
}

return isAllowed
}

// getDistrData returns distribution field value, index and limit
func (l *inMemoryLimiter) getDistrData(bucketIdx int, event *pipeline.Event) (string, int, int64) {
fieldVal := event.Root.Dig(l.limit.distributions.field...).AsString()
idx, limit := l.limit.distributions.getLimit(fieldVal)

// The distribution index in the bucket matches the distribution value index in distributions,
// but is shifted by 1 because default distribution has index 0.
idx++

if idx > 0 {
return fieldVal, idx, limit
}

// For default distribution сheck in advance that we are within the limit.
// If not, then try to steal reserve from the most free distribution.
val := int64(1)
if l.limit.kind == limitKindSize {
val = int64(event.Size)
}

// Within the limit
if l.buckets.get(bucketIdx, idx)+val <= limit {
return fieldVal, idx, limit
}

// Looking for a distribution with the most free space.
// If found, updating idx and limit - use different bucket for check allowance.
maxDiff := int64(-1)
for i, d := range l.limit.distributions.distributions {
curVal := l.buckets.get(bucketIdx, i+1)
if curDiff := d.limit - (curVal + val); curDiff > maxDiff {
maxDiff = curDiff
idx = i + 1
limit = d.limit
}
}

return fieldVal, idx, limit
}

func (l *inMemoryLimiter) updateDistrMetrics(fieldVal string, event *pipeline.Event) {
l.metricLabelsBuf = l.metricLabelsBuf[:0]

switch l.limit.kind {
case "", limitKindCount:
l.limitDistrMetrics.EventsCount.WithLabelValues(l.metricLabelsBuf...).Inc()
case limitKindSize:
l.limitDistrMetrics.EventsSize.WithLabelValues(l.metricLabelsBuf...).Add(float64(event.Size))
l.metricLabelsBuf = append(l.metricLabelsBuf, fieldVal)
for _, lbl := range l.limitDistrMetrics.CustomLabels {
val := "not_set"
node := event.Root.Dig(lbl)
if node != nil {
val = node.AsString()
}
l.metricLabelsBuf = append(l.metricLabelsBuf, val)
}

return isAllowed
switch l.limit.kind {
case "", limitKindCount:
l.limitDistrMetrics.EventsCount.WithLabelValues(l.metricLabelsBuf...).Inc()
case limitKindSize:
l.limitDistrMetrics.EventsSize.WithLabelValues(l.metricLabelsBuf...).Add(float64(event.Size))
}
}

func (l *inMemoryLimiter) lock() {
Expand Down
9 changes: 6 additions & 3 deletions plugin/action/throttle/throttle.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ type Config struct {
// >> All events for which the value in the `field` doesn't fall into any of the distributions:
// >> * fall into default distribution, if it exists
// >> * throttled, otherwise
// >> 3. **default distribution** can "steal" limit from other distributions after it has exhausted its.
// >> This is done in order to avoid reserving limits for explicitly defined distributions.
// >
// > `LimitDistributionConfig` example:
// > ```yaml
Expand All @@ -155,9 +157,10 @@ type Config struct {
// > values: ['warn', 'info']
// > ```
// > For this config and the `default_limit=100`:
// > * events with `log.level=error` will have a limit of 50
// > * events with `log.level=warn` or `log.level=info` will have a limit of 30
// > * all other events will have a limit of 20
// > * events with `log.level=error` will be NO MORE than `50`
// > * events with `log.level=warn` or `log.level=info` will be NO MORE than `30`
// > * there will be AT LEAST `20` other events
// > (can be up to `100` if there are no events with `log.level=error/warn/info`)
LimitDistribution LimitDistributionConfig `json:"limit_distribution" child:"true"` // *
}

Expand Down
48 changes: 27 additions & 21 deletions plugin/action/throttle/throttle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,32 +581,38 @@ func TestThrottleWithDistribution(t *testing.T) {
outEvents[level]++
wg.Done()
})

// error - limit 6
// info, warn - limit 4
// default - limit 2
events := []string{
`{"time":"%s","k8s_pod":"pod_1","level":"error"}`, // 1/6
`{"time":"%s","k8s_pod":"pod_1","level":"info"}`, // 1/4
`{"time":"%s","k8s_pod":"pod_1","level":"error"}`, // 2/6
`{"time":"%s","k8s_pod":"pod_1","level":""}`, // 1/2
`{"time":"%s","k8s_pod":"pod_1","level":"debug"}`, // 2/2
`{"time":"%s","k8s_pod":"pod_1","level":"error"}`, // 3/6
`{"time":"%s","k8s_pod":"pod_1","level":"error"}`, // 4/6
`{"time":"%s","k8s_pod":"pod_1","level":"debug"}`, // steal from "info, warn" - 2/4
`{"time":"%s","k8s_pod":"pod_1","level":"warn"}`, // 3/4
`{"time":"%s","k8s_pod":"pod_1","level":"error"}`, // 5/6
`{"time":"%s","k8s_pod":"pod_1","level":"info"}`, // 4/4
`{"time":"%s","k8s_pod":"pod_1","level":"debug"}`, // steal from "error" - 6/6
`{"time":"%s","k8s_pod":"pod_1","level":"info"}`, // throttled
`{"time":"%s","k8s_pod":"pod_1","level":"warn"}`, // throttled
`{"time":"%s","k8s_pod":"pod_1","level":""}`, // throttled
`{"time":"%s","k8s_pod":"pod_1","level":"error"}`, // throttled
`{"time":"%s","k8s_pod":"pod_1","level":"debug"}`, // throttled
}

wantOutEvents := map[string]int{
"error": 6,
"info": 3,
"error": 5,
"info": 2,
"warn": 1,
"debug": 1,
"debug": 3,
"": 1,
}

events := []string{
`{"time":"%s","k8s_pod":"pod_1","level":"error"}`,
`{"time":"%s","k8s_pod":"pod_1","level":"info"}`,
`{"time":"%s","k8s_pod":"pod_1","level":"error"}`,
`{"time":"%s","k8s_pod":"pod_1","level":"warn"}`,
`{"time":"%s","k8s_pod":"pod_1","level":"error"}`,
`{"time":"%s","k8s_pod":"pod_1","level":"error"}`,
`{"time":"%s","k8s_pod":"pod_1","level":""}`,
`{"time":"%s","k8s_pod":"pod_1","level":"info"}`,
`{"time":"%s","k8s_pod":"pod_1","level":"error"}`,
`{"time":"%s","k8s_pod":"pod_1","level":"info"}`,
`{"time":"%s","k8s_pod":"pod_1","level":"debug"}`,
`{"time":"%s","k8s_pod":"pod_1","level":"warn"}`,
`{"time":"%s","k8s_pod":"pod_1","level":"error"}`,
`{"time":"%s","k8s_pod":"pod_1","level":"debug"}`,
`{"time":"%s","k8s_pod":"pod_1","level":"error"}`,
}

nowTs := time.Now().Format(time.RFC3339Nano)
for i := 0; i < len(events); i++ {
json := fmt.Sprintf(events[i], nowTs)
Expand Down

0 comments on commit 8c767d0

Please sign in to comment.