diff --git a/cmd/koordlet/main.go b/cmd/koordlet/main.go index 5e4e1e551..ba0742f2c 100644 --- a/cmd/koordlet/main.go +++ b/cmd/koordlet/main.go @@ -18,6 +18,7 @@ package main import ( "flag" + "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/gpu" "net/http" _ "net/http/pprof" "time" @@ -80,6 +81,8 @@ func main() { // Expose the Prometheus http endpoint go installHTTPHandler() + go gpu.StartGrpc() + // Start the Cmd klog.Info("Starting the koordlet daemon") d.Run(stopCtx.Done()) diff --git a/pkg/koordlet/runtimehooks/config.go b/pkg/koordlet/runtimehooks/config.go index c3a122e5f..fda64c3c1 100644 --- a/pkg/koordlet/runtimehooks/config.go +++ b/pkg/koordlet/runtimehooks/config.go @@ -18,6 +18,7 @@ package runtimehooks import ( "flag" + "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/podgpu" "math" "time" @@ -61,6 +62,13 @@ const ( // beta: v1.1 GPUEnvInject featuregate.Feature = "GPUEnvInject" + // PODGpuDelete delete nodePodResourcese's gpu data when pod which use gpu is deleted. + // + // owner: @Stone + // alpha: v0.3 + // beta: v1.1 + PODGpuDelete featuregate.Feature = "PODGpuDelete" + // RDMADeviceInject injects rdma device info according to allocate result from koord-scheduler. // // owner: @ZiMengSheng @@ -108,6 +116,7 @@ var ( GroupIdentity: {Default: true, PreRelease: featuregate.Beta}, CPUSetAllocator: {Default: true, PreRelease: featuregate.Beta}, GPUEnvInject: {Default: false, PreRelease: featuregate.Alpha}, + PODGpuDelete: {Default: false, PreRelease: featuregate.Alpha}, RDMADeviceInject: {Default: false, PreRelease: featuregate.Alpha}, BatchResource: {Default: true, PreRelease: featuregate.Beta}, CPUNormalization: {Default: false, PreRelease: featuregate.Alpha}, @@ -121,6 +130,7 @@ var ( GroupIdentity: groupidentity.Object(), CPUSetAllocator: cpuset.Object(), GPUEnvInject: gpu.Object(), + PODGpuDelete: podgpu.Object(), RDMADeviceInject: rdma.Object(), BatchResource: batchresource.Object(), CPUNormalization: cpunormalization.Object(), diff --git a/pkg/koordlet/runtimehooks/hooks/gpu/gpu.go b/pkg/koordlet/runtimehooks/hooks/gpu/gpu.go index 25c2839f4..2b370a680 100644 --- a/pkg/koordlet/runtimehooks/hooks/gpu/gpu.go +++ b/pkg/koordlet/runtimehooks/hooks/gpu/gpu.go @@ -18,9 +18,17 @@ package gpu import ( "fmt" + "google.golang.org/grpc" + "io/ioutil" + "net" + "os" + "path/filepath" "strings" + "sync" + "time" "k8s.io/klog/v2" + pb "k8s.io/kubelet/pkg/apis/podresources/v1alpha1" ext "github.com/koordinator-sh/koordinator/apis/extension" schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1" @@ -30,13 +38,37 @@ import ( rmconfig "github.com/koordinator-sh/koordinator/pkg/runtimeproxy/config" ) -const GpuAllocEnv = "NVIDIA_VISIBLE_DEVICES" +const ( + GpuAllocEnv = "NVIDIA_VISIBLE_DEVICES" + ServerPodResourcesKubeletSocket = "/pod-resources/koordlet.sock" + ServerPodResourcesKubeletCheckPoint = "/pod-resources/podgpu.json" + interval = 30 * time.Second +) + +var ( + NodePodResources []*pb.PodResources + PodResourcesLock sync.RWMutex +) + +type PodResourcesServer struct{} + +func (s *PodResourcesServer) List(ctx context.Context, req *pb.ListPodResourcesRequest) (*pb.ListPodResourcesResponse, error) { + klog.V(1).Infof("List(): list resp nodePodResources %v", NodePodResources) + return &pb.ListPodResourcesResponse{PodResources: NodePodResources}, nil +} type gpuPlugin struct{} func (p *gpuPlugin) Register(op hooks.Options) { klog.V(5).Infof("register hook %v", "gpu env inject") hooks.Register(rmconfig.PreCreateContainer, "gpu env inject", "inject NVIDIA_VISIBLE_DEVICES env into container", p.InjectContainerGPUEnv) + + //Construct the memory object by loading pod-gpu.json from disk + nodePodResources, err := LoadNodePodResourcesFromFile(ServerPodResourcesKubeletCheckPoint) + if err != nil { + klog.Errorf("Register(): Error loading nodePodResources: %v\n", err) + } + NodePodResources = nodePodResources } var singleton *gpuPlugin @@ -106,5 +138,176 @@ func (p *gpuPlugin) InjectContainerGPUEnv(proto protocol.HooksProtocol) error { } } + klog.V(1).Infof("InjectContainerGPUEnv(): start to convertToPodResources") + convertToPodResources(containerReq, devices) + + return nil +} + +func convertToPodResources(request protocol.ContainerRequest, deviceAllocation []*ext.DeviceAllocation) *pb.ContainerDevices { + klog.V(1).Infof("convertToPodResources(): enter into convertToPodResources") + podName := request.PodMeta.Name + nameSpace := request.PodMeta.Namespace + containerName := request.ContainerMeta.Name + + devices := make([]string, 0, len(deviceAllocation)) + for _, device := range deviceAllocation { + devices = append(devices, device.ID) + } + + containerDevices := &pb.ContainerDevices{ + ResourceName: ext.ResourceNvidiaGPU.String(), + DeviceIds: devices, + } + klog.V(1).Infof("convertToPodResources(): containerDevices: %v", containerDevices) + + containerDevicesSlice := make([]*pb.ContainerDevices, 0, 1) + containerDevicesSlice = append(containerDevicesSlice, containerDevices) + + var podFound bool + + PodResourcesLock.RLock() + for i, podResources := range NodePodResources { + klog.V(1).Infof("convertToPodResources(): traversal nodePodResources %d,%s/%s", i, podResources.GetName(), podResources.GetNamespace()) + if podResources.Name == podName && podResources.Namespace == nameSpace { + klog.V(1).Infof("convertToPodResources(): pod is found in nodePodResources %d,%s/%s", i, podResources.GetName(), podResources.GetNamespace()) + podFound = true + + PodResourcesLock.RUnlock() + + PodResourcesLock.Lock() + containerExists := false + for j, container := range podResources.Containers { + if container.Name == containerName { + klog.V(1).Infof("convertToPodResources(): container %s is found", containerName) + containerExists = true + podResources.Containers[j] = &pb.ContainerResources{ + Name: containerName, + Devices: containerDevicesSlice, + } + break + } + } + if !containerExists { + klog.V(1).Infof("convertToPodResources(): container %s is not found", containerName) + podResources.Containers = append(podResources.Containers, &pb.ContainerResources{ + Name: containerName, + Devices: containerDevicesSlice, + }) + } + NodePodResources[i] = podResources + PodResourcesLock.Unlock() + break + } + } + + if !podFound { + PodResourcesLock.RUnlock() + PodResourcesLock.Lock() + newPodResources := &pb.PodResources{ + Name: podName, + Namespace: nameSpace, + Containers: []*pb.ContainerResources{ + { + Name: containerName, + Devices: containerDevicesSlice, + }, + }, + } + NodePodResources = append(NodePodResources, newPodResources) + PodResourcesLock.Unlock() + } + + klog.V(1).Infof("convertToPodResources(): nodePodResources: %v", NodePodResources) + return containerDevices +} + +func StartGrpc() error { + lis, err := net.Listen("unix", ServerPodResourcesKubeletSocket) + if err != nil { + klog.Errorf("failed to listen: %v", err) + return err + } + if err := setSocketPermissions(ServerPodResourcesKubeletSocket); err != nil { + klog.Errorf("failed to set socket permissions: %v", err) + return err + } + server := grpc.NewServer() + pb.RegisterPodResourcesListerServer(server, &PodResourcesServer{}) + + startCheckpoint() + + klog.V(4).Infof("startGrpc():Starting gRPC server on %s", ServerPodResourcesKubeletSocket) + if err := server.Serve(lis); err != nil { + klog.Errorf("failed to serve: %v", err) + return err + } + klog.V(1).Infof("startGrpc():end...") return nil } + +func setSocketPermissions(socketPath string) error { + // In a real application, you would set the correct permissions here. + // For example: + return os.Chmod(socketPath, 0660) + //return nil +} + +func startCheckpoint() { + stopCh := make(chan struct{}) + go PeriodicSave(ServerPodResourcesKubeletCheckPoint, interval, stopCh) +} + +func EnsureDirectory(path string) error { + return os.MkdirAll(filepath.Dir(path), os.ModePerm) +} + +func SaveNodePodResourcesToFile(filename string, data []*pb.PodResources) error { + if err := EnsureDirectory(filename); err != nil { + return fmt.Errorf("failed to ensure directory for %s: %v", filename, err) + } + + jsonData, err := json.MarshalIndent(data, "", " ") + if err != nil { + return fmt.Errorf("failed to marshal nodePodResources to JSON: %v", err) + } + + if err := ioutil.WriteFile(filename, jsonData, 0644); err != nil { + return fmt.Errorf("failed to write JSON data to file %s: %v", filename, err) + } + klog.V(5).Infof("SaveNodePodResourcesToFile(): Saved nodePodResources to %s", filename) + return nil +} + +func PeriodicSave(filename string, interval time.Duration, stopCh <-chan struct{}) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if err := SaveNodePodResourcesToFile(filename, NodePodResources); err != nil { + klog.V(1).Infof("PeriodicSave(): SaveNodePodResourcesToFile(): Error saving nodePodResources: %v", err) + } + case <-stopCh: + fmt.Println("Stopping periodic save.") + return + } + } +} + +func LoadNodePodResourcesFromFile(filePath string) ([]*pb.PodResources, error) { + klog.V(1).Infof("LoadNodePodResourcesFromFile():start to load PodResources from %s", filePath) + data, err := os.ReadFile(filePath) + if err != nil { + return nil, fmt.Errorf("failed to read file %s: %v", filePath, err) + } + + var nodePodResources []*pb.PodResources + if err := json.Unmarshal(data, &nodePodResources); err != nil { + return nil, fmt.Errorf("failed to unmarshal JSON data from file %s: %v", filePath, err) + } + + klog.V(1).Infof("LoadNodePodResourcesFromFile():Loaded %d PodResources from %s\n", len(nodePodResources), filePath) + return nodePodResources, nil +} diff --git a/pkg/koordlet/runtimehooks/hooks/podgpu/gpu_delete.go b/pkg/koordlet/runtimehooks/hooks/podgpu/gpu_delete.go new file mode 100644 index 000000000..acdf1eaad --- /dev/null +++ b/pkg/koordlet/runtimehooks/hooks/podgpu/gpu_delete.go @@ -0,0 +1,86 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podgpu + +import ( + "fmt" + "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks" + "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/gpu" + "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/protocol" + rmconfig "github.com/koordinator-sh/koordinator/pkg/runtimeproxy/config" + "k8s.io/klog/v2" + pb "k8s.io/kubelet/pkg/apis/podresources/v1alpha1" +) + +type podGpuDeletePlugin struct{} + +func (p *podGpuDeletePlugin) Register(op hooks.Options) { + klog.V(5).Infof("register hook %v", "nodePodResourcese gpu delete") + hooks.Register(rmconfig.PreRemoveRunPodSandbox, "nodePodResourcese gpu delete", "delete nodePodResourcese's gpu data when pod which use gpu is deleted", p.DeletePodGPU) +} + +var singleton *podGpuDeletePlugin + +func Object() *podGpuDeletePlugin { + if singleton == nil { + singleton = &podGpuDeletePlugin{} + } + return singleton +} + +func (p *podGpuDeletePlugin) DeletePodGPU(proto protocol.HooksProtocol) error { + podCtx := proto.(*protocol.PodContext) + klog.V(1).Info("DeletePodGPU(): podCtx:%v", podCtx) + if podCtx == nil { + return fmt.Errorf("container protocol is nil for plugin gpu") + } + + klog.V(1).Info("DeletePodGPU(): start to parse params") + containerReq := podCtx.Request + + podName := containerReq.PodMeta.Name + nameSpace := containerReq.PodMeta.Namespace + klog.V(1).Infof("DeletePodGPU(): container %s/%s in nodePodResources", podName, nameSpace) + + gpu.PodResourcesLock.Lock() + defer gpu.PodResourcesLock.Unlock() + + for i, podResources := range gpu.NodePodResources { + if podResources.Name == podName && podResources.Namespace == nameSpace { + klog.V(1).Infof("DeletePodGPU(): found pod %s/%s in nodePodResources", podName, nameSpace) + + if err := removePodResourceByIndex(&gpu.NodePodResources, i); err != nil { + klog.Errorf("removePodResourceByIndex error:%v", err) + } + klog.V(1).Infof("DeletePodGPU(): deleted pod %s/%s from nodePodResources", podName, nameSpace) + return nil + } + } + + klog.V(1).Infof("DeletePodGPU(): not found pod %s/%s in nodePodResources, then no delete podInfo", podName, nameSpace) + return nil +} + +func removePodResourceByIndex(resources *[]*pb.PodResources, index int) error { + if index < 0 || index >= len(*resources) { + return fmt.Errorf("invalid index %d", index) + } + + *resources = append((*resources)[:index], (*resources)[index+1:]...) + klog.V(1).Infof("removePodResourceByIndex(): Removed pod resource at index %d and newresources is %v", index, resources) + return nil +}