diff --git a/CHANGELOG.md b/CHANGELOG.md index 111b18db46..b894b378a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,19 @@ NOTE: As semantic versioning states all 0.y.z releases can contain breaking chan We use *breaking :warning:* to mark changes that are not backward compatible (relates only to v0.y.z releases.) +## Unreleased + +### Fixed + +### Added + +- [#7907](https://github.com/thanos-io/thanos/pull/7907) Receive: Add `--receive.grpc-service-config` flag to configure the gRPC service config used for requests forwarded to other receivers (an example service config is sketched below). +- [#7961](https://github.com/thanos-io/thanos/pull/7961) Store Gateway: Add `--store.posting-group-max-key-series-ratio` flag to mark a posting group as lazy if it fetches more keys than this ratio times the maximum number of series the query should fetch. Added the `thanos_bucket_store_lazy_expanded_posting_groups_total` metric for the total number of lazy posting groups and the corresponding reasons. + +### Changed + +### Removed + ## [v0.37.2](https://github.com/thanos-io/thanos/tree/release-0.37) - 11.12.2024 ### Fixed @@ -48,6 +61,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#7658](https://github.com/thanos-io/thanos/pull/7658) Store: Fix panic because too small buffer in pool. - [#7643](https://github.com/thanos-io/thanos/pull/7643) Receive: fix thanos_receive_write_{timeseries,samples} stats - [#7644](https://github.com/thanos-io/thanos/pull/7644) fix(ui): add null check to find overlapping blocks logic +- [#7674](https://github.com/thanos-io/thanos/pull/7674) Query-frontend: Fix connection to Redis cluster with TLS. - [#7814](https://github.com/thanos-io/thanos/pull/7814) Store: label_values: if matchers contain **name**=="something", do not add != "" to fetch less postings. - [#7679](https://github.com/thanos-io/thanos/pull/7679) Query: respect store.limit.* flags when evaluating queries - [#7821](https://github.com/thanos-io/thanos/pull/7821) Query/Receive: Fix coroutine leak introduced in https://github.com/thanos-io/thanos/pull/7796. @@ -58,6 +72,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#7893](https://github.com/thanos-io/thanos/pull/7893) Sidecar: Fix retrieval of external labels for Prometheus v3.0.0. - [#7903](https://github.com/thanos-io/thanos/pull/7903) Query: Fix panic on regex store matchers. - [#7915](https://github.com/thanos-io/thanos/pull/7915) Store: Close block series client at the end to not reuse chunk buffer +- [#7941](https://github.com/thanos-io/thanos/pull/7941) Receive: Fix race condition when adding multiple new tenants, see [issue-7892](https://github.com/thanos-io/thanos/issues/7892).
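The `--receive.grpc-service-config` value is handed verbatim to `grpc.WithDefaultServiceConfig` in the `cmd/thanos/receive.go` hunk further down. Below is a minimal sketch of the kind of JSON the flag accepts; the retry/backoff numbers and the `thanos.WriteableStore` service name are illustrative assumptions based on the gRPC service-config spec linked in the flag help, not defaults introduced by this PR.

```go
package main

import (
	"fmt"

	"google.golang.org/grpc"
)

// Illustrative service config: round-robin balancing plus a retry policy for
// forwarded remote-write calls. Field names follow the gRPC service config spec.
const exampleServiceConfig = `{
  "loadBalancingConfig": [ { "round_robin": {} } ],
  "methodConfig": [{
    "name": [ { "service": "thanos.WriteableStore" } ],
    "retryPolicy": {
      "maxAttempts": 3,
      "initialBackoff": "0.1s",
      "maxBackoff": "1s",
      "backoffMultiplier": 2,
      "retryableStatusCodes": ["UNAVAILABLE"]
    }
  }]
}`

func main() {
	// Receive appends the equivalent dial option when the flag is non-empty.
	opt := grpc.WithDefaultServiceConfig(exampleServiceConfig)
	fmt.Printf("configured dial option: %T\n", opt)
}
```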
### Added diff --git a/VERSION b/VERSION index 8570a3aeb9..c05b0f0786 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.37.2 +0.38.0-dev diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index fffcf6bbde..0372e83690 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -141,7 +141,10 @@ func runReceive( level.Info(logger).Log("mode", receiveMode, "msg", "running receive") - multiTSDBOptions := []receive.MultiTSDBOption{} + multiTSDBOptions := []receive.MultiTSDBOption{ + receive.WithHeadExpandedPostingsCacheSize(conf.headExpandedPostingsCacheSize), + receive.WithBlockExpandedPostingsCacheSize(conf.compactedBlocksExpandedPostingsCacheSize), + } for _, feature := range *conf.featureList { if feature == metricNamesFilter { multiTSDBOptions = append(multiTSDBOptions, receive.WithMetricNameFilterEnabled()) @@ -172,6 +175,10 @@ func runReceive( dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(conf.compression))) } + if conf.grpcServiceConfig != "" { + dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(conf.grpcServiceConfig)) + } + var bkt objstore.Bucket confContentYaml, err := conf.objStoreConfig.Content() if err != nil { @@ -853,6 +860,7 @@ type receiveConfig struct { maxBackoff *model.Duration compression string replicationProtocol string + grpcServiceConfig string tsdbMinBlockDuration *model.Duration tsdbMaxBlockDuration *model.Duration @@ -886,6 +894,9 @@ type receiveConfig struct { asyncForwardWorkerCount uint featureList *[]string + + headExpandedPostingsCacheSize uint64 + compactedBlocksExpandedPostingsCacheSize uint64 } func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -964,6 +975,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("receive.capnproto-address", "Address for the Cap'n Proto server.").Default(fmt.Sprintf("0.0.0.0:%s", receive.DefaultCapNProtoPort)).StringVar(&rc.replicationAddr) + cmd.Flag("receive.grpc-service-config", "gRPC service configuration file or content in JSON format. See https://github.com/grpc/grpc/blob/master/doc/service_config.md").PlaceHolder("").Default("").StringVar(&rc.grpcServiceConfig) + rc.forwardTimeout = extkingpin.ModelDuration(cmd.Flag("receive-forward-timeout", "Timeout for each forward request.").Default("5s").Hidden()) rc.maxBackoff = extkingpin.ModelDuration(cmd.Flag("receive-forward-max-backoff", "Maximum backoff for each forward fan-out request").Default("5s").Hidden()) @@ -996,6 +1009,9 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("tsdb.no-lockfile", "Do not create lockfile in TSDB data directory. 
In any case, the lockfiles will be deleted on next startup.").Default("false").BoolVar(&rc.noLockFile) + cmd.Flag("tsdb.head.expanded-postings-cache-size", "[EXPERIMENTAL] If non-zero, enables expanded postings cache for the head block.").Default("0").Uint64Var(&rc.headExpandedPostingsCacheSize) + cmd.Flag("tsdb.block.expanded-postings-cache-size", "[EXPERIMENTAL] If non-zero, enables expanded postings cache for compacted blocks.").Default("0").Uint64Var(&rc.compactedBlocksExpandedPostingsCacheSize) + cmd.Flag("tsdb.max-exemplars", "Enables support for ingesting exemplars and sets the maximum number of exemplars that will be stored per tenant."+ " In case the exemplar storage becomes full (number of stored exemplars becomes equal to max-exemplars),"+ diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 795c108cd3..1cdc12679c 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -67,40 +67,41 @@ const ( ) type storeConfig struct { - indexCacheConfigs extflag.PathOrContent - objStoreConfig extflag.PathOrContent - dataDir string - cacheIndexHeader bool - grpcConfig grpcConfig - httpConfig httpConfig - indexCacheSizeBytes units.Base2Bytes - chunkPoolSize units.Base2Bytes - estimatedMaxSeriesSize uint64 - estimatedMaxChunkSize uint64 - seriesBatchSize int - storeRateLimits store.SeriesSelectLimits - maxDownloadedBytes units.Base2Bytes - maxConcurrency int - component component.StoreAPI - debugLogging bool - syncInterval time.Duration - blockListStrategy string - blockSyncConcurrency int - blockMetaFetchConcurrency int - filterConf *store.FilterConfig - selectorRelabelConf extflag.PathOrContent - advertiseCompatibilityLabel bool - consistencyDelay commonmodel.Duration - ignoreDeletionMarksDelay commonmodel.Duration - disableWeb bool - webConfig webConfig - label string - postingOffsetsInMemSampling int - cachingBucketConfig extflag.PathOrContent - reqLogConfig *extflag.PathOrContent - lazyIndexReaderEnabled bool - lazyIndexReaderIdleTimeout time.Duration - lazyExpandedPostingsEnabled bool + indexCacheConfigs extflag.PathOrContent + objStoreConfig extflag.PathOrContent + dataDir string + cacheIndexHeader bool + grpcConfig grpcConfig + httpConfig httpConfig + indexCacheSizeBytes units.Base2Bytes + chunkPoolSize units.Base2Bytes + estimatedMaxSeriesSize uint64 + estimatedMaxChunkSize uint64 + seriesBatchSize int + storeRateLimits store.SeriesSelectLimits + maxDownloadedBytes units.Base2Bytes + maxConcurrency int + component component.StoreAPI + debugLogging bool + syncInterval time.Duration + blockListStrategy string + blockSyncConcurrency int + blockMetaFetchConcurrency int + filterConf *store.FilterConfig + selectorRelabelConf extflag.PathOrContent + advertiseCompatibilityLabel bool + consistencyDelay commonmodel.Duration + ignoreDeletionMarksDelay commonmodel.Duration + disableWeb bool + webConfig webConfig + label string + postingOffsetsInMemSampling int + cachingBucketConfig extflag.PathOrContent + reqLogConfig *extflag.PathOrContent + lazyIndexReaderEnabled bool + lazyIndexReaderIdleTimeout time.Duration + lazyExpandedPostingsEnabled bool + postingGroupMaxKeySeriesRatio float64 indexHeaderLazyDownloadStrategy string } @@ -204,6 +205,9 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("store.enable-lazy-expanded-postings", "If true, Store Gateway will estimate postings size and try to lazily expand postings if it downloads less data than expanding all postings."). 
Default("false").BoolVar(&sc.lazyExpandedPostingsEnabled) + cmd.Flag("store.posting-group-max-key-series-ratio", "Mark posting group as lazy if it fetches more keys than R * max series the query should fetch. With R set to 100, a posting group which fetches 100K keys will be marked as lazy if the current query only fetches 1000 series. thanos_bucket_store_lazy_expanded_posting_groups_total shows lazy expanded postings groups with reasons and you can tune this config accordingly. This config is only valid if lazy expanded posting is enabled. 0 disables the limit."). + Default("100").Float64Var(&sc.postingGroupMaxKeySeriesRatio) + cmd.Flag("store.index-header-lazy-download-strategy", "Strategy of how to download index headers lazily. Supported values: eager, lazy. If eager, always download index header during initial load. If lazy, download index header during query time."). Default(string(indexheader.EagerDownloadStrategy)). EnumVar(&sc.indexHeaderLazyDownloadStrategy, string(indexheader.EagerDownloadStrategy), string(indexheader.LazyDownloadStrategy)) @@ -429,6 +433,7 @@ func runStore( return conf.estimatedMaxChunkSize }), store.WithLazyExpandedPostings(conf.lazyExpandedPostingsEnabled), + store.WithPostingGroupMaxKeySeriesRatio(conf.postingGroupMaxKeySeriesRatio), store.WithIndexHeaderLazyDownloadStrategy( indexheader.IndexHeaderLazyDownloadStrategy(conf.indexHeaderLazyDownloadStrategy).StrategyToDownloadFunc(), ), diff --git a/docs/components/receive.md b/docs/components/receive.md index cd0dc322c7..38906489ba 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -429,6 +429,10 @@ Flags: Compression algorithm to use for gRPC requests to other receivers. Must be one of: snappy, none + --receive.grpc-service-config= + gRPC service configuration file + or content in JSON format. See + https://github.com/grpc/grpc/blob/master/doc/service_config.md --receive.hashrings= Alternative to 'receive.hashrings-file' flag (lower priority). Content of file that contains @@ -552,6 +556,12 @@ Flags: Allow overlapping blocks, which in turn enables vertical compaction and vertical query merge. Does not do anything, enabled all the time. + --tsdb.block.expanded-postings-cache-size=0 + [EXPERIMENTAL] If non-zero, enables expanded + postings cache for compacted blocks. + --tsdb.head.expanded-postings-cache-size=0 + [EXPERIMENTAL] If non-zero, enables expanded + postings cache for the head block. --tsdb.max-exemplars=0 Enables support for ingesting exemplars and sets the maximum number of exemplars that will be stored per tenant. In case the exemplar diff --git a/docs/components/store.md b/docs/components/store.md index 3425000146..ce2adb6d6d 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -250,6 +250,18 @@ Flags: The maximum series allowed for a single Series request. The Series call fails if this limit is exceeded. 0 means no limit. + --store.posting-group-max-key-series-ratio=100 + Mark posting group as lazy if it fetches more + keys than R * max series the query should + fetch. With R set to 100, a posting group which + fetches 100K keys will be marked as lazy if + the current query only fetches 1000 series. + thanos_bucket_store_lazy_expanded_posting_groups_total + shows lazy expanded postings groups with + reasons and you can tune this config + accordingly. This config is only valid if lazy + expanded posting is enabled. 0 disables the + limit. --sync-block-duration=15m Repeat interval for syncing the blocks between local and remote view. 
--tracing.config= @@ -467,10 +479,6 @@ Here is an example of what effect client-side caching could have: Example of client-side in action - reduced network usage by a lot -- `pool_size`: maximum number of socket connections. -- `min_idle_conns`: specifies the minimum number of idle connections which is useful when establishing new connection is slow. -- `idle_timeout`: amount of time after which client closes idle connections. Should be less than server's timeout. -- `max_conn_age`: connection age at which client retires (closes) the connection. - `max_get_multi_concurrency`: specifies the maximum number of concurrent GetMulti() operations. - `get_multi_batch_size`: specifies the maximum size per batch for mget. - `max_set_multi_concurrency`: specifies the maximum number of concurrent SetMulti() operations. diff --git a/pkg/receive/expandedpostingscache/cache.go b/pkg/receive/expandedpostingscache/cache.go new file mode 100644 index 0000000000..2e38d1971c --- /dev/null +++ b/pkg/receive/expandedpostingscache/cache.go @@ -0,0 +1,418 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +// Original code by Alan Protasio (https://github.com/alanprot) in the Cortex project. + +package expandedpostingscache + +import ( + "container/list" + "context" + "slices" + "strconv" + "strings" + "sync" + "time" + + "github.com/cespare/xxhash" + "github.com/oklog/ulid" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/index" +) + +type ExpandedPostingsCache interface { + PostingsForMatchers(ctx context.Context, blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) + ExpireSeries(metric labels.Labels) + tsdb.SeriesLifecycleCallback +} + +type BlocksPostingsForMatchersCache struct { + strippedLock []sync.RWMutex + + headCache *fifoCache[[]storage.SeriesRef] + blocksCache *fifoCache[[]storage.SeriesRef] + + headSeedByMetricName []int + postingsForMatchersFunc func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) + timeNow func() time.Time + + metrics ExpandedPostingsCacheMetrics +} + +var ( + rangeHeadULID = ulid.MustParse("0000000000XXXXXXXRANGEHEAD") + headULID = ulid.MustParse("0000000000XXXXXXXXXXXXHEAD") +) + +const ( + // size of the seed array. Each seed is a 64bits int (8 bytes) + // totaling 8mb. 
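+	// That is 1024 * 1024 seeds * 8 bytes = 8 MiB per cache instance.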
+ seedArraySize = 1024 * 1024 + + numOfSeedsStripes = 512 +) + +type ExpandedPostingsCacheMetrics struct { + CacheRequests *prometheus.CounterVec + CacheHits *prometheus.CounterVec + CacheEvicts *prometheus.CounterVec + NonCacheableQueries *prometheus.CounterVec +} + +func NewPostingCacheMetrics(r prometheus.Registerer) ExpandedPostingsCacheMetrics { + return ExpandedPostingsCacheMetrics{ + CacheRequests: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "expanded_postings_cache_requests_total", + Help: "Total number of requests to the cache.", + }, []string{"cache"}), + CacheHits: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "expanded_postings_cache_hits_total", + Help: "Total number of hit requests to the cache.", + }, []string{"cache"}), + CacheEvicts: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "expanded_postings_cache_evicts_total", + Help: "Total number of evictions in the cache, excluding items that got evicted.", + }, []string{"cache", "reason"}), + NonCacheableQueries: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "expanded_postings_non_cacheable_queries_total", + Help: "Total number of non-cacheable queries.", + }, []string{"cache"}), + } +} + +func NewBlocksPostingsForMatchersCache(metrics ExpandedPostingsCacheMetrics, headExpandedPostingsCacheSize uint64, blockExpandedPostingsCacheSize uint64, seedSize int64) *BlocksPostingsForMatchersCache { + if seedSize <= 0 { + seedSize = seedArraySize + } + + return &BlocksPostingsForMatchersCache{ + headCache: newFifoCache[[]storage.SeriesRef]("head", metrics, time.Now, headExpandedPostingsCacheSize), + blocksCache: newFifoCache[[]storage.SeriesRef]("block", metrics, time.Now, blockExpandedPostingsCacheSize), + headSeedByMetricName: make([]int, seedSize), + strippedLock: make([]sync.RWMutex, numOfSeedsStripes), + postingsForMatchersFunc: tsdb.PostingsForMatchers, + timeNow: time.Now, + metrics: metrics, + } +} + +func (c *BlocksPostingsForMatchersCache) PostCreation(metric labels.Labels) { + c.ExpireSeries(metric) +} + +func (c *BlocksPostingsForMatchersCache) PostDeletion(metrics map[chunks.HeadSeriesRef]labels.Labels) { + for _, metric := range metrics { + c.ExpireSeries(metric) + } +} + +func (c *BlocksPostingsForMatchersCache) PreCreation(labels.Labels) error { + return nil +} + +func (c *BlocksPostingsForMatchersCache) ExpireSeries(metric labels.Labels) { + var metricName string + + metric.Range(func(l labels.Label) { + if l.Name != model.MetricNameLabel { + return + } + metricName = l.Value + }) + + if metricName == "" { + return + } + + h := MemHashString(metricName) + i := h % uint64(len(c.headSeedByMetricName)) + l := i % uint64(len(c.strippedLock)) + c.strippedLock[l].Lock() + defer c.strippedLock[l].Unlock() + c.headSeedByMetricName[i]++ +} + +func (c *BlocksPostingsForMatchersCache) PostingsForMatchers(ctx context.Context, blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) { + return c.fetchPostings(blockID, ix, ms...)(ctx) +} + +func (c *BlocksPostingsForMatchersCache) fetchPostings(blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) func(context.Context) (index.Postings, error) { + var seed string + cache := c.blocksCache + + // If it is a head block, let's add the seed to the cache key so we can + // invalidate the cache when new series are created for this metric name. + if isHeadBlock(blockID) { + cache = c.headCache + + metricName, ok := metricNameFromMatcher(ms) + // Let's not cache the head if we don't find an equal
matcher for the label __name__ + if !ok { + c.metrics.NonCacheableQueries.WithLabelValues(cache.name).Inc() + return func(ctx context.Context) (index.Postings, error) { + return tsdb.PostingsForMatchers(ctx, ix, ms...) + } + } + + seed = c.getSeedForMetricName(metricName) + } + + c.metrics.CacheRequests.WithLabelValues(cache.name).Inc() + + fetch := func() ([]storage.SeriesRef, int64, error) { + // Use context.Background() as this promise is maybe shared across calls + postings, err := c.postingsForMatchersFunc(context.Background(), ix, ms...) + + if err == nil { + ids, err := index.ExpandPostings(postings) + return ids, int64(len(ids) * 8), err + } + + return nil, 0, err + } + + key := c.cacheKey(seed, blockID, ms...) + promise, loaded := cache.getPromiseForKey(key, fetch) + if loaded { + c.metrics.CacheHits.WithLabelValues(cache.name).Inc() + } + + return c.result(promise) +} + +func (c *BlocksPostingsForMatchersCache) result(ce *cacheEntryPromise[[]storage.SeriesRef]) func(ctx context.Context) (index.Postings, error) { + return func(ctx context.Context) (index.Postings, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-ce.done: + if ctx.Err() != nil { + return nil, ctx.Err() + } + return index.NewListPostings(ce.v), ce.err + } + } +} + +func (c *BlocksPostingsForMatchersCache) getSeedForMetricName(metricName string) string { + h := MemHashString(metricName) + i := h % uint64(len(c.headSeedByMetricName)) + l := i % uint64(len(c.strippedLock)) + c.strippedLock[l].RLock() + defer c.strippedLock[l].RUnlock() + return strconv.Itoa(c.headSeedByMetricName[i]) +} + +func (c *BlocksPostingsForMatchersCache) cacheKey(seed string, blockID ulid.ULID, ms ...*labels.Matcher) string { + slices.SortFunc(ms, func(i, j *labels.Matcher) int { + if i.Type != j.Type { + return int(i.Type - j.Type) + } + if i.Name != j.Name { + return strings.Compare(i.Name, j.Name) + } + if i.Value != j.Value { + return strings.Compare(i.Value, j.Value) + } + return 0 + }) + + const ( + typeLen = 2 + sepLen = 1 + ) + + var size int + for _, m := range ms { + size += len(seed) + len(blockID.String()) + len(m.Name) + len(m.Value) + typeLen + 2*sepLen + } + sb := strings.Builder{} + sb.Grow(size) + sb.WriteString(seed) + sb.WriteByte('|') + sb.WriteString(blockID.String()) + for _, m := range ms { + sb.WriteString(m.Name) + sb.WriteString(m.Type.String()) + sb.WriteString(m.Value) + sb.WriteByte('|') + } + key := sb.String() + return key +} + +func isHeadBlock(blockID ulid.ULID) bool { + return blockID == rangeHeadULID || blockID == headULID +} + +func metricNameFromMatcher(ms []*labels.Matcher) (string, bool) { + for _, m := range ms { + if m.Name == labels.MetricName && m.Type == labels.MatchEqual { + return m.Value, true + } + } + + return "", false +} + +// TODO(GiedriusS): convert Thanos caching system to be promised-based +// i.e. avoid multiple loads for same item. This is a copy from Cortex. +// Use as an inspiration. 
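+// A sketch of how the promise-based fifoCache below is used (the key, value and
+// sizes are made up; `metrics` stands for an ExpandedPostingsCacheMetrics value):
+//
+//	c := newFifoCache[[]storage.SeriesRef]("head", metrics, time.Now, 1<<20)
+//	p, hit := c.getPromiseForKey("seed|block|__name__=up|", func() ([]storage.SeriesRef, int64, error) {
+//		refs := []storage.SeriesRef{1, 2, 3}   // e.g. PostingsForMatchers + ExpandPostings
+//		return refs, int64(len(refs) * 8), nil // size estimate counted against maxBytes
+//	})
+//	// By the time getPromiseForKey returns, the fetch has completed exactly once,
+//	// even with concurrent callers; hit reports whether this caller reused it.
+//	_, _, _ = p.v, p.err, hit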
+type fifoCache[V any] struct { + cachedValues *sync.Map + timeNow func() time.Time + name string + metrics ExpandedPostingsCacheMetrics + + ttl time.Duration + maxBytes int64 + + // Fields from here on must be accessed while holding cachedMtx. + cachedMtx sync.RWMutex + cached *list.List + cachedBytes int64 +} + +func newFifoCache[V any](name string, metrics ExpandedPostingsCacheMetrics, timeNow func() time.Time, maxBytes uint64) *fifoCache[V] { + return &fifoCache[V]{ + cachedValues: new(sync.Map), + cached: list.New(), + timeNow: timeNow, + name: name, + metrics: metrics, + ttl: 10 * time.Minute, + maxBytes: int64(maxBytes), + } +} + +func (c *fifoCache[V]) expire() { + if c.ttl.Seconds() <= 0 { + return + } + c.cachedMtx.RLock() + if _, r := c.shouldEvictHead(); !r { + c.cachedMtx.RUnlock() + return + } + c.cachedMtx.RUnlock() + c.cachedMtx.Lock() + defer c.cachedMtx.Unlock() + for reason, r := c.shouldEvictHead(); r; reason, r = c.shouldEvictHead() { + c.metrics.CacheEvicts.WithLabelValues(c.name, reason).Inc() + c.evictHead() + } +} + +func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)) (*cacheEntryPromise[V], bool) { + r := &cacheEntryPromise[V]{ + done: make(chan struct{}), + } + defer close(r.done) + + loaded, ok := c.cachedValues.LoadOrStore(k, r) + + if !ok { + r.v, r.sizeBytes, r.err = fetch() + r.sizeBytes += int64(len(k)) + r.ts = c.timeNow() + c.created(k, r.sizeBytes) + c.expire() + } + + if ok { + // If the promise is already in the cache, let's wait for it to fetch the data. + <-loaded.(*cacheEntryPromise[V]).done + + // If it is cached but expired, let's try to replace the cached value. + if loaded.(*cacheEntryPromise[V]).isExpired(c.ttl, c.timeNow()) && c.cachedValues.CompareAndSwap(k, loaded, r) { + c.metrics.CacheEvicts.WithLabelValues(c.name, "expired").Inc() + r.v, r.sizeBytes, r.err = fetch() + r.sizeBytes += int64(len(k)) + c.updateSize(loaded.(*cacheEntryPromise[V]).sizeBytes, r.sizeBytes) + loaded = r + r.ts = c.timeNow() + ok = false + } + } + + return loaded.(*cacheEntryPromise[V]), ok +} + +func (c *fifoCache[V]) shouldEvictHead() (string, bool) { + h := c.cached.Front() + if h == nil { + return "", false + } + + if c.cachedBytes > c.maxBytes { + return "full", true + } + key := h.Value.(string) + + if l, ok := c.cachedValues.Load(key); ok { + return "expired", l.(*cacheEntryPromise[V]).isExpired(c.ttl, c.timeNow()) + } + + return "", false +} + +func (c *fifoCache[V]) evictHead() { + front := c.cached.Front() + c.cached.Remove(front) + oldestKey := front.Value.(string) + if oldest, loaded := c.cachedValues.LoadAndDelete(oldestKey); loaded { + c.cachedBytes -= oldest.(*cacheEntryPromise[V]).sizeBytes + } +} + +func (c *fifoCache[V]) created(key string, sizeBytes int64) { + if c.ttl <= 0 { + c.cachedValues.Delete(key) + return + } + c.cachedMtx.Lock() + defer c.cachedMtx.Unlock() + c.cached.PushBack(key) + c.cachedBytes += sizeBytes +} + +func (c *fifoCache[V]) updateSize(oldSize, newSizeBytes int64) { + if oldSize == newSizeBytes { + return + } + + c.cachedMtx.Lock() + defer c.cachedMtx.Unlock() + c.cachedBytes += newSizeBytes - oldSize +} + +func (c *fifoCache[V]) contains(k string) bool { + _, ok := c.cachedValues.Load(k) + return ok +} + +type cacheEntryPromise[V any] struct { + ts time.Time + sizeBytes int64 + + done chan struct{} + v V + err error +} + +func (ce *cacheEntryPromise[V]) isExpired(ttl time.Duration, now time.Time) bool { + ts := ce.ts + r := now.Sub(ts) + return r >= ttl +} + +func MemHashString(str string) uint64 { + return
xxhash.Sum64String(str) +} diff --git a/pkg/receive/expandedpostingscache/cache_test.go b/pkg/receive/expandedpostingscache/cache_test.go new file mode 100644 index 0000000000..86c7573e99 --- /dev/null +++ b/pkg/receive/expandedpostingscache/cache_test.go @@ -0,0 +1,210 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +// Original code by Alan Protasio (https://github.com/alanprot) in the Cortex project. +// +//nolint:unparam +package expandedpostingscache + +import ( + "bytes" + "fmt" + "strings" + "sync" + "testing" + "time" + + "go.uber.org/atomic" + "golang.org/x/exp/rand" + + "github.com/prometheus/client_golang/prometheus" + promtest "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" +) + +func Test_ShouldFetchPromiseOnlyOnce(t *testing.T) { + m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry()) + cache := newFifoCache[int]("test", m, time.Now, 10<<20) + calls := atomic.Int64{} + concurrency := 100 + wg := sync.WaitGroup{} + wg.Add(concurrency) + + fetchFunc := func() (int, int64, error) { + calls.Inc() + time.Sleep(100 * time.Millisecond) + return 0, 0, nil //nolint:unparam + } + + for i := 0; i < 100; i++ { + go func() { + defer wg.Done() + cache.getPromiseForKey("key1", fetchFunc) + }() + } + + wg.Wait() + require.Equal(t, int64(1), calls.Load()) + +} + +func TestFifoCacheExpire(t *testing.T) { + + keySize := 20 + numberOfKeys := 100 + + tc := map[string]struct { + ttl time.Duration + maxBytes uint64 + expectedFinalItems int + ttlExpire bool + }{ + "MaxBytes": { + expectedFinalItems: 10, + ttl: time.Hour, + maxBytes: uint64(10 * (8 + keySize)), + }, + "TTL": { + expectedFinalItems: numberOfKeys, + ttlExpire: true, + ttl: time.Hour, + maxBytes: 10 << 20, + }, + } + + for name, c := range tc { + t.Run(name, func(t *testing.T) { + r := prometheus.NewPedanticRegistry() + m := NewPostingCacheMetrics(r) + timeNow := time.Now + cache := newFifoCache[int]("test", m, timeNow, c.maxBytes) + + for i := 0; i < numberOfKeys; i++ { + key := repeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize) + p, loaded := cache.getPromiseForKey(key, func() (int, int64, error) { + return 1, 8, nil + }) + require.False(t, loaded) + require.Equal(t, 1, p.v) + require.True(t, cache.contains(key)) + p, loaded = cache.getPromiseForKey(key, func() (int, int64, error) { + return 1, 0, nil + }) + require.True(t, loaded) + require.Equal(t, 1, p.v) + } + + totalCacheSize := 0 + + for i := 0; i < numberOfKeys; i++ { + key := repeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize) + if cache.contains(key) { + totalCacheSize++ + } + } + + require.Equal(t, c.expectedFinalItems, totalCacheSize) + + if c.expectedFinalItems != numberOfKeys { + err := promtest.GatherAndCompare(r, bytes.NewBufferString(fmt.Sprintf(` + # HELP expanded_postings_cache_evicts_total Total number of evictions in the cache, excluding items that got evicted. 
+ # TYPE expanded_postings_cache_evicts_total counter + expanded_postings_cache_evicts_total{cache="test",reason="full"} %v +`, numberOfKeys-c.expectedFinalItems)), "expanded_postings_cache_evicts_total") + require.NoError(t, err) + + } + + if c.ttlExpire { + cache.timeNow = func() time.Time { + return timeNow().Add(2 * c.ttl) + } + + for i := 0; i < numberOfKeys; i++ { + key := repeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize) + originalSize := cache.cachedBytes + p, loaded := cache.getPromiseForKey(key, func() (int, int64, error) { + return 2, 18, nil + }) + require.False(t, loaded) + // New value + require.Equal(t, 2, p.v) + // Total Size Updated + require.Equal(t, originalSize+10, cache.cachedBytes) + } + + err := promtest.GatherAndCompare(r, bytes.NewBufferString(fmt.Sprintf(` + # HELP expanded_postings_cache_evicts_total Total number of evictions in the cache, excluding items that got evicted. + # TYPE expanded_postings_cache_evicts_total counter + expanded_postings_cache_evicts_total{cache="test",reason="expired"} %v +`, numberOfKeys)), "expanded_postings_cache_evicts_total") + require.NoError(t, err) + + cache.timeNow = func() time.Time { + return timeNow().Add(5 * c.ttl) + } + + cache.getPromiseForKey("newKwy", func() (int, int64, error) { + return 2, 18, nil + }) + + // Should expire all keys again as ttl is expired + err = promtest.GatherAndCompare(r, bytes.NewBufferString(fmt.Sprintf(` + # HELP expanded_postings_cache_evicts_total Total number of evictions in the cache, excluding items that got evicted. + # TYPE expanded_postings_cache_evicts_total counter + expanded_postings_cache_evicts_total{cache="test",reason="expired"} %v +`, numberOfKeys*2)), "expanded_postings_cache_evicts_total") + require.NoError(t, err) + } + }) + } +} + +func repeatStringIfNeeded(seed string, length int) string { + if len(seed) > length { + return seed + } + + return strings.Repeat(seed, 1+length/len(seed))[:max(length, len(seed))] +} + +func TestLockRaceExpireSeries(t *testing.T) { + for j := 0; j < 10; j++ { + wg := &sync.WaitGroup{} + + c := NewBlocksPostingsForMatchersCache(ExpandedPostingsCacheMetrics{}, 1<<7, 1<<7, 3) + for i := 0; i < 1000; i++ { + wg.Add(2) + + go func() { + defer wg.Done() + for i := 0; i < 10; i++ { + c.ExpireSeries( + labels.FromMap(map[string]string{"__name__": randSeq(10)}), + ) + } + }() + + go func() { + defer wg.Done() + + for i := 0; i < 10; i++ { + c.getSeedForMetricName(randSeq(10)) + } + }() + } + wg.Wait() + } +} + +var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") + +func randSeq(n int) string { + b := make([]rune, n) + rand.Seed(uint64(time.Now().UnixNano())) + for i := range b { + b[i] = letters[rand.Intn(len(letters))] + } + return string(b) +} diff --git a/pkg/receive/expandedpostingscache/tsdb.go b/pkg/receive/expandedpostingscache/tsdb.go new file mode 100644 index 0000000000..09d9276575 --- /dev/null +++ b/pkg/receive/expandedpostingscache/tsdb.go @@ -0,0 +1,146 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +// Original code by Alan Protasio (https://github.com/alanprot) in the Cortex project. 
+ +package expandedpostingscache + +import ( + "context" + "errors" + "fmt" + + "github.com/oklog/ulid" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + prom_tsdb "github.com/prometheus/prometheus/tsdb" + tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/prometheus/prometheus/tsdb/index" + "github.com/prometheus/prometheus/tsdb/tombstones" + "github.com/prometheus/prometheus/util/annotations" +) + +/* + This file is basically a copy from https://github.com/prometheus/prometheus/blob/e2e01c1cffbfc4f26f5e9fe6138af87d7ff16122/tsdb/querier.go + with the difference that the PostingsForMatchers function is called from the Postings Cache +*/ + +type blockBaseQuerier struct { + blockID ulid.ULID + index prom_tsdb.IndexReader + chunks prom_tsdb.ChunkReader + tombstones tombstones.Reader + + closed bool + + mint, maxt int64 +} + +func newBlockBaseQuerier(b prom_tsdb.BlockReader, mint, maxt int64) (*blockBaseQuerier, error) { + indexr, err := b.Index() + if err != nil { + return nil, fmt.Errorf("open index reader: %w", err) + } + chunkr, err := b.Chunks() + if err != nil { + indexr.Close() + return nil, fmt.Errorf("open chunk reader: %w", err) + } + tombsr, err := b.Tombstones() + if err != nil { + indexr.Close() + chunkr.Close() + return nil, fmt.Errorf("open tombstone reader: %w", err) + } + + if tombsr == nil { + tombsr = tombstones.NewMemTombstones() + } + return &blockBaseQuerier{ + blockID: b.Meta().ULID, + mint: mint, + maxt: maxt, + index: indexr, + chunks: chunkr, + tombstones: tombsr, + }, nil +} + +func (q *blockBaseQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + res, err := q.index.SortedLabelValues(ctx, name, matchers...) + return res, nil, err +} + +func (q *blockBaseQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + res, err := q.index.LabelNames(ctx, matchers...) 
+ return res, nil, err +} + +func (q *blockBaseQuerier) Close() error { + if q.closed { + return errors.New("block querier already closed") + } + + errs := tsdb_errors.NewMulti( + q.index.Close(), + q.chunks.Close(), + q.tombstones.Close(), + ) + q.closed = true + return errs.Err() +} + +type cachedBlockChunkQuerier struct { + *blockBaseQuerier + + cache ExpandedPostingsCache +} + +func NewCachedBlockChunkQuerier(cache ExpandedPostingsCache, b prom_tsdb.BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) { + q, err := newBlockBaseQuerier(b, mint, maxt) + if err != nil { + return nil, err + } + return &cachedBlockChunkQuerier{blockBaseQuerier: q, cache: cache}, nil +} + +func (q *cachedBlockChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet { + return selectChunkSeriesSet(ctx, sortSeries, hints, ms, q.blockID, q.index, q.chunks, q.tombstones, q.mint, q.maxt, q.cache) +} + +func selectChunkSeriesSet(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms []*labels.Matcher, + blockID ulid.ULID, ir prom_tsdb.IndexReader, chunks prom_tsdb.ChunkReader, tombstones tombstones.Reader, mint, maxt int64, + cache ExpandedPostingsCache, +) storage.ChunkSeriesSet { + disableTrimming := false + sharded := hints != nil && hints.ShardCount > 0 + + if hints != nil { + mint = hints.Start + maxt = hints.End + disableTrimming = hints.DisableTrimming + } + + var postings index.Postings + if cache != nil { + p, err := cache.PostingsForMatchers(ctx, blockID, ir, ms...) + if err != nil { + return storage.ErrChunkSeriesSet(err) + } + postings = p + } else { + p, err := prom_tsdb.PostingsForMatchers(ctx, ir, ms...) + if err != nil { + return storage.ErrChunkSeriesSet(err) + } + postings = p + } + + if sharded { + postings = ir.ShardedPostings(postings, hints.ShardIndex, hints.ShardCount) + } + if sortSeries { + postings = ir.SortedPostings(postings) + } + return prom_tsdb.NewBlockChunkSeriesSet(blockID, ir, chunks, tombstones, postings, mint, maxt, disableTrimming) +} diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 9c9954d1bd..526e3c6ec9 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -34,7 +34,9 @@ import ( "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/errutil" "github.com/thanos-io/thanos/pkg/exemplars" + "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/info/infopb" + "github.com/thanos-io/thanos/pkg/receive/expandedpostingscache" "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/labelpb" @@ -66,6 +68,9 @@ type MultiTSDB struct { exemplarClients map[string]*exemplars.TSDB metricNameFilterEnabled bool + + headExpandedPostingsCacheSize uint64 + blockExpandedPostingsCacheSize uint64 } // MultiTSDBOption is a functional option for MultiTSDB. @@ -78,6 +83,18 @@ func WithMetricNameFilterEnabled() MultiTSDBOption { } } +func WithHeadExpandedPostingsCacheSize(size uint64) MultiTSDBOption { + return func(s *MultiTSDB) { + s.headExpandedPostingsCacheSize = size + } +} + +func WithBlockExpandedPostingsCacheSize(size uint64) MultiTSDBOption { + return func(s *MultiTSDB) { + s.blockExpandedPostingsCacheSize = size + } +} + // NewMultiTSDB creates new MultiTSDB. // NOTE: Passed labels must be sorted lexicographically (alphabetically). 
func NewMultiTSDB( @@ -684,9 +701,27 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant dataDir := t.defaultTenantDataDir(tenantID) level.Info(logger).Log("msg", "opening TSDB") + + var expandedPostingsCache expandedpostingscache.ExpandedPostingsCache + if t.headExpandedPostingsCacheSize > 0 || t.blockExpandedPostingsCacheSize > 0 { + var expandedPostingsCacheMetrics = expandedpostingscache.NewPostingCacheMetrics(extprom.WrapRegistererWithPrefix("thanos_", reg)) + + expandedPostingsCache = expandedpostingscache.NewBlocksPostingsForMatchersCache(expandedPostingsCacheMetrics, t.headExpandedPostingsCacheSize, t.blockExpandedPostingsCacheSize, 0) + } + opts := *t.tsdbOpts opts.BlocksToDelete = tenant.blocksToDelete opts.EnableDelayedCompaction = true + + opts.BlockChunkQuerierFunc = func(b tsdb.BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) { + if expandedPostingsCache != nil { + return expandedpostingscache.NewCachedBlockChunkQuerier(expandedPostingsCache, b, mint, maxt) + } + return tsdb.NewBlockChunkQuerier(b, mint, maxt) + } + if expandedPostingsCache != nil { + opts.SeriesLifecycleCallback = expandedPostingsCache + } tenant.blocksToDeleteFn = tsdb.DefaultBlocksToDelete // NOTE(GiedriusS): always set to false to properly handle OOO samples - OOO samples are written into the WBL diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index d9940221ff..1c434f503f 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -151,6 +151,7 @@ type bucketStoreMetrics struct { emptyPostingCount *prometheus.CounterVec lazyExpandedPostingsCount prometheus.Counter + lazyExpandedPostingGroupsByReason *prometheus.CounterVec lazyExpandedPostingSizeBytes prometheus.Counter lazyExpandedPostingSeriesOverfetchedSizeBytes prometheus.Counter @@ -345,6 +346,11 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { Help: "Total number of times when lazy expanded posting optimization applies.", }) + m.lazyExpandedPostingGroupsByReason = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_bucket_store_lazy_expanded_posting_groups_total", + Help: "Total number of posting groups that are marked as lazy and corresponding reason", + }, []string{"reason"}) + m.lazyExpandedPostingSizeBytes = promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_bucket_store_lazy_expanded_posting_size_bytes_total", Help: "Total number of lazy posting group size in bytes.", @@ -419,7 +425,8 @@ type BucketStore struct { enableChunkHashCalculation bool - enabledLazyExpandedPostings bool + enabledLazyExpandedPostings bool + postingGroupMaxKeySeriesRatio float64 sortingStrategy sortingStrategy @@ -552,6 +559,13 @@ func WithLazyExpandedPostings(enabled bool) BucketStoreOption { } } +// WithPostingGroupMaxKeySeriesRatio configures a threshold to mark a posting group as lazy if it has more add keys. +func WithPostingGroupMaxKeySeriesRatio(postingGroupMaxKeySeriesRatio float64) BucketStoreOption { + return func(s *BucketStore) { + s.postingGroupMaxKeySeriesRatio = postingGroupMaxKeySeriesRatio + } +} + // WithDontResort disables series resorting in Store Gateway. func WithDontResort(true bool) BucketStoreOption { return func(s *BucketStore) { @@ -1002,8 +1016,11 @@ type blockSeriesClient struct { chunksLimiter ChunksLimiter bytesLimiter BytesLimiter - lazyExpandedPostingEnabled bool + lazyExpandedPostingEnabled bool + // Mark posting group as lazy if it adds too many keys. 0 to disable. 
+ postingGroupMaxKeySeriesRatio float64 lazyExpandedPostingsCount prometheus.Counter + lazyExpandedPostingGroupByReason *prometheus.CounterVec lazyExpandedPostingSizeBytes prometheus.Counter lazyExpandedPostingSeriesOverfetchedSizeBytes prometheus.Counter @@ -1046,7 +1063,9 @@ func newBlockSeriesClient( chunkFetchDurationSum *prometheus.HistogramVec, extLsetToRemove map[string]struct{}, lazyExpandedPostingEnabled bool, + postingGroupMaxKeySeriesRatio float64, lazyExpandedPostingsCount prometheus.Counter, + lazyExpandedPostingByReason *prometheus.CounterVec, lazyExpandedPostingSizeBytes prometheus.Counter, lazyExpandedPostingSeriesOverfetchedSizeBytes prometheus.Counter, tenant string, @@ -1081,7 +1100,9 @@ func newBlockSeriesClient( chunkFetchDurationSum: chunkFetchDurationSum, lazyExpandedPostingEnabled: lazyExpandedPostingEnabled, + postingGroupMaxKeySeriesRatio: postingGroupMaxKeySeriesRatio, lazyExpandedPostingsCount: lazyExpandedPostingsCount, + lazyExpandedPostingGroupByReason: lazyExpandedPostingByReason, lazyExpandedPostingSizeBytes: lazyExpandedPostingSizeBytes, lazyExpandedPostingSeriesOverfetchedSizeBytes: lazyExpandedPostingSeriesOverfetchedSizeBytes, @@ -1133,7 +1154,7 @@ func (b *blockSeriesClient) ExpandPostings( matchers sortedMatchers, seriesLimiter SeriesLimiter, ) error { - ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.lazyExpandedPostingSizeBytes, b.tenant) + ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.postingGroupMaxKeySeriesRatio, b.lazyExpandedPostingSizeBytes, b.lazyExpandedPostingGroupByReason, b.tenant) if err != nil { return errors.Wrap(err, "expanded matching posting") } @@ -1566,7 +1587,9 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store s.metrics.chunkFetchDurationSum, extLsetToRemove, s.enabledLazyExpandedPostings, + s.postingGroupMaxKeySeriesRatio, s.metrics.lazyExpandedPostingsCount, + s.metrics.lazyExpandedPostingGroupsByReason, s.metrics.lazyExpandedPostingSizeBytes, s.metrics.lazyExpandedPostingSeriesOverfetchedSizeBytes, tenant, @@ -1880,7 +1903,9 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq nil, extLsetToRemove, s.enabledLazyExpandedPostings, + s.postingGroupMaxKeySeriesRatio, s.metrics.lazyExpandedPostingsCount, + s.metrics.lazyExpandedPostingGroupsByReason, s.metrics.lazyExpandedPostingSizeBytes, s.metrics.lazyExpandedPostingSeriesOverfetchedSizeBytes, tenant, @@ -2106,7 +2131,9 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR nil, nil, s.enabledLazyExpandedPostings, + s.postingGroupMaxKeySeriesRatio, s.metrics.lazyExpandedPostingsCount, + s.metrics.lazyExpandedPostingGroupsByReason, s.metrics.lazyExpandedPostingSizeBytes, s.metrics.lazyExpandedPostingSeriesOverfetchedSizeBytes, tenant, @@ -2563,7 +2590,16 @@ func (r *bucketIndexReader) reset(size int) { // Reminder: A posting is a reference (represented as a uint64) to a series reference, which in turn points to the first // chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by // single label name=value. 
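+// For example, the matcher job="api" is served by fetching the single posting list stored
+// for the label pair {job="api"}, while job!="api" is served by fetching all postings and
+// removing that same list (the label names and values here are only illustrative).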
-func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatchers, bytesLimiter BytesLimiter, lazyExpandedPostingEnabled bool, lazyExpandedPostingSizeBytes prometheus.Counter, tenant string) (*lazyExpandedPostings, error) { +func (r *bucketIndexReader) ExpandedPostings( + ctx context.Context, + ms sortedMatchers, + bytesLimiter BytesLimiter, + lazyExpandedPostingEnabled bool, + postingGroupMaxKeySeriesRatio float64, + lazyExpandedPostingSizeBytes prometheus.Counter, + lazyExpandedPostingGroupsByReason *prometheus.CounterVec, + tenant string, +) (*lazyExpandedPostings, error) { // Shortcut the case of `len(postingGroups) == 0`. It will only happen when no // matchers specified, and we don't need to fetch expanded postings from cache. if len(ms) == 0 { @@ -2615,7 +2651,7 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatch postingGroups = append(postingGroups, newPostingGroup(true, name, []string{value}, nil)) } - ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, lazyExpandedPostingSizeBytes, tenant) + ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, postingGroupMaxKeySeriesRatio, lazyExpandedPostingSizeBytes, lazyExpandedPostingGroupsByReason, tenant) if err != nil { return nil, errors.Wrap(err, "fetch and expand postings") } @@ -2661,13 +2697,14 @@ func ExpandPostingsWithContext(ctx context.Context, p index.Postings) ([]storage // If addAll is not set: Merge of postings for "addKeys" labels minus postings for removeKeys labels // This computation happens in ExpandedPostings. type postingGroup struct { - addAll bool - name string - matchers []*labels.Matcher - addKeys []string - removeKeys []string - cardinality int64 - lazy bool + addAll bool + name string + matchers []*labels.Matcher + addKeys []string + removeKeys []string + cardinality int64 + existentKeys int + lazy bool } func newPostingGroup(addAll bool, name string, addKeys, removeKeys []string) *postingGroup { diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index e8dffd093b..df4d1e189c 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1288,7 +1288,9 @@ func benchmarkExpandedPostings( {`uniq=~"9|random-shuffled-values|1"`, []*labels.Matcher{iRegexBigValueSet}, bigValueSetSize}, } - dummyCounter := promauto.NewCounter(prometheus.CounterOpts{Name: "test"}) + reg := prometheus.NewRegistry() + dummyCounter := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test"}) + dummyCounterVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"}) for _, c := range cases { t.Run(c.name, func(t testutil.TB) { b := &bucketBlock{ @@ -1304,7 +1306,7 @@ func benchmarkExpandedPostings( t.ResetTimer() for i := 0; i < t.N(); i++ { - p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant) + p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil), false, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant) testutil.Ok(t, err) testutil.Equals(t, c.expectedLen, len(p.postings)) } @@ -1340,8 +1342,10 @@ func TestExpandedPostingsEmptyPostings(t *testing.T) { // Match nothing. 
matcher2 := labels.MustNewMatcher(labels.MatchRegexp, "i", "500.*") ctx := context.Background() - dummyCounter := promauto.With(prometheus.NewRegistry()).NewCounter(prometheus.CounterOpts{Name: "test"}) - ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant) + reg := prometheus.NewRegistry() + dummyCounter := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test"}) + dummyCounterVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"}) + ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil), false, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant) testutil.Ok(t, err) testutil.Equals(t, ps, (*lazyExpandedPostings)(nil)) // Make sure even if a matcher doesn't match any postings, we still cache empty expanded postings. @@ -1378,8 +1382,10 @@ func TestLazyExpandedPostingsEmptyPostings(t *testing.T) { matcher2 := labels.MustNewMatcher(labels.MatchRegexp, "n", "1_.*") matcher3 := labels.MustNewMatcher(labels.MatchRegexp, "i", ".+") ctx := context.Background() - dummyCounter := promauto.With(prometheus.NewRegistry()).NewCounter(prometheus.CounterOpts{Name: "test"}) - ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2, matcher3}), NewBytesLimiterFactory(0)(nil), true, dummyCounter, tenancy.DefaultTenant) + reg := prometheus.NewRegistry() + dummyCounter := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test"}) + dummyCounterVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"}) + ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2, matcher3}), NewBytesLimiterFactory(0)(nil), true, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant) testutil.Ok(t, err) // We expect emptyLazyPostings rather than lazy postings with 0 length but with matchers. 
testutil.Equals(t, ps, emptyLazyPostings) @@ -2872,7 +2878,9 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet wg := sync.WaitGroup{} wg.Add(concurrency) - dummyCounter := promauto.NewCounter(prometheus.CounterOpts{Name: "test"}) + reg := prometheus.NewRegistry() + dummyCounter := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test"}) + dummyCounterVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"}) for w := 0; w < concurrency; w++ { go func() { defer wg.Done() @@ -2917,7 +2925,9 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet dummyHistogram, nil, false, + 0, dummyCounter, + dummyCounterVec, dummyCounter, dummyCounter, tenancy.DefaultTenant, @@ -3551,7 +3561,9 @@ func TestExpandedPostingsRace(t *testing.T) { l := sync.Mutex{} previousRefs := make(map[int][]storage.SeriesRef) - dummyCounter := promauto.With(prometheus.NewRegistry()).NewCounter(prometheus.CounterOpts{Name: "test"}) + reg := prometheus.NewRegistry() + dummyCounter := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test"}) + dummyCounterVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"}) for { if tm.Err() != nil { @@ -3573,7 +3585,7 @@ func TestExpandedPostingsRace(t *testing.T) { wg.Add(1) go func(i int, bb *bucketBlock) { - refs, err := bb.indexReader(logger).ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant) + refs, err := bb.indexReader(logger).ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant) testutil.Ok(t, err) defer wg.Done() diff --git a/pkg/store/lazy_postings.go b/pkg/store/lazy_postings.go index f8363ab477..ef7ae5d00a 100644 --- a/pkg/store/lazy_postings.go +++ b/pkg/store/lazy_postings.go @@ -39,7 +39,15 @@ func (p *lazyExpandedPostings) lazyExpanded() bool { return p != nil && len(p.matchers) > 0 } -func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups []*postingGroup, seriesMaxSize int64, seriesMatchRatio float64, lazyExpandedPostingSizeBytes prometheus.Counter) ([]*postingGroup, bool, error) { +func optimizePostingsFetchByDownloadedBytes( + r *bucketIndexReader, + postingGroups []*postingGroup, + seriesMaxSize int64, + seriesMatchRatio float64, + postingGroupMaxKeySeriesRatio float64, + lazyExpandedPostingSizeBytes prometheus.Counter, + lazyExpandedPostingGroupsByReason *prometheus.CounterVec, +) ([]*postingGroup, bool, error) { if len(postingGroups) <= 1 { return postingGroups, false, nil } @@ -55,6 +63,7 @@ func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups return nil, false, errors.Wrapf(err, "postings offsets for %s", pg.name) } + existentKeys := 0 for _, rng := range rngs { if rng == indexheader.NotFoundRange { continue @@ -63,14 +72,16 @@ func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups level.Error(r.logger).Log("msg", "invalid index range, fallback to non lazy posting optimization") return postingGroups, false, nil } + existentKeys++ // Each range starts from the #entries field which is 4 bytes. // Need to subtract it when calculating number of postings. // https://github.com/prometheus/prometheus/blob/v2.46.0/tsdb/docs/format/index.md. 
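+ // For example, an index.Range{Start: 8, End: 16} spans 8 bytes: the 4-byte #entries
+ // field plus one 4-byte posting, i.e. (16 - 8 - 4) / 4 = 1 posting, matching the
+ // ranges used in the lazy_postings_test.go cases later in this diff.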
pg.cardinality += (rng.End - rng.Start - 4) / 4 } + pg.existentKeys = existentKeys // If the posting group adds keys, 0 cardinality means the posting doesn't exist. // If the posting group removes keys, no posting ranges found is fine as it is a noop. - if len(pg.addKeys) > 0 && pg.cardinality == 0 { + if len(pg.addKeys) > 0 && pg.existentKeys == 0 { return nil, true, nil } } @@ -142,6 +153,7 @@ func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups // Assume only seriesMatchRatio postings will be matched every posting group. seriesMatched := postingGroups[i].cardinality - int64(math.Ceil(float64(negativeCardinalities)*seriesMatchRatio)) + maxSeriesMatched := seriesMatched i++ // Start from next posting group as we always need to fetch at least one posting group with add keys. for i < len(postingGroups) { pg := postingGroups[i] @@ -165,6 +177,13 @@ func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups seriesMatched -= underfetchedSeries underfetchedSeriesSize = underfetchedSeries * seriesMaxSize } else { + // Only mark posting group as lazy due to too many keys when those keys are known to be existent. + if postingGroupMaxKeySeriesRatio > 0 && maxSeriesMatched > 0 && + float64(pg.existentKeys)/float64(maxSeriesMatched) > postingGroupMaxKeySeriesRatio { + markPostingGroupLazy(pg, "keys_limit", lazyExpandedPostingSizeBytes, lazyExpandedPostingGroupsByReason) + i++ + continue + } underfetchedSeriesSize = seriesMaxSize * int64(math.Ceil(float64(seriesMatched)*(1-seriesMatchRatio))) seriesMatched = int64(math.Ceil(float64(seriesMatched) * seriesMatchRatio)) } @@ -176,13 +195,18 @@ func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups i++ } for i < len(postingGroups) { - postingGroups[i].lazy = true - lazyExpandedPostingSizeBytes.Add(float64(4 * postingGroups[i].cardinality)) + markPostingGroupLazy(postingGroups[i], "postings_size", lazyExpandedPostingSizeBytes, lazyExpandedPostingGroupsByReason) i++ } return postingGroups, false, nil } +func markPostingGroupLazy(pg *postingGroup, reason string, lazyExpandedPostingSizeBytes prometheus.Counter, lazyExpandedPostingGroupsByReason *prometheus.CounterVec) { + pg.lazy = true + lazyExpandedPostingSizeBytes.Add(float64(4 * pg.cardinality)) + lazyExpandedPostingGroupsByReason.WithLabelValues(reason).Inc() +} + func fetchLazyExpandedPostings( ctx context.Context, postingGroups []*postingGroup, @@ -190,7 +214,9 @@ func fetchLazyExpandedPostings( bytesLimiter BytesLimiter, addAllPostings bool, lazyExpandedPostingEnabled bool, + postingGroupMaxKeySeriesRatio float64, lazyExpandedPostingSizeBytes prometheus.Counter, + lazyExpandedPostingGroupsByReason *prometheus.CounterVec, tenant string, ) (*lazyExpandedPostings, error) { var ( @@ -212,7 +238,9 @@ func fetchLazyExpandedPostings( postingGroups, int64(r.block.estimatedMaxSeriesSize), 0.5, // TODO(yeya24): Expose this as a flag. + postingGroupMaxKeySeriesRatio, lazyExpandedPostingSizeBytes, + lazyExpandedPostingGroupsByReason, ) if err != nil { return nil, err @@ -243,27 +271,25 @@ func keysToFetchFromPostingGroups(postingGroups []*postingGroup) ([]labels.Label for i < len(postingGroups) { pg := postingGroups[i] if pg.lazy { - break + if len(lazyMatchers) == 0 { + lazyMatchers = make([]*labels.Matcher, 0) + } + lazyMatchers = append(lazyMatchers, postingGroups[i].matchers...) + } else { + // Postings returned by fetchPostings will be in the same order as keys + // so it's important that we iterate them in the same order later. 
+ // We don't have any other way of pairing keys and fetched postings. + for _, key := range pg.addKeys { + keys = append(keys, labels.Label{Name: pg.name, Value: key}) + } + for _, key := range pg.removeKeys { + keys = append(keys, labels.Label{Name: pg.name, Value: key}) + } } - // Postings returned by fetchPostings will be in the same order as keys - // so it's important that we iterate them in the same order later. - // We don't have any other way of pairing keys and fetched postings. - for _, key := range pg.addKeys { - keys = append(keys, labels.Label{Name: pg.name, Value: key}) - } - for _, key := range pg.removeKeys { - keys = append(keys, labels.Label{Name: pg.name, Value: key}) - } i++ } - if i < len(postingGroups) { - lazyMatchers = make([]*labels.Matcher, 0) - for i < len(postingGroups) { - lazyMatchers = append(lazyMatchers, postingGroups[i].matchers...) - i++ - } - } + return keys, lazyMatchers } @@ -279,6 +305,18 @@ func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, post return nil, nil, errors.Wrap(err, "get postings") } + result := mergeFetchedPostings(ctx, fetchedPostings, postingGroups) + if err := ctx.Err(); err != nil { + return nil, nil, err + } + ps, err := ExpandPostingsWithContext(ctx, result) + if err != nil { + return nil, nil, errors.Wrap(err, "expand") + } + return ps, lazyMatchers, nil +} + +func mergeFetchedPostings(ctx context.Context, fetchedPostings []index.Postings, postingGroups []*postingGroup) index.Postings { // Get "add" and "remove" postings from groups. We iterate over postingGroups and their keys // again, and this is exactly the same order as before (when building the groups), so we can simply // use one incrementing index to fetch postings from returned slice. @@ -287,7 +325,7 @@ func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, post var groupAdds, groupRemovals []index.Postings for _, g := range postingGroups { if g.lazy { - break + continue } // We cannot add empty set to groupAdds, since they are intersected. 
if len(g.addKeys) > 0 { @@ -307,13 +345,5 @@ func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, post } result := index.Without(index.Intersect(groupAdds...), index.Merge(ctx, groupRemovals...)) - - if err := ctx.Err(); err != nil { - return nil, nil, err - } - ps, err := ExpandPostingsWithContext(ctx, result) - if err != nil { - return nil, nil, errors.Wrap(err, "expand") - } - return ps, lazyMatchers, nil + return result } diff --git a/pkg/store/lazy_postings_test.go b/pkg/store/lazy_postings_test.go index 06157affe0..cb52dac412 100644 --- a/pkg/store/lazy_postings_test.go +++ b/pkg/store/lazy_postings_test.go @@ -16,8 +16,10 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/index" + "github.com/stretchr/testify/require" "github.com/thanos-io/objstore/providers/filesystem" "github.com/thanos-io/thanos/pkg/block/indexheader" @@ -206,6 +208,38 @@ func TestKeysToFetchFromPostingGroups(t *testing.T) { labels.MustNewMatcher(labels.MatchRegexp, "job", "prometheus.*"), }, }, + { + name: "multiple non lazy and lazy posting groups with lazy posting groups in the middle", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, + }, + { + name: "cluster", + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "cluster", "bar")}, + lazy: true, + }, + { + name: "env", + addKeys: []string{"beta", "gamma", "prod"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "env", "beta|gamma|prod")}, + lazy: true, + }, + { + name: "job", + addKeys: []string{"prometheus"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "job", "prometheus.*")}, + }, + }, + expectedLabels: []labels.Label{{Name: "test", Value: "foo"}, {Name: "test", Value: "bar"}, {Name: "job", Value: "prometheus"}}, + expectedMatchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "cluster", "bar"), + labels.MustNewMatcher(labels.MatchRegexp, "env", "beta|gamma|prod"), + }, + }, } { t.Run(tc.name, func(t *testing.T) { keys, matchers := keysToFetchFromPostingGroups(tc.pgs) @@ -276,15 +310,16 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { }, } for _, tc := range []struct { - name string - inputPostings map[string]map[string]index.Range - inputError error - postingGroups []*postingGroup - seriesMaxSize int64 - seriesMatchRatio float64 - expectedPostingGroups []*postingGroup - expectedEmptyPosting bool - expectedError string + name string + inputPostings map[string]map[string]index.Range + inputError error + postingGroups []*postingGroup + seriesMaxSize int64 + seriesMatchRatio float64 + postingGroupMaxKeySeriesRatio float64 + expectedPostingGroups []*postingGroup + expectedEmptyPosting bool + expectedError string }{ { name: "empty posting group", @@ -353,7 +388,7 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { }, expectedPostingGroups: []*postingGroup{ {name: "bar", removeKeys: []string{"foo"}, cardinality: 0, addAll: true}, - {name: "foo", addKeys: []string{"bar"}, cardinality: 1}, + {name: "foo", addKeys: []string{"bar"}, cardinality: 1, existentKeys: 1}, }, }, { @@ -385,7 +420,7 @@ func 
TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { }, expectedPostingGroups: []*postingGroup{ {name: "bar", removeKeys: []string{"foo"}, cardinality: 0, addAll: true}, - {name: "foo", addKeys: []string{"bar"}, cardinality: 1}, + {name: "foo", addKeys: []string{"bar"}, cardinality: 1, existentKeys: 1}, }, }, { @@ -401,8 +436,8 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { {name: "bar", addKeys: []string{"foo", "buz"}}, }, expectedPostingGroups: []*postingGroup{ - {name: "bar", addKeys: []string{"foo", "buz"}, cardinality: 1}, - {name: "foo", addKeys: []string{"bar"}, cardinality: 1}, + {name: "bar", addKeys: []string{"foo", "buz"}, cardinality: 1, existentKeys: 1}, + {name: "foo", addKeys: []string{"bar"}, cardinality: 1, existentKeys: 1}, }, }, { @@ -418,8 +453,97 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { {name: "bar", addKeys: []string{"foo"}}, }, expectedPostingGroups: []*postingGroup{ - {name: "bar", addKeys: []string{"foo"}, cardinality: 1}, - {name: "foo", addKeys: []string{"bar"}, cardinality: 1}, + {name: "bar", addKeys: []string{"foo"}, cardinality: 1, existentKeys: 1}, + {name: "foo", addKeys: []string{"bar"}, cardinality: 1, existentKeys: 1}, + }, + }, + { + name: "two posting groups with add keys, posting group not marked as lazy due to some add keys don't exist", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 8}}, + "bar": {"foo": index.Range{Start: 8, End: 16}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"bar", "baz", "foo"}}, + }, + postingGroupMaxKeySeriesRatio: 2, + expectedPostingGroups: []*postingGroup{ + {name: "bar", addKeys: []string{"bar", "baz", "foo"}, cardinality: 1, existentKeys: 1}, + {name: "foo", addKeys: []string{"bar"}, cardinality: 1, existentKeys: 1}, + }, + }, + { + name: "two posting groups with add keys, first posting group not marked as lazy even though exceeding 2 keys due to we always mark first posting group as non lazy", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 108}}, + "bar": {"foo": index.Range{Start: 108, End: 116}, "bar": index.Range{Start: 116, End: 124}, "baz": index.Range{Start: 124, End: 132}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"bar", "baz", "foo"}}, + }, + postingGroupMaxKeySeriesRatio: 2, + expectedPostingGroups: []*postingGroup{ + {name: "bar", addKeys: []string{"bar", "baz", "foo"}, cardinality: 3, existentKeys: 3}, + {name: "foo", addKeys: []string{"bar"}, cardinality: 26, existentKeys: 1}, + }, + }, + { + name: "two posting groups with add keys, one posting group with too many keys not marked as lazy due to postingGroupMaxKeySeriesRatio not set", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 8}}, + "bar": {"foo": index.Range{Start: 8, End: 16}, "bar": index.Range{Start: 16, End: 24}, "baz": index.Range{Start: 24, End: 32}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"bar", "baz", "foo"}}, + }, + postingGroupMaxKeySeriesRatio: 0, + expectedPostingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}, cardinality: 1, existentKeys: 1}, + {name: "bar", addKeys: []string{"bar", "baz", 
"foo"}, cardinality: 3, existentKeys: 3}, + }, + }, + { + name: "two posting groups with add keys, one posting group marked as lazy due to exceeding postingGroupMaxKeySeriesRatio", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 8}}, + "bar": {"foo": index.Range{Start: 8, End: 16}, "bar": index.Range{Start: 16, End: 24}, "baz": index.Range{Start: 24, End: 32}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"bar", "baz", "foo"}}, + }, + postingGroupMaxKeySeriesRatio: 2, + expectedPostingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}, cardinality: 1, existentKeys: 1}, + {name: "bar", addKeys: []string{"bar", "baz", "foo"}, cardinality: 3, existentKeys: 3, lazy: true}, + }, + }, + { + name: "two posting groups with remove keys, minAddKeysToMarkLazy won't be applied", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 8}}, + "bar": {"foo": index.Range{Start: 8, End: 16}, "baz": index.Range{Start: 16, End: 24}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {addAll: true, name: "foo", removeKeys: []string{"bar"}}, + {addAll: true, name: "bar", removeKeys: []string{"baz", "foo"}}, + }, + expectedPostingGroups: []*postingGroup{ + {addAll: true, name: "foo", removeKeys: []string{"bar"}, cardinality: 1, existentKeys: 1}, + {addAll: true, name: "bar", removeKeys: []string{"baz", "foo"}, cardinality: 2, existentKeys: 2}, }, }, { @@ -437,8 +561,8 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { {addAll: true, name: "bar", removeKeys: []string{"foo"}}, }, expectedPostingGroups: []*postingGroup{ - {addAll: true, name: "bar", removeKeys: []string{"foo"}, cardinality: 1}, - {addAll: true, name: "foo", removeKeys: []string{"bar"}, cardinality: 1}, + {addAll: true, name: "bar", removeKeys: []string{"foo"}, cardinality: 1, existentKeys: 1}, + {addAll: true, name: "foo", removeKeys: []string{"bar"}, cardinality: 1, existentKeys: 1}, }, }, { @@ -454,8 +578,8 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { {name: "bar", addKeys: []string{"foo"}}, }, expectedPostingGroups: []*postingGroup{ - {addAll: true, name: "foo", removeKeys: []string{"bar"}, cardinality: 1}, - {name: "bar", addKeys: []string{"foo"}, cardinality: 250000}, + {addAll: true, name: "foo", removeKeys: []string{"bar"}, cardinality: 1, existentKeys: 1}, + {name: "bar", addKeys: []string{"foo"}, cardinality: 250000, existentKeys: 1}, }, }, { @@ -471,8 +595,8 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { {name: "bar", addKeys: []string{"foo"}}, }, expectedPostingGroups: []*postingGroup{ - {name: "bar", addKeys: []string{"foo"}, cardinality: 1}, - {name: "foo", addKeys: []string{"bar"}, cardinality: 1, lazy: true}, + {name: "bar", addKeys: []string{"foo"}, cardinality: 1, existentKeys: 1}, + {name: "foo", addKeys: []string{"bar"}, cardinality: 1, existentKeys: 1, lazy: true}, }, }, { @@ -488,8 +612,8 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { {name: "bar", addKeys: []string{"foo"}}, }, expectedPostingGroups: []*postingGroup{ - {name: "foo", addKeys: []string{"bar"}, cardinality: 1}, - {name: "bar", addKeys: []string{"foo"}, cardinality: 250000, lazy: true}, + {name: "foo", addKeys: []string{"bar"}, cardinality: 1, existentKeys: 1}, + {name: "bar", addKeys: []string{"foo"}, cardinality: 250000, existentKeys: 1, lazy: 
true}, }, }, { @@ -507,9 +631,51 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { {name: "cluster", addKeys: []string{"us"}}, }, expectedPostingGroups: []*postingGroup{ - {name: "cluster", addKeys: []string{"us"}, cardinality: 1}, - {name: "foo", addKeys: []string{"bar"}, cardinality: 1}, - {name: "bar", addKeys: []string{"foo"}, cardinality: 250000, lazy: true}, + {name: "cluster", addKeys: []string{"us"}, cardinality: 1, existentKeys: 1}, + {name: "foo", addKeys: []string{"bar"}, cardinality: 1, existentKeys: 1}, + {name: "bar", addKeys: []string{"foo"}, cardinality: 250000, existentKeys: 1, lazy: true}, + }, + }, + { + name: "three posting groups with add keys, middle posting group marked as lazy due to too many add keys", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 8}}, + "bar": {"bar": index.Range{Start: 8, End: 16}, "baz": index.Range{Start: 16, End: 24}, "foo": index.Range{Start: 24, End: 32}}, + "cluster": {"us": index.Range{Start: 32, End: 108}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroupMaxKeySeriesRatio: 2, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"bar", "baz", "foo"}}, + {name: "cluster", addKeys: []string{"us"}}, + }, + expectedPostingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}, cardinality: 1, existentKeys: 1}, + {name: "bar", addKeys: []string{"bar", "baz", "foo"}, cardinality: 3, existentKeys: 3, lazy: true}, + {name: "cluster", addKeys: []string{"us"}, cardinality: 18, existentKeys: 1}, + }, + }, + { + name: "three posting groups with add keys, bar not marked as lazy even though too many add keys due to first positive posting group sorted by cardinality", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 8}}, + "bar": {"bar": index.Range{Start: 8, End: 16}, "baz": index.Range{Start: 16, End: 24}, "foo": index.Range{Start: 24, End: 32}}, + "cluster": {"us": index.Range{Start: 32, End: 108}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroupMaxKeySeriesRatio: 2, + postingGroups: []*postingGroup{ + {addAll: true, name: "foo", removeKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"bar", "baz", "foo"}}, + {name: "cluster", addKeys: []string{"us"}}, + }, + expectedPostingGroups: []*postingGroup{ + {addAll: true, name: "foo", removeKeys: []string{"bar"}, cardinality: 1, existentKeys: 1}, + {name: "bar", addKeys: []string{"bar", "baz", "foo"}, cardinality: 3, existentKeys: 3}, + {name: "cluster", addKeys: []string{"us"}, cardinality: 18, existentKeys: 1}, }, }, { @@ -527,9 +693,9 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { {name: "cluster", addKeys: []string{"us"}}, }, expectedPostingGroups: []*postingGroup{ - {name: "cluster", addKeys: []string{"us"}, cardinality: 1}, - {addAll: true, name: "foo", removeKeys: []string{"bar"}, cardinality: 1}, - {addAll: true, name: "bar", removeKeys: []string{"foo"}, cardinality: 250000, lazy: true}, + {name: "cluster", addKeys: []string{"us"}, cardinality: 1, existentKeys: 1}, + {addAll: true, name: "foo", removeKeys: []string{"bar"}, cardinality: 1, existentKeys: 1}, + {addAll: true, name: "bar", removeKeys: []string{"foo"}, cardinality: 250000, existentKeys: 1, lazy: true}, }, }, { @@ -549,10 +715,10 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { {name: "cluster", addKeys: []string{"us"}}, }, expectedPostingGroups: []*postingGroup{ - {addAll: true, name: "foo", 
removeKeys: []string{"bar"}, cardinality: 1}, - {name: "bar", addKeys: []string{"foo"}, cardinality: 500}, - {name: "baz", addKeys: []string{"foo"}, cardinality: 501}, - {name: "cluster", addKeys: []string{"us"}, cardinality: 250000, lazy: true}, + {addAll: true, name: "foo", removeKeys: []string{"bar"}, cardinality: 1, existentKeys: 1}, + {name: "bar", addKeys: []string{"foo"}, cardinality: 500, existentKeys: 1}, + {name: "baz", addKeys: []string{"foo"}, cardinality: 501, existentKeys: 1}, + {name: "cluster", addKeys: []string{"us"}, cardinality: 250000, existentKeys: 1, lazy: true}, }, }, } { @@ -563,7 +729,8 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { testutil.Ok(t, err) ir := newBucketIndexReader(block, logger) dummyCounter := promauto.With(registry).NewCounter(prometheus.CounterOpts{Name: "test"}) - pgs, emptyPosting, err := optimizePostingsFetchByDownloadedBytes(ir, tc.postingGroups, tc.seriesMaxSize, tc.seriesMatchRatio, dummyCounter) + dummyCounterVec := promauto.With(registry).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"}) + pgs, emptyPosting, err := optimizePostingsFetchByDownloadedBytes(ir, tc.postingGroups, tc.seriesMaxSize, tc.seriesMatchRatio, tc.postingGroupMaxKeySeriesRatio, dummyCounter, dummyCounterVec) if err != nil { testutil.Equals(t, tc.expectedError, err.Error()) return @@ -580,3 +747,94 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { }) } } + +func TestMergeFetchedPostings(t *testing.T) { + ctx := context.Background() + for _, tc := range []struct { + name string + fetchedPostings []index.Postings + postingGroups []*postingGroup + expectedSeriesRefs []storage.SeriesRef + }{ + { + name: "empty fetched postings and posting groups", + }, + { + name: "single posting group with 1 add key", + fetchedPostings: []index.Postings{index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})}, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + }, + expectedSeriesRefs: []storage.SeriesRef{1, 2, 3, 4, 5}, + }, + { + name: "single posting group with multiple add keys, merge", + fetchedPostings: []index.Postings{ + index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5}), + index.NewListPostings([]storage.SeriesRef{6, 7, 8, 9}), + }, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar", "baz"}}, + }, + expectedSeriesRefs: []storage.SeriesRef{1, 2, 3, 4, 5, 6, 7, 8, 9}, + }, + { + name: "multiple posting groups with add key, intersect", + fetchedPostings: []index.Postings{ + index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5}), + index.NewListPostings([]storage.SeriesRef{1, 2, 4}), + }, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"foo"}}, + }, + expectedSeriesRefs: []storage.SeriesRef{1, 2, 4}, + }, + { + name: "posting group with remove keys", + fetchedPostings: []index.Postings{ + index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5}), + index.NewListPostings([]storage.SeriesRef{1, 2, 4}), + }, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", removeKeys: []string{"foo"}, addAll: true}, + }, + expectedSeriesRefs: []storage.SeriesRef{3, 5}, + }, + { + name: "multiple posting groups with add key and ignore lazy posting groups", + fetchedPostings: []index.Postings{ + index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5}), + }, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: 
[]string{"foo"}, lazy: true}, + {name: "baz", addKeys: []string{"foo"}, lazy: true}, + {name: "job", addKeys: []string{"foo"}, lazy: true}, + }, + expectedSeriesRefs: []storage.SeriesRef{1, 2, 3, 4, 5}, + }, + { + name: "multiple posting groups with add key and non consecutive lazy posting groups", + fetchedPostings: []index.Postings{ + index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5}), + index.NewListPostings([]storage.SeriesRef{1, 2, 4}), + }, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"foo"}, lazy: true}, + {name: "baz", addKeys: []string{"foo"}}, + {name: "job", addKeys: []string{"foo"}, lazy: true}, + }, + expectedSeriesRefs: []storage.SeriesRef{1, 2, 4}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + p := mergeFetchedPostings(ctx, tc.fetchedPostings, tc.postingGroups) + res, err := index.ExpandPostings(p) + require.NoError(t, err) + require.Equal(t, tc.expectedSeriesRefs, res) + }) + } +} diff --git a/pkg/store/storepb/inprocess.go b/pkg/store/storepb/inprocess.go index 0c3e7641ba..7e960845f3 100644 --- a/pkg/store/storepb/inprocess.go +++ b/pkg/store/storepb/inprocess.go @@ -5,8 +5,10 @@ package storepb import ( "context" + "fmt" "io" "iter" + "runtime/debug" "google.golang.org/grpc" ) @@ -92,6 +94,12 @@ func (s serverAsClient) LabelValues(ctx context.Context, in *LabelValuesRequest, func (s serverAsClient) Series(ctx context.Context, in *SeriesRequest, _ ...grpc.CallOption) (Store_SeriesClient, error) { var srvIter iter.Seq2[*SeriesResponse, error] = func(yield func(*SeriesResponse, error) bool) { + defer func() { + if r := recover(); r != nil { + st := debug.Stack() + panic(fmt.Sprintf("panic %v in server iterator: %s", r, st)) + } + }() srv := newInProcessServer(ctx, yield) err := s.srv.Series(in, srv) if err != nil { diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index c8a9e7fc62..b4448aa633 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -538,20 +538,21 @@ type ReceiveBuilder struct { f e2e.FutureRunnable - maxExemplars int - capnp bool - ingestion bool - limit int - tenantsLimits receive.TenantsWriteLimitsConfig - metaMonitoring string - metaMonitoringQuery string - hashringConfigs []receive.HashringConfig - relabelConfigs []*relabel.Config - replication int - image string - nativeHistograms bool - labels []string - tenantSplitLabel string + maxExemplars int + capnp bool + ingestion bool + expandedPostingsCache bool + limit int + tenantsLimits receive.TenantsWriteLimitsConfig + metaMonitoring string + metaMonitoringQuery string + hashringConfigs []receive.HashringConfig + relabelConfigs []*relabel.Config + replication int + image string + nativeHistograms bool + labels []string + tenantSplitLabel string } func NewReceiveBuilder(e e2e.Environment, name string) *ReceiveBuilder { @@ -582,6 +583,11 @@ func (r *ReceiveBuilder) WithIngestionEnabled() *ReceiveBuilder { return r } +func (r *ReceiveBuilder) WithExpandedPostingsCache() *ReceiveBuilder { + r.expandedPostingsCache = true + return r +} + func (r *ReceiveBuilder) WithLabel(name, value string) *ReceiveBuilder { r.labels = append(r.labels, fmt.Sprintf(`%s="%s"`, name, value)) return r @@ -661,6 +667,11 @@ func (r *ReceiveBuilder) Init() *e2eobs.Observable { args["--receive.local-endpoint"] = r.InternalEndpoint("grpc") } + if r.expandedPostingsCache { + args["--tsdb.head.expanded-postings-cache-size"] = "1000" + args["--tsdb.block.expanded-postings-cache-size"] = "1000" + } + if 
r.limit != 0 && r.metaMonitoring != "" { cfg := receive.RootLimitsConfig{ WriteLimits: receive.WriteLimitsConfig{ diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index c938a4f040..7d841e9a7e 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -80,7 +80,7 @@ func TestReceive(t *testing.T) { t.Cleanup(e2ethanos.CleanScenario(t, e)) // Setup Router Ingestor. - i := e2ethanos.NewReceiveBuilder(e, "ingestor").WithIngestionEnabled().Init() + i := e2ethanos.NewReceiveBuilder(e, "ingestor").WithIngestionEnabled().WithExpandedPostingsCache().Init() testutil.Ok(t, e2e.StartAndWaitReady(i)) // Setup Prometheus @@ -135,9 +135,9 @@ func TestReceive(t *testing.T) { t.Cleanup(e2ethanos.CleanScenario(t, e)) // Setup Receives - r1 := e2ethanos.NewReceiveBuilder(e, "1").WithIngestionEnabled().Init() - r2 := e2ethanos.NewReceiveBuilder(e, "2").WithIngestionEnabled().Init() - r3 := e2ethanos.NewReceiveBuilder(e, "3").WithIngestionEnabled().Init() + r1 := e2ethanos.NewReceiveBuilder(e, "1").WithIngestionEnabled().WithExpandedPostingsCache().Init() + r2 := e2ethanos.NewReceiveBuilder(e, "2").WithIngestionEnabled().WithExpandedPostingsCache().Init() + r3 := e2ethanos.NewReceiveBuilder(e, "3").WithIngestionEnabled().WithExpandedPostingsCache().Init() testutil.Ok(t, e2e.StartAndWaitReady(r1, r2, r3)) @@ -291,9 +291,9 @@ test_metric{a="2", b="2"} 1`) t.Cleanup(e2ethanos.CleanScenario(t, e)) // Setup 3 ingestors. - i1 := e2ethanos.NewReceiveBuilder(e, "i1").WithIngestionEnabled().Init() - i2 := e2ethanos.NewReceiveBuilder(e, "i2").WithIngestionEnabled().Init() - i3 := e2ethanos.NewReceiveBuilder(e, "i3").WithIngestionEnabled().Init() + i1 := e2ethanos.NewReceiveBuilder(e, "i1").WithIngestionEnabled().WithExpandedPostingsCache().Init() + i2 := e2ethanos.NewReceiveBuilder(e, "i2").WithIngestionEnabled().WithExpandedPostingsCache().Init() + i3 := e2ethanos.NewReceiveBuilder(e, "i3").WithIngestionEnabled().WithExpandedPostingsCache().Init() h := receive.HashringConfig{ Endpoints: []receive.Endpoint{