Commit: prometheusUsageClient

ingvagabund committed Nov 6, 2024
1 parent 59e9554 commit 800c92c
Showing 3 changed files with 241 additions and 19 deletions.
74 changes: 55 additions & 19 deletions pkg/framework/plugins/nodeutilization/lownodeutilization_test.go
@@ -25,27 +25,28 @@ import (
"testing"
"time"

"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/framework/plugins/defaultevictor"
frameworktesting "sigs.k8s.io/descheduler/pkg/framework/testing"
frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"

v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
fakeclientset "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/metrics/pkg/apis/metrics/v1beta1"
fakemetricsclient "k8s.io/metrics/pkg/client/clientset/versioned/fake"
podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"

"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
"sigs.k8s.io/descheduler/pkg/descheduler/metricscollector"
"sigs.k8s.io/descheduler/pkg/framework/plugins/defaultevictor"
frameworktesting "sigs.k8s.io/descheduler/pkg/framework/testing"
frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"
"sigs.k8s.io/descheduler/pkg/utils"
"sigs.k8s.io/descheduler/test"

promapi "github.com/prometheus/client_golang/api"
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
)
@@ -1370,7 +1371,7 @@ func TestLowNodeUtilizationWithTaints(t *testing.T) {
}
}

- func TestLowNodeUtilizationWithMetrics(t *testing.T) {
+ func TestLowNodeUtilizationWithMetricsReal(t *testing.T) {
+ t.Skip("requires access to a live Prometheus instance; run manually")
roundTripper := &http.Transport{
Proxy: http.ProxyFromEnvironment,
@@ -1382,29 +1383,64 @@ func TestLowNodeUtilizationWithMetrics(t *testing.T) {
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}

AuthToken := "eyJhbGciOiJSUzI1NiIsImtpZCI6IkNoTW9tT2w2cWtzR2V0dURZdjBqdnBSdmdWM29lWmc3dWpfNW0yaDc2NHMifQ.eyJhdWQiOlsiaHR0cHM6Ly9rdWJlcm5ldGVzLmRlZmF1bHQuc3ZjIl0sImV4cCI6MTcyODk5MjY3NywiaWF0IjoxNzI4OTg5MDc3LCJpc3MiOiJodHRwczovL2t1YmVybmV0ZXMuZGVmYXVsdC5zdmMiLCJqdGkiOiJkNDY3ZjVmMy0xNGVmLTRkMjItOWJkNC1jMGM1Mzk3NzYyZDgiLCJrdWJlcm5ldGVzLmlvIjp7Im5hbWVzcGFjZSI6Im9wZW5zaGlmdC1tb25pdG9yaW5nIiwic2VydmljZWFjY291bnQiOnsibmFtZSI6InByb21ldGhldXMtazhzIiwidWlkIjoiNjY4NDllMGItYTAwZC00NjUzLWE5NTItNThiYTE1MTk4NTlkIn19LCJuYmYiOjE3Mjg5ODkwNzcsInN1YiI6InN5c3RlbTpzZXJ2aWNlYWNjb3VudDpvcGVuc2hpZnQtbW9uaXRvcmluZzpwcm9tZXRoZXVzLWs4cyJ9.J1i6-oRAC9J8mqrlZPKGA-CU5PbUzhm2QxAWFnu65-NXR3e252mesybwtjkwxUtTLKrsYHQXwEsG5rGcQsvMcGK9RC9y5z33DFj8tPPwOGLJYJ-s5cTImTqKtWRXzTlcrsrUYTYApfrOsEyXwyfDow4PCslZjR3cd5FMRbvXNqHLg26nG_smApR4wc6kXy7xxlRuGhxu-dUiscQP56njboOK61JdTG8F3FgOayZnKk1jGeVdIhXClqGWJyokk-ZM3mMK1MxzGXY0tLbe37V4B7g3NDiH651BUcicfDSky46yfcAYxMDbZgpK2TByWApAllN0wixz2WsFfyBVu_Q5xtZ9Gi9BUHSa5ioRiBK346W4Bdmr9ala5ldIXDa59YE7UB34DsCHyqvzRx_Sj76hLzy2jSOk7RsL0fM8sDoJL4ROdi-3Jtr5uPY593I8H8qeQvFS6PQfm0bUZqVKrrLoCK_uk9guH4a6K27SlD-Utk3dpsjbmrwcjBxm-zd_LE9YyQ734My00Pcy9D5eNio3gESjGsHqGFc_haq4ZCiVOCkbdmABjpPEL6K7bs1GMZbHt1CONL0-LzymM8vgGNj0grjpG8-5AF8ZuSqR7pbZSV_NO2nKkmrwpILCw0Joqp6V3C9pP9nXWHIDyVMxMK870zxzt_qCoPRLCAujQQn6e0U"
client, err := promapi.NewClient(promapi.Config{
Address: "https://prometheus-k8s-openshift-monitoring.apps.jchaloup-20241015-3.group-b.devcluster.openshift.com",
AuthToken := "XXXX"
promClient, err := promapi.NewClient(promapi.Config{
Address: "https://prometheus-k8s-openshift-monitoring.apps.jchaloup-20241106.group-b.devcluster.openshift.com",
RoundTripper: config.NewAuthorizationCredentialsRoundTripper("Bearer", config.NewInlineSecret(AuthToken), roundTripper),
})
if err != nil {
t.Fatalf("prom client error: %v", err)
}

n1 := test.BuildTestNode("ip-10-0-17-165.ec2.internal", 2000, 3000, 10, nil)
n2 := test.BuildTestNode("ip-10-0-51-101.ec2.internal", 2000, 3000, 10, nil)
n3 := test.BuildTestNode("ip-10-0-94-25.ec2.internal", 2000, 3000, 10, nil)

nodes := []*v1.Node{n1, n2, n3}

p1 := test.BuildTestPod("p1", 400, 0, n1.Name, nil)
p21 := test.BuildTestPod("p21", 400, 0, n2.Name, nil)
p22 := test.BuildTestPod("p22", 400, 0, n2.Name, nil)
p3 := test.BuildTestPod("p3", 400, 0, n3.Name, nil)

clientset := fakeclientset.NewSimpleClientset(n1, n2, n3, p1, p21, p22, p3)

ctx := context.TODO()
sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0)
podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
podsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer)
if err != nil {
t.Fatalf("Build get pods assigned to node function error: %v", err)
}

sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())

prometheusUsageClient := newPrometheusUsageSnapshot(podsAssignedToNode, promClient)
err = prometheusUsageClient.capture(nodes)
if err != nil {
t.Fatalf("unable to capture prometheus metrics: %v", err)
}

for _, node := range nodes {
nodeUtil := prometheusUsageClient.nodeUtilization(node.Name)
fmt.Printf("nodeUtil[%v]: %v\n", node.Name, nodeUtil)
}

// pod:container_cpu_usage:sum
// container_memory_usage_bytes

- v1api := promv1.NewAPI(client)
- ctx := context.TODO()
- // promQuery := "avg_over_time(kube_pod_container_resource_requests[1m])"
- promQuery := "kube_pod_container_resource_requests"
- results, warnings, err := v1api.Query(ctx, promQuery, time.Now())
- fmt.Printf("results: %#v\n", results)
- for _, sample := range results.(model.Vector) {
- fmt.Printf("sample: %#v\n", sample)
- }
- fmt.Printf("warnings: %v\n", warnings)
- fmt.Printf("err: %v\n", err)
+ nodeThresholds := NodeThresholds{
+ lowResourceThreshold: map[v1.ResourceName]*resource.Quantity{
+ v1.ResourceName("MetricResource"): resource.NewQuantity(int64(300), resource.DecimalSI),
+ },
+ highResourceThreshold: map[v1.ResourceName]*resource.Quantity{
+ v1.ResourceName("MetricResource"): resource.NewQuantity(int64(500), resource.DecimalSI),
+ },
+ }
+
+ fmt.Printf("nodeThresholds: %#v\n", nodeThresholds)

result := model.Value(
&model.Vector{
118 changes: 118 additions & 0 deletions pkg/framework/plugins/nodeutilization/usageclients.go
@@ -18,8 +18,15 @@ package nodeutilization

import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"time"

promapi "github.com/prometheus/client_golang/api"
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -200,3 +207,114 @@ func (client *actualUsageClient) capture(nodes []*v1.Node) error {

return nil
}

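// prometheusUsageClient feeds node utilization from a Prometheus query rather
// than the metrics server: capture caches per-node usage and the pods assigned
// to each node, and nodeUtilization/pods serve that cached snapshot.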
type prometheusUsageClient struct {
getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc
promClient promapi.Client
promQuery string

_nodes []*v1.Node
_pods map[string][]*v1.Pod
_nodeUtilization map[string]map[v1.ResourceName]*resource.Quantity
}

var _ usageClient = &prometheusUsageClient{}

func newPrometheusUsageSnapshot(
getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc,
promClient promapi.Client,
) *prometheusUsageClient {
return &prometheusUsageClient{
getPodsAssignedToNode: getPodsAssignedToNode,
promClient: promClient,
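// "instance:node_cpu:rate:sum" is assumed to be a recording rule exposed by
// the cluster monitoring stack (as on OpenShift), reporting per-node CPU
// usage in cores.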
promQuery: "instance:node_cpu:rate:sum",
}
}

func (client *prometheusUsageClient) nodeUtilization(node string) map[v1.ResourceName]*resource.Quantity {
return client._nodeUtilization[node]
}

func (client *prometheusUsageClient) nodes() []*v1.Node {
return client._nodes
}

func (client *prometheusUsageClient) pods(node string) []*v1.Pod {
return client._pods[node]
}

func (client *prometheusUsageClient) podUsage(pod *v1.Pod) (map[v1.ResourceName]*resource.Quantity, error) {
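// Per-pod usage is not available from the node-level Prometheus query, so
// this is effectively a stub that satisfies the usageClient interface.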
return nil, nil
}

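// fakePromClient satisfies the prometheus api.Client interface with a canned
// query result, so the usage client can be exercised without a live Prometheus.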
type fakePromClient struct {
result interface{}
}

type fakePayload struct {
Status string `json:"status"`
Data queryResult `json:"data"`
}

type queryResult struct {
Type model.ValueType `json:"resultType"`
Result interface{} `json:"result"`
}

func (client *fakePromClient) URL(ep string, args map[string]string) *url.URL {
return &url.URL{}
}
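
// Do ignores the request and returns the configured samples encoded as a
// successful Prometheus query response.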
func (client *fakePromClient) Do(ctx context.Context, request *http.Request) (*http.Response, []byte, error) {
jsonData, err := json.Marshal(fakePayload{
Status: "success",
Data: queryResult{
Type: model.ValVector,
Result: client.result,
},
})

return &http.Response{StatusCode: 200}, jsonData, err
}

func (client *prometheusUsageClient) capture(nodes []*v1.Node) error {
client._nodeUtilization = make(map[string]map[v1.ResourceName]*resource.Quantity)
client._pods = make(map[string][]*v1.Pod)
capturedNodes := []*v1.Node{}

results, warnings, err := promv1.NewAPI(client.promClient).Query(context.TODO(), client.promQuery, time.Now())
if err != nil {
return fmt.Errorf("unable to capture prometheus metrics: %v", err)
}
if len(warnings) > 0 {
klog.Infof("prometheus metrics warnings: %v", warnings)
}

nodeUsages := make(map[string]map[v1.ResourceName]*resource.Quantity)
for _, sample := range results.(model.Vector) {
// fmt.Printf("sample: %#v\n", sample)
nodeName := string(sample.Metric["instance"])
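// Scale the per-node CPU rate (reported in cores) by 1000 so it can be
// stored as an integer resource.Quantity, i.e. roughly millicores.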
nodeUsages[nodeName] = map[v1.ResourceName]*resource.Quantity{
v1.ResourceName("MetricResource"): resource.NewQuantity(int64(sample.Value*1000), resource.DecimalSI),
}
}

for _, node := range nodes {
if _, exists := nodeUsages[node.Name]; !exists {
return fmt.Errorf("unable to find metric entry for %v", node.Name)
}
pods, err := podutil.ListPodsOnANode(node.Name, client.getPodsAssignedToNode, nil)
if err != nil {
klog.V(2).InfoS("Node will not be processed, error accessing its pods", "node", klog.KObj(node), "err", err)
continue
}

// store the snapshot of pods from the same (or the closest) node utilization computation
client._pods[node.Name] = pods
client._nodeUtilization[node.Name] = nodeUsages[node.Name]
capturedNodes = append(capturedNodes, node)
}

client._nodes = capturedNodes

return nil
}
68 changes: 68 additions & 0 deletions pkg/framework/plugins/nodeutilization/usageclients_test.go
@@ -32,6 +32,7 @@ import (
"sigs.k8s.io/descheduler/pkg/descheduler/metricscollector"
podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
"sigs.k8s.io/descheduler/test"
"github.com/prometheus/common/model"
)

var gvr = schema.GroupVersionResource{Group: "metrics.k8s.io", Version: "v1beta1", Resource: "nodemetricses"}
@@ -133,3 +134,70 @@ func TestActualUsageClient(t *testing.T) {
metricsClientset, collector, usageSnapshot, nodes, n2.Name, n2metrics,
)
}

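// sample builds a Prometheus vector sample for the given metric and node
// name; the timestamp is an arbitrary fixed value.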
func sample(metricName, nodeName string, value float64) model.Sample {
return model.Sample{
Metric: model.Metric{
"__name__": model.LabelValue(metricName),
"instance": model.LabelValue(nodeName),
},
Value: model.SampleValue(value),
Timestamp: 1728991761711,
}
}

func TestPrometheusUsageClient(t *testing.T) {
n1 := test.BuildTestNode("ip-10-0-17-165.ec2.internal", 2000, 3000, 10, nil)
n2 := test.BuildTestNode("ip-10-0-51-101.ec2.internal", 2000, 3000, 10, nil)
n3 := test.BuildTestNode("ip-10-0-94-25.ec2.internal", 2000, 3000, 10, nil)

nodes := []*v1.Node{n1, n2, n3}

p1 := test.BuildTestPod("p1", 400, 0, n1.Name, nil)
p21 := test.BuildTestPod("p21", 400, 0, n2.Name, nil)
p22 := test.BuildTestPod("p22", 400, 0, n2.Name, nil)
p3 := test.BuildTestPod("p3", 400, 0, n3.Name, nil)

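// Canned samples mimicking what the instance:node_cpu:rate:sum recording
// rule reports per node (CPU usage in cores).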
pClient := &fakePromClient{
result: []model.Sample{
sample("instance:node_cpu:rate:sum", "ip-10-0-51-101.ec2.internal", 0.20381818181818104),
sample("instance:node_cpu:rate:sum", "ip-10-0-17-165.ec2.internal", 0.4245454545454522),
sample("instance:node_cpu:rate:sum", "ip-10-0-94-25.ec2.internal", 0.5695757575757561),
},
}

clientset := fakeclientset.NewSimpleClientset(n1, n2, n3, p1, p21, p22, p3)

ctx := context.TODO()
sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0)
podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
podsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer)
if err != nil {
t.Fatalf("Build get pods assigned to node function error: %v", err)
}

sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())

prometheusUsageClient := newPrometheusUsageSnapshot(podsAssignedToNode, pClient)
err = prometheusUsageClient.capture(nodes)
if err != nil {
t.Fatalf("unable to capture prometheus metrics: %v", err)
}

for _, node := range nodes {
nodeUtil := prometheusUsageClient.nodeUtilization(node.Name)
fmt.Printf("nodeUtil[%v]: %v\n", node.Name, nodeUtil)
}

nodeThresholds := NodeThresholds{
lowResourceThreshold: map[v1.ResourceName]*resource.Quantity{
v1.ResourceName("MetricResource"): resource.NewQuantity(int64(300), resource.DecimalSI),
},
highResourceThreshold: map[v1.ResourceName]*resource.Quantity{
v1.ResourceName("MetricResource"): resource.NewQuantity(int64(500), resource.DecimalSI),
},
}

fmt.Printf("nodeThresholds: %#v\n", nodeThresholds)
}
