From 2da7f2fdfc8c0c1ec924da48e7f2bd89ff55ea14 Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Tue, 5 Nov 2019 15:19:12 +0300 Subject: [PATCH] Add a way to generate ES id (#169) Added new `generate_id` config option --- pumps/elasticsearch.go | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/pumps/elasticsearch.go b/pumps/elasticsearch.go index 7f199c38e..115490789 100644 --- a/pumps/elasticsearch.go +++ b/pumps/elasticsearch.go @@ -3,6 +3,7 @@ package pumps import ( "context" "errors" + "fmt" "time" "github.com/mitchellh/mapstructure" @@ -11,6 +12,7 @@ import ( elasticv6 "gopkg.in/olivere/elastic.v6" "github.com/TykTechnologies/logrus" + "github.com/TykTechnologies/murmur3" "github.com/TykTechnologies/tyk-pump/analytics" ) @@ -28,6 +30,7 @@ type ElasticsearchConf struct { DocumentType string `mapstructure:"document_type"` RollingIndex bool `mapstructure:"rolling_index"` ExtendedStatistics bool `mapstructure:"extended_stats"` + GenerateID bool `mapstructure:"generate_id"` Version string `mapstructure:"version"` } @@ -178,7 +181,7 @@ func getIndexName(esConf *ElasticsearchConf) string { return indexName } -func getMapping(datum analytics.AnalyticsRecord, extendedStatistics bool) map[string]interface{} { +func getMapping(datum analytics.AnalyticsRecord, extendedStatistics bool, generateID bool) (map[string]interface{}, string) { record := datum mapping := map[string]interface{}{ @@ -202,7 +205,15 @@ func getMapping(datum analytics.AnalyticsRecord, extendedStatistics bool) map[st mapping["raw_response"] = record.RawResponse mapping["user_agent"] = record.UserAgent } - return mapping + + if generateID { + hasher := murmur3.New64() + hasher.Write([]byte(fmt.Sprintf("%d%s%s%s%s%s%d%s", record.TimeStamp.UnixNano(), record.Method, record.Path, record.IPAddress, record.APIID, record.OauthID, record.RequestTime, record.Alias))) + + return mapping, string(hasher.Sum(nil)) + } + + return mapping, "" } func (e Elasticsearch3Operator) processData(data []interface{}, esConf *ElasticsearchConf) error { @@ -217,9 +228,9 @@ func (e Elasticsearch3Operator) processData(data []interface{}, esConf *Elastics continue } - mapping := getMapping(d, esConf.ExtendedStatistics) + mapping, id := getMapping(d, esConf.ExtendedStatistics, esConf.GenerateID) - _, err := index.BodyJson(mapping).Type(esConf.DocumentType).Do() + _, err := index.BodyJson(mapping).Type(esConf.DocumentType).Id(id).Do() if err != nil { log.WithFields(logrus.Fields{ "prefix": elasticsearchPrefix, @@ -242,9 +253,9 @@ func (e Elasticsearch5Operator) processData(data []interface{}, esConf *Elastics continue } - mapping := getMapping(d, esConf.ExtendedStatistics) + mapping, id := getMapping(d, esConf.ExtendedStatistics, esConf.GenerateID) - _, err := index.BodyJson(mapping).Type(esConf.DocumentType).Do(context.TODO()) + _, err := index.BodyJson(mapping).Type(esConf.DocumentType).Id(id).Do(context.TODO()) if err != nil { log.WithFields(logrus.Fields{ "prefix": elasticsearchPrefix, @@ -267,9 +278,9 @@ func (e Elasticsearch6Operator) processData(data []interface{}, esConf *Elastics continue } - mapping := getMapping(d, esConf.ExtendedStatistics) + mapping, id := getMapping(d, esConf.ExtendedStatistics, esConf.GenerateID) - _, err := index.BodyJson(mapping).Type(esConf.DocumentType).Do(context.Background()) + _, err := index.BodyJson(mapping).Type(esConf.DocumentType).Id(id).Do(context.Background()) if err != nil { log.WithFields(logrus.Fields{ "prefix": elasticsearchPrefix,