From ef40136715d0bf6cbeeac7ca4e70632b1343278d Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Fri, 16 Feb 2024 17:08:03 +0100 Subject: [PATCH] Use TSDB index prefix on blooms directory (#11977) --- pkg/bloomcompactor/bloomcompactor.go | 38 +++++++++++++++++++------- pkg/bloomcompactor/controller.go | 12 ++++---- pkg/bloomcompactor/tsdb.go | 24 ++++++++-------- pkg/bloomgateway/util_test.go | 2 +- pkg/storage/config/schema_config.go | 41 ++++++++++++++++------------ 5 files changed, 71 insertions(+), 46 deletions(-) diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index cc752c2224a63..3bb1c815e8295 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -179,11 +179,11 @@ func runWithRetries( type tenantTable struct { tenant string - table config.DayTime + table config.DayTable ownershipRange v1.FingerprintBounds } -func (c *Compactor) tenants(ctx context.Context, table config.DayTime) (v1.Iterator[string], error) { +func (c *Compactor) tenants(ctx context.Context, table config.DayTable) (v1.Iterator[string], error) { tenants, err := c.tsdbStore.UsersForPeriod(ctx, table) if err != nil { return nil, errors.Wrap(err, "getting tenants") @@ -241,15 +241,15 @@ func (c *Compactor) tables(ts time.Time) *dayRangeIterator { fromDay := config.NewDayTime(model.TimeFromUnixNano(from)) throughDay := config.NewDayTime(model.TimeFromUnixNano(through)) - return newDayRangeIterator(fromDay, throughDay) + return newDayRangeIterator(fromDay, throughDay, c.schemaCfg) } func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error { tables := c.tables(time.Now()) for tables.Next() && tables.Err() == nil && ctx.Err() == nil { - table := tables.At() + tenants, err := c.tenants(ctx, table) if err != nil { return errors.Wrap(err, "getting tenants") @@ -269,7 +269,11 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error { c.metrics.tenantsOwned.Inc() select { - case ch <- tenantTable{tenant: tenant, table: table, ownershipRange: ownershipRange}: + case ch <- tenantTable{ + tenant: tenant, + table: table, + ownershipRange: ownershipRange, + }: case <-ctx.Done(): return ctx.Err() } @@ -332,19 +336,33 @@ func (c *Compactor) compactTenantTable(ctx context.Context, tt tenantTable) erro type dayRangeIterator struct { min, max, cur config.DayTime + curPeriod config.PeriodConfig + schemaCfg config.SchemaConfig + err error } -func newDayRangeIterator(min, max config.DayTime) *dayRangeIterator { - return &dayRangeIterator{min: min, max: max, cur: min.Dec()} +func newDayRangeIterator(min, max config.DayTime, schemaCfg config.SchemaConfig) *dayRangeIterator { + return &dayRangeIterator{min: min, max: max, cur: min.Dec(), schemaCfg: schemaCfg} } func (r *dayRangeIterator) Next() bool { r.cur = r.cur.Inc() - return r.cur.Before(r.max) + if !r.cur.Before(r.max) { + return false + } + + period, err := r.schemaCfg.SchemaForTime(r.cur.ModelTime()) + if err != nil { + r.err = errors.Wrapf(err, "getting schema for time (%s)", r.cur) + return false + } + r.curPeriod = period + + return true } -func (r *dayRangeIterator) At() config.DayTime { - return r.cur +func (r *dayRangeIterator) At() config.DayTable { + return config.NewDayTable(r.cur, r.curPeriod.IndexTables.Prefix) } func (r *dayRangeIterator) Err() error { diff --git a/pkg/bloomcompactor/controller.go b/pkg/bloomcompactor/controller.go index cee0e6f058201..ef41ec2d8efbb 100644 --- a/pkg/bloomcompactor/controller.go +++ b/pkg/bloomcompactor/controller.go @@ -66,15 +66,15 @@ Compaction works as follows, split across many functions for clarity: */ func (s *SimpleBloomController) compactTenant( ctx context.Context, - table config.DayTime, + table config.DayTable, tenant string, ownershipRange v1.FingerprintBounds, ) error { - logger := log.With(s.logger, "ownership", ownershipRange, "org_id", tenant, "table", table) + logger := log.With(s.logger, "ownership", ownershipRange, "org_id", tenant, "table", table.Addr()) client, err := s.bloomStore.Client(table.ModelTime()) if err != nil { - level.Error(logger).Log("msg", "failed to get client", "err", err, "table", table.Addr()) + level.Error(logger).Log("msg", "failed to get client", "err", err) return errors.Wrap(err, "failed to get client") } @@ -175,7 +175,7 @@ func (s *SimpleBloomController) compactTenant( func (s *SimpleBloomController) findOutdatedGaps( ctx context.Context, tenant string, - table config.DayTime, + table config.DayTable, ownershipRange v1.FingerprintBounds, metas []bloomshipper.Meta, logger log.Logger, @@ -215,7 +215,7 @@ func (s *SimpleBloomController) findOutdatedGaps( func (s *SimpleBloomController) loadWorkForGap( ctx context.Context, - table config.DayTime, + table config.DayTable, tenant string, id tsdb.Identifier, gap gapWithBlocks, @@ -241,7 +241,7 @@ func (s *SimpleBloomController) loadWorkForGap( func (s *SimpleBloomController) buildGaps( ctx context.Context, tenant string, - table config.DayTime, + table config.DayTable, client bloomshipper.Client, work []blockPlan, logger log.Logger, diff --git a/pkg/bloomcompactor/tsdb.go b/pkg/bloomcompactor/tsdb.go index d19e185a9275b..6159ce02a804a 100644 --- a/pkg/bloomcompactor/tsdb.go +++ b/pkg/bloomcompactor/tsdb.go @@ -26,11 +26,11 @@ const ( ) type TSDBStore interface { - UsersForPeriod(ctx context.Context, table config.DayTime) ([]string, error) - ResolveTSDBs(ctx context.Context, table config.DayTime, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) + UsersForPeriod(ctx context.Context, table config.DayTable) ([]string, error) + ResolveTSDBs(ctx context.Context, table config.DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) LoadTSDB( ctx context.Context, - table config.DayTime, + table config.DayTable, tenant string, id tsdb.Identifier, bounds v1.FingerprintBounds, @@ -49,12 +49,12 @@ func NewBloomTSDBStore(storage storage.Client) *BloomTSDBStore { } } -func (b *BloomTSDBStore) UsersForPeriod(ctx context.Context, table config.DayTime) ([]string, error) { +func (b *BloomTSDBStore) UsersForPeriod(ctx context.Context, table config.DayTable) ([]string, error) { _, users, err := b.storage.ListFiles(ctx, table.Addr(), true) // bypass cache for ease of testing return users, err } -func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table config.DayTime, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) { +func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table config.DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) { indices, err := b.storage.ListUserFiles(ctx, table.Addr(), tenant, true) // bypass cache for ease of testing if err != nil { return nil, errors.Wrap(err, "failed to list user files") @@ -80,7 +80,7 @@ func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table config.DayTime, func (b *BloomTSDBStore) LoadTSDB( ctx context.Context, - table config.DayTime, + table config.DayTable, tenant string, id tsdb.Identifier, bounds v1.FingerprintBounds, @@ -272,8 +272,8 @@ func (s *TSDBStores) storeForPeriod(table config.DayTime) (TSDBStore, error) { ) } -func (s *TSDBStores) UsersForPeriod(ctx context.Context, table config.DayTime) ([]string, error) { - store, err := s.storeForPeriod(table) +func (s *TSDBStores) UsersForPeriod(ctx context.Context, table config.DayTable) ([]string, error) { + store, err := s.storeForPeriod(table.DayTime) if err != nil { return nil, err } @@ -281,8 +281,8 @@ func (s *TSDBStores) UsersForPeriod(ctx context.Context, table config.DayTime) ( return store.UsersForPeriod(ctx, table) } -func (s *TSDBStores) ResolveTSDBs(ctx context.Context, table config.DayTime, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) { - store, err := s.storeForPeriod(table) +func (s *TSDBStores) ResolveTSDBs(ctx context.Context, table config.DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) { + store, err := s.storeForPeriod(table.DayTime) if err != nil { return nil, err } @@ -292,12 +292,12 @@ func (s *TSDBStores) ResolveTSDBs(ctx context.Context, table config.DayTime, ten func (s *TSDBStores) LoadTSDB( ctx context.Context, - table config.DayTime, + table config.DayTable, tenant string, id tsdb.Identifier, bounds v1.FingerprintBounds, ) (v1.CloseableIterator[*v1.Series], error) { - store, err := s.storeForPeriod(table) + store, err := s.storeForPeriod(table.DayTime) if err != nil { return nil, err } diff --git a/pkg/bloomgateway/util_test.go b/pkg/bloomgateway/util_test.go index 9b5ce6e897bb9..281feba4b29a5 100644 --- a/pkg/bloomgateway/util_test.go +++ b/pkg/bloomgateway/util_test.go @@ -311,7 +311,7 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time, } ref := bloomshipper.Ref{ TenantID: tenant, - TableName: config.NewDayTime(truncateDay(from)).Addr(), + TableName: config.NewDayTable(config.NewDayTime(truncateDay(from)), "").Addr(), Bounds: v1.NewBounds(fromFp, throughFp), StartTimestamp: from, EndTimestamp: through, diff --git a/pkg/storage/config/schema_config.go b/pkg/storage/config/schema_config.go index b7c92c62c3d94..968ca87e609b7 100644 --- a/pkg/storage/config/schema_config.go +++ b/pkg/storage/config/schema_config.go @@ -200,10 +200,6 @@ func (cfg *PeriodConfig) GetIndexTableNumberRange(schemaEndDate DayTime) TableRa } } -func (cfg *PeriodConfig) GetFullTableName(t model.Time) string { - return NewDayTime(t).TableWithPrefix(cfg) -} - func NewDayTime(d model.Time) DayTime { return DayTime{d} } @@ -237,19 +233,6 @@ func (d DayTime) String() string { return d.Time.Time().UTC().Format("2006-01-02") } -// Addr returns the unix day offset as a string, which is used -// as the address for the index table in storage. -func (d DayTime) Addr() string { - return fmt.Sprintf("%d", - d.ModelTime().Time().UnixNano()/int64(ObjectStorageIndexRequiredPeriod)) -} - -func (d DayTime) TableWithPrefix(cfg *PeriodConfig) string { - return fmt.Sprintf("%s%d", - cfg.IndexTables.Prefix, - d.ModelTime().Time().UnixNano()/int64(ObjectStorageIndexRequiredPeriod)) -} - func (d DayTime) Inc() DayTime { return DayTime{d.Add(ObjectStorageIndexRequiredPeriod)} } @@ -274,6 +257,30 @@ func (d DayTime) Bounds() (model.Time, model.Time) { return d.Time, d.Inc().Time } +type DayTable struct { + DayTime + Prefix string +} + +func (d DayTable) String() string { + return d.Addr() +} + +func NewDayTable(d DayTime, prefix string) DayTable { + return DayTable{ + DayTime: d, + Prefix: prefix, + } +} + +// Addr returns the prefix (if any) and the unix day offset as a string, which is used +// as the address for the index table in storage. +func (d DayTable) Addr() string { + return fmt.Sprintf("%s%d", + d.Prefix, + d.ModelTime().Time().UnixNano()/int64(ObjectStorageIndexRequiredPeriod)) +} + // SchemaConfig contains the config for our chunk index schemas type SchemaConfig struct { Configs []PeriodConfig `yaml:"configs"`