Commit 844fa55

Refactor block storage User Scanner and introduce user index file (#6780)
* add list and user index user scanner
  Signed-off-by: Ben Ye <[email protected]>
* changelog
  Signed-off-by: Ben Ye <[email protected]>
* lint
  Signed-off-by: Ben Ye <[email protected]>
* fix lint and test
  Signed-off-by: yeya24 <[email protected]>
* lint
  Signed-off-by: yeya24 <[email protected]>
* fix test
  Signed-off-by: yeya24 <[email protected]>
* fix unit tests
  Signed-off-by: yeya24 <[email protected]>
* fix unit test
  Signed-off-by: yeya24 <[email protected]>
* update unit test
  Signed-off-by: yeya24 <[email protected]>
* ignore user index file name as a valid a userID
  Signed-off-by: yeya24 <[email protected]>
* add metrics for scanner
  Signed-off-by: yeya24 <[email protected]>
* fix lint
  Signed-off-by: yeya24 <[email protected]>
* fix metrics
  Signed-off-by: yeya24 <[email protected]>
---------
Signed-off-by: Ben Ye <[email protected]>
Signed-off-by: yeya24 <[email protected]>
1 parent 2811849 commit 844fa55

36 files changed, +1693 -233 lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -37,6 +37,7 @@
 * [ENHANCEMENT] Distributor: Add min/max schema validation for NativeHistograms. #6766
 * [ENHANCEMENT] Ingester: Handle runtime errors in query path #6769
 * [ENHANCEMENT] Compactor: Support metadata caching bucket for Cleaner. Can be enabled via `-compactor.cleaner-caching-bucket-enabled` flag. #6778
+* [ENHANCEMENT] Compactor, Store Gateway: Introduce user scanner strategy and user index. #6780
 * [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
 * [BUGFIX] Ingester: Fix labelset data race condition. #6573
 * [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576

docs/blocks-storage/querier.md

Lines changed: 15 additions & 0 deletions
@@ -1599,4 +1599,19 @@ blocks_storage:
         # TTL for postings cache
         # CLI flag: -blocks-storage.expanded_postings_cache.block.ttl
         [ttl: <duration> | default = 10m]
+
+  users_scanner:
+    # Strategy to use to scan users. Supported values are: list, user_index.
+    # CLI flag: -blocks-storage.users-scanner.strategy
+    [strategy: <string> | default = "list"]
+
+    # Maximum period of time to consider the user index as stale. Fall back to
+    # the base scanner if stale. Only valid when strategy is user_index.
+    # CLI flag: -blocks-storage.users-scanner.user-index.max-stale-period
+    [max_stale_period: <duration> | default = 1h]
+
+    # TTL of the cached users. 0 disables caching and relies on caching at
+    # bucket client level.
+    # CLI flag: -blocks-storage.users-scanner.cache-ttl
+    [cache_ttl: <duration> | default = 0s]
 ```

docs/blocks-storage/store-gateway.md

Lines changed: 15 additions & 0 deletions
@@ -1720,4 +1720,19 @@ blocks_storage:
         # TTL for postings cache
         # CLI flag: -blocks-storage.expanded_postings_cache.block.ttl
         [ttl: <duration> | default = 10m]
+
+  users_scanner:
+    # Strategy to use to scan users. Supported values are: list, user_index.
+    # CLI flag: -blocks-storage.users-scanner.strategy
+    [strategy: <string> | default = "list"]
+
+    # Maximum period of time to consider the user index as stale. Fall back to
+    # the base scanner if stale. Only valid when strategy is user_index.
+    # CLI flag: -blocks-storage.users-scanner.user-index.max-stale-period
+    [max_stale_period: <duration> | default = 1h]
+
+    # TTL of the cached users. 0 disables caching and relies on caching at
+    # bucket client level.
+    # CLI flag: -blocks-storage.users-scanner.cache-ttl
+    [cache_ttl: <duration> | default = 0s]
 ```

docs/configuration/config-file-reference.md

Lines changed: 15 additions & 0 deletions
@@ -2173,6 +2173,21 @@ tsdb:
       # TTL for postings cache
       # CLI flag: -blocks-storage.expanded_postings_cache.block.ttl
       [ttl: <duration> | default = 10m]
+
+users_scanner:
+  # Strategy to use to scan users. Supported values are: list, user_index.
+  # CLI flag: -blocks-storage.users-scanner.strategy
+  [strategy: <string> | default = "list"]
+
+  # Maximum period of time to consider the user index as stale. Fall back to the
+  # base scanner if stale. Only valid when strategy is user_index.
+  # CLI flag: -blocks-storage.users-scanner.user-index.max-stale-period
+  [max_stale_period: <duration> | default = 1h]
+
+  # TTL of the cached users. 0 disables caching and relies on caching at bucket
+  # client level.
+  # CLI flag: -blocks-storage.users-scanner.cache-ttl
+  [cache_ttl: <duration> | default = 0s]
 ```
 
 ### `compactor_config`

docs/configuration/v1-guarantees.md

Lines changed: 1 addition & 0 deletions
@@ -64,6 +64,7 @@ Currently experimental features are:
 - Blocks storage bucket index
   - The bucket index support in the querier and store-gateway (enabled via `-blocks-storage.bucket-store.bucket-index.enabled=true`) is experimental
   - The block deletion marks migration support in the compactor (`-compactor.block-deletion-marks-migration-enabled`) is temporarily and will be removed in future versions
+- Blocks storage user index
 - Querier: tenant federation
 - The thanosconvert tool for converting Thanos block metadata to Cortex
 - HA Tracker: cleanup of old replicas from KV Store.

docs/guides/limitations.md

Lines changed: 1 addition & 0 deletions
@@ -38,6 +38,7 @@ The following tenant IDs are considered invalid in Cortex.
 - Current directory (`.`)
 - Parent directory (`..`)
 - Markers directory (`__markers__`)
+- User Index File (`user-index.json.gz`)
 
 ### Length
 

pkg/compactor/blocks_cleaner.go

Lines changed: 12 additions & 8 deletions
@@ -21,6 +21,7 @@ import (
 	"github.com/cortexproject/cortex/pkg/storage/bucket"
 	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
 	"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
+	"github.com/cortexproject/cortex/pkg/storage/tsdb/users"
 	"github.com/cortexproject/cortex/pkg/util"
 	"github.com/cortexproject/cortex/pkg/util/concurrency"
 	util_log "github.com/cortexproject/cortex/pkg/util/log"
@@ -51,7 +52,7 @@ type BlocksCleaner struct {
 	cfgProvider ConfigProvider
 	logger log.Logger
 	bucketClient objstore.InstrumentedBucket
-	usersScanner *cortex_tsdb.UsersScanner
+	usersScanner users.Scanner
 
 	ringLifecyclerID string
 
@@ -85,7 +86,7 @@ type BlocksCleaner struct {
 func NewBlocksCleaner(
 	cfg BlocksCleanerConfig,
 	bucketClient objstore.InstrumentedBucket,
-	usersScanner *cortex_tsdb.UsersScanner,
+	usersScanner users.Scanner,
 	compactionVisitMarkerTimeout time.Duration,
 	cfgProvider ConfigProvider,
 	logger log.Logger,
@@ -336,19 +337,22 @@ func (c *BlocksCleaner) cleanDeletedUsers(ctx context.Context, users []string) e
 }
 
 func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, error) {
-	users, deleted, err := c.usersScanner.ScanUsers(ctx)
+	active, deleting, deleted, err := c.usersScanner.ScanUsers(ctx)
 	if err != nil {
 		return nil, nil, errors.Wrap(err, "failed to discover users from bucket")
 	}
 
-	isActive := util.StringsMap(users)
-	isDeleted := util.StringsMap(deleted)
-	allUsers := append(users, deleted...)
+	isActive := util.StringsMap(active)
+	markedForDeletion := make([]string, 0, len(deleting)+len(deleted))
+	markedForDeletion = append(markedForDeletion, deleting...)
+	markedForDeletion = append(markedForDeletion, deleted...)
+	isMarkedForDeletion := util.StringsMap(markedForDeletion)
+	allUsers := append(active, markedForDeletion...)
 	// Delete per-tenant metrics for all tenants not belonging anymore to this shard.
 	// Such tenants have been moved to a different shard, so their updated metrics will
 	// be exported by the new shard.
 	for _, userID := range c.lastOwnedUsers {
-		if !isActive[userID] && !isDeleted[userID] {
+		if !isActive[userID] && !isMarkedForDeletion[userID] {
 			c.tenantBlocks.DeleteLabelValues(userID)
 			c.tenantBlocksMarkedForDelete.DeleteLabelValues(userID)
 			c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID)
@@ -365,7 +369,7 @@ func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, erro
 	}
 	c.lastOwnedUsers = allUsers
 
-	return users, deleted, nil
+	return active, markedForDeletion, nil
 }
 
 func (c *BlocksCleaner) obtainVisitMarkerManager(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket) (visitMarkerManager *VisitMarkerManager, isVisited bool, err error) {
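
The `scanUsers` change above merges the scanner's new `deleting` and `deleted` results into one "marked for deletion" list, then drops metrics for previously owned tenants that appear in neither list. The standalone sketch below isolates that logic so it can be run without Cortex. The helpers `toSet`, `mergeMarked`, and `noLongerOwned` are hypothetical names, with `toSet` playing the role that `util.StringsMap` appears to play in the diff.

```go
package main

import "fmt"

// toSet builds a membership map from a slice. It is an assumed equivalent of
// how util.StringsMap is used in the diff.
func toSet(ss []string) map[string]bool {
	m := make(map[string]bool, len(ss))
	for _, s := range ss {
		m[s] = true
	}
	return m
}

// mergeMarked concatenates the two deletion states into a freshly allocated
// slice, so neither input's backing array is modified.
func mergeMarked(deleting, deleted []string) []string {
	out := make([]string, 0, len(deleting)+len(deleted))
	out = append(out, deleting...)
	out = append(out, deleted...)
	return out
}

// noLongerOwned returns the users from lastOwned that are neither active nor
// marked for deletion. In the diff, these are the tenants whose per-tenant
// metrics get deleted.
func noLongerOwned(lastOwned, active, marked []string) []string {
	isActive := toSet(active)
	isMarked := toSet(marked)
	var gone []string
	for _, u := range lastOwned {
		if !isActive[u] && !isMarked[u] {
			gone = append(gone, u)
		}
	}
	return gone
}

func main() {
	active := []string{"user-1"}
	marked := mergeMarked([]string{"user-2"}, []string{"user-3"})
	fmt.Println(marked)                                                          // [user-2 user-3]
	fmt.Println(noLongerOwned([]string{"user-1", "user-3", "user-4"}, active, marked)) // [user-4]
}
```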

pkg/compactor/blocks_cleaner_test.go

Lines changed: 46 additions & 14 deletions
@@ -25,6 +25,7 @@ import (
 	"github.com/cortexproject/cortex/pkg/storage/tsdb"
 	"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
 	cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil"
+	"github.com/cortexproject/cortex/pkg/storage/tsdb/users"
 	"github.com/cortexproject/cortex/pkg/util"
 	util_log "github.com/cortexproject/cortex/pkg/util/log"
 	"github.com/cortexproject/cortex/pkg/util/services"
@@ -83,21 +84,25 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
 	}
 
 	logger := log.NewNopLogger()
-	scanner := tsdb.NewUsersScanner(mbucket, tsdb.AllUsers, logger)
+	reg := prometheus.NewRegistry()
+	scanner, err := users.NewScanner(tsdb.UsersScannerConfig{
+		Strategy: tsdb.UserScanStrategyList,
+	}, mbucket, logger, reg)
+	require.NoError(t, err)
 	cfgProvider := newMockConfigProvider()
 	blocksMarkedForDeletion := prometheus.NewCounterVec(prometheus.CounterOpts{
 		Name: blocksMarkedForDeletionName,
 		Help: blocksMarkedForDeletionHelp,
 	}, append(commonLabels, reasonLabelName))
 	dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"})
 
-	cleaner := NewBlocksCleaner(cfg, mbucket, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec)
+	cleaner := NewBlocksCleaner(cfg, mbucket, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec)
 
 	// Clean User with no error
 	cleaner.bucketClient = bkt
 	userLogger := util_log.WithUserID(userID, cleaner.logger)
 	userBucket := bucket.NewUserBucketClient(userID, cleaner.bucketClient, cleaner.cfgProvider)
-	err := cleaner.cleanUser(ctx, userLogger, userBucket, userID, false)
+	err = cleaner.cleanUser(ctx, userLogger, userBucket, userID, false)
 	require.NoError(t, err)
 	s, err := bucketindex.ReadSyncStatus(ctx, bkt, userID, logger)
 	require.NoError(t, err)
@@ -196,7 +201,10 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
 
 	reg := prometheus.NewPedanticRegistry()
 	logger := log.NewNopLogger()
-	scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
+	scanner, err := users.NewScanner(tsdb.UsersScannerConfig{
+		Strategy: tsdb.UserScanStrategyList,
+	}, bucketClient, logger, reg)
+	require.NoError(t, err)
 	cfgProvider := newMockConfigProvider()
 	blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
 		Name: blocksMarkedForDeletionName,
@@ -364,7 +372,11 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) {
 	}
 
 	logger := log.NewNopLogger()
-	scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
+	reg := prometheus.NewRegistry()
+	scanner, err := users.NewScanner(tsdb.UsersScannerConfig{
+		Strategy: tsdb.UserScanStrategyList,
+	}, bucketClient, logger, reg)
+	require.NoError(t, err)
 	cfgProvider := newMockConfigProvider()
 	blocksMarkedForDeletion := prometheus.NewCounterVec(prometheus.CounterOpts{
 		Name: blocksMarkedForDeletionName,
@@ -429,7 +441,11 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) {
 	}
 
 	logger := log.NewNopLogger()
-	scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
+	reg := prometheus.NewRegistry()
+	scanner, err := users.NewScanner(tsdb.UsersScannerConfig{
+		Strategy: tsdb.UserScanStrategyList,
+	}, bucketClient, logger, reg)
+	require.NoError(t, err)
 	cfgProvider := newMockConfigProvider()
 	blocksMarkedForDeletion := prometheus.NewCounterVec(prometheus.CounterOpts{
 		Name: blocksMarkedForDeletionName,
@@ -487,8 +503,11 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar
 
 	ctx := context.Background()
 	logger := log.NewNopLogger()
-	reg := prometheus.NewPedanticRegistry()
-	scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
+	reg := prometheus.NewRegistry()
+	scanner, err := users.NewScanner(tsdb.UsersScannerConfig{
+		Strategy: tsdb.UserScanStrategyList,
+	}, bucketClient, logger, reg)
+	require.NoError(t, err)
 	cfgProvider := newMockConfigProvider()
 	blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
 		Name: blocksMarkedForDeletionName,
@@ -522,7 +541,11 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar
 	))
 
 	// Override the users scanner to reconfigure it to only return a subset of users.
-	cleaner.usersScanner = tsdb.NewUsersScanner(bucketClient, func(userID string) (bool, error) { return userID == "user-1", nil }, logger)
+	cleaner.usersScanner, err = users.NewScanner(tsdb.UsersScannerConfig{
+		Strategy: tsdb.UserScanStrategyList,
+	}, bucketClient, logger, reg)
+	require.NoError(t, err)
+	cleaner.usersScanner = users.NewShardedScanner(cleaner.usersScanner, func(userID string) (bool, error) { return userID == "user-1", nil }, logger)
 
 	// Create new blocks, to double check expected metrics have changed.
 	createTSDBBlock(t, bucketClient, "user-1", 40, 50, nil)
@@ -630,7 +653,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
 	ctx := context.Background()
 	logger := log.NewNopLogger()
 	reg := prometheus.NewPedanticRegistry()
-	scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
+	scanner, err := users.NewScanner(tsdb.UsersScannerConfig{
+		Strategy: tsdb.UserScanStrategyList,
+	}, bucketClient, logger, reg)
+	require.NoError(t, err)
 	cfgProvider := newMockConfigProvider()
 	blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
 		Name: blocksMarkedForDeletionName,
@@ -859,7 +885,10 @@ func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) {
 	ctx := context.Background()
 	logger := log.NewNopLogger()
 	reg := prometheus.NewPedanticRegistry()
-	scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
+	scanner, err := users.NewScanner(tsdb.UsersScannerConfig{
+		Strategy: tsdb.UserScanStrategyList,
+	}, bucketClient, logger, reg)
+	require.NoError(t, err)
 	cfgProvider := newMockConfigProvider()
 	blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
 		Name: blocksMarkedForDeletionName,
@@ -885,7 +914,7 @@ func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) {
 		CreationTime: time.Now().Add(-5 * time.Minute).Unix(),
 		Version: PartitionedGroupInfoVersion1,
 	}
-	_, err := UpdatePartitionedGroupInfo(ctx, userBucket, logger, partitionedGroupInfo)
+	_, err = UpdatePartitionedGroupInfo(ctx, userBucket, logger, partitionedGroupInfo)
 	require.NoError(t, err)
 
 	visitMarker := &partitionVisitMarker{
@@ -931,7 +960,10 @@ func TestBlocksCleaner_DeleteEmptyBucketIndex(t *testing.T) {
 	ctx := context.Background()
 	logger := log.NewNopLogger()
 	reg := prometheus.NewPedanticRegistry()
-	scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
+	scanner, err := users.NewScanner(tsdb.UsersScannerConfig{
+		Strategy: tsdb.UserScanStrategyList,
+	}, bucketClient, logger, reg)
+	require.NoError(t, err)
 	cfgProvider := newMockConfigProvider()
 	blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
 		Name: blocksMarkedForDeletionName,
@@ -960,7 +992,7 @@ func TestBlocksCleaner_DeleteEmptyBucketIndex(t *testing.T) {
 		CreationTime: time.Now().Add(-5 * time.Minute).Unix(),
 		Version: PartitionedGroupInfoVersion1,
 	}
-	_, err := UpdatePartitionedGroupInfo(ctx, userBucket, logger, partitionedGroupInfo)
+	_, err = UpdatePartitionedGroupInfo(ctx, userBucket, logger, partitionedGroupInfo)
 	require.NoError(t, err)
 	partitionedGroupFile := GetPartitionedGroupFile(partitionedGroupInfo.PartitionedGroupID)
 
