diff --git a/pkg/types/scheduler.go b/pkg/types/scheduler.go
index a070b5bf0..ba460dff2 100644
--- a/pkg/types/scheduler.go
+++ b/pkg/types/scheduler.go
@@ -80,6 +80,7 @@ type ContainerRequest struct {
 	Mounts            []Mount `json:"mounts"`
 	RetryCount        int     `json:"retry_count"`
 	PoolSelector      string  `json:"pool_selector"`
+	CheckpointEnabled bool    `json:"checkpoint_enabled"`
 }
 
 const ContainerExitCodeTtlS int = 300
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index e48289714..f18bc0563 100644
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -7,6 +7,7 @@ import (
 	"errors"
 	"fmt"
 	"log"
+	"net/http"
 	"os"
 	"path/filepath"
 	"strconv"
@@ -163,14 +164,6 @@ func NewWorker() (*Worker, error) {
 		return nil, err
 	}
 
-	var cedanaClient *CedanaClient = nil
-	if config.Cedana.Enabled {
-		cedanaClient, err = NewCedanaClient(config.Cedana.HostName, "")
-		if err != nil {
-			log.Printf("Unable to create Cedana client, checkpoint/restore unavailable: %+v\n", err)
-		}
-	}
-
 	return &Worker{
 		ctx:    ctx,
 		cancel: cancel,
@@ -186,7 +179,6 @@ func NewWorker() (*Worker, error) {
 		redisClient:  redisClient,
 		podAddr:      podAddr,
 		imageClient:  imageClient,
-		cedanaClient: cedanaClient,
 		podHostName:  podHostName,
 		eventBus:     nil,
 		workerId:     workerId,
@@ -393,46 +385,43 @@ func (s *Worker) createCheckpoint(request *types.ContainerRequest) {
 		return
 	}
 
-	_ = fmt.Sprintf("0.0.0.0:%d/health", instance.Port)
 
 	// TODO: we need a reliable way to detect that the container is completely booted
 
-	/*
-		We can't just use health checks because one worker could be up while the others
-		are still booting. After we have a reliable way of doing that,
-		we should probably be polling whatever method that is here, and also checking for the
-		containers existence. Something like this:
-
-		elapsed := 0
-		start := time.Now()
-		containerReady := false
-		timeout := time.Duration(time.Second * 120)
-		for := range time.Ticker(time.Second) {
-			// 1. check if container exists
-			instance, exists := s.containerInstances.Get(request.ContainerId)
-			if !exists {
-				return
-			}
-
-			// 2. check if container is ready
-			if weAreGood {
-				containerReady := true
-				break
-			}
-
-			if time.Since(start) > timeout {
-				break
-			}
+	start := time.Now()
+	timeout := time.Duration(time.Second * 120)
+	for time.Since(start) < timeout {
+		if s.cedanaClient == nil {
+			cedanaClient, err := NewCedanaClient(context.TODO())
+			if err != nil {
+				log.Printf("<%s> - failed to create cedana client: %v\n", request.ContainerId, err)
 			}
-
-		if !containerReady {
+			if err == nil {
+				s.cedanaClient = cedanaClient
+			}
+		} else {
+			instance, exists = s.containerInstances.Get(request.ContainerId)
+			if !exists {
 				return
 			}
-	*/
-	err := s.cedanaClient.Checkpoint(request.ContainerId)
+			// Endpoint already configured to ensure /health is successful only if all workers are ready
+			resp, err := http.Get(fmt.Sprintf("http://0.0.0.0:%d/health", instance.Port))
+			if err == nil {
+				resp.Body.Close()
+				if resp.StatusCode == 200 {
+					break
+				}
+			}
+		}
+
+		time.Sleep(time.Second)
+	}
+
+	if s.cedanaClient == nil {
+		log.Printf("<%s> - cedana client unavailable, skipping checkpoint\n", request.ContainerId)
+		return
+	}
+
+	err := s.cedanaClient.Checkpoint(context.TODO(), request.ContainerId)
 	if err != nil {
 		log.Printf("<%s> - cedana checkpoint failed: %+v\n", request.ContainerId, err)
 	}
+
+	// Notify the container that the checkpoint was done
+	resp, err := http.Get(fmt.Sprintf("http://0.0.0.0:%d/checkpoint_done", instance.Port))
+	if err != nil || resp.StatusCode != 200 {
+		log.Printf("<%s> - failed to notify container of checkpoint: %v\n", request.ContainerId, err)
+	}
 }
 
 // Invoke a runc container using a predefined config spec
@@ -441,9 +430,7 @@ func (s *Worker) SpawnAsync(request *types.ContainerRequest, bundlePath string,
 
 	go s.containerWg.Add(1)
 
-	// TODO: also need to check the stub config here for experimental cedana flag
-	// which we may want to attach to the container request for ease of access
-	if s.config.Cedana.Enabled && s.cedanaClient != nil {
+	if request.CheckpointEnabled {
 		go s.createCheckpoint(request)
 	}
 
@@ -706,6 +693,7 @@ func (s *Worker) getContainerEnvironment(request *types.ContainerRequest, option
 		fmt.Sprintf("CONTAINER_ID=%s", request.ContainerId),
 		fmt.Sprintf("BETA9_GATEWAY_HOST=%s", os.Getenv("BETA9_GATEWAY_HOST")),
 		fmt.Sprintf("BETA9_GATEWAY_PORT=%s", os.Getenv("BETA9_GATEWAY_PORT")),
+		fmt.Sprintf("CHECKPOINT_ENABLED=%t", request.CheckpointEnabled),
 		"PYTHONUNBUFFERED=1",
 	}
 
@@ -779,6 +767,21 @@ func (s *Worker) specFromRequest(request *types.ContainerRequest, options *Conta
 		spec.Hooks.Prestart = nil
 	}
 
+	if request.CheckpointEnabled {
+		// XXX: Hook to spawn the cedana daemon inside the container. In later cedana versions, it should be
+		// possible to spawn and manage it from the worker instead.
+		err = AddCedanaDaemonHook(request, &spec.Hooks.StartContainer, &s.config.Checkpointing.Cedana)
+		if err != nil {
+			log.Printf("failed to add cedana hook, checkpoint/restore unavailable: %v", err)
+		}
+
+		// XXX: Modify the entrypoint to start the process using cedana. This won't be needed once the daemon
+		// is started in the worker, as cedana will then be able to checkpoint/restore the container directly.
+		originalArgsString := strings.Join(spec.Process.Args, " ")
+		// TODO: Use '-it' flag to keep STDIN open and attach a pseudo-TTY
+		spec.Process.Args = []string{CedanaPath, "exec", originalArgsString, "-w", defaultContainerDirectory}
+	}
+
 	spec.Process.Env = append(spec.Process.Env, env...)
 
 	spec.Root.Readonly = false
diff --git a/sdk/src/beta9/runner/endpoint.py b/sdk/src/beta9/runner/endpoint.py
index da5adf884..b4c63f138 100644
--- a/sdk/src/beta9/runner/endpoint.py
+++ b/sdk/src/beta9/runner/endpoint.py
@@ -175,9 +175,11 @@ def __init__(self, logger: logging.Logger, worker: UvicornWorker) -> None:
         # to become ready first, while others wait on the lock. The client is expected to
         # call /checkpoint_done endpoint to signal that the checkpoint was done.
         # Once this worker handles a checkpoint_done request, it releases the lock and
-        # all workers proceed. The checkpoint barrier ensures that the first worker does not escape.
+        # all workers proceed. The checkpoint barrier at the end again ensures that the
+        # first worker does not escape.
 
         if cfg.checkpoint_enabled:
+            checkpointBarrier.wait()
             readyLock.acquire()
 
         @self.app.get("/health")
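
Note (illustrative, not part of the diff): the readiness loop added to createCheckpoint above boils down to a generic pattern of polling an HTTP endpoint until it returns 200 OK or a deadline passes. The sketch below isolates that pattern; the function name, URL, timeout, and interval are assumptions made for the example, not values taken from the worker or its config.

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// waitForHealthy polls url until it answers 200 OK or timeout elapses,
// sleeping interval between attempts. It mirrors the loop added to
// createCheckpoint in this diff, with the response body closed each try.
func waitForHealthy(url string, timeout, interval time.Duration) bool {
	start := time.Now()
	for time.Since(start) < timeout {
		resp, err := http.Get(url)
		if err == nil {
			resp.Body.Close()
			if resp.StatusCode == http.StatusOK {
				return true
			}
		}
		time.Sleep(interval)
	}
	return false
}

func main() {
	// Hypothetical endpoint for demonstration; in the worker the polled URL is the
	// container's /health route, which the endpoint.py change gates so that it only
	// succeeds once all of the container's workers are ready.
	ready := waitForHealthy("http://127.0.0.1:8080/health", 10*time.Second, time.Second)
	fmt.Println("container ready:", ready)
}
```

Once the poll succeeds, the worker takes the checkpoint and then notifies the container via /checkpoint_done, which releases the lock described in the endpoint.py comment so the remaining workers can proceed.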