Skip to content

Commit

Permalink
allows the disabling of leader-only cycle metrics & consolidates cont…
Browse files Browse the repository at this point in the history
…rol of leader metrics (#226) (#3949)

Co-authored-by: Eleanor Pratt <[email protected]>
  • Loading branch information
eleanorpratt and Eleanor Pratt authored Sep 12, 2024
1 parent eec6b6b commit f45aefa
Show file tree
Hide file tree
Showing 5 changed files with 252 additions and 129 deletions.
262 changes: 155 additions & 107 deletions internal/scheduler/metrics/cycle_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ var (
)

type cycleMetrics struct {
leaderMetricsEnabled bool

scheduledJobs *prometheus.CounterVec
premptedJobs *prometheus.CounterVec
consideredJobs *prometheus.GaugeVec
Expand All @@ -25,97 +27,137 @@ type cycleMetrics struct {
cappedDemand *prometheus.GaugeVec
scheduleCycleTime prometheus.Histogram
reconciliationCycleTime prometheus.Histogram
allResettableMetrics []resettableMetric
}

func newCycleMetrics() *cycleMetrics {
scheduledJobs := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: prefix + "scheduled_jobs",
Help: "Number of events scheduled",
},
queueAndPriorityClassLabels,
)

premptedJobs := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: prefix + "preempted_jobs",
Help: "Number of jobs preempted",
},
queueAndPriorityClassLabels,
)

consideredJobs := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: prefix + "considered_jobs",
Help: "Number of jobs considered",
},
poolAndQueueLabels,
)

fairShare := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: prefix + "fair_share",
Help: "Fair share of each queue",
},
poolAndQueueLabels,
)

adjustedFairShare := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: prefix + "adjusted_fair_share",
Help: "Adjusted Fair share of each queue",
},
poolAndQueueLabels,
)

actualShare := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: prefix + "actual_share",
Help: "Actual Fair share of each queue",
},
poolAndQueueLabels,
)

demand := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: prefix + "demand",
Help: "Demand of each queue",
},
poolAndQueueLabels,
)

cappedDemand := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: prefix + "capped_demand",
Help: "Capped Demand of each queue and pool. This differs from demand in that it limits demand by scheduling constraints",
},
poolAndQueueLabels,
)

fairnessError := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: prefix + "fairness_error",
Help: "Cumulative delta between adjusted fair share and actual share for all users who are below their fair share",
},
[]string{poolLabel},
)

scheduleCycleTime := prometheus.NewHistogram(
prometheus.HistogramOpts{
Name: prefix + "schedule_cycle_times",
Help: "Cycle time when in a scheduling round.",
Buckets: prometheus.ExponentialBuckets(10.0, 1.1, 110),
},
)

reconciliationCycleTime := prometheus.NewHistogram(
prometheus.HistogramOpts{
Name: prefix + "reconciliation_cycle_times",
Help: "Cycle time when in a scheduling round.",
Buckets: prometheus.ExponentialBuckets(10.0, 1.1, 110),
},
)

return &cycleMetrics{
scheduledJobs: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: prefix + "scheduled_jobs",
Help: "Number of events scheduled",
},
queueAndPriorityClassLabels,
),

premptedJobs: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: prefix + "preempted_jobs",
Help: "Number of jobs preempted",
},
queueAndPriorityClassLabels,
),

consideredJobs: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: prefix + "considered_jobs",
Help: "Number of jobs considered",
},
poolAndQueueLabels,
),

fairShare: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: prefix + "fair_share",
Help: "Fair share of each queue",
},
poolAndQueueLabels,
),

adjustedFairShare: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: prefix + "adjusted_fair_share",
Help: "Adjusted Fair share of each queue",
},
poolAndQueueLabels,
),

actualShare: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: prefix + "actual_share",
Help: "Actual Fair share of each queue",
},
poolAndQueueLabels,
),

demand: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: prefix + "demand",
Help: "Demand of each queue",
},
poolAndQueueLabels,
),

cappedDemand: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: prefix + "capped_demand",
Help: "Capped Demand of each queue and pool. This differs from demand in that it limits demand by scheduling constraints",
},
poolAndQueueLabels,
),

fairnessError: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: prefix + "fairness_error",
Help: "Cumulative delta between adjusted fair share and actual share for all users who are below their fair share",
},
[]string{poolLabel},
),

scheduleCycleTime: prometheus.NewHistogram(
prometheus.HistogramOpts{
Name: prefix + "schedule_cycle_times",
Help: "Cycle time when in a scheduling round.",
Buckets: prometheus.ExponentialBuckets(10.0, 1.1, 110),
},
),

reconciliationCycleTime: prometheus.NewHistogram(
prometheus.HistogramOpts{
Name: prefix + "reconciliation_cycle_times",
Help: "Cycle time when in a scheduling round.",
Buckets: prometheus.ExponentialBuckets(10.0, 1.1, 110),
},
),
leaderMetricsEnabled: true,
scheduledJobs: scheduledJobs,
premptedJobs: premptedJobs,
consideredJobs: consideredJobs,
fairShare: fairShare,
adjustedFairShare: adjustedFairShare,
actualShare: actualShare,
demand: demand,
cappedDemand: cappedDemand,
fairnessError: fairnessError,
scheduleCycleTime: scheduleCycleTime,
allResettableMetrics: []resettableMetric{
scheduledJobs,
premptedJobs,
consideredJobs,
fairShare,
adjustedFairShare,
actualShare,
demand,
cappedDemand,
fairnessError,
},
reconciliationCycleTime: reconciliationCycleTime,
}
}

func (m *cycleMetrics) enableLeaderMetrics() {
m.leaderMetricsEnabled = true
}

func (m *cycleMetrics) disableLeaderMetrics() {
m.resetLeaderMetrics()
m.leaderMetricsEnabled = false
}

func (m *cycleMetrics) resetLeaderMetrics() {
for _, metric := range m.allResettableMetrics {
metric.Reset()
}
}

Expand Down Expand Up @@ -157,29 +199,35 @@ func (m *cycleMetrics) ReportSchedulerResult(result schedulerresult.SchedulerRes
}

func (m *cycleMetrics) describe(ch chan<- *prometheus.Desc) {
m.scheduledJobs.Describe(ch)
m.premptedJobs.Describe(ch)
m.consideredJobs.Describe(ch)
m.fairShare.Describe(ch)
m.adjustedFairShare.Describe(ch)
m.actualShare.Describe(ch)
m.fairnessError.Describe(ch)
m.demand.Describe(ch)
m.cappedDemand.Describe(ch)
m.scheduleCycleTime.Describe(ch)
if m.leaderMetricsEnabled {
m.scheduledJobs.Describe(ch)
m.premptedJobs.Describe(ch)
m.consideredJobs.Describe(ch)
m.fairShare.Describe(ch)
m.adjustedFairShare.Describe(ch)
m.actualShare.Describe(ch)
m.fairnessError.Describe(ch)
m.demand.Describe(ch)
m.cappedDemand.Describe(ch)
m.scheduleCycleTime.Describe(ch)
}

m.reconciliationCycleTime.Describe(ch)
}

func (m *cycleMetrics) collect(ch chan<- prometheus.Metric) {
m.scheduledJobs.Collect(ch)
m.premptedJobs.Collect(ch)
m.consideredJobs.Collect(ch)
m.fairShare.Collect(ch)
m.adjustedFairShare.Collect(ch)
m.actualShare.Collect(ch)
m.fairnessError.Collect(ch)
m.demand.Collect(ch)
m.cappedDemand.Collect(ch)
m.scheduleCycleTime.Collect(ch)
if m.leaderMetricsEnabled {
m.scheduledJobs.Collect(ch)
m.premptedJobs.Collect(ch)
m.consideredJobs.Collect(ch)
m.fairShare.Collect(ch)
m.adjustedFairShare.Collect(ch)
m.actualShare.Collect(ch)
m.fairnessError.Collect(ch)
m.demand.Collect(ch)
m.cappedDemand.Collect(ch)
m.scheduleCycleTime.Collect(ch)
}

m.reconciliationCycleTime.Collect(ch)
}
75 changes: 75 additions & 0 deletions internal/scheduler/metrics/cycle_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -78,6 +79,80 @@ func TestReportStateTransitions(t *testing.T) {
assert.InDelta(t, 0.05, fairnessError, epsilon, "fairnessError")
}

func TestResetLeaderMetrics(t *testing.T) {
m := newCycleMetrics()

poolQueueLabelValues := []string{"pool1", "queue1"}
queuePriorityClassLabelValues := []string{"pool1", "priorityClass1"}

testResetCounter := func(vec *prometheus.CounterVec, labelValues []string) {
vec.WithLabelValues(labelValues...).Inc()
counterVal := testutil.ToFloat64(vec.WithLabelValues(labelValues...))
assert.Equal(t, 1.0, counterVal)
m.resetLeaderMetrics()
counterVal = testutil.ToFloat64(vec.WithLabelValues(labelValues...))
assert.Equal(t, 0.0, counterVal)
}
testResetGauge := func(vec *prometheus.GaugeVec, labelValues []string) {
vec.WithLabelValues(labelValues...).Inc()
counterVal := testutil.ToFloat64(vec.WithLabelValues(labelValues...))
assert.Equal(t, 1.0, counterVal)
m.resetLeaderMetrics()
counterVal = testutil.ToFloat64(vec.WithLabelValues(labelValues...))
assert.Equal(t, 0.0, counterVal)
}

testResetCounter(m.scheduledJobs, queuePriorityClassLabelValues)
testResetCounter(m.premptedJobs, queuePriorityClassLabelValues)
testResetGauge(m.consideredJobs, poolQueueLabelValues)
testResetGauge(m.fairShare, poolQueueLabelValues)
testResetGauge(m.adjustedFairShare, poolQueueLabelValues)
testResetGauge(m.actualShare, poolQueueLabelValues)
testResetGauge(m.fairnessError, []string{"pool1"})
testResetGauge(m.demand, poolQueueLabelValues)
testResetGauge(m.cappedDemand, poolQueueLabelValues)
}

func TestDisableLeaderMetrics(t *testing.T) {
m := newCycleMetrics()

poolQueueLabelValues := []string{"pool1", "queue1"}
queuePriorityClassLabelValues := []string{"pool1", "priorityClass1"}

collect := func(m *cycleMetrics) []prometheus.Metric {
m.scheduledJobs.WithLabelValues(queuePriorityClassLabelValues...).Inc()
m.premptedJobs.WithLabelValues(queuePriorityClassLabelValues...).Inc()
m.consideredJobs.WithLabelValues(poolQueueLabelValues...).Inc()
m.fairShare.WithLabelValues(poolQueueLabelValues...).Inc()
m.adjustedFairShare.WithLabelValues(poolQueueLabelValues...).Inc()
m.actualShare.WithLabelValues(poolQueueLabelValues...).Inc()
m.fairnessError.WithLabelValues("pool1").Inc()
m.demand.WithLabelValues(poolQueueLabelValues...).Inc()
m.cappedDemand.WithLabelValues(poolQueueLabelValues...).Inc()
m.scheduleCycleTime.Observe(float64(1000))
m.reconciliationCycleTime.Observe(float64(1000))

ch := make(chan prometheus.Metric, 1000)
m.collect(ch)
collected := make([]prometheus.Metric, 0, len(ch))
for len(ch) > 0 {
collected = append(collected, <-ch)
}
return collected
}

// Enabled
assert.NotZero(t, len(collect(m)))

// Disabled
m.disableLeaderMetrics()
assert.Equal(t, 1, len(collect(m)))

// Enabled
m.enableLeaderMetrics()
assert.NotZero(t, len(collect(m)))
}

func cpu(n int) schedulerobjects.ResourceList {
return schedulerobjects.ResourceList{
Resources: map[string]resource.Quantity{"cpu": resource.MustParse(fmt.Sprintf("%d", n))},
Expand Down
Loading

0 comments on commit f45aefa

Please sign in to comment.