Rename and simplify distributor logic
ying-jeanne committed Oct 16, 2024
1 parent e55f83c commit b9508e0
Showing 6 changed files with 69 additions and 69 deletions.
26 changes: 13 additions & 13 deletions pkg/costattribution/manager.go
@@ -46,21 +46,21 @@ func (m *Manager) EnabledForUser(userID string) bool {
 	return m.attributionTracker.limits.CostAttributionLabel(userID) != ""
 }
 
-// GetUserAttributionLabel returns the cost attribution label for the user, first it will try to get the label from the cache,
+// UserAttributionLabel returns the cost attribution label for the user, first it will try to get the label from the cache,
 // If not found, it will get the label from the config
 // If the user is not enabled for cost attribution, it would clean the cache and return empty string
-func (m *Manager) GetUserAttributionLabel(userID string) string {
+func (m *Manager) UserAttributionLabel(userID string) string {
 	if m.EnabledForUser(userID) {
 		return m.attributionTracker.getUserAttributionLabelFromCache(userID)
 	}
 	m.attributionTracker.deleteUserTracerFromCache(userID)
 	return ""
 }
 
-// GetUserAttributionLimit returns the cost attribution limit for the user, first it will try to get the limit from the cache,
+// UserAttributionLimit returns the cost attribution limit for the user, first it will try to get the limit from the cache,
 // If not found, it will get the limit from the config
 // If the user is not enabled for cost attribution, it would clean the cache and return 0
-func (m *Manager) GetUserAttributionLimit(userID string) int {
+func (m *Manager) UserAttributionLimit(userID string) int {
 	if m.EnabledForUser(userID) {
 		return m.attributionTracker.getUserAttributionLimitFromCache(userID)
 	}
@@ -96,7 +96,7 @@ func (m *Manager) UpdateAttributionTimestamp(user string, calb string, lbs labels.Labels, now time.Time) (bool, string) {
 // SetActiveSeries adjust the input attribution and sets the active series gauge for the given user and attribution
 func (m *Manager) SetActiveSeries(userID, calb, attribution string, value float64) {
 	// if the input label is outdated, we skip the update
-	if calb != m.GetUserAttributionLabel(userID) {
+	if calb != m.UserAttributionLabel(userID) {
 		return
 	}
 	attribution = m.adjustUserAttribution(userID, attribution)
@@ -109,13 +109,13 @@ func (m *Manager) SetActiveSeries(userID, calb, attribution string, value float64) {
 }
 
 // IncrementDiscardedSamples increments the discarded samples counter for a given user and attribution
-func (m *Manager) IncrementDiscardedSamples(userID, calb, attribution string, value float64) {
-	// if the input label is outdated, we skip the update
-	if calb != m.GetUserAttributionLabel(userID) {
+func (m *Manager) IncrementDiscardedSamples(userID string, lbs labels.Labels, value float64, now time.Time) {
+	if !m.EnabledForUser(userID) {
 		return
 	}
+	calb := m.UserAttributionLabel(userID)
+	_, attribution := m.UpdateAttributionTimestamp(userID, calb, lbs, now)
 
-	attribution = m.adjustUserAttribution(userID, attribution)
 	m.attributionTracker.mu.RLock()
 	defer m.attributionTracker.mu.RUnlock()
 	if tracker, exists := m.attributionTracker.trackersByUserID[userID]; exists {
Expand All @@ -124,12 +124,12 @@ func (m *Manager) IncrementDiscardedSamples(userID, calb, attribution string, va
}

// IncrementReceivedSamples increments the received samples counter for a given user and attribution
func (m *Manager) IncrementReceivedSamples(userID, calb, attribution string, value float64) {
// if the input label is outdated, we skip the update
if calb != m.GetUserAttributionLabel(userID) {
func (m *Manager) IncrementReceivedSamples(userID string, lbs labels.Labels, value float64, now time.Time) {
if !m.EnabledForUser(userID) {
return
}
attribution = m.adjustUserAttribution(userID, attribution)
calb := m.UserAttributionLabel(userID)
_, attribution := m.UpdateAttributionTimestamp(userID, calb, lbs, now)
m.attributionTracker.mu.RLock()
defer m.attributionTracker.mu.RUnlock()
if tracker, exists := m.attributionTracker.trackersByUserID[userID]; exists {
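
Note: after this change the manager resolves the attribution label and value itself; callers pass raw series labels and a timestamp instead of a pre-computed calb/attribution pair. A minimal caller-side sketch of the renamed API (the recordPush helper, package name, and user ID are hypothetical; only the Manager method signature comes from this diff):

package example

import (
	"time"

	"github.com/grafana/mimir/pkg/costattribution"
	"github.com/prometheus/prometheus/model/labels"
)

// recordPush is a hypothetical helper showing the new call contract:
// the manager derives the attribution label and value from the series labels.
func recordPush(mgr *costattribution.Manager, userID string, series []labels.Labels) {
	// Cost attribution is optional; mirror the nil check used by the distributor.
	if mgr == nil {
		return
	}
	now := time.Now()
	for _, lbs := range series {
		// Count one received sample per series in this toy example.
		mgr.IncrementReceivedSamples(userID, lbs, 1, now)
	}
}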
16 changes: 8 additions & 8 deletions pkg/costattribution/manager_test.go
@@ -51,21 +51,21 @@ func Test_EnabledForUser(t *testing.T) {
 	assert.False(t, manager.EnabledForUser("user4"), "Expected cost attribution to be disabled for user4")
 }
 
-func Test_GetUserAttributionLabel(t *testing.T) {
+func Test_UserAttributionLabel(t *testing.T) {
 	manager := newTestManager()
-	assert.Equal(t, "team", manager.GetUserAttributionLabel("user1"))
-	assert.Equal(t, "", manager.GetUserAttributionLabel("user2"))
-	assert.Equal(t, "department", manager.GetUserAttributionLabel("user3"))
+	assert.Equal(t, "team", manager.UserAttributionLabel("user1"))
+	assert.Equal(t, "", manager.UserAttributionLabel("user2"))
+	assert.Equal(t, "department", manager.UserAttributionLabel("user3"))
 	assert.Equal(t, 2, len(manager.attributionTracker.trackersByUserID))
 	assert.Equal(t, "team", manager.attributionTracker.trackersByUserID["user1"].trackedLabel)
 	assert.Equal(t, "department", manager.attributionTracker.trackersByUserID["user3"].trackedLabel)
 }
 
-func Test_GetUserAttributionLimit(t *testing.T) {
+func Test_UserAttributionLimit(t *testing.T) {
 	manager := newTestManager()
-	assert.Equal(t, 5, manager.GetUserAttributionLimit("user1"))
-	assert.Equal(t, 0, manager.GetUserAttributionLimit("user2"))
-	assert.Equal(t, 0, manager.GetUserAttributionLimit("user4"))
+	assert.Equal(t, 5, manager.UserAttributionLimit("user1"))
+	assert.Equal(t, 0, manager.UserAttributionLimit("user2"))
+	assert.Equal(t, 0, manager.UserAttributionLimit("user4"))
 }
 
 func Test_UpdateAttributionTimestamp(t *testing.T) {
2 changes: 1 addition & 1 deletion pkg/costattribution/tracker_group_test.go
@@ -64,7 +64,7 @@ func TestUpdateAttributionTimestampForUser(t *testing.T) {
 	})
 }
 
-func TestGetUserAttributionLabel(t *testing.T) {
+func TestUserAttributionLabel(t *testing.T) {
 	cooldownTimeout := 10 * time.Second
 	t.Run("Should return the cost attribution label for the user", func(t *testing.T) {
 		// Create mock limits
48 changes: 24 additions & 24 deletions pkg/distributor/distributor.go
@@ -106,7 +106,7 @@ type Distributor struct {
 	distributorsLifecycler *ring.BasicLifecycler
 	distributorsRing       *ring.Ring
 	healthyInstancesCount  *atomic.Uint32
-	costAttributionMng     *costattribution.Manager
+	costAttributionMgr     *costattribution.Manager
 	// For handling HA replicas.
 	HATracker *haTracker
 
@@ -307,7 +307,7 @@ func (m *PushMetrics) deleteUserMetrics(user string) {
 }
 
 // New constructs a new Distributor
-func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionMng *costattribution.Manager, ingestersRing ring.ReadRing, partitionsRing *ring.PartitionInstanceRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) {
+func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionMgr *costattribution.Manager, ingestersRing ring.ReadRing, partitionsRing *ring.PartitionInstanceRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) {
 	clientMetrics := ingester_client.NewMetrics(reg)
 	if cfg.IngesterClientFactory == nil {
 		cfg.IngesterClientFactory = ring_client.PoolInstFunc(func(inst ring.InstanceDesc) (ring_client.PoolClient, error) {
@@ -342,7 +342,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionMng *costattribution.Manager, ingestersRing ring.ReadRing, partitionsRing *ring.PartitionInstanceRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) {
 		healthyInstancesCount: atomic.NewUint32(0),
 		limits:                limits,
 		HATracker:             haTracker,
-		costAttributionMng:    costAttributionMng,
+		costAttributionMgr:    costAttributionMgr,
 		ingestionRate:         util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
 
 		queryDuration: instrument.NewHistogramCollector(promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
@@ -856,6 +856,10 @@ func (d *Distributor) prePushHaDedupeMiddleware(next PushFunc) PushFunc {
 
 		if errors.As(err, &tooManyClustersError{}) {
 			d.discardedSamplesTooManyHaClusters.WithLabelValues(userID, group).Add(float64(numSamples))
+			// here if it is a technical error, we don't want to increment the discarded samples counter
+			if d.costAttributionMgr != nil {
+				d.costAttributionMgr.IncrementDiscardedSamples(userID, mimirpb.FromLabelAdaptersToLabels(req.Timeseries[0].Labels), float64(numSamples), time.Now())
+			}
 		}
 
 		return err
@@ -1059,6 +1063,11 @@ func (d *Distributor) prePushValidationMiddleware(next PushFunc) PushFunc {
 			// Errors in validation are considered non-fatal, as one series in a request may contain
 			// invalid data but all the remaining series could be perfectly valid.
 			if validationErr != nil {
+				// if the validation failed, we need to increment the discarded samples metric
+				if d.costAttributionMgr != nil {
+					d.costAttributionMgr.IncrementDiscardedSamples(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels), float64(len(ts.Samples)+len(ts.Histograms)), now)
+				}
+
 				if firstPartialErr == nil {
 					// The series are never retained by validationErr. This is guaranteed by the way the latter is built.
 					firstPartialErr = newValidationError(validationErr)
@@ -1109,6 +1118,15 @@
 
 		totalN := validatedSamples + validatedExemplars + validatedMetadata
 		if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
+			if d.costAttributionMgr != nil {
+				skipLabelNameValidation := d.cfg.SkipLabelNameValidation || req.GetSkipLabelNameValidation()
+				for tsIdx, ts := range req.Timeseries {
+					if validationErr := d.validateSeries(now, &req.Timeseries[tsIdx], userID, group, skipLabelNameValidation, minExemplarTS, maxExemplarTS); validationErr != nil {
+						continue
+					}
+					d.costAttributionMgr.IncrementDiscardedSamples(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels), float64(len(ts.Samples)+len(ts.Histograms)), now)
+				}
+			}
 			d.discardedSamplesRateLimited.WithLabelValues(userID, group).Add(float64(validatedSamples))
 			d.discardedExemplarsRateLimited.WithLabelValues(userID).Add(float64(validatedExemplars))
 			d.discardedMetadataRateLimited.WithLabelValues(userID).Add(float64(validatedMetadata))
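
Note: the rate-limited branch above re-validates each series so that only samples that would otherwise have been ingested are attributed as discarded; series that fail validation were already counted under their own validation error. A toy sketch of that filter pattern, with hypothetical validate/attribute stand-ins for d.validateSeries and IncrementDiscardedSamples:

// attributeRateLimited mirrors the pattern in the hunk above: skip series that
// fail validation, attribute the rest as discarded due to rate limiting.
func attributeRateLimited[S any](series []S, validate func(S) error, attribute func(S)) {
	for _, s := range series {
		if err := validate(s); err != nil {
			// Already counted under its validation error; avoid double-counting.
			continue
		}
		attribute(s)
	}
}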
@@ -1668,34 +1686,16 @@ func tokenForMetadata(userID string, metricName string) uint32 {
 func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID string) {
 	now := mtime.Now()
 	var receivedSamples, receivedExemplars, receivedMetadata int
-	costattributionLimit := 0
-	caEnabled := d.costAttributionMng != nil && d.costAttributionMng.EnabledForUser(userID)
-	caLabel := ""
-	if caEnabled {
-		costattributionLimit = d.costAttributionMng.GetUserAttributionLimit(userID)
-		caLabel = d.costAttributionMng.GetUserAttributionLabel(userID)
-	}
-	costAttribution := make(map[string]int, costattributionLimit)
 
 	for _, ts := range req.Timeseries {
 		receivedSamples += len(ts.TimeSeries.Samples) + len(ts.TimeSeries.Histograms)
 		receivedExemplars += len(ts.TimeSeries.Exemplars)
-		if caEnabled {
-			isKeyOutdated, attribution := d.costAttributionMng.UpdateAttributionTimestamp(userID, caLabel, mimirpb.FromLabelAdaptersToLabels(ts.Labels), now)
-			if isKeyOutdated {
-				// If the key is outdated, we need to reset cost attribution cache and update cost attribution label
-				costAttribution = make(map[string]int, costattributionLimit)
-				caLabel = d.costAttributionMng.GetUserAttributionLabel(userID)
-			}
-			costAttribution[attribution]++
+		if d.costAttributionMgr != nil {
+			d.costAttributionMgr.IncrementReceivedSamples(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels), float64(receivedSamples), now)
 		}
 	}
 	receivedMetadata = len(req.Metadata)
-	if caEnabled {
-		for value, count := range costAttribution {
-			d.costAttributionMng.IncrementReceivedSamples(userID, caLabel, value, float64(count))
-		}
-	}
 
 	d.receivedSamples.WithLabelValues(userID).Add(float64(receivedSamples))
 	d.receivedExemplars.WithLabelValues(userID).Add(float64(receivedExemplars))
 	d.receivedMetadata.WithLabelValues(userID).Add(float64(receivedMetadata))
26 changes: 13 additions & 13 deletions pkg/ingester/activeseries/active_series.go
@@ -50,7 +50,7 @@ type ActiveSeries struct {
 	matchers           *asmodel.Matchers
 	lastMatchersUpdate time.Time
 
-	costAttributionMng *costattribution.Manager
+	costAttributionMgr *costattribution.Manager
 
 	// The duration after which series become inactive.
 	// Also used to determine if enough time has passed since configuration reload for valid results.
@@ -68,7 +68,7 @@ type seriesStripe struct {
 	// Updated in purge and when old timestamp is used when updating series (in this case, oldestEntryTs is updated
 	// without holding the lock -- hence the atomic).
 	oldestEntryTs      atomic.Int64
-	costAttributionMng *costattribution.Manager
+	costAttributionMgr *costattribution.Manager
 	mu                 sync.RWMutex
 	refs               map[storage.SeriesRef]seriesEntry
 	active             uint32 // Number of active entries in this stripe. Only decreased during purge or clear.
@@ -98,16 +98,16 @@ func NewActiveSeries(
 	asm *asmodel.Matchers,
 	timeout time.Duration,
 	userID string,
-	costAttributionMng *costattribution.Manager,
+	costAttributionMgr *costattribution.Manager,
 ) *ActiveSeries {
 	c := &ActiveSeries{
 		matchers: asm, timeout: timeout, userID: userID,
-		costAttributionMng: costAttributionMng,
+		costAttributionMgr: costAttributionMgr,
 	}
 
 	// Stripes are pre-allocated so that we only read on them and no lock is required.
 	for i := 0; i < numStripes; i++ {
-		c.stripes[i].reinitialize(asm, &c.deleted, userID, costAttributionMng)
+		c.stripes[i].reinitialize(asm, &c.deleted, userID, costAttributionMgr)
 	}
 
 	return c
@@ -124,7 +124,7 @@ func (c *ActiveSeries) ReloadMatchers(asm *asmodel.Matchers, now time.Time) {
 	defer c.matchersMutex.Unlock()
 
 	for i := 0; i < numStripes; i++ {
-		c.stripes[i].reinitialize(asm, &c.deleted, c.userID, c.costAttributionMng)
+		c.stripes[i].reinitialize(asm, &c.deleted, c.userID, c.costAttributionMgr)
 	}
 	c.matchers = asm
 	c.lastMatchersUpdate = now
@@ -232,7 +232,7 @@ func (c *ActiveSeries) ActiveWithMatchers() (total int, totalMatching []int, tot
 }
 
 func (c *ActiveSeries) ActiveByAttributionValue(calb string) map[string]uint32 {
-	total := make(map[string]uint32, c.costAttributionMng.GetUserAttributionLimit(c.userID))
+	total := make(map[string]uint32, c.costAttributionMgr.UserAttributionLimit(c.userID))
 	for s := 0; s < numStripes; s++ {
 		c.stripes[s].mu.RLock()
 		if c.stripes[s].caLabel == calb {
@@ -430,12 +430,12 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef
 
 	// here if we have a cost attribution label, we can split the serie count based on the value of the label
 	// we also set the reference to the value of the label in the entry, so when remove, we can decrease the counter accordingly
-	if s.costAttributionMng != nil && s.costAttributionMng.EnabledForUser(s.userID) {
-		isOutDated, attributionValue := s.costAttributionMng.UpdateAttributionTimestamp(s.userID, s.caLabel, series, time.Unix(0, nowNanos))
+	if s.costAttributionMgr != nil && s.costAttributionMgr.EnabledForUser(s.userID) {
+		isOutDated, attributionValue := s.costAttributionMgr.UpdateAttributionTimestamp(s.userID, s.caLabel, series, time.Unix(0, nowNanos))
 		if isOutDated {
 			// if the label is outdated, we need to remove the reference to the old value
 			s.costAttributionValues = map[string]uint32{}
-			s.caLabel = s.costAttributionMng.GetUserAttributionLabel(s.userID)
+			s.caLabel = s.costAttributionMgr.UserAttributionLabel(s.userID)
 		}
 		s.costAttributionValues[attributionValue]++
 		e.calabel = s.caLabel
@@ -469,7 +469,7 @@ func (s *seriesStripe) reinitialize(
 	asm *asmodel.Matchers,
 	deleted *deletedSeries,
 	userID string,
-	costAttributionMng *costattribution.Manager,
+	costAttributionMgr *costattribution.Manager,
 ) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
@@ -485,7 +485,7 @@ func (s *seriesStripe) reinitialize(
 	s.activeMatching = resizeAndClear(len(asm.MatcherNames()), s.activeMatching)
 	s.activeMatchingNativeHistograms = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistograms)
 	s.activeMatchingNativeHistogramBuckets = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistogramBuckets)
-	s.costAttributionMng = costAttributionMng
+	s.costAttributionMgr = costAttributionMgr
 }
 
 func (s *seriesStripe) purge(keepUntil time.Time) {
@@ -514,7 +514,7 @@ func (s *seriesStripe) purge(keepUntil time.Time) {
 				s.deleted.purge(ref)
 			}
 			delete(s.refs, ref)
-			// here need to find what is deleted and decrement counters
+			// TODO: here need to find what is deleted and decrement counters
 
 			continue
 		}
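
Note: the TODO above implies that purge should also decrement the stripe's per-attribution counters when it drops an entry. A hypothetical sketch of that bookkeeping, assuming the entry records the attribution value it was counted under (this diff only stores the label in e.calabel, so the attributionValue field is an assumption, not part of the commit):

// decrementAttribution is a hypothetical helper for the TODO: release one
// active-series count for the attribution value a purged entry was filed under.
func (s *seriesStripe) decrementAttribution(e seriesEntry) {
	if e.calabel == "" {
		return // the entry was never attributed
	}
	if n := s.costAttributionValues[e.attributionValue]; n > 1 {
		s.costAttributionValues[e.attributionValue] = n - 1
	} else {
		delete(s.costAttributionValues, e.attributionValue)
	}
}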