diff --git a/.gitignore b/.gitignore index 2ab1e00..7030672 100644 --- a/.gitignore +++ b/.gitignore @@ -20,6 +20,7 @@ pkg/*/community-controller pkg/*/system-controller pkg/*/edge-scheduler pkg/*/function-deployment-webhook +pkg/*/cpu-monitoring .idea/ *.pem diff --git a/config/deploy/cpu-monitoring.yaml b/config/deploy/cpu-monitoring.yaml index dd30898..c82fd16 100644 --- a/config/deploy/cpu-monitoring.yaml +++ b/config/deploy/cpu-monitoring.yaml @@ -23,10 +23,10 @@ spec: image: systemautoscaler/cpu-monitoring:dev imagePullPolicy: Always env: - - name: NODE_NAME - valueFrom: - fieldRef: - fieldPath: spec.nodeName + - name: NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole diff --git a/config/deploy/monitoring.yaml b/config/deploy/monitoring.yaml index c5bf06f..13dc9a7 100644 --- a/config/deploy/monitoring.yaml +++ b/config/deploy/monitoring.yaml @@ -59,3 +59,13 @@ spec: - name: monitoring image: systemautoscaler/system-autoscaler-monitoring:dev.1 imagePullPolicy: Always + - name: cpu-monitoring + image: systemautoscaler/cpu-monitoring:dev + imagePullPolicy: Always + env: + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: POD_NAMESPACE + value: "default" \ No newline at end of file diff --git a/go.mod b/go.mod index 374abbd..6e82a2f 100644 --- a/go.mod +++ b/go.mod @@ -5,14 +5,15 @@ go 1.16 require ( github.com/JohnCGriffin/yogofn v0.0.0-20170613212352-43d7b79df9f1 github.com/asecurityteam/rolling v2.0.4+incompatible - github.com/c9s/goprocinfo v0.0.0-20210130143923-c95fcf8c64a8 github.com/emicklei/go-restful v2.15.0+incompatible // indirect github.com/emirpasic/gods v1.12.0 github.com/go-openapi/jsonpointer v0.19.5 // indirect github.com/go-openapi/jsonreference v0.19.5 // indirect github.com/go-openapi/swag v0.19.14 // indirect + github.com/google/uuid v1.2.0 // indirect github.com/jackc/pgx/v4 v4.13.0 github.com/jmcvetta/randutil v0.0.0-20150817122601-2bb1b664bcff + github.com/mailru/easyjson v0.7.7 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd github.com/onsi/ginkgo v1.16.4 github.com/onsi/gomega v1.13.0 @@ -20,13 +21,15 @@ require ( github.com/stretchr/testify v1.7.0 golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602 // indirect golang.org/x/tools v0.1.5 // indirect - k8s.io/api v0.21.3 + k8s.io/api v0.21.5 k8s.io/apiextensions-apiserver v0.21.3 // indirect - k8s.io/apimachinery v0.21.3 - k8s.io/client-go v0.21.3 - k8s.io/code-generator v0.21.3 + k8s.io/apimachinery v0.21.5 + k8s.io/client-go v0.21.5 + k8s.io/code-generator v0.21.5 + k8s.io/component-base v0.21.5 // indirect k8s.io/klog/v2 v2.9.0 k8s.io/kube-openapi v0.0.0-20210527164424-3c818078ee3d + k8s.io/metrics v0.21.5 k8s.io/utils v0.0.0-20210707171843-4b05e18ac7d9 sigs.k8s.io/controller-runtime v0.9.3 diff --git a/go.sum b/go.sum index 116941a..82eae9a 100644 --- a/go.sum +++ b/go.sum @@ -79,8 +79,6 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= -github.com/c9s/goprocinfo v0.0.0-20210130143923-c95fcf8c64a8 h1:SjZ2GvvOononHOpK84APFuMvxqsk3tEIaKH/z4Rpu3g= -github.com/c9s/goprocinfo v0.0.0-20210130143923-c95fcf8c64a8/go.mod h1:uEyr4WpAH4hio6LFriaPkL938XnrvLpNPmQHBdrmbIE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= @@ -242,8 +240,9 @@ github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hf github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= +github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY= @@ -385,8 +384,9 @@ github.com/mailru/easyjson v0.0.0-20160728113105-d5b7844b561a/go.mod h1:C1wdFJiN github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.7.0/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7ldAVICs= -github.com/mailru/easyjson v0.7.6 h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA= github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= +github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= @@ -952,28 +952,33 @@ honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9 honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= k8s.io/api v0.18.2/go.mod h1:SJCWI7OLzhZSvbY7U8zwNl9UA4o1fizoug34OV/2r78= k8s.io/api v0.21.2/go.mod h1:Lv6UGJZ1rlMI1qusN8ruAp9PUBFyBwpEHAdG24vIsiU= -k8s.io/api v0.21.3 h1:cblWILbLO8ar+Fj6xdDGr603HRsf8Wu9E9rngJeprZQ= k8s.io/api v0.21.3/go.mod h1:hUgeYHUbBp23Ue4qdX9tR8/ANi/g3ehylAqDn9NWVOg= +k8s.io/api v0.21.5 h1:9zp3SslPRB+rqxhGKqqTo6VsN3HX0Ype1nWV6UQQ+Sk= +k8s.io/api v0.21.5/go.mod h1:Un8C5Hemo2r3MfPOjZvwQQ9KkBbiTBUCGrjlivo9uJ0= k8s.io/apiextensions-apiserver v0.21.2/go.mod h1:+Axoz5/l3AYpGLlhJDfcVQzCerVYq3K3CvDMvw6X1RA= k8s.io/apiextensions-apiserver v0.21.3 h1:+B6biyUWpqt41kz5x6peIsljlsuwvNAp/oFax/j2/aY= k8s.io/apiextensions-apiserver v0.21.3/go.mod h1:kl6dap3Gd45+21Jnh6utCx8Z2xxLm8LGDkprcd+KbsE= k8s.io/apimachinery v0.18.2/go.mod h1:9SnR/e11v5IbyPCGbvJViimtJ0SwHG4nfZFjU77ftcA= k8s.io/apimachinery v0.21.2/go.mod h1:CdTY8fU/BlvAbJ2z/8kBwimGki5Zp8/fbVuLY8gJumM= -k8s.io/apimachinery v0.21.3 h1:3Ju4nvjCngxxMYby0BimUk+pQHPOQp3eCGChk5kfVII= k8s.io/apimachinery v0.21.3/go.mod h1:H/IM+5vH9kZRNJ4l3x/fXP/5bOPJaVP/guptnZPeCFI= +k8s.io/apimachinery v0.21.5 h1:56bnsHcUNboSCbD779GGi4Lh5kHTDFUoDrnHbhLTiaw= +k8s.io/apimachinery v0.21.5/go.mod h1:3PfBV+4PPXNs0aueD+7fHcGyhdkFFYqXeshQtsKCi+4= k8s.io/apiserver v0.21.2/go.mod h1:lN4yBoGyiNT7SC1dmNk0ue6a5Wi6O3SWOIw91TsucQw= k8s.io/apiserver v0.21.3/go.mod h1:eDPWlZG6/cCCMj/JBcEpDoK+I+6i3r9GsChYBHSbAzU= k8s.io/client-go v0.18.2/go.mod h1:Xcm5wVGXX9HAA2JJ2sSBUn3tCJ+4SVlCbl2MNNv+CIU= k8s.io/client-go v0.21.2/go.mod h1:HdJ9iknWpbl3vMGtib6T2PyI/VYxiZfq936WNVHBRrA= -k8s.io/client-go v0.21.3 h1:J9nxZTOmvkInRDCzcSNQmPJbDYN/PjlxXT9Mos3HcLg= k8s.io/client-go v0.21.3/go.mod h1:+VPhCgTsaFmGILxR/7E1N0S+ryO010QBeNCv5JwRGYU= +k8s.io/client-go v0.21.5 h1:zkVidiWVgciPKYqWpMFMjCUF+4rRXcfkKoyQS1Ue21k= +k8s.io/client-go v0.21.5/go.mod h1:EUornVlr3rBrPKXUoMPNggJdEQmvFNMpYO3Kb6432kw= k8s.io/code-generator v0.18.2/go.mod h1:+UHX5rSbxmR8kzS+FAv7um6dtYrZokQvjHpDSYRVkTc= k8s.io/code-generator v0.21.2/go.mod h1:8mXJDCB7HcRo1xiEQstcguZkbxZaqeUOrO9SsicWs3U= -k8s.io/code-generator v0.21.3 h1:K2Onrjuve/31D4Y5DpR9ngWM2BiiKUxrGaCxSEJS/Y8= k8s.io/code-generator v0.21.3/go.mod h1:K3y0Bv9Cz2cOW2vXUrNZlFbflhuPvuadW6JdnN6gGKo= +k8s.io/code-generator v0.21.5 h1:7X6dJG4hzKFHChYpP02iF0XrXhenqQHc76QoKYzDZfI= +k8s.io/code-generator v0.21.5/go.mod h1:0K1k6o2ef8JD/j8LF3ZuqWLGFMHvO5psNzLLmxf7ZVE= k8s.io/component-base v0.21.2/go.mod h1:9lvmIThzdlrJj5Hp8Z/TOgIkdfsNARQ1pT+3PByuiuc= -k8s.io/component-base v0.21.3 h1:4WuuXY3Npa+iFfi2aDRiOz+anhNvRfye0859ZgfC5Og= k8s.io/component-base v0.21.3/go.mod h1:kkuhtfEHeZM6LkX0saqSK8PbdO7A0HigUngmhhrwfGQ= +k8s.io/component-base v0.21.5 h1:icFqcFDrO9S+FQpGohzVm6qce9vlo131K0r3NhElxiQ= +k8s.io/component-base v0.21.5/go.mod h1:UyRaqQfPkBL/haEFaMWgVQvtom5TqAT+jqlFGlh6LuU= k8s.io/gengo v0.0.0-20190128074634-0689ccc1d7d6/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0= k8s.io/gengo v0.0.0-20200114144118-36b2048a9120/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0= k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0= @@ -992,6 +997,8 @@ k8s.io/kube-openapi v0.0.0-20200121204235-bf4fb3bd569c/go.mod h1:GRQhZsXIAJ1xR0C k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7/go.mod h1:wXW5VT87nVfh/iLV8FpR2uDvrFyomxbtb1KivDbvPTE= k8s.io/kube-openapi v0.0.0-20210527164424-3c818078ee3d h1:lUK8GPtuJy8ClWZhuvKoaLdKGPLq9H1PxWp7VPBZBkU= k8s.io/kube-openapi v0.0.0-20210527164424-3c818078ee3d/go.mod h1:vHXdDvt9+2spS2Rx9ql3I8tycm3H9FDfdUoIuKCefvw= +k8s.io/metrics v0.21.5 h1:lcczsNOCbwG4OKUbxuklK9fv+WQA0rCxLH86rNdAgq4= +k8s.io/metrics v0.21.5/go.mod h1:Ew+6obDfJiQVsi6J2NkoI5jNMio/CCPC5v3pLXH8vos= k8s.io/utils v0.0.0-20200324210504-a9aa75ae1b89/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew= k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20210527160623-6fdb442a123b/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= diff --git a/pkg/apiutils/getter.go b/pkg/apiutils/getter.go index c86c7f7..ca6241a 100644 --- a/pkg/apiutils/getter.go +++ b/pkg/apiutils/getter.go @@ -15,9 +15,9 @@ import ( ) type ResourceGetter struct { - pods func(namespace string) corelisters.PodNamespaceLister - functions func(namespace string) openfaaslisters.FunctionNamespaceLister - nodes corelisters.NodeLister + Pods func(namespace string) corelisters.PodNamespaceLister + Functions func(namespace string) openfaaslisters.FunctionNamespaceLister + Nodes corelisters.NodeLister } func NewResourceGetter( @@ -26,9 +26,9 @@ func NewResourceGetter( nodes corelisters.NodeLister, ) *ResourceGetter { return &ResourceGetter{ - pods: pods, - functions: functions, - nodes: nodes, + Pods: pods, + Functions: functions, + Nodes: nodes, } } @@ -40,10 +40,10 @@ func (r *ResourceGetter) GetPodsOfFunctionInNode(function *openfaasv1.Function, ealabels.FunctionNameLabel: function.Name, ealabels.NodeLabel: nodeName, }) - return r.pods(function.Namespace).List(selector) + return r.Pods(function.Namespace).List(selector) } -func (r *ResourceGetter) GetNodeDelays(client *delayclient.SQLDelayClient, nodes []string) ([][]int64, error) { +func (r *ResourceGetter) GetNodeDelays(client delayclient.DelayClient, nodes []string) ([][]int64, error) { nodeMapping := make(map[string]int, len(nodes)) for i, node := range nodes { nodeMapping[node] = i @@ -74,13 +74,13 @@ func (r *ResourceGetter) GetWorkload(community, communityNamespace string) ([][] map[string]string{ ealabels.CommunityLabel.WithNamespace(communityNamespace).String(): community, }) - nodes, err := r.nodes.List(nodeSelector) + nodes, err := r.Nodes.List(nodeSelector) if err != nil { return nil, fmt.Errorf("failed to retrieve nodes using selector %s with error: %s", nodeSelector, err) } // Retrieve the functions - functions, err := r.functions(communityNamespace).List(labels.Everything()) + functions, err := r.Functions(communityNamespace).List(labels.Everything()) if err != nil { return nil, fmt.Errorf("failed to retrieve functions with error: %s", err) } @@ -96,7 +96,7 @@ func (r *ResourceGetter) GetWorkload(community, communityNamespace string) ([][] func (r *ResourceGetter) GetMaxDelays(communityNamespace string) ([]int64, error) { // Retrieve the functions - functions, err := r.functions(communityNamespace).List(labels.Everything()) + functions, err := r.Functions(communityNamespace).List(labels.Everything()) if err != nil { return nil, fmt.Errorf("failed to retrieve functions with error: %s", err) } diff --git a/pkg/apiutils/listers.go b/pkg/apiutils/listers.go new file mode 100644 index 0000000..8b46328 --- /dev/null +++ b/pkg/apiutils/listers.go @@ -0,0 +1,89 @@ +package apiutils + +import ( + "fmt" + + ealabels "github.com/lterrac/edge-autoscaler/pkg/labels" + openfaasv1 "github.com/openfaas/faas-netes/pkg/apis/openfaas/v1" + openfaaslisters "github.com/openfaas/faas-netes/pkg/client/listers/openfaas/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + corelisters "k8s.io/client-go/listers/core/v1" +) + +type PodGetter interface { + Pods(namespace string) corelisters.PodNamespaceLister + GetPodsOfFunctionInNode(function *openfaasv1.Function, nodeName string) ([]*corev1.Pod, error) + GetPodsOfAllFunctionInNode(namespace, nodeName string) ([]*corev1.Pod, error) +} + +type NodeGetter interface { + Nodes() corelisters.NodeLister +} + +type FunctionGetter interface { + Functions(namespace string) openfaaslisters.FunctionNamespaceLister +} + +func NewPodGetter( + pods func(namespace string) corelisters.PodNamespaceLister, +) (PodGetter, error) { + + if pods == nil { + return nil, fmt.Errorf("pod lister not set in listers struct") + } + + return &listers{ + pods: pods, + }, nil +} + +type listers struct { + pods func(namespace string) corelisters.PodNamespaceLister + functions func(namespace string) openfaaslisters.FunctionNamespaceLister + nodes corelisters.NodeLister +} + +func NewListers( + pods func(namespace string) corelisters.PodNamespaceLister, + functions func(namespace string) openfaaslisters.FunctionNamespaceLister, + nodes corelisters.NodeLister, +) *listers { + return &listers{ + pods: pods, + functions: functions, + nodes: nodes, + } +} + +func (l listers) Pods(namespace string) corelisters.PodNamespaceLister { + return l.pods(namespace) +} + +func (l listers) Functions(namespace string) openfaaslisters.FunctionNamespaceLister { + return l.functions(namespace) +} + +func (l listers) Nodes() corelisters.NodeLister { + return l.nodes +} + +// GetPodsOfAllFunctionInNode returns a list of pods managed by a community controller +func (l listers) GetPodsOfAllFunctionInNode(namespace, nodeName string) ([]*corev1.Pod, error) { + selector := labels.SelectorFromSet( + map[string]string{ + ealabels.NodeLabel: nodeName, + }) + return l.pods(namespace).List(selector) +} + +// GetPodsOfFunctionInNode returns a list of pods which is related to a given function and are running in a given node +func (r listers) GetPodsOfFunctionInNode(function *openfaasv1.Function, nodeName string) ([]*corev1.Pod, error) { + selector := labels.SelectorFromSet( + map[string]string{ + ealabels.FunctionNamespaceLabel: function.Namespace, + ealabels.FunctionNameLabel: function.Name, + ealabels.NodeLabel: nodeName, + }) + return r.Pods(function.Namespace).List(selector) +} diff --git a/pkg/community-controller/pkg/controller/sync.go b/pkg/community-controller/pkg/controller/sync.go index 92bdb11..70c131d 100644 --- a/pkg/community-controller/pkg/controller/sync.go +++ b/pkg/community-controller/pkg/controller/sync.go @@ -3,6 +3,8 @@ package controller import ( "context" "fmt" + rand "math/rand" + "github.com/lterrac/edge-autoscaler/pkg/apis/edgeautoscaler/v1alpha1" ealabels "github.com/lterrac/edge-autoscaler/pkg/labels" openfaasv1 "github.com/openfaas/faas-netes/pkg/apis/openfaas/v1" @@ -14,13 +16,15 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" - rand "math/rand" ) const ( - HttpMetricsPort = 8080 - HttpMetricsCpu = 100 - HttpMetricsMemory = 150000000 + HttpMetricsImage = "systemautoscaler/http-metrics" + HttpMetrics = "http-metrics" + HttpMetricsVersion = "0.1.0" + HttpMetricsPort = 8080 + HttpMetricsCpu = 100 + HttpMetricsMemory = 200000000 ) func (c *CommunityController) runScheduler(_ string) error { @@ -257,7 +261,7 @@ func newPod(function *openfaasv1.Function, cs *v1alpha1.CommunitySchedule, node }, { Name: "http-metrics", - Image: "systemautoscaler/http-metrics:0.1.0", + Image: fmt.Sprintf("%s:%s", HttpMetricsImage, HttpMetricsVersion), Ports: []corev1.ContainerPort{ {ContainerPort: int32(8000), Protocol: corev1.ProtocolTCP}, }, diff --git a/pkg/cpu-monitoring/Dockerfile b/pkg/cpu-monitoring/Dockerfile new file mode 100644 index 0000000..aeb55af --- /dev/null +++ b/pkg/cpu-monitoring/Dockerfile @@ -0,0 +1,7 @@ +FROM alpine:3.14.0 + +LABEL name="cpu-monitoring" + +COPY cpu-monitoring /usr/local/bin/ + +CMD ["cpu-monitoring"] diff --git a/pkg/cpu-monitoring/Makefile b/pkg/cpu-monitoring/Makefile new file mode 100644 index 0000000..550e52c --- /dev/null +++ b/pkg/cpu-monitoring/Makefile @@ -0,0 +1,48 @@ +BUILD_SETTINGS = CGO_ENABLED=0 GOOS=linux GOARCH=amd64 +IMAGE = cpu-monitoring +IMAGE_VERSION = $(shell git tag --points-at HEAD | sed '/$(IMAGE)\/.*/!s/.*//' | sed 's/\//:/') +REPO = systemautoscaler + + +.PHONY: all build coverage clean e2e fmt release test vet + +all: build test coverage clean + +build: fmt vet test + $(BUILD_SETTINGS) go build -trimpath -o "$(IMAGE)" ./main.go + +fmt: + @go fmt ./... + +test: + @go test -race $(shell go list ./... | grep -v e2e) --coverprofile=coverage.out + +e2e: + @go test -race $(shell go list ./... | grep e2e) + +coverage: test + @go tool cover -func=coverage.out + +release: + @if [ -n "$(IMAGE_VERSION)" ]; then \ + echo "Building $(IMAGE_VERSION)" ;\ + docker build -t $(REPO)/$(IMAGE_VERSION) . ;\ + docker push $(REPO)/$(IMAGE_VERSION) ;\ + else \ + echo "$(IMAGE) unchanged: no version tag on HEAD commit" ;\ + fi + +vet: + @go vet ./... + +clean: + @rm -rf ./$(IMAGE) + @go clean -cache + @rm -rf *.out + +dev: + echo "Building dev release"; \ + $(BUILD_SETTINGS) go build -trimpath -o "$(IMAGE)" ./main.go; \ + docker build -t $(REPO)/$(IMAGE):dev . ;\ + docker push $(REPO)/$(IMAGE):dev ; \ + echo "Dev release built and pushed"; \ diff --git a/pkg/cpu-monitoring/README.md b/pkg/cpu-monitoring/README.md new file mode 100644 index 0000000..d449a2c --- /dev/null +++ b/pkg/cpu-monitoring/README.md @@ -0,0 +1,3 @@ +# CPU-monitoring + +This component is deployed as a DaemonSet and scrapes container metrics from cAdvisor for Pods managed by edge-autoscaler \ No newline at end of file diff --git a/pkg/cpu-monitoring/main.go b/pkg/cpu-monitoring/main.go index 0538664..fae41e7 100644 --- a/pkg/cpu-monitoring/main.go +++ b/pkg/cpu-monitoring/main.go @@ -1,26 +1,87 @@ package main import ( - linuxproc "github.com/c9s/goprocinfo/linux" + "context" + "flag" + "os" + "time" + + "github.com/lterrac/edge-autoscaler/pkg/cpu-monitoring/pkg/scraper" + + "github.com/lterrac/edge-autoscaler/pkg/signals" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" + metrics "k8s.io/metrics/pkg/client/clientset/versioned" +) + +var ( + masterURL string + kubeconfig string + node string ) func main() { - stat, err := linuxproc.ReadStat("/proc/stat") + klog.InitFlags(nil) + flag.Parse() + + // set up signals so we handle the first shutdown signal gracefully + stopCh := signals.SetupSignalHandler() + + cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig) + if err != nil { + klog.Fatalf("Error building kubeconfig: %s", err.Error()) + } + + kubernetesClient, err := kubernetes.NewForConfig(cfg) + if err != nil { + klog.Fatalf("Error building example clientset: %s", err.Error()) + } + + coreInformerFactory := informers.NewSharedInformerFactory(kubernetesClient, time.Second*30) + + node = getenv("NODE_NAME", "") + + podsCache := coreInformerFactory.Core().V1().Pods() + + node, err := kubernetesClient.CoreV1().Nodes().Get(context.TODO(), node, metav1.GetOptions{}) + if err != nil { - klog.Fatal("stat read fail") + klog.Fatalf("Error getting node: %s", err.Error()) } - klog.Infof("%+v", stat.CPUStatAll) - //klog.Infof("%+v", stat.CPUStats) - klog.Infof("%+v", stat.Processes) - klog.Infof("%+v", stat.BootTime) + mc := metrics.NewForConfigOrDie(cfg) + + cpuScraper, err := scraper.New(podsCache.Lister().Pods, mc.MetricsV1beta1(), node) + + if err != nil { + klog.Fatalf("Error creating CPU scraper: %s", err.Error()) + } - s := stat.CPUStatAll - nonIdle := float64(s.User + s.Nice + s.System + s.IRQ + s.SoftIRQ + s.Steal) - total := float64(s.Idle) + nonIdle + klog.Info("starting informer factory") - klog.Info(nonIdle/total) + coreInformerFactory.Start(stopCh) + if err = cpuScraper.Start(stopCh, podsCache.Informer().HasSynced); err != nil { + klog.Fatalf("Error running cpu scraper: %s", err.Error()) + } + defer cpuScraper.Stop() + + <-stopCh + klog.Info("Shutting down workers") +} + +func init() { + flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") + flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") } +func getenv(key, fallback string) string { + value := os.Getenv(key) + if len(value) == 0 { + return fallback + } + return value +} diff --git a/pkg/cpu-monitoring/pkg/persistor/persistor.go b/pkg/cpu-monitoring/pkg/persistor/persistor.go new file mode 100644 index 0000000..939a22f --- /dev/null +++ b/pkg/cpu-monitoring/pkg/persistor/persistor.go @@ -0,0 +1,154 @@ +package persistor + +import ( + "context" + + "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/pgxpool" + "github.com/lterrac/edge-autoscaler/pkg/db" + "github.com/lterrac/edge-autoscaler/pkg/metrics" + "k8s.io/klog/v2" +) + +var ( + columns = []string{ + "timestamp", + "node", + "function", + "namespace", + "community", + "cores", + } +) + +const ( + // InsertMetricQuery is the prepare statement for inserting metrics. + InsertMetricQuery = "INSERT INTO resource (timestamp, node, function, namespace, community, cores) VALUES ($1, $2, $3, $4, $5, $6);" + batchSize = 1000 + table = "resource" +) + +type Persistor interface { + // Save incoming data in a database. + Persist() + // SetupDBConnection creates a new connection to the database using the provided options. + SetupDBConnection() error + // Stop closes the connection to the database. + Stop() +} + +// ResourcePersistor receives metrics from the load balancer and persists them to a backend. +// The initial implementation is a simple client that connects to a TimescaleDB backend. +type ResourcePersistor struct { + pool *pgxpool.Pool + resourceChan <-chan metrics.RawResourceData + opts db.Options + ctx context.Context +} + +// NewResourcePersistor creates a new ResourcePersistor. +func NewResourcePersistor(opts db.Options, rawResourceChan <-chan metrics.RawResourceData) Persistor { + return &ResourcePersistor{ + opts: opts, + resourceChan: rawResourceChan, + } +} + +// SetupDBConnection creates a new connection to the database using the provided options. +func (p *ResourcePersistor) SetupDBConnection() error { + var config *pgxpool.Config + var err error + + config, err = pgxpool.ParseConfig(p.opts.ConnString()) + + if err != nil { + return err + } + + p.pool, err = pgxpool.ConnectConfig(context.Background(), config) + + if err != nil { + return err + } + + return nil +} + +// Stop closes the connection to the database. +func (p *ResourcePersistor) Stop() { + p.ctx.Done() + p.pool.Close() +} + +// Persist receives metrics from the cpu scraper and persists them into a database until the chan is closed. +// It spawns the first polling goroutine which always listen for new data. +func (p *ResourcePersistor) Persist() { + var cancel context.CancelFunc + + // use context to terminate all routines + p.ctx, cancel = context.WithCancel(context.Background()) + defer cancel() + + klog.Infof("start metrics collection") + + p.batchData(false) + + klog.Infof("stop metrics collection") + +} + +// Many goroutines are spawned to handle the incoming data if the existing ones are polling slowly. +// A routine persists the data when a batch is full or when all data from the chan are polled. +func (p *ResourcePersistor) batchData(terminate bool) { + var batch []metrics.RawResourceData + var err error + batch = make([]metrics.RawResourceData, 0, batchSize) + + for { + select { + case <-p.ctx.Done(): + return + case m, ok := <-p.resourceChan: + if !ok { + p.Stop() + return + } + + batch = append(batch, m) + + if !terminate && len(p.resourceChan) > batchSize/4 { + klog.Infof("create new polling routine") + go p.batchData(true) + } + + if len(batch) == batchSize || len(p.resourceChan) == 0 { + err = p.save(batch) + if err != nil { + klog.Errorf("failed to persist resource data %v error: %s\n", m, err) + } + } + + if terminate { + break + } + + batch = make([]metrics.RawResourceData, 0, batchSize) + } + + } +} + +func (p *ResourcePersistor) save(batch []metrics.RawResourceData) error { + + klog.Info("persisting:") + + for _, e := range batch { + klog.Infof("%v\n", e) + } + + _, err := p.pool.CopyFrom(context.TODO(), pgx.Identifier{table}, columns, pgx.CopyFromSlice(len(batch), func(i int) ([]interface{}, error) { + return batch[i].AsCopy(), nil + })) + + return err +} diff --git a/pkg/cpu-monitoring/pkg/scraper/scraper.go b/pkg/cpu-monitoring/pkg/scraper/scraper.go new file mode 100644 index 0000000..4a8d0a6 --- /dev/null +++ b/pkg/cpu-monitoring/pkg/scraper/scraper.go @@ -0,0 +1,135 @@ +package scraper + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/lterrac/edge-autoscaler/pkg/apiutils" + cc "github.com/lterrac/edge-autoscaler/pkg/community-controller/pkg/controller" + "github.com/lterrac/edge-autoscaler/pkg/cpu-monitoring/pkg/persistor" + "github.com/lterrac/edge-autoscaler/pkg/db" + ealabels "github.com/lterrac/edge-autoscaler/pkg/labels" + "github.com/lterrac/edge-autoscaler/pkg/metrics" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2" + "k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1" + + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" +) + +type Scraper interface { + // start scraping CPU usage of pods running on the node + Start(stopCh <-chan struct{}, hasSynced cache.InformerSynced) error + Stop() +} + +type scraper struct { + pods apiutils.PodGetter + metrics v1beta1.MetricsV1beta1Interface + persistor persistor.Persistor + resourceChan chan metrics.RawResourceData + node string +} + +func New(pods func(namespace string) corelisters.PodNamespaceLister, metricsClient v1beta1.MetricsV1beta1Interface, node *corev1.Node) (Scraper, error) { + podGetter, err := apiutils.NewPodGetter(pods) + + if err != nil { + return nil, fmt.Errorf("failed to create pod getter: %v", err) + } + + resourceChan := make(chan metrics.RawResourceData, 100) + persistor := persistor.NewResourcePersistor(db.NewDBOptions(), resourceChan) + + err = persistor.SetupDBConnection() + + if err != nil { + return nil, fmt.Errorf("failed to connect to the database: %v", err) + } + + return &scraper{ + pods: podGetter, + metrics: metricsClient, + persistor: persistor, + resourceChan: resourceChan, + node: node.Name, + }, nil +} + +func (s *scraper) Start(stopCh <-chan struct{}, hasSynced cache.InformerSynced) error { + klog.Info("wait for cache to sync") + + if ok := cache.WaitForCacheSync( + stopCh, + hasSynced, + ); !ok { + return fmt.Errorf("failed to wait for caches to sync") + } + + go s.persistor.Persist() + + klog.Info("start cpu scraping") + + // TODO tune the frequency + go wait.Until(s.scrape, 5*time.Second, stopCh) + + return nil +} + +func (s *scraper) Stop() { + s.persistor.Stop() +} + +func (s *scraper) scrape() { + pods, err := s.pods.GetPodsOfAllFunctionInNode(corev1.NamespaceAll, s.node) + + klog.Infof("scraping %d pods", len(pods)) + + if err != nil { + utilruntime.HandleError(fmt.Errorf("failed to retrieve pods in node %v: %v", s.node, err)) + return + } + + var total int64 + + for _, p := range pods { + + total = 0 + + m, err := s.metrics.PodMetricses(p.Namespace).Get(context.TODO(), p.Name, metav1.GetOptions{}) + + if err != nil { + utilruntime.HandleError(fmt.Errorf("failed to retrieve metrics for pod %v: %v", p.Name, err)) + continue + } + + klog.Infof("scraping pod %v", p.Name) + + for _, c := range m.Containers { + //Http metrics proxy should not be considere in CPU usage + if strings.Contains(c.Name, cc.HttpMetrics) { + continue + } + + klog.Infof("container %v usage cpu %v", c.Name, c.Usage.Cpu().MilliValue()) + total += c.Usage.Cpu().MilliValue() + } + + namespace := p.Labels[ealabels.FunctionNamespaceLabel] + // save to metrics database + s.resourceChan <- metrics.RawResourceData{ + Timestamp: time.Now(), + Node: s.node, + Function: p.Labels[ealabels.FunctionNameLabel], + Namespace: namespace, + Community: p.Labels[ealabels.CommunityLabel.WithNamespace(namespace).String()], + Cores: total, + } + } +} diff --git a/pkg/metrics/datastore-metrics.go b/pkg/metrics/datastore-metrics.go index 95158c2..7644936 100644 --- a/pkg/metrics/datastore-metrics.go +++ b/pkg/metrics/datastore-metrics.go @@ -9,6 +9,8 @@ import ( "github.com/asecurityteam/rolling" ) +// Deprecated: The whole package is deprecated. + // ExposedMetrics is a struct that wraps the exposed metrics type ExposedMetrics struct { ResponseTime float64 `json:"response-time"` diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 1165144..e931a07 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -43,7 +43,7 @@ type RawResourceData struct { Function string Namespace string Community string - Cores int + Cores int64 } func (r RawResourceData) AsCopy() []interface{} { diff --git a/pkg/system-controller/pkg/controller/sync.go b/pkg/system-controller/pkg/controller/sync.go index e0cbf4e..a13ecbe 100644 --- a/pkg/system-controller/pkg/controller/sync.go +++ b/pkg/system-controller/pkg/controller/sync.go @@ -3,6 +3,7 @@ package controller import ( "context" "fmt" + "github.com/lterrac/edge-autoscaler/pkg/apiutils" "k8s.io/apimachinery/pkg/api/resource" @@ -27,8 +28,8 @@ import ( const ( // EmptyNodeListError is the default error message when grouping cluster nodes EmptyNodeListError string = "there are no or too few ready nodes for building communities" - ComControllerCpu = 200 - ComControllerMemory = 200000000 + ComControllerCpu int64 = 200 + ComControllerMemory int64 = 200000000 ) // TODO: better error handling @@ -368,7 +369,7 @@ func NewCommunityController(namespace, name string, conf *eav1alpha1.CommunityCo func diff(expected, actual []string) (createSet, deleteSet []string) { - actualMap := make(map[string]bool, 0) + actualMap := make(map[string]bool) deleteSet = make([]string, 0) createSet = make([]string, 0) @@ -385,7 +386,7 @@ func diff(expected, actual []string) (createSet, deleteSet []string) { } } - for e, _ := range actualMap { + for e := range actualMap { deleteSet = append(deleteSet, e) } diff --git a/pkg/system-controller/pkg/controller/system-controller.go b/pkg/system-controller/pkg/controller/system-controller.go index 3ad99d5..e963d95 100644 --- a/pkg/system-controller/pkg/controller/system-controller.go +++ b/pkg/system-controller/pkg/controller/system-controller.go @@ -49,7 +49,7 @@ type SystemController struct { communityUpdater *CommunityUpdater // delayClient retrieves the delay matrix - delayClient *delayclient.SQLDelayClient + delayClient delayclient.DelayClient listers informers.Listers @@ -75,7 +75,7 @@ func NewController( informers informers.Informers, communityUpdater *CommunityUpdater, communityGetter slpaClient.ClientCommunityGetter, - delayClient *delayclient.SQLDelayClient, + delayClient delayclient.DelayClient, ) *SystemController { // Create event broadcaster @@ -154,20 +154,15 @@ func (c *SystemController) runStandardWorker() { } } +func (c *SystemController) runPerformanceDegradationObserver() { +} + // handles standard partitioning (e.g. first partioning and cache sync) func (c *SystemController) runSyncSchedulesWorker() { for c.syncSchedulesWorkqueue.ProcessNextItem(c.syncCommunitySchedules) { } } -// control loop to handle performance degradation inside communities -func (c *SystemController) runPerformanceDegradationObserver() { -} - -// control loop to handle cluster topology changes -func (c *SystemController) runTopologyObserver() { -} - // Shutdown is called when the controller has finished its work func (c *SystemController) Shutdown() { utilruntime.HandleCrash() diff --git a/pkg/system-controller/pkg/controller/update-communities.go b/pkg/system-controller/pkg/controller/update-communities.go index c1e0fd9..130c319 100644 --- a/pkg/system-controller/pkg/controller/update-communities.go +++ b/pkg/system-controller/pkg/controller/update-communities.go @@ -83,7 +83,7 @@ func (c *CommunityUpdater) UpdateCommunityNodes(namespace string, communities [] var ok bool if node, ok = nodeMap[member.Name]; !ok { - utilruntime.HandleError(fmt.Errorf("Node %s not in node map", member.Name)) + utilruntime.HandleError(fmt.Errorf("node %s not in node map", member.Name)) continue } @@ -98,7 +98,7 @@ func (c *CommunityUpdater) UpdateCommunityNodes(namespace string, communities [] _, err := c.updateNode(context.TODO(), node, v1.UpdateOptions{}) if err != nil { - utilruntime.HandleError(fmt.Errorf("Error while updating Node %s: %s", member.Name, err)) + utilruntime.HandleError(fmt.Errorf("error while updating Node %s: %s", member.Name, err)) return err } @@ -122,7 +122,7 @@ func (c *CommunityUpdater) UpdateCommunityNodes(namespace string, communities [] _, err := c.updateNode(context.TODO(), node, v1.UpdateOptions{}) if err != nil { - utilruntime.HandleError(fmt.Errorf("Error while deleting label from node Node %s: %s", node.Name, err)) + utilruntime.HandleError(fmt.Errorf("error while deleting label from node Node %s: %s", node.Name, err)) return err } @@ -141,14 +141,13 @@ func (c *CommunityUpdater) ClearNodesLabels(namespace string) error { } for _, node := range clusterNodes { - if _, ok := node.Labels[ealabels.CommunityLabel.WithNamespace(namespace).String()]; ok { - delete(node.Labels, ealabels.CommunityLabel.WithNamespace(namespace).String()) - } + + delete(node.Labels, ealabels.CommunityLabel.WithNamespace(namespace).String()) _, err = c.updateNode(context.TODO(), node, v1.UpdateOptions{}) if err != nil { - utilruntime.HandleError(fmt.Errorf("Error while deleting label from node Node %s: %s", node.Name, err)) + utilruntime.HandleError(fmt.Errorf("error while deleting label from node Node %s: %s", node.Name, err)) return err } } diff --git a/pkg/system-controller/pkg/controller/update-communities_test.go b/pkg/system-controller/pkg/controller/update-communities_test.go index 54bc57a..b5c5883 100644 --- a/pkg/system-controller/pkg/controller/update-communities_test.go +++ b/pkg/system-controller/pkg/controller/update-communities_test.go @@ -4,7 +4,6 @@ import ( "context" "testing" - eav1alpha1 "github.com/lterrac/edge-autoscaler/pkg/apis/edgeautoscaler/v1alpha1" eafake "github.com/lterrac/edge-autoscaler/pkg/generated/clientset/versioned/fake" ealabels "github.com/lterrac/edge-autoscaler/pkg/labels" "github.com/lterrac/edge-autoscaler/pkg/system-controller/pkg/slpaclient" @@ -30,11 +29,6 @@ var notInCommunityMeta = v1.ObjectMeta{ }, } -type NodeWithNoLabel struct { - update UpdateNodeFunc - list ListNodeFunc -} - func updateNode(ctx context.Context, node *corev1.Node, opts v1.UpdateOptions) (*corev1.Node, error) { return &corev1.Node{ ObjectMeta: v1.ObjectMeta{ @@ -44,10 +38,6 @@ func updateNode(ctx context.Context, node *corev1.Node, opts v1.UpdateOptions) ( }, nil } -func updateStatus(ctx context.Context, cc *eav1alpha1.CommunityConfiguration, opts v1.UpdateOptions) (*eav1alpha1.CommunityConfiguration, error) { - return cc, nil -} - func listNodeWithNoLabel(selector labels.Selector) (ret []*corev1.Node, err error) { return []*corev1.Node{ { diff --git a/pkg/system-controller/pkg/delayclient/client.go b/pkg/system-controller/pkg/delayclient/client.go index c3c371d..fd116d5 100644 --- a/pkg/system-controller/pkg/delayclient/client.go +++ b/pkg/system-controller/pkg/delayclient/client.go @@ -2,6 +2,7 @@ package delayclient import ( "context" + "github.com/jackc/pgx/v4/pgxpool" "github.com/lterrac/edge-autoscaler/pkg/db" "k8s.io/klog/v2" diff --git a/pkg/system-controller/pkg/delayclient/fake.go b/pkg/system-controller/pkg/delayclient/fake.go new file mode 100644 index 0000000..197fa25 --- /dev/null +++ b/pkg/system-controller/pkg/delayclient/fake.go @@ -0,0 +1,38 @@ +package delayclient + +import ( + "github.com/lterrac/edge-autoscaler/pkg/informers" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/klog/v2" +) + +type FakeDelayClient struct { + listers informers.Listers +} + +func NewFakeClient(l informers.Listers) *FakeDelayClient { + return &FakeDelayClient{ + listers: l, + } +} + +func (f FakeDelayClient) GetDelays() ([]*NodeDelay, error) { + nodes, err := f.listers.NodeLister.List(labels.Everything()) + if err != nil { + klog.Error(err) + return nil, err + } + + delays := make([]*NodeDelay, 0) + for _, from := range nodes { + for _, to := range nodes { + delays = append(delays, &NodeDelay{ + FromNode: from.Name, + ToNode: to.Name, + Latency: 0, + }) + } + } + + return delays, nil +} diff --git a/pkg/system-controller/pkg/e2e/e2e_suite_test.go b/pkg/system-controller/pkg/e2e/e2e_suite_test.go index c44f9fb..61ea9ad 100644 --- a/pkg/system-controller/pkg/e2e/e2e_suite_test.go +++ b/pkg/system-controller/pkg/e2e/e2e_suite_test.go @@ -2,15 +2,14 @@ package e2e_test import ( "context" - "github.com/lterrac/edge-autoscaler/pkg/system-controller/pkg/delayclient" + "testing" + "time" + openfaasclientsent "github.com/openfaas/faas-netes/pkg/client/clientset/versioned" openfaasinformers "github.com/openfaas/faas-netes/pkg/client/informers/externalversions" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/klog/v2" - "testing" - "time" eaclientset "github.com/lterrac/edge-autoscaler/pkg/generated/clientset/versioned" eascheme "github.com/lterrac/edge-autoscaler/pkg/generated/clientset/versioned/scheme" @@ -18,6 +17,7 @@ import ( "github.com/lterrac/edge-autoscaler/pkg/informers" "github.com/lterrac/edge-autoscaler/pkg/signals" syscontroller "github.com/lterrac/edge-autoscaler/pkg/system-controller/pkg/controller" + "github.com/lterrac/edge-autoscaler/pkg/system-controller/pkg/delayclient" "github.com/lterrac/edge-autoscaler/pkg/system-controller/pkg/slpaclient" coreinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" @@ -91,9 +91,7 @@ var _ = BeforeSuite(func() { communityGetter := slpaclient.NewFakeClient() By("bootstrapping controller") - delayClient := FakeDelayClient{ - listers: informers.GetListers(), - } + delayClient := delayclient.NewFakeClient(informers.GetListers()) systemController = syscontroller.NewController( kubeClient, @@ -158,28 +156,3 @@ func setup() { _, err = eaClient.EdgeautoscalerV1alpha1().CommunityConfigurations(namespace).Create(context.TODO(), cc, metav1.CreateOptions{}) Expect(err).ShouldNot(HaveOccurred()) } - -type FakeDelayClient struct { - listers informers.Listers -} - -func (f FakeDelayClient) GetDelays() ([]*delayclient.NodeDelay, error) { - nodes, err := f.listers.NodeLister.List(labels.Everything()) - if err != nil { - klog.Error(err) - return nil, err - } - - delays := make([]*delayclient.NodeDelay, 0) - for _, from := range nodes { - for _, to := range nodes { - delays = append(delays, &delayclient.NodeDelay{ - FromNode: from.Name, - ToNode: to.Name, - Latency: 0, - }) - } - } - - return delays, nil -}