Skip to content

Commit

Permalink
Add config option to ignore tags with common prefix (#166)
Browse files Browse the repository at this point in the history
Fixes #162 

Added additional config options:
1. `ignore_tag_prefix_list`( in mongo aggregate and hybrid pump.)
It will not store analytics for tags having prefix specified in the list.
**Note**: Prefix `key-` is added in the list by default. This tag is added by gateway for keys.

2. `threshold_len_tag_list`( in mongo aggregate pump)
If number of tags in a document grows beyond `threshold_len_tag_list`, pump will throw a warning. The warning will print top 5 common tag prefix. Default value is **1000**. To disable alerts set it to -1.
  • Loading branch information
komalsukhani authored and buger committed Nov 5, 2019
1 parent 8bfdb36 commit 6a1381c
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 14 deletions.
28 changes: 23 additions & 5 deletions analytics/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,23 @@ func doHash(in string) string {
return search
}

func ignoreTag(tag string, ignoreTagPrefixList []string) bool {
// ignore tag added for key by gateway
if strings.HasPrefix(tag, "key-") {
return true
}

for _, prefix := range ignoreTagPrefixList {
if strings.HasPrefix(tag, prefix) {
return true
}
}

return false
}

// AggregateData calculates aggregated data, returns map orgID => aggregated analytics data
func AggregateData(data []interface{}, trackAllPaths bool) map[string]AnalyticsRecordAggregate {
func AggregateData(data []interface{}, trackAllPaths bool, ignoreTagPrefixList []string) map[string]AnalyticsRecordAggregate {
analyticsPerOrg := make(map[string]AnalyticsRecordAggregate)

for _, v := range data {
Expand Down Expand Up @@ -597,12 +612,15 @@ func AggregateData(data []interface{}, trackAllPaths bool) map[string]AnalyticsR
thisAggregate.Geo[thisV.Geo.Country.ISOCode].HumanIdentifier = thisV.Geo.Country.ISOCode
}
break

case "Tags":
for _, thisTag := range thisV.Tags {
c := IncrementOrSetUnit(thisAggregate.Tags[thisTag])
thisAggregate.Tags[thisTag] = c
thisAggregate.Tags[thisTag].Identifier = thisTag
thisAggregate.Tags[thisTag].HumanIdentifier = thisTag
if !ignoreTag(thisTag, ignoreTagPrefixList) {
c := IncrementOrSetUnit(thisAggregate.Tags[thisTag])
thisAggregate.Tags[thisTag] = c
thisAggregate.Tags[thisTag].Identifier = thisTag
thisAggregate.Tags[thisTag].HumanIdentifier = thisTag
}
}
break

Expand Down
17 changes: 14 additions & 3 deletions pumps/hybrid.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pumps

import (
"encoding/json"
"fmt"

"github.com/TykTechnologies/tyk-pump/analytics"

Expand Down Expand Up @@ -38,8 +39,9 @@ var (

// HybridPump allows to send analytics to MDCB over RPC
type HybridPump struct {
aggregated bool
trackAllPaths bool
aggregated bool
trackAllPaths bool
ignoreTagPrefixList []string
}

func (p *HybridPump) GetName() string {
Expand Down Expand Up @@ -115,6 +117,15 @@ func (p *HybridPump) Init(config interface{}) error {
if trackAllPaths, ok := meta["track_all_paths"]; ok {
p.trackAllPaths = trackAllPaths.(bool)
}

if list, ok := meta["ignore_tag_prefix_list"]; ok {
ignoreTagPrefixList := list.([]interface{})
p.ignoreTagPrefixList = make([]string, len(ignoreTagPrefixList))
for k, v := range ignoreTagPrefixList {
p.ignoreTagPrefixList[k] = fmt.Sprint(v)
}
}

}

return nil
Expand Down Expand Up @@ -151,7 +162,7 @@ func (p *HybridPump) WriteData(data []interface{}) error {
}
} else { // send aggregated data
// calculate aggregates
aggregates := analytics.AggregateData(data, p.trackAllPaths)
aggregates := analytics.AggregateData(data, p.trackAllPaths, p.ignoreTagPrefixList)

// turn map with analytics aggregates into JSON payload
jsonData, err := json.Marshal(aggregates)
Expand Down
88 changes: 82 additions & 6 deletions pumps/mongo_aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pumps
import (
b64 "encoding/base64"
"errors"
"sort"
"strings"
"time"

Expand All @@ -17,25 +18,92 @@ import (
)

var mongoAggregatePumpPrefix = "PMP_MONGOAGG"
var THRESHOLD_LEN_TAG_LIST = 1000
var COMMON_TAGS_COUNT = 5

type MongoAggregatePump struct {
dbSession *mgo.Session
dbConf *MongoAggregateConf
}

type MongoAggregateConf struct {
MongoURL string `mapstructure:"mongo_url"`
MongoUseSSL bool `mapstructure:"mongo_use_ssl"`
MongoSSLInsecureSkipVerify bool `mapstructure:"mongo_ssl_insecure_skip_verify"`
UseMixedCollection bool `mapstructure:"use_mixed_collection"`
TrackAllPaths bool `mapstructure:"track_all_paths"`
MongoURL string `mapstructure:"mongo_url"`
MongoUseSSL bool `mapstructure:"mongo_use_ssl"`
MongoSSLInsecureSkipVerify bool `mapstructure:"mongo_ssl_insecure_skip_verify"`
UseMixedCollection bool `mapstructure:"use_mixed_collection"`
TrackAllPaths bool `mapstructure:"track_all_paths"`
IgnoreTagPrefixList []string `mapstructure:"ignore_tag_prefix_list"`
ThresholdLenTagList int `mapstructure:"threshold_len_tag_list"`
}

func (m *MongoAggregatePump) New() Pump {
newPump := MongoAggregatePump{}
return &newPump
}

func getListOfCommonPrefix(list []string) []string {
count := make(map[string]int)
result := make([]string, 0)
length := len(list)

if length == 0 || length == 1 {
return list
}

for i := 0; i < length-1; i++ {
for j := i + 1; j < length; j++ {
var prefLen int
str1 := list[i]
str2 := list[j]

if len(str1) > len(str2) {
prefLen = len(str2)
} else {
prefLen = len(str1)
}

k := 0
for k = 0; k < prefLen; k++ {
if str1[k] != str2[k] {
if k != 0 {
count[str1[:k]]++
}
break
}
}
if k == prefLen {
count[str1[:prefLen]]++
}
}
}

for k := range count {
result = append(result, k)
}

sort.Slice(result, func(i, j int) bool { return count[result[i]] > count[result[j]] })

return result
}

func printAlert(doc analytics.AnalyticsRecordAggregate, thresholdLenTagList int) {
var listofTags []string

for k := range doc.Tags {
listofTags = append(listofTags, k)
}

listOfCommonPrefix := getListOfCommonPrefix(listofTags)

// list 5 common tag prefix
l := len(listOfCommonPrefix)
if l > COMMON_TAGS_COUNT {
l = COMMON_TAGS_COUNT
}

log.Warnf("WARNING: Found more that %v tag entries per document, which may cause performance issues with aggregate logs. List of most common tag-prefix: %v. You can ignore these tags using ignore_tag_prefix_list option", thresholdLenTagList, listOfCommonPrefix[:l])
}

func (m *MongoAggregatePump) doHash(in string) string {
sEnc := b64.StdEncoding.EncodeToString([]byte(in))
search := strings.TrimRight(sEnc, "=")
Expand Down Expand Up @@ -69,6 +137,10 @@ func (m *MongoAggregatePump) Init(config interface{}) error {
log.Error("Failed to process environment variables for mongo aggregate pump: ", overrideErr)
}

if m.dbConf.ThresholdLenTagList == 0 {
m.dbConf.ThresholdLenTagList = THRESHOLD_LEN_TAG_LIST
}

m.connect()

log.WithFields(logrus.Fields{
Expand Down Expand Up @@ -143,7 +215,7 @@ func (m *MongoAggregatePump) WriteData(data []interface{}) error {
m.WriteData(data)
} else {
// calculate aggregates
analyticsPerOrg := analytics.AggregateData(data, m.dbConf.TrackAllPaths)
analyticsPerOrg := analytics.AggregateData(data, m.dbConf.TrackAllPaths, m.dbConf.IgnoreTagPrefixList)

// put aggregated data into MongoDB
for orgID, filteredData := range analyticsPerOrg {
Expand Down Expand Up @@ -199,6 +271,10 @@ func (m *MongoAggregatePump) WriteData(data []interface{}) error {
withTimeUpdate := analytics.AnalyticsRecordAggregate{}
_, avgErr := analyticsCollection.Find(query).Apply(avgChange, &withTimeUpdate)

if m.dbConf.ThresholdLenTagList != -1 && (len(withTimeUpdate.Tags) > m.dbConf.ThresholdLenTagList) {
printAlert(withTimeUpdate, m.dbConf.ThresholdLenTagList)
}

if avgErr != nil {
log.WithFields(logrus.Fields{
"prefix": analytics.MongoAggregatePrefix,
Expand Down

0 comments on commit 6a1381c

Please sign in to comment.