Skip to content

Commit

Permalink
Validate prometheus query result
Browse files Browse the repository at this point in the history
  • Loading branch information
ingvagabund committed Nov 19, 2024
1 parent a4186f1 commit d30044e
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1402,7 +1402,7 @@ func TestLowNodeUtilizationWithPrometheusMetrics(t *testing.T) {
useDeviationThresholds bool
thresholds, targetThresholds api.ResourceThresholds
query string
samples []model.Sample
samples model.Vector
nodes []*v1.Node
pods []*v1.Pod
expectedPodsEvicted uint
Expand All @@ -1418,7 +1418,7 @@ func TestLowNodeUtilizationWithPrometheusMetrics(t *testing.T) {
v1.ResourceName("MetricResource"): 50,
},
query: "instance:node_cpu:rate:sum",
samples: []model.Sample{
samples: model.Vector{
sample("instance:node_cpu:rate:sum", n1NodeName, 0.5695757575757561),
sample("instance:node_cpu:rate:sum", n2NodeName, 0.4245454545454522),
sample("instance:node_cpu:rate:sum", n3NodeName, 0.20381818181818104),
Expand Down Expand Up @@ -1487,7 +1487,8 @@ func TestLowNodeUtilizationWithPrometheusMetrics(t *testing.T) {
}

handle.PrometheusClientImpl = &fakePromClient{
result: tc.samples,
result: tc.samples,
dataType: model.ValVector,
}
plugin, err := NewLowNodeUtilization(&LowNodeUtilizationArgs{
Thresholds: tc.thresholds,
Expand Down
49 changes: 13 additions & 36 deletions pkg/framework/plugins/nodeutilization/usageclients.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ package nodeutilization

import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"time"

promapi "github.com/prometheus/client_golang/api"
Expand Down Expand Up @@ -261,36 +258,6 @@ func (client *prometheusUsageClient) podUsage(pod *v1.Pod) (map[v1.ResourceName]
return nil, newNotSupportedError(prometheusUsageClientType)
}

type fakePromClient struct {
result interface{}
}

type fakePayload struct {
Status string `json:"status"`
Data queryResult `json:"data"`
}

type queryResult struct {
Type model.ValueType `json:"resultType"`
Result interface{} `json:"result"`
}

func (client *fakePromClient) URL(ep string, args map[string]string) *url.URL {
return &url.URL{}
}

func (client *fakePromClient) Do(ctx context.Context, request *http.Request) (*http.Response, []byte, error) {
jsonData, err := json.Marshal(fakePayload{
Status: "success",
Data: queryResult{
Type: model.ValVector,
Result: client.result,
},
})

return &http.Response{StatusCode: 200}, jsonData, err
}

func (client *prometheusUsageClient) resourceNames() []v1.ResourceName {
return []v1.ResourceName{ResourceMetrics}
}
Expand All @@ -307,11 +274,21 @@ func (client *prometheusUsageClient) sync(nodes []*v1.Node) error {
klog.Infof("prometheus metrics warnings: %v", warnings)
}

if results.Type() != model.ValVector {
return fmt.Errorf("expected query results to be of type %q, got %q instead", model.ValVector, results.Type())
}

nodeUsages := make(map[string]map[v1.ResourceName]*resource.Quantity)
for _, sample := range results.(model.Vector) {
nodeName := string(sample.Metric["instance"])
nodeUsages[nodeName] = map[v1.ResourceName]*resource.Quantity{
v1.ResourceName("MetricResource"): resource.NewQuantity(int64(sample.Value*100), resource.DecimalSI),
nodeName, exists := sample.Metric["instance"]
if !exists {
return fmt.Errorf("The collected metrics sample is missing 'instance' key")
}
if sample.Value < 0 || sample.Value > 1 {
return fmt.Errorf("The collected metrics sample for %q has value %v outside of <0; 1> interval", string(nodeName), sample.Value)
}
nodeUsages[string(nodeName)] = map[v1.ResourceName]*resource.Quantity{
ResourceMetrics: resource.NewQuantity(int64(sample.Value*100), resource.DecimalSI),
}
}

Expand Down
169 changes: 130 additions & 39 deletions pkg/framework/plugins/nodeutilization/usageclients_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ package nodeutilization

import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"testing"

"github.com/prometheus/common/model"
Expand Down Expand Up @@ -138,8 +141,39 @@ func TestActualUsageClient(t *testing.T) {
)
}

func sample(metricName, nodeName string, value float64) model.Sample {
return model.Sample{
type fakePromClient struct {
result interface{}
dataType model.ValueType
}

type fakePayload struct {
Status string `json:"status"`
Data queryResult `json:"data"`
}

type queryResult struct {
Type model.ValueType `json:"resultType"`
Result interface{} `json:"result"`
}

func (client *fakePromClient) URL(ep string, args map[string]string) *url.URL {
return &url.URL{}
}

func (client *fakePromClient) Do(ctx context.Context, request *http.Request) (*http.Response, []byte, error) {
jsonData, err := json.Marshal(fakePayload{
Status: "success",
Data: queryResult{
Type: client.dataType,
Result: client.result,
},
})

return &http.Response{StatusCode: 200}, jsonData, err
}

func sample(metricName, nodeName string, value float64) *model.Sample {
return &model.Sample{
Metric: model.Metric{
"__name__": model.LabelValue(metricName),
"instance": model.LabelValue(nodeName),
Expand All @@ -161,46 +195,103 @@ func TestPrometheusUsageClient(t *testing.T) {
p22 := test.BuildTestPod("p22", 400, 0, n2.Name, nil)
p3 := test.BuildTestPod("p3", 400, 0, n3.Name, nil)

pClient := &fakePromClient{
result: []model.Sample{
sample("instance:node_cpu:rate:sum", "ip-10-0-51-101.ec2.internal", 0.20381818181818104),
sample("instance:node_cpu:rate:sum", "ip-10-0-17-165.ec2.internal", 0.4245454545454522),
sample("instance:node_cpu:rate:sum", "ip-10-0-94-25.ec2.internal", 0.5695757575757561),
tests := []struct {
name string
result interface{}
dataType model.ValueType
nodeUsage map[string]int64
err error
}{
{
name: "valid data",
dataType: model.ValVector,
result: model.Vector{
sample("instance:node_cpu:rate:sum", "ip-10-0-51-101.ec2.internal", 0.20381818181818104),
sample("instance:node_cpu:rate:sum", "ip-10-0-17-165.ec2.internal", 0.4245454545454522),
sample("instance:node_cpu:rate:sum", "ip-10-0-94-25.ec2.internal", 0.5695757575757561),
},
nodeUsage: map[string]int64{
"ip-10-0-51-101.ec2.internal": 20,
"ip-10-0-17-165.ec2.internal": 42,
"ip-10-0-94-25.ec2.internal": 56,
},
},
}

clientset := fakeclientset.NewSimpleClientset(n1, n2, n3, p1, p21, p22, p3)

ctx := context.TODO()
sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0)
podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
podsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer)
if err != nil {
t.Fatalf("Build get pods assigned to node function error: %v", err)
}

sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())

prometheusUsageClient := newPrometheusUsageClient(podsAssignedToNode, pClient, "instance:node_cpu:rate:sum")
err = prometheusUsageClient.sync(nodes)
if err != nil {
t.Fatalf("unable to sync prometheus metrics: %v", err)
}

for _, node := range nodes {
nodeUtil := prometheusUsageClient.nodeUtilization(node.Name)
fmt.Printf("nodeUtil[%v]: %v\n", node.Name, nodeUtil)
}

nodeThresholds := NodeThresholds{
lowResourceThreshold: map[v1.ResourceName]*resource.Quantity{
ResourceMetrics: resource.NewQuantity(int64(30), resource.DecimalSI),
{
name: "invalid data missing instance label",
dataType: model.ValVector,
result: model.Vector{
&model.Sample{
Metric: model.Metric{
"__name__": model.LabelValue("instance:node_cpu:rate:sum"),
},
Value: model.SampleValue(0.20381818181818104),
Timestamp: 1728991761711,
},
},
err: fmt.Errorf("The collected metrics sample is missing 'instance' key"),
},
highResourceThreshold: map[v1.ResourceName]*resource.Quantity{
ResourceMetrics: resource.NewQuantity(int64(50), resource.DecimalSI),
{
name: "invalid data value out of range",
dataType: model.ValVector,
result: model.Vector{
sample("instance:node_cpu:rate:sum", "ip-10-0-51-101.ec2.internal", 1.20381818181818104),
},
err: fmt.Errorf("The collected metrics sample for \"ip-10-0-51-101.ec2.internal\" has value 1.203818181818181 outside of <0; 1> interval"),
},
{
name: "invalid data not a vector",
dataType: model.ValScalar,
result: model.Scalar{
Value: model.SampleValue(0.20381818181818104),
Timestamp: 1728991761711,
},
err: fmt.Errorf("expected query results to be of type \"vector\", got \"scalar\" instead"),
},
}

fmt.Printf("nodeThresholds: %#v\n", nodeThresholds)
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
pClient := &fakePromClient{
result: tc.result,
dataType: tc.dataType,
}

clientset := fakeclientset.NewSimpleClientset(n1, n2, n3, p1, p21, p22, p3)

ctx := context.TODO()
sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0)
podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
podsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer)
if err != nil {
t.Fatalf("Build get pods assigned to node function error: %v", err)
}

sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())

prometheusUsageClient := newPrometheusUsageClient(podsAssignedToNode, pClient, "instance:node_cpu:rate:sum")
err = prometheusUsageClient.sync(nodes)
if tc.err == nil {
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
} else {
if err == nil {
t.Fatalf("unexpected %q error, got nil instead", tc.err)
} else if err.Error() != tc.err.Error() {
t.Fatalf("expected %q error, got %q instead", tc.err, err)
}
return
}

for _, node := range nodes {
nodeUtil := prometheusUsageClient.nodeUtilization(node.Name)
if nodeUtil[ResourceMetrics].Value() != tc.nodeUsage[node.Name] {
t.Fatalf("expected %q node utilization to be %v, got %v instead", node.Name, tc.nodeUsage[node.Name], nodeUtil[ResourceMetrics])
} else {
t.Logf("%v node utilization: %v", node.Name, nodeUtil[ResourceMetrics])
}
}
})
}
}

0 comments on commit d30044e

Please sign in to comment.