From 8600cf63937ee24e2e5b1a1359150cdbcc856863 Mon Sep 17 00:00:00 2001
From: luke-lombardi <33990301+luke-lombardi@users.noreply.github.com>
Date: Fri, 25 Oct 2024 10:57:07 -0400
Subject: [PATCH] wip

---
 pkg/scheduler/pool_local.go    |  2 +-
 pkg/types/scheduler.go         |  1 +
 pkg/worker/cedana.go           |  6 ++++--
 pkg/worker/lifecycle.go        | 20 +++++++++++++++++---
 pkg/worker/util.go             |  8 --------
 pkg/worker/worker.go           |  3 +--
 sdk/src/beta9/runner/common.py |  1 +
 7 files changed, 25 insertions(+), 16 deletions(-)

diff --git a/pkg/scheduler/pool_local.go b/pkg/scheduler/pool_local.go
index 9c973275c..db339ba76 100644
--- a/pkg/scheduler/pool_local.go
+++ b/pkg/scheduler/pool_local.go
@@ -144,7 +144,7 @@ func (wpc *LocalKubernetesWorkerPoolController) createWorkerJob(workerId string,
 	workerImage := fmt.Sprintf("%s/%s:%s",
 		wpc.config.Worker.ImageRegistry,
 		wpc.config.Worker.ImageName,
-		"cedana-w16", //wpc.config.Worker.ImageTag,
+		"cedana-w17", //wpc.config.Worker.ImageTag,
 	)
 
 	resources := corev1.ResourceRequirements{}
diff --git a/pkg/types/scheduler.go b/pkg/types/scheduler.go
index 71d70d477..5445a34ba 100644
--- a/pkg/types/scheduler.go
+++ b/pkg/types/scheduler.go
@@ -179,6 +179,7 @@ type CheckpointState struct {
 	StubId      string           `redis:"stub_id" json:"stub_id"`
 	ContainerId string           `redis:"container_id" json:"container_id"`
 	Status      CheckpointStatus `redis:"status" json:"status"`
+	RemoteKey   string           `redis:"remote_key" json:"remote_key"`
 }
 
 type ErrCheckpointNotFound struct {
diff --git a/pkg/worker/cedana.go b/pkg/worker/cedana.go
index c33a7f90a..d3fbd93dd 100644
--- a/pkg/worker/cedana.go
+++ b/pkg/worker/cedana.go
@@ -24,9 +24,9 @@ const (
 	logLevel                   = "debug"
 	checkpointPathBase         = "/tmp/checkpoints"
 	defaultManageDeadline      = 10 * time.Second
-	defaultCheckpointDeadline  = 2 * time.Minute
+	defaultCheckpointDeadline  = 10 * time.Minute
 	defaultRestoreDeadline     = 5 * time.Minute
-	defaultHealthCheckDeadline = 30 * time.Second
+	defaultHealthCheckDeadline = 5 * time.Second
 )
 
 type CedanaClient struct {
@@ -124,6 +124,8 @@ func (c *CedanaClient) prepareContainerSpec(spec *specs.Spec, gpuEnabled bool) e
 	// Remove nvidia prestart hook as we don't need actual device mounts
 	spec.Hooks.Prestart = nil
 
+	// TODO: will this cause issues on multi-gpu nodes...?
+
 	// Add shared memory mount from worker instead, remove existing /dev/shm mount
 	for i, m := range spec.Mounts {
 		if m.Destination == "/dev/shm" {
diff --git a/pkg/worker/lifecycle.go b/pkg/worker/lifecycle.go
index f4b765ef8..3367d14ac 100644
--- a/pkg/worker/lifecycle.go
+++ b/pkg/worker/lifecycle.go
@@ -494,11 +494,12 @@ func (s *Worker) spawn(request *types.ContainerRequest, spec *specs.Spec, output
 	pidChan := make(chan int, 1)
 	go s.collectAndSendContainerMetrics(ctx, request, spec, pidChan)
 
+	// Handle checkpoint creation & restore if applicable
 	if request.CheckpointEnabled && s.checkpointingAvailable {
 		state, createCheckpoint := s.shouldCreateCheckpoint(request)
 
 		// If checkpointing is enabled, attempt to create a checkpoint
-		if s.checkpointingAvailable && createCheckpoint {
+		if createCheckpoint {
 			go s.createCheckpoint(ctx, request)
 		} else if state.Status == types.CheckpointStatusAvailable {
 			processState, err := s.cedanaClient.Restore(ctx, state.ContainerId)
@@ -506,6 +507,9 @@ func (s *Worker) spawn(request *types.ContainerRequest, spec *specs.Spec, output
 				log.Printf("<%s> - failed to restore checkpoint: %v\n", request.ContainerId, err)
 			}
 
+			// TODO: if restore fails, update checkpoint state to restore_failed
+
+			// pidChan <- int(processState.PID)
 			log.Printf("<%s> - checkpoint found and restored, process state: %+v\n", request.ContainerId, processState)
 		}
 	}
@@ -592,6 +596,7 @@ waitForReady:
 			} else {
 				log.Printf("<%s> - endpoint not ready for checkpoint.\n", instance.Id)
 			}
+
 		}
 	}
 
@@ -599,28 +604,37 @@ waitForReady:
 	err := s.cedanaClient.Checkpoint(ctx, request.ContainerId)
 	if err != nil {
 		log.Printf("<%s> - cedana checkpoint failed: %+v\n", request.ContainerId, err)
+
+		s.containerRepo.UpdateCheckpointState(request.Workspace.Name, request.StubId, &types.CheckpointState{
+			Status:      types.CheckpointStatusCheckpointFailed,
+			ContainerId: request.ContainerId,
+			StubId:      request.StubId,
+		})
 		return err
 	}
 
 	// Move compressed checkpoint file to long-term storage directory
 	archiveName := fmt.Sprintf("%s.tar", request.ContainerId)
-	err = moveFile(filepath.Join(checkpointPathBase, archiveName), filepath.Join(s.config.Worker.Checkpointing.Storage.MountPath, request.Workspace.Name, archiveName))
+	remoteKey := fmt.Sprintf("%s/%s", request.Workspace.Name, archiveName)
+	err = copyFile(filepath.Join(checkpointPathBase, archiveName), filepath.Join(s.config.Worker.Checkpointing.Storage.MountPath, remoteKey))
 	if err != nil {
 		log.Printf("<%s> - failed to copy checkpoint to storage: %v\n", request.ContainerId, err)
 		return err
 	}
 
-	// TODO: Delete checkpoint files from local disk
+	// TODO: Delete checkpoint files from local disk in /tmp
 
 	log.Printf("<%s> - checkpoint created successfully\n", request.ContainerId)
 	return s.containerRepo.UpdateCheckpointState(request.Workspace.Name, request.StubId, &types.CheckpointState{
 		Status:      types.CheckpointStatusAvailable,
 		ContainerId: request.ContainerId, // We store this as a reference to the container that we initially checkpointed
 		StubId:      request.StubId,
+		RemoteKey:   remoteKey,
 	})
 }
 
 // shouldCreateCheckpoint checks if a checkpoint should be created for a given container
+// NOTE: this currently only works for deployments since functions can run multiple containers
 func (s *Worker) shouldCreateCheckpoint(request *types.ContainerRequest) (types.CheckpointState, bool) {
 	if !s.checkpointingAvailable || !request.CheckpointEnabled {
 		return types.CheckpointState{}, false
diff --git a/pkg/worker/util.go b/pkg/worker/util.go
index 38d39cba5..59c556b33 100644
--- a/pkg/worker/util.go
+++ b/pkg/worker/util.go
@@ -46,14 +46,6 @@ func copyFile(src, dst string) error {
 	return os.WriteFile(dst, input, 0644)
 }
 
-func moveFile(src, dst string) error {
-	err := os.Rename(src, dst)
-	if err != nil {
-		return err
-	}
-	return nil
-}
-
 type FileLock struct {
 	file *os.File
 	path string
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index 302f6c611..6795bf000 100644
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -163,8 +163,7 @@ func NewWorker() (*Worker, error) {
 
 	os.MkdirAll(config.Worker.Checkpointing.Storage.MountPath, os.ModePerm)
 
-	log.Printf("Checkpoint storage mode: %s\n", config.Worker.Checkpointing.Storage.Mode)
-
+	// If storage mode is S3, mount the checkpoint storage as a FUSE filesystem
 	if config.Worker.Checkpointing.Storage.Mode == string(types.CheckpointStorageModeS3) {
 		checkpointStorage, _ := storage.NewMountPointStorage(types.MountPointConfig{
 			S3Bucket: config.Worker.Checkpointing.Storage.ObjectStore.BucketName,
diff --git a/sdk/src/beta9/runner/common.py b/sdk/src/beta9/runner/common.py
index df333b187..91e507382 100644
--- a/sdk/src/beta9/runner/common.py
+++ b/sdk/src/beta9/runner/common.py
@@ -353,6 +353,7 @@ def wait_for_checkpoint():
         Path(CHECKPOINT_SIGNAL_FILE).touch(exist_ok=True)
         return
 
+    # TODO: add timeout
     while True:
         with workers_ready.get_lock():
             if workers_ready.value == config.workers: