Skip to content

Commit

Permalink
Add a way to generate ES id (#169)
Browse files Browse the repository at this point in the history
Added new `generate_id` config option
  • Loading branch information
buger authored Nov 5, 2019
1 parent 72aae90 commit 2da7f2f
Showing 1 changed file with 19 additions and 8 deletions.
27 changes: 19 additions & 8 deletions pumps/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pumps
import (
"context"
"errors"
"fmt"
"time"

"github.com/mitchellh/mapstructure"
Expand All @@ -11,6 +12,7 @@ import (
elasticv6 "gopkg.in/olivere/elastic.v6"

"github.com/TykTechnologies/logrus"
"github.com/TykTechnologies/murmur3"
"github.com/TykTechnologies/tyk-pump/analytics"
)

Expand All @@ -28,6 +30,7 @@ type ElasticsearchConf struct {
DocumentType string `mapstructure:"document_type"`
RollingIndex bool `mapstructure:"rolling_index"`
ExtendedStatistics bool `mapstructure:"extended_stats"`
GenerateID bool `mapstructure:"generate_id"`
Version string `mapstructure:"version"`
}

Expand Down Expand Up @@ -178,7 +181,7 @@ func getIndexName(esConf *ElasticsearchConf) string {
return indexName
}

func getMapping(datum analytics.AnalyticsRecord, extendedStatistics bool) map[string]interface{} {
func getMapping(datum analytics.AnalyticsRecord, extendedStatistics bool, generateID bool) (map[string]interface{}, string) {
record := datum

mapping := map[string]interface{}{
Expand All @@ -202,7 +205,15 @@ func getMapping(datum analytics.AnalyticsRecord, extendedStatistics bool) map[st
mapping["raw_response"] = record.RawResponse
mapping["user_agent"] = record.UserAgent
}
return mapping

if generateID {
hasher := murmur3.New64()
hasher.Write([]byte(fmt.Sprintf("%d%s%s%s%s%s%d%s", record.TimeStamp.UnixNano(), record.Method, record.Path, record.IPAddress, record.APIID, record.OauthID, record.RequestTime, record.Alias)))

return mapping, string(hasher.Sum(nil))
}

return mapping, ""
}

func (e Elasticsearch3Operator) processData(data []interface{}, esConf *ElasticsearchConf) error {
Expand All @@ -217,9 +228,9 @@ func (e Elasticsearch3Operator) processData(data []interface{}, esConf *Elastics
continue
}

mapping := getMapping(d, esConf.ExtendedStatistics)
mapping, id := getMapping(d, esConf.ExtendedStatistics, esConf.GenerateID)

_, err := index.BodyJson(mapping).Type(esConf.DocumentType).Do()
_, err := index.BodyJson(mapping).Type(esConf.DocumentType).Id(id).Do()
if err != nil {
log.WithFields(logrus.Fields{
"prefix": elasticsearchPrefix,
Expand All @@ -242,9 +253,9 @@ func (e Elasticsearch5Operator) processData(data []interface{}, esConf *Elastics
continue
}

mapping := getMapping(d, esConf.ExtendedStatistics)
mapping, id := getMapping(d, esConf.ExtendedStatistics, esConf.GenerateID)

_, err := index.BodyJson(mapping).Type(esConf.DocumentType).Do(context.TODO())
_, err := index.BodyJson(mapping).Type(esConf.DocumentType).Id(id).Do(context.TODO())
if err != nil {
log.WithFields(logrus.Fields{
"prefix": elasticsearchPrefix,
Expand All @@ -267,9 +278,9 @@ func (e Elasticsearch6Operator) processData(data []interface{}, esConf *Elastics
continue
}

mapping := getMapping(d, esConf.ExtendedStatistics)
mapping, id := getMapping(d, esConf.ExtendedStatistics, esConf.GenerateID)

_, err := index.BodyJson(mapping).Type(esConf.DocumentType).Do(context.Background())
_, err := index.BodyJson(mapping).Type(esConf.DocumentType).Id(id).Do(context.Background())
if err != nil {
log.WithFields(logrus.Fields{
"prefix": elasticsearchPrefix,
Expand Down

0 comments on commit 2da7f2f

Please sign in to comment.