Block-builder-scheduler initial structure #9650

Draft · wants to merge 17 commits into base: main
36 changes: 36 additions & 0 deletions pkg/blockbuilder/scheduler/config.go
@@ -0,0 +1,36 @@
// SPDX-License-Identifier: AGPL-3.0-only

package scheduler

import (
	"flag"
	"fmt"
	"time"

	"github.com/grafana/mimir/pkg/storage/ingest"
)

type Config struct {
	BuilderConsumerGroup   string        `yaml:"builder_consumer_group"`
	SchedulerConsumerGroup string        `yaml:"scheduler_consumer_group"`
	KafkaMonitorInterval   time.Duration `yaml:"kafka_monitor_interval"`

	// Config parameters defined outside the block-builder-scheduler config that are injected dynamically.
	Kafka ingest.KafkaConfig `yaml:"-"`
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	f.StringVar(&cfg.BuilderConsumerGroup, "block-builder-scheduler.builder-consumer-group", "block-builder", "The Kafka consumer group used by block-builders.")
	f.StringVar(&cfg.SchedulerConsumerGroup, "block-builder-scheduler.scheduler-consumer-group", "block-builder-scheduler", "The Kafka consumer group used by block-builder-scheduler.")
	f.DurationVar(&cfg.KafkaMonitorInterval, "block-builder-scheduler.kafka-monitor-interval", 20*time.Second, "How frequently to monitor the Kafka partitions.")
}

func (cfg *Config) Validate() error {
	if err := cfg.Kafka.Validate(); err != nil {
		return err
	}
	if cfg.KafkaMonitorInterval <= 0 {
		// Use %s rather than %d so the Duration prints as e.g. "20s" instead of raw nanoseconds.
		return fmt.Errorf("kafka_monitor_interval (%s) must be positive", cfg.KafkaMonitorInterval)
	}
	return nil
}
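For orientation, a hedged sketch (not part of this diff) of how these flags and Validate behave when wired up standalone; the flag-set name and override value are arbitrary assumptions.

// Hypothetical usage sketch for the scheduler Config; not part of this PR.
package main

import (
	"flag"
	"fmt"

	blockbuilderscheduler "github.com/grafana/mimir/pkg/blockbuilder/scheduler"
)

func main() {
	var cfg blockbuilderscheduler.Config
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	cfg.RegisterFlags(fs)

	// Override one default; the others keep their registered values.
	_ = fs.Parse([]string{"-block-builder-scheduler.kafka-monitor-interval=30s"})
	fmt.Println(cfg.BuilderConsumerGroup, cfg.KafkaMonitorInterval) // block-builder 30s

	// Validate also checks the injected Kafka config, so this is expected to
	// fail until the caller populates cfg.Kafka (as Mimir's module init does).
	if err := cfg.Validate(); err != nil {
		fmt.Println("validate:", err)
	}
}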
33 changes: 33 additions & 0 deletions pkg/blockbuilder/scheduler/metrics.go
@@ -0,0 +1,33 @@
// SPDX-License-Identifier: AGPL-3.0-only

package scheduler

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

type schedulerMetrics struct {
	monitorPartitionsDuration prometheus.Histogram
	partitionStartOffsets     *prometheus.GaugeVec
	partitionEndOffsets       *prometheus.GaugeVec
}

func newSchedulerMetrics(reg prometheus.Registerer) schedulerMetrics {
	return schedulerMetrics{
		monitorPartitionsDuration: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
			Name: "cortex_blockbuilder_scheduler_monitor_partitions_duration_seconds",
			Help: "Time spent monitoring partitions.",

			NativeHistogramBucketFactor: 1.1,
		}),
		partitionStartOffsets: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
			Name: "cortex_blockbuilder_scheduler_partition_start_offsets",
			Help: "The observed start offset of each partition.",
		}, []string{"partition"}),
		partitionEndOffsets: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
			Name: "cortex_blockbuilder_scheduler_partition_end_offsets",
			Help: "The observed end offset of each partition.",
		}, []string{"partition"}),
	}
}
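A hedged sketch of how these gauges could be exercised in a unit test; the test itself is an assumption, not part of this diff.

// Hypothetical test in package scheduler: set per-partition offsets and read
// them back through the gauge vectors.
package scheduler

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func TestSchedulerMetricsSketch(t *testing.T) {
	m := newSchedulerMetrics(prometheus.NewPedanticRegistry())

	m.partitionStartOffsets.WithLabelValues("0").Set(100)
	m.partitionEndOffsets.WithLabelValues("0").Set(250)

	if got := testutil.ToFloat64(m.partitionEndOffsets.WithLabelValues("0")); got != 250 {
		t.Fatalf("unexpected end offset: %v", got)
	}
}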
100 changes: 100 additions & 0 deletions pkg/blockbuilder/scheduler/scheduler.go
@@ -0,0 +1,100 @@
// SPDX-License-Identifier: AGPL-3.0-only

package scheduler

import (
	"context"
	"fmt"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/services"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"

	"github.com/grafana/mimir/pkg/storage/ingest"
)

type BlockBuilderScheduler struct {
	services.Service

	kafkaClient *kgo.Client
	cfg         Config
	logger      log.Logger
	register    prometheus.Registerer
	metrics     schedulerMetrics
}

func New(
	cfg Config,
	logger log.Logger,
	reg prometheus.Registerer,
) (*BlockBuilderScheduler, error) {
	s := &BlockBuilderScheduler{
		cfg:      cfg,
		logger:   logger,
		register: reg,
		metrics:  newSchedulerMetrics(reg),
	}
	s.Service = services.NewBasicService(s.starting, s.running, s.stopping)
	return s, nil
}

func (s *BlockBuilderScheduler) starting(context.Context) (err error) {
	s.kafkaClient, err = ingest.NewKafkaReaderClient(
		s.cfg.Kafka,
		ingest.NewKafkaReaderClientMetrics("block-builder-scheduler", s.register),
		s.logger,
		kgo.ConsumerGroup(s.cfg.SchedulerConsumerGroup),
		kgo.DisableAutoCommit(),
	)
	if err != nil {
		return fmt.Errorf("creating kafka reader: %w", err)
	}
	return nil
}

func (s *BlockBuilderScheduler) stopping(_ error) error {
	s.kafkaClient.Close()
	return nil
}

func (s *BlockBuilderScheduler) running(ctx context.Context) error {
	monitorTick := time.NewTicker(s.cfg.KafkaMonitorInterval)
	defer monitorTick.Stop()
	for {
		select {
		case <-monitorTick.C:
			s.monitorPartitions(ctx)
		case <-ctx.Done():
			return nil
		}
	}
}

// monitorPartitions updates knowledge of all active partitions.
func (s *BlockBuilderScheduler) monitorPartitions(ctx context.Context) {
	startTime := time.Now()
	// Eventually this will also compute jobs; for now, just collect partition data.
	admin := kadm.NewClient(s.kafkaClient)

	startOffsets, err := admin.ListStartOffsets(ctx, s.cfg.Kafka.Topic)
	if err != nil {
		level.Warn(s.logger).Log("msg", "failed to list start offsets", "err", err)
	}
	endOffsets, err := admin.ListEndOffsets(ctx, s.cfg.Kafka.Topic)
	if err != nil {
		level.Warn(s.logger).Log("msg", "failed to list end offsets", "err", err)
	}

	s.metrics.monitorPartitionsDuration.Observe(time.Since(startTime).Seconds())

	startOffsets.Each(func(o kadm.ListedOffset) {
		s.metrics.partitionStartOffsets.WithLabelValues(fmt.Sprint(o.Partition)).Set(float64(o.Offset))
	})
	endOffsets.Each(func(o kadm.ListedOffset) {
		s.metrics.partitionEndOffsets.WithLabelValues(fmt.Sprint(o.Partition)).Set(float64(o.Offset))
	})
}
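As a usage sketch, the scheduler runs like any other dskit service; this wiring is an assumption for illustration, not part of the diff, and presumes the caller fills in a valid Kafka config.

// Hypothetical standalone wiring, not part of this PR.
package main

import (
	"context"
	"flag"
	"os"
	"os/signal"

	"github.com/go-kit/log"
	"github.com/grafana/dskit/services"
	"github.com/prometheus/client_golang/prometheus"

	blockbuilderscheduler "github.com/grafana/mimir/pkg/blockbuilder/scheduler"
)

func main() {
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	defer cancel()

	var cfg blockbuilderscheduler.Config
	cfg.RegisterFlags(flag.CommandLine)
	flag.Parse()
	// cfg.Kafka must be populated by the caller (Mimir injects it from the
	// ingest-storage config); it is left zero here for brevity.

	s, err := blockbuilderscheduler.New(cfg, log.NewLogfmtLogger(os.Stderr), prometheus.NewRegistry())
	if err != nil {
		panic(err)
	}
	// starting() opens the Kafka reader client; running() then polls partition
	// start/end offsets every KafkaMonitorInterval until ctx is cancelled.
	if err := services.StartAndAwaitRunning(ctx, s); err != nil {
		panic(err)
	}
	<-ctx.Done()
	_ = services.StopAndAwaitTerminated(context.Background(), s)
}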
40 changes: 22 additions & 18 deletions pkg/mimir/mimir.go
@@ -49,6 +49,7 @@ import (
	alertstorelocal "github.com/grafana/mimir/pkg/alertmanager/alertstore/local"
	"github.com/grafana/mimir/pkg/api"
	"github.com/grafana/mimir/pkg/blockbuilder"
	blockbuilderscheduler "github.com/grafana/mimir/pkg/blockbuilder/scheduler"
	"github.com/grafana/mimir/pkg/compactor"
	"github.com/grafana/mimir/pkg/continuoustest"
	"github.com/grafana/mimir/pkg/distributor"
@@ -113,24 +114,25 @@ type Config struct {
	PrintConfig     bool   `yaml:"-"`
	ApplicationName string `yaml:"-"`

-	API              api.Config                      `yaml:"api"`
-	Server           server.Config                   `yaml:"server"`
-	Distributor      distributor.Config              `yaml:"distributor"`
-	Querier          querier.Config                  `yaml:"querier"`
-	IngesterClient   client.Config                   `yaml:"ingester_client"`
-	Ingester         ingester.Config                 `yaml:"ingester"`
-	Flusher          flusher.Config                  `yaml:"flusher"`
-	LimitsConfig     validation.Limits               `yaml:"limits"`
-	Worker           querier_worker.Config           `yaml:"frontend_worker"`
-	Frontend         frontend.CombinedFrontendConfig `yaml:"frontend"`
-	IngestStorage    ingest.Config                   `yaml:"ingest_storage"`
-	BlockBuilder     blockbuilder.Config             `yaml:"block_builder" doc:"hidden"`
-	BlocksStorage    tsdb.BlocksStorageConfig        `yaml:"blocks_storage"`
-	Compactor        compactor.Config                `yaml:"compactor"`
-	StoreGateway     storegateway.Config             `yaml:"store_gateway"`
-	TenantFederation tenantfederation.Config         `yaml:"tenant_federation"`
-	ActivityTracker  activitytracker.Config          `yaml:"activity_tracker"`
-	Vault            vault.Config                    `yaml:"vault"`
+	API                   api.Config                      `yaml:"api"`
+	Server                server.Config                   `yaml:"server"`
+	Distributor           distributor.Config              `yaml:"distributor"`
+	Querier               querier.Config                  `yaml:"querier"`
+	IngesterClient        client.Config                   `yaml:"ingester_client"`
+	Ingester              ingester.Config                 `yaml:"ingester"`
+	Flusher               flusher.Config                  `yaml:"flusher"`
+	LimitsConfig          validation.Limits               `yaml:"limits"`
+	Worker                querier_worker.Config           `yaml:"frontend_worker"`
+	Frontend              frontend.CombinedFrontendConfig `yaml:"frontend"`
+	IngestStorage         ingest.Config                   `yaml:"ingest_storage"`
+	BlockBuilder          blockbuilder.Config             `yaml:"block_builder" doc:"hidden"`
+	BlockBuilderScheduler blockbuilderscheduler.Config    `yaml:"block_builder_scheduler" doc:"hidden"`
+	BlocksStorage         tsdb.BlocksStorageConfig        `yaml:"blocks_storage"`
+	Compactor             compactor.Config                `yaml:"compactor"`
+	StoreGateway          storegateway.Config             `yaml:"store_gateway"`
+	TenantFederation      tenantfederation.Config         `yaml:"tenant_federation"`
+	ActivityTracker       activitytracker.Config          `yaml:"activity_tracker"`
+	Vault                 vault.Config                    `yaml:"vault"`

	Ruler        ruler.Config     `yaml:"ruler"`
	RulerStorage rulestore.Config `yaml:"ruler_storage"`
@@ -184,6 +186,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
	c.Frontend.RegisterFlags(f, logger)
	c.IngestStorage.RegisterFlags(f)
	c.BlockBuilder.RegisterFlags(f, logger)
	c.BlockBuilderScheduler.RegisterFlags(f)
	c.BlocksStorage.RegisterFlags(f)
	c.Compactor.RegisterFlags(f, logger)
	c.StoreGateway.RegisterFlags(f, logger)
@@ -733,6 +736,7 @@ type Mimir struct {
	Vault                 *vault.Vault
	UsageStatsReporter    *usagestats.Reporter
	BlockBuilder          *blockbuilder.BlockBuilder
	BlockBuilderScheduler *blockbuilderscheduler.BlockBuilderScheduler
	ContinuousTestManager *continuoustest.Manager
	BuildInfoHandler      http.Handler
}
15 changes: 15 additions & 0 deletions pkg/mimir/modules.go
@@ -40,6 +40,7 @@ import (
	"github.com/grafana/mimir/pkg/alertmanager/alertstore/bucketclient"
	"github.com/grafana/mimir/pkg/api"
	"github.com/grafana/mimir/pkg/blockbuilder"
	blockbuilderscheduler "github.com/grafana/mimir/pkg/blockbuilder/scheduler"
	"github.com/grafana/mimir/pkg/compactor"
	"github.com/grafana/mimir/pkg/continuoustest"
	"github.com/grafana/mimir/pkg/distributor"
@@ -102,6 +103,7 @@ const (
	TenantFederation      string = "tenant-federation"
	UsageStats            string = "usage-stats"
	BlockBuilder          string = "block-builder"
	BlockBuilderScheduler string = "block-builder-scheduler"
	ContinuousTest        string = "continuous-test"
	All                   string = "all"

@@ -1093,6 +1095,17 @@ func (t *Mimir) initBlockBuilder() (_ services.Service, err error) {
	return t.BlockBuilder, nil
}

func (t *Mimir) initBlockBuilderScheduler() (services.Service, error) {
	t.Cfg.BlockBuilderScheduler.Kafka = t.Cfg.IngestStorage.KafkaConfig

	s, err := blockbuilderscheduler.New(t.Cfg.BlockBuilderScheduler, util_log.Logger, t.Registerer)
	if err != nil {
		return nil, errors.Wrap(err, "block-builder-scheduler init")
	}
	t.BlockBuilderScheduler = s
	return s, nil
}

func (t *Mimir) initContinuousTest() (services.Service, error) {
	client, err := continuoustest.NewClient(t.Cfg.ContinuousTest.Client, util_log.Logger)
	if err != nil {
@@ -1144,6 +1157,7 @@ func (t *Mimir) setupModuleManager() error {
	mm.RegisterModule(TenantFederation, t.initTenantFederation, modules.UserInvisibleModule)
	mm.RegisterModule(UsageStats, t.initUsageStats, modules.UserInvisibleModule)
	mm.RegisterModule(BlockBuilder, t.initBlockBuilder)
	mm.RegisterModule(BlockBuilderScheduler, t.initBlockBuilderScheduler)
	mm.RegisterModule(ContinuousTest, t.initContinuousTest)
	mm.RegisterModule(Vault, t.initVault, modules.UserInvisibleModule)
	mm.RegisterModule(Write, nil)
@@ -1180,6 +1194,7 @@ func (t *Mimir) setupModuleManager() error {
		StoreGateway:          {API, Overrides, MemberlistKV, Vault},
		TenantFederation:      {Queryable},
		BlockBuilder:          {API, Overrides},
		BlockBuilderScheduler: {API},
		ContinuousTest:        {API},
		Write:                 {Distributor, Ingester},
		Read:                  {QueryFrontend, Querier},
2 changes: 1 addition & 1 deletion pkg/storage/ingest/fetcher.go
@@ -355,7 +355,7 @@ func (r *concurrentFetchers) recordOrderedFetchTelemetry(f fetchResult, firstRet
	r.metrics.fetchedDiscardedRecordBytes.Add(float64(doubleFetchedBytes))
}

-// fetchSingle attempts to find out the leader leader Kafka broker for a partition and then sends a fetch request to the leader of the fetchWant request and parses the responses
+// fetchSingle attempts to find out the leader Kafka broker for a partition and then sends a fetch request to the leader of the fetchWant request and parses the responses
// fetchSingle returns a fetchResult which may or may not fulfil the entire fetchWant.
// If ctx is cancelled, fetchSingle will return an empty fetchResult without an error.
func (r *concurrentFetchers) fetchSingle(ctx context.Context, fw fetchWant) (fr fetchResult) {