Skip to content

Commit

Permalink
Merge pull request #759 from go-graphite/cedwards/fix-hitcount
Browse files Browse the repository at this point in the history
Fix hitcount function handling of intervals and alignToInterval
  • Loading branch information
Civil authored Apr 28, 2023
2 parents a47f80f + d25aa85 commit d53ae1c
Show file tree
Hide file tree
Showing 7 changed files with 260 additions and 69 deletions.
12 changes: 6 additions & 6 deletions expr/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ func (eval evaluator) FetchAndEvalExprs(ctx context.Context, exprs []parser.Expr

haveFallbackSeries := false
for _, exp := range exprs {
for _, m := range exp.Metrics() {
for _, m := range exp.Metrics(from, until) {
fetchRequest := pb.FetchRequest{
Name: m.Metric,
PathExpression: m.Metric,
StartTime: m.From + from,
StopTime: m.Until + until,
StartTime: m.From,
StopTime: m.Until,
MaxDataPoints: maxDataPoints,
}
metricRequest := parser.MetricRequest{
Expand Down Expand Up @@ -126,12 +126,12 @@ func (eval evaluator) FetchAndEvalExp(ctx context.Context, exp parser.Expr, from
// values related to this particular `target=`
targetValues := make(map[parser.MetricRequest][]*types.MetricData)

for _, m := range exp.Metrics() {
for _, m := range exp.Metrics(from, until) {
fetchRequest := pb.FetchRequest{
Name: m.Metric,
PathExpression: m.Metric,
StartTime: m.From + from,
StopTime: m.Until + until,
StartTime: m.From,
StopTime: m.Until,
MaxDataPoints: maxDataPoints,
}
metricRequest := parser.MetricRequest{
Expand Down
120 changes: 71 additions & 49 deletions expr/functions/hitcount/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@ package hitcount

import (
"context"
"fmt"
"math"
"strconv"
"strings"

"github.com/go-graphite/carbonapi/expr/helper"
"github.com/go-graphite/carbonapi/expr/interfaces"
"github.com/go-graphite/carbonapi/expr/types"
"github.com/go-graphite/carbonapi/pkg/parser"
pb "github.com/go-graphite/protocol/carbonapi_v3_pb"
"math"
"strconv"
"strings"
)

type hitcount struct {
Expand Down Expand Up @@ -39,33 +37,41 @@ func (f *hitcount) Do(ctx context.Context, e parser.Expr, from, until int64, val
return nil, parser.ErrMissingArgument
}

args, err := helper.GetSeriesArg(ctx, e.Arg(0), from, until, values)
bucketSizeInt32, err := e.GetIntervalArg(1, 1)
if err != nil {
return nil, err
}
interval := int64(bucketSizeInt32)

if len(args) == 0 {
return []*types.MetricData{}, nil
}

bucketSizeInt32, err := e.GetIntervalArg(1, 1)
alignToInterval, err := e.GetBoolNamedOrPosArgDefault("alignToInterval", 2, false)
if err != nil {
return nil, err
}
bucketSize := int64(bucketSizeInt32)

alignToInterval, err := e.GetBoolNamedOrPosArgDefault("alignToInterval", 2, false)
if alignToInterval {
// from needs to be adjusted before grabbing the series arg as it has been adjusted in the metric request
from = helper.AlignStartToInterval(from, until, interval)
}

args, err := helper.GetSeriesArg(ctx, e.Arg(0), from, until, values)
if err != nil {
return nil, err
}

if len(args) == 0 {
return []*types.MetricData{}, nil
}

start := args[0].StartTime
stop := args[0].StopTime

// Note: the start time for the fetch request is adjusted in expr.Metrics() so that the fetched
// data is already aligned by interval if this parameter is set to true
if alignToInterval {
start = helper.AlignStartToInterval(start, stop, bucketSize)
intervalCount := (stop - start) / interval
stop = start + (intervalCount * interval) + interval
}

buckets := helper.GetBuckets(start, stop, bucketSize)
results := make([]*types.MetricData, 0, len(args))
for _, arg := range args {
var nameBuf strings.Builder
Expand All @@ -82,53 +88,69 @@ func (f *hitcount) Do(ctx context.Context, e parser.Expr, from, until int64, val
}
nameBuf.WriteString(")")

bucketCount := helper.GetBuckets(start, stop, interval)

r := &types.MetricData{
FetchResponse: pb.FetchResponse{
Name: nameBuf.String(),
Values: make([]float64, buckets, buckets+1),
StepTime: bucketSize,
StartTime: start,
StopTime: stop,
ConsolidationFunc: "max",
Name: nameBuf.String(),
StepTime: interval,
StartTime: start,
StopTime: stop,
},
Tags: helper.CopyTags(arg),
}
r.Tags["hitcount"] = fmt.Sprintf("%d", bucketSizeInt32)

bucketEnd := start + bucketSize
t := arg.StartTime
ridx := 0
var count float64
bucketItems := 0
for _, v := range arg.Values {
bucketItems++
if !math.IsNaN(v) {
if math.IsNaN(count) {
count = 0
}
r.Tags["hitcount"] = strconv.FormatInt(int64(bucketSizeInt32), 10)

count += v * float64(arg.StepTime)
step := arg.StepTime
buckets := make([][]float64, bucketCount)
newStart := stop - bucketCount*interval
r.StartTime = newStart

for i, v := range arg.Values {
if math.IsNaN(v) {
continue
}

t += arg.StepTime
start_time := arg.StartTime + int64(i)*step
startBucket, startMod := helper.Divmod(start_time-newStart, interval)
end_time := start_time + step
endBucket, endMod := helper.Divmod(end_time-newStart, interval)

if t >= stop {
break
if endBucket >= bucketCount {
endBucket = bucketCount - 1
endMod = interval
}

if t >= bucketEnd {
r.Values[ridx] = count

ridx++
bucketEnd += bucketSize
count = math.NaN()
bucketItems = 0
if startBucket == endBucket {
// All hits go into a single bucket
if startBucket >= 0 {
buckets[startBucket] = append(buckets[startBucket], v*float64(endMod-startMod))
}
} else {
// Spread the hits amongst 2 or more buckets
if startBucket >= 0 {
buckets[startBucket] = append(buckets[startBucket], v*float64(interval-startMod))
}
hitsPerBucket := v * float64(interval)
for j := startBucket + 1; j < endBucket; j++ {
buckets[j] = append(buckets[j], hitsPerBucket)
}
if endMod > 0 {
buckets[endBucket] = append(buckets[endBucket], v*float64(endMod))
}
}
}

// remaining values
if bucketItems > 0 {
r.Values[ridx] = count
r.Values = make([]float64, len(buckets))
for i, bucket := range buckets {
if len(bucket) != 0 {
var sum float64
for _, v := range bucket {
sum += v
}
r.Values[i] = sum
} else {
r.Values[i] = math.NaN()
}
}

results = append(results, r)
Expand Down
32 changes: 23 additions & 9 deletions expr/functions/hitcount/function_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ func TestHitcount(t *testing.T) {
math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN(),
5}, 5, now32)},
},
[]float64{35, 70, 105, 140, math.NaN(), 25},
[]float64{5, 40, 75, 110, 120, 25},

"hitcount(metric1,'30s')",
30,
now32,
1410344975,
now32 + 31*5,
},
{
Expand All @@ -72,7 +73,7 @@ func TestHitcount(t *testing.T) {
[]float64{375},
"hitcount(metric1,'1h')",
3600,
tenFiftyNine,
1410343265,
tenFiftyNine + 25*5,
},
{
Expand All @@ -83,11 +84,11 @@ func TestHitcount(t *testing.T) {
3, 3, 3, 4, 4, 4, 4, 4, 5, 5, 5, 5,
5}, 5, tenFiftyNine)},
},
[]float64{105, 270},
[]float64{375},
"hitcount(metric1,'1h',true)",
3600,
tenFiftyNine - (59 * 60),
tenFiftyNine + 25*5,
tenFiftyNine,
tenFiftyNine + (((tenFiftyNine + 25*5) - tenFiftyNine) / 3600) + 3600, // The end time is adjusted because of alignToInterval being set to true
},
{
"hitcount(metric1,\"1h\",alignToInterval=true)",
Expand All @@ -97,11 +98,24 @@ func TestHitcount(t *testing.T) {
3, 3, 3, 4, 4, 4, 4, 4, 5, 5, 5, 5,
5}, 5, tenFiftyNine)},
},
[]float64{105, 270},
[]float64{375},
"hitcount(metric1,'1h',true)",
3600,
tenFiftyNine - (59 * 60),
tenFiftyNine + 25*5,
tenFiftyNine,
tenFiftyNine + (((tenFiftyNine + 25*5) - tenFiftyNine) / 3600) + 3600, // The end time is adjusted because of alignToInterval being set to true
},
{
"hitcount(metric1,\"15s\")", // Test having a smaller interval than the data's step
map[parser.MetricRequest][]*types.MetricData{
{"metric1", 0, 1}: {types.MakeMetricData("metric1", []float64{
11, 7, 19, 32, 23}, 30, now32)},
},
[]float64{165, 165, 105, 105, 285, 285, 480, 480, 345, 345},

"hitcount(metric1,'15s')",
15,
now32,
now32 + 5*30,
},
}

Expand Down
6 changes: 6 additions & 0 deletions expr/helper/align.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,3 +401,9 @@ func genNaNs(length int) []float64 {
}
return nans
}

func Divmod(numerator, denominator int64) (quotient, remainder int64) {
quotient = numerator / denominator // integer division, decimals are truncated
remainder = numerator % denominator
return
}
2 changes: 1 addition & 1 deletion pkg/parser/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ type Expr interface {
MutateRawArgs(args string) Expr

// Metrics returns list of metric requests
Metrics() []MetricRequest
Metrics(from, until int64) []MetricRequest

// GetIntervalArg returns interval typed argument.
GetIntervalArg(n int, defaultSign int) (int32, error)
Expand Down
38 changes: 34 additions & 4 deletions pkg/parser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,23 +137,23 @@ func (e *expr) NamedArg(name string) (Expr, bool) {
return expr, exist
}

func (e *expr) Metrics() []MetricRequest {
func (e *expr) Metrics(from, until int64) []MetricRequest {
switch e.etype {
case EtName:
return []MetricRequest{{Metric: e.target}}
return []MetricRequest{{Metric: e.target, From: from, Until: until}}
case EtConst, EtString:
return nil
case EtFunc:
var r []MetricRequest
for _, a := range e.args {
r = append(r, a.Metrics()...)
r = append(r, a.Metrics(from, until)...)
}

switch e.target {
case "transformNull":
referenceSeriesExpr := e.GetNamedArg("referenceSeries")
if !referenceSeriesExpr.IsInterfaceNil() {
r = append(r, referenceSeriesExpr.Metrics()...)
r = append(r, referenceSeriesExpr.Metrics(from, until)...)
}
case "timeShift":
offs, err := e.GetIntervalArg(1, -1)
Expand Down Expand Up @@ -212,6 +212,36 @@ func (e *expr) Metrics() []MetricRequest {
r[i].From = fromNew
}
}
case "hitcount":
if len(e.args) < 2 {
return nil
}

alignToInterval, err := e.GetBoolNamedOrPosArgDefault("alignToInterval", 2, false)
if err != nil {
return nil
}
if alignToInterval {
bucketSizeInt32, err := e.GetIntervalArg(1, 1)
if err != nil {
return nil
}

interval := int64(bucketSizeInt32)
// This is done in order to replicate the behavior in Graphite web when alignToInterval is set,
// in which new data is fetched with the adjusted start time.
for i, _ := range r {
start := r[i].From
for _, v := range []int64{86400, 3600, 60} {
if interval >= v {
start -= start % v
break
}
}

r[i].From = start
}
}
}
return r
}
Expand Down
Loading

0 comments on commit d53ae1c

Please sign in to comment.