Skip to content

Commit

Permalink
QoS Manager Framework (#796)
Browse files Browse the repository at this point in the history
Signed-off-by: stormgbs <[email protected]>
  • Loading branch information
stormgbs authored Nov 11, 2022
1 parent 80ec3cb commit 39cb1d1
Show file tree
Hide file tree
Showing 18 changed files with 1,119 additions and 0 deletions.
6 changes: 6 additions & 0 deletions cmd/koordlet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ func main() {
os.Exit(1)
}

// Init config from ConfigMap.
if err = cfg.InitFromConfigMap(); err != nil {
klog.Error("Unable to init config from ConfigMap: ", err)
os.Exit(1)
}

d, err := agent.NewDaemon(cfg)
if err != nil {
klog.Error("Unable to setup koordlet daemon: ", err)
Expand Down
22 changes: 22 additions & 0 deletions pkg/features/koordlet_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@ limitations under the License.
package features

import (
"fmt"

"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/component-base/featuregate"

slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1"
)

const (
Expand Down Expand Up @@ -83,3 +87,21 @@ var (
PerformanceCollector: {Default: false, PreRelease: featuregate.Alpha},
}
)

// IsFeatureDisabled reports whether feature is turned off by the given nodeSLO.
//
// Only BECPUSuppress, BEMemoryEvict and BECPUEvict are supported; they are all
// governed by Spec.ResourceUsedThresholdWithBE.Enable. On any error (nil or
// empty nodeSLO, missing threshold/Enable, unsupported feature) the function
// returns true together with a non-nil error, i.e. the feature is treated as
// disabled.
func IsFeatureDisabled(nodeSLO *slov1alpha1.NodeSLO, feature featuregate.Feature) (bool, error) {
	// invalidSLO builds the shared "cannot parse" result for a malformed nodeSLO.
	invalidSLO := func() (bool, error) {
		return true, fmt.Errorf("cannot parse feature config for invalid nodeSLO %v", nodeSLO)
	}

	if nodeSLO == nil {
		return invalidSLO()
	}
	if nodeSLO.Spec == (slov1alpha1.NodeSLOSpec{}) {
		return invalidSLO()
	}

	if feature != BECPUSuppress && feature != BEMemoryEvict && feature != BECPUEvict {
		return true, fmt.Errorf("cannot parse feature config for unsupported feature %s", feature)
	}

	threshold := nodeSLO.Spec.ResourceUsedThresholdWithBE
	if threshold == nil || threshold.Enable == nil {
		return invalidSLO()
	}

	enabled := *threshold.Enable
	return !enabled, nil
}
34 changes: 34 additions & 0 deletions pkg/koordlet/common/reason/reason.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package reason

// Reason strings exported by package reason.
// NOTE(review): presumably used as event/audit reasons by koordlet — confirm against callers.
const (
// Update* reasons name the kind of node-level setting being updated.
UpdateCPU = "UpdateCPU"
UpdateMemory = "UpdateMemory"
UpdateCgroups = "UpdateCgroups" // update cgroups excluding the options already stated above
UpdateSystemConfig = "UpdateSystemConfig"
UpdateResctrlSchemata = "UpdateResctrlSchemata" // update resctrl l3 cat schemata
UpdateResctrlTasks = "UpdateResctrlTasks" // update resctrl tasks

// EvictPodBy* reasons name the trigger for a pod eviction.
EvictPodByNodeMemoryUsage = "EvictPodByNodeMemoryUsage"
EvictPodByBECPUSatisfaction = "EvictPodByBECPUSatisfaction"

// AdjustBEByNodeCPUUsage names an adjustment of BE driven by node CPU usage.
AdjustBEByNodeCPUUsage = "AdjustBEByNodeCPUUsage"

// Eviction outcome reasons. Unlike the constants above, these values start
// with a lower-case letter; the literals are runtime strings and are left as-is.
EvictPodSuccess = "evictPodSuccess"
EvictPodFail = "evictPodFail"
)
79 changes: 79 additions & 0 deletions pkg/koordlet/common/testutil/k8s.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package testutil

import (
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
)

// FakeRecorder is a test stub that remembers the reason of the last event it
// was asked to record.
// NOTE(review): presumably meant to satisfy client-go's record.EventRecorder — confirm.
type FakeRecorder struct {
// eventReason is overwritten with the reason argument on every Event/Eventf call.
eventReason string
}

// Event stores reason as the most recent event reason and echoes the event
// type, reason and message to stdout. The object argument is not used.
func (f *FakeRecorder) Event(object runtime.Object, eventType, reason, message string) {
	const layout = "send event:eventType:%s,reason:%s,message:%s"

	f.eventReason = reason
	fmt.Printf(layout, eventType, reason, message)
}

// Eventf stores reason as the most recent event reason and echoes the event to
// stdout. The message is rendered by applying args to messageFmt, so the
// printed text matches what a real recorder would emit instead of showing the
// raw format template. The object argument is not used.
func (f *FakeRecorder) Eventf(object runtime.Object, eventType, reason, messageFmt string, args ...interface{}) {
	f.eventReason = reason
	// Expand the template first; previously args were silently dropped.
	message := fmt.Sprintf(messageFmt, args...)
	fmt.Printf("send event:eventType:%s,reason:%s,message:%s", eventType, reason, message)
}

// AnnotatedEventf forwards to Eventf with the same object, event type, reason,
// format and args. The annotations are not recorded by this fake.
func (f *FakeRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventType, reason, messageFmt string, args ...interface{}) {
	_ = annotations // intentionally dropped
	f.Eventf(object, eventType, reason, messageFmt, args...)
}

// MockTestNode builds a Node named "test-node" (namespace "default") whose
// Allocatable and Capacity both report the given cpu and memory quantities.
// cpu and memory are parsed with resource.MustParse, which panics on a
// malformed quantity string.
func MockTestNode(cpu, memory string) *corev1.Node {
	// newResources parses the quantities afresh on each call so that
	// Allocatable and Capacity never share Quantity values.
	newResources := func() corev1.ResourceList {
		return corev1.ResourceList{
			corev1.ResourceCPU:    resource.MustParse(cpu),
			corev1.ResourceMemory: resource.MustParse(memory),
		}
	}

	node := &corev1.Node{}
	node.Name = "test-node"
	node.Namespace = "default"
	node.Status.Allocatable = newResources()
	node.Status.Capacity = newResources()
	return node
}

// MockTestPod builds a Pod of kind "Pod" whose Name and UID are both set to
// name and whose only label is apiext.LabelPodQoS with the value of qosClass.
func MockTestPod(qosClass apiext.QoSClass, name string) *corev1.Pod {
	pod := &corev1.Pod{}
	pod.Kind = "Pod"
	pod.Name = name
	pod.UID = types.UID(name)
	pod.Labels = map[string]string{
		apiext.LabelPodQoS: string(qosClass),
	}
	return pod
}
22 changes: 22 additions & 0 deletions pkg/koordlet/common/types/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package types

const (
// DefaultCFSPeriod is the default CFS period value.
// NOTE(review): presumably in microseconds (cgroup cpu.cfs_period_us) — confirm.
DefaultCFSPeriod = 100000
// DefaultMemUnlimit equals 2^63 - 4096.
// NOTE(review): presumably the value a cgroup memory limit reads as when no
// limit is set — confirm against callers.
DefaultMemUnlimit = 9223372036854771712
)
54 changes: 54 additions & 0 deletions pkg/koordlet/common/utils/containers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import (
corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"

"github.com/koordinator-sh/koordinator/pkg/util"
"github.com/koordinator-sh/koordinator/pkg/util/runtime"
)

// KillContainers stops every running container of the pod.
//
// For each container declared in pod.Spec.Containers it looks up the container
// ID and status in pod.Status, resolves the runtime handler from the container
// ID, and asks it to stop the container with a timeout of 0. message is used
// as a prefix for log lines so callers can tell why the kill was requested.
//
// Failures are logged and never returned. A container that cannot be looked
// up, is not running, or fails to stop does not prevent the remaining
// containers from being processed. A nil pod is a no-op.
func KillContainers(pod *corev1.Pod, message string) {
	if pod == nil {
		return
	}

	for i := range pod.Spec.Containers {
		containerName := pod.Spec.Containers[i].Name

		containerID, containerStatus, err := util.FindContainerIdAndStatusByName(&pod.Status, containerName)
		if err != nil {
			klog.Errorf("failed to find container id and status, error: %v", err)
			// Keep going: the other containers may still be resolvable.
			continue
		}

		// Only running containers need to be stopped; skip this one but keep
		// checking the rest of the pod.
		if containerStatus == nil || containerStatus.State.Running == nil {
			continue
		}

		if containerID == "" {
			klog.Warningf("%s, get container ID failed, pod %s/%s containerName %s status: %v", message, pod.Namespace, pod.Name, containerName, pod.Status.ContainerStatuses)
			continue
		}

		// Only the runtime type is needed here; an unparsable ID surfaces as a
		// GetRuntimeHandler failure just below.
		runtimeType, _, _ := util.ParseContainerId(containerStatus.ContainerID)
		runtimeHandler, err := runtime.GetRuntimeHandler(runtimeType)
		if err != nil || runtimeHandler == nil {
			klog.Errorf("%s, kill container(%s) error! GetRuntimeHandler fail! error: %v", message, containerStatus.ContainerID, err)
			continue
		}
		if err := runtimeHandler.StopContainer(containerID, 0); err != nil {
			klog.Errorf("%s, stop container error! error: %v", message, err)
		}
	}
}
36 changes: 36 additions & 0 deletions pkg/koordlet/common/utils/pod.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import (
corev1 "k8s.io/api/core/v1"

"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
"github.com/koordinator-sh/koordinator/pkg/util"
)

// GetPodMetas converts pods into PodMeta entries, one per input pod and in the
// same order. Each entry carries the pod's cgroup directory as computed by
// util.GetPodKubeRelativePath and a deep copy of the pod, so later changes to
// the input pods do not affect the result.
func GetPodMetas(pods []*corev1.Pod) []*statesinformer.PodMeta {
	metas := make([]*statesinformer.PodMeta, 0, len(pods))
	for _, p := range pods {
		metas = append(metas, &statesinformer.PodMeta{
			CgroupDir: util.GetPodKubeRelativePath(p),
			Pod:       p.DeepCopy(),
		})
	}
	return metas
}
46 changes: 46 additions & 0 deletions pkg/koordlet/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,15 @@ limitations under the License.
package config

import (
"context"
"encoding/json"
"errors"
"flag"
"strings"

k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
cliflag "k8s.io/component-base/cli/flag"
"sigs.k8s.io/controller-runtime/pkg/client/config"
Expand All @@ -28,38 +34,54 @@ import (
"github.com/koordinator-sh/koordinator/pkg/koordlet/audit"
"github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache"
"github.com/koordinator-sh/koordinator/pkg/koordlet/metricsadvisor"
qosmanagerconfig "github.com/koordinator-sh/koordinator/pkg/koordlet/qosmanager/config"
"github.com/koordinator-sh/koordinator/pkg/koordlet/reporter"
"github.com/koordinator-sh/koordinator/pkg/koordlet/resmanager"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks"
"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
"github.com/koordinator-sh/koordinator/pkg/util/system"
)

const (
// DefaultKoordletConfigMapNamespace is the default namespace of the koordlet
// ConfigMap; it seeds Configuration and the "configmap-namespace" flag.
DefaultKoordletConfigMapNamespace = "koordinator-system"
// DefaultKoordletConfigMapName is the default name of the koordlet ConfigMap;
// it seeds Configuration and the "configmap-name" flag.
DefaultKoordletConfigMapName = "koordlet-config"

// CMKeyQoSPluginExtraConfigs is the ConfigMap data key whose value is a JSON
// object of string keys to string values, loaded by InitFromConfigMap into
// QosManagerConf.PluginExtraConfigs.
CMKeyQoSPluginExtraConfigs = "qos-plugin-extra-configs"
)

// Configuration aggregates the configuration of the koordlet components.
// Use NewConfiguration to obtain an instance populated with defaults.
type Configuration struct {
// ConfigMapName is the name of the ConfigMap read by InitFromConfigMap.
ConfigMapName string
// ConfigMapNamesapce is the namespace of the ConfigMap read by InitFromConfigMap.
// NOTE(review): the identifier is misspelled ("Namesapce"); it is exported, so
// renaming it would need a coordinated change across callers.
ConfigMapNamesapce string
// KubeRestConf is the Kubernetes REST config; it is assigned by InitClient and
// must be non-nil before InitFromConfigMap is called.
KubeRestConf *rest.Config
// Per-component configurations, each defaulted by NewConfiguration.
StatesInformerConf *statesinformer.Config
ReporterConf *reporter.Config
CollectorConf *metricsadvisor.Config
MetricCacheConf *metriccache.Config
ResManagerConf *resmanager.Config
// QosManagerConf receives PluginExtraConfigs from InitFromConfigMap.
QosManagerConf *qosmanagerconfig.Config
RuntimeHookConf *runtimehooks.Config
AuditConf *audit.Config
// FeatureGates is not populated by NewConfiguration.
FeatureGates map[string]bool
}

// NewConfiguration returns a Configuration pointing at the default koordlet
// ConfigMap and carrying the default config of every component. KubeRestConf
// and FeatureGates are left at their zero values.
func NewConfiguration() *Configuration {
	cfg := new(Configuration)

	// Where to find the koordlet ConfigMap.
	cfg.ConfigMapName = DefaultKoordletConfigMapName
	cfg.ConfigMapNamesapce = DefaultKoordletConfigMapNamespace

	// Component defaults.
	cfg.StatesInformerConf = statesinformer.NewDefaultConfig()
	cfg.ReporterConf = reporter.NewDefaultConfig()
	cfg.CollectorConf = metricsadvisor.NewDefaultConfig()
	cfg.MetricCacheConf = metriccache.NewDefaultConfig()
	cfg.ResManagerConf = resmanager.NewDefaultConfig()
	cfg.QosManagerConf = qosmanagerconfig.NewDefaultConfig()
	cfg.RuntimeHookConf = runtimehooks.NewDefaultConfig()
	cfg.AuditConf = audit.NewDefaultConfig()

	return cfg
}

func (c *Configuration) InitFlags(fs *flag.FlagSet) {
fs.StringVar(&c.ConfigMapName, "configmap-name", DefaultKoordletConfigMapName, "determines the name the koordlet configmap uses.")
fs.StringVar(&c.ConfigMapNamesapce, "configmap-namespace", DefaultKoordletConfigMapNamespace, "determines the namespace of configmap uses.")
system.Conf.InitFlags(fs)
c.StatesInformerConf.InitFlags(fs)
c.ReporterConf.InitFlags(fs)
Expand All @@ -81,3 +103,27 @@ func (c *Configuration) InitClient() error {
c.KubeRestConf = cfg
return nil
}

// InitFromConfigMap loads optional settings from the koordlet ConfigMap
// identified by ConfigMapNamesapce/ConfigMapName.
//
// It requires KubeRestConf to be set. A missing ConfigMap is not an error and
// leaves the configuration untouched; any other lookup failure is returned.
// When the ConfigMap contains the CMKeyQoSPluginExtraConfigs key, its value is
// decoded as a JSON object of string to string and stored in
// QosManagerConf.PluginExtraConfigs; a decoding failure is returned.
func (c *Configuration) InitFromConfigMap() error {
	if c.KubeRestConf == nil {
		return errors.New("KubeRestConf is nil")
	}

	clientset, err := kubernetes.NewForConfig(c.KubeRestConf)
	if err != nil {
		return err
	}

	configMap, err := clientset.CoreV1().ConfigMaps(c.ConfigMapNamesapce).Get(context.TODO(), c.ConfigMapName, metav1.GetOptions{})
	if err != nil {
		// The ConfigMap is optional: only unexpected failures are reported.
		if k8serrors.IsNotFound(err) {
			return nil
		}
		return err
	}

	// Setup extra configs for QoS Manager.
	raw, ok := configMap.Data[CMKeyQoSPluginExtraConfigs]
	if !ok {
		return nil
	}
	var pluginConfigs map[string]string
	if err := json.Unmarshal([]byte(raw), &pluginConfigs); err != nil {
		return err
	}
	c.QosManagerConf.PluginExtraConfigs = pluginConfigs
	return nil
}
Loading

0 comments on commit 39cb1d1

Please sign in to comment.