Skip to content

Commit

Permalink
clean up should checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
luke-lombardi committed Oct 25, 2024
1 parent 0525c70 commit 95d3e21
Showing 1 changed file with 21 additions and 25 deletions.
46 changes: 21 additions & 25 deletions pkg/worker/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func (s *Worker) getContainerEnvironment(request *types.ContainerRequest, option
fmt.Sprintf("CONTAINER_ID=%s", request.ContainerId),
fmt.Sprintf("BETA9_GATEWAY_HOST=%s", os.Getenv("BETA9_GATEWAY_HOST")),
fmt.Sprintf("BETA9_GATEWAY_PORT=%s", os.Getenv("BETA9_GATEWAY_PORT")),
fmt.Sprintf("CHECKPOINT_ENABLED=%t", request.CheckpointEnabled),
fmt.Sprintf("CHECKPOINT_ENABLED=%t", request.CheckpointEnabled && s.checkpointingAvailable),
"PYTHONUNBUFFERED=1",
}

Expand Down Expand Up @@ -494,17 +494,20 @@ func (s *Worker) spawn(request *types.ContainerRequest, spec *specs.Spec, output
pidChan := make(chan int, 1)
go s.collectAndSendContainerMetrics(ctx, request, spec, pidChan)

// If checkpointing is enabled, attempt to create a checkpoint
state, createCheckpoint := s.shouldCreateCheckpoint(request)
if s.checkpointingAvailable && createCheckpoint {
go s.createCheckpoint(ctx, request)
} else if s.checkpointingAvailable && state.Status == types.CheckpointStatusAvailable {
processState, err := s.cedanaClient.Restore(ctx, state.ContainerId)
if err != nil {
log.Printf("<%s> - failed to restore checkpoint: %v\n", request.ContainerId, err)
}
if request.CheckpointEnabled && s.checkpointingAvailable {
state, createCheckpoint := s.shouldCreateCheckpoint(request)

// If checkpointing is enabled, attempt to create a checkpoint
if s.checkpointingAvailable && createCheckpoint {
go s.createCheckpoint(ctx, request)
} else if state.Status == types.CheckpointStatusAvailable {
processState, err := s.cedanaClient.Restore(ctx, state.ContainerId)
if err != nil {
log.Printf("<%s> - failed to restore checkpoint: %v\n", request.ContainerId, err)
}

log.Printf("<%s> - checkpoint found and restored, process state: %+v\n", request.ContainerId, processState)
log.Printf("<%s> - checkpoint found and restored, process state: %+v\n", request.ContainerId, processState)
}
}

// Invoke runc process (launch the container)
Expand Down Expand Up @@ -606,27 +609,20 @@ waitForReady:
}

// shouldCreateCheckpoint checks if a checkpoint should be created for a given container
func (s *Worker) shouldCreateCheckpoint(request *types.ContainerRequest) (*types.CheckpointState, bool) {
func (s *Worker) shouldCreateCheckpoint(request *types.ContainerRequest) (types.CheckpointState, bool) {
if !s.checkpointingAvailable || !request.CheckpointEnabled {
return nil, false
return types.CheckpointState{}, false
}

state, err := s.containerRepo.GetCheckpointState(request.Workspace.Name, request.StubId)
if err != nil {
// Checkpoint is enabled, but no checkpoint state found, attempt a checkpoint
if _, ok := err.(*types.ErrCheckpointNotFound); ok {
return &types.CheckpointState{Status: types.CheckpointStatusNotFound}, true
if _, ok := err.(*types.ErrCheckpointNotFound); !ok {
return types.CheckpointState{}, false
}

return nil, false
}

if state.Status == types.CheckpointStatusAvailable {
return state, false
} else if state.Status == types.CheckpointStatusFailed {
return state, false
// If checkpoint not found, we can proceed to create one
return types.CheckpointState{Status: types.CheckpointStatusNotFound}, true
}

// TODO: figure out this case
return state, false
return *state, false
}

0 comments on commit 95d3e21

Please sign in to comment.