Skip to content

Commit

Permalink
add label checks
Browse files Browse the repository at this point in the history
  • Loading branch information
ying-jeanne committed Oct 8, 2024
1 parent b733e44 commit 5207d8d
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 40 deletions.
19 changes: 16 additions & 3 deletions pkg/costattribution/caimpl/managerImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,11 @@ func (m *ManagerImpl) UpdateAttributionTimestamp(user string, calb string, lbs l
}

// SetActiveSeries adjusts the input attribution and sets the active series gauge for the given user and attribution
func (m *ManagerImpl) SetActiveSeries(userID, attribution string, value float64) {
func (m *ManagerImpl) SetActiveSeries(userID, calb, attribution string, value float64) {
// if the input label is outdated, we skip the update
if calb != m.GetUserAttributionLabel(userID) {
return
}
attribution = m.adjustUserAttribution(userID, attribution)

m.attributionTracker.mu.Lock()
Expand All @@ -105,7 +109,12 @@ func (m *ManagerImpl) SetActiveSeries(userID, attribution string, value float64)
}

// IncrementDiscardedSamples increments the discarded samples counter for a given user and attribution
func (m *ManagerImpl) IncrementDiscardedSamples(userID, attribution string, value float64) {
func (m *ManagerImpl) IncrementDiscardedSamples(userID, calb, attribution string, value float64) {
// if the input label is outdated, we skip the update
if calb != m.GetUserAttributionLabel(userID) {
return
}

attribution = m.adjustUserAttribution(userID, attribution)
m.attributionTracker.mu.RLock()
defer m.attributionTracker.mu.RUnlock()
Expand All @@ -115,7 +124,11 @@ func (m *ManagerImpl) IncrementDiscardedSamples(userID, attribution string, valu
}

// IncrementReceivedSamples increments the received samples counter for a given user and attribution
func (m *ManagerImpl) IncrementReceivedSamples(userID, attribution string, value float64) {
func (m *ManagerImpl) IncrementReceivedSamples(userID, calb, attribution string, value float64) {
// if the input label is outdated, we skip the update
if calb != m.GetUserAttributionLabel(userID) {
return
}
attribution = m.adjustUserAttribution(userID, attribution)
m.attributionTracker.mu.RLock()
defer m.attributionTracker.mu.RUnlock()
Expand Down
14 changes: 7 additions & 7 deletions pkg/costattribution/caimpl/managerImpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func Test_SetActiveSeries(t *testing.T) {
lbls.Set("team", "foo")
isOutdated, val := manager.UpdateAttributionTimestamp(userID, "team", lbls.Labels(), time.Unix(0, 0))
assert.False(t, isOutdated)
manager.SetActiveSeries(userID, val, 1.0)
manager.SetActiveSeries(userID, "team", val, 1.0)
expectedMetrics := `
# HELP cortex_ingester_active_series_attribution The total number of active series per user and attribution.
# TYPE cortex_ingester_active_series_attribution gauge
Expand All @@ -135,12 +135,12 @@ func Test_SetActiveSeries(t *testing.T) {
lbls.Set("department", "bar")
isOutdated, val := manager.UpdateAttributionTimestamp(userID, "department", lbls.Labels(), time.Unix(0, 0))
assert.False(t, isOutdated)
manager.SetActiveSeries(userID, val, 2.0)
manager.SetActiveSeries(userID, "department", val, 2.0)

lbls.Set("department", "baz")
isOutdated, val = manager.UpdateAttributionTimestamp(userID, "team", lbls.Labels(), time.Unix(0, 0))
assert.True(t, isOutdated)
manager.SetActiveSeries(userID, val, 3.0)
isOutdated, val = manager.UpdateAttributionTimestamp(userID, "department", lbls.Labels(), time.Unix(1, 0))
assert.False(t, isOutdated)
manager.SetActiveSeries(userID, "department", val, 3.0)

expectedMetrics := `
# HELP cortex_ingester_active_series_attribution The total number of active series per user and attribution.
Expand Down Expand Up @@ -170,7 +170,7 @@ func Test_SetActiveSeries(t *testing.T) {
manager.attributionTracker.limits = overrides
isOutdated, val := manager.UpdateAttributionTimestamp(userID, "department", lbls.Labels(), time.Unix(5, 0))
assert.False(t, isOutdated)
manager.SetActiveSeries(userID, val, 3.0)
// Argument order is (userID, calb, attribution, value): the label name comes before its value.
manager.SetActiveSeries(userID, "department", val, 3.0)

expectedMetrics := `
# HELP cortex_ingester_active_series_attribution The total number of active series per user and attribution.
Expand All @@ -188,7 +188,7 @@ func Test_SetActiveSeries(t *testing.T) {
lbls.Set("department", "bar")
isOutdated, val := manager.UpdateAttributionTimestamp(userID, "department", lbls.Labels(), time.Unix(0, 0))
assert.False(t, isOutdated)
manager.SetActiveSeries(userID, val, 4.0)
// Argument order is (userID, calb, attribution, value): the label name comes before its value.
manager.SetActiveSeries(userID, "department", val, 4.0)

expectedMetrics := `
# HELP cortex_ingester_active_series_attribution The total number of active series per user and attribution.
Expand Down
3 changes: 3 additions & 0 deletions pkg/costattribution/caimpl/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,17 @@ func newTracker(trackedLabel string, limit int) (*Tracker, error) {
attributionLimit: limit,
attributionTimestamps: map[string]*atomic.Int64{},
coolDownDeadline: atomic.NewInt64(0),
//nolint:faillint // the metrics are registered in the mimir package
discardedSampleAttribution: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "cortex_discarded_samples_attribution_total",
Help: "The total number of samples that were discarded per attribution.",
}, []string{"user", trackedLabel}),
//nolint:faillint
receivedSamplesAttribution: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "cortex_received_samples_attribution_total",
Help: "The total number of samples that were received per attribution.",
}, []string{"user", trackedLabel}),
//nolint:faillint
activeSeriesPerUserAttribution: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_ingester_active_series_attribution",
Help: "The total number of active series per user and attribution.",
Expand Down
4 changes: 0 additions & 4 deletions pkg/costattribution/caimpl/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,4 @@ func Test_NewTracker(t *testing.T) {

// Clean the tracker for the user attribution
tracker.cleanupTrackerAttribution(userID, attribution)

// Verify that metrics have been cleaned
expectedMetrics = ``
assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), metricNames...))
}
6 changes: 3 additions & 3 deletions pkg/costattribution/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ type Manager interface {
GetUserAttributionLabel(userID string) string
GetUserAttributionLimit(userID string) int
UpdateAttributionTimestamp(user string, calb string, lbs labels.Labels, now time.Time) (bool, string)
SetActiveSeries(userID, attribution string, value float64)
IncrementDiscardedSamples(userID, attribution string, value float64)
IncrementReceivedSamples(userID, attribution string, value float64)
SetActiveSeries(userID, calb string, attribution string, value float64)
IncrementDiscardedSamples(userID, calb string, attribution string, value float64)
IncrementReceivedSamples(userID, calb string, attribution string, value float64)

Collect(out chan<- prometheus.Metric)
Describe(chan<- *prometheus.Desc)
Expand Down
4 changes: 2 additions & 2 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1692,8 +1692,8 @@ func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID st
}
receivedMetadata = len(req.Metadata)
if caEnabled {
for lv, count := range costAttribution {
d.costAttributionMng.IncrementReceivedSamples(userID, lv, float64(count))
for value, count := range costAttribution {
d.costAttributionMng.IncrementReceivedSamples(userID, caLabel, value, float64(count))
}
}
d.receivedSamples.WithLabelValues(userID).Add(float64(receivedSamples))
Expand Down
16 changes: 5 additions & 11 deletions pkg/ingester/activeseries/active_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,26 +231,20 @@ func (c *ActiveSeries) ActiveWithMatchers() (total int, totalMatching []int, tot
return
}

func (c *ActiveSeries) ActiveByAttributionValue() map[string]uint32 {
func (c *ActiveSeries) ActiveByAttributionValue(calb string) map[string]uint32 {
total := make(map[string]uint32, c.costAttributionMng.GetUserAttributionLimit(c.userID))
for s := 0; s < numStripes; s++ {
c.stripes[s].mu.RLock()
for k, v := range c.stripes[s].costAttributionValues {
total[k] += v
if c.stripes[s].caLabel == calb {
for k, v := range c.stripes[s].costAttributionValues {
total[k] += v
}
}
c.stripes[s].mu.RUnlock()
}
return total
}

func (c *ActiveSeries) ResetAttribution() {
for s := 0; s < numStripes; s++ {
c.stripes[s].mu.Lock()
c.stripes[s].costAttributionValues = map[string]uint32{}
c.stripes[s].mu.Unlock()
}
}

func (c *ActiveSeries) Delete(ref chunks.HeadSeriesRef) {
stripeID := storage.SeriesRef(ref) % numStripes
c.stripes[stripeID].remove(storage.SeriesRef(ref))
Expand Down
17 changes: 7 additions & 10 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,10 +331,6 @@ type Ingester struct {
usersMetadataMtx sync.RWMutex
usersMetadata map[string]*userMetricsMetadata

// For storing tenant current cost attribution labels.
costAttributionMtx sync.RWMutex
costAttributionlbs map[string]string

// Rate of pushed samples. Used to limit global samples push rate.
ingestionRate *util_math.EwmaRate
inflightPushRequests atomic.Int64
Expand Down Expand Up @@ -794,14 +790,15 @@ func (i *Ingester) updateActiveSeries(now time.Time) {
i.metrics.activeSeriesLoading.DeleteLabelValues(userID)
if allActive > 0 {
if i.isCostAttributionEnabledForUser(userID) {
labelAttributions := userDB.activeSeries.ActiveByAttributionValue()
for label, count := range labelAttributions {
i.costAttributionMng.SetActiveSeries(userID, label, float64(count))
calb := i.costAttributionMng.GetUserAttributionLabel(userID)
labelAttributions := userDB.activeSeries.ActiveByAttributionValue(calb)
for value, count := range labelAttributions {
i.costAttributionMng.SetActiveSeries(userID, calb, value, float64(count))
}
}
i.metrics.activeSeriesPerUser.WithLabelValues(userID).Set(float64(allActive))
} else {
i.metrics.activeSeriesPerUser.DeletePartialMatch(prometheus.Labels{"user": userID})
i.metrics.activeSeriesPerUser.DeleteLabelValues(userID)
}
if allActiveHistograms > 0 {
i.metrics.activeSeriesPerUserNativeHistograms.WithLabelValues(userID).Set(float64(allActiveHistograms))
Expand Down Expand Up @@ -1288,8 +1285,8 @@ func (i *Ingester) updateMetricsFromPushStats(userID string, group string, stats
}
}
if i.isCostAttributionEnabledForUser(userID) {
for label, count := range stats.failedSamplesAttribution {
i.costAttributionMng.IncrementDiscardedSamples(userID, label, float64(count))
for value, count := range stats.failedSamplesAttribution {
i.costAttributionMng.IncrementDiscardedSamples(userID, stats.attributionLabel, value, float64(count))
}
}
}
Expand Down

0 comments on commit 5207d8d

Please sign in to comment.