chore: reduce metrics cardinality (#5222)
* chore: reduce metrics cardinality

* chore: remove sourceID from gw pickup lag
mihir20 authored Oct 28, 2024
1 parent ad598b0 commit 18c102f
Showing 13 changed files with 24 additions and 30 deletions.
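
All thirteen files apply the same pattern: high-cardinality identifiers (the sourceId tag, the partition tag, and per-source or per-partition limiter keys) are dropped from metric tag sets and limiter keys, so the number of distinct label combinations per metric no longer scales with the number of sources or partitions. The Go sketch below shows the before/after shape of the gateway pickup-lag change; it is not part of the commit, the variable names are placeholders, and the rudder-go-kit import path is an assumption.

package main

import (
	"time"

	// Assumed import path for the stats client used throughout the diff.
	"github.com/rudderlabs/rudder-go-kit/stats"
)

// emitPickupLag records how long an event waited before the gateway picked it up.
func emitPickupLag(workspaceID, sourceID string, receivedAt time.Time) {
	// Before this commit: tagging by sourceId produces one time series per
	// (sourceId, workspaceId) pair, multiplying cardinality by the source count.
	//
	//   stats.Default.NewTaggedStat("gateway.event_pickup_lag_seconds", stats.TimerType, stats.Tags{
	//       "sourceId":    sourceID,
	//       "workspaceId": workspaceID,
	//   }).Since(receivedAt)

	// After this commit: only the lower-cardinality workspaceId tag remains.
	stats.Default.NewTaggedStat("gateway.event_pickup_lag_seconds", stats.TimerType, stats.Tags{
		"workspaceId": workspaceID,
	}).Since(receivedAt)
}

func main() {
	emitPickupLag("ws-1", "src-1", time.Now().Add(-2*time.Second))
}

The limiter calls change the same way: Begin(partition) / Begin(w.sourceID) becomes Begin(""), removing the per-source or per-partition dimension from whatever the limiter records against that key.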
6 changes: 3 additions & 3 deletions archiver/worker.go
@@ -151,7 +151,7 @@ func (w *worker) Stop() {
}

func (w *worker) uploadJobs(ctx context.Context, jobs []*jobsdb.JobT) (string, error) {
- defer w.uploadLimiter.Begin(w.sourceID)()
+ defer w.uploadLimiter.Begin("")()
firstJobCreatedAt := jobs[0].CreatedAt.UTC()
lastJobCreatedAt := jobs[len(jobs)-1].CreatedAt.UTC()
workspaceID := jobs[0].WorkspaceId
@@ -212,7 +212,7 @@ func (w *worker) uploadJobs(ctx context.Context, jobs []*jobsdb.JobT) (string, e
}

func (w *worker) getJobs() ([]*jobsdb.JobT, bool, error) {
- defer w.fetchLimiter.Begin(w.sourceID)()
+ defer w.fetchLimiter.Begin("")()
params := w.queryParams
params.PayloadSizeLimit = w.payloadLimitFunc(w.config.payloadLimit())
params.EventsLimit = w.config.eventsLimit()
@@ -242,7 +242,7 @@ func marshalJob(job *jobsdb.JobT) ([]byte, error) {
func (w *worker) markStatus(
jobs []*jobsdb.JobT, state string, response []byte,
) error {
- defer w.updateLimiter.Begin(w.sourceID)()
+ defer w.updateLimiter.Begin("")()
workspaceID := jobs[0].WorkspaceId
if err := misc.RetryWithNotify(
w.lifecycle.ctx,
6 changes: 3 additions & 3 deletions enterprise/reporting/error_index/worker.go
@@ -156,7 +156,7 @@ func (w *worker) Work() bool {
}

func (w *worker) fetchJobs() (jobsdb.JobsResult, error) {
- defer w.limiter.fetch.Begin(w.sourceID)()
+ defer w.limiter.fetch.Begin("")()

return w.jobsDB.GetUnprocessed(w.lifecycle.ctx, jobsdb.GetQueryParams{
ParameterFilters: []jobsdb.ParameterFilterT{
@@ -172,7 +172,7 @@ func (w *worker) fetchJobs() (jobsdb.JobsResult, error) {
// It aggregates payloads from a list of jobs, applies transformations if needed,
// uploads the payloads, and returns the concatenated locations of the uploaded files.
func (w *worker) uploadJobs(ctx context.Context, jobs []*jobsdb.JobT) ([]*jobsdb.JobStatusT, error) {
- defer w.limiter.upload.Begin(w.sourceID)()
+ defer w.limiter.upload.Begin("")()

jobWithPayloadsMap := make(map[string][]jobWithPayload)
for _, job := range jobs {
@@ -286,7 +286,7 @@ func (w *worker) encodeToParquet(wr io.Writer, payloads []payload) error {

// markJobsStatus marks the status of the jobs in the erridx jobsDB.
func (w *worker) markJobsStatus(statusList []*jobsdb.JobStatusT) error {
- defer w.limiter.update.Begin(w.sourceID)()
+ defer w.limiter.update.Begin("")()

err := misc.RetryWithNotify(
w.lifecycle.ctx,
2 changes: 1 addition & 1 deletion gateway/gateway_test.go
@@ -1832,7 +1832,7 @@ var _ = Describe("Gateway", func() {
Expect(statStore.GetByName("gateway.event_pickup_lag_seconds")).To(Equal([]memstats.Metric{
{
Name: "gateway.event_pickup_lag_seconds",
- Tags: map[string]string{"sourceId": SourceIDEnabled, "workspaceId": WorkspaceID},
+ Tags: map[string]string{"workspaceId": WorkspaceID},
Durations: []time.Duration{
time.Second,
},
1 change: 0 additions & 1 deletion gateway/handle.go
@@ -847,7 +847,6 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt
}

gw.stats.NewTaggedStat("gateway.event_pickup_lag_seconds", stats.TimerType, stats.Tags{
"sourceId": msg.Properties.SourceID,
"workspaceId": msg.Properties.WorkspaceID,
}).Since(msg.Properties.ReceivedAt)

4 changes: 0 additions & 4 deletions jobsdb/jobsdb.go
@@ -1762,7 +1762,6 @@ func (jd *Handle) GetToProcess(ctx context.Context, params GetQueryParams, more
tags := statTags{
StateFilters: params.stateFilters,
CustomValFilters: params.CustomValFilters,
- ParameterFilters: params.ParameterFilters,
WorkspaceID: params.WorkspaceID,
}
command := func() moreQueryResult {
@@ -2044,7 +2043,6 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, lastDS bool, param
tags := statTags{
StateFilters: stateFilters,
CustomValFilters: params.CustomValFilters,
- ParameterFilters: params.ParameterFilters,
WorkspaceID: workspaceID,
}

@@ -3055,7 +3053,6 @@ func (jd *Handle) getJobs(ctx context.Context, params GetQueryParams, more MoreT
tags := &statTags{
StateFilters: params.stateFilters,
CustomValFilters: params.CustomValFilters,
- ParameterFilters: params.ParameterFilters,
WorkspaceID: params.WorkspaceID,
}
defer jd.getTimerStat(
@@ -3165,7 +3162,6 @@ func (jd *Handle) GetJobs(ctx context.Context, states []string, params GetQueryP
tags := statTags{
StateFilters: params.stateFilters,
CustomValFilters: params.CustomValFilters,
- ParameterFilters: params.ParameterFilters,
WorkspaceID: params.WorkspaceID,
}
command := func() queryResult {
8 changes: 4 additions & 4 deletions processor/processor.go
@@ -2200,7 +2200,7 @@ func (proc *Handle) generateTransformationMessage(preTrans *preTransformationMes

func (proc *Handle) processJobsForDest(partition string, subJobs subJob) (*transformationMessage, error) {
if proc.limiter.preprocess != nil {
- defer proc.limiter.preprocess.BeginWithPriority(partition, proc.getLimiterPriority(partition))()
+ defer proc.limiter.preprocess.BeginWithPriority("", proc.getLimiterPriority(partition))()
}

jobList := subJobs.subJobs
@@ -2533,7 +2533,7 @@ type transformationMessage struct {

func (proc *Handle) transformations(partition string, in *transformationMessage) *storeMessage {
if proc.limiter.transform != nil {
- defer proc.limiter.transform.BeginWithPriority(partition, proc.getLimiterPriority(partition))()
+ defer proc.limiter.transform.BeginWithPriority("", proc.getLimiterPriority(partition))()
}
// Now do the actual transformation. We call it in batches, once
// for each destination ID
@@ -2721,7 +2721,7 @@ func (proc *Handle) Store(partition string, in *storeMessage) {
}

if proc.limiter.store != nil {
- defer proc.limiter.store.BeginWithPriority(partition, proc.getLimiterPriority(partition))()
+ defer proc.limiter.store.BeginWithPriority("", proc.getLimiterPriority(partition))()
}

statusList, destJobs, batchDestJobs := in.statusList, in.destJobs, in.batchDestJobs
@@ -3460,7 +3460,7 @@ func ConvertToFilteredTransformerResponse(

func (proc *Handle) getJobs(partition string) jobsdb.JobsResult {
if proc.limiter.read != nil {
- defer proc.limiter.read.BeginWithPriority(partition, proc.getLimiterPriority(partition))()
+ defer proc.limiter.read.BeginWithPriority("", proc.getLimiterPriority(partition))()
}

s := time.Now()
8 changes: 4 additions & 4 deletions processor/worker_test.go
@@ -300,7 +300,7 @@ func (*mockWorkerHandle) stats() *processorStats {

func (m *mockWorkerHandle) getJobs(partition string) jobsdb.JobsResult {
if m.limiters.query != nil {
- defer m.limiters.query.Begin(partition)()
+ defer m.limiters.query.Begin("")()
}
m.statsMu.Lock()
defer m.statsMu.Unlock()
@@ -365,7 +365,7 @@ func (m *mockWorkerHandle) jobSplitter(jobs []*jobsdb.JobT, rsourcesStats rsourc

func (m *mockWorkerHandle) processJobsForDest(partition string, subJobs subJob) (*transformationMessage, error) {
if m.limiters.process != nil {
- defer m.limiters.process.Begin(partition)()
+ defer m.limiters.process.Begin("")()
}
m.statsMu.Lock()
defer m.statsMu.Unlock()
@@ -386,7 +386,7 @@ func (m *mockWorkerHandle) processJobsForDest(partition string, subJobs subJob)

func (m *mockWorkerHandle) transformations(partition string, in *transformationMessage) *storeMessage {
if m.limiters.transform != nil {
- defer m.limiters.transform.Begin(partition)()
+ defer m.limiters.transform.Begin("")()
}
m.statsMu.Lock()
defer m.statsMu.Unlock()
@@ -404,7 +404,7 @@ func (m *mockWorkerHandle) transformations(partition string, in *transformationM

func (m *mockWorkerHandle) Store(partition string, in *storeMessage) {
if m.limiters.store != nil {
- defer m.limiters.store.Begin(partition)()
+ defer m.limiters.store.Begin("")()
}
m.statsMu.Lock()
defer m.statsMu.Unlock()
2 changes: 1 addition & 1 deletion router/batchrouter/handle.go
@@ -185,7 +185,7 @@ func (brt *Handle) getWorkerJobs(partition string) (workerJobs []*DestinationJob
return
}

- defer brt.limiter.read.Begin(partition)()
+ defer brt.limiter.read.Begin("")()

brt.configSubscriberMu.RLock()
destinationsMap := brt.destinationsMap
4 changes: 2 additions & 2 deletions router/batchrouter/worker.go
@@ -63,7 +63,7 @@ func (w *worker) Work() bool {
func (w *worker) processJobAsync(jobsWg *sync.WaitGroup, destinationJobs *DestinationJobs) {
brt := w.brt
rruntime.Go(func() {
- defer brt.limiter.process.Begin(w.partition)()
+ defer brt.limiter.process.Begin("")()
defer jobsWg.Done()
destWithSources := destinationJobs.destWithSources
parameterFilters := []jobsdb.ParameterFilterT{{Name: "destination_id", Value: destWithSources.Destination.ID}}
@@ -207,7 +207,7 @@ func (w *worker) processJobAsync(jobsWg *sync.WaitGroup, destinationJobs *Destin
continue
}
rruntime.Go(func() {
- defer brt.limiter.upload.Begin(w.partition)()
+ defer brt.limiter.upload.Begin("")()
switch {
case IsObjectStorageDestination(brt.destType):
destUploadStat := stats.Default.NewStat(fmt.Sprintf(`batch_router.%s_dest_upload_time`, brt.destType), stats.TimerType)
2 changes: 1 addition & 1 deletion router/handle.go
@@ -151,7 +151,7 @@ func (rt *Handle) pickup(ctx context.Context, partition string, workers []*worke
var discardedCount int
limiter := rt.limiter.pickup
limiterStats := rt.limiter.stats.pickup
- limiterEnd := limiter.BeginWithPriority(partition, LimiterPriorityValueFrom(limiterStats.Score(partition), 100))
+ limiterEnd := limiter.BeginWithPriority("", LimiterPriorityValueFrom(limiterStats.Score(partition), 100))
defer limiterEnd()

defer func() {
3 changes: 1 addition & 2 deletions router/handle_lifecycle.go
@@ -226,8 +226,7 @@ func (rt *Handle) Setup(
allPStats := pstats.All()
for _, pstat := range allPStats {
statTags := stats.Tags{
"destType": rt.destType,
"partition": pstat.Partition,
"destType": rt.destType,
}
stats.Default.NewTaggedStat("rt_"+key+"_limiter_stats_throughput", stats.GaugeType, statTags).Gauge(pstat.Throughput)
stats.Default.NewTaggedStat("rt_"+key+"_limiter_stats_errors", stats.GaugeType, statTags).Gauge(pstat.Errors)
2 changes: 1 addition & 1 deletion router/partition_worker.go
@@ -68,7 +68,7 @@ type partitionWorker struct {
func (pw *partitionWorker) Work() bool {
start := time.Now()
pw.pickupCount, pw.limitsReached = pw.rt.pickup(pw.ctx, pw.partition, pw.workers)
- stats.Default.NewTaggedStat("router_generator_loop", stats.TimerType, stats.Tags{"destType": pw.rt.destType, "partition": pw.partition}).Since(start)
+ stats.Default.NewTaggedStat("router_generator_loop", stats.TimerType, stats.Tags{"destType": pw.rt.destType}).Since(start)
stats.Default.NewTaggedStat("router_generator_events", stats.CountType, stats.Tags{"destType": pw.rt.destType, "partition": pw.partition}).Count(pw.pickupCount)
worked := pw.pickupCount > 0
if worked && !pw.limitsReached { // sleep only if we worked and we didn't reach the limits
6 changes: 3 additions & 3 deletions router/worker.go
@@ -278,7 +278,7 @@ func (w *worker) transform(routerJobs []types.RouterJobT) []types.DestinationJob
start := time.Now()
limiter := w.rt.limiter.transform
limiterStats := w.rt.limiter.stats.transform
- defer limiter.BeginWithPriority(w.partition, LimiterPriorityValueFrom(limiterStats.Score(w.partition), 100))()
+ defer limiter.BeginWithPriority("", LimiterPriorityValueFrom(limiterStats.Score(w.partition), 100))()
defer func() {
limiterStats.Update(w.partition, time.Since(start), len(routerJobs), 0)
}()
@@ -322,7 +322,7 @@ func (w *worker) batchTransform(routerJobs []types.RouterJobT) []types.Destinati
start := time.Now()
limiter := w.rt.limiter.batch
limiterStats := w.rt.limiter.stats.batch
- defer limiter.BeginWithPriority(w.partition, LimiterPriorityValueFrom(limiterStats.Score(w.partition), 100))()
+ defer limiter.BeginWithPriority("", LimiterPriorityValueFrom(limiterStats.Score(w.partition), 100))()
defer func() {
limiterStats.Update(w.partition, time.Since(start), len(routerJobs), 0)
}()
@@ -372,7 +372,7 @@ func (w *worker) processDestinationJobs() {
var successCount, errorCount int
limiter := w.rt.limiter.process
limiterStats := w.rt.limiter.stats.process
- defer limiter.BeginWithPriority(w.partition, LimiterPriorityValueFrom(limiterStats.Score(w.partition), 100))()
+ defer limiter.BeginWithPriority("", LimiterPriorityValueFrom(limiterStats.Score(w.partition), 100))()
defer func() {
limiterStats.Update(w.partition, time.Since(start), successCount+errorCount, errorCount)
}()
