Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
luke-lombardi committed Oct 25, 2024
1 parent 9111810 commit 9f8dd59
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 26 deletions.
7 changes: 4 additions & 3 deletions pkg/worker/cedana.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (c *CedanaClient) Checkpoint(ctx context.Context, containerId string) error
}

// Restore a runc container
func (c *CedanaClient) Restore(ctx context.Context, containerId string) error {
func (c *CedanaClient) Restore(ctx context.Context, containerId string) (*api.ProcessState, error) {
ctx, cancel := context.WithTimeout(ctx, defaultCheckpointDeadline)
defer cancel()

Expand All @@ -225,10 +225,11 @@ func (c *CedanaClient) Restore(ctx context.Context, containerId string) error {
}
res, err := c.service.JobRestore(ctx, args)
if err != nil {
return err
return nil, err
}

_ = res.RestoreStats
return nil
return res.State, nil
}

// Perform a detailed health check of cedana C/R capabilities
Expand Down
44 changes: 21 additions & 23 deletions pkg/worker/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,15 +493,17 @@ func (s *Worker) spawn(request *types.ContainerRequest, spec *specs.Spec, output
pidChan := make(chan int, 1)
go s.collectAndSendContainerMetrics(ctx, request, spec, pidChan)

status, createCheckpoint := s.shouldCreateCheckpoint(request)
// If checkpointing is enabled, attempt to create a checkpoint
state, createCheckpoint := s.shouldCreateCheckpoint(request)
if s.checkpointingAvailable && createCheckpoint {
go s.createCheckpoint(ctx, request)
}
} else if s.checkpointingAvailable && state.Status == types.CheckpointStatusAvailable {
processState, err := s.cedanaClient.Restore(ctx, state.ContainerId)
if err != nil {
log.Printf("<%s> - failed to restore checkpoint: %v\n", request.ContainerId, err)
}

// TODO: actually
if status == types.CheckpointStatusAvailable {
// s.cedanaClient.Restore(ctx, request.ContainerId)
log.Printf("<%s> - checkpoint found! not creating a new one\n", request.ContainerId)
log.Printf("<%s> - checkpoint found and restored, process state: %+v\n", request.ContainerId, processState)
}

// Invoke runc process (launch the container)
Expand Down Expand Up @@ -542,9 +544,7 @@ func (s *Worker) isBuildRequest(request *types.ContainerRequest) bool {

// Waits for the endpoint to be ready to checkpoint at the desired point in execution, i.e.,
// after all endpoint workers have reached a checkpointable state, and then creates the checkpoint.
func (s *Worker) createCheckpoint(ctx context.Context, request *types.ContainerRequest) {
log.Printf("<%s> - waiting for container to be ready for checkpoint\n", request.ContainerId)

func (s *Worker) createCheckpoint(ctx context.Context, request *types.ContainerRequest) error {
timeout := defaultCheckpointDeadline
managing := false
gpuEnabled := request.Gpu != ""
Expand All @@ -559,8 +559,7 @@ waitForReady:
for {
select {
case <-ctx.Done():
log.Printf("<%s> - exited createCheckpoint", request.ContainerId)
return
return fmt.Errorf("checkpoint deadline exceeded or container exited")
case <-ticker.C:
instance, exists := s.containerInstances.Get(request.ContainerId)
if !exists {
Expand Down Expand Up @@ -594,40 +593,39 @@ waitForReady:
err := s.cedanaClient.Checkpoint(ctx, request.ContainerId)
if err != nil {
log.Printf("<%s> - cedana checkpoint failed: %+v\n", request.ContainerId, err)
return
return err
}

s.containerRepo.UpdateCheckpointState(request.Workspace.Name, request.StubId, &types.CheckpointState{
log.Printf("<%s> - checkpoint created successfully\n", request.ContainerId)
return s.containerRepo.UpdateCheckpointState(request.Workspace.Name, request.StubId, &types.CheckpointState{
Status: types.CheckpointStatusAvailable,
ContainerId: request.ContainerId, // We store this just as a reference to which container we initially checkpointed
ContainerId: request.ContainerId, // We store this as a reference to the container that we initially checkpointed
StubId: request.StubId,
})

log.Printf("<%s> - checkpoint created successfully\n", request.ContainerId)
}

// shouldCreateCheckpoint checks if a checkpoint should be created for a given container
func (s *Worker) shouldCreateCheckpoint(request *types.ContainerRequest) (types.CheckpointStatus, bool) {
func (s *Worker) shouldCreateCheckpoint(request *types.ContainerRequest) (*types.CheckpointState, bool) {
if !s.checkpointingAvailable || !request.CheckpointEnabled {
return "", false
return nil, false
}

state, err := s.containerRepo.GetCheckpointState(request.Workspace.Name, request.StubId)
if err != nil {
// Checkpointing is enabled but no checkpoint state was found, so attempt a checkpoint
if _, ok := err.(*types.ErrCheckpointNotFound); ok {
return types.CheckpointStatusNotFound, true
return &types.CheckpointState{Status: types.CheckpointStatusNotFound}, true
}

return "", false
return nil, false
}

if state.Status == types.CheckpointStatusAvailable {
return types.CheckpointStatusAvailable, false
return state, false
} else if state.Status == types.CheckpointStatusFailed {
return types.CheckpointStatusFailed, false
return state, false
}

// TODO: figure out this case (a checkpoint state exists, but its status is neither available nor failed)
return types.CheckpointStatusNotFound, false
return state, false
}

0 comments on commit 9f8dd59

Please sign in to comment.