Skip to content

Commit

Permalink
ingest: added pixel endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Sep 9, 2024
1 parent 73d6600 commit 10be473
Show file tree
Hide file tree
Showing 4 changed files with 317 additions and 1 deletion.
2 changes: 2 additions & 0 deletions ingest/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func NewRouter(appContext *Context, partitionSelector kafkabase.PartitionSelecto
"/batch",
"/api/s/s2s/batch",
"/api/s/:tp",
"/api/px/:tp",
"/api/s/s2s/:tp",
})

Expand Down Expand Up @@ -142,6 +143,7 @@ func NewRouter(appContext *Context, partitionSelector kafkabase.PartitionSelecto
fast.Match([]string{"OPTIONS", "POST"}, "/api/s/s2s/batch", router.BatchHandler)

fast.Match([]string{"OPTIONS", "POST"}, "/api/s/:tp", router.IngestHandler)
fast.Match([]string{"OPTIONS", "GET"}, "/api/px/:tp", router.PixelHandler)
fast.Match([]string{"OPTIONS", "POST"}, "/api/s/s2s/:tp", router.IngestHandler)

fast.Match([]string{"GET", "HEAD", "OPTIONS"}, "/p.js", router.ScriptHandler)
Expand Down
278 changes: 278 additions & 0 deletions ingest/router_pixel_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
package main

import (
"encoding/base64"
"errors"
"fmt"
kafka2 "github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/gin-gonic/gin"
"github.com/jitsucom/bulker/eventslog"
"github.com/jitsucom/bulker/jitsubase/appbase"
"github.com/jitsucom/bulker/jitsubase/jsonorder"
"github.com/jitsucom/bulker/jitsubase/timestamp"
"github.com/jitsucom/bulker/jitsubase/types"
"github.com/jitsucom/bulker/jitsubase/utils"
"github.com/jitsucom/bulker/jitsubase/uuid"
"golang.org/x/net/publicsuffix"
"net/http"
"net/url"
"strings"
)

const (
dataField = "data"
cookieDomainField = "cookie_domain"
processHeaders = "process_headers"
anonymousIdCookie = "__eventn_id"
userIdCookie = "__eventn_uid"
userTraitsCookie = "__eventn_id_usr"
groupIdCookie = "__group_id"
groupTraitsCookie = "__group_traits"
)

func (r *Router) PixelHandler(c *gin.Context) {
domain := ""
// TODO: use workspaceId as default for all stream identification errors
var eventsLogId string
var rError *appbase.RouterError
var ingestMessageBytes []byte
var asyncDestinations []string
ingestType := IngestTypeWriteKeyDefined

defer func() {
if len(ingestMessageBytes) > 0 {
_ = r.backupsLogger.Log(utils.DefaultString(eventsLogId, "UNKNOWN"), ingestMessageBytes)
}
if rError != nil && rError.ErrorType != ErrNoDst {
obj := map[string]any{"body": string(ingestMessageBytes), "error": rError.PublicError.Error(), "status": utils.Ternary(rError.ErrorType == ErrThrottledType, "SKIPPED", "FAILED")}
r.eventsLogService.PostAsync(&eventslog.ActorEvent{EventType: eventslog.EventTypeIncoming, Level: eventslog.LevelError, ActorId: eventsLogId, Event: obj})
IngestHandlerRequests(domain, utils.Ternary(rError.ErrorType == ErrThrottledType, "throttled", "error"), rError.ErrorType).Inc()
_ = r.producer.ProduceAsync(r.config.KafkaDestinationsDeadLetterTopicName, uuid.New(), ingestMessageBytes, map[string]string{"error": rError.Error.Error()}, kafka2.PartitionAny)
} else {
obj := map[string]any{"body": string(ingestMessageBytes), "asyncDestinations": asyncDestinations}
if len(asyncDestinations) > 0 {
obj["status"] = "SUCCESS"
} else {
obj["status"] = "SKIPPED"
obj["error"] = "no destinations found for stream"
}
r.eventsLogService.PostAsync(&eventslog.ActorEvent{EventType: eventslog.EventTypeIncoming, Level: eventslog.LevelInfo, ActorId: eventsLogId, Event: obj})
IngestHandlerRequests(domain, "success", "").Inc()
}
}()
defer func() {
if rerr := recover(); rerr != nil {
rError = r.ResponseError(c, http.StatusOK, "panic", true, fmt.Errorf("%v", rerr), true)
}
}()
// disable cache
c.Header("Cache-Control", "no-cache, no-store, must-revalidate")
c.Header("Pragma", "no-cache")
c.Header("Expires", "0")
c.Set(appbase.ContextLoggerName, "ingest")
tp := c.Param("tp")
message, err := r.parsePixelEvent(c)
if err != nil {
rError = r.ResponseError(c, http.StatusOK, "error parsing message", false, err, true)
return
}
messageId := message.GetS("messageId")
if messageId == "" {
messageId = uuid.New()
} else {
messageId = utils.ShortenString(messageIdUnsupportedChars.ReplaceAllString(messageId, "_"), 64)
}
c.Set(appbase.ContextMessageId, messageId)
//func() string { wk, _ := message["writeKey"].(string); return wk }
loc, err := r.getDataLocator(c, ingestType, nil)
if err != nil {
rError = r.ResponseError(c, http.StatusOK, "error processing message", false, err, true)
return
}

domain = utils.DefaultString(loc.Slug, loc.Domain)
c.Set(appbase.ContextDomain, domain)

stream := r.getStream(&loc, false, false)
if stream == nil {
rError = r.ResponseError(c, http.StatusOK, "stream not found", false, fmt.Errorf("for: %+v", loc), true)
return
}

eventsLogId = stream.Stream.Id
//}
_, ingestMessageBytes, err = r.buildIngestMessage(c, messageId, message, nil, tp, loc, stream)
if err != nil {
rError = r.ResponseError(c, http.StatusOK, "event error", false, err, true)
return
}
if len(stream.AsynchronousDestinations) == 0 {
rError = r.ResponseError(c, http.StatusOK, ErrNoDst, false, fmt.Errorf(stream.Stream.Id), true)
return
}
asyncDestinations, _, rError = r.sendToRotor(c, ingestMessageBytes, stream, true)
if rError != nil {
return
}
// respond with empty git
c.Data(http.StatusOK, "image/gif", appbase.EmptyGif)

}

// parseEvent parses event from query parameters (dataField and json paths)
func (r *Router) parsePixelEvent(c *gin.Context) (event types.Json, err error) {
parameters := c.Request.URL.Query()
event = types.NewJson()

data := parameters.Get(dataField)
if data != "" {
dataBytes, err := base64.StdEncoding.DecodeString(data)
if err != nil {
return nil, fmt.Errorf("Error decoding event from %q field in tracking pixel: %v", dataField, err)
}

err = jsonorder.Unmarshal(dataBytes, &event)
if err != nil {
return nil, fmt.Errorf("Error unmarshalling event from %q: %v", dataField, err)
}
}

for key, value := range parameters {
if key == dataField || key == cookieDomainField || key == processHeaders {
continue
}
if len(value) == 1 {
event.SetPath(key, value[0])
} else {
event.SetPath(key, value)
}
}
processHeadersFlag := parameters.Get(processHeaders)
if utils.IsTruish(processHeadersFlag) {
processHeadersData(c, event)
}
return event, nil
}

func processHeadersData(c *gin.Context, event types.Json) {
anonymousId := event.GetS("anonymousId")
if anonymousId == "" {
var err error
anonymousId, err = c.Cookie(anonymousIdCookie)
if errors.Is(err, http.ErrNoCookie) {
anonymousId = uuid.New()
topLevelDomain := event.GetS(cookieDomainField)
if topLevelDomain == "" {
topLevelDomain, _ = ExtractTopLevelAndDomain(c.Request.Host)
}
http.SetCookie(c.Writer, &http.Cookie{
Name: anonymousIdCookie,
Value: url.QueryEscape(anonymousId),
Expires: timestamp.Now().AddDate(1000, 12, 31),
Path: "/",
Domain: fmt.Sprint(topLevelDomain),
SameSite: http.SameSiteNoneMode,
Secure: true,
HttpOnly: false,
})
}
event.Set("anonymousId", anonymousId)
}
userId := event.GetS("userId")
if userId == "" {
userId, _ = c.Cookie(userIdCookie)
if userId != "" {
event.Set("userId", userId)
}
}
var ctx types.Json
o, ok := event.Get("context")
if ok {
ctx, _ = o.(types.Json)
}
if ctx == nil {
ctx = types.NewJson()
}
groupId := ctx.GetS("groupId")
if groupId == "" {
groupId, _ = c.Cookie(groupIdCookie)
if groupId != "" {
ctx.Set("groupId", groupId)
}
}
var traits types.Json
o, ok = ctx.Get("traits")
if ok {
traits, _ = o.(types.Json)
}
if traits == nil {
traits = types.NewJson()
}
traitsNew := types.NewJson()
groupTraits, _ := c.Cookie(groupTraitsCookie)
if groupTraits != "" {
_ = jsonorder.Unmarshal([]byte(groupTraits), &traitsNew)
}
userTraits, _ := c.Cookie(userTraitsCookie)
if userTraits != "" {
_ = jsonorder.Unmarshal([]byte(userTraits), &traitsNew)
}
traitsNew.SetAll(traits)
traits = traitsNew
if traits.Len() > 0 {
ctx.Set("traits", traits)
}

referer := c.Request.Referer()
if referer != "" {
r, err := url.Parse(referer)
if err == nil {
var page types.Json
o, ok = ctx.Get("page")
if ok {
page, _ = o.(types.Json)
}
if page == nil {
page = types.NewJson()
}
page.SetIfAbsent("url", referer)
page.SetIfAbsent("path", r.Path)
page.SetIfAbsent("search", r.RawQuery)
page.SetIfAbsent("host", r.Host)
if page.Len() > 0 {
ctx.Set("page", page)
}
}
}
event.Set("context", ctx)
}

// ExtractTopLevelAndDomain returns top level domain and domain
// e.g. abc.efg.com returns "efg.com", "abc"
func ExtractTopLevelAndDomain(adr string) (string, string) {
var icann, topLevelDomain, domain string

for i := 0; i < 3; i++ {
if adr == "" {
break
}

adr = strings.TrimSuffix(adr, ".")
publicSuffix, isIcann := publicsuffix.PublicSuffix(adr)
if isIcann && topLevelDomain == "" {
icann = publicSuffix
} else if topLevelDomain == "" {
topLevelDomain = publicSuffix
} else {
domain = publicSuffix
}

adr = strings.TrimSuffix(adr, publicSuffix)
}

if icann != "" {
topLevelDomain += "." + icann
}

return topLevelDomain, domain
}
15 changes: 14 additions & 1 deletion jitsubase/appbase/router_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ const ContextLoggerName = "contextLogger"
const ContextDomain = "contextDomain"
const ContextMessageId = "contextMessageId"

var EmptyGif = []byte{
0x47, 0x49, 0x46, 0x38, 0x39, 0x61, 0x01, 0x00, 0x01, 0x00,
0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, 0xFF, 0xFF, 0x21,
0xF9, 0x04, 0x01, 0x00, 0x00, 0x00, 0x00, 0x2C, 0x00, 0x00,
0x00, 0x00, 0x01, 0x00, 0x01, 0x00, 0x00, 0x02, 0x02, 0x44,
0x01, 0x00, 0x3B,
}

var IsHexRegex = regexp.MustCompile(`^[a-fA-F0-9]+$`)

type Router struct {
Expand Down Expand Up @@ -140,7 +148,12 @@ func (r *Router) ResponseError(c *gin.Context, code int, errorType string, maskE
logFormat := utils.JoinNonEmptyStrings(" ", builder.String(), "%v")
r.Errorf(logFormat, err)
if sendResponse {
c.JSON(code, gin.H{"error": routerError.PublicError.Error()})
if c.FullPath() == "/api/px/:tp" {
c.Header("X-Jitsu-Error", routerError.PublicError.Error())
c.Data(http.StatusOK, "image/gif", EmptyGif)
} else {
c.JSON(code, gin.H{"error": routerError.PublicError.Error()})
}
}
return &routerError
}
Expand Down
23 changes: 23 additions & 0 deletions jitsubase/types/orderedmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,29 @@ func (m *OrderedMap[K, V]) Set(key K, value V) bool {
return true
}

func (m *OrderedMap[K, V]) SetPath(path K, value V) {
p := strings.Split(any(path).(string), ".")
obj := m
for i, key := range p {
if i == len(p)-1 {
obj.Set(any(key).(K), value)
return
}
var ok bool
o, ok := obj.Get(any(key).(K))
if !ok {
newObj := NewOrderedMap[K, V]()
obj.Set(any(key).(K), any(newObj).(V))
obj = newObj
continue
}
obj, ok = any(o).(*OrderedMap[K, V])
if !ok {
return
}
}
}

// SetIfAbsent sets a value for a key only if the key does not already exist in the map.
// It returns true if the value was set successfully, or false if the key already exists.
func (m *OrderedMap[K, V]) SetIfAbsent(key K, value V) bool {
Expand Down

0 comments on commit 10be473

Please sign in to comment.