Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: oauth v2 stats refactor #5262

Merged
merged 6 commits into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 58 additions & 47 deletions services/oauth/v2/oauth.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"errors"
"fmt"
"net/http"
"strings"
"time"

"github.com/tidwall/gjson"
Expand Down Expand Up @@ -141,11 +142,11 @@
authErrCategory: "",
errorMessage: "",
destDefName: fetchTokenParams.DestDefName,
isTokenFetch: true,
flowType: h.RudderFlowType,
action: "fetch_token",
}
return h.GetTokenInfo(fetchTokenParams, "Fetch token", authStats)
statshandler := NewStatsHandlerFromOAuthStats(authStats)
return h.GetTokenInfo(fetchTokenParams, "Fetch token", statshandler)
}

/*
Expand Down Expand Up @@ -176,10 +177,11 @@
action: "refresh_token",
stats: h.stats,
}
return h.GetTokenInfo(refTokenParams, "Refresh token", authStats)
statsHandler := NewStatsHandlerFromOAuthStats(authStats)
return h.GetTokenInfo(refTokenParams, "Refresh token", statsHandler)
}

func (h *OAuthHandler) GetTokenInfo(refTokenParams *RefreshTokenParams, logTypeName string, authStats *OAuthStats) (int, *AuthResponse, error) {
func (h *OAuthHandler) GetTokenInfo(refTokenParams *RefreshTokenParams, logTypeName string, statsHandler OAuthStatsHandler) (int, *AuthResponse, error) {
log := h.Logger.Withn(
logger.NewStringField("Call Type", logTypeName),
logger.NewStringField("AccountId", refTokenParams.AccountID),
Expand All @@ -189,13 +191,13 @@
)
log.Debugn("[request] :: Get Token Info request received")
startTime := time.Now()
defer func() {
authStats.statName = GetOAuthActionStatName("total_latency")
authStats.isCallToCpApi = false
authStats.SendTimerStats(startTime)
}()
h.CacheMutex.Lock(refTokenParams.AccountID)
defer h.CacheMutex.Unlock(refTokenParams.AccountID)
defer func() {
statsHandler.SendTiming(startTime, "total_latency", stats.Tags{
"isCallToCpApi": "false",
})
}()
refTokenBody := RefreshTokenBodyParams{}
storedCache, ok := h.Cache.Load(refTokenParams.AccountID)
if ok {
Expand All @@ -205,7 +207,7 @@
return http.StatusInternalServerError, nil, errors.New("failed to type assert the stored cache")
}
// TODO: verify if the storedCache is nil at this point
if !checkIfTokenExpired(cachedSecret.Account, refTokenParams.Secret, h.ExpirationTimeDiff, authStats) {
if !checkIfTokenExpired(cachedSecret.Account, refTokenParams.Secret, h.ExpirationTimeDiff, statsHandler) {
return http.StatusOK, cachedSecret, nil
}
// Refresh token preparation
Expand All @@ -214,7 +216,7 @@
ExpiredSecret: refTokenParams.Secret,
}
}
statusCode, refSecret, refErr := h.fetchAccountInfoFromCp(refTokenParams, refTokenBody, authStats, logTypeName)
statusCode, refSecret, refErr := h.fetchAccountInfoFromCp(refTokenParams, refTokenBody, statsHandler, logTypeName)
// handling of refresh token response
if statusCode == http.StatusOK {
// fetching/refreshing through control plane was successful
Expand All @@ -241,11 +243,7 @@
action: action,
stats: h.stats,
}
defer func() {
authStatusToggleStats.statName = GetOAuthActionStatName("total_latency")
authStatusToggleStats.isCallToCpApi = false
authStatusToggleStats.SendTimerStats(authErrHandlerTimeStart)
}()
statsHandler := NewStatsHandlerFromOAuthStats(authStatusToggleStats)
h.CacheMutex.Lock(params.RudderAccountID)
isAuthStatusUpdateActive, isAuthStatusUpdateReqPresent := h.AuthStatusUpdateActiveMap[destinationId]
if isAuthStatusUpdateReqPresent && isAuthStatusUpdateActive {
Expand All @@ -266,6 +264,11 @@
h.Cache.Delete(params.RudderAccountID)
h.CacheMutex.Unlock(params.RudderAccountID)
}()
defer func() {
statsHandler.SendTiming(authErrHandlerTimeStart, "total_latency", stats.Tags{
"isCallToCpApi": "false",
})
}()

authStatusToggleUrl := fmt.Sprintf("%s/workspaces/%s/destinations/%s/authStatus/toggle", h.ConfigBEURL, params.WorkspaceID, destinationId)

Expand All @@ -278,14 +281,15 @@
RequestType: action,
BasicAuthUser: h.Identity(),
}
authStatusToggleStats.statName = GetOAuthActionStatName("request_sent")
authStatusToggleStats.isCallToCpApi = true
authStatusToggleStats.SendCountStat()
statsHandler.Increment("request_sent", stats.Tags{
"isCallToCpApi": "true",
})

cpiCallStartTime := time.Now()
statusCode, respBody = h.CpConn.CpApiCall(authStatusInactiveCpReq)
authStatusToggleStats.statName = GetOAuthActionStatName("request_latency")
authStatusToggleStats.SendTimerStats(cpiCallStartTime)
statsHandler.SendTiming(cpiCallStartTime, "request_latency", stats.Tags{
"isCallToCpApi": "true",
})
h.Logger.Debugn("[request] :: Response from CP for auth status inactive req",
logger.NewIntField("StatusCode", int64(statusCode)),
logger.NewStringField("Response", respBody))
Expand All @@ -299,18 +303,20 @@
} else {
msg = fmt.Sprintf("Could not update authStatus to inactive for destination: %v", authStatusToggleRes.Message)
}
authStatusToggleStats.statName = GetOAuthActionStatName("request")
authStatusToggleStats.errorMessage = msg
authStatusToggleStats.SendCountStat()
statsHandler.Increment("request", stats.Tags{
"errorMessage": msg,
"isCallToCpApi": "true",
})
return http.StatusBadRequest, ErrPermissionOrTokenRevoked.Error()
}
h.Logger.Debugn("[request] :: (Write) auth status inactive Response received",
logger.NewIntField("StatusCode", int64(statusCode)),
logger.NewStringField("Response", respBody))
authStatusToggleStats.statName = GetOAuthActionStatName("request")
authStatusToggleStats.errorMessage = ""
authStatusToggleStats.SendCountStat()

statsHandler.Increment("request", stats.Tags{
"errorMessage": "",
"isCallToCpApi": "true",
})
return http.StatusBadRequest, ErrPermissionOrTokenRevoked.Error()
}

Expand Down Expand Up @@ -342,14 +348,15 @@
// This method hits the Control Plane to get the account information
// As well update the account information into the destAuthInfoMap(which acts as an in-memory cache)
func (h *OAuthHandler) fetchAccountInfoFromCp(refTokenParams *RefreshTokenParams, refTokenBody RefreshTokenBodyParams,
authStats *OAuthStats, logTypeName string,
statsHandler OAuthStatsHandler, logTypeName string,
) (int, *AuthResponse, error) {
actionType := strings.Join(strings.Fields(strings.ToLower(logTypeName)), "_")
Copy link
Contributor Author

@sanpj2292 sanpj2292 Nov 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reference

> strings.Join(strings.Fields(strings.ToLower("Refresh Token")), "_")
refresh_token

refreshUrl := fmt.Sprintf("%s/destination/workspaces/%s/accounts/%s/token", h.ConfigBEURL, refTokenParams.WorkspaceID, refTokenParams.AccountID)
res, err := json.Marshal(refTokenBody)
if err != nil {
authStats.statName = GetOAuthActionStatName("request")
authStats.errorMessage = "error in marshalling refresh token body"
authStats.SendCountStat()
statsHandler.Increment("request", stats.Tags{
"errorMessage": "error in marshalling refresh token body",
})

Check warning on line 359 in services/oauth/v2/oauth.go

View check run for this annotation

Codecov / codecov/patch

services/oauth/v2/oauth.go#L357-L359

Added lines #L357 - L359 were not covered by tests
return http.StatusInternalServerError, nil, err
}
refreshCpReq := &controlplane.Request{
Expand All @@ -358,20 +365,21 @@
ContentType: "application/json; charset=utf-8",
Body: string(res),
DestName: refTokenParams.DestDefName,
RequestType: authStats.action,
RequestType: actionType,
BasicAuthUser: h.TokenProvider.Identity(),
}
var accountSecret AccountSecret
// Stat for counting number of Refresh Token endpoint calls
authStats.statName = GetOAuthActionStatName("request_sent")
authStats.isCallToCpApi = true
authStats.errorMessage = ""
authStats.SendCountStat()
statsHandler.Increment("request_sent", stats.Tags{
"isCallToCpApi": "true",
"errorMessage": "",
})

cpiCallStartTime := time.Now()
statusCode, response := h.CpConn.CpApiCall(refreshCpReq)
authStats.statName = GetOAuthActionStatName("request_latency")
authStats.SendTimerStats(cpiCallStartTime)
statsHandler.SendTiming(cpiCallStartTime, "request_latency", stats.Tags{
"isCallToCpApi": "true",
})

log := h.Logger.Withn(logger.NewIntField("StatusCode", int64(statusCode)),
logger.NewIntField("WorkerId", int64(refTokenParams.WorkerID)),
Expand All @@ -380,9 +388,10 @@

// Empty Refresh token response
if !routerutils.IsNotEmptyString(response) {
authStats.statName = GetOAuthActionStatName("request")
authStats.errorMessage = "Empty secret"
authStats.SendCountStat()
statsHandler.Increment("request", stats.Tags{
"errorMessage": "Empty secret",
"isCallToCpApi": "true",
})
// Setting empty accessToken value into in-memory auth info map(cache)
h.Logger.Debugn("Empty response from Control-Plane",
logger.NewStringField("Response", response),
Expand All @@ -398,19 +407,21 @@
Err: errType,
ErrorMessage: refErrMsg,
}
authStats.statName = GetOAuthActionStatName("request")
authStats.errorMessage = errType
authStats.SendCountStat()
statsHandler.Increment("request", stats.Tags{
"errorMessage": errType,
"isCallToCpApi": "true",
})
if authResponse.Err == common.RefTokenInvalidGrant {
// Should abort the event as refresh is not going to work
// until we have new refresh token for the account
return http.StatusBadRequest, authResponse, fmt.Errorf("invalid grant")
}
return http.StatusInternalServerError, authResponse, fmt.Errorf("error occurred while fetching/refreshing account info from CP: %s", refErrMsg)
}
authStats.statName = GetOAuthActionStatName("request")
authStats.errorMessage = ""
authStats.SendCountStat()
statsHandler.Increment("request", stats.Tags{
"errorMessage": "",
"isCallToCpApi": "true",
})
log.Debugn("[request] :: (Write) Account Secret received")
// Store expirationDate information
accountSecret.ExpirationDate = gjson.Get(response, "secret.expirationDate").String()
Expand Down
72 changes: 45 additions & 27 deletions services/oauth/v2/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v2

import (
"strconv"
"strings"
"time"

"github.com/rudderlabs/rudder-go-kit/stats"
Expand All @@ -18,40 +19,57 @@ type OAuthStats struct {
isCallToCpApi bool // is a call being made to control-plane APIs
authErrCategory string // for action=refresh_token -> REFRESH_TOKEN, for action=fetch_token -> "", for action=auth_status_inactive -> auth_status_inactive
destDefName string
isTokenFetch bool // This stats field is used to identify if a request to get token is arising from processor
flowType common.RudderFlow // delivery, delete
action string // refresh_token, fetch_token, auth_status_inactive
}

func (s *OAuthStats) SendTimerStats(startTime time.Time) {
statsTags := stats.Tags{
"id": s.id,
"workspaceId": s.workspaceID,
"rudderCategory": s.rudderCategory,
"isCallToCpApi": strconv.FormatBool(s.isCallToCpApi),
"authErrCategory": s.authErrCategory,
"destType": s.destDefName,
"flowType": string(s.flowType),
"action": s.action,
type OAuthStatsHandler struct {
stats stats.Stats
defaultTags stats.Tags
}

func GetDefaultTagsFromOAuthStats(oauthStats *OAuthStats) stats.Tags {
return stats.Tags{
"id": oauthStats.id,
"workspaceId": oauthStats.workspaceID,
"rudderCategory": "destination",
"isCallToCpApi": strconv.FormatBool(oauthStats.isCallToCpApi),
"authErrCategory": oauthStats.authErrCategory,
"destType": oauthStats.destDefName,
"flowType": string(oauthStats.flowType),
"action": oauthStats.action,
"oauthVersion": "v2",
}
s.stats.NewTaggedStat(s.statName, stats.TimerType, statsTags).SendTiming(time.Since(startTime))
}

// SendCountStat Send count type stats related to OAuth(Destination)
func (s *OAuthStats) SendCountStat() {
statsTags := stats.Tags{
"oauthVersion": "v2",
"id": s.id,
"workspaceId": s.workspaceID,
"rudderCategory": s.rudderCategory,
"errorMessage": s.errorMessage,
"isCallToCpApi": strconv.FormatBool(s.isCallToCpApi),
"authErrCategory": s.authErrCategory,
"destType": s.destDefName,
"isTokenFetch": strconv.FormatBool(s.isTokenFetch),
"flowType": string(s.flowType),
"action": s.action,
func NewStatsHandlerFromOAuthStats(oauthStats *OAuthStats) OAuthStatsHandler {
defaultTags := GetDefaultTagsFromOAuthStats(oauthStats)
return OAuthStatsHandler{
stats: oauthStats.stats,
defaultTags: defaultTags,
}
}

func (m *OAuthStatsHandler) mergeTags(tags stats.Tags) stats.Tags {
allTags := m.defaultTags
for key, value := range tags {
allTags[key] = value
}
s.stats.NewTaggedStat(s.statName, stats.CountType, statsTags).Increment()
return allTags
}
sanpj2292 marked this conversation as resolved.
Show resolved Hide resolved

func (m *OAuthStatsHandler) getStatName(suffix string) string {
return strings.Join([]string{"oauth_action", suffix}, "_")
}
sanpj2292 marked this conversation as resolved.
Show resolved Hide resolved

func (m *OAuthStatsHandler) Increment(statSuffix string, tags stats.Tags) {
statName := m.getStatName(statSuffix)
allTags := m.mergeTags(tags)
m.stats.NewTaggedStat(statName, stats.CountType, allTags).Increment()
}

func (m *OAuthStatsHandler) SendTiming(startTime time.Time, statSuffix string, tags stats.Tags) {
statName := m.getStatName(statSuffix)
allTags := m.mergeTags(tags)
m.stats.NewTaggedStat(statName, stats.TimerType, allTags).SendTiming(time.Since(startTime))
}
13 changes: 7 additions & 6 deletions services/oauth/v2/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"fmt"
"time"

"github.com/rudderlabs/rudder-go-kit/stats"
routerutils "github.com/rudderlabs/rudder-server/router/utils"
"github.com/rudderlabs/rudder-server/services/oauth/v2/common"
"github.com/rudderlabs/rudder-server/utils/misc"
Expand All @@ -21,8 +22,8 @@
return fmt.Sprintf("oauth_action_%v", stat)
}

func checkIfTokenExpired(secret AccountSecret, oldSecret json.RawMessage, expiryTimeDiff time.Duration, stats *OAuthStats) bool {
if secret.ExpirationDate != "" && isTokenExpired(secret.ExpirationDate, expiryTimeDiff, stats) {
func checkIfTokenExpired(secret AccountSecret, oldSecret json.RawMessage, expiryTimeDiff time.Duration, statsHandler OAuthStatsHandler) bool {
if secret.ExpirationDate != "" && isTokenExpired(secret.ExpirationDate, expiryTimeDiff, &statsHandler) {
return true
}
if !routerutils.IsNotEmptyString(string(oldSecret)) {
Expand All @@ -31,12 +32,12 @@
return bytes.Equal(secret.Secret, oldSecret)
}

func isTokenExpired(expirationDate string, expirationTimeDiff time.Duration, stats *OAuthStats) bool {
func isTokenExpired(expirationDate string, expirationTimeDiff time.Duration, statsHandler *OAuthStatsHandler) bool {
date, err := time.Parse(misc.RFC3339Milli, expirationDate)
if err != nil {
stats.errorMessage = "parsing failed"
stats.statName = GetOAuthActionStatName("proactive_token_refresh")
stats.SendCountStat()
statsHandler.Increment("proactive_token_refresh", stats.Tags{
"errorMessage": "parsing failed",
})

Check warning on line 40 in services/oauth/v2/utils.go

View check run for this annotation

Codecov / codecov/patch

services/oauth/v2/utils.go#L38-L40

Added lines #L38 - L40 were not covered by tests
return false
}
return date.Before(time.Now().Add(expirationTimeDiff))
Expand Down
Loading