Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use TSDB index prefix on blooms directory #11977

Merged
merged 4 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions pkg/bloomcompactor/bloomcompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,12 @@ func runWithRetries(
type tenantTable struct {
tenant string
table config.DayTime
period config.PeriodConfig
ownershipRange v1.FingerprintBounds
}

func (c *Compactor) tenants(ctx context.Context, table config.DayTime) (v1.Iterator[string], error) {
tenants, err := c.tsdbStore.UsersForPeriod(ctx, table)
func (c *Compactor) tenants(ctx context.Context, table config.DayTime, period config.PeriodConfig) (v1.Iterator[string], error) {
tenants, err := c.tsdbStore.UsersForPeriod(ctx, table, period)
if err != nil {
return nil, errors.Wrap(err, "getting tenants")
}
Expand Down Expand Up @@ -248,9 +249,14 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
tables := c.tables(time.Now())

for tables.Next() && tables.Err() == nil && ctx.Err() == nil {

table := tables.At()
tenants, err := c.tenants(ctx, table)

period, err := c.schemaCfg.SchemaForTime(table.ModelTime())
if err != nil {
return errors.Wrap(err, "getting schema for time")
}

tenants, err := c.tenants(ctx, table, period)
if err != nil {
return errors.Wrap(err, "getting tenants")
}
Expand All @@ -269,7 +275,12 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
c.metrics.tenantsOwned.Inc()

select {
case ch <- tenantTable{tenant: tenant, table: table, ownershipRange: ownershipRange}:
case ch <- tenantTable{
tenant: tenant,
table: table,
period: period,
ownershipRange: ownershipRange,
}:
case <-ctx.Done():
return ctx.Err()
}
Expand Down Expand Up @@ -327,7 +338,7 @@ func (c *Compactor) runWorkers(ctx context.Context, ch <-chan tenantTable) error

func (c *Compactor) compactTenantTable(ctx context.Context, tt tenantTable) error {
level.Info(c.logger).Log("msg", "compacting", "org_id", tt.tenant, "table", tt.table, "ownership", tt.ownershipRange)
return c.controller.compactTenant(ctx, tt.table, tt.tenant, tt.ownershipRange)
return c.controller.compactTenant(ctx, tt.table, tt.period, tt.tenant, tt.ownershipRange)
}

type dayRangeIterator struct {
Expand Down
24 changes: 14 additions & 10 deletions pkg/bloomcompactor/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,15 @@ Compaction works as follows, split across many functions for clarity:
func (s *SimpleBloomController) compactTenant(
ctx context.Context,
table config.DayTime,
period config.PeriodConfig,
tenant string,
ownershipRange v1.FingerprintBounds,
) error {
logger := log.With(s.logger, "ownership", ownershipRange, "org_id", tenant, "table", table)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can change the signature to accept the struct, not a pointer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to log the full table name, the day is probably more convenient.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd say seeing the full table name is clearer wrt which TSDB table are we working with (and where will we write to)

logger := log.With(s.logger, "ownership", ownershipRange, "org_id", tenant, "table", table.AddrWithPreffix(&period))

client, err := s.bloomStore.Client(table.ModelTime())
if err != nil {
level.Error(logger).Log("msg", "failed to get client", "err", err, "table", table.Addr())
level.Error(logger).Log("msg", "failed to get client", "err", err)
return errors.Wrap(err, "failed to get client")
}

Expand All @@ -93,13 +94,13 @@ func (s *SimpleBloomController) compactTenant(
}

// build compaction plans
work, err := s.findOutdatedGaps(ctx, tenant, table, ownershipRange, metas, logger)
work, err := s.findOutdatedGaps(ctx, tenant, table, period, ownershipRange, metas, logger)
if err != nil {
return errors.Wrap(err, "failed to find outdated gaps")
}

// build new blocks
built, err := s.buildGaps(ctx, tenant, table, client, work, logger)
built, err := s.buildGaps(ctx, tenant, table, period, client, work, logger)
if err != nil {
return errors.Wrap(err, "failed to build gaps")
}
Expand Down Expand Up @@ -176,12 +177,13 @@ func (s *SimpleBloomController) findOutdatedGaps(
ctx context.Context,
tenant string,
table config.DayTime,
period config.PeriodConfig,
ownershipRange v1.FingerprintBounds,
metas []bloomshipper.Meta,
logger log.Logger,
) ([]blockPlan, error) {
// Resolve TSDBs
tsdbs, err := s.tsdbStore.ResolveTSDBs(ctx, table, tenant)
tsdbs, err := s.tsdbStore.ResolveTSDBs(ctx, table, period, tenant)
if err != nil {
level.Error(logger).Log("msg", "failed to resolve tsdbs", "err", err)
return nil, errors.Wrap(err, "failed to resolve tsdbs")
Expand Down Expand Up @@ -216,12 +218,13 @@ func (s *SimpleBloomController) findOutdatedGaps(
func (s *SimpleBloomController) loadWorkForGap(
ctx context.Context,
table config.DayTime,
period config.PeriodConfig,
tenant string,
id tsdb.Identifier,
gap gapWithBlocks,
) (v1.CloseableIterator[*v1.Series], v1.CloseableResettableIterator[*v1.SeriesWithBloom], error) {
// load a series iterator for the gap
seriesItr, err := s.tsdbStore.LoadTSDB(ctx, table, tenant, id, gap.bounds)
seriesItr, err := s.tsdbStore.LoadTSDB(ctx, table, period, tenant, id, gap.bounds)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to load tsdb")
}
Expand All @@ -242,6 +245,7 @@ func (s *SimpleBloomController) buildGaps(
ctx context.Context,
tenant string,
table config.DayTime,
period config.PeriodConfig,
client bloomshipper.Client,
work []blockPlan,
logger log.Logger,
Expand Down Expand Up @@ -276,7 +280,7 @@ func (s *SimpleBloomController) buildGaps(
MetaRef: bloomshipper.MetaRef{
Ref: bloomshipper.Ref{
TenantID: tenant,
TableName: table.Addr(),
TableName: table.AddrWithPreffix(&period),
Bounds: gap.bounds,
},
},
Expand All @@ -285,7 +289,7 @@ func (s *SimpleBloomController) buildGaps(

// Fetch blocks that aren't up to date but are in the desired fingerprint range
// to try and accelerate bloom creation
seriesItr, blocksIter, err := s.loadWorkForGap(ctx, table, tenant, plan.tsdb, gap)
seriesItr, blocksIter, err := s.loadWorkForGap(ctx, table, period, tenant, plan.tsdb, gap)
if err != nil {
level.Error(logger).Log("msg", "failed to get series and blocks", "err", err)
return nil, errors.Wrap(err, "failed to get series and blocks")
Expand Down Expand Up @@ -318,7 +322,7 @@ func (s *SimpleBloomController) buildGaps(
blockCt++
blk := newBlocks.At()

built, err := bloomshipper.BlockFrom(tenant, table.Addr(), blk)
built, err := bloomshipper.BlockFrom(tenant, table.AddrWithPreffix(&period), blk)
if err != nil {
level.Error(logger).Log("msg", "failed to build block", "err", err)
blocksIter.Close()
Expand Down Expand Up @@ -346,7 +350,7 @@ func (s *SimpleBloomController) buildGaps(
blocksIter.Close()

// Write the new meta
ref, err := bloomshipper.MetaRefFrom(tenant, table.Addr(), gap.bounds, meta.Sources, meta.Blocks)
ref, err := bloomshipper.MetaRefFrom(tenant, table.AddrWithPreffix(&period), gap.bounds, meta.Sources, meta.Blocks)
if err != nil {
level.Error(logger).Log("msg", "failed to checksum meta", "err", err)
return nil, errors.Wrap(err, "failed to checksum meta")
Expand Down
27 changes: 15 additions & 12 deletions pkg/bloomcompactor/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ const (
)

type TSDBStore interface {
UsersForPeriod(ctx context.Context, table config.DayTime) ([]string, error)
ResolveTSDBs(ctx context.Context, table config.DayTime, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error)
UsersForPeriod(ctx context.Context, table config.DayTime, period config.PeriodConfig) ([]string, error)
ResolveTSDBs(ctx context.Context, table config.DayTime, period config.PeriodConfig, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error)
LoadTSDB(
ctx context.Context,
table config.DayTime,
period config.PeriodConfig,
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
Expand All @@ -49,13 +50,13 @@ func NewBloomTSDBStore(storage storage.Client) *BloomTSDBStore {
}
}

func (b *BloomTSDBStore) UsersForPeriod(ctx context.Context, table config.DayTime) ([]string, error) {
_, users, err := b.storage.ListFiles(ctx, table.Addr(), true) // bypass cache for ease of testing
func (b *BloomTSDBStore) UsersForPeriod(ctx context.Context, table config.DayTime, period config.PeriodConfig) ([]string, error) {
_, users, err := b.storage.ListFiles(ctx, table.AddrWithPreffix(&period), true) // bypass cache for ease of testing
return users, err
}

func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table config.DayTime, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) {
indices, err := b.storage.ListUserFiles(ctx, table.Addr(), tenant, true) // bypass cache for ease of testing
func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table config.DayTime, period config.PeriodConfig, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) {
indices, err := b.storage.ListUserFiles(ctx, table.AddrWithPreffix(&period), tenant, true) // bypass cache for ease of testing
if err != nil {
return nil, errors.Wrap(err, "failed to list user files")
}
Expand All @@ -81,13 +82,14 @@ func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table config.DayTime,
func (b *BloomTSDBStore) LoadTSDB(
ctx context.Context,
table config.DayTime,
period config.PeriodConfig,
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
) (v1.CloseableIterator[*v1.Series], error) {
withCompression := id.Name() + gzipExtension

data, err := b.storage.GetUserFile(ctx, table.Addr(), tenant, withCompression)
data, err := b.storage.GetUserFile(ctx, table.AddrWithPreffix(&period), tenant, withCompression)
if err != nil {
return nil, errors.Wrap(err, "failed to get file")
}
Expand Down Expand Up @@ -272,27 +274,28 @@ func (s *TSDBStores) storeForPeriod(table config.DayTime) (TSDBStore, error) {
)
}

func (s *TSDBStores) UsersForPeriod(ctx context.Context, table config.DayTime) ([]string, error) {
func (s *TSDBStores) UsersForPeriod(ctx context.Context, table config.DayTime, period config.PeriodConfig) ([]string, error) {
store, err := s.storeForPeriod(table)
if err != nil {
return nil, err
}

return store.UsersForPeriod(ctx, table)
return store.UsersForPeriod(ctx, table, period)
}

func (s *TSDBStores) ResolveTSDBs(ctx context.Context, table config.DayTime, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) {
func (s *TSDBStores) ResolveTSDBs(ctx context.Context, table config.DayTime, period config.PeriodConfig, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) {
store, err := s.storeForPeriod(table)
if err != nil {
return nil, err
}

return store.ResolveTSDBs(ctx, table, tenant)
return store.ResolveTSDBs(ctx, table, period, tenant)
}

func (s *TSDBStores) LoadTSDB(
ctx context.Context,
table config.DayTime,
period config.PeriodConfig,
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
Expand All @@ -302,5 +305,5 @@ func (s *TSDBStores) LoadTSDB(
return nil, err
}

return store.LoadTSDB(ctx, table, tenant, id, bounds)
return store.LoadTSDB(ctx, table, period, tenant, id, bounds)
}
4 changes: 2 additions & 2 deletions pkg/storage/config/schema_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (cfg *PeriodConfig) GetIndexTableNumberRange(schemaEndDate DayTime) TableRa
}

func (cfg *PeriodConfig) GetFullTableName(t model.Time) string {
return NewDayTime(t).TableWithPrefix(cfg)
return NewDayTime(t).AddrWithPreffix(cfg)
}

func NewDayTime(d model.Time) DayTime {
Expand Down Expand Up @@ -244,7 +244,7 @@ func (d DayTime) Addr() string {
d.ModelTime().Time().UnixNano()/int64(ObjectStorageIndexRequiredPeriod))
}

func (d DayTime) TableWithPrefix(cfg *PeriodConfig) string {
func (d DayTime) AddrWithPreffix(cfg *PeriodConfig) string {
return fmt.Sprintf("%s%d",
cfg.IndexTables.Prefix,
d.ModelTime().Time().UnixNano()/int64(ObjectStorageIndexRequiredPeriod))
Expand Down
Loading