Skip to content

Commit

Permalink
fix: empty values in state
Browse files Browse the repository at this point in the history
* Added update of variables if they were missing
  • Loading branch information
Nikolay committed Aug 19, 2020
1 parent fc81539 commit 12bac99
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 132 deletions.
68 changes: 21 additions & 47 deletions checker/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,21 @@ const (
func (triggerChecker *TriggerChecker) Check() error {
passError := false
checkData := newCheckData(triggerChecker.lastCheck, triggerChecker.until)
logIfNoValues(checkData, triggerChecker.logger, triggerChecker.triggerID, "newCheckData")
triggerMetricsData, err := triggerChecker.fetchTriggerMetrics()
if err != nil {
return triggerChecker.handleFetchError(checkData, err)
}

logIfTriggerKbaGateway(triggerChecker.logger, triggerChecker.triggerID, "fetchTriggerMetrics", triggerMetricsData)

preparedMetrics, aloneMetrics, err := triggerChecker.prepareMetrics(triggerMetricsData)
if err != nil {
logIfTriggerKbaGateway(triggerChecker.logger, triggerChecker.triggerID, fmt.Sprintf("prepareMetrics ERROR: %#v", err), "")
passError, checkData, err = triggerChecker.handlePrepareError(checkData, err)
if !passError {
return err
}
}

logIfTriggerKbaGateway(triggerChecker.logger, triggerChecker.triggerID, "preparedMetrics", preparedMetrics)
logIfTriggerKbaGateway(triggerChecker.logger, triggerChecker.triggerID, "aloneMetrics", aloneMetrics)

checkData.MetricsToTargetRelation = conversion.GetRelations(aloneMetrics)
logIfTriggerKbaGateway(triggerChecker.logger, triggerChecker.triggerID, "checkData.MetricsToTargetRelation",
checkData.MetricsToTargetRelation)
checkData, err = triggerChecker.check(preparedMetrics, aloneMetrics, checkData)
logIfNoValues(checkData, triggerChecker.logger, triggerChecker.triggerID, "triggerChecker.check")
if err != nil {
return triggerChecker.handleUndefinedError(checkData, err)
}
Expand All @@ -60,42 +50,9 @@ func (triggerChecker *TriggerChecker) Check() error {
}
}
checkData.UpdateScore()
logIfNoValues(checkData, triggerChecker.logger, triggerChecker.triggerID, "checkData.UpdateScore()")
return triggerChecker.database.SetTriggerLastCheck(triggerChecker.triggerID, &checkData, triggerChecker.trigger.IsRemote)
}

func logIfTriggerKbaGateway(logger moira.Logger, trigger, prefix string, Metrics interface{}) {
if trigger == "265cb2bf-e029-4df2-9836-b628c64a8373" {
data := ""
switch m := Metrics.(type) {
case map[string][]metricSource.MetricData:
data = fmt.Sprintf("%#v", m)
case map[string]map[string]metricSource.MetricData:
data = fmt.Sprintf("%#v", m)
case map[string]moira.MetricState:
data = fmt.Sprintf("%#v", m)
case moira.MetricState:
data = fmt.Sprintf("%#v", m)
case map[string]metricSource.MetricData:
data = fmt.Sprintf("%#v", m)
default:
data = fmt.Sprintf("default %#v", m)
}

logger.Warningf(" FINDNOVALUES TRIGGER:%s, PREFIX:%s, DATA:%s", trigger, prefix, data)
}
}

func logIfNoValues(data moira.CheckData, logger moira.Logger, trigger, prefix string) {
if len(data.Metrics) > 0 {
for target, metric := range data.Metrics {
if len(metric.Values) == 0 {
logIfTriggerKbaGateway(logger, trigger, prefix+" values no exists", target)
}
}
}
}

// handlePrepareError is a function that checks error returned from prepareMetrics function. If error
// is not serious and check process can be continued first return value became true and Filled CheckData returned.
// in the other case first return value became true and error passed to this function is handled.
Expand Down Expand Up @@ -312,19 +269,27 @@ func (triggerChecker *TriggerChecker) checkTargets(metricName string, metrics ma
if err != nil {
return lastState, needToDeleteMetric, err
}

for _, currentState := range metricStates {
lastState, err = triggerChecker.compareMetricStates(metricName, currentState, lastState)
state, err := triggerChecker.compareMetricStates(metricName, currentState, lastState)
if err != nil {
return lastState, needToDeleteMetric, err
}

if len(state.Values) > 0 {
lastState = state
}
}

needToDeleteMetric, noDataState := triggerChecker.checkForNoData(metricName, lastState)
if needToDeleteMetric {
return lastState, needToDeleteMetric, err
}

if noDataState != nil {
lastState, err = triggerChecker.compareMetricStates(metricName, *noDataState, lastState)
}

return lastState, needToDeleteMetric, err
}

Expand Down Expand Up @@ -370,30 +335,36 @@ func (triggerChecker *TriggerChecker) getMetricStepsStates(metricName string, me
previousState := last
difference := moira.MaxInt64(checkPoint-startTime, 0)
stepsDifference := difference / stepTime

if (difference % stepTime) > 0 {
stepsDifference++
}

valueTimestamp := startTime + stepTime*stepsDifference
endTimestamp := triggerChecker.until + stepTime

for ; valueTimestamp < endTimestamp; valueTimestamp += stepTime {
metricNewState, err := triggerChecker.getMetricDataState(&metricName, &metrics, &previousState, &valueTimestamp, &checkPoint)
if err != nil {
return last, current, err
}

if metricNewState == nil {
continue
}

previousState = *metricNewState
current = append(current, *metricNewState)
}

return last, current, nil
}

func (triggerChecker *TriggerChecker) getMetricDataState(metricName *string, metrics *map[string]metricSource.MetricData, lastState *moira.MetricState, valueTimestamp, checkPoint *int64) (*moira.MetricState, error) {
if *valueTimestamp <= *checkPoint {
return nil, nil
}
triggerExpression, values, noEmptyValues := getExpressionValues(metrics, valueTimestamp)
triggerExpression, values, noEmptyValues := getExpressionValues(len(lastState.Values) == 0, metrics, valueTimestamp)
if !noEmptyValues {
return nil, nil
}
Expand All @@ -418,7 +389,7 @@ func (triggerChecker *TriggerChecker) getMetricDataState(metricName *string, met
), nil
}

func getExpressionValues(metrics *map[string]metricSource.MetricData, valueTimestamp *int64) (*expression.TriggerExpression, map[string]float64, bool) {
func getExpressionValues(lastNoExistsValues bool, metrics *map[string]metricSource.MetricData, valueTimestamp *int64) (*expression.TriggerExpression, map[string]float64, bool) {
expression := &expression.TriggerExpression{
AdditionalTargetsValues: make(map[string]float64, len(*metrics)-1),
}
Expand All @@ -428,10 +399,13 @@ func getExpressionValues(metrics *map[string]metricSource.MetricData, valueTimes
targetName := fmt.Sprintf("t%d", i+1)
metric := (*metrics)[targetName]
value := metric.GetTimestampValue(*valueTimestamp)
values[targetName] = value
if !moira.IsValidFloat64(value) {
if lastNoExistsValues {
continue
}
return expression, values, false
}
values[targetName] = value
if i == 0 {
expression.MainTargetValue = value
continue
Expand Down
19 changes: 0 additions & 19 deletions checker/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,11 @@ func (triggerChecker *TriggerChecker) fetch() (map[string][]metricSource.MetricD
targetIndex++ // increasing target index to have target names started from 1 instead of 0
fetchResult, err := triggerChecker.source.Fetch(target, triggerChecker.from, triggerChecker.until, isSimpleTrigger)
if err != nil {
id := ""
if triggerChecker.trigger != nil {
id = triggerChecker.trigger.ID
}
triggerChecker.logger.Warningf("NOVARIABLES triggerChecker.source.Fetch ID: %s, ERROR: %v, ",
id, err)
return nil, nil, err
}
metricsData := fetchResult.GetMetricsData()

metricsFetchResult, metricsErr := fetchResult.GetPatternMetrics()
if metricsErr != nil {
id := ""
if triggerChecker.trigger != nil {
id = triggerChecker.trigger.ID
}
triggerChecker.logger.Warningf("NOVARIABLES GetPatternMetrics ID: %s, ERROR: %v, ",
id, metricsErr)
}

if metricsErr == nil {
metricsArr = append(metricsArr, metricsFetchResult...)
Expand All @@ -62,11 +48,6 @@ func (triggerChecker *TriggerChecker) fetch() (map[string][]metricSource.MetricD
targetName := fmt.Sprintf("t%d", targetIndex)
triggerMetricsData[targetName] = metricsData
}

if triggerChecker.trigger.ID == "265cb2bf-e029-4df2-9836-b628c64a8373" {
triggerChecker.logger.Warningf("NOVARIABLES triggerMetricsData: %#v, ",triggerMetricsData)
}

return triggerMetricsData, metricsArr, nil
}

Expand Down
4 changes: 0 additions & 4 deletions checker/fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"fmt"
"testing"

"github.com/op/go-logging"

"github.com/golang/mock/gomock"
"github.com/moira-alert/moira"
metricSource "github.com/moira-alert/moira/metric_source"
Expand Down Expand Up @@ -100,7 +98,6 @@ func TestFetchTriggerMetrics(t *testing.T) {
func TestFetch(t *testing.T) {
mockCtrl := gomock.NewController(t)
dataBase := mock_moira_alert.NewMockDatabase(mockCtrl)
logger, _ := logging.GetLogger("Test")
source := mock_metric_source.NewMockMetricSource(mockCtrl)
fetchResult := mock_metric_source.NewMockFetchResult(mockCtrl)
defer mockCtrl.Finish()
Expand All @@ -121,7 +118,6 @@ func TestFetch(t *testing.T) {
source: source,
from: from,
until: until,
logger: logger,
trigger: &moira.Trigger{
Targets: []string{pattern},
Patterns: []string{pattern},
Expand Down
8 changes: 3 additions & 5 deletions database/redis/last_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (connector *DbConnector) SetTriggerCheckMaintenance(triggerID string, metri
return readingErr
}
for readingErr != redis.ErrNil {
lastCheck := moira.CheckData{}
var lastCheck = moira.CheckData{}
err := json.Unmarshal([]byte(lastCheckString), &lastCheck)
if err != nil {
return fmt.Errorf("failed to parse lastCheck json %s: %s", lastCheckString, err.Error())
Expand Down Expand Up @@ -145,10 +145,8 @@ func (connector *DbConnector) checkDataScoreChanged(triggerID string, checkData
return oldScore != checkData.Score
}

var (
badStateTriggersKey = "moira-bad-state-triggers"
triggersChecksKey = "moira-triggers-checks"
)
var badStateTriggersKey = "moira-bad-state-triggers"
var triggersChecksKey = "moira-triggers-checks"

func metricLastCheckKey(triggerID string) string {
return "moira-metric-last-check:" + triggerID
Expand Down
5 changes: 0 additions & 5 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,3 @@ networks:
balancer:
volumes:
data:
driver: local
driver_opts:
type: 'none'
o: 'bind'
device: '/home/roo/data/'
3 changes: 0 additions & 3 deletions filter/patterns_storage.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package filter

import (
"fmt"
"sync/atomic"
"time"

Expand Down Expand Up @@ -70,8 +69,6 @@ func (storage *PatternStorage) ProcessIncomingMetric(lineBytes []byte) *moira.Ma
if count%10 == 0 {
storage.metrics.MatchingTimer.UpdateSince(matchingStart)
}

fmt.Printf("FILTERCHECK: %#v\n", matchedPatterns)
if len(matchedPatterns) > 0 {
storage.metrics.MatchingMetricsReceived.Inc()
return &moira.MatchedMetric{
Expand Down
10 changes: 4 additions & 6 deletions metric_source/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,14 @@ func (local *Local) Fetch(target string, from int64, until int64, allowRealTimeA
expr2, _, err := parser.ParseExpr(target)
if err != nil {
return nil, ErrParseExpr{
internalError: fmt.Errorf("parser.ParseExpr %v", err),
internalError: err,
target: target,
}
}
patterns := expr2.Metrics()
metricsMap, metrics, err := getPatternsMetricData(local.dataBase, patterns, from, until, allowRealTimeAlerting)
if err != nil {
//return nil, err
return nil, fmt.Errorf("getPatternsMetricData: %v", err)
return nil, err
}
rewritten, newTargets, err := expr.RewriteExpr(expr2, from, until, metricsMap)
if err != nil && err != parser.ErrSeriesDoesNotExist {
Expand All @@ -77,15 +76,14 @@ func (local *Local) Fetch(target string, from int64, until int64, allowRealTimeA
} else {
err = ErrEvalExpr{
target: target,
internalError: fmt.Errorf("ErrEvalExpr %v", err),
internalError: err,
}
}
}
return result, err
}()
if err != nil {
//return nil, err
return nil, fmt.Errorf("metricsData: %v", err)
return nil, err
}
for _, metricData := range metricsData {
md := *metricData
Expand Down
Loading

0 comments on commit 12bac99

Please sign in to comment.