diff --git a/ingest/router_ingest_handler.go b/ingest/router_ingest_handler.go index f0ececc..545f1a8 100644 --- a/ingest/router_ingest_handler.go +++ b/ingest/router_ingest_handler.go @@ -9,14 +9,33 @@ import ( "github.com/jitsucom/bulker/jitsubase/appbase" "github.com/jitsucom/bulker/jitsubase/jsoniter" "github.com/jitsucom/bulker/jitsubase/jsonorder" + "github.com/jitsucom/bulker/jitsubase/logging" "github.com/jitsucom/bulker/jitsubase/types" "github.com/jitsucom/bulker/jitsubase/utils" "github.com/jitsucom/bulker/jitsubase/uuid" "io" "net/http" "strings" + "sync" + "time" ) +var ids = sync.Map{} + +func init() { + ticker := time.NewTicker(1 * time.Minute) + go func() { + for range ticker.C { + arr := make([]string, 0) + ids.Range(func(key, value interface{}) bool { + arr = append(arr, key.(string)) + return true + }) + logging.Infof("[S2S] %s", strings.Join(arr, ",")) + } + }() +} + func (r *Router) IngestHandler(c *gin.Context) { domain := "" // TODO: use workspaceId as default for all stream identification errors @@ -102,6 +121,9 @@ func (r *Router) IngestHandler(c *gin.Context) { return } eventsLogId = stream.Stream.Id + if ingestType == IngestTypeS2S { + ids.LoadOrStore(stream.Stream.WorkspaceId+"."+eventsLogId, true) + } //if err = r.checkOrigin(c, &loc, stream); err != nil { // r.Warnf("%v", err) //} diff --git a/ingest/router_segment_settings_handler.go b/ingest/router_segment_settings_handler.go index 79c1bf9..b209aeb 100644 --- a/ingest/router_segment_settings_handler.go +++ b/ingest/router_segment_settings_handler.go @@ -2,35 +2,14 @@ package main import ( "github.com/gin-gonic/gin" - "github.com/jitsucom/bulker/jitsubase/logging" "net/http" - "strings" - "sync" - "time" ) const settingsHeader = "{\"integrations\":{\"Segment.io\":{\"apiKey\":\"" const settingsFooter = "\",\"versionSettings\":{\"version\":\"4.4.7\",\"componentTypes\":[\"browser\"]}}},\"plan\":{\"track\":{\"__default\":{\"enabled\":true}},\"identify\":{\"__default\":{\"enabled\":true}},\"group\":{\"__default\":{\"enabled\":true}}},\"analyticsNextEnabled\":true}" -var ids = sync.Map{} - -func init() { - ticker := time.NewTicker(1 * time.Minute) - go func() { - for range ticker.C { - arr := make([]string, 0) - ids.Range(func(key, value interface{}) bool { - arr = append(arr, key.(string)) - return true - }) - logging.Infof("[SETTINGS]%s", strings.Join(arr, ",")) - } - }() -} - func (r *Router) SettingsHandler(c *gin.Context) { writeKey := c.Param("writeKey") - ids.LoadOrStore(strings.SplitN(writeKey, ":", 2)[0], true) writer := c.Writer writer.Header().Set("Content-Type", "application/json") writer.Header().Set("Cache-Control", "public, max-age=86400")