Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

monitor for dcgm #2300

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/koordlet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import (
"flag"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/gpu"

Check failure on line 21 in cmd/koordlet/main.go

View workflow job for this annotation

GitHub Actions / golangci-lint

could not import github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/gpu (-: # github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/gpu
"net/http"
_ "net/http/pprof"
"time"
Expand Down Expand Up @@ -80,6 +81,8 @@
// Expose the Prometheus http endpoint
go installHTTPHandler()

go gpu.StartGrpc()

// Start the Cmd
klog.Info("Starting the koordlet daemon")
d.Run(stopCtx.Done())
Expand Down
10 changes: 10 additions & 0 deletions pkg/koordlet/runtimehooks/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import (
"flag"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/podgpu"
"math"
"time"

Expand All @@ -30,7 +31,7 @@
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/coresched"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/cpunormalization"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/cpuset"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/gpu"

Check failure on line 34 in pkg/koordlet/runtimehooks/config.go

View workflow job for this annotation

GitHub Actions / golangci-lint

could not import github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/gpu (-: # github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/gpu
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/groupidentity"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/rdma"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/resctrl"
Expand Down Expand Up @@ -61,6 +62,13 @@
// beta: v1.1
GPUEnvInject featuregate.Feature = "GPUEnvInject"

// PODGpuDelete deletes a pod's GPU data from nodePodResources when a pod that uses GPUs is deleted.
//
// owner: @Stone
// alpha: v0.3
// beta: v1.1
PODGpuDelete featuregate.Feature = "PODGpuDelete"

// RDMADeviceInject injects rdma device info according to allocate result from koord-scheduler.
//
// owner: @ZiMengSheng
Expand Down Expand Up @@ -108,6 +116,7 @@
GroupIdentity: {Default: true, PreRelease: featuregate.Beta},
CPUSetAllocator: {Default: true, PreRelease: featuregate.Beta},
GPUEnvInject: {Default: false, PreRelease: featuregate.Alpha},
PODGpuDelete: {Default: false, PreRelease: featuregate.Alpha},
RDMADeviceInject: {Default: false, PreRelease: featuregate.Alpha},
BatchResource: {Default: true, PreRelease: featuregate.Beta},
CPUNormalization: {Default: false, PreRelease: featuregate.Alpha},
Expand All @@ -121,6 +130,7 @@
GroupIdentity: groupidentity.Object(),
CPUSetAllocator: cpuset.Object(),
GPUEnvInject: gpu.Object(),
PODGpuDelete: podgpu.Object(),
RDMADeviceInject: rdma.Object(),
BatchResource: batchresource.Object(),
CPUNormalization: cpunormalization.Object(),
Expand Down
205 changes: 204 additions & 1 deletion pkg/koordlet/runtimehooks/hooks/gpu/gpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,17 @@

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"os"
"path/filepath"
"strings"
"sync"
"time"

"google.golang.org/grpc"
"k8s.io/klog/v2"
pb "k8s.io/kubelet/pkg/apis/podresources/v1alpha1"

ext "github.com/koordinator-sh/koordinator/apis/extension"
schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
Expand All @@ -30,13 +38,37 @@
rmconfig "github.com/koordinator-sh/koordinator/pkg/runtimeproxy/config"
)

const GpuAllocEnv = "NVIDIA_VISIBLE_DEVICES"
const (
// GpuAllocEnv is the name of the environment variable that the
// "gpu env inject" hook is registered as injecting into containers.
GpuAllocEnv = "NVIDIA_VISIBLE_DEVICES"
// ServerPodResourcesKubeletSocket is the unix socket path StartGrpc listens on.
ServerPodResourcesKubeletSocket = "/pod-resources/koordlet.sock"
// ServerPodResourcesKubeletCheckPoint is the JSON file that Register loads
// NodePodResources from and that the periodic saver writes it to.
ServerPodResourcesKubeletCheckPoint = "/pod-resources/podgpu.json"
// interval is the period handed to PeriodicSave by startCheckpoint.
interval = 30 * time.Second
)

var (
// NodePodResources is the in-memory list of per-pod GPU device records:
// it is returned by PodResourcesServer.List, updated by
// convertToPodResources, and persisted by PeriodicSave.
NodePodResources []*pb.PodResources
// PodResourcesLock is the lock taken around accesses to NodePodResources
// (see convertToPodResources and the podgpu delete hook).
PodResourcesLock sync.RWMutex
)

type PodResourcesServer struct{}

func (s *PodResourcesServer) List(ctx context.Context, req *pb.ListPodResourcesRequest) (*pb.ListPodResourcesResponse, error) {

Check failure on line 55 in pkg/koordlet/runtimehooks/hooks/gpu/gpu.go

View workflow job for this annotation

GitHub Actions / unit-tests(Run Go build)

undefined: context

Check failure on line 55 in pkg/koordlet/runtimehooks/hooks/gpu/gpu.go

View workflow job for this annotation

GitHub Actions / unit-tests(Verify govet)

undefined: context

Check failure on line 55 in pkg/koordlet/runtimehooks/hooks/gpu/gpu.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: context

Check failure on line 55 in pkg/koordlet/runtimehooks/hooks/gpu/gpu.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: context

Check failure on line 55 in pkg/koordlet/runtimehooks/hooks/gpu/gpu.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: context

Check failure on line 55 in pkg/koordlet/runtimehooks/hooks/gpu/gpu.go

View workflow job for this annotation

GitHub Actions / unit-tests(Run Go test)

undefined: context
klog.V(1).Infof("List(): list resp nodePodResources %v", NodePodResources)
return &pb.ListPodResourcesResponse{PodResources: NodePodResources}, nil
}

// gpuPlugin is the runtime hook plugin that injects GPU environment
// variables into containers.
type gpuPlugin struct{}

// Register installs the "gpu env inject" PreCreateContainer hook and then
// restores NodePodResources from the on-disk checkpoint.
//
// A failed load is logged and leaves the current in-memory state untouched
// instead of overwriting it with nil. The assignment is done under
// PodResourcesLock because the gRPC server and other hooks access
// NodePodResources concurrently.
func (p *gpuPlugin) Register(op hooks.Options) {
	klog.V(5).Infof("register hook %v", "gpu env inject")
	hooks.Register(rmconfig.PreCreateContainer, "gpu env inject", "inject NVIDIA_VISIBLE_DEVICES env into container", p.InjectContainerGPUEnv)

	// Rebuild the in-memory object from the checkpoint file on disk.
	nodePodResources, err := LoadNodePodResourcesFromFile(ServerPodResourcesKubeletCheckPoint)
	if err != nil {
		klog.Errorf("Register(): Error loading nodePodResources: %v\n", err)
		return
	}

	PodResourcesLock.Lock()
	NodePodResources = nodePodResources
	PodResourcesLock.Unlock()
}

var singleton *gpuPlugin
Expand Down Expand Up @@ -106,5 +138,176 @@
}
}

klog.V(1).Infof("InjectContainerGPUEnv(): start to convertToPodResources")
convertToPodResources(containerReq, devices)

return nil
}

// convertToPodResources records the GPU devices allocated to the container
// described by request in the package-level NodePodResources list and returns
// the ContainerDevices entry it built.
//
// The pod is matched by name and namespace. If it already has an entry, the
// container's record is replaced (or appended when the container is new);
// otherwise a new pod entry is appended.
//
// The whole lookup-and-update runs under a single write lock. Dropping a read
// lock and then acquiring the write lock would let another goroutine modify
// NodePodResources in between, which can produce duplicate pod entries or
// updates applied to an entry that was just removed.
func convertToPodResources(request protocol.ContainerRequest, deviceAllocation []*ext.DeviceAllocation) *pb.ContainerDevices {
	klog.V(1).Infof("convertToPodResources(): enter into convertToPodResources")
	podName := request.PodMeta.Name
	nameSpace := request.PodMeta.Namespace
	containerName := request.ContainerMeta.Name

	devices := make([]string, 0, len(deviceAllocation))
	for _, device := range deviceAllocation {
		devices = append(devices, device.ID)
	}

	containerDevices := &pb.ContainerDevices{
		ResourceName: ext.ResourceNvidiaGPU.String(),
		DeviceIds:    devices,
	}
	klog.V(1).Infof("convertToPodResources(): containerDevices: %v", containerDevices)

	containerResources := &pb.ContainerResources{
		Name:    containerName,
		Devices: []*pb.ContainerDevices{containerDevices},
	}

	PodResourcesLock.Lock()
	defer PodResourcesLock.Unlock()

	podFound := false
	for i, podResources := range NodePodResources {
		if podResources == nil {
			// Tolerate nil entries (e.g. restored from a checkpoint).
			continue
		}
		klog.V(1).Infof("convertToPodResources(): traversal nodePodResources %d,%s/%s", i, podResources.GetName(), podResources.GetNamespace())
		if podResources.Name != podName || podResources.Namespace != nameSpace {
			continue
		}
		klog.V(1).Infof("convertToPodResources(): pod is found in nodePodResources %d,%s/%s", i, podResources.GetName(), podResources.GetNamespace())
		podFound = true

		containerExists := false
		for j, container := range podResources.Containers {
			if container.GetName() == containerName {
				klog.V(1).Infof("convertToPodResources(): container %s is found", containerName)
				containerExists = true
				podResources.Containers[j] = containerResources
				break
			}
		}
		if !containerExists {
			klog.V(1).Infof("convertToPodResources(): container %s is not found", containerName)
			podResources.Containers = append(podResources.Containers, containerResources)
		}
		break
	}

	if !podFound {
		NodePodResources = append(NodePodResources, &pb.PodResources{
			Name:       podName,
			Namespace:  nameSpace,
			Containers: []*pb.ContainerResources{containerResources},
		})
	}

	// Still under the lock, so the logged view is consistent.
	klog.V(1).Infof("convertToPodResources(): nodePodResources: %v", NodePodResources)
	return containerDevices
}

// StartGrpc serves the pod-resources lister API on the unix socket at
// ServerPodResourcesKubeletSocket and starts the periodic checkpoint saver.
// It blocks in Serve until the server stops and returns the first error
// encountered; errors are also logged because callers may run it in a
// goroutine and discard the result.
func StartGrpc() error {
	// A socket file left behind by a previous run makes Listen fail with
	// "address already in use", so remove it first. A missing file is fine.
	if err := os.Remove(ServerPodResourcesKubeletSocket); err != nil && !os.IsNotExist(err) {
		klog.Errorf("failed to remove stale socket %s: %v", ServerPodResourcesKubeletSocket, err)
		return err
	}

	lis, err := net.Listen("unix", ServerPodResourcesKubeletSocket)
	if err != nil {
		klog.Errorf("failed to listen: %v", err)
		return err
	}
	if err := setSocketPermissions(ServerPodResourcesKubeletSocket); err != nil {
		// Do not leak the listener (and its socket file) on this error path.
		_ = lis.Close()
		klog.Errorf("failed to set socket permissions: %v", err)
		return err
	}
	server := grpc.NewServer()
	pb.RegisterPodResourcesListerServer(server, &PodResourcesServer{})

	startCheckpoint()

	klog.V(4).Infof("startGrpc():Starting gRPC server on %s", ServerPodResourcesKubeletSocket)
	if err := server.Serve(lis); err != nil {
		klog.Errorf("failed to serve: %v", err)
		return err
	}
	klog.V(1).Infof("startGrpc():end...")
	return nil
}

func setSocketPermissions(socketPath string) error {
// In a real application, you would set the correct permissions here.
// For example:
return os.Chmod(socketPath, 0660)
//return nil
}

// startCheckpoint launches PeriodicSave in a background goroutine, writing
// to ServerPodResourcesKubeletCheckPoint every interval.
func startCheckpoint() {
	// The stop channel is local and never closed, so the saver is not
	// stopped from here once started.
	never := make(chan struct{})
	go PeriodicSave(ServerPodResourcesKubeletCheckPoint, interval, never)
}

func EnsureDirectory(path string) error {
return os.MkdirAll(filepath.Dir(path), os.ModePerm)
}

func SaveNodePodResourcesToFile(filename string, data []*pb.PodResources) error {
if err := EnsureDirectory(filename); err != nil {
return fmt.Errorf("failed to ensure directory for %s: %v", filename, err)
}

jsonData, err := json.MarshalIndent(data, "", " ")

Check failure on line 270 in pkg/koordlet/runtimehooks/hooks/gpu/gpu.go

View workflow job for this annotation

GitHub Actions / unit-tests(Run Go build)

undefined: json

Check failure on line 270 in pkg/koordlet/runtimehooks/hooks/gpu/gpu.go

View workflow job for this annotation

GitHub Actions / unit-tests(Verify govet)

undefined: json

Check failure on line 270 in pkg/koordlet/runtimehooks/hooks/gpu/gpu.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: json

Check failure on line 270 in pkg/koordlet/runtimehooks/hooks/gpu/gpu.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: json

Check failure on line 270 in pkg/koordlet/runtimehooks/hooks/gpu/gpu.go

View workflow job for this annotation

GitHub Actions / unit-tests(Run Go test)

undefined: json
if err != nil {
return fmt.Errorf("failed to marshal nodePodResources to JSON: %v", err)
}

if err := ioutil.WriteFile(filename, jsonData, 0644); err != nil {
return fmt.Errorf("failed to write JSON data to file %s: %v", filename, err)
}
klog.V(5).Infof("SaveNodePodResourcesToFile(): Saved nodePodResources to %s", filename)
return nil
}

func PeriodicSave(filename string, interval time.Duration, stopCh <-chan struct{}) {
ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if err := SaveNodePodResourcesToFile(filename, NodePodResources); err != nil {
klog.V(1).Infof("PeriodicSave(): SaveNodePodResourcesToFile(): Error saving nodePodResources: %v", err)
}
case <-stopCh:
fmt.Println("Stopping periodic save.")
return
}
}
}

func LoadNodePodResourcesFromFile(filePath string) ([]*pb.PodResources, error) {
klog.V(1).Infof("LoadNodePodResourcesFromFile():start to load PodResources from %s", filePath)
data, err := os.ReadFile(filePath)
if err != nil {
return nil, fmt.Errorf("failed to read file %s: %v", filePath, err)
}

var nodePodResources []*pb.PodResources
if err := json.Unmarshal(data, &nodePodResources); err != nil {

Check failure on line 307 in pkg/koordlet/runtimehooks/hooks/gpu/gpu.go

View workflow job for this annotation

GitHub Actions / unit-tests(Run Go build)

undefined: json

Check failure on line 307 in pkg/koordlet/runtimehooks/hooks/gpu/gpu.go

View workflow job for this annotation

GitHub Actions / unit-tests(Verify govet)

undefined: json

Check failure on line 307 in pkg/koordlet/runtimehooks/hooks/gpu/gpu.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: json) (typecheck)

Check failure on line 307 in pkg/koordlet/runtimehooks/hooks/gpu/gpu.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: json) (typecheck)

Check failure on line 307 in pkg/koordlet/runtimehooks/hooks/gpu/gpu.go

View workflow job for this annotation

GitHub Actions / unit-tests(Run Go test)

undefined: json
return nil, fmt.Errorf("failed to unmarshal JSON data from file %s: %v", filePath, err)
}

klog.V(1).Infof("LoadNodePodResourcesFromFile():Loaded %d PodResources from %s\n", len(nodePodResources), filePath)
return nodePodResources, nil
}
86 changes: 86 additions & 0 deletions pkg/koordlet/runtimehooks/hooks/podgpu/gpu_delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
Copyright 2022 The Koordinator Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package podgpu

import (
"fmt"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/gpu"

Check failure on line 22 in pkg/koordlet/runtimehooks/hooks/podgpu/gpu_delete.go

View workflow job for this annotation

GitHub Actions / golangci-lint

could not import github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/gpu (-: # github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/gpu
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/protocol"
rmconfig "github.com/koordinator-sh/koordinator/pkg/runtimeproxy/config"
"k8s.io/klog/v2"
pb "k8s.io/kubelet/pkg/apis/podresources/v1alpha1"
)

// podGpuDeletePlugin is the runtime hook plugin that removes a pod's GPU
// record from gpu.NodePodResources.
type podGpuDeletePlugin struct{}

// Register installs DeletePodGPU as a PreRemoveRunPodSandbox hook.
func (p *podGpuDeletePlugin) Register(op hooks.Options) {
	const (
		hookName = "nodePodResourcese gpu delete"
		hookDesc = "delete nodePodResourcese's gpu data when pod which use gpu is deleted"
	)
	klog.V(5).Infof("register hook %v", hookName)
	hooks.Register(rmconfig.PreRemoveRunPodSandbox, hookName, hookDesc, p.DeletePodGPU)
}

// singleton holds the plugin instance handed out by Object.
var singleton *podGpuDeletePlugin

// Object returns the package's podGpuDeletePlugin, creating it on first use.
func Object() *podGpuDeletePlugin {
	if singleton != nil {
		return singleton
	}
	singleton = &podGpuDeletePlugin{}
	return singleton
}

// DeletePodGPU removes the entry for the pod described by proto from
// gpu.NodePodResources. A pod with no entry is not an error.
//
// proto must be a non-nil *protocol.PodContext; any other value yields an
// error instead of a panic from an unchecked type assertion. The lookup and
// removal run under the write side of gpu.PodResourcesLock.
func (p *podGpuDeletePlugin) DeletePodGPU(proto protocol.HooksProtocol) error {
	podCtx, ok := proto.(*protocol.PodContext)
	if !ok || podCtx == nil {
		return fmt.Errorf("pod protocol is nil or has unexpected type %T for plugin podgpu", proto)
	}
	klog.V(1).Infof("DeletePodGPU(): podCtx:%v", podCtx)

	klog.V(1).Info("DeletePodGPU(): start to parse params")
	containerReq := podCtx.Request

	podName := containerReq.PodMeta.Name
	nameSpace := containerReq.PodMeta.Namespace
	klog.V(1).Infof("DeletePodGPU(): container %s/%s in nodePodResources", podName, nameSpace)

	gpu.PodResourcesLock.Lock()
	defer gpu.PodResourcesLock.Unlock()

	for i, podResources := range gpu.NodePodResources {
		// The generated getters are nil-safe, so a nil entry cannot panic here.
		if podResources.GetName() != podName || podResources.GetNamespace() != nameSpace {
			continue
		}
		klog.V(1).Infof("DeletePodGPU(): found pod %s/%s in nodePodResources", podName, nameSpace)

		if err := removePodResourceByIndex(&gpu.NodePodResources, i); err != nil {
			// Do not report a deletion that did not happen.
			return fmt.Errorf("removePodResourceByIndex error: %w", err)
		}
		klog.V(1).Infof("DeletePodGPU(): deleted pod %s/%s from nodePodResources", podName, nameSpace)
		return nil
	}

	klog.V(1).Infof("DeletePodGPU(): not found pod %s/%s in nodePodResources, then no delete podInfo", podName, nameSpace)
	return nil
}

// removePodResourceByIndex deletes the element at index from *resources,
// preserving the order of the remaining elements.
//
// It returns an error when resources is nil or index is out of range. The
// vacated slot at the end of the backing array is set to nil so the removed
// PodResources is not kept reachable through the slice's spare capacity.
func removePodResourceByIndex(resources *[]*pb.PodResources, index int) error {
	if resources == nil {
		return fmt.Errorf("nil resources for index %d", index)
	}
	list := *resources
	if index < 0 || index >= len(list) {
		return fmt.Errorf("invalid index %d", index)
	}

	last := len(list) - 1
	copy(list[index:], list[index+1:])
	list[last] = nil
	*resources = list[:last]

	klog.V(1).Infof("removePodResourceByIndex(): Removed pod resource at index %d and newresources is %v", index, resources)
	return nil
}
4 changes: 2 additions & 2 deletions pkg/scheduler/plugins/deviceshare/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,10 +564,10 @@ func (p *Plugin) fillID(allocationResult apiext.DeviceAllocations, nodeName stri
return err
}
for deviceType, allocations := range allocationResult {
if deviceType == schedulingv1alpha1.GPU {
/*if deviceType == schedulingv1alpha1.GPU {
// because gpu minor is well known, ID isn't needed
continue
}
}*/
for i, allocation := range allocations {
for _, info := range device.Spec.Devices {
if info.Minor != nil && *info.Minor == allocation.Minor && info.Type == deviceType {
Expand Down
Loading