add gpu uuid in annotations
ferris-cx committed Dec 20, 2024
1 parent a62dd49 commit a754d1b
Showing 5 changed files with 305 additions and 3 deletions.
3 changes: 3 additions & 0 deletions cmd/koordlet/main.go
@@ -18,6 +18,7 @@ package main

import (
"flag"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/gpu"

Check failure on line 21 in cmd/koordlet/main.go

View workflow job for this annotation

GitHub Actions / golangci-lint

could not import github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/gpu (-: # github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/gpu
"net/http"
_ "net/http/pprof"
"time"
@@ -80,6 +81,8 @@ func main() {
// Expose the Prometheus http endpoint
go installHTTPHandler()

go func() {
if err := gpu.StartGrpc(); err != nil {
klog.Errorf("pod resources gRPC server exited: %v", err)
}
}()

// Start the Cmd
klog.Info("Starting the koordlet daemon")
d.Run(stopCtx.Done())
10 changes: 10 additions & 0 deletions pkg/koordlet/runtimehooks/config.go
@@ -18,6 +18,7 @@ package runtimehooks

import (
"flag"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/podgpu"
"math"
"time"

@@ -61,6 +62,13 @@ const (
// beta: v1.1
GPUEnvInject featuregate.Feature = "GPUEnvInject"

// PODGpuDelete deletes a pod's GPU data from nodePodResources when a pod that uses GPUs is deleted.
//
// owner: @Stone
// alpha: v0.3
// beta: v1.1
PODGpuDelete featuregate.Feature = "PODGpuDelete"

// RDMADeviceInject injects rdma device info according to allocate result from koord-scheduler.
//
// owner: @ZiMengSheng
@@ -108,6 +116,7 @@ var (
GroupIdentity: {Default: true, PreRelease: featuregate.Beta},
CPUSetAllocator: {Default: true, PreRelease: featuregate.Beta},
GPUEnvInject: {Default: false, PreRelease: featuregate.Alpha},
PODGpuDelete: {Default: false, PreRelease: featuregate.Alpha},
RDMADeviceInject: {Default: false, PreRelease: featuregate.Alpha},
BatchResource: {Default: true, PreRelease: featuregate.Beta},
CPUNormalization: {Default: false, PreRelease: featuregate.Alpha},
@@ -121,6 +130,7 @@ var (
GroupIdentity: groupidentity.Object(),
CPUSetAllocator: cpuset.Object(),
GPUEnvInject: gpu.Object(),
PODGpuDelete: podgpu.Object(),
RDMADeviceInject: rdma.Object(),
BatchResource: batchresource.Object(),
CPUNormalization: cpunormalization.Object(),
205 changes: 204 additions & 1 deletion pkg/koordlet/runtimehooks/hooks/gpu/gpu.go
@@ -18,9 +18,17 @@ package gpu

import (
"fmt"
"google.golang.org/grpc"
"io/ioutil"
"net"
"os"
"path/filepath"
"strings"
"sync"
"time"

"k8s.io/klog/v2"
pb "k8s.io/kubelet/pkg/apis/podresources/v1alpha1"

ext "github.com/koordinator-sh/koordinator/apis/extension"
schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
@@ -30,13 +38,37 @@ import (
rmconfig "github.com/koordinator-sh/koordinator/pkg/runtimeproxy/config"
)

const GpuAllocEnv = "NVIDIA_VISIBLE_DEVICES"
const (
GpuAllocEnv = "NVIDIA_VISIBLE_DEVICES"
ServerPodResourcesKubeletSocket = "/pod-resources/koordlet.sock"
ServerPodResourcesKubeletCheckPoint = "/pod-resources/podgpu.json"
interval = 30 * time.Second
)

var (
NodePodResources []*pb.PodResources
PodResourcesLock sync.RWMutex
)

type PodResourcesServer struct{}

func (s *PodResourcesServer) List(ctx context.Context, req *pb.ListPodResourcesRequest) (*pb.ListPodResourcesResponse, error) {
PodResourcesLock.RLock()
defer PodResourcesLock.RUnlock()
klog.V(4).Infof("List(): list resp nodePodResources %v", NodePodResources)
return &pb.ListPodResourcesResponse{PodResources: NodePodResources}, nil
}

type gpuPlugin struct{}

func (p *gpuPlugin) Register(op hooks.Options) {
klog.V(5).Infof("register hook %v", "gpu env inject")
hooks.Register(rmconfig.PreCreateContainer, "gpu env inject", "inject NVIDIA_VISIBLE_DEVICES env into container", p.InjectContainerGPUEnv)

// Rebuild in-memory state from the podgpu.json checkpoint on disk, so GPU
// assignments survive a koordlet restart.
nodePodResources, err := LoadNodePodResourcesFromFile(ServerPodResourcesKubeletCheckPoint)
if err != nil {
klog.Errorf("Register(): failed to load nodePodResources checkpoint: %v", err)
} else {
NodePodResources = nodePodResources
}
}

var singleton *gpuPlugin
@@ -106,5 +138,176 @@ func (p *gpuPlugin) InjectContainerGPUEnv(proto protocol.HooksProtocol) error {
}
}

klog.V(1).Infof("InjectContainerGPUEnv(): start to convertToPodResources")
convertToPodResources(containerReq, devices)

return nil
}

func convertToPodResources(request protocol.ContainerRequest, deviceAllocation []*ext.DeviceAllocation) *pb.ContainerDevices {
podName := request.PodMeta.Name
nameSpace := request.PodMeta.Namespace
containerName := request.ContainerMeta.Name

devices := make([]string, 0, len(deviceAllocation))
for _, device := range deviceAllocation {
devices = append(devices, device.ID)
}

containerDevices := &pb.ContainerDevices{
ResourceName: ext.ResourceNvidiaGPU.String(),
DeviceIds: devices,
}
containerDevicesSlice := []*pb.ContainerDevices{containerDevices}
klog.V(4).Infof("convertToPodResources(): containerDevices: %v", containerDevices)

// Hold the write lock for the whole update: upgrading an RLock to a Lock
// mid-iteration leaves a window in which another goroutine could mutate
// NodePodResources between the RUnlock and the Lock.
PodResourcesLock.Lock()
defer PodResourcesLock.Unlock()

for i, podResources := range NodePodResources {
if podResources.Name == podName && podResources.Namespace == nameSpace {
klog.V(4).Infof("convertToPodResources(): pod %s/%s found in nodePodResources", podName, nameSpace)
containerExists := false
for j, container := range podResources.Containers {
if container.Name == containerName {
containerExists = true
podResources.Containers[j] = &pb.ContainerResources{
Name: containerName,
Devices: containerDevicesSlice,
}
break
}
}
if !containerExists {
podResources.Containers = append(podResources.Containers, &pb.ContainerResources{
Name: containerName,
Devices: containerDevicesSlice,
})
}
NodePodResources[i] = podResources
return containerDevices
}
}

// Pod not seen before: create a fresh entry.
NodePodResources = append(NodePodResources, &pb.PodResources{
Name: podName,
Namespace: nameSpace,
Containers: []*pb.ContainerResources{
{
Name: containerName,
Devices: containerDevicesSlice,
},
},
})
return containerDevices
}

func StartGrpc() error {
// Remove a stale socket file left over from a previous run; net.Listen
// fails if the path already exists.
if err := os.Remove(ServerPodResourcesKubeletSocket); err != nil && !os.IsNotExist(err) {
klog.Errorf("failed to remove stale socket %s: %v", ServerPodResourcesKubeletSocket, err)
return err
}
lis, err := net.Listen("unix", ServerPodResourcesKubeletSocket)
if err != nil {
klog.Errorf("failed to listen: %v", err)
return err
}
if err := setSocketPermissions(ServerPodResourcesKubeletSocket); err != nil {
klog.Errorf("failed to set socket permissions: %v", err)
return err
}
server := grpc.NewServer()
pb.RegisterPodResourcesListerServer(server, &PodResourcesServer{})

startCheckpoint()

klog.V(4).Infof("StartGrpc(): starting gRPC server on %s", ServerPodResourcesKubeletSocket)
if err := server.Serve(lis); err != nil {
klog.Errorf("failed to serve: %v", err)
return err
}
return nil
}

func setSocketPermissions(socketPath string) error {
// Restrict the socket to its owner and group.
return os.Chmod(socketPath, 0660)
}

func startCheckpoint() {
// The checkpoint goroutine runs for the life of the daemon; the stop
// channel is created here but never closed.
stopCh := make(chan struct{})
go PeriodicSave(ServerPodResourcesKubeletCheckPoint, interval, stopCh)
}

func EnsureDirectory(path string) error {
return os.MkdirAll(filepath.Dir(path), os.ModePerm)
}

func SaveNodePodResourcesToFile(filename string, data []*pb.PodResources) error {
if err := EnsureDirectory(filename); err != nil {
return fmt.Errorf("failed to ensure directory for %s: %v", filename, err)
}

jsonData, err := json.MarshalIndent(data, "", " ")
if err != nil {
return fmt.Errorf("failed to marshal nodePodResources to JSON: %v", err)
}

if err := os.WriteFile(filename, jsonData, 0644); err != nil {
return fmt.Errorf("failed to write JSON data to file %s: %v", filename, err)
}
klog.V(5).Infof("SaveNodePodResourcesToFile(): saved nodePodResources to %s", filename)
return nil
}

func PeriodicSave(filename string, interval time.Duration, stopCh <-chan struct{}) {
ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
// Snapshot under the read lock before writing to disk.
PodResourcesLock.RLock()
snapshot := NodePodResources
PodResourcesLock.RUnlock()
if err := SaveNodePodResourcesToFile(filename, snapshot); err != nil {
klog.Errorf("PeriodicSave(): error saving nodePodResources: %v", err)
}
case <-stopCh:
klog.V(4).Info("PeriodicSave(): stopping periodic save")
return
}
}
}

func LoadNodePodResourcesFromFile(filePath string) ([]*pb.PodResources, error) {
data, err := os.ReadFile(filePath)
if os.IsNotExist(err) {
// No checkpoint yet (e.g. first start): begin with empty state.
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("failed to read file %s: %v", filePath, err)
}

var nodePodResources []*pb.PodResources
if err := json.Unmarshal(data, &nodePodResources); err != nil {
return nil, fmt.Errorf("failed to unmarshal JSON data from file %s: %v", filePath, err)
}

klog.V(4).Infof("LoadNodePodResourcesFromFile(): loaded %d PodResources from %s", len(nodePodResources), filePath)
return nodePodResources, nil
}
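
For context, the server registered above implements the kubelet PodResources List API over a unix socket. A minimal client sketch, not part of this commit, that queries it (the socket path mirrors ServerPodResourcesKubeletSocket; the dialer and timeout choices are assumptions):

package main

import (
	"context"
	"fmt"
	"net"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	pb "k8s.io/kubelet/pkg/apis/podresources/v1alpha1"
)

func main() {
	// Dial the unix socket the koordlet server listens on.
	dialer := func(ctx context.Context, addr string) (net.Conn, error) {
		return (&net.Dialer{}).DialContext(ctx, "unix", addr)
	}
	conn, err := grpc.Dial("/pod-resources/koordlet.sock",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithContextDialer(dialer))
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	resp, err := pb.NewPodResourcesListerClient(conn).List(ctx, &pb.ListPodResourcesRequest{})
	if err != nil {
		panic(err)
	}
	for _, p := range resp.PodResources {
		for _, c := range p.Containers {
			fmt.Printf("%s/%s/%s: %v\n", p.Namespace, p.Name, c.Name, c.Devices)
		}
	}
}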
86 changes: 86 additions & 0 deletions pkg/koordlet/runtimehooks/hooks/podgpu/gpu_delete.go
@@ -0,0 +1,86 @@
/*
Copyright 2022 The Koordinator Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package podgpu

import (
"fmt"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/gpu"

Check failure on line 22 in pkg/koordlet/runtimehooks/hooks/podgpu/gpu_delete.go

View workflow job for this annotation

GitHub Actions / golangci-lint

could not import github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/gpu (-: # github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/gpu
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/protocol"
rmconfig "github.com/koordinator-sh/koordinator/pkg/runtimeproxy/config"
"k8s.io/klog/v2"
pb "k8s.io/kubelet/pkg/apis/podresources/v1alpha1"
)

type podGpuDeletePlugin struct{}

func (p *podGpuDeletePlugin) Register(op hooks.Options) {
klog.V(5).Infof("register hook %v", "nodePodResourcese gpu delete")
hooks.Register(rmconfig.PreRemoveRunPodSandbox, "nodePodResourcese gpu delete", "delete nodePodResourcese's gpu data when pod which use gpu is deleted", p.DeletePodGPU)
}

var singleton *podGpuDeletePlugin

func Object() *podGpuDeletePlugin {
if singleton == nil {
singleton = &podGpuDeletePlugin{}
}
return singleton
}

func (p *podGpuDeletePlugin) DeletePodGPU(proto protocol.HooksProtocol) error {
podCtx := proto.(*protocol.PodContext)
if podCtx == nil {
return fmt.Errorf("pod protocol is nil for plugin podgpu")
}
klog.V(1).Infof("DeletePodGPU(): podCtx: %v", podCtx)

podReq := podCtx.Request
podName := podReq.PodMeta.Name
nameSpace := podReq.PodMeta.Namespace
klog.V(1).Infof("DeletePodGPU(): deleting pod %s/%s from nodePodResources", podName, nameSpace)

gpu.PodResourcesLock.Lock()
defer gpu.PodResourcesLock.Unlock()

for i, podResources := range gpu.NodePodResources {
if podResources.Name == podName && podResources.Namespace == nameSpace {
klog.V(1).Infof("DeletePodGPU(): found pod %s/%s in nodePodResources", podName, nameSpace)

if err := removePodResourceByIndex(&gpu.NodePodResources, i); err != nil {
klog.Errorf("removePodResourceByIndex error:%v", err)
}
klog.V(1).Infof("DeletePodGPU(): deleted pod %s/%s from nodePodResources", podName, nameSpace)
return nil
}
}

klog.V(1).Infof("DeletePodGPU(): not found pod %s/%s in nodePodResources, then no delete podInfo", podName, nameSpace)
return nil
}

func removePodResourceByIndex(resources *[]*pb.PodResources, index int) error {
if index < 0 || index >= len(*resources) {
return fmt.Errorf("invalid index %d", index)
}

*resources = append((*resources)[:index], (*resources)[index+1:]...)
klog.V(1).Infof("removePodResourceByIndex(): removed pod resource at index %d, %d entries remain", index, len(*resources))
return nil
}
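
A test-style sketch (hypothetical, not in this commit; it would live in package podgpu with "testing" imported alongside pb) illustrating the in-place delete semantics of removePodResourceByIndex:

func TestRemovePodResourceByIndex(t *testing.T) {
	resources := []*pb.PodResources{
		{Name: "pod-a", Namespace: "default"},
		{Name: "pod-b", Namespace: "default"},
	}
	if err := removePodResourceByIndex(&resources, 0); err != nil {
		t.Fatal(err)
	}
	if len(resources) != 1 || resources[0].Name != "pod-b" {
		t.Fatalf("unexpected resources after delete: %v", resources)
	}
	// Out-of-range indexes are rejected rather than panicking.
	if err := removePodResourceByIndex(&resources, 5); err == nil {
		t.Fatal("expected error for invalid index")
	}
}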
4 changes: 2 additions & 2 deletions pkg/scheduler/plugins/deviceshare/plugin.go
@@ -564,10 +564,10 @@ func (p *Plugin) fillID(allocationResult apiext.DeviceAllocations, nodeName stri
return err
}
for deviceType, allocations := range allocationResult {
if deviceType == schedulingv1alpha1.GPU {
/*if deviceType == schedulingv1alpha1.GPU {
// because gpu minor is well known, ID isn't needed
continue
}
}*/
for i, allocation := range allocations {
for _, info := range device.Spec.Devices {
if info.Minor != nil && *info.Minor == allocation.Minor && info.Type == deviceType {
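With the early continue for GPUs commented out, fillID now records the device UUID in each GPU allocation's ID field, which is what the koordlet reads back when it injects NVIDIA_VISIBLE_DEVICES and populates the PodResources data above. A hedged consumer sketch, assuming the GetDeviceAllocations helper in apis/extension and the DeviceAllocation.ID field keep their current shape in this repo:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"

	apiext "github.com/koordinator-sh/koordinator/apis/extension"
	schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
)

// printGPUUUIDs lists the GPU UUIDs the scheduler recorded on a pod's
// device-allocated annotation.
func printGPUUUIDs(pod *corev1.Pod) error {
	allocations, err := apiext.GetDeviceAllocations(pod.Annotations)
	if err != nil {
		return err
	}
	for _, alloc := range allocations[schedulingv1alpha1.GPU] {
		fmt.Printf("GPU minor %d -> UUID %s\n", alloc.Minor, alloc.ID)
	}
	return nil
}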
