Skip to content

Commit

Permalink
[YUNIKORN-2855] Handle nil properly in queue metrics (#960)
Browse files Browse the repository at this point in the history
Closes: #960

Signed-off-by: Craig Condit <[email protected]>
  • Loading branch information
zhuqi-lucas authored and craigcondit committed Sep 17, 2024
1 parent ecf7a65 commit 227a240
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 13 deletions.
29 changes: 24 additions & 5 deletions pkg/metrics/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
dto "github.com/prometheus/client_model/go"
"go.uber.org/zap"

"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
)

Expand Down Expand Up @@ -57,6 +59,9 @@ type QueueMetrics struct {
resourceMetricsLabel *prometheus.GaugeVec
// Deprecated - To be removed in 1.7.0. Replaced with queue label Metrics
resourceMetricsSubsystem *prometheus.GaugeVec
// Track known resource types
knownResourceTypes map[string]struct{}
lock locking.Mutex
}

// InitQueueMetrics to initialize queue metrics
Expand Down Expand Up @@ -123,6 +128,7 @@ func InitQueueMetrics(name string) *QueueMetrics {
}
}

q.knownResourceTypes = make(map[string]struct{})
return q
}

Expand All @@ -142,10 +148,13 @@ func (m *QueueMetrics) setQueueResource(state string, resourceName string, value
}

// Reset clears this queue's metrics: it calls Reset on the app and resource
// metrics (both the label-based and the subsystem-based variants) and replaces
// knownResourceTypes with a fresh empty set. The whole operation runs while
// holding m.lock.
func (m *QueueMetrics) Reset() {
m.lock.Lock()
defer m.lock.Unlock()
m.appMetricsLabel.Reset()
m.appMetricsSubsystem.Reset()
m.resourceMetricsLabel.Reset()
m.resourceMetricsSubsystem.Reset()
// Drop the tracked resource names together with the metrics themselves.
m.knownResourceTypes = make(map[string]struct{})
}

func (m *QueueMetrics) IncQueueApplicationsRunning() {
Expand Down Expand Up @@ -301,12 +310,22 @@ func (m *QueueMetrics) AddReleasedContainers(value int) {
m.containerMetrics.WithLabelValues(ContainerReleased).Add(float64(value))
}

// SetQueueGuaranteedResourceMetrics records value for resourceName under the
// QueueGuaranteed state by delegating to setQueueResource.
// NOTE(review): this looks like the pre-change setter as rendered by the diff
// view (no +/- markers are visible) — confirm whether it still exists.
func (m *QueueMetrics) SetQueueGuaranteedResourceMetrics(resourceName string, value float64) {
m.setQueueResource(QueueGuaranteed, resourceName, value)
}
// UpdateQueueResourceMetrics publishes newResources for the given state and
// zero-fills resource names that were published earlier but are absent now.
// The whole update runs while holding m.lock.
//
// Every name in newResources is set through setQueueResource and recorded in
// knownResourceTypes. Afterwards each recorded name that is missing from
// newResources is set to 0 for state; with an empty newResources every
// recorded name is therefore set to 0. knownResourceTypes is a single set per
// QueueMetrics and is not keyed by state, so a name recorded by a call for one
// state is also zero-filled by later calls for a different state.
func (m *QueueMetrics) UpdateQueueResourceMetrics(state string, newResources map[string]resources.Quantity) {
m.lock.Lock()
defer m.lock.Unlock()
// Iterate over new resource types and set their values
for resourceName, value := range newResources {
m.setQueueResource(state, resourceName, float64(value))
// Add new resources to the known list
m.knownResourceTypes[resourceName] = struct{}{}
}

// NOTE(review): the next two lines look like the pre-change
// SetQueueMaxResourceMetrics setter interleaved here by the diff view — confirm.
func (m *QueueMetrics) SetQueueMaxResourceMetrics(resourceName string, value float64) {
m.setQueueResource(QueueMax, resourceName, value)
// Emit old resource types that are missing in the new collection with zero
for resourceName := range m.knownResourceTypes {
if _, exists := newResources[resourceName]; !exists {
m.setQueueResource(state, resourceName, float64(0))
}
}
}

func (m *QueueMetrics) SetQueueAllocatedResourceMetrics(resourceName string, value float64) {
Expand Down
19 changes: 17 additions & 2 deletions pkg/metrics/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"

"github.com/apache/yunikorn-core/pkg/common/resources"
)

var qm *QueueMetrics
Expand Down Expand Up @@ -196,16 +198,28 @@ func TestQueueGuaranteedResourceMetrics(t *testing.T) {
qm = getQueueMetrics()
defer unregisterQueueMetrics()

qm.SetQueueGuaranteedResourceMetrics("cpu", 1)
qm.UpdateQueueResourceMetrics("guaranteed", map[string]resources.Quantity{
"cpu": 1,
})
verifyResourceMetrics(t, "guaranteed", "cpu")
assert.DeepEqual(t, qm.knownResourceTypes, map[string]struct{}{"cpu": {}})

qm.UpdateQueueResourceMetrics("guaranteed", map[string]resources.Quantity{"memory": 1})
assert.DeepEqual(t, qm.knownResourceTypes, map[string]struct{}{"cpu": {}, "memory": {}})
}

// TestQueueMaxResourceMetrics publishes a "max" cpu value, checks it with
// verifyResourceMetrics, and asserts that knownResourceTypes accumulates the
// resource names seen across successive updates ("cpu", then "cpu" and
// "memory").
func TestQueueMaxResourceMetrics(t *testing.T) {
qm = getQueueMetrics()
defer unregisterQueueMetrics()

// NOTE(review): the call below looks like the pre-change line shown next to
// its replacement by the diff view — confirm.
qm.SetQueueMaxResourceMetrics("cpu", 1)
qm.UpdateQueueResourceMetrics("max", map[string]resources.Quantity{
"cpu": 1,
})
verifyResourceMetrics(t, "max", "cpu")
assert.DeepEqual(t, qm.knownResourceTypes, map[string]struct{}{"cpu": {}})

// A second update without "cpu" must keep "cpu" in the known set.
qm.UpdateQueueResourceMetrics("max", map[string]resources.Quantity{"memory": 1})
assert.DeepEqual(t, qm.knownResourceTypes, map[string]struct{}{"cpu": {}, "memory": {}})
}

func TestQueueAllocatedResourceMetrics(t *testing.T) {
Expand Down Expand Up @@ -364,4 +378,5 @@ func unregisterQueueMetrics() {
prometheus.Unregister(qm.containerMetrics)
prometheus.Unregister(qm.resourceMetricsLabel)
prometheus.Unregister(qm.resourceMetricsSubsystem)
qm.knownResourceTypes = make(map[string]struct{})
}
14 changes: 8 additions & 6 deletions pkg/scheduler/objects/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -1657,20 +1657,22 @@ func (sq *Queue) SupportTaskGroup() bool {

// updateGuaranteedResourceMetrics updates guaranteed resource metrics.
// It looks up the metrics for sq.QueuePath and passes the queue's guaranteed
// resources to UpdateQueueResourceMetrics under metrics.QueueGuaranteed. When
// sq.guaranteedResource is nil an empty map is passed instead, so the metrics
// update still runs.
func (sq *Queue) updateGuaranteedResourceMetrics() {
queueMetrics := metrics.GetQueueMetrics(sq.QueuePath)
resourcesToUpdate := map[string]resources.Quantity{}
if sq.guaranteedResource != nil {
// NOTE(review): this per-resource loop looks like the pre-change
// implementation interleaved here by the diff view — confirm.
for k, v := range sq.guaranteedResource.Resources {
metrics.GetQueueMetrics(sq.QueuePath).SetQueueGuaranteedResourceMetrics(k, float64(v))
}
resourcesToUpdate = sq.guaranteedResource.Resources
}
queueMetrics.UpdateQueueResourceMetrics(metrics.QueueGuaranteed, resourcesToUpdate)
}

// updateMaxResourceMetrics updates max resource metrics.
// It looks up the metrics for sq.QueuePath and passes the queue's max
// resources to UpdateQueueResourceMetrics under metrics.QueueMax. When
// sq.maxResource is nil an empty map is passed instead, so the metrics update
// still runs.
func (sq *Queue) updateMaxResourceMetrics() {
queueMetrics := metrics.GetQueueMetrics(sq.QueuePath)
resourcesToUpdate := map[string]resources.Quantity{}
if sq.maxResource != nil {
// NOTE(review): this per-resource loop looks like the pre-change
// implementation interleaved here by the diff view — confirm.
for k, v := range sq.maxResource.Resources {
metrics.GetQueueMetrics(sq.QueuePath).SetQueueMaxResourceMetrics(k, float64(v))
}
resourcesToUpdate = sq.maxResource.Resources
}
queueMetrics.UpdateQueueResourceMetrics(metrics.QueueMax, resourcesToUpdate)
}

// updateAllocatedResourceMetrics updates allocated resource metrics for all queue types.
Expand Down

0 comments on commit 227a240

Please sign in to comment.