
Install AMD GPU Kernel drivers if required #5875

Open · wants to merge 15 commits into base: master
1 change: 1 addition & 0 deletions aks-node-controller/parser/parser.go
@@ -83,6 +83,7 @@ func getCSEEnv(config *aksnodeconfigv1.Configuration) map[string]string {
"API_SERVER_NAME": config.GetApiServerConfig().GetApiServerName(),
"IS_VHD": fmt.Sprintf("%v", getIsVHD(config.IsVhd)),
"GPU_NODE": fmt.Sprintf("%v", getEnableNvidia(config)),
"AMD_GPU_NODE": fmt.Sprintf("%v", config.GetGpuConfig().GetEnableAmdGpu()),
"SGX_NODE": fmt.Sprintf("%v", getIsSgxEnabledSKU(config.GetVmSize())),
"MIG_NODE": fmt.Sprintf("%v", getIsMIGNode(config.GetGpuConfig().GetGpuInstanceProfile())),
"CONFIG_GPU_DRIVER_IF_NEEDED": fmt.Sprintf("%v", config.GetGpuConfig().GetConfigGpuDriver()),
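For reference, a minimal unit-test sketch for the new variable (hypothetical, not part of this PR; it lives in the same parser package, assumes the generated aksnodeconfigv1 struct fields match the getters used above, and assumes getCSEEnv tolerates an otherwise empty Configuration):

func TestGetCSEEnvAMDGPUNode(t *testing.T) {
	cfg := &aksnodeconfigv1.Configuration{
		GpuConfig: &aksnodeconfigv1.GpuConfig{
			EnableAmdGpu: true, // assumed field name behind GetEnableAmdGpu()
		},
	}
	env := getCSEEnv(cfg)
	if env["AMD_GPU_NODE"] != "true" {
		t.Errorf("expected AMD_GPU_NODE=true, got %q", env["AMD_GPU_NODE"])
	}
}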
2 changes: 1 addition & 1 deletion e2e/aks_model.go
@@ -46,7 +46,7 @@ func getBaseClusterModel(clusterName string) *armcontainerservice.ManagedCluster
{
Name: to.Ptr("nodepool1"),
Count: to.Ptr[int32](1),
VMSize: to.Ptr("standard_d2ds_v5"),
VMSize: to.Ptr(config.Config.DefaultVMSKU),
MaxPods: to.Ptr[int32](110),
OSType: to.Ptr(armcontainerservice.OSTypeLinux),
Type: to.Ptr(armcontainerservice.AgentPoolTypeVirtualMachineScaleSets),
3 changes: 3 additions & 0 deletions e2e/cluster.go
@@ -107,6 +107,7 @@ func ClusterAzureNetwork(ctx context.Context, t *testing.T) (*Cluster, error) {
}

func prepareCluster(ctx context.Context, t *testing.T, cluster *armcontainerservice.ManagedCluster, isAirgap, isNonAnonymousPull bool) (*Cluster, error) {
t.Logf("preparing cluster %q", *cluster.Name)
ctx, cancel := context.WithTimeout(ctx, config.Config.TestTimeoutCluster)
defer cancel()
cluster.Name = to.Ptr(fmt.Sprintf("%s-%s", *cluster.Name, hash(cluster)))
@@ -173,6 +174,8 @@ func prepareCluster(ctx context.Context, t *testing.T, cluster *armcontainerserv
return nil, fmt.Errorf("get host network debug pod: %w", err)
}

t.Logf("cluster %q is ready", *cluster.Name)

return &Cluster{
Model: cluster,
Kube: kube,
40 changes: 21 additions & 19 deletions e2e/config/azure.go
@@ -291,25 +291,27 @@ func (a *AzureClient) UploadAndGetSignedLink(ctx context.Context, blobName strin
}

func (a *AzureClient) CreateVMManagedIdentity(ctx context.Context) (string, error) {
identity, err := a.UserAssignedIdentities.CreateOrUpdate(ctx, ResourceGroupName, VMIdentityName, armmsi.Identity{
Location: to.Ptr(Config.Location),
}, nil)
if err != nil {
return "", fmt.Errorf("create managed identity: %w", err)
}
err = a.createBlobStorageAccount(ctx)
if err != nil {
return "", err
}
err = a.createBlobStorageContainer(ctx)
if err != nil {
return "", err
}

if err := a.assignRolesToVMIdentity(ctx, identity.Properties.PrincipalID); err != nil {
return "", err
}
return *identity.Properties.ClientID, nil
// HACK: temporary disable to allow running test in different subscription, without enough permissions
Review comment (Contributor): should we uncomment all this?
Reply (Author): I'll remove it before merging the PR, once I stop testing it. Currently it's required only for a single scriptless test that is disabled by default, so there's not much harm if it accidentally leaks through.

return "", nil
// identity, err := a.UserAssignedIdentities.CreateOrUpdate(ctx, ResourceGroupName, VMIdentityName, armmsi.Identity{
// Location: to.Ptr(Config.Location),
// }, nil)
// if err != nil {
// return "", fmt.Errorf("create managed identity: %w", err)
// }
// err = a.createBlobStorageAccount(ctx)
// if err != nil {
// return "", err
// }
// err = a.createBlobStorageContainer(ctx)
// if err != nil {
// return "", err
// }

// if err := a.assignRolesToVMIdentity(ctx, identity.Properties.PrincipalID); err != nil {
// return "", err
// }
// return *identity.Properties.ClientID, nil
}

func (a *AzureClient) createBlobStorageAccount(ctx context.Context) error {
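On the review thread above: rather than commenting the body out, the same short-circuit could be kept behind a flag so the original path stays compiled. A sketch only, assuming a hypothetical Config.SkipVMIdentity field in the e2e config; the body below is the pre-existing implementation from this file:

func (a *AzureClient) CreateVMManagedIdentity(ctx context.Context) (string, error) {
	if Config.SkipVMIdentity { // hypothetical flag, not part of this PR
		return "", nil
	}
	identity, err := a.UserAssignedIdentities.CreateOrUpdate(ctx, ResourceGroupName, VMIdentityName, armmsi.Identity{
		Location: to.Ptr(Config.Location),
	}, nil)
	if err != nil {
		return "", fmt.Errorf("create managed identity: %w", err)
	}
	if err := a.createBlobStorageAccount(ctx); err != nil {
		return "", err
	}
	if err := a.createBlobStorageContainer(ctx); err != nil {
		return "", err
	}
	if err := a.assignRolesToVMIdentity(ctx, identity.Properties.PrincipalID); err != nil {
		return "", err
	}
	return *identity.Properties.ClientID, nil
}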
2 changes: 2 additions & 0 deletions e2e/config/vhd.go
@@ -208,6 +208,7 @@ func (i *Image) String() string {

func (i *Image) VHDResourceID(ctx context.Context, t *testing.T) (VHDResourceID, error) {
i.vhdOnce.Do(func() {
t.Logf("finding the latest image version for %s, %s", i.Name, i.Version)
switch {
case i.Latest:
i.vhd, i.vhdErr = Azure.LatestSIGImageVersionByTag(ctx, i, "", "")
@@ -220,6 +221,7 @@ func (i *Image) VHDResourceID(ctx context.Context, t *testing.T) (VHDResourceID,
i.vhdErr = fmt.Errorf("img: %s, tag %s=%s, err %w", i.Name, Config.SIGVersionTagName, Config.SIGVersionTagValue, i.vhdErr)
t.Logf("failed to find the latest image version for %s", i.vhdErr)
}
t.Logf("found the latest image version for %s, %s", i.Name, i.vhd)
})
return i.vhd, i.vhdErr
}
18 changes: 12 additions & 6 deletions e2e/exec.go
@@ -8,6 +8,7 @@ import (

"github.com/Azure/agentbaker/e2e/config"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/remotecommand"
@@ -54,7 +55,7 @@ type Script struct {
interpreter Interpreter
}

func execScriptOnVm(ctx context.Context, s *Scenario, vmPrivateIP, jumpboxPodName, sshPrivateKey string, script Script) (*podExecResult, error) {
func execScriptOnVm(ctx context.Context, s *Scenario, script Script) (*podExecResult, error) {
/*
This works in a way that doesn't rely on the node having joined the cluster:
* We create a linux pod on a different node.
@@ -77,21 +78,19 @@ func execScriptOnVm(ctx context.Context, s *Scenario, vmPrivateIP, jumpboxPodNam
}

steps := []string{
fmt.Sprintf("echo '%[1]s' > %[2]s", sshPrivateKey, sshKeyName(vmPrivateIP)),
"set -x",
fmt.Sprintf("echo %[1]s > %[2]s", quoteForBash(script.script), scriptFileName),
fmt.Sprintf("chmod 0600 %s", sshKeyName(vmPrivateIP)),
fmt.Sprintf("chmod 0755 %s", scriptFileName),
fmt.Sprintf(`scp -i %[1]s -o PasswordAuthentication=no -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -o ConnectTimeout=5 %[3]s azureuser@%[2]s:%[4]s`, sshKeyName(vmPrivateIP), vmPrivateIP, scriptFileName, remoteScriptFileName),
fmt.Sprintf("%s %s %s", sshString(vmPrivateIP), interpreter, remoteScriptFileName),
fmt.Sprintf(`scp -i %[1]s -o PasswordAuthentication=no -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -o ConnectTimeout=5 %[3]s azureuser@%[2]s:%[4]s`, sshKeyName(s.Runtime.VMPrivateIP), s.Runtime.VMPrivateIP, scriptFileName, remoteScriptFileName),
fmt.Sprintf("%s %s %s", sshString(s.Runtime.VMPrivateIP), interpreter, remoteScriptFileName),
}

joinedSteps := strings.Join(steps, " && ")

s.T.Logf("Executing script %[1]s using %[2]s:\n---START-SCRIPT---\n%[3]s\n---END-SCRIPT---\n", scriptFileName, interpreter, script.script)

kube := s.Runtime.Cluster.Kube
execResult, err := execOnPrivilegedPod(ctx, kube, defaultNamespace, jumpboxPodName, joinedSteps)
execResult, err := execOnPrivilegedPod(ctx, kube, defaultNamespace, s.Runtime.Cluster.DebugPod.Name, joinedSteps)
if err != nil {
return nil, fmt.Errorf("error executing command on pod: %w", err)
}
@@ -172,6 +171,13 @@ func unprivilegedCommandArray() []string {
}
}

func uploadSSHKey(ctx context.Context, s *Scenario) {
cmd := fmt.Sprintf("echo '%[1]s' > %[2]s && chmod 0600 %[2]s", s.Runtime.SSHKeyPrivate, sshKeyName(s.Runtime.VMPrivateIP))
kube := s.Runtime.Cluster.Kube
_, err := execOnPrivilegedPod(ctx, kube, defaultNamespace, s.Runtime.Cluster.DebugPod.Name, cmd)
require.NoError(s.T, err, "error uploading ssh key to pod")
}

func logSSHInstructions(s *Scenario) {
result := "SSH Instructions:"
if !config.Config.KeepVMSS {
56 changes: 50 additions & 6 deletions e2e/kube.go
@@ -162,21 +162,16 @@ func (k *Kubeclient) WaitUntilNodeReady(ctx context.Context, t *testing.T, vmssN

// found the right node. Use it!
node = castNode
nodeTaints, _ := json.Marshal(node.Spec.Taints)
nodeConditions, _ := json.Marshal(node.Status.Conditions)
if len(node.Spec.Taints) > 0 {
t.Logf("node %s is tainted. Taints: %s Conditions: %s", node.Name, string(nodeTaints), string(nodeConditions))
continue
}

for _, cond := range node.Status.Conditions {
if cond.Type == corev1.NodeReady && cond.Status == corev1.ConditionTrue {
t.Logf("node %s is ready. Taints: %s Conditions: %s", node.Name, string(nodeTaints), string(nodeConditions))
t.Logf("node %s is ready", node.Name)
return node.Name
}
}

t.Logf("node %s is not ready. Taints: %s Conditions: %s", node.Name, string(nodeTaints), string(nodeConditions))
}

if node == nil {
@@ -637,3 +632,52 @@ func nvidiaDevicePluginDaemonSet() *appsv1.DaemonSet {
},
}
}

func podEnableAMDGPUResource(s *Scenario) *corev1.Pod {
return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-amdgpu-device-plugin", s.Runtime.KubeNodeName),
Namespace: defaultNamespace,
},
Spec: corev1.PodSpec{
PriorityClassName: "system-node-critical",
NodeSelector: map[string]string{
"kubernetes.io/hostname": s.Runtime.KubeNodeName,
},
Containers: []corev1.Container{
{
Name: "amdgpu-device-plugin-container",
Image: "rocm/k8s-device-plugin",
Review comment (Contributor): should pull from MCR, dockerhub pull isn't stable (assuming this is from docker.io).
Reply (Author): I'll need to add it to MCR at some stage. I don't think it's available there yet.
Reply (Contributor): there are SFIs to look for that; we lost the contract preventing us from getting throttled on docker.io, so expect throttling. We have an ACR for E2E; we should set up a remote/cache for this image and use our own ACR.

VolumeMounts: []corev1.VolumeMount{
{
Name: "device-plugin",
MountPath: "/var/lib/kubelet/device-plugins",
},
{
Name: "sys",
MountPath: "/sys",
},
},
},
},
Volumes: []corev1.Volume{
{
Name: "device-plugin",
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: "/var/lib/kubelet/device-plugins",
},
},
},
{
Name: "sys",
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: "/sys",
},
},
},
},
},
}
}
4 changes: 2 additions & 2 deletions e2e/node_config.go
@@ -60,7 +60,7 @@ func nbcToAKSNodeConfigV1(nbc *datamodel.NodeBootstrappingConfiguration) *aksnod
Version: "v0",
DisableCustomData: false,
LinuxAdminUsername: "azureuser",
VmSize: "Standard_D2ds_v5",
VmSize: config.Config.DefaultVMSKU,
ClusterConfig: &aksnodeconfigv1.ClusterConfig{
Location: nbc.ContainerService.Location,
ResourceGroup: nbc.ResourceGroupName,
@@ -347,7 +347,7 @@ func baseTemplateLinux(t *testing.T, location string, k8sVersion string, arch st
},
AgentPoolProfile: &datamodel.AgentPoolProfile{
Name: "nodepool2",
VMSize: "Standard_D2ds_v5",
VMSize: config.Config.DefaultVMSKU,
KubeletDiskType: "",
WorkloadRuntime: "",
DNSPrefix: "",
8 changes: 5 additions & 3 deletions e2e/scenario_helpers_test.go
@@ -98,8 +98,9 @@ func RunScenario(t *testing.T, s *Scenario) {
ctx, cancel := context.WithTimeout(ctx, config.Config.TestTimeoutVMSS)
defer cancel()
prepareAKSNode(ctx, s)

t.Logf("Choosing the private ACR %q for the vm validation", config.GetPrivateACRName(s.Tags.NonAnonymousACR))
logSSHInstructions(s)

validateVM(ctx, s)
}

@@ -150,6 +151,8 @@ func prepareAKSNode(ctx context.Context, s *Scenario) {

s.Runtime.VMPrivateIP, err = getVMPrivateIPAddress(ctx, s)
require.NoError(s.T, err, "failed to get VM private IP address")

uploadSSHKey(ctx, s)
}

func maybeSkipScenario(ctx context.Context, t *testing.T, s *Scenario) {
@@ -177,15 +180,14 @@ func maybeSkipScenario(ctx context.Context, t *testing.T, s *Scenario) {
}
}

vhd, err := s.VHD.VHDResourceID(ctx, t)
_, err := s.VHD.VHDResourceID(ctx, t)
if err != nil {
if config.Config.IgnoreScenariosWithMissingVHD && errors.Is(err, config.ErrNotFound) {
t.Skipf("skipping scenario %q: could not find image", t.Name())
} else {
t.Fatalf("could not find image for %q: %s", t.Name(), err)
}
}
t.Logf("VHD: %q, TAGS %+v", vhd, s.Tags)
}

func validateVM(ctx context.Context, s *Scenario) {
66 changes: 66 additions & 0 deletions e2e/scenario_test.go
@@ -1664,3 +1664,69 @@ func Test_Ubuntu2404ARM(t *testing.T) {
},
})
}

func Test_Ubuntu2404Gen2Containerd_AMDGPU_MI300(t *testing.T) {
t.Skip("Provisioning of Standard_ND96isr_MI300X_v5 isn't reliable yet")
Review comment (Contributor): can we add a TODO?

//E2E_LOCATION=eastus2euap
//SUBSCRIPTION_ID=4f3dc0e4-0c77-40ff-bf9a-6ade1e3048ef
RunScenario(t, &Scenario{
Description: "Tests that a GPU-enabled node using a MarinerV2 VHD can be properly bootstrapped",
Tags: Tags{
GPU: true,
},
Config: Config{
Cluster: ClusterKubenet,
VHD: config.VHDUbuntu2404Gen2Containerd, //TODO: add support for older
BootstrapConfigMutator: func(nbc *datamodel.NodeBootstrappingConfiguration) {
nbc.ContainerService.Properties.AgentPoolProfiles[0].VMSize = "Standard_ND96isr_MI300X_v5"
nbc.ContainerService.Properties.AgentPoolProfiles[0].Distro = "aks-cblmariner-v2-gen2"
nbc.AgentPoolProfile.VMSize = "Standard_ND96isr_MI300X_v5"
nbc.AgentPoolProfile.Distro = "aks-cblmariner-v2-gen2"
nbc.EnableAMDGPU = true
nbc.ConfigGPUDriverIfNeeded = true
},
VMConfigMutator: func(vmss *armcompute.VirtualMachineScaleSet) {
vmss.SKU.Name = to.Ptr("Standard_ND96isr_MI300X_v5")
// rocm images are huge, some space for manual testing
vmss.Properties.VirtualMachineProfile.StorageProfile.OSDisk.DiskSizeGB = to.Ptr[int32](128)
},
Validator: func(ctx context.Context, s *Scenario) {
ValidateAMDGPU(ctx, s)
},
},
})
}

func Test_Ubuntu2204Gen2Containerd_AMDGPU_V710(t *testing.T) {
	// the SKU isn't available in the subscription/region we run tests in
Review comment (Contributor): can we add a TODO?

t.Skip("Provisioning of NV4ads_V710_v5 isn't reliable yet")
//E2E_LOCATION=southcentralus
//SUBSCRIPTION_ID=4f3dc0e4-0c77-40ff-bf9a-6ade1e3048ef
RunScenario(t, &Scenario{
Description: "Tests that a GPU-enabled node using a MarinerV2 VHD can be properly bootstrapped",
Tags: Tags{
GPU: true,
},
Config: Config{
Cluster: ClusterKubenet,
VHD: config.VHDUbuntu2204Gen2Containerd,
BootstrapConfigMutator: func(nbc *datamodel.NodeBootstrappingConfiguration) {
nbc.ContainerService.Properties.AgentPoolProfiles[0].VMSize = "Standard_NV4ads_V710_v5"
nbc.ContainerService.Properties.AgentPoolProfiles[0].Distro = "aks-cblmariner-v2-gen2"
nbc.AgentPoolProfile.VMSize = "Standard_NV4ads_V710_v5"
nbc.AgentPoolProfile.Distro = "aks-cblmariner-v2-gen2"
nbc.EnableAMDGPU = true
nbc.ConfigGPUDriverIfNeeded = true

},
VMConfigMutator: func(vmss *armcompute.VirtualMachineScaleSet) {
vmss.SKU.Name = to.Ptr("Standard_NV4ads_V710_v5")
// rocm images are huge, some space for manual testing
vmss.Properties.VirtualMachineProfile.StorageProfile.OSDisk.DiskSizeGB = to.Ptr[int32](128)
},
Validator: func(ctx context.Context, s *Scenario) {
ValidateAMDGPU(ctx, s)
},
},
})
}
5 changes: 0 additions & 5 deletions e2e/validation.go
@@ -38,11 +38,6 @@ func ValidateCommonLinux(ctx context.Context, s *Scenario) {
stdout := execResult.stdout.String()
require.NotContains(s.T, stdout, "--dynamic-config-dir", "kubelet flag '--dynamic-config-dir' should not be present in /etc/default/kubelet\nContents:\n%s")

// the instructions belows expects the SSH key to be uploaded to the user pool VM.
// which happens as a side-effect of execCommandOnVMForScenario, it's ugly but works.
// maybe we should use a single ssh key per cluster, but need to be careful with parallel test runs.
logSSHInstructions(s)

ValidateSysctlConfig(ctx, s, map[string]string{
"net.ipv4.tcp_retries2": "8",
"net.core.message_burst": "80",
16 changes: 14 additions & 2 deletions e2e/validators.go
@@ -174,7 +174,7 @@ func execScriptOnVMForScenario(ctx context.Context, s *Scenario, cmd string) *po
script.interpreter = Bash
}

result, err := execScriptOnVm(ctx, s, s.Runtime.VMPrivateIP, s.Runtime.Cluster.DebugPod.Name, string(s.Runtime.SSHKeyPrivate), script)
result, err := execScriptOnVm(ctx, s, script)
require.NoError(s.T, err, "failed to execute command on VM")
return result
}
@@ -334,7 +334,7 @@ func waitUntilResourceAvailable(ctx context.Context, s *Scenario, resourceName s
nodeName := s.Runtime.KubeNodeName
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

s.T.Logf("waiting for resource %q to be available on node %q", resourceName, nodeName)
for {
select {
case <-ctx.Done():
@@ -427,3 +427,15 @@ func GetFieldFromJsonObjectOnNode(ctx context.Context, s *Scenario, fileName str

return podExecResult.stdout.String()
}

func ValidateAMDGPU(ctx context.Context, s *Scenario) {
s.T.Logf("validating pod using AMD GPU")

execResult := execScriptOnVMForScenario(ctx, s, "lspci -k")
require.Equal(s.T, "0", execResult.exitCode, "expected to find lspci command, but did not")
assert.Contains(s.T, execResult.stdout.String(), "amdgpu", "expected to see amdgpu kernel module managing a PCI device, but did not")

ensurePod(ctx, s, podEnableAMDGPUResource(s))
waitUntilResourceAvailable(ctx, s, "amd.com/gpu")
//ensureJob(ctx, s, jobAMDGPUWorkload(s))
}
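
The commented-out ensureJob call above points at a follow-up workload check. A sketch of what such a jobAMDGPUWorkload helper could look like (hypothetical, not part of this PR; assumes batchv1 "k8s.io/api/batch/v1" and "k8s.io/apimachinery/pkg/api/resource" imports, and uses a placeholder image and command). It requests one amd.com/gpu, so the job only schedules once the device plugin advertises the resource:

func jobAMDGPUWorkload(s *Scenario) *batchv1.Job {
	return &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%s-amdgpu-workload", s.Runtime.KubeNodeName),
			Namespace: defaultNamespace,
		},
		Spec: batchv1.JobSpec{
			Template: corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					RestartPolicy: corev1.RestartPolicyNever,
					NodeSelector: map[string]string{
						"kubernetes.io/hostname": s.Runtime.KubeNodeName,
					},
					Containers: []corev1.Container{
						{
							Name:    "amdgpu-smoke-test",
							Image:   "rocm/rocm-terminal", // placeholder image
							Command: []string{"rocm-smi"},
							Resources: corev1.ResourceRequirements{
								Limits: corev1.ResourceList{
									"amd.com/gpu": resource.MustParse("1"),
								},
							},
						},
					},
				},
			},
		},
	}
}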