diff --git a/pkg/costattribution/manager.go b/pkg/costattribution/manager.go
index 0aeae09fcd..ba904e3a98 100644
--- a/pkg/costattribution/manager.go
+++ b/pkg/costattribution/manager.go
@@ -46,10 +46,10 @@ func (m *Manager) EnabledForUser(userID string) bool {
 	return m.attributionTracker.limits.CostAttributionLabel(userID) != ""
 }
 
-// GetUserAttributionLabel returns the cost attribution label for the user, first it will try to get the label from the cache,
+// UserAttributionLabel returns the cost attribution label for the user, first it will try to get the label from the cache,
 // If not found, it will get the label from the config
 // If the user is not enabled for cost attribution, it would clean the cache and return empty string
-func (m *Manager) GetUserAttributionLabel(userID string) string {
+func (m *Manager) UserAttributionLabel(userID string) string {
 	if m.EnabledForUser(userID) {
 		return m.attributionTracker.getUserAttributionLabelFromCache(userID)
 	}
@@ -57,10 +57,10 @@ func (m *Manager) GetUserAttributionLabel(userID string) string {
 	return ""
 }
 
-// GetUserAttributionLimit returns the cost attribution limit for the user, first it will try to get the limit from the cache,
+// UserAttributionLimit returns the cost attribution limit for the user, first it will try to get the limit from the cache,
 // If not found, it will get the limit from the config
 // If the user is not enabled for cost attribution, it would clean the cache and return 0
-func (m *Manager) GetUserAttributionLimit(userID string) int {
+func (m *Manager) UserAttributionLimit(userID string) int {
 	if m.EnabledForUser(userID) {
 		return m.attributionTracker.getUserAttributionLimitFromCache(userID)
 	}
@@ -96,7 +96,7 @@ func (m *Manager) UpdateAttributionTimestamp(user string, calb string, lbs label
 // SetActiveSeries adjust the input attribution and sets the active series gauge for the given user and attribution
 func (m *Manager) SetActiveSeries(userID, calb, attribution string, value float64) {
 	// if the input label is outdated, we skip the update
-	if calb != m.GetUserAttributionLabel(userID) {
+	if calb != m.UserAttributionLabel(userID) {
 		return
 	}
 	attribution = m.adjustUserAttribution(userID, attribution)
@@ -109,13 +109,13 @@ func (m *Manager) SetActiveSeries(userID, calb, attribution string, value float6
 }
 
 // IncrementDiscardedSamples increments the discarded samples counter for a given user and attribution
-func (m *Manager) IncrementDiscardedSamples(userID, calb, attribution string, value float64) {
-	// if the input label is outdated, we skip the update
-	if calb != m.GetUserAttributionLabel(userID) {
+func (m *Manager) IncrementDiscardedSamples(userID string, lbs labels.Labels, value float64, now time.Time) {
+	if !m.EnabledForUser(userID) {
 		return
 	}
+	calb := m.UserAttributionLabel(userID)
+	_, attribution := m.UpdateAttributionTimestamp(userID, calb, lbs, now)
-	attribution = m.adjustUserAttribution(userID, attribution)
 	m.attributionTracker.mu.RLock()
 	defer m.attributionTracker.mu.RUnlock()
 	if tracker, exists := m.attributionTracker.trackersByUserID[userID]; exists {
@@ -124,12 +124,12 @@ func (m *Manager) IncrementDiscardedSamples(userID, calb, attribution string, va
 }
 
 // IncrementReceivedSamples increments the received samples counter for a given user and attribution
-func (m *Manager) IncrementReceivedSamples(userID, calb, attribution string, value float64) {
-	// if the input label is outdated, we skip the update
-	if calb != m.GetUserAttributionLabel(userID) {
+func (m *Manager) IncrementReceivedSamples(userID string, lbs labels.Labels, value float64, now time.Time) {
+	if !m.EnabledForUser(userID) {
 		return
 	}
-	attribution = m.adjustUserAttribution(userID, attribution)
+	calb := m.UserAttributionLabel(userID)
+	_, attribution := m.UpdateAttributionTimestamp(userID, calb, lbs, now)
 	m.attributionTracker.mu.RLock()
 	defer m.attributionTracker.mu.RUnlock()
 	if tracker, exists := m.attributionTracker.trackersByUserID[userID]; exists {
diff --git a/pkg/costattribution/manager_test.go b/pkg/costattribution/manager_test.go
index 9108c46792..b24411615e 100644
--- a/pkg/costattribution/manager_test.go
+++ b/pkg/costattribution/manager_test.go
@@ -51,21 +51,21 @@ func Test_EnabledForUser(t *testing.T) {
 	assert.False(t, manager.EnabledForUser("user4"), "Expected cost attribution to be disabled for user4")
 }
 
-func Test_GetUserAttributionLabel(t *testing.T) {
+func Test_UserAttributionLabel(t *testing.T) {
 	manager := newTestManager()
-	assert.Equal(t, "team", manager.GetUserAttributionLabel("user1"))
-	assert.Equal(t, "", manager.GetUserAttributionLabel("user2"))
-	assert.Equal(t, "department", manager.GetUserAttributionLabel("user3"))
+	assert.Equal(t, "team", manager.UserAttributionLabel("user1"))
+	assert.Equal(t, "", manager.UserAttributionLabel("user2"))
+	assert.Equal(t, "department", manager.UserAttributionLabel("user3"))
 	assert.Equal(t, 2, len(manager.attributionTracker.trackersByUserID))
 	assert.Equal(t, "team", manager.attributionTracker.trackersByUserID["user1"].trackedLabel)
 	assert.Equal(t, "department", manager.attributionTracker.trackersByUserID["user3"].trackedLabel)
 }
 
-func Test_GetUserAttributionLimit(t *testing.T) {
+func Test_UserAttributionLimit(t *testing.T) {
 	manager := newTestManager()
-	assert.Equal(t, 5, manager.GetUserAttributionLimit("user1"))
-	assert.Equal(t, 0, manager.GetUserAttributionLimit("user2"))
-	assert.Equal(t, 0, manager.GetUserAttributionLimit("user4"))
+	assert.Equal(t, 5, manager.UserAttributionLimit("user1"))
+	assert.Equal(t, 0, manager.UserAttributionLimit("user2"))
+	assert.Equal(t, 0, manager.UserAttributionLimit("user4"))
 }
 
 func Test_UpdateAttributionTimestamp(t *testing.T) {
diff --git a/pkg/costattribution/tracker_group_test.go b/pkg/costattribution/tracker_group_test.go
index 9c7f833009..787f17e1b0 100644
--- a/pkg/costattribution/tracker_group_test.go
+++ b/pkg/costattribution/tracker_group_test.go
@@ -64,7 +64,7 @@ func TestUpdateAttributionTimestampForUser(t *testing.T) {
 	})
 }
 
-func TestGetUserAttributionLabel(t *testing.T) {
+func TestUserAttributionLabel(t *testing.T) {
 	cooldownTimeout := 10 * time.Second
 	t.Run("Should return the cost attribution label for the user", func(t *testing.T) {
 		// Create mock limits
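The Manager changes above invert the lookup responsibility: callers used to resolve the attribution label and value themselves and pass a (calb, attribution) pair that could go stale; now they hand over the raw series labels plus a timestamp, and the manager resolves and caches the attribution internally. A minimal sketch of the new call shape — the two method signatures come from the diff, while the attributor interface and trackDiscarded helper are hypothetical scaffolding to keep the sketch self-contained:

package costattributionsketch

import (
	"time"

	"github.com/prometheus/prometheus/model/labels"
)

// attributor mirrors the two renamed Manager methods from the diff above; the
// interface itself is hypothetical and exists only for this sketch.
type attributor interface {
	IncrementReceivedSamples(userID string, lbs labels.Labels, value float64, now time.Time)
	IncrementDiscardedSamples(userID string, lbs labels.Labels, value float64, now time.Time)
}

// trackDiscarded is a hypothetical caller: it no longer looks up the cost
// attribution label itself, it simply forwards the raw series labels and lets
// the manager resolve (and cache) the attribution.
func trackDiscarded(m attributor, userID string, lbs labels.Labels, samples int) {
	if m == nil { // the manager stays optional, mirroring the nil checks in the diff
		return
	}
	m.IncrementDiscardedSamples(userID, lbs, float64(samples), time.Now())
}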
diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 7259e1d21a..300dbb086d 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -106,7 +106,7 @@ type Distributor struct {
 	distributorsLifecycler *ring.BasicLifecycler
 	distributorsRing       *ring.Ring
 	healthyInstancesCount  *atomic.Uint32
-	costAttributionMng     *costattribution.Manager
+	costAttributionMgr     *costattribution.Manager
 
 	// For handling HA replicas.
 	HATracker *haTracker
@@ -307,7 +307,7 @@ func (m *PushMetrics) deleteUserMetrics(user string) {
 }
 
 // New constructs a new Distributor
-func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionMng *costattribution.Manager, ingestersRing ring.ReadRing, partitionsRing *ring.PartitionInstanceRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) {
+func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionMgr *costattribution.Manager, ingestersRing ring.ReadRing, partitionsRing *ring.PartitionInstanceRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) {
 	clientMetrics := ingester_client.NewMetrics(reg)
 	if cfg.IngesterClientFactory == nil {
 		cfg.IngesterClientFactory = ring_client.PoolInstFunc(func(inst ring.InstanceDesc) (ring_client.PoolClient, error) {
@@ -342,7 +342,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
 		healthyInstancesCount: atomic.NewUint32(0),
 		limits:                limits,
 		HATracker:             haTracker,
-		costAttributionMng:    costAttributionMng,
+		costAttributionMgr:    costAttributionMgr,
 		ingestionRate:         util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
 
 		queryDuration: instrument.NewHistogramCollector(promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
@@ -856,6 +856,10 @@ func (d *Distributor) prePushHaDedupeMiddleware(next PushFunc) PushFunc {
 
 			if errors.As(err, &tooManyClustersError{}) {
 				d.discardedSamplesTooManyHaClusters.WithLabelValues(userID, group).Add(float64(numSamples))
+				// only attribute discarded samples on a real HA dedupe rejection; technical errors should not increment the counter
+				if d.costAttributionMgr != nil {
+					d.costAttributionMgr.IncrementDiscardedSamples(userID, mimirpb.FromLabelAdaptersToLabels(req.Timeseries[0].Labels), float64(numSamples), time.Now())
+				}
 			}
 
 			return err
@@ -1059,6 +1063,11 @@ func (d *Distributor) prePushValidationMiddleware(next PushFunc) PushFunc {
 			// Errors in validation are considered non-fatal, as one series in a request may contain
 			// invalid data but all the remaining series could be perfectly valid.
 			if validationErr != nil {
+				// if the validation failed, we need to increment the discarded samples metric
+				if d.costAttributionMgr != nil {
+					d.costAttributionMgr.IncrementDiscardedSamples(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels), float64(len(ts.Samples)+len(ts.Histograms)), now)
+				}
+
 				if firstPartialErr == nil {
 					// The series are never retained by validationErr. This is guaranteed by the way the latter is built.
 					firstPartialErr = newValidationError(validationErr)
@@ -1109,6 +1118,15 @@ func (d *Distributor) prePushValidationMiddleware(next PushFunc) PushFunc {
 		totalN := validatedSamples + validatedExemplars + validatedMetadata
 		if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
+			if d.costAttributionMgr != nil {
+				skipLabelNameValidation := d.cfg.SkipLabelNameValidation || req.GetSkipLabelNameValidation()
+				for tsIdx, ts := range req.Timeseries {
+					if validationErr := d.validateSeries(now, &req.Timeseries[tsIdx], userID, group, skipLabelNameValidation, minExemplarTS, maxExemplarTS); validationErr != nil {
+						continue
+					}
+					d.costAttributionMgr.IncrementDiscardedSamples(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels), float64(len(ts.Samples)+len(ts.Histograms)), now)
+				}
+			}
 			d.discardedSamplesRateLimited.WithLabelValues(userID, group).Add(float64(validatedSamples))
 			d.discardedExemplarsRateLimited.WithLabelValues(userID).Add(float64(validatedExemplars))
 			d.discardedMetadataRateLimited.WithLabelValues(userID).Add(float64(validatedMetadata))
@@ -1668,34 +1686,16 @@ func tokenForMetadata(userID string, metricName string) uint32 {
 func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID string) {
 	now := mtime.Now()
 	var receivedSamples, receivedExemplars, receivedMetadata int
-	costattributionLimit := 0
-	caEnabled := d.costAttributionMng != nil && d.costAttributionMng.EnabledForUser(userID)
-	caLabel := ""
-	if caEnabled {
-		costattributionLimit = d.costAttributionMng.GetUserAttributionLimit(userID)
-		caLabel = d.costAttributionMng.GetUserAttributionLabel(userID)
-	}
-	costAttribution := make(map[string]int, costattributionLimit)
 	for _, ts := range req.Timeseries {
 		receivedSamples += len(ts.TimeSeries.Samples) + len(ts.TimeSeries.Histograms)
 		receivedExemplars += len(ts.TimeSeries.Exemplars)
-		if caEnabled {
-			isKeyOutdated, attribution := d.costAttributionMng.UpdateAttributionTimestamp(userID, caLabel, mimirpb.FromLabelAdaptersToLabels(ts.Labels), now)
-			if isKeyOutdated {
-				// If the key is outdated, we need to reset cost attribution cache and update cost attribution label
-				costAttribution = make(map[string]int, costattributionLimit)
-				caLabel = d.costAttributionMng.GetUserAttributionLabel(userID)
-			}
-			costAttribution[attribution]++
+		if d.costAttributionMgr != nil {
+			// attribute this series' own sample count; passing the running receivedSamples total would over-count
+			d.costAttributionMgr.IncrementReceivedSamples(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels), float64(len(ts.TimeSeries.Samples)+len(ts.TimeSeries.Histograms)), now)
 		}
 	}
 	receivedMetadata = len(req.Metadata)
-	if caEnabled {
-		for value, count := range costAttribution {
-			d.costAttributionMng.IncrementReceivedSamples(userID, caLabel, value, float64(count))
-		}
-	}
+
 	d.receivedSamples.WithLabelValues(userID).Add(float64(receivedSamples))
 	d.receivedExemplars.WithLabelValues(userID).Add(float64(receivedExemplars))
 	d.receivedMetadata.WithLabelValues(userID).Add(float64(receivedMetadata))
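One subtlety in updateReceivedMetrics above: the value passed to IncrementReceivedSamples must be the current series' own sample count, not the running receivedSamples total, otherwise each iteration re-attributes the samples of every earlier series. A self-contained sketch of that distinction, using a stand-in timeSeries type rather than Mimir's mimirpb types:

package costattributionsketch

type timeSeries struct{ samples, histograms int }

// perSeriesReceived returns the per-series counts that should feed the
// attribution counters, alongside the per-user total that feeds the
// receivedSamples counter. Attributing the running total instead would
// over-count: for two series of one sample each, the attribution counters
// would grow by 1+2=3 instead of 2.
func perSeriesReceived(tss []timeSeries) (total int, perSeries []int) {
	for _, ts := range tss {
		n := ts.samples + ts.histograms
		perSeries = append(perSeries, n)
		total += n
	}
	return total, perSeries
}

The rate-limited branch above takes similar care: it re-runs validateSeries and skips invalid series, which appears intended to keep samples already attributed to a validation failure from being counted a second time under the rate-limit reason.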
diff --git a/pkg/ingester/activeseries/active_series.go b/pkg/ingester/activeseries/active_series.go
index b998b45611..938793bac9 100644
--- a/pkg/ingester/activeseries/active_series.go
+++ b/pkg/ingester/activeseries/active_series.go
@@ -50,7 +50,7 @@ type ActiveSeries struct {
 	matchers           *asmodel.Matchers
 	lastMatchersUpdate time.Time
 
-	costAttributionMng *costattribution.Manager
+	costAttributionMgr *costattribution.Manager
 
 	// The duration after which series become inactive.
 	// Also used to determine if enough time has passed since configuration reload for valid results.
@@ -68,7 +68,7 @@
 	// Updated in purge and when old timestamp is used when updating series (in this case, oldestEntryTs is updated
 	// without holding the lock -- hence the atomic).
 	oldestEntryTs      atomic.Int64
-	costAttributionMng *costattribution.Manager
+	costAttributionMgr *costattribution.Manager
 	mu                 sync.RWMutex
 	refs               map[storage.SeriesRef]seriesEntry
 	active             uint32 // Number of active entries in this stripe. Only decreased during purge or clear.
@@ -98,16 +98,16 @@ func NewActiveSeries(
 	asm *asmodel.Matchers,
 	timeout time.Duration,
 	userID string,
-	costAttributionMng *costattribution.Manager,
+	costAttributionMgr *costattribution.Manager,
 ) *ActiveSeries {
 	c := &ActiveSeries{
 		matchers:           asm,
 		timeout:            timeout,
 		userID:             userID,
-		costAttributionMng: costAttributionMng,
+		costAttributionMgr: costAttributionMgr,
 	}
 
 	// Stripes are pre-allocated so that we only read on them and no lock is required.
 	for i := 0; i < numStripes; i++ {
-		c.stripes[i].reinitialize(asm, &c.deleted, userID, costAttributionMng)
+		c.stripes[i].reinitialize(asm, &c.deleted, userID, costAttributionMgr)
 	}
 
 	return c
@@ -124,7 +124,7 @@ func (c *ActiveSeries) ReloadMatchers(asm *asmodel.Matchers, now time.Time) {
 	defer c.matchersMutex.Unlock()
 
 	for i := 0; i < numStripes; i++ {
-		c.stripes[i].reinitialize(asm, &c.deleted, c.userID, c.costAttributionMng)
+		c.stripes[i].reinitialize(asm, &c.deleted, c.userID, c.costAttributionMgr)
 	}
 	c.matchers = asm
 	c.lastMatchersUpdate = now
@@ -232,7 +232,7 @@ func (c *ActiveSeries) ActiveWithMatchers() (total int, totalMatching []int, tot
 }
 
 func (c *ActiveSeries) ActiveByAttributionValue(calb string) map[string]uint32 {
-	total := make(map[string]uint32, c.costAttributionMng.GetUserAttributionLimit(c.userID))
+	total := make(map[string]uint32, c.costAttributionMgr.UserAttributionLimit(c.userID))
 	for s := 0; s < numStripes; s++ {
 		c.stripes[s].mu.RLock()
 		if c.stripes[s].caLabel == calb {
@@ -430,12 +430,12 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef
 
 	// here if we have a cost attribution label, we can split the serie count based on the value of the label
 	// we also set the reference to the value of the label in the entry, so when remove, we can decrease the counter accordingly
-	if s.costAttributionMng != nil && s.costAttributionMng.EnabledForUser(s.userID) {
-		isOutDated, attributionValue := s.costAttributionMng.UpdateAttributionTimestamp(s.userID, s.caLabel, series, time.Unix(0, nowNanos))
+	if s.costAttributionMgr != nil && s.costAttributionMgr.EnabledForUser(s.userID) {
+		isOutDated, attributionValue := s.costAttributionMgr.UpdateAttributionTimestamp(s.userID, s.caLabel, series, time.Unix(0, nowNanos))
 		if isOutDated {
 			// if the label is outdated, we need to remove the reference to the old value
 			s.costAttributionValues = map[string]uint32{}
-			s.caLabel = s.costAttributionMng.GetUserAttributionLabel(s.userID)
+			s.caLabel = s.costAttributionMgr.UserAttributionLabel(s.userID)
 		}
 		s.costAttributionValues[attributionValue]++
 		e.calabel = s.caLabel
@@ -469,7 +469,7 @@ func (s *seriesStripe) reinitialize(
 	asm *asmodel.Matchers,
 	deleted *deletedSeries,
 	userID string,
-	costAttributionMng *costattribution.Manager,
+	costAttributionMgr *costattribution.Manager,
 ) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
@@ -485,7 +485,7 @@ func (s *seriesStripe) reinitialize(
 	s.activeMatching = resizeAndClear(len(asm.MatcherNames()), s.activeMatching)
 	s.activeMatchingNativeHistograms = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistograms)
 	s.activeMatchingNativeHistogramBuckets = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistogramBuckets)
-	s.costAttributionMng = costAttributionMng
+	s.costAttributionMgr = costAttributionMgr
 }
 
 func (s *seriesStripe) purge(keepUntil time.Time) {
@@ -514,7 +514,7 @@ func (s *seriesStripe) purge(keepUntil time.Time) {
 				s.deleted.purge(ref)
 			}
 			delete(s.refs, ref)
-			// here need to find what is deleted and decrement counters
+			// TODO: here need to find what is deleted and decrement counters
 			continue
 		}
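The TODO added to purge above marks bookkeeping that is still one-sided: findAndUpdateOrCreateEntryForSeries increments costAttributionValues and stamps each entry with the label it was counted under, but nothing decrements the map when an entry is purged. A hedged sketch of the decrement the TODO calls for, assuming the entry also remembers its attribution value — the cavalue field below is hypothetical, the diff only shows e.calabel:

package costattributionsketch

// entry and stripe are reduced stand-ins for seriesEntry and seriesStripe.
type entry struct {
	calabel string // attribution label the entry was counted under (from the diff)
	cavalue string // attribution value (hypothetical field, not shown in the diff)
}

type stripe struct {
	caLabel               string
	costAttributionValues map[string]uint32
}

// onPurge sketches the missing decrement: drop the purged entry's count, but
// only if it was recorded under the label the stripe currently tracks.
func (s *stripe) onPurge(e entry) {
	if e.calabel != s.caLabel {
		return // counted under an outdated label; the map was already reset
	}
	if c := s.costAttributionValues[e.cavalue]; c > 1 {
		s.costAttributionValues[e.cavalue] = c - 1
	} else {
		delete(s.costAttributionValues, e.cavalue)
	}
}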
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 7a645440a8..8b51e68d86 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -311,7 +311,7 @@ type Ingester struct {
 
 	activeGroups *util.ActiveGroupsCleanupService
 
-	costAttributionMng *costattribution.Manager
+	costAttributionMgr *costattribution.Manager
 
 	tsdbMetrics *tsdbMetrics
 
@@ -381,7 +381,7 @@ func newIngester(cfg Config, limits *validation.Overrides, registerer prometheus
 }
 
 // New returns an Ingester that uses Mimir block storage.
-func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing, partitionRingWatcher *ring.PartitionRingWatcher, activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionMng *costattribution.Manager, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) {
+func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing, partitionRingWatcher *ring.PartitionRingWatcher, activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionMgr *costattribution.Manager, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) {
 	i, err := newIngester(cfg, limits, registerer, logger)
 	if err != nil {
 		return nil, err
@@ -389,7 +389,7 @@ func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing,
 	i.ingestionRate = util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval)
 	i.metrics = newIngesterMetrics(registerer, cfg.ActiveSeriesMetrics.Enabled, i.getInstanceLimits, i.ingestionRate, &i.inflightPushRequests, &i.inflightPushRequestsBytes)
 	i.activeGroups = activeGroupsCleanupService
-	i.costAttributionMng = costAttributionMng
+	i.costAttributionMgr = costAttributionMgr
 
 	// We create a circuit breaker, which will be activated on a successful completion of starting.
 	i.circuitBreaker = newIngesterCircuitBreaker(i.cfg.PushCircuitBreaker, i.cfg.ReadCircuitBreaker, logger, registerer)
@@ -790,10 +790,10 @@ func (i *Ingester) updateActiveSeries(now time.Time) {
 			i.metrics.activeSeriesLoading.DeleteLabelValues(userID)
 			if allActive > 0 {
 				if i.isCostAttributionEnabledForUser(userID) {
-					calb := i.costAttributionMng.GetUserAttributionLabel(userID)
+					calb := i.costAttributionMgr.UserAttributionLabel(userID)
 					labelAttributions := userDB.activeSeries.ActiveByAttributionValue(calb)
 					for value, count := range labelAttributions {
-						i.costAttributionMng.SetActiveSeries(userID, calb, value, float64(count))
+						i.costAttributionMgr.SetActiveSeries(userID, calb, value, float64(count))
 					}
 				}
 				i.metrics.activeSeriesPerUser.WithLabelValues(userID).Set(float64(allActive))
@@ -1286,13 +1286,13 @@ func (i *Ingester) updateMetricsFromPushStats(userID string, group string, stats
 	}
 	if i.isCostAttributionEnabledForUser(userID) {
 		for value, count := range stats.failedSamplesAttribution {
-			i.costAttributionMng.IncrementDiscardedSamples(userID, stats.attributionLabel, value, float64(count))
+			i.costAttributionMgr.IncrementDiscardedSamples(userID, stats.attributionLabel, value, float64(count))
 		}
 	}
 }
 
 func (i *Ingester) isCostAttributionEnabledForUser(userID string) bool {
-	return i.costAttributionMng != nil && i.costAttributionMng.EnabledForUser(userID)
+	return i.costAttributionMgr != nil && i.costAttributionMgr.EnabledForUser(userID)
 }
 
 // pushSamplesToAppender appends samples and exemplars to the appender. Most errors are handled via updateFirstPartial function,
@@ -1417,10 +1417,10 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
 			var caValue string
 			// when cost attribution label is set
 			if caEnabled {
-				isOutDated, caValue = i.costAttributionMng.UpdateAttributionTimestamp(userID, stats.attributionLabel, mimirpb.FromLabelAdaptersToLabels(ts.Labels), startAppend)
+				isOutDated, caValue = i.costAttributionMgr.UpdateAttributionTimestamp(userID, stats.attributionLabel, mimirpb.FromLabelAdaptersToLabels(ts.Labels), startAppend)
 				// if the cost attribution label is outdated, we need to reset the attribution counter
 				if isOutDated {
-					stats.attributionLabel = i.costAttributionMng.GetUserAttributionLabel(userID)
+					stats.attributionLabel = i.costAttributionMgr.UserAttributionLabel(userID)
 					stats.failedSamplesAttribution = make(map[string]int, i.limits.MaxCostAttributionPerUser(userID))
 				}
 			}
@@ -2679,7 +2679,7 @@ func (i *Ingester) createTSDB(userID string, walReplayConcurrency int) (*userTSD
 			asmodel.NewMatchers(matchersConfig),
 			i.cfg.ActiveSeriesMetrics.IdleTimeout,
 			userID,
-			i.costAttributionMng,
+			i.costAttributionMgr,
 		),
 		seriesInMetric:     newMetricCounter(i.limiter, i.cfg.getIgnoreSeriesLimitForMetricNamesMap()),
 		ingestedAPISamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod),
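On the ingester side, attribution state is now read only through the renamed accessors; the clearest example is the updateActiveSeries loop above, which resolves the attribution label once and exports one gauge per attribution value. A compact sketch of that flow, with both interfaces as hypothetical stand-ins for the manager and the per-user active series tracker:

package costattributionsketch

// gaugeSetter and attributionReader mirror UserAttributionLabel,
// SetActiveSeries and ActiveByAttributionValue from the diff; the interfaces
// themselves are hypothetical and only keep the sketch self-contained.
type gaugeSetter interface {
	UserAttributionLabel(userID string) string
	SetActiveSeries(userID, calb, attribution string, value float64)
}

type attributionReader interface {
	ActiveByAttributionValue(calb string) map[string]uint32
}

// publishActiveSeries mirrors the loop in updateActiveSeries: resolve the
// current attribution label once, then export one gauge per attribution value.
func publishActiveSeries(mgr gaugeSetter, active attributionReader, userID string) {
	calb := mgr.UserAttributionLabel(userID)
	for value, count := range active.ActiveByAttributionValue(calb) {
		mgr.SetActiveSeries(userID, calb, value, float64(count))
	}
}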