Blooms retention
salvacorts committed Mar 19, 2024
1 parent b926688 commit e33f8eb
Showing 11 changed files with 841 additions and 14 deletions.
28 changes: 23 additions & 5 deletions pkg/bloomcompactor/bloomcompactor.go
@@ -48,7 +48,8 @@ type Compactor struct {

tsdbStore TSDBStore
// TODO(owen-d): ShardingStrategy
controller *SimpleBloomController
controller *SimpleBloomController
retentionManager *RetentionManager

// temporary workaround until bloomStore has implemented read/write shipper interface
bloomStore bloomshipper.Store
@@ -65,7 +66,8 @@ func New(
storeCfg storage.Config,
clientMetrics storage.ClientMetrics,
fetcherProvider stores.ChunkFetcherProvider,
sharding util_ring.TenantSharding,
ring ring.ReadRing,
ringLifeCycler *ring.BasicLifecycler,
limits Limits,
store bloomshipper.Store,
logger log.Logger,
@@ -75,7 +77,7 @@
cfg: cfg,
schemaCfg: schemaCfg,
logger: logger,
sharding: sharding,
sharding: util_ring.NewTenantShuffleSharding(ring, ringLifeCycler, limits.BloomCompactorShardSize),
limits: limits,
bloomStore: store,
}
@@ -104,6 +106,15 @@ func New(
c.logger,
)

c.retentionManager = NewRetentionManager(
c.cfg.RetentionConfig,
c.limits,
c.bloomStore,
newFirstTokenRetentionSharding(ring, ringLifeCycler),
c.metrics,
c.logger,
)

c.Service = services.NewBasicService(c.starting, c.running, c.stopping)
return c, nil
}
@@ -218,10 +229,17 @@ func (c *Compactor) runOne(ctx context.Context) error {
c.metrics.compactionsStarted.Inc()
start := time.Now()
level.Info(c.logger).Log("msg", "running bloom compaction", "workers", c.cfg.WorkerParallelism)
var workersErr error
var workersErr, retentionErr error
var wg sync.WaitGroup
input := make(chan *tenantTableRange)

// Launch retention (will return instantly if retention is disabled or not owned by this compactor)
wg.Add(1)
go func() {
retentionErr = c.retentionManager.Apply(ctx)
wg.Done()
}()

tables := c.tables(time.Now())
level.Debug(c.logger).Log("msg", "loaded tables", "tables", tables.TotalDays())

@@ -240,7 +258,7 @@

wg.Wait()
duration := time.Since(start)
err = multierror.New(workersErr, err, ctx.Err()).Err()
err = multierror.New(retentionErr, workersErr, err, ctx.Err()).Err()

if err != nil {
level.Error(c.logger).Log("msg", "compaction iteration failed", "err", err, "duration", duration)
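The runOne hunk above launches retention in a goroutine alongside the table-compaction workers and folds its error into the iteration's multierror. A minimal, self-contained sketch of that pattern, with applyRetention and runWorkers as hypothetical stand-ins for the retention manager and the worker pool:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"

	"github.com/grafana/dskit/multierror"
)

// applyRetention and runWorkers are hypothetical stand-ins for
// RetentionManager.Apply and the table-compaction workers.
func applyRetention(_ context.Context) error { return nil }
func runWorkers(_ context.Context) error     { return errors.New("worker failed") }

func main() {
	ctx := context.Background()

	var wg sync.WaitGroup
	var retentionErr, workersErr error

	// Retention runs concurrently with the workers; Apply returns immediately
	// when retention is disabled or not owned by this compactor.
	wg.Add(1)
	go func() {
		defer wg.Done()
		retentionErr = applyRetention(ctx)
	}()

	workersErr = runWorkers(ctx)
	wg.Wait()

	// Both failure modes are folded into a single error, as runOne does.
	if err := multierror.New(retentionErr, workersErr, ctx.Err()).Err(); err != nil {
		fmt.Println("compaction iteration failed:", err)
	}
}
```

Because the retention error is written only inside the goroutine and read only after wg.Wait(), no extra locking is needed.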
8 changes: 8 additions & 0 deletions pkg/bloomcompactor/bloomcompactor_test.go
@@ -148,6 +148,14 @@ type mockLimits struct {
shardSize int
}

func (m mockLimits) RetentionPeriod(_ string) time.Duration {
panic("implement me")
}

func (m mockLimits) StreamRetention(_ string) []validation.StreamRetention {
panic("implement me")
}

func (m mockLimits) AllByUserID() map[string]*validation.Limits {
panic("implement me")
}
6 changes: 4 additions & 2 deletions pkg/bloomcompactor/config.go
@@ -6,7 +6,6 @@ import (
"github.com/pkg/errors"
"time"

"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/downloads"
"github.com/grafana/loki/pkg/util/ring"
)

@@ -31,6 +30,8 @@ type Config struct {
CompactionRetries int `yaml:"compaction_retries"`

MaxCompactionParallelism int `yaml:"max_compaction_parallelism"`

RetentionConfig RetentionConfig `yaml:"retention"`
}

// RegisterFlags registers flags for the Bloom-Compactor configuration.
@@ -52,6 +53,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.RetryMaxBackoff, "bloom-compactor.compaction-retries-max-backoff", time.Minute, "Maximum backoff time between retries.")
f.IntVar(&cfg.CompactionRetries, "bloom-compactor.compaction-retries", 3, "Number of retries to perform when compaction fails.")
f.IntVar(&cfg.MaxCompactionParallelism, "bloom-compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.")
cfg.RetentionConfig.RegisterFlags(f)

// Ring
skipFlags := []string{
@@ -76,7 +78,7 @@ func (cfg *Config) Validate() error {
}

type Limits interface {
downloads.Limits
RetentionLimits
BloomCompactorShardSize(tenantID string) int
BloomCompactorEnabled(tenantID string) bool
BloomNGramLength(tenantID string) int
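The Limits interface now embeds RetentionLimits instead of downloads.Limits, and the new retention settings are nested under the compactor config (yaml key retention) with their flags registered alongside the existing ones. A small sketch, mirroring the flag names and defaults that retention.go registers later in this commit, to show how the settings surface on the command line (the flag-set name here is arbitrary):

```go
package main

import (
	"flag"
	"fmt"
)

// RetentionConfig mirrors the struct added in pkg/bloomcompactor/retention.go.
type RetentionConfig struct {
	Enabled         bool
	MaxLookbackDays int
}

func (cfg *RetentionConfig) RegisterFlags(f *flag.FlagSet) {
	f.BoolVar(&cfg.Enabled, "bloom-compactor.retention.enabled", false, "Enable bloom retention.")
	f.IntVar(&cfg.MaxLookbackDays, "bloom-compactor.retention.max-lookback-days", 365, "Max lookback days for retention.")
}

func main() {
	var cfg RetentionConfig
	fs := flag.NewFlagSet("bloom-compactor", flag.ContinueOnError)
	cfg.RegisterFlags(fs)

	// Retention stays off unless explicitly enabled.
	_ = fs.Parse([]string{"-bloom-compactor.retention.enabled=true"})
	fmt.Printf("retention enabled=%v, max lookback=%d days\n", cfg.Enabled, cfg.MaxLookbackDays)
}
```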
31 changes: 31 additions & 0 deletions pkg/bloomcompactor/metrics.go
@@ -43,6 +43,11 @@ type Metrics struct {

progress prometheus.Gauge
timePerTenant *prometheus.CounterVec

// Retention metrics
retentionRunning prometheus.Gauge
retentionTime *prometheus.HistogramVec
retentionDaysPerIteration *prometheus.HistogramVec
}

func NewMetrics(r prometheus.Registerer, bloomMetrics *v1.Metrics) *Metrics {
@@ -175,6 +180,32 @@ func NewMetrics(r prometheus.Registerer, bloomMetrics *v1.Metrics) *Metrics {
Name: "tenant_compaction_seconds_total",
Help: "Time spent processing a tenant.",
}, []string{tenantLabel}),

// Retention
retentionRunning: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "retention_running",
Help: "1 if retention is running in this compactor.",
}),

retentionDaysPerIteration: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "retention_days_processed",
Help: "Number of days iterated over during the retention process.",
// 1 day -> 5 years, 10 buckets
Buckets: prometheus.ExponentialBucketsRange(1, 365*5, 10),
}, []string{"status"}),

retentionTime: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "retention_time_seconds",
Help: "Time this retention process took to complete.",
// 1 second -> 5 years, 10 buckets
Buckets: prometheus.ExponentialBucketsRange(1, 365*5*24*60*60, 10),
}, []string{"status"}),
}

return &m
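Both retention histograms rely on prometheus.ExponentialBucketsRange to spread a handful of buckets across a very wide range (one day to five years for days processed, one second to five years for duration). A quick sketch to print the bucket boundaries the first call produces (the helper is part of prometheus/client_golang):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Bucket upper bounds for retention_days_processed: 10 exponentially
	// spaced values covering 1 day through 5 years (expressed in days).
	fmt.Println(prometheus.ExponentialBucketsRange(1, 365*5, 10))
}
```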
214 changes: 214 additions & 0 deletions pkg/bloomcompactor/retention.go
@@ -0,0 +1,214 @@
package bloomcompactor

import (
"cmp"
"context"
"flag"
storageconfig "github.com/grafana/loki/pkg/storage/config"
"slices"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/ring"
"github.com/pkg/errors"
"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/pkg/validation"
)

type retentionSharding interface {
OwnsRetention() (bool, error)
}

type firstTokenRetentionSharding struct {
ring ring.ReadRing
ringLifeCycler *ring.BasicLifecycler
}

func newFirstTokenRetentionSharding(ring ring.ReadRing, ringLifeCycler *ring.BasicLifecycler) *firstTokenRetentionSharding {
return &firstTokenRetentionSharding{
ring: ring,
ringLifeCycler: ringLifeCycler,
}
}

// OwnsRetention returns true if this compactor should apply retention.
// Ownership is determined by checking whether this compactor owns the smallest token in the ring.
// Note that during a ring topology change, more than one compactor may attempt to apply retention.
// This is fine, since retention consists of deleting old data, which is an idempotent operation.
func (s *firstTokenRetentionSharding) OwnsRetention() (bool, error) {
rs, err := s.ring.GetAllHealthy(RingOp)
if err != nil {
return false, errors.Wrap(err, "getting ring healthy instances")
}
if len(rs.Instances) == 0 {
return false, errors.New("no healthy instances in ring")
}

// Look up the instance that owns the smallest token in the ring
instance := slices.MinFunc(rs.Instances, func(a, b ring.InstanceDesc) int {
smallerA := slices.Min(a.GetTokens())
smallerB := slices.Min(b.GetTokens())
// Compare explicitly: tokens are uint32, so subtracting would wrap around when a < b.
return cmp.Compare(smallerA, smallerB)
})

// Compare addresses so both sides use the same identifier for this instance.
return instance.GetAddr() == s.ringLifeCycler.GetInstanceAddr(), nil
}

type RetentionConfig struct {
Enabled bool `yaml:"enabled"`
MaxLookbackDays int `yaml:"max_lookback_days"`
}

func (cfg *RetentionConfig) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.Enabled, "bloom-compactor.retention.enabled", false, "Enable bloom retention.")
f.IntVar(&cfg.MaxLookbackDays, "bloom-compactor.retention.max-lookback-days", 365, "Max lookback days for retention.")
}

type RetentionLimits interface {
RetentionPeriod(userID string) time.Duration
StreamRetention(userID string) []validation.StreamRetention
}

type RetentionManager struct {
cfg RetentionConfig
limits RetentionLimits
bloomStore bloomshipper.Store
sharding retentionSharding
metrics *Metrics
logger log.Logger
lastDayRun storageconfig.DayTime

// For testing
now func() model.Time
}

func NewRetentionManager(
cfg RetentionConfig,
limits RetentionLimits,
bloomStore bloomshipper.Store,
sharding retentionSharding,
metrics *Metrics,
logger log.Logger,
) *RetentionManager {
return &RetentionManager{
cfg: cfg,
limits: limits,
bloomStore: bloomStore,
sharding: sharding,
metrics: metrics,
logger: log.With(logger, "component", "retention-manager"),
now: model.Now,
lastDayRun: storageconfig.NewDayTime(model.Earliest),
}
}

func (r *RetentionManager) Apply(ctx context.Context) error {
if !r.cfg.Enabled {
return nil
}

ownsRetention, err := r.sharding.OwnsRetention()
if err != nil {
return errors.Wrap(err, "checking if compactor owns retention")
}
if !ownsRetention {
return nil
}

start := r.now()
today := storageconfig.NewDayTime(start)
if !today.After(r.lastDayRun) {
// We've already run retention for today
return nil
}

level.Info(r.logger).Log("msg", "Applying retention")
r.metrics.retentionRunning.Set(1)
defer r.metrics.retentionRunning.Set(0)

// We iterate through up to r.cfg.MaxLookbackDays days unless it's set to 0
// In that case, we iterate through all days
var daysProcessed int
day := today
for i := 1; i <= r.cfg.MaxLookbackDays || r.cfg.MaxLookbackDays == 0; i++ {
day = day.Dec()
dayLogger := log.With(r.logger, "day", day.String())
bloomClient, err := r.bloomStore.Client(day.ModelTime())
if err != nil {
level.Error(dayLogger).Log("msg", "failed to get bloom store client", "err", err)
break
}
objectClient := bloomClient.ObjectClient()

tenants, table, err := r.bloomStore.UsersForPeriod(ctx, day.ModelTime())
if err != nil {
r.metrics.retentionTime.WithLabelValues(statusFailure).Observe(time.Since(start.Time()).Seconds())
r.metrics.retentionDaysPerIteration.WithLabelValues(statusFailure).Observe(float64(daysProcessed))
return errors.Wrap(err, "getting users for period")
}

if len(tenants) == 0 {
// No tenants for this day means we can break here since previous
// retention iterations have already deleted all tenants
break
}

for _, tenant := range tenants {
tenantRetention := findLongestRetention(tenant, r.limits)
expirationDay := storageconfig.NewDayTime(today.Add(-tenantRetention))
if !day.Before(expirationDay) {
continue
}

tenantLogger := log.With(dayLogger, "table", table, "tenant", tenant, "retention", tenantRetention)
level.Info(tenantLogger).Log("msg", "applying retention to tenant")

// List all keys under the tenant directory
// Note: we cannot delete the tenant directory directly because it is not an actual key in the object store
// Instead, we need to list all keys under the tenant directory and delete them one by one
tenantDir := bloomClient.Tenant(tenant, table)
tenantObjects, _, err := objectClient.List(ctx, tenantDir.Addr(), "")
if err != nil {
r.metrics.retentionTime.WithLabelValues(statusFailure).Observe(time.Since(start.Time()).Seconds())
r.metrics.retentionDaysPerIteration.WithLabelValues(statusFailure).Observe(float64(daysProcessed))
return errors.Wrapf(err, "listing tenant directory %s", tenantDir.Addr())
}

for _, object := range tenantObjects {
if err := objectClient.DeleteObject(ctx, object.Key); err != nil {
r.metrics.retentionTime.WithLabelValues(statusFailure).Observe(time.Since(start.Time()).Seconds())
r.metrics.retentionDaysPerIteration.WithLabelValues(statusFailure).Observe(float64(daysProcessed))
return errors.Wrapf(err, "deleting tenant directory %s", tenantDir.Addr())
}
}
}

daysProcessed++
}

level.Info(r.logger).Log("msg", "finished applying retention")
r.lastDayRun = today
r.metrics.retentionTime.WithLabelValues(statusSuccess).Observe(time.Since(start.Time()).Seconds())
r.metrics.retentionDaysPerIteration.WithLabelValues(statusSuccess).Observe(float64(daysProcessed))

return nil
}

func findLongestRetention(tenant string, limits RetentionLimits) time.Duration {
globalRetention := limits.RetentionPeriod(tenant)
streamRetention := limits.StreamRetention(tenant)
if len(streamRetention) == 0 {
return globalRetention
}

maxStreamRetention := slices.MaxFunc(streamRetention, func(a, b validation.StreamRetention) int {
return int(a.Period - b.Period)
})

if time.Duration(maxStreamRetention.Period) > globalRetention {
return time.Duration(maxStreamRetention.Period)
}
return globalRetention
}
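findLongestRetention keeps blooms around for the longest retention that still applies to a tenant: the global retention period or the longest per-stream rule, whichever is greater. A simplified, self-contained sketch of that selection; StreamRetention and fakeLimits below are stand-ins for validation.StreamRetention and the RetentionLimits interface (the real Period field is a Prometheus model.Duration, as the time.Duration conversion above suggests, narrowed here to time.Duration for brevity):

```go
package main

import (
	"cmp"
	"fmt"
	"slices"
	"time"
)

// StreamRetention stands in for validation.StreamRetention; only the
// Period field matters for this sketch.
type StreamRetention struct {
	Period time.Duration
}

// fakeLimits is a stub for the RetentionLimits interface.
type fakeLimits struct {
	global  time.Duration
	streams []StreamRetention
}

func (f fakeLimits) RetentionPeriod(_ string) time.Duration     { return f.global }
func (f fakeLimits) StreamRetention(_ string) []StreamRetention { return f.streams }

// findLongestRetention mirrors the helper above: the longest of the global
// and per-stream retentions wins, so blooms outlive every rule that still
// needs the underlying chunks.
func findLongestRetention(tenant string, limits fakeLimits) time.Duration {
	global := limits.RetentionPeriod(tenant)
	streams := limits.StreamRetention(tenant)
	if len(streams) == 0 {
		return global
	}
	longest := slices.MaxFunc(streams, func(a, b StreamRetention) int {
		return cmp.Compare(a.Period, b.Period)
	}).Period
	return max(longest, global)
}

func main() {
	limits := fakeLimits{
		global:  30 * 24 * time.Hour,
		streams: []StreamRetention{{Period: 90 * 24 * time.Hour}, {Period: 7 * 24 * time.Hour}},
	}
	// The 90-day stream rule wins over the 30-day global retention.
	fmt.Println(findLongestRetention("tenant-a", limits)) // 2160h0m0s
}
```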