Skip to content

Commit

Permalink
improve collect lister metrics task
Browse files Browse the repository at this point in the history
  • Loading branch information
BarkovBG committed Oct 15, 2024
1 parent 116fa80 commit f764508
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 7 deletions.
22 changes: 19 additions & 3 deletions cloud/tasks/collect_lister_metrics_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,16 @@ import (

////////////////////////////////////////////////////////////////////////////////

const totalHangingTaskCountGaugeName = "totalHangingTaskCount"

////////////////////////////////////////////////////////////////////////////////

type collectListerMetricsTask struct {
registry metrics.Registry
storage storage.Storage
metricsCollectionInterval time.Duration

taskTypes []string
hangingTaskGaugesByID map[string]metrics.Gauge
maxHangingTaskIDsToReport int64
}
Expand Down Expand Up @@ -134,6 +139,9 @@ func (c *collectListerMetricsTask) collectTasksMetrics(
) error {

tasksByType := make(map[string]uint64)
for _, taskType := range c.taskTypes {
tasksByType[taskType] = 0
}

taskInfos, err := getTaskInfos(ctx)
if err != nil {
Expand All @@ -158,14 +166,22 @@ func (c *collectListerMetricsTask) collectHangingTasksMetrics(
ctx context.Context,
) error {

totalHangingTasksGauge := c.registry.Gauge("totalHangingTaskCount")

taskInfos, err := c.storage.ListHangingTasks(ctx, ^uint64(0))
if err != nil {
return err
}

totalHangingTasksGauge.Set(float64(len(taskInfos)))
err = c.collectTasksMetrics(
ctx,
func(context.Context) ([]storage.TaskInfo, error) {
return taskInfos, nil
},
totalHangingTaskCountGaugeName,
)
if err != nil {
return err
}

taskInfoByID := make(map[string]storage.TaskInfo)
for _, taskInfo := range taskInfos {
taskInfoByID[taskInfo.ID] = taskInfo
Expand Down
1 change: 1 addition & 0 deletions cloud/tasks/scheduler_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,7 @@ func NewScheduler(
storage: storage,
metricsCollectionInterval: listerMetricsCollectionInterval,

taskTypes: registry.TaskTypesForExecution(),
hangingTaskGaugesByID: make(map[string]metrics.Gauge),
maxHangingTaskIDsToReport: config.GetMaxHangingTaskIDsToReport(),
}
Expand Down
12 changes: 8 additions & 4 deletions cloud/tasks/tasks_tests/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1252,11 +1252,15 @@ func TestHangingTasksMetrics(t *testing.T) {
gaugeSetWg := sync.WaitGroup{}
gaugeUnsetWg := sync.WaitGroup{}

registry.GetGauge("totalHangingTaskCount", map[string]string{}).On(
registry.GetGauge(
"totalHangingTaskCount",
map[string]string{"type": "tasks.hanging"},
).On(
"Set",
mock.Anything,
float64(1),
).Return(mock.Anything)
gaugeSetCall := registry.GetGauge(

hangingTasksGaugeSetCall := registry.GetGauge(
"hangingTasks",
map[string]string{"type": "tasks.hanging", "id": taskID},
).On("Set", float64(1)).Return(mock.Anything).Run(
Expand All @@ -1273,7 +1277,7 @@ func TestHangingTasksMetrics(t *testing.T) {
"Set",
float64(0),
).NotBefore(
gaugeSetCall,
hangingTasksGaugeSetCall,
).Return(mock.Anything).Run(
func(args mock.Arguments) {
gaugeUnsetWg.Done()
Expand Down

0 comments on commit f764508

Please sign in to comment.