diff --git a/CHANGELOG.md b/CHANGELOG.md index 8ee417f07f..1c8e4884fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -56,6 +56,9 @@ Main (unreleased) - Do not log error on clean shutdown of `loki.source.journal`. (@thampiotr) +- `prometheus.operator.*` components: Fixed a bug which would sometimes cause a + "failed to create service discovery refresh metrics" error after a config reload. (@ptodev) + ### Other changes - Small fix in UI stylesheet to fit more content into visible table area. (@defanator) diff --git a/internal/component/prometheus/operator/common/component.go b/internal/component/prometheus/operator/common/component.go index 666ec42aeb..ca2ac4a98d 100644 --- a/internal/component/prometheus/operator/common/component.go +++ b/internal/component/prometheus/operator/common/component.go @@ -19,7 +19,7 @@ import ( type Component struct { mut sync.RWMutex config *operator.Arguments - manager *crdManager + manager crdManagerInterface ls labelstore.LabelStore onUpdate chan struct{} @@ -27,6 +27,8 @@ type Component struct { healthMut sync.RWMutex health component.Health + crdManagerFactory crdManagerFactory + kind string cluster cluster.Cluster } @@ -44,11 +46,12 @@ func New(o component.Options, args component.Arguments, kind string) (*Component } ls := service.(labelstore.LabelStore) c := &Component{ - opts: o, - onUpdate: make(chan struct{}, 1), - kind: kind, - cluster: clusterData, - ls: ls, + opts: o, + onUpdate: make(chan struct{}, 1), + kind: kind, + cluster: clusterData, + ls: ls, + crdManagerFactory: realCrdManagerFactory{}, } return c, c.Update(args) } @@ -74,6 +77,8 @@ func (c *Component) Run(ctx context.Context) error { c.reportHealth(nil) errChan := make(chan error, 1) + wg := sync.WaitGroup{} + defer wg.Wait() for { select { case <-ctx.Done(): @@ -85,17 +90,25 @@ func (c *Component) Run(ctx context.Context) error { c.reportHealth(err) case <-c.onUpdate: c.mut.Lock() - manager := newCrdManager(c.opts, c.cluster, c.opts.Logger, c.config, c.kind, c.ls) + manager := c.crdManagerFactory.New(c.opts, c.cluster, c.opts.Logger, c.config, c.kind, c.ls) c.manager = manager + + // Wait for the old manager to stop. + // If we start the new manager before stopping the old one, + // the new manager might not be able to register its debug metrics due to a duplicate registration error. if cancel != nil { cancel() } + wg.Wait() + innerCtx, cancel = context.WithCancel(ctx) + wg.Add(1) go func() { if err := manager.Run(innerCtx); err != nil { level.Error(c.opts.Logger).Log("msg", "error running crd manager", "err", err) errChan <- err } + wg.Done() }() c.mut.Unlock() } @@ -170,7 +183,7 @@ func (c *Component) Handler() http.Handler { } ns := parts[1] name := parts[2] - scs := man.getScrapeConfig(ns, name) + scs := man.GetScrapeConfig(ns, name) if len(scs) == 0 { w.WriteHeader(404) return diff --git a/internal/component/prometheus/operator/common/component_test.go b/internal/component/prometheus/operator/common/component_test.go new file mode 100644 index 0000000000..b761d92bf1 --- /dev/null +++ b/internal/component/prometheus/operator/common/component_test.go @@ -0,0 +1,127 @@ +package common + +import ( + "context" + "fmt" + "net" + "sync" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/prometheus/operator" + "github.com/grafana/alloy/internal/service/cluster" + http_service "github.com/grafana/alloy/internal/service/http" + "github.com/grafana/alloy/internal/service/labelstore" + "github.com/grafana/alloy/internal/util" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/storage" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" +) + +type crdManagerFactoryHungRun struct { + stopRun chan struct{} +} + +func (m crdManagerFactoryHungRun) New(_ component.Options, _ cluster.Cluster, _ log.Logger, + _ *operator.Arguments, _ string, _ labelstore.LabelStore) crdManagerInterface { + return &crdManagerHungRun{ + stopRun: m.stopRun, + } +} + +type crdManagerHungRun struct { + stopRun chan struct{} +} + +func (c *crdManagerHungRun) Run(ctx context.Context) error { + <-ctx.Done() + <-c.stopRun + return nil +} + +func (c *crdManagerHungRun) ClusteringUpdated() {} + +func (c *crdManagerHungRun) DebugInfo() interface{} { + return nil +} + +func (c *crdManagerHungRun) GetScrapeConfig(ns, name string) []*config.ScrapeConfig { + return nil +} + +func TestRunExit(t *testing.T) { + opts := component.Options{ + Logger: util.TestAlloyLogger(t), + Registerer: prometheus.NewRegistry(), + GetServiceData: func(name string) (interface{}, error) { + switch name { + case http_service.ServiceName: + return http_service.Data{ + HTTPListenAddr: "localhost:12345", + MemoryListenAddr: "alloy.internal:1245", + BaseHTTPPath: "/", + DialFunc: (&net.Dialer{}).DialContext, + }, nil + + case cluster.ServiceName: + return cluster.Mock(), nil + case labelstore.ServiceName: + return labelstore.New(nil, prometheus.DefaultRegisterer), nil + default: + return nil, fmt.Errorf("service %q does not exist", name) + } + }, + } + + nilReceivers := []storage.Appendable{nil, nil} + + var args operator.Arguments + args.SetToDefault() + args.ForwardTo = nilReceivers + + // Create a Component + c, err := New(opts, args, "") + require.NoError(t, err) + + stopRun := make(chan struct{}) + c.crdManagerFactory = crdManagerFactoryHungRun{ + stopRun: stopRun, + } + + // Run the component + ctx, cancelFunc := context.WithCancel(context.Background()) + cmpRunExited := atomic.Bool{} + cmpRunExited.Store(false) + + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + wg.Done() + err := c.Run(ctx) + require.NoError(t, err) + cmpRunExited.Store(true) + }() + // Wait until the component.Run goroutine starts + // The test can be flaky without this. + wg.Wait() + + // Stop the component. + // It shouldn't stop immediately, because the CRD Manager is hung. + cancelFunc() + time.Sleep(5 * time.Second) + if cmpRunExited.Load() { + require.Fail(t, "component.Run exited") + } + + // Make crdManager.Run exit + close(stopRun) + + // Make sure component.Run exits + require.Eventually(t, func() bool { + return cmpRunExited.Load() + }, 5*time.Second, 100*time.Millisecond, "component.Run didn't exit") +} diff --git a/internal/component/prometheus/operator/common/crdmanager.go b/internal/component/prometheus/operator/common/crdmanager.go index baa72f498b..f5c13da577 100644 --- a/internal/component/prometheus/operator/common/crdmanager.go +++ b/internal/component/prometheus/operator/common/crdmanager.go @@ -40,6 +40,23 @@ import ( // Generous timeout period for configuring all informers const informerSyncTimeout = 10 * time.Second +type crdManagerInterface interface { + Run(ctx context.Context) error + ClusteringUpdated() + DebugInfo() interface{} + GetScrapeConfig(ns, name string) []*config.ScrapeConfig +} + +type crdManagerFactory interface { + New(opts component.Options, cluster cluster.Cluster, logger log.Logger, args *operator.Arguments, kind string, ls labelstore.LabelStore) crdManagerInterface +} + +type realCrdManagerFactory struct{} + +func (realCrdManagerFactory) New(opts component.Options, cluster cluster.Cluster, logger log.Logger, args *operator.Arguments, kind string, ls labelstore.LabelStore) crdManagerInterface { + return newCrdManager(opts, cluster, logger, args, kind, ls) +} + // crdManager is all of the fields required to run a crd based component. // on update, this entire thing should be recreated and restarted type crdManager struct { @@ -237,7 +254,7 @@ func (c *crdManager) DebugInfo() interface{} { return info } -func (c *crdManager) getScrapeConfig(ns, name string) []*config.ScrapeConfig { +func (c *crdManager) GetScrapeConfig(ns, name string) []*config.ScrapeConfig { prefix := fmt.Sprintf("%s/%s/%s", c.kind, ns, name) matches := []*config.ScrapeConfig{} for k, v := range c.scrapeConfigs {