Skip to content

Commit

Permalink
expr: refactor for avoid global evaluator usage
Browse files Browse the repository at this point in the history
  • Loading branch information
msaf1980 committed Mar 14, 2024
1 parent 159cfe3 commit 7f00f02
Show file tree
Hide file tree
Showing 239 changed files with 1,648 additions and 1,601 deletions.
14 changes: 12 additions & 2 deletions cmd/carbonapi/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (
"time"

"github.com/go-graphite/carbonapi/cache"
"github.com/go-graphite/carbonapi/cmd/carbonapi/interfaces"
"github.com/go-graphite/carbonapi/expr"
"github.com/go-graphite/carbonapi/expr/interfaces"
"github.com/go-graphite/carbonapi/limiter"
"github.com/go-graphite/carbonapi/pkg/tlsconfig"
zipperCfg "github.com/go-graphite/carbonapi/zipper/config"
zipper "github.com/go-graphite/carbonapi/zipper/interfaces"
zipperTypes "github.com/go-graphite/carbonapi/zipper/types"

"github.com/lomik/zapwriter"
Expand Down Expand Up @@ -116,10 +118,12 @@ type ConfigType struct {
DefaultTimeZone *time.Location `mapstructure:"-" json:"-"`

// ZipperInstance is API entry to carbonzipper
ZipperInstance interfaces.CarbonZipper `mapstructure:"-" json:"-"`
ZipperInstance zipper.CarbonZipper `mapstructure:"-" json:"-"`

// Limiter limits concurrent zipper requests
Limiter limiter.SimpleLimiter `mapstructure:"-" json:"-"`

Evaluator interfaces.Evaluator `mapstructure:"-" json:"-"`
}

// skipcq: CRT-P0003
Expand All @@ -132,6 +136,12 @@ func (c ConfigType) String() string {
}
}

func (c *ConfigType) SetZipper(zipper zipper.CarbonZipper) (err error) {
c.ZipperInstance = zipper
c.Evaluator, err = expr.NewEvaluator(c.Limiter, c.ZipperInstance)
return
}

var Config = ConfigType{
ExtrapolateExperiment: false,
Buckets: 10,
Expand Down
2 changes: 1 addition & 1 deletion cmd/carbonapi/http/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func init() {
config.Config.Upstreams.Backends = []string{"dummy"}
config.SetUpConfigUpstreams(logger)
config.SetUpConfig(logger, "(test)")
config.Config.ZipperInstance = newMockCarbonZipper()
config.Config.SetZipper(newMockCarbonZipper())
emptyStringList := make([]string, 0)
InitHandlers(emptyStringList, emptyStringList)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/carbonapi/http/render_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func renderHandler(w http.ResponseWriter, r *http.Request) {

ApiMetrics.RenderRequests.Add(1)

result, errs := expr.FetchAndEvalExprs(ctx, exprs, from32, until32, values)
result, errs := expr.FetchAndEvalExprs(ctx, config.Config.Evaluator, exprs, from32, until32, values)
if errs != nil {
errors = errs
}
Expand All @@ -324,7 +324,7 @@ func renderHandler(w http.ResponseWriter, r *http.Request) {

ApiMetrics.RenderRequests.Add(1)

result, err := expr.FetchAndEvalExp(ctx, exp, from32, until32, values)
result, err := expr.FetchAndEvalExp(ctx, config.Config.Evaluator, exp, from32, until32, values)
if err != nil {
errors[target] = merry.Wrap(err)
}
Expand Down
6 changes: 5 additions & 1 deletion cmd/carbonapi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ func main() {
dns.UseDNSCache(config.Config.CachingDNSRefreshTime)
}

config.Config.ZipperInstance = newZipper(carbonapiHttp.ZipperStats, &config.Config.Upstreams, config.Config.IgnoreClientTimeout, zapwriter.Logger("zipper"))
if err := config.Config.SetZipper(newZipper(carbonapiHttp.ZipperStats, &config.Config.Upstreams, config.Config.IgnoreClientTimeout, zapwriter.Logger("zipper"))); err != nil {
logger.Fatal("failed to setup zipper",
zap.Error(err),
)
}

wg := sync.WaitGroup{}
serve := func(listen config.Listener, handler http.Handler) {
Expand Down
135 changes: 72 additions & 63 deletions expr/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,34 @@ package expr

import (
"context"
"errors"

"github.com/ansel1/merry"
pb "github.com/go-graphite/protocol/carbonapi_v3_pb"

"github.com/go-graphite/carbonapi/cmd/carbonapi/config"
_ "github.com/go-graphite/carbonapi/expr/functions"
"github.com/go-graphite/carbonapi/expr/helper"
"github.com/go-graphite/carbonapi/expr/interfaces"
"github.com/go-graphite/carbonapi/expr/metadata"
"github.com/go-graphite/carbonapi/expr/types"
"github.com/go-graphite/carbonapi/limiter"
"github.com/go-graphite/carbonapi/pkg/parser"
utilctx "github.com/go-graphite/carbonapi/util/ctx"
zipper "github.com/go-graphite/carbonapi/zipper/interfaces"
)

type evaluator struct{}
var ErrZipperNotInit = errors.New("zipper not initialized")

func (eval evaluator) Fetch(ctx context.Context, exprs []parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) (map[parser.MetricRequest][]*types.MetricData, error) {
if err := config.Config.Limiter.Enter(ctx); err != nil {
type Evaluator struct {
limiter limiter.SimpleLimiter
zipper zipper.CarbonZipper
}

func (eval Evaluator) Fetch(ctx context.Context, exprs []parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) (map[parser.MetricRequest][]*types.MetricData, error) {
if err := eval.limiter.Enter(ctx); err != nil {
return nil, err
}
defer config.Config.Limiter.Leave()
defer eval.limiter.Leave()

multiFetchRequest := pb.MultiFetchRequest{}
metricRequestCache := make(map[string]parser.MetricRequest)
Expand Down Expand Up @@ -74,7 +82,7 @@ func (eval evaluator) Fetch(ctx context.Context, exprs []parser.Expr, from, unti
}

if len(multiFetchRequest.Metrics) > 0 {
metrics, _, err := config.Config.ZipperInstance.Render(ctx, multiFetchRequest)
metrics, _, err := eval.zipper.Render(ctx, multiFetchRequest)
// If we had only partial result, we want to do our best to actually do our job
if err != nil && merry.HTTPCode(err) >= 400 && !haveFallbackSeries {
return nil, err
Expand All @@ -97,16 +105,16 @@ func (eval evaluator) Fetch(ctx context.Context, exprs []parser.Expr, from, unti
targetValues[m] = values[m]
}

if config.Config.ZipperInstance.ScaleToCommonStep() {
if eval.zipper.ScaleToCommonStep() {
targetValues = helper.ScaleValuesToCommonStep(targetValues)
}

return targetValues, nil
}

// Eval evaluates expressions.
func (eval evaluator) Eval(ctx context.Context, exp parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) (results []*types.MetricData, err error) {
rewritten, targets, err := RewriteExpr(ctx, exp, from, until, values)
func (eval Evaluator) Eval(ctx context.Context, exp parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) (results []*types.MetricData, err error) {
rewritten, targets, err := RewriteExpr(ctx, eval, exp, from, until, values)
if err != nil {
return nil, err
}
Expand All @@ -128,63 +136,19 @@ func (eval evaluator) Eval(ctx context.Context, exp parser.Expr, from, until int
}
return results, nil
}
return EvalExpr(ctx, exp, from, until, values)
}

var _evaluator = evaluator{}

func init() {
helper.SetEvaluator(_evaluator)
metadata.SetEvaluator(_evaluator)
}

// FetchAndEvalExp fetch data and evaluates expressions
func FetchAndEvalExp(ctx context.Context, e parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) ([]*types.MetricData, merry.Error) {
targetValues, err := _evaluator.Fetch(ctx, []parser.Expr{e}, from, until, values)
if err != nil {
return nil, merry.Wrap(err)
}

res, err := _evaluator.Eval(ctx, e, from, until, targetValues)
if err != nil {
return nil, merry.Wrap(err)
}

for mReq := range values {
SortMetrics(values[mReq], mReq)
}

return res, nil
return EvalExpr(ctx, eval, exp, from, until, values)
}

func FetchAndEvalExprs(ctx context.Context, exprs []parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) ([]*types.MetricData, map[string]merry.Error) {
targetValues, err := _evaluator.Fetch(ctx, exprs, from, until, values)
if err != nil {
return nil, map[string]merry.Error{"*": merry.Wrap(err)}
}

res := make([]*types.MetricData, 0, len(exprs))
var errors map[string]merry.Error
for _, exp := range exprs {
evaluationResult, err := _evaluator.Eval(ctx, exp, from, until, targetValues)
if err != nil {
if errors == nil {
errors = make(map[string]merry.Error)
}
errors[exp.Target()] = merry.Wrap(err)
}
res = append(res, evaluationResult...)
}

for mReq := range values {
SortMetrics(values[mReq], mReq)
// NewEvaluator create evaluator with limiter and zipper
func NewEvaluator(limiter limiter.SimpleLimiter, zipper zipper.CarbonZipper) (*Evaluator, error) {
if zipper == nil {
return nil, ErrZipperNotInit
}

return res, errors
return &Evaluator{limiter: limiter, zipper: zipper}, nil
}

// EvalExpr is the main expression evaluator.
func EvalExpr(ctx context.Context, e parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) ([]*types.MetricData, error) {
func EvalExpr(ctx context.Context, eval interfaces.Evaluator, e parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) ([]*types.MetricData, error) {
if e.IsName() {
return values[parser.MetricRequest{Metric: e.Target(), From: from, Until: until}], nil
} else if e.IsConst() {
Expand Down Expand Up @@ -212,7 +176,7 @@ func EvalExpr(ctx context.Context, e parser.Expr, from, until int64, values map[
f, ok := metadata.FunctionMD.Functions[e.Target()]
metadata.FunctionMD.RUnlock()
if ok {
v, err := f.Do(ctx, e, from, until, values)
v, err := f.Do(ctx, eval, e, from, until, values)
if err != nil {
err = merry.WithMessagef(err, "function=%s: %s", e.Target(), err.Error())
if merry.Is(
Expand Down Expand Up @@ -242,14 +206,59 @@ func EvalExpr(ctx context.Context, e parser.Expr, from, until int64, values map[
// applyByNode(foo*, 1, "%") -> (true, ["foo1", "foo2"], nil)
// sumSeries(foo) -> (false, nil, nil)
// Assumes that applyByNode only appears as the outermost function.
func RewriteExpr(ctx context.Context, e parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) (bool, []string, error) {
func RewriteExpr(ctx context.Context, eval interfaces.Evaluator, e parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) (bool, []string, error) {
if e.IsFunc() {
metadata.FunctionMD.RLock()
f, ok := metadata.FunctionMD.RewriteFunctions[e.Target()]
metadata.FunctionMD.RUnlock()
if ok {
return f.Do(ctx, e, from, until, values)
return f.Do(ctx, eval, e, from, until, values)
}
}
return false, nil, nil
}

// FetchAndEvalExp fetch data and evaluates expressions
func FetchAndEvalExp(ctx context.Context, eval interfaces.Evaluator, e parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) ([]*types.MetricData, merry.Error) {
targetValues, err := eval.Fetch(ctx, []parser.Expr{e}, from, until, values)
if err != nil {
return nil, merry.Wrap(err)
}

res, err := eval.Eval(ctx, e, from, until, targetValues)
if err != nil {
return nil, merry.Wrap(err)
}

for mReq := range values {
SortMetrics(values[mReq], mReq)
}

return res, nil
}

func FetchAndEvalExprs(ctx context.Context, eval interfaces.Evaluator, exprs []parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) ([]*types.MetricData, map[string]merry.Error) {
targetValues, err := eval.Fetch(ctx, exprs, from, until, values)
if err != nil {
return nil, map[string]merry.Error{"*": merry.Wrap(err)}
}

res := make([]*types.MetricData, 0, len(exprs))
var errors map[string]merry.Error
for _, exp := range exprs {
evaluationResult, err := eval.Eval(ctx, exp, from, until, targetValues)
if err != nil {
if errors == nil {
errors = make(map[string]merry.Error)
}
errors[exp.Target()] = merry.Wrap(err)
}
res = append(res, evaluationResult...)
}

for mReq := range values {
SortMetrics(values[mReq], mReq)
}

return res, errors
}
Loading

0 comments on commit 7f00f02

Please sign in to comment.