Skip to content

Commit

Permalink
ingest: s2s log
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Aug 15, 2024
1 parent 073411e commit 9c4ae1c
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 21 deletions.
22 changes: 22 additions & 0 deletions ingest/router_ingest_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,33 @@ import (
"github.com/jitsucom/bulker/jitsubase/appbase"
"github.com/jitsucom/bulker/jitsubase/jsoniter"
"github.com/jitsucom/bulker/jitsubase/jsonorder"
"github.com/jitsucom/bulker/jitsubase/logging"
"github.com/jitsucom/bulker/jitsubase/types"
"github.com/jitsucom/bulker/jitsubase/utils"
"github.com/jitsucom/bulker/jitsubase/uuid"
"io"
"net/http"
"strings"
"sync"
"time"
)

var ids = sync.Map{}

func init() {
ticker := time.NewTicker(1 * time.Minute)
go func() {
for range ticker.C {
arr := make([]string, 0)
ids.Range(func(key, value interface{}) bool {
arr = append(arr, key.(string))
return true
})
logging.Infof("[S2S] %s", strings.Join(arr, ","))
}
}()
}

func (r *Router) IngestHandler(c *gin.Context) {
domain := ""
// TODO: use workspaceId as default for all stream identification errors
Expand Down Expand Up @@ -102,6 +121,9 @@ func (r *Router) IngestHandler(c *gin.Context) {
return
}
eventsLogId = stream.Stream.Id
if ingestType == IngestTypeS2S {
ids.LoadOrStore(stream.Stream.WorkspaceId+"."+eventsLogId, true)
}
//if err = r.checkOrigin(c, &loc, stream); err != nil {
// r.Warnf("%v", err)
//}
Expand Down
21 changes: 0 additions & 21 deletions ingest/router_segment_settings_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,14 @@ package main

import (
"github.com/gin-gonic/gin"
"github.com/jitsucom/bulker/jitsubase/logging"
"net/http"
"strings"
"sync"
"time"
)

const settingsHeader = "{\"integrations\":{\"Segment.io\":{\"apiKey\":\""
const settingsFooter = "\",\"versionSettings\":{\"version\":\"4.4.7\",\"componentTypes\":[\"browser\"]}}},\"plan\":{\"track\":{\"__default\":{\"enabled\":true}},\"identify\":{\"__default\":{\"enabled\":true}},\"group\":{\"__default\":{\"enabled\":true}}},\"analyticsNextEnabled\":true}"

var ids = sync.Map{}

func init() {
ticker := time.NewTicker(1 * time.Minute)
go func() {
for range ticker.C {
arr := make([]string, 0)
ids.Range(func(key, value interface{}) bool {
arr = append(arr, key.(string))
return true
})
logging.Infof("[SETTINGS]%s", strings.Join(arr, ","))
}
}()
}

func (r *Router) SettingsHandler(c *gin.Context) {
writeKey := c.Param("writeKey")
ids.LoadOrStore(strings.SplitN(writeKey, ":", 2)[0], true)
writer := c.Writer
writer.Header().Set("Content-Type", "application/json")
writer.Header().Set("Cache-Control", "public, max-age=86400")
Expand Down

0 comments on commit 9c4ae1c

Please sign in to comment.