Fix GPU runc checkpoint
yashanand1910 committed Oct 22, 2024
1 parent 2d4a9de commit 9d3e2c3
Showing 6 changed files with 130 additions and 78 deletions.
2 changes: 1 addition & 1 deletion docker/Dockerfile.runner
@@ -1,5 +1,5 @@
# syntax=docker/dockerfile:1.6
FROM ubuntu:20.04 as base
FROM ubuntu:22.04 as base

ENV DEBIAN_FRONTEND=noninteractive

2 changes: 1 addition & 1 deletion docker/Dockerfile.worker
@@ -72,7 +72,7 @@ RUN go build -o /usr/local/bin/worker ./cmd/worker/main.go

# final image
# ========================
FROM nvidia/cuda:12.3.1-base-ubuntu20.04 AS release
FROM nvidia/cuda:12.3.1-base-ubuntu22.04 AS release
FROM release AS dev

FROM ${BASE_STAGE} AS final
27 changes: 14 additions & 13 deletions pkg/common/config.default.yaml
@@ -100,19 +100,20 @@ worker:
minFreeMemory: 32Gi
sharedMemoryLimitPct: 100%
# example gpu worker pool
nvidia:
mode: local
gpuType: any
runtime: nvidia
jobSpec:
nodeSelector: {}
poolSizing:
defaultWorkerCpu: 1000m
defaultWorkerGpuType: ""
defaultWorkerMemory: 1Gi
minFreeCpu:
minFreeGpu:
minFreeMemory:
# nvidia:
# mode: local
# gpuType: any
# runtime: nvidia
# jobSpec:
# nodeSelector: {}
# poolSizing:
# defaultWorkerCpu: 8000m
# defaultWorkerGpuType: ""
# defaultWorkerMemory: 12Gi
# minFreeCpu:
# minFreeGpu:
# minFreeMemory:
# sharedMemoryLimitPct: 100%
# global pool attributes
useHostResolvConf: true
hostNetwork: false
12 changes: 0 additions & 12 deletions pkg/worker/base_runc_config.json
@@ -172,18 +172,6 @@
"relatime"
]
},
{
"destination": "/usr/lib/worker/x86_64-linux-gnu",
"type": "bind",
"source": "/usr/lib/x86_64-linux-gnu",
"options": [
"rbind",
"rprivate",
"nosuid",
"nodev",
"rw"
]
},
{
"destination": "/usr/local/lib/python3.8/dist-packages/beam",
"type": "bind",
124 changes: 94 additions & 30 deletions pkg/worker/cedana.go
@@ -2,15 +2,14 @@ package worker

import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"os/exec"
"time"

api "github.com/cedana/cedana/pkg/api/services/task"
types "github.com/cedana/cedana/pkg/types"
"github.com/opencontainers/runtime-spec/specs-go"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
@@ -20,12 +19,14 @@ const (
DefaultCedanaPort = 8080
host = "0.0.0.0"
binPath = "/usr/bin/cedana"
sharedLibPath = "/usr/local/lib/libcedana-gpu.so"
runcRoot = "/run/runc"
logLevel = "debug"
defaultStartDeadline = 10 * time.Second
checkpointPathBase = "/data"
defaultManageDeadline = 10 * time.Second
defaultCheckpointDeadline = 2 * time.Minute
defaultRestoreDeadline = 2 * time.Minute
defaultHealthCheckDeadline = 10 * time.Second
defaultHealthCheckDeadline = 30 * time.Second
)

type CedanaClient struct {
@@ -46,19 +47,17 @@ func NewCedanaClient(ctx context.Context, config types.Config, port int, gpuEnab
taskClient := api.NewTaskServiceClient(taskConn)

// Launch the daemon
configJSON, err := json.Marshal(config)
if err != nil {
return nil, fmt.Errorf("failed to parse cedana config: %v", err)
}
log.Printf("launching cedana daemon with config JSON: %s", configJSON)
daemon := exec.CommandContext(ctx, binPath, "daemon", "start",
// FIXME: configjson invalid parsing error
fmt.Sprintf("--config='%s'", configJSON),
fmt.Sprintf("--port=%d", port),
fmt.Sprintf("--gpu-enabled=%t", gpuEnabled))
daemon.Stdout = os.Stdout
daemon.Stderr = os.Stderr
daemon.Env = append(daemon.Env, fmt.Sprintf("CEDANA_LOG_LEVEL=%s", logLevel))
// XXX: Set config as env var until config JSON parsing is fixed
daemon.Env = append(os.Environ(), fmt.Sprintf("CEDANA_LOG_LEVEL=%s", logLevel))
daemon.Env = append(daemon.Env, fmt.Sprintf("CEDANA_CLIENT_LEAVE_RUNNING=%t", config.Client.LeaveRunning))
daemon.Env = append(daemon.Env, fmt.Sprintf("CEDANA_DUMP_STORAGE_DIR=%s", config.SharedStorage.DumpStorageDir))
daemon.Env = append(daemon.Env, fmt.Sprintf("CEDANA_URL=%s", config.Connection.CedanaUrl))
daemon.Env = append(daemon.Env, fmt.Sprintf("CEDANA_AUTH_TOKEN=%s", config.Connection.CedanaAuthToken))
err = daemon.Start()
if err != nil {
return nil, fmt.Errorf("failed to start cedana daemon: %v", err)
@@ -77,23 +76,83 @@
}

// Wait for the daemon to be ready, and do health check
details, err := client.DetailedHealthCheckWait(context.Background())
if err != nil || (details != nil && len(details.UnhealthyReasons) > 0) {
defer daemon.Process.Kill()
defer taskConn.Close()
if err == nil && len(details.UnhealthyReasons) > 0 {
return nil, fmt.Errorf("cedana health check failed: %+v", details.UnhealthyReasons)
} else {
return nil, fmt.Errorf("cedana health check failed: %v", err)
_, err = client.DetailedHealthCheckWait(ctx)
// if err != nil || len(details.UnhealthyReasons) > 0 {
// defer daemon.Process.Kill()
// defer taskConn.Close()
// if err != nil {
// return nil, fmt.Errorf("cedana health check failed: %v", err)
// }
// if len(details.UnhealthyReasons) > 0 {
// return nil, fmt.Errorf("cedana health failed with reasons: %v", details.UnhealthyReasons)
// }
// }

return client, nil
}

func (c *CedanaClient) Close() {
c.conn.Close()
c.daemon.Process.Kill()
}

// Updates the runc container spec to make the shared library available
// as well as the shared memory that is used for communication
func (c *CedanaClient) prepareContainerSpec(spec *specs.Spec, gpuEnabled bool) error {
if !gpuEnabled {
return nil // no need to do anything
}

// First check if shared library is on worker
if _, err := os.Stat(sharedLibPath); os.IsNotExist(err) {
return fmt.Errorf("%s not found on worker. Was the daemon started with GPU enabled?", sharedLibPath)
}

// Remove nvidia prestart hook as we don't need actual device mounts
spec.Hooks.Prestart = nil

// Add shared memory mount from worker instead, remove existing /dev/shm mount
for i, m := range spec.Mounts {
if m.Destination == "/dev/shm" {
spec.Mounts = append(spec.Mounts[:i], spec.Mounts[i+1:]...)
break
}
}
spec.Mounts = append(spec.Mounts, specs.Mount{
Destination: "/dev/shm",
Source: "/dev/shm",
Type: "bind",
Options: []string{
"rbind",
"rprivate",
"nosuid",
"nodev",
"rw",
},
})

// Add the shared library to the container
spec.Mounts = append(spec.Mounts, specs.Mount{
Destination: sharedLibPath,
Source: sharedLibPath,
Type: "bind",
Options: []string{
"rbind",
"rprivate",
"nosuid",
"nodev",
"rw",
},
})

spec.Process.Env = append(spec.Process.Env, "LD_PRELOAD="+sharedLibPath)

return client, nil
return nil
}

// Start managing a runc container
func (c *CedanaClient) Manage(ctx context.Context, containerId string, gpuEnabled bool) error {
ctx, cancel := context.WithTimeout(ctx, defaultStartDeadline)
ctx, cancel := context.WithTimeout(ctx, defaultManageDeadline)
defer cancel()

args := &api.RuncManageArgs{
@@ -113,17 +172,22 @@ func (c *CedanaClient) Checkpoint(ctx context.Context, containerId string) error {
ctx, cancel := context.WithTimeout(ctx, defaultCheckpointDeadline)
defer cancel()

external := []string{""} // Add any external mounts here

args := api.JobDumpArgs{
Type: api.CRType_LOCAL,
JID: containerId,
CriuOpts: &api.CriuOpts{TcpEstablished: true, LeaveRunning: true},
// Dump dir taken from config
Type: api.CRType_LOCAL,
JID: containerId,
CriuOpts: &api.CriuOpts{
TcpEstablished: true,
LeaveRunning: true,
External: external,
},
}
res, err := c.service.JobDump(ctx, &args)
_ = res.DumpStats
if err != nil {
return err
}
_ = res.DumpStats
return nil
}

@@ -138,10 +202,10 @@ func (c *CedanaClient) Restore(ctx context.Context, containerId string) error {
CriuOpts: &api.CriuOpts{TcpEstablished: true},
}
res, err := c.service.JobRestore(ctx, args)
_ = res.RestoreStats
if err != nil {
return err
}
_ = res.RestoreStats
return nil
}

@@ -153,10 +217,10 @@ func (c *CedanaClient) DetailedHealthCheckWait(ctx context.Context) (*api.Detail
opts := []grpc.CallOption{}
opts = append(opts, grpc.WaitForReady(true))

resp, err := c.service.DetailedHealthCheck(ctx, &api.DetailedHealthCheckRequest{}, opts...)
res, err := c.service.DetailedHealthCheck(ctx, &api.DetailedHealthCheckRequest{}, opts...)
if err != nil {
return nil, err
}

return resp, nil
return res, nil
}
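
For orientation, a minimal sketch of how the reworked client is meant to be driven from inside package worker, consistent with the call sites in pkg/worker/worker.go below. The exampleCheckpointFlow wrapper and its cfg/spec/containerId arguments are illustrative placeholders (cfg being cedana's pkg/types Config), assuming the worker's existing request handling supplies them:

// Sketch only (package worker): cfg, spec and containerId stand in for values
// the worker already derives from the container request; they are not part of
// this commit.
func exampleCheckpointFlow(ctx context.Context, cfg types.Config, spec *specs.Spec, containerId string) error {
    gpuEnabled := true // e.g. request.Gpu != ""

    // Start the cedana daemon and wait for its health check.
    client, err := NewCedanaClient(ctx, cfg, DefaultCedanaPort, gpuEnabled)
    if err != nil {
        return err
    }
    defer client.Close()

    // Rewrite the runc spec before the container is created so the GPU
    // interposition library and the shared /dev/shm are visible inside it.
    if err := client.prepareContainerSpec(spec, gpuEnabled); err != nil {
        return err
    }

    // Once the container is running: hand it to the daemon, checkpoint it,
    // and later restore it from the dump directory configured on the daemon.
    if err := client.Manage(ctx, containerId, gpuEnabled); err != nil {
        return err
    }
    if err := client.Checkpoint(ctx, containerId); err != nil {
        return err
    }
    return client.Restore(ctx, containerId)
}
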
41 changes: 20 additions & 21 deletions pkg/worker/worker.go
@@ -311,7 +311,7 @@ func (s *Worker) RunContainer(request *types.ContainerRequest) error {
}
log.Printf("<%s> - acquired port: %d\n", containerId, bindPort)

if request.CheckpointEnabled && s.config.Checkpointing.Enabled {
if !s.isBuildRequest(request) && request.CheckpointEnabled && s.config.Checkpointing.Enabled {
port, err := getRandomFreePort()
if err != nil {
log.Printf("<%s> - failed to get random port for cedana, trying default (%d): %v\n", containerId, DefaultCedanaPort, err)
@@ -457,7 +457,7 @@ func (s *Worker) createCheckpoint(request *types.ContainerRequest) {
log.Printf("<%s> - endpoint not ready for checkpoint: %+v\n", instance.Id, err)
}
if err != nil {
log.Printf("<%s> - cedana health check failed: %+v\n", request.ContainerId, err)
log.Printf("<%s> - endpoint health check failed: %+v\n", request.ContainerId, err)
}
}
} else {
@@ -704,21 +704,21 @@ func (s *Worker) spawn(request *types.ContainerRequest, spec *specs.Spec, output
defer containerInstance.Overlay.Cleanup()
spec.Root.Path = containerInstance.Overlay.TopLayerPath()

// Setup container network namespace / devices
err = s.containerNetworkManager.Setup(containerId, spec)
if err != nil {
log.Printf("<%s> failed to setup container network: %v", containerId, err)
containerErr = err
return
}

// Expose the bind port
err = s.containerNetworkManager.ExposePort(containerId, opts.BindPort, opts.BindPort)
if err != nil {
log.Printf("<%s> failed to expose container bind port: %v", containerId, err)
containerErr = err
return
}
// // Setup container network namespace / devices
// err = s.containerNetworkManager.Setup(containerId, spec)
// if err != nil {
// log.Printf("<%s> failed to setup container network: %v", containerId, err)
// containerErr = err
// return
// }

// // Expose the bind port
// err = s.containerNetworkManager.ExposePort(containerId, opts.BindPort, opts.BindPort)
// if err != nil {
// log.Printf("<%s> failed to expose container bind port: %v", containerId, err)
// containerErr = err
// return
// }

// Write runc config spec to disk
configContents, err := json.MarshalIndent(spec, "", " ")
@@ -873,10 +873,9 @@ func (s *Worker) specFromRequest(request *types.ContainerRequest, options *Conta
spec.Hooks.Prestart = nil
}

// We need to modify the spec to support Cedana C/R, only for GPU containers
if s.cedanaClient != nil && request.Gpu != "" {
// TODO: add cedana GPU stuff
// spec.Hooks.Prestart = nil
// We need to modify the spec to support Cedana C/R
if s.cedanaClient != nil {
s.cedanaClient.prepareContainerSpec(spec, request.Gpu != "")
}

spec.Process.Env = append(spec.Process.Env, env...)
