Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
luke-lombardi committed Oct 25, 2024
1 parent abc892c commit 8600cf6
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 16 deletions.
2 changes: 1 addition & 1 deletion pkg/scheduler/pool_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (wpc *LocalKubernetesWorkerPoolController) createWorkerJob(workerId string,
workerImage := fmt.Sprintf("%s/%s:%s",
wpc.config.Worker.ImageRegistry,
wpc.config.Worker.ImageName,
"cedana-w16", //wpc.config.Worker.ImageTag,
"cedana-w17", //wpc.config.Worker.ImageTag,
)

resources := corev1.ResourceRequirements{}
Expand Down
1 change: 1 addition & 0 deletions pkg/types/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ type CheckpointState struct {
StubId string `redis:"stub_id" json:"stub_id"`
ContainerId string `redis:"container_id" json:"container_id"`
Status CheckpointStatus `redis:"status" json:"status"`
RemoteKey string `redis:"remote_key" json:"remote_key"`
}

type ErrCheckpointNotFound struct {
Expand Down
6 changes: 4 additions & 2 deletions pkg/worker/cedana.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ const (
logLevel = "debug"
checkpointPathBase = "/tmp/checkpoints"
defaultManageDeadline = 10 * time.Second
defaultCheckpointDeadline = 2 * time.Minute
defaultCheckpointDeadline = 10 * time.Minute
defaultRestoreDeadline = 5 * time.Minute
defaultHealthCheckDeadline = 30 * time.Second
defaultHealthCheckDeadline = 5 * time.Second
)

type CedanaClient struct {
Expand Down Expand Up @@ -124,6 +124,8 @@ func (c *CedanaClient) prepareContainerSpec(spec *specs.Spec, gpuEnabled bool) e
// Remove nvidia prestart hook as we don't need actual device mounts
spec.Hooks.Prestart = nil

// TODO: will this cause issues on multi-gpu nodes...?

// Add shared memory mount from worker instead, remove existing /dev/shm mount
for i, m := range spec.Mounts {
if m.Destination == "/dev/shm" {
Expand Down
20 changes: 17 additions & 3 deletions pkg/worker/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,18 +494,22 @@ func (s *Worker) spawn(request *types.ContainerRequest, spec *specs.Spec, output
pidChan := make(chan int, 1)
go s.collectAndSendContainerMetrics(ctx, request, spec, pidChan)

// Handle checkpoint creation & restore if applicable
if request.CheckpointEnabled && s.checkpointingAvailable {
state, createCheckpoint := s.shouldCreateCheckpoint(request)

// If checkpointing is enabled, attempt to create a checkpoint
if s.checkpointingAvailable && createCheckpoint {
if createCheckpoint {
go s.createCheckpoint(ctx, request)
} else if state.Status == types.CheckpointStatusAvailable {
processState, err := s.cedanaClient.Restore(ctx, state.ContainerId)
if err != nil {
log.Printf("<%s> - failed to restore checkpoint: %v\n", request.ContainerId, err)
}

// TODO: if restore fails, update checkpoint state to restore_failed

// pidChan <- int(processState.PID)
log.Printf("<%s> - checkpoint found and restored, process state: %+v\n", request.ContainerId, processState)
}
}
Expand Down Expand Up @@ -592,35 +596,45 @@ waitForReady:
} else {
log.Printf("<%s> - endpoint not ready for checkpoint.\n", instance.Id)
}

}
}

// Proceed to create the checkpoint
err := s.cedanaClient.Checkpoint(ctx, request.ContainerId)
if err != nil {
log.Printf("<%s> - cedana checkpoint failed: %+v\n", request.ContainerId, err)

s.containerRepo.UpdateCheckpointState(request.Workspace.Name, request.StubId, &types.CheckpointState{
Status: types.CheckpointStatusCheckpointFailed,
ContainerId: request.ContainerId,
StubId: request.StubId,
})
return err
}

// Move compressed checkpoint file to long-term storage directory
archiveName := fmt.Sprintf("%s.tar", request.ContainerId)
err = moveFile(filepath.Join(checkpointPathBase, archiveName), filepath.Join(s.config.Worker.Checkpointing.Storage.MountPath, request.Workspace.Name, archiveName))
remoteKey := fmt.Sprintf("%s/%s", request.Workspace.Name, archiveName)
err = copyFile(filepath.Join(checkpointPathBase, archiveName), filepath.Join(s.config.Worker.Checkpointing.Storage.MountPath, remoteKey))
if err != nil {
log.Printf("<%s> - failed to copy checkpoint to storage: %v\n", request.ContainerId, err)
return err
}

// TODO: Delete checkpoint files from local disk
// TODO: Delete checkpoint files from local disk in /tmp

log.Printf("<%s> - checkpoint created successfully\n", request.ContainerId)
return s.containerRepo.UpdateCheckpointState(request.Workspace.Name, request.StubId, &types.CheckpointState{
Status: types.CheckpointStatusAvailable,
ContainerId: request.ContainerId, // We store this as a reference to the container that we initially checkpointed
StubId: request.StubId,
RemoteKey: remoteKey,
})
}

// shouldCreateCheckpoint checks if a checkpoint should be created for a given container
// NOTE: this currently only works for deployments since functions can run multiple containers
func (s *Worker) shouldCreateCheckpoint(request *types.ContainerRequest) (types.CheckpointState, bool) {
if !s.checkpointingAvailable || !request.CheckpointEnabled {
return types.CheckpointState{}, false
Expand Down
8 changes: 0 additions & 8 deletions pkg/worker/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,6 @@ func copyFile(src, dst string) error {
return os.WriteFile(dst, input, 0644)
}

func moveFile(src, dst string) error {
err := os.Rename(src, dst)
if err != nil {
return err
}
return nil
}

type FileLock struct {
file *os.File
path string
Expand Down
3 changes: 1 addition & 2 deletions pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,7 @@ func NewWorker() (*Worker, error) {

os.MkdirAll(config.Worker.Checkpointing.Storage.MountPath, os.ModePerm)

log.Printf("Checkpoint storage mode: %s\n", config.Worker.Checkpointing.Storage.Mode)

// If storage mode is S3, mount the checkpoint storage as a FUSE filesystem
if config.Worker.Checkpointing.Storage.Mode == string(types.CheckpointStorageModeS3) {
checkpointStorage, _ := storage.NewMountPointStorage(types.MountPointConfig{
S3Bucket: config.Worker.Checkpointing.Storage.ObjectStore.BucketName,
Expand Down
1 change: 1 addition & 0 deletions sdk/src/beta9/runner/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ def wait_for_checkpoint():
Path(CHECKPOINT_SIGNAL_FILE).touch(exist_ok=True)
return

# TODO: add timeout
while True:
with workers_ready.get_lock():
if workers_ready.value == config.workers:
Expand Down

0 comments on commit 8600cf6

Please sign in to comment.