Skip to content

Commit

Permalink
store-gateway: retain lazy-loaded index headers between restarts (#5606)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: Jon Kartago Lamida <[email protected]>
Co-authored-by: Dimitar Dimitrov <[email protected]>
Co-authored-by: Oleg Zaytsev <[email protected]>
Co-authored-by: Charles Korn <[email protected]>
Co-authored-by: Marco Pracucci <[email protected]>
  • Loading branch information
5 people authored Aug 14, 2023
1 parent d859ad5 commit 939f522
Show file tree
Hide file tree
Showing 17 changed files with 667 additions and 214 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@
* [CHANGE] Compactor: change default of `-compactor.first-level-compaction-wait-period` to 25m. #5128
* [CHANGE] Ruler: changed default of `-ruler.poll-interval` from `1m` to `10m`. Starting from this release, the configured rule groups will also be re-synced each time they're modified by calling the ruler configuration API. #5170
* [FEATURE] Query-frontend: add `-query-frontend.log-query-request-headers` to enable logging of request headers in query logs. #5030
* [FEATURE] Store-gateway: add experimental feature to retain lazy-loaded index headers between restarts by eagerly loading them during startup. This is disabled by default and can only be enabled if lazy loading is enabled. To enable this set the following: #5606
* `-blocks-storage.bucket-store.index-header-lazy-loading-enabled` must be set to true
* `-blocks-storage.bucket-store.index-header.eager-loading-startup-enabled` must be set to true
* [ENHANCEMENT] Add per-tenant limit `-validation.max-native-histogram-buckets` to be able to ignore native histogram samples that have too many buckets. #4765
* [ENHANCEMENT] Store-gateway: reduce memory usage in some LabelValues calls. #4789
* [ENHANCEMENT] Store-gateway: add a `stage` label to the metric `cortex_bucket_store_series_data_touched`. This label now applies to `data_type="chunks"` and `data_type="series"`. The `stage` label has 2 values: `processed` - the number of series that were parsed - and `returned` - the number of series selected from the processed bytes to satisfy the query. #4797 #4830
Expand Down
13 changes: 12 additions & 1 deletion cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -7516,13 +7516,24 @@
"kind": "field",
"name": "max_idle_file_handles",
"required": false,
"desc": "Maximum number of idle file handles the store-gateway keeps open for each index header file.",
"desc": "Maximum number of idle file handles the store-gateway keeps open for each index-header file.",
"fieldValue": null,
"fieldDefaultValue": 1,
"fieldFlag": "blocks-storage.bucket-store.index-header.max-idle-file-handles",
"fieldType": "int",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "eager_loading_startup_enabled",
"required": false,
"desc": "If enabled, store-gateway will periodically persist block IDs of lazy loaded index-headers and load them eagerly during startup. It is not valid to enable this if index-header lazy loading is disabled.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "blocks-storage.bucket-store.index-header.eager-loading-startup-enabled",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "verify_on_load",
Expand Down
4 changes: 3 additions & 1 deletion cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -503,8 +503,10 @@ Usage of ./cmd/mimir/mimir:
If index-header lazy loading is enabled and this setting is > 0, the store-gateway will offload unused index-headers after 'idle timeout' inactivity. (default 1h0m0s)
-blocks-storage.bucket-store.index-header-sparse-persistence-enabled
[experimental] If enabled, store-gateway will persist a sparse version of the index-header to disk on construction and load sparse index-headers from disk instead of the whole index-header.
-blocks-storage.bucket-store.index-header.eager-loading-startup-enabled
[experimental] If enabled, store-gateway will periodically persist block IDs of lazy loaded index-headers and load them eagerly during startup. It is not valid to enable this if index-header lazy loading is disabled.
-blocks-storage.bucket-store.index-header.max-idle-file-handles uint
Maximum number of idle file handles the store-gateway keeps open for each index header file. (default 1)
Maximum number of idle file handles the store-gateway keeps open for each index-header file. (default 1)
-blocks-storage.bucket-store.index-header.verify-on-load
If true, verify the checksum of index headers upon loading them (either on startup or lazily when lazy loading is enabled). Setting to true helps detect disk corruption at the cost of slowing down index header loading.
-blocks-storage.bucket-store.max-chunk-pool-bytes uint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3412,10 +3412,16 @@ bucket_store:
index_header:
# (advanced) Maximum number of idle file handles the store-gateway keeps
# open for each index header file.
# open for each index-header file.
# CLI flag: -blocks-storage.bucket-store.index-header.max-idle-file-handles
[max_idle_file_handles: <int> | default = 1]
# (experimental) If enabled, store-gateway will periodically persist block
# IDs of lazy loaded index-headers and load them eagerly during startup. It
# is not valid to enable this if index-header lazy loading is disabled.
# CLI flag: -blocks-storage.bucket-store.index-header.eager-loading-startup-enabled
[eager_loading_startup_enabled: <boolean> | default = false]
# (advanced) If true, verify the checksum of index headers upon loading them
# (either on startup or lazily when lazy loading is enabled). Setting to
# true helps detect disk corruption at the cost of slowing down index header
Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,9 @@ func (cfg *BucketStoreConfig) Validate(logger log.Logger) error {
if cfg.SeriesSelectionStrategyName == WorstCasePostingsStrategy && cfg.SelectionStrategies.WorstCaseSeriesPreference <= 0 {
return errors.New("invalid worst-case series preference; must be positive")
}
if err := cfg.IndexHeader.Validate(cfg.IndexHeaderLazyLoadingEnabled); err != nil {
return errors.Wrap(err, "index-header configuration")
}
if cfg.IndexHeaderLazyLoadingConcurrency < 0 {
return errInvalidIndexHeaderLazyLoadingConcurrency
}
Expand Down
38 changes: 21 additions & 17 deletions pkg/storegateway/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/storage/sharding"
"github.com/grafana/mimir/pkg/storage/tsdb"
"github.com/grafana/mimir/pkg/storage/tsdb/block"
"github.com/grafana/mimir/pkg/storage/tsdb/bucketcache"
"github.com/grafana/mimir/pkg/storegateway/chunkscache"
Expand Down Expand Up @@ -227,18 +228,11 @@ func NewBucketStore(
bkt objstore.InstrumentedBucketReader,
fetcher block.MetadataFetcher,
dir string,
maxSeriesPerBatch int,
numChunksRangesPerSeries int,
bucketStoreConfig tsdb.BucketStoreConfig,
postingsStrategy postingsSelectionStrategy,
chunksLimiterFactory ChunksLimiterFactory,
seriesLimiterFactory SeriesLimiterFactory,
partitioners blockPartitioners,
blockSyncConcurrency int,
postingOffsetsInMemSampling int,
indexHeaderCfg indexheader.Config,
lazyIndexReaderEnabled bool,
lazyIndexReaderIdleTimeout time.Duration,
sparsePersistenceEnabled bool,
seriesHashCache *hashcache.SeriesHashCache,
metrics *BucketStoreMetrics,
options ...BucketStoreOption,
Expand All @@ -252,28 +246,33 @@ func NewBucketStore(
chunksCache: chunkscache.NoopCache{},
blocks: map[ulid.ULID]*bucketBlock{},
blockSet: newBucketBlockSet(),
blockSyncConcurrency: blockSyncConcurrency,
blockSyncConcurrency: bucketStoreConfig.BlockSyncConcurrency,
queryGate: gate.NewNoop(),
lazyLoadingGate: gate.NewNoop(),
chunksLimiterFactory: chunksLimiterFactory,
seriesLimiterFactory: seriesLimiterFactory,
partitioners: partitioners,
postingOffsetsInMemSampling: postingOffsetsInMemSampling,
indexHeaderCfg: indexHeaderCfg,
postingOffsetsInMemSampling: bucketStoreConfig.PostingOffsetsInMemSampling,
indexHeaderCfg: bucketStoreConfig.IndexHeader,
seriesHashCache: seriesHashCache,
metrics: metrics,
userID: userID,
maxSeriesPerBatch: maxSeriesPerBatch,
numChunksRangesPerSeries: numChunksRangesPerSeries,
maxSeriesPerBatch: bucketStoreConfig.StreamingBatchSize,
numChunksRangesPerSeries: bucketStoreConfig.ChunkRangesPerSeries,
postingsStrategy: postingsStrategy,
}

for _, option := range options {
option(s)
}

lazyLoadedSnapshotConfig := indexheader.LazyLoadedHeadersSnapshotConfig{
Path: dir,
UserID: userID,
EagerLoadingEnabled: bucketStoreConfig.IndexHeader.IndexHeaderEagerLoadingStartupEnabled,
}
// Depend on the options
s.indexReaderPool = indexheader.NewReaderPool(s.logger, lazyIndexReaderEnabled, lazyIndexReaderIdleTimeout, sparsePersistenceEnabled, s.lazyLoadingGate, metrics.indexHeaderReaderMetrics)
s.indexReaderPool = indexheader.NewReaderPool(s.logger, bucketStoreConfig.IndexHeaderLazyLoadingEnabled, bucketStoreConfig.IndexHeaderLazyLoadingIdleTimeout, bucketStoreConfig.IndexHeaderSparsePersistenceEnabled, s.lazyLoadingGate, metrics.indexHeaderReaderMetrics, lazyLoadedSnapshotConfig)

if err := os.MkdirAll(dir, 0750); err != nil {
return nil, errors.Wrap(err, "create dir")
Expand Down Expand Up @@ -306,6 +305,10 @@ func (s *BucketStore) Stats() BucketStoreStats {
// SyncBlocks brings the store's state in line with the blocks in the bucket.
// Disk space is reused as a persistent cache, based on the s.dir param.
//
// This is the regular (non-initial) sync: it delegates to syncBlocks with
// initialSync set to false.
func (s *BucketStore) SyncBlocks(ctx context.Context) error {
	const initialSync = false
	return s.syncBlocks(ctx, initialSync)
}

func (s *BucketStore) syncBlocks(ctx context.Context, initialSync bool) error {
metas, _, metaFetchErr := s.fetcher.Fetch(ctx)
// For partial view allow adding new blocks at least.
if metaFetchErr != nil && metas == nil {
Expand All @@ -319,7 +322,7 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error {
wg.Add(1)
go func() {
for meta := range blockc {
if err := s.addBlock(ctx, meta); err != nil {
if err := s.addBlock(ctx, meta, initialSync); err != nil {
continue
}
}
Expand Down Expand Up @@ -361,7 +364,7 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error {
// InitialSync performs a blocking sync with an extra step at the end to delete locally saved blocks that are no longer
// present in the bucket. Such a mismatch can only happen between restarts, so we only need to do this once per startup.
func (s *BucketStore) InitialSync(ctx context.Context) error {
if err := s.SyncBlocks(ctx); err != nil {
if err := s.syncBlocks(ctx, true); err != nil {
return errors.Wrap(err, "sync block")
}

Expand Down Expand Up @@ -397,7 +400,7 @@ func (s *BucketStore) getBlock(id ulid.ULID) *bucketBlock {
return s.blocks[id]
}

func (s *BucketStore) addBlock(ctx context.Context, meta *block.Meta) (err error) {
func (s *BucketStore) addBlock(ctx context.Context, meta *block.Meta, initialSync bool) (err error) {
dir := filepath.Join(s.dir, meta.ULID.String())
start := time.Now()

Expand All @@ -423,6 +426,7 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *block.Meta) (err error
meta.ULID,
s.postingOffsetsInMemSampling,
s.indexHeaderCfg,
initialSync,
)
if err != nil {
return errors.Wrap(err, "create index header reader")
Expand Down
20 changes: 12 additions & 8 deletions pkg/storegateway/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,18 +197,22 @@ func prepareStoreWithTestBlocks(t testing.TB, bkt objstore.Bucket, cfg *prepareS
objstore.WithNoopInstr(bkt),
metaFetcher,
cfg.tempDir,
cfg.maxSeriesPerBatch,
1,
mimir_tsdb.BucketStoreConfig{
StreamingBatchSize: cfg.maxSeriesPerBatch,
ChunkRangesPerSeries: 1,
BlockSyncConcurrency: 20,
PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling,
IndexHeader: indexheader.Config{
IndexHeaderEagerLoadingStartupEnabled: true,
},
IndexHeaderLazyLoadingEnabled: true,
IndexHeaderLazyLoadingIdleTimeout: time.Minute,
IndexHeaderSparsePersistenceEnabled: true,
},
cfg.postingsStrategy,
cfg.chunksLimiterFactory,
cfg.seriesLimiterFactory,
newGapBasedPartitioners(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
20,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
indexheader.Config{},
true,
time.Minute,
true,
hashcache.NewSeriesHashCache(1024*1024),
NewBucketStoreMetrics(s.metricsRegistry),
storeOpts...,
Expand Down
9 changes: 1 addition & 8 deletions pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,8 +482,7 @@ func (u *BucketStores) getOrCreateStore(userID string) (*BucketStore, error) {
userBkt,
fetcher,
u.syncDirForUser(userID),
u.cfg.BucketStore.StreamingBatchSize,
u.cfg.BucketStore.ChunkRangesPerSeries,
u.cfg.BucketStore,
selectPostingsStrategy(u.logger, u.cfg.BucketStore.SeriesSelectionStrategyName, u.cfg.BucketStore.SelectionStrategies.WorstCaseSeriesPreference),
NewChunksLimiterFactory(func() uint64 {
return uint64(u.limits.MaxChunksPerQuery(userID))
Expand All @@ -492,12 +491,6 @@ func (u *BucketStores) getOrCreateStore(userID string) (*BucketStore, error) {
return uint64(u.limits.MaxFetchedSeriesPerQuery(userID))
}),
u.partitioners,
u.cfg.BucketStore.BlockSyncConcurrency,
u.cfg.BucketStore.PostingOffsetsInMemSampling,
u.cfg.BucketStore.IndexHeader,
u.cfg.BucketStore.IndexHeaderLazyLoadingEnabled,
u.cfg.BucketStore.IndexHeaderLazyLoadingIdleTimeout,
u.cfg.BucketStore.IndexHeaderSparsePersistenceEnabled,
u.seriesHashCache,
u.bucketStoreMetrics,
bucketStoreOpts...,
Expand Down
Loading

0 comments on commit 939f522

Please sign in to comment.