Skip to content

Commit

Permalink
Create checkpoint in worker
Browse files Browse the repository at this point in the history
  • Loading branch information
yashanand1910 committed Aug 6, 2024
1 parent d703de4 commit 18f4051
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 44 deletions.
1 change: 1 addition & 0 deletions pkg/types/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type ContainerRequest struct {
Mounts []Mount `json:"mounts"`
RetryCount int `json:"retry_count"`
PoolSelector string `json:"pool_selector"`
CheckpointEnabled bool `json:"checkpoint_enabled"`
}

const ContainerExitCodeTtlS int = 300
Expand Down
89 changes: 46 additions & 43 deletions pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"log"
"net/http"
"os"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -163,14 +164,6 @@ func NewWorker() (*Worker, error) {
return nil, err
}

var cedanaClient *CedanaClient = nil
if config.Cedana.Enabled {
cedanaClient, err = NewCedanaClient(config.Cedana.HostName, "")
if err != nil {
log.Printf("Unable to create Cedana client, checkpoint/restore unavailable: %+v\n", err)
}
}

return &Worker{
ctx: ctx,
cancel: cancel,
Expand All @@ -186,7 +179,6 @@ func NewWorker() (*Worker, error) {
redisClient: redisClient,
podAddr: podAddr,
imageClient: imageClient,
cedanaClient: cedanaClient,
podHostName: podHostName,
eventBus: nil,
workerId: workerId,
Expand Down Expand Up @@ -393,46 +385,43 @@ func (s *Worker) createCheckpoint(request *types.ContainerRequest) {
return
}

_ = fmt.Sprintf("0.0.0.0:%d/health", instance.Port)
// TODO: we need a reliable way to detect that the container is completely booted

/*
We can't just use health checks because one worker could be up while the others
are still booting. After we have a reliable way of doing that,
we should probably be polling whatever method that is here, and also checking for the
container's existence. Something like this:
elapsed := 0
start := time.Now()
containerReady := false
timeout := time.Duration(time.Second * 120)
for := range time.Ticker(time.Second) {
// 1. check if container exists
instance, exists := s.containerInstances.Get(request.ContainerId)
if !exists {
return
}
// 2. check if container is ready
if weAreGood {
containerReady := true
break
}
if time.Since(start) > timeout {
break
}
start := time.Now()
timeout := time.Duration(time.Second * 120)
for time.Since(start) < timeout {
if s.cedanaClient == nil {
cedanaClient, err := NewCedanaClient(context.TODO())
if err != nil {
log.Printf("<%s> - failed to create cedana client: %v\n", request.ContainerId, err)
}
if !containerReady {
s.cedanaClient = cedanaClient
} else {
instance, exists = s.containerInstances.Get(request.ContainerId)
if !exists {
return
}
*/

err := s.cedanaClient.Checkpoint(request.ContainerId)
// Endpoint already configured to ensure /health is successful only if all workers are ready (NOTE(review): original comment was cut off after "all" — confirm wording)
resp, err := http.Get(fmt.Sprintf("0.0.0.0:%d/health", instance.Port))
if err == nil && resp.StatusCode == 200 {
break
}
}

time.Sleep(time.Second)
}

err := s.cedanaClient.Checkpoint(context.TODO(), request.ContainerId)
if err != nil {
log.Printf("<%s> - cedana checkpoint failed: %+v\n", request.ContainerId, err)
}

// Notify that the checkpoint was done
resp, err := http.Get(fmt.Sprintf("0.0.0.0:%d/checkpoint_done", instance.Port))
if err != nil || resp.StatusCode != 200 {
log.Printf("<%s> - failed to notify container of checkpoint: %v\n", request.ContainerId, err)
}
}

// Invoke a runc container using a predefined config spec
Expand All @@ -441,9 +430,7 @@ func (s *Worker) SpawnAsync(request *types.ContainerRequest, bundlePath string,

go s.containerWg.Add(1)

// TODO: also need to check the stub config here for experimental cedana flag
// which we may want to attach to the container request for ease of access
if s.config.Cedana.Enabled && s.cedanaClient != nil {
if request.CheckpointEnabled {
go s.createCheckpoint(request)
}

Expand Down Expand Up @@ -706,6 +693,7 @@ func (s *Worker) getContainerEnvironment(request *types.ContainerRequest, option
fmt.Sprintf("CONTAINER_ID=%s", request.ContainerId),
fmt.Sprintf("BETA9_GATEWAY_HOST=%s", os.Getenv("BETA9_GATEWAY_HOST")),
fmt.Sprintf("BETA9_GATEWAY_PORT=%s", os.Getenv("BETA9_GATEWAY_PORT")),
fmt.Sprintf("CHECKPOINT_ENABLED=%t", request.CheckpointEnabled),
"PYTHONUNBUFFERED=1",
}

Expand Down Expand Up @@ -779,6 +767,21 @@ func (s *Worker) specFromRequest(request *types.ContainerRequest, options *Conta
spec.Hooks.Prestart = nil
}

if request.CheckpointEnabled {
// XXX: Hook to spawn daemon inside the container. In later cedana versions, it should be possible
// to spawn and manage from the worker instead.
err = AddCedanaDaemonHook(request, &spec.Hooks.StartContainer, &s.config.Checkpointing.Cedana)
if err != nil {
log.Printf("failed to add cedana hook, checkpoint/restore unavailable: %v", err)
}

// XXX: Modify the entrypoint to start process using cedana. Won't be needed once daemon is started in
// worker as cedana will be able to checkpoint/restore the container directly.
originalArgsString := strings.Join(spec.Process.Args, " ")
// TODO: Use '-it' flag to keep STDIN open and attach pseudo-TTY
spec.Process.Args = []string{CedanaPath, "exec", originalArgsString, "-w", defaultContainerDirectory}
}

spec.Process.Env = append(spec.Process.Env, env...)
spec.Root.Readonly = false

Expand Down
4 changes: 3 additions & 1 deletion sdk/src/beta9/runner/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,11 @@ def __init__(self, logger: logging.Logger, worker: UvicornWorker) -> None:
# to become ready first, while others wait on the lock. The client is expected to
# call /checkpoint_done endpoint to signal that the checkpoint was done.
# Once this worker handles a checkpoint_done request, it releases the lock and
# all workers proceed. The checkpoint barrier ensures that the first worker does not escape.
# all workers proceed. The checkpoint barrier at the end again ensures that the
# first worker does not escape.

if cfg.checkpoint_enabled:
checkpointBarrier.wait()
readyLock.acquire()

@self.app.get("/health")
Expand Down

0 comments on commit 18f4051

Please sign in to comment.