Skip to content

Commit

Permalink
Address new comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuqi-lucas committed Sep 10, 2024
1 parent 466d7e4 commit 380c95c
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 38 deletions.
60 changes: 56 additions & 4 deletions pkg/metrics/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,12 +301,64 @@ func (m *QueueMetrics) AddReleasedContainers(value int) {
m.containerMetrics.WithLabelValues(ContainerReleased).Add(float64(value))
}

func (m *QueueMetrics) SetQueueGuaranteedResourceMetrics(resourceName string, value float64) {
m.setQueueResource(QueueGuaranteed, resourceName, value)
// SetQueueResourceMetrics records value for the resource type resourceName
// under the given state label by delegating to setQueueResource.
func (m *QueueMetrics) SetQueueResourceMetrics(state, resourceName string, value float64) {
	m.setQueueResource(state, resourceName, value)
}

func (m *QueueMetrics) SetQueueMaxResourceMetrics(resourceName string, value float64) {
m.setQueueResource(QueueMax, resourceName, value)
func (m *QueueMetrics) SetQueueNilResourceMetrics(state string) {
currResource, err := m.GetAllQueueStateMetrics(state)
if err != nil {
log.Log(log.Metrics).Warn("failed to get current guaranteed resource metrics",
zap.Error(err))
return

Check warning on line 313 in pkg/metrics/queue.go

View check run for this annotation

Codecov / codecov/patch

pkg/metrics/queue.go#L311-L313

Added lines #L311 - L313 were not covered by tests
}
for k := range currResource {
m.SetQueueResourceMetrics(state, k, float64(0))
}
}

func (m *QueueMetrics) GetQueueMaxResourceMetrics(resourceName string) (float64, error) {
metricDto := &dto.Metric{}
err := m.resourceMetricsLabel.WithLabelValues(QueueMax, resourceName).Write(metricDto)
if err == nil {
return *metricDto.Gauge.Value, nil

Check warning on line 324 in pkg/metrics/queue.go

View check run for this annotation

Codecov / codecov/patch

pkg/metrics/queue.go#L320-L324

Added lines #L320 - L324 were not covered by tests
}
return -1, err

Check warning on line 326 in pkg/metrics/queue.go

View check run for this annotation

Codecov / codecov/patch

pkg/metrics/queue.go#L326

Added line #L326 was not covered by tests
}

func (m *QueueMetrics) GetAllQueueStateMetrics(state string) (map[string]float64, error) {
metrics := make(map[string]float64)
metricFamilies, err := prometheus.DefaultGatherer.Gather()
if err != nil {
return nil, err

Check warning on line 333 in pkg/metrics/queue.go

View check run for this annotation

Codecov / codecov/patch

pkg/metrics/queue.go#L333

Added line #L333 was not covered by tests
}

for _, metricFamily := range metricFamilies {
for _, metric := range metricFamily.Metric {
// Check if this metric belongs to the state type.
for _, label := range metric.Label {
if *label.Name == "state" && *label.Value == state {
// Extract the `resource` label and its corresponding value.
for _, subLabel := range metric.Label {
if *subLabel.Name == "resource" {
metrics[*subLabel.Value] = metric.GetGauge().GetValue()
}
}
}
}
}
}

return metrics, nil
}

func (m *QueueMetrics) GetQueueGuaranteedResourceMetrics(resourceName string) (float64, error) {
metricDto := &dto.Metric{}
err := m.resourceMetricsLabel.WithLabelValues(QueueGuaranteed, resourceName).Write(metricDto)
if err == nil {
return *metricDto.Gauge.Value, nil

Check warning on line 359 in pkg/metrics/queue.go

View check run for this annotation

Codecov / codecov/patch

pkg/metrics/queue.go#L355-L359

Added lines #L355 - L359 were not covered by tests
}
return -1, err

Check warning on line 361 in pkg/metrics/queue.go

View check run for this annotation

Codecov / codecov/patch

pkg/metrics/queue.go#L361

Added line #L361 was not covered by tests
}

func (m *QueueMetrics) SetQueueAllocatedResourceMetrics(resourceName string, value float64) {
Expand Down
26 changes: 24 additions & 2 deletions pkg/metrics/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,16 +196,38 @@ func TestQueueGuaranteedResourceMetrics(t *testing.T) {
qm = getQueueMetrics()
defer unregisterQueueMetrics()

qm.SetQueueGuaranteedResourceMetrics("cpu", 1)
qm.SetQueueResourceMetrics(QueueGuaranteed, "cpu", 1)
verifyResourceMetrics(t, "guaranteed", "cpu")

res, err := qm.GetAllQueueStateMetrics(QueueGuaranteed)
assert.NilError(t, err)
assert.Equal(t, len(res), 1)
assert.DeepEqual(t, res, map[string]float64{"cpu": 1})

qm.SetQueueNilResourceMetrics(QueueGuaranteed)
res, err = qm.GetAllQueueStateMetrics(QueueGuaranteed)
assert.NilError(t, err)
assert.Equal(t, len(res), 1)
assert.DeepEqual(t, res, map[string]float64{"cpu": 0})
}

func TestQueueMaxResourceMetrics(t *testing.T) {
qm = getQueueMetrics()
defer unregisterQueueMetrics()

qm.SetQueueMaxResourceMetrics("cpu", 1)
qm.SetQueueResourceMetrics(QueueMax, "cpu", 1)
verifyResourceMetrics(t, "max", "cpu")

res, err := qm.GetAllQueueStateMetrics(QueueMax)
assert.NilError(t, err)
assert.Equal(t, len(res), 1)
assert.DeepEqual(t, res, map[string]float64{"cpu": 1})

qm.SetQueueNilResourceMetrics(QueueMax)
res, err = qm.GetAllQueueStateMetrics(QueueMax)
assert.NilError(t, err)
assert.Equal(t, len(res), 1)
assert.DeepEqual(t, res, map[string]float64{"cpu": 0})
}

func TestQueueAllocatedResourceMetrics(t *testing.T) {
Expand Down
48 changes: 20 additions & 28 deletions pkg/scheduler/objects/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,22 +74,20 @@ type Queue struct {
// The queue properties should be treated as immutable the value is a merge of the
// parent properties with the config for this queue only manipulated during creation
// of the queue or via a queue configuration update.
properties map[string]string
adminACL security.ACL // admin ACL
submitACL security.ACL // submit ACL
previousMaxResource *resources.Resource // Previous max resource for the queue before update
previousGuaranteedResource *resources.Resource // Previous guaranteed resource for the queue before update
maxResource *resources.Resource // When not set, max = nil
guaranteedResource *resources.Resource // When not set, Guaranteed == 0
isLeaf bool // this is a leaf queue or not (i.e. parent)
isManaged bool // queue is part of the config, not auto created
stateMachine *fsm.FSM // the state of the queue for scheduling
stateTime time.Time // last time the state was updated (needed for cleanup)
maxRunningApps uint64
runningApps uint64
allocatingAcceptedApps map[string]bool
template *template.Template
queueEvents *schedEvt.QueueEvents
properties map[string]string
adminACL security.ACL // admin ACL
submitACL security.ACL // submit ACL
maxResource *resources.Resource // When not set, max = nil
guaranteedResource *resources.Resource // When not set, Guaranteed == 0
isLeaf bool // this is a leaf queue or not (i.e. parent)
isManaged bool // queue is part of the config, not auto created
stateMachine *fsm.FSM // the state of the queue for scheduling
stateTime time.Time // last time the state was updated (needed for cleanup)
maxRunningApps uint64
runningApps uint64
allocatingAcceptedApps map[string]bool
template *template.Template
queueEvents *schedEvt.QueueEvents

locking.RWMutex
}
Expand Down Expand Up @@ -1661,27 +1659,21 @@ func (sq *Queue) SupportTaskGroup() bool {
func (sq *Queue) updateGuaranteedResourceMetrics() {
if sq.guaranteedResource != nil {
for k, v := range sq.guaranteedResource.Resources {
metrics.GetQueueMetrics(sq.QueuePath).SetQueueGuaranteedResourceMetrics(k, float64(v))
}
sq.previousGuaranteedResource = sq.guaranteedResource
} else if sq.previousGuaranteedResource != nil {
for k := range sq.previousGuaranteedResource.Resources {
metrics.GetQueueMetrics(sq.QueuePath).SetQueueGuaranteedResourceMetrics(k, float64(0))
metrics.GetQueueMetrics(sq.QueuePath).SetQueueResourceMetrics(metrics.QueueGuaranteed, k, float64(v))
}
} else {
metrics.GetQueueMetrics(sq.QueuePath).SetQueueNilResourceMetrics(metrics.QueueGuaranteed)
}
}

// updateMaxResourceMetrics updates max resource metrics.
func (sq *Queue) updateMaxResourceMetrics() {
if sq.maxResource != nil {
for k, v := range sq.maxResource.Resources {
metrics.GetQueueMetrics(sq.QueuePath).SetQueueMaxResourceMetrics(k, float64(v))
}
sq.previousMaxResource = sq.maxResource
} else if sq.previousMaxResource != nil {
for k := range sq.previousMaxResource.Resources {
metrics.GetQueueMetrics(sq.QueuePath).SetQueueMaxResourceMetrics(k, float64(0))
metrics.GetQueueMetrics(sq.QueuePath).SetQueueResourceMetrics(metrics.QueueMax, k, float64(v))
}
} else {
metrics.GetQueueMetrics(sq.QueuePath).SetQueueNilResourceMetrics(metrics.QueueMax)
}
}

Expand Down
4 changes: 0 additions & 4 deletions pkg/scheduler/objects/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1753,8 +1753,6 @@ func TestSetResources(t *testing.T) {
assert.NilError(t, err, "failed to set resources: %v", err)
assert.DeepEqual(t, queue.guaranteedResource, nilResource)
assert.DeepEqual(t, queue.maxResource, nilResource)
assert.DeepEqual(t, queue.previousGuaranteedResource, expectedGuaranteedResource)
assert.DeepEqual(t, queue.previousMaxResource, expectedMaxResource)

// case 2: zero resource won't change the queue resources as it is 'nil' already
err = queue.setResourcesFromConf(configs.Resources{
Expand All @@ -1764,8 +1762,6 @@ func TestSetResources(t *testing.T) {
assert.NilError(t, err, "failed to set resources: %v", err)
assert.DeepEqual(t, queue.guaranteedResource, nilResource)
assert.DeepEqual(t, queue.maxResource, nilResource)
assert.DeepEqual(t, queue.previousGuaranteedResource, expectedGuaranteedResource)
assert.DeepEqual(t, queue.previousMaxResource, expectedMaxResource)
}

func TestPreemptingResource(t *testing.T) {
Expand Down

0 comments on commit 380c95c

Please sign in to comment.