Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[YUNIKORN-2855] Rethink the call for update metrics with nil resource #960

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions pkg/metrics/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
dto "github.com/prometheus/client_model/go"
"go.uber.org/zap"

"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/log"
)

Expand Down Expand Up @@ -57,6 +58,8 @@
resourceMetricsLabel *prometheus.GaugeVec
// Deprecated - To be removed in 1.7.0. Replaced with queue label Metrics
resourceMetricsSubsystem *prometheus.GaugeVec
// Track known resource types.
knownResourceTypes map[string]struct{}
craigcondit marked this conversation as resolved.
Show resolved Hide resolved
}

// InitQueueMetrics to initialize queue metrics
Expand Down Expand Up @@ -123,6 +126,7 @@
}
}

q.knownResourceTypes = make(map[string]struct{})
return q
}

Expand All @@ -146,6 +150,7 @@
m.appMetricsSubsystem.Reset()
m.resourceMetricsLabel.Reset()
m.resourceMetricsSubsystem.Reset()
m.knownResourceTypes = make(map[string]struct{})

Check warning on line 153 in pkg/metrics/queue.go

View check run for this annotation

Codecov / codecov/patch

pkg/metrics/queue.go#L153

Added line #L153 was not covered by tests
craigcondit marked this conversation as resolved.
Show resolved Hide resolved
}

func (m *QueueMetrics) IncQueueApplicationsRunning() {
Expand Down Expand Up @@ -301,12 +306,20 @@
m.containerMetrics.WithLabelValues(ContainerReleased).Add(float64(value))
}

func (m *QueueMetrics) SetQueueGuaranteedResourceMetrics(resourceName string, value float64) {
m.setQueueResource(QueueGuaranteed, resourceName, value)
}
func (m *QueueMetrics) UpdateQueueResourceMetrics(state string, newResources map[string]resources.Quantity) {
// Iterate over new resource types and set their values.
for resourceName, value := range newResources {
m.setQueueResource(state, resourceName, float64(value))
// Add new resources to the known list
m.knownResourceTypes[resourceName] = struct{}{}
craigcondit marked this conversation as resolved.
Show resolved Hide resolved
}

func (m *QueueMetrics) SetQueueMaxResourceMetrics(resourceName string, value float64) {
m.setQueueResource(QueueMax, resourceName, value)
// Emit old resource types that are missing in the new collection with zero.
for resourceName := range m.knownResourceTypes {
if _, exists := newResources[resourceName]; !exists {
m.setQueueResource(state, resourceName, float64(0))
}
}
}

func (m *QueueMetrics) SetQueueAllocatedResourceMetrics(resourceName string, value float64) {
Expand Down
19 changes: 17 additions & 2 deletions pkg/metrics/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"

"github.com/apache/yunikorn-core/pkg/common/resources"
)

var qm *QueueMetrics
Expand Down Expand Up @@ -196,16 +198,28 @@ func TestQueueGuaranteedResourceMetrics(t *testing.T) {
qm = getQueueMetrics()
defer unregisterQueueMetrics()

qm.SetQueueGuaranteedResourceMetrics("cpu", 1)
qm.UpdateQueueResourceMetrics("guaranteed", map[string]resources.Quantity{
"cpu": 1,
})
verifyResourceMetrics(t, "guaranteed", "cpu")
assert.DeepEqual(t, qm.knownResourceTypes, map[string]struct{}{"cpu": {}})

qm.UpdateQueueResourceMetrics("guaranteed", map[string]resources.Quantity{"memory": 1})
assert.DeepEqual(t, qm.knownResourceTypes, map[string]struct{}{"cpu": {}, "memory": {}})
}

func TestQueueMaxResourceMetrics(t *testing.T) {
qm = getQueueMetrics()
defer unregisterQueueMetrics()

qm.SetQueueMaxResourceMetrics("cpu", 1)
qm.UpdateQueueResourceMetrics("max", map[string]resources.Quantity{
"cpu": 1,
})
verifyResourceMetrics(t, "max", "cpu")
assert.DeepEqual(t, qm.knownResourceTypes, map[string]struct{}{"cpu": {}})

qm.UpdateQueueResourceMetrics("max", map[string]resources.Quantity{"memory": 1})
assert.DeepEqual(t, qm.knownResourceTypes, map[string]struct{}{"cpu": {}, "memory": {}})
}

func TestQueueAllocatedResourceMetrics(t *testing.T) {
Expand Down Expand Up @@ -364,4 +378,5 @@ func unregisterQueueMetrics() {
prometheus.Unregister(qm.containerMetrics)
prometheus.Unregister(qm.resourceMetricsLabel)
prometheus.Unregister(qm.resourceMetricsSubsystem)
qm.knownResourceTypes = make(map[string]struct{})
}
14 changes: 8 additions & 6 deletions pkg/scheduler/objects/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -1657,20 +1657,22 @@ func (sq *Queue) SupportTaskGroup() bool {

// updateGuaranteedResourceMetrics updates guaranteed resource metrics.
func (sq *Queue) updateGuaranteedResourceMetrics() {
queueMetrics := metrics.GetQueueMetrics(sq.QueuePath)
resourcesToUpdate := map[string]resources.Quantity{}
if sq.guaranteedResource != nil {
for k, v := range sq.guaranteedResource.Resources {
metrics.GetQueueMetrics(sq.QueuePath).SetQueueGuaranteedResourceMetrics(k, float64(v))
}
resourcesToUpdate = sq.guaranteedResource.Resources
}
queueMetrics.UpdateQueueResourceMetrics(metrics.QueueGuaranteed, resourcesToUpdate)
}

// updateMaxResourceMetrics updates max resource metrics.
func (sq *Queue) updateMaxResourceMetrics() {
queueMetrics := metrics.GetQueueMetrics(sq.QueuePath)
resourcesToUpdate := map[string]resources.Quantity{}
if sq.maxResource != nil {
for k, v := range sq.maxResource.Resources {
metrics.GetQueueMetrics(sq.QueuePath).SetQueueMaxResourceMetrics(k, float64(v))
craigcondit marked this conversation as resolved.
Show resolved Hide resolved
}
resourcesToUpdate = sq.maxResource.Resources
}
queueMetrics.UpdateQueueResourceMetrics(metrics.QueueMax, resourcesToUpdate)
}

// updateAllocatedResourceMetrics updates allocated resource metrics for all queue types.
Expand Down
Loading