Skip to content

Commit

Permalink
fix(healthcheck): get discoverable nodes from config-cache
Browse files Browse the repository at this point in the history
  • Loading branch information
karol-kokoszka committed May 16, 2024
1 parent bd9ae32 commit bca79c5
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 24 deletions.
38 changes: 14 additions & 24 deletions pkg/service/healthcheck/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/scylladb/go-log"
"github.com/scylladb/go-set/strset"
"github.com/scylladb/scylla-manager/v3/pkg/service/configcache"
"github.com/scylladb/scylla-manager/v3/pkg/util/slice"

"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
"github.com/scylladb/scylla-manager/v3/pkg/service"
Expand Down Expand Up @@ -45,6 +46,7 @@ func (r Runner) Run(ctx context.Context, clusterID, taskID, runID uuid.UUID, pro

type runner struct {
logger log.Logger
configCacher configcache.ConfigCacher
scyllaClient scyllaclient.ProviderFunc
timeout time.Duration
metrics *runnerMetrics
Expand All @@ -67,31 +69,24 @@ func (r runner) Run(ctx context.Context, clusterID, _, _ uuid.UUID, _ json.RawMe
// Enable interactive mode for fast backoff
ctx = scyllaclient.Interactive(ctx)

client, err := r.scyllaClient(ctx, clusterID)
nodes, err := r.configCacher.AvailableHosts(ctx, clusterID)
if err != nil {
return errors.Wrap(err, "get client")
return err
}

status, err := client.Status(ctx)
if err != nil {
return errors.Wrap(err, "status")
}

live := status.Live()
r.removeMetricsForMissingHosts(clusterID, live)
r.checkHosts(ctx, clusterID, live)
r.removeMetricsForMissingHosts(clusterID, nodes)
r.checkHosts(ctx, clusterID, nodes)

return nil
}

func (r runner) checkHosts(ctx context.Context, clusterID uuid.UUID, status []scyllaclient.NodeStatusInfo) {
func (r runner) checkHosts(ctx context.Context, clusterID uuid.UUID, addresses []string) {
f := func(i int) error {
hl := prometheus.Labels{
clusterKey: clusterID.String(),
hostKey: status[i].Addr,
hostKey: addresses[i],
}

rtt, err := r.ping(ctx, clusterID, status[i].Addr, r.timeout)
rtt, err := r.ping(ctx, clusterID, addresses[i], r.timeout)
if err != nil {
r.metrics.status.With(hl).Set(-1)
} else {
Expand All @@ -102,8 +97,8 @@ func (r runner) checkHosts(ctx context.Context, clusterID uuid.UUID, status []sc
return nil
}

_ = parallel.Run(len(status), parallel.NoLimit, f, func(i int, err error) { // nolint: errcheck
r.logger.Error(ctx, "Parallel hosts check failed", "", status[i].Addr, "error", err)
_ = parallel.Run(len(addresses), parallel.NoLimit, f, func(i int, err error) { // nolint: errcheck
r.logger.Error(ctx, "Parallel hosts check failed", "", addresses[i], "error", err)
})
}

Expand All @@ -122,17 +117,12 @@ func (r runner) removeMetricsForCluster(clusterID uuid.UUID) {
})
}

func (r runner) removeMetricsForMissingHosts(clusterID uuid.UUID, status []scyllaclient.NodeStatusInfo) {
m := strset.New()
for _, node := range status {
m.Add(node.Addr)
}

func (r runner) removeMetricsForMissingHosts(clusterID uuid.UUID, addresses []string) {
apply(collect(r.metrics.status), func(cluster, dc, host, pt string, v float64) {
if clusterID.String() != cluster {
return
}
if m.Has(host) {
if slice.ContainsString(addresses, host) {
return
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/service/healthcheck/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func (s *Service) Runner() Runner {
return Runner{
cql: runner{
logger: s.logger.Named("CQL healthcheck"),
configCacher: s.configCache,
scyllaClient: s.scyllaClient,
timeout: s.config.MaxTimeout,
metrics: &runnerMetrics{
Expand All @@ -68,6 +69,7 @@ func (s *Service) Runner() Runner {
},
rest: runner{
logger: s.logger.Named("REST healthcheck"),
configCacher: s.configCache,
scyllaClient: s.scyllaClient,
timeout: s.config.MaxTimeout,
metrics: &runnerMetrics{
Expand All @@ -79,6 +81,7 @@ func (s *Service) Runner() Runner {
},
alternator: runner{
logger: s.logger.Named("Alternator healthcheck"),
configCacher: s.configCache,
scyllaClient: s.scyllaClient,
timeout: s.config.MaxTimeout,
metrics: &runnerMetrics{
Expand Down

0 comments on commit bca79c5

Please sign in to comment.