Skip to content

Commit

Permalink
[YUNIKORN-2818] Fix state tracking metrics for app and queue (#951)
Browse files Browse the repository at this point in the history
Closes: #951

Signed-off-by: Craig Condit <[email protected]>
  • Loading branch information
zhuqi-lucas authored and craigcondit committed Aug 21, 2024
1 parent 8a4acda commit 375895b
Show file tree
Hide file tree
Showing 9 changed files with 499 additions and 67 deletions.
91 changes: 84 additions & 7 deletions pkg/metrics/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,16 @@ import (
)

const (
AppAccepted = "accepted"
AppRunning = "running"
AppFailed = "failed"
AppRejected = "rejected"
AppCompleted = "completed"
AppNew = "new"
AppAccepted = "accepted"
AppRunning = "running"
AppFailing = "failing"
AppFailed = "failed"
AppRejected = "rejected"
AppResuming = "resuming"
AppCompleting = "completing"
AppCompleted = "completed"
AppExpired = "expired"

ContainerReleased = "released"
ContainerAllocated = "allocated"
Expand Down Expand Up @@ -65,15 +70,15 @@ func InitQueueMetrics(name string) *QueueMetrics {
Namespace: Namespace,
Name: "queue_app",
ConstLabels: prometheus.Labels{"queue": name},
Help: "Queue application metrics. State of the application includes `accepted`, `rejected`, `running`, `failed`, `completed`.",
Help: "Queue application metrics. State of the application includes `new`, `accepted`, `rejected`, `running`, `failing`, `failed`, `resuming`, `completing`, `completed`.",
}, []string{"state"})

q.appMetricsSubsystem = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: replaceStr,
Name: "queue_app",
Help: "Queue application metrics. State of the application includes `accepted`, `rejected`, `running`, `failed`, `completed`.",
Help: "Queue application metrics. State of the application includes `new`, `accepted`, `rejected`, `running`, `failing`, `failed`, `resuming`, `completing`, `completed`.",
}, []string{"state"})

q.containerMetrics = prometheus.NewCounterVec(
Expand Down Expand Up @@ -160,10 +165,31 @@ func (m *QueueMetrics) GetQueueApplicationsRunning() (int, error) {
return -1, err
}

// IncQueueApplicationsNew increments the queue's application metric for the
// "new" state. It delegates to incQueueApplications with the AppNew label.
func (m *QueueMetrics) IncQueueApplicationsNew() {
m.incQueueApplications(AppNew)
}

// DecQueueApplicationsNew decrements the queue's application metric for the
// "new" state. It delegates to decQueueApplications with the AppNew label.
func (m *QueueMetrics) DecQueueApplicationsNew() {
m.decQueueApplications(AppNew)
}

// GetQueueApplicationsNew reports the current value of the application gauge
// carrying the AppNew ("new") state label in appMetricsLabel, converted to int.
// When the metric cannot be written into its DTO form, -1 is returned together
// with the error.
func (m *QueueMetrics) GetQueueApplicationsNew() (int, error) {
	snapshot := new(dto.Metric)
	if err := m.appMetricsLabel.WithLabelValues(AppNew).Write(snapshot); err != nil {
		return -1, err
	}
	return int(*snapshot.Gauge.Value), nil
}

// IncQueueApplicationsAccepted increments the queue's application metric for
// the "accepted" state. It delegates to incQueueApplications with the
// AppAccepted label.
func (m *QueueMetrics) IncQueueApplicationsAccepted() {
m.incQueueApplications(AppAccepted)
}

// DecQueueApplicationsAccepted decrements the queue's application metric for
// the "accepted" state. It delegates to decQueueApplications with the
// AppAccepted label.
func (m *QueueMetrics) DecQueueApplicationsAccepted() {
m.decQueueApplications(AppAccepted)
}

func (m *QueueMetrics) GetQueueApplicationsAccepted() (int, error) {
metricDto := &dto.Metric{}
err := m.appMetricsLabel.WithLabelValues(AppAccepted).Write(metricDto)
Expand All @@ -186,6 +212,40 @@ func (m *QueueMetrics) GetQueueApplicationsRejected() (int, error) {
return -1, err
}

// IncQueueApplicationsResuming increments the queue's application metric for
// the "resuming" state. It delegates to incQueueApplications with the
// AppResuming label.
func (m *QueueMetrics) IncQueueApplicationsResuming() {
m.incQueueApplications(AppResuming)
}

// DecQueueApplicationsResuming decrements the queue's application metric for
// the "resuming" state. It delegates to decQueueApplications with the
// AppResuming label.
func (m *QueueMetrics) DecQueueApplicationsResuming() {
m.decQueueApplications(AppResuming)
}

// GetQueueApplicationsResuming reports the current value of the application
// gauge carrying the AppResuming ("resuming") state label in appMetricsLabel,
// converted to int. When the metric cannot be written into its DTO form, -1 is
// returned together with the error.
func (m *QueueMetrics) GetQueueApplicationsResuming() (int, error) {
	snapshot := new(dto.Metric)
	if err := m.appMetricsLabel.WithLabelValues(AppResuming).Write(snapshot); err != nil {
		return -1, err
	}
	return int(*snapshot.Gauge.Value), nil
}

// IncQueueApplicationsFailing increments the queue's application metric for
// the "failing" state. It delegates to incQueueApplications with the
// AppFailing label.
func (m *QueueMetrics) IncQueueApplicationsFailing() {
m.incQueueApplications(AppFailing)
}

// DecQueueApplicationsFailing decrements the queue's application metric for
// the "failing" state. It delegates to decQueueApplications with the
// AppFailing label.
func (m *QueueMetrics) DecQueueApplicationsFailing() {
m.decQueueApplications(AppFailing)
}

// GetQueueApplicationsFailing reports the current value of the application
// gauge carrying the AppFailing ("failing") state label in appMetricsLabel,
// converted to int. When the metric cannot be written into its DTO form, -1 is
// returned together with the error.
func (m *QueueMetrics) GetQueueApplicationsFailing() (int, error) {
	snapshot := new(dto.Metric)
	if err := m.appMetricsLabel.WithLabelValues(AppFailing).Write(snapshot); err != nil {
		return -1, err
	}
	return int(*snapshot.Gauge.Value), nil
}

// IncQueueApplicationsFailed increments the queue's application metric for
// the "failed" state. It delegates to incQueueApplications with the AppFailed
// label.
func (m *QueueMetrics) IncQueueApplicationsFailed() {
m.incQueueApplications(AppFailed)
}
Expand All @@ -199,6 +259,23 @@ func (m *QueueMetrics) GetQueueApplicationsFailed() (int, error) {
return -1, err
}

// IncQueueApplicationsCompleting increments the queue's application metric
// for the "completing" state. It delegates to incQueueApplications with the
// AppCompleting label.
func (m *QueueMetrics) IncQueueApplicationsCompleting() {
m.incQueueApplications(AppCompleting)
}

// DecQueueApplicationsCompleting decrements the queue's application metric
// for the "completing" state. It delegates to decQueueApplications with the
// AppCompleting label.
func (m *QueueMetrics) DecQueueApplicationsCompleting() {
m.decQueueApplications(AppCompleting)
}

// GetQueueApplicationsCompleting reports the current value of the application
// gauge carrying the AppCompleting ("completing") state label in
// appMetricsLabel, converted to int. When the metric cannot be written into
// its DTO form, -1 is returned together with the error.
func (m *QueueMetrics) GetQueueApplicationsCompleting() (int, error) {
	snapshot := new(dto.Metric)
	if err := m.appMetricsLabel.WithLabelValues(AppCompleting).Write(snapshot); err != nil {
		return -1, err
	}
	return int(*snapshot.Gauge.Value), nil
}

// IncQueueApplicationsCompleted increments the queue's application metric for
// the "completed" state. It delegates to incQueueApplications with the
// AppCompleted label.
func (m *QueueMetrics) IncQueueApplicationsCompleted() {
m.incQueueApplications(AppCompleted)
}
Expand Down
98 changes: 98 additions & 0 deletions pkg/metrics/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,38 @@ import (

var qm *QueueMetrics

// TestApplicationsNew checks that the "new" application gauge reads 1 after a
// single increment and drops back to 0 after the matching decrement.
func TestApplicationsNew(t *testing.T) {
	qm = getQueueMetrics()
	defer unregisterQueueMetrics()

	// increment once, then confirm through the shared helper and the getter
	qm.IncQueueApplicationsNew()
	verifyAppMetrics(t, "new")
	afterInc, err := qm.GetQueueApplicationsNew()
	assert.NilError(t, err)
	assert.Equal(t, 1, afterInc)

	// the matching decrement must bring the gauge back to zero
	qm.DecQueueApplicationsNew()
	afterDec, err := qm.GetQueueApplicationsNew()
	assert.NilError(t, err)
	assert.Equal(t, 0, afterDec)
}

// TestApplicationsRunning checks that the "running" application gauge reads 1
// after a single increment and drops back to 0 after the matching decrement.
func TestApplicationsRunning(t *testing.T) {
	qm = getQueueMetrics()
	defer unregisterQueueMetrics()

	// increment once, then confirm through the shared helper and the getter
	qm.IncQueueApplicationsRunning()
	verifyAppMetrics(t, "running")
	afterInc, err := qm.GetQueueApplicationsRunning()
	assert.NilError(t, err)
	assert.Equal(t, 1, afterInc)

	// the matching decrement must bring the gauge back to zero
	qm.DecQueueApplicationsRunning()
	afterDec, err := qm.GetQueueApplicationsRunning()
	assert.NilError(t, err)
	assert.Equal(t, 0, afterDec)
}

func TestApplicationsAccepted(t *testing.T) {
Expand All @@ -44,6 +70,49 @@ func TestApplicationsAccepted(t *testing.T) {

qm.IncQueueApplicationsAccepted()
verifyAppMetrics(t, "accepted")

curr, err := qm.GetQueueApplicationsAccepted()
assert.NilError(t, err)
assert.Equal(t, 1, curr)

qm.DecQueueApplicationsAccepted()
curr, err = qm.GetQueueApplicationsAccepted()
assert.NilError(t, err)
assert.Equal(t, 0, curr)
}

// TestApplicationsResuming checks that the "resuming" application gauge reads
// 1 after a single increment and drops back to 0 after the matching decrement.
func TestApplicationsResuming(t *testing.T) {
	qm = getQueueMetrics()
	defer unregisterQueueMetrics()

	// increment once, then confirm through the shared helper and the getter
	qm.IncQueueApplicationsResuming()
	verifyAppMetrics(t, "resuming")
	afterInc, err := qm.GetQueueApplicationsResuming()
	assert.NilError(t, err)
	assert.Equal(t, 1, afterInc)

	// the matching decrement must bring the gauge back to zero
	qm.DecQueueApplicationsResuming()
	afterDec, err := qm.GetQueueApplicationsResuming()
	assert.NilError(t, err)
	assert.Equal(t, 0, afterDec)
}

// TestApplicationsFailing checks that the "failing" application gauge reads 1
// after a single increment and drops back to 0 after the matching decrement.
func TestApplicationsFailing(t *testing.T) {
	qm = getQueueMetrics()
	defer unregisterQueueMetrics()

	// increment once, then confirm through the shared helper and the getter
	qm.IncQueueApplicationsFailing()
	verifyAppMetrics(t, "failing")
	afterInc, err := qm.GetQueueApplicationsFailing()
	assert.NilError(t, err)
	assert.Equal(t, 1, afterInc)

	// the matching decrement must bring the gauge back to zero
	qm.DecQueueApplicationsFailing()
	afterDec, err := qm.GetQueueApplicationsFailing()
	assert.NilError(t, err)
	assert.Equal(t, 0, afterDec)
}

func TestApplicationsRejected(t *testing.T) {
Expand All @@ -52,6 +121,10 @@ func TestApplicationsRejected(t *testing.T) {

qm.IncQueueApplicationsRejected()
verifyAppMetrics(t, "rejected")

curr, err := qm.GetQueueApplicationsRejected()
assert.NilError(t, err)
assert.Equal(t, 1, curr)
}

func TestApplicationsFailed(t *testing.T) {
Expand All @@ -60,6 +133,27 @@ func TestApplicationsFailed(t *testing.T) {

qm.IncQueueApplicationsFailed()
verifyAppMetrics(t, "failed")

curr, err := qm.GetQueueApplicationsFailed()
assert.NilError(t, err)
assert.Equal(t, 1, curr)
}

// TestApplicationsCompleting checks that the "completing" application gauge
// reads 1 after a single increment and drops back to 0 after the matching
// decrement.
func TestApplicationsCompleting(t *testing.T) {
	qm = getQueueMetrics()
	defer unregisterQueueMetrics()

	// increment once, then confirm through the shared helper and the getter
	qm.IncQueueApplicationsCompleting()
	verifyAppMetrics(t, "completing")
	afterInc, err := qm.GetQueueApplicationsCompleting()
	assert.NilError(t, err)
	assert.Equal(t, 1, afterInc)

	// the matching decrement must bring the gauge back to zero
	qm.DecQueueApplicationsCompleting()
	afterDec, err := qm.GetQueueApplicationsCompleting()
	assert.NilError(t, err)
	assert.Equal(t, 0, afterDec)
}

func TestApplicationsCompleted(t *testing.T) {
Expand All @@ -68,6 +162,10 @@ func TestApplicationsCompleted(t *testing.T) {

qm.IncQueueApplicationsCompleted()
verifyAppMetrics(t, "completed")

curr, err := qm.GetQueueApplicationsCompleted()
assert.NilError(t, err)
assert.Equal(t, 1, curr)
}

func TestAllocatedContainers(t *testing.T) {
Expand Down
Loading

0 comments on commit 375895b

Please sign in to comment.