diff --git a/pkg/abstractions/common/logs.go b/pkg/abstractions/common/logs.go index 952848f9d..41bf6e2af 100644 --- a/pkg/abstractions/common/logs.go +++ b/pkg/abstractions/common/logs.go @@ -42,7 +42,7 @@ type LogStream struct { } func (l *LogStream) Stream(ctx context.Context, authInfo *auth.AuthInfo, containerId string) error { - hostname, err := l.containerRepo.GetWorkerAddress(containerId) + hostname, err := l.containerRepo.GetWorkerAddress(ctx, containerId) if err != nil { return err } diff --git a/pkg/abstractions/container/container.go b/pkg/abstractions/container/container.go index 571a5521d..07548e691 100644 --- a/pkg/abstractions/container/container.go +++ b/pkg/abstractions/container/container.go @@ -177,7 +177,7 @@ func (cs *CmdContainerService) ExecuteCommand(in *pb.CommandExecutionRequest, st return err } - hostname, err := cs.containerRepo.GetWorkerAddress(task.ContainerId) + hostname, err := cs.containerRepo.GetWorkerAddress(ctx, task.ContainerId) if err != nil { return err } diff --git a/pkg/abstractions/image/build.go b/pkg/abstractions/image/build.go index 2bb91f1ee..dddeb5392 100644 --- a/pkg/abstractions/image/build.go +++ b/pkg/abstractions/image/build.go @@ -200,7 +200,11 @@ func (b *Builder) Build(ctx context.Context, opts *BuildOpts, outputChan chan co return err } - hostname, err := b.containerRepo.GetWorkerAddress(containerId) + mctx, mcancel := context.WithCancel(ctx) + go b.monitorContainerForPreloadErrors(mctx, containerId, outputChan) + + hostname, err := b.containerRepo.GetWorkerAddress(ctx, containerId) + mcancel() if err != nil { outputChan <- common.OutputMsg{Done: true, Success: false, Msg: "Failed to connect to build container.\n"} return err @@ -318,6 +322,27 @@ func (b *Builder) Build(ctx context.Context, opts *BuildOpts, outputChan chan co return nil } +func (b *Builder) monitorContainerForPreloadErrors(ctx context.Context, containerId string, outputChan chan common.OutputMsg) { + for { + select { + case <-ctx.Done(): + return + case <-time.After(1 * time.Second): + if exitCode, err := b.containerRepo.GetContainerExitCode(containerId); err == nil { + if exitCode != 0 { + msg, ok := types.WorkerContainerExitCodes[exitCode] + if !ok { + msg = types.WorkerContainerExitCodes[types.WorkerContainerExitCodeUnknownError] + } + + outputChan <- common.OutputMsg{Done: true, Success: false, Msg: fmt.Sprintf("Container exited with error: %s\n", msg)} + } + return + } + } + } +} + func (b *Builder) genContainerId() string { return fmt.Sprintf("%s%s", types.BuildContainerPrefix, uuid.New().String()[:8]) } diff --git a/pkg/abstractions/image/image.go b/pkg/abstractions/image/image.go index ad4641a6f..1349a0822 100644 --- a/pkg/abstractions/image/image.go +++ b/pkg/abstractions/image/image.go @@ -122,7 +122,6 @@ func (is *RuncImageService) BuildImage(in *pb.BuildImageRequest, stream pb.Image } if !lastMessage.Success { - log.Println("build failed") return errors.New("build failed") } diff --git a/pkg/repository/base.go b/pkg/repository/base.go index 041f5b2d0..fe82bf395 100755 --- a/pkg/repository/base.go +++ b/pkg/repository/base.go @@ -48,7 +48,7 @@ type ContainerRepository interface { DeleteContainerState(*types.ContainerRequest) error SetWorkerAddress(containerId string, addr string) error SetContainerStateWithConcurrencyLimit(quota *types.ConcurrencyLimit, request *types.ContainerRequest) error - GetWorkerAddress(containerId string) (string, error) + GetWorkerAddress(ctx context.Context, containerId string) (string, error) GetActiveContainersByStubId(stubId string) ([]types.ContainerState, error) GetActiveContainersByWorkspaceId(workspaceId string) ([]types.ContainerState, error) GetActiveContainersByWorkerId(workerId string) ([]types.ContainerState, error) diff --git a/pkg/repository/container_redis.go b/pkg/repository/container_redis.go index 01ec1f158..52d522410 100644 --- a/pkg/repository/container_redis.go +++ b/pkg/repository/container_redis.go @@ -197,8 +197,8 @@ func (cr *ContainerRedisRepository) SetWorkerAddress(containerId string, addr st return cr.rdb.Set(context.TODO(), common.RedisKeys.SchedulerWorkerAddress(containerId), addr, 0).Err() } -func (cr *ContainerRedisRepository) GetWorkerAddress(containerId string) (string, error) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) +func (cr *ContainerRedisRepository) GetWorkerAddress(ctx context.Context, containerId string) (string, error) { + ctx, cancel := context.WithTimeout(ctx, 5*time.Minute) defer cancel() var hostname string = "" @@ -210,7 +210,10 @@ func (cr *ContainerRedisRepository) GetWorkerAddress(containerId string) (string for { select { case <-ctx.Done(): - return "", errors.New("timeout reached while trying to get worker addr") + if ctx.Err() == context.DeadlineExceeded { + return "", errors.New("timeout reached while trying to get worker addr") + } + return "", errors.New("context cancelled while trying to get worker addr") case <-ticker.C: hostname, err = cr.rdb.Get(ctx, common.RedisKeys.SchedulerWorkerAddress(containerId)).Result() if err == nil { diff --git a/pkg/types/worker.go b/pkg/types/worker.go index f2d2d2b50..d5b31f906 100644 --- a/pkg/types/worker.go +++ b/pkg/types/worker.go @@ -1,6 +1,9 @@ package types -import "time" +import ( + "fmt" + "time" +) const ( WorkerLifecycleStatsKey string = "beta9.worker.usage.spawner.lifecycle" @@ -32,3 +35,27 @@ type Mount struct { MountType string `json:"mount_type"` MountPointConfig *MountPointConfig `json:"mountpoint_config"` } + +type ExitCodeError struct { + ExitCode int +} + +func (e *ExitCodeError) Error() string { + return fmt.Sprintf("exit code error: %s", WorkerContainerExitCodes[e.ExitCode]) +} + +const ( + WorkerContainerExitCodeInvalidCustomImage = 555 + WorkerContainerExitCodeIncorrectImageArch = 556 + WorkerContainerExitCodeIncorrectImageOs = 557 + WorkerContainerExitCodeUnknownError = 1 + WorkerContainerExitCodeSuccess = 0 +) + +var WorkerContainerExitCodes = map[int]string{ + WorkerContainerExitCodeSuccess: "Success", + WorkerContainerExitCodeUnknownError: "UnknownError", + WorkerContainerExitCodeIncorrectImageArch: "InvalidArch: Image is not amd64/x86_64", + WorkerContainerExitCodeInvalidCustomImage: "InvalidCustomImage: Could not find custom image", + WorkerContainerExitCodeIncorrectImageOs: "InvalidOs: Image is not built for linux", +} diff --git a/pkg/worker/image.go b/pkg/worker/image.go index f5d8aa53b..07bae6ac1 100644 --- a/pkg/worker/image.go +++ b/pkg/worker/image.go @@ -2,6 +2,7 @@ package worker import ( "context" + "encoding/json" "fmt" "log" "os" @@ -215,19 +216,59 @@ func (c *ImageClient) Cleanup() error { return nil } +func (c *ImageClient) InspectAndVerifyImage(ctx context.Context, sourceImage string, creds string) error { + args := []string{"inspect", fmt.Sprintf("docker://%s", sourceImage)} + + args = append(args, c.inspectArgs(creds)...) + cmd := exec.CommandContext(ctx, c.pullCommand, args...) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + output, err := exec.CommandContext(ctx, c.pullCommand, args...).Output() + if err != nil { + return &types.ExitCodeError{ + ExitCode: types.WorkerContainerExitCodeInvalidCustomImage, + } + } + + var imageInfo map[string]interface{} + err = json.Unmarshal(output, &imageInfo) + if err != nil { + return err + } + + if imageInfo["Architecture"] != "amd64" { + return &types.ExitCodeError{ + ExitCode: types.WorkerContainerExitCodeIncorrectImageArch, + } + } + + if imageInfo["Os"] != "linux" { + return &types.ExitCodeError{ + ExitCode: types.WorkerContainerExitCodeIncorrectImageOs, + } + } + + return nil +} + func (c *ImageClient) PullAndArchiveImage(ctx context.Context, sourceImage string, imageId string, creds string) error { baseImage, err := image.ExtractImageNameAndTag(sourceImage) if err != nil { return err } + if err := c.InspectAndVerifyImage(ctx, sourceImage, creds); err != nil { + return err + } + baseTmpBundlePath := filepath.Join(c.imageBundlePath, baseImage.Repo) os.MkdirAll(baseTmpBundlePath, 0755) dest := fmt.Sprintf("oci:%s:%s", baseImage.Repo, baseImage.Tag) args := []string{"copy", fmt.Sprintf("docker://%s", sourceImage), dest} - args = append(args, c.args(creds)...) + args = append(args, c.copyArgs(creds)...) cmd := exec.CommandContext(ctx, c.pullCommand, args...) cmd.Env = os.Environ() cmd.Dir = c.imageBundlePath @@ -262,7 +303,7 @@ func (c *ImageClient) startCommand(cmd *exec.Cmd) (chan runc.Exit, error) { return runc.Monitor.Start(cmd) } -func (c *ImageClient) args(creds string) (out []string) { +func (c *ImageClient) copyArgs(creds string) (out []string) { if creds != "" { out = append(out, "--src-creds", creds) } else if creds == "" { @@ -286,6 +327,30 @@ func (c *ImageClient) args(creds string) (out []string) { return out } +func (c *ImageClient) inspectArgs(creds string) (out []string) { + if creds != "" { + out = append(out, "--creds", creds) + } else if creds == "" { + out = append(out, "--no-creds") + } else if c.creds != "" { + out = append(out, "--creds", c.creds) + } + + if c.commandTimeout > 0 { + out = append(out, "--command-timeout", fmt.Sprintf("%d", c.commandTimeout)) + } + + if !c.config.ImageService.EnableTLS { + out = append(out, []string{"--tls-verify=false"}...) + } + + if c.debug { + out = append(out, "--debug") + } + + return out +} + func (c *ImageClient) unpack(baseImageName string, baseImageTag string, bundlePath string) error { var unpackOptions layer.UnpackOptions var meta umoci.Meta diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 8244187e2..6ed4dcd92 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -251,6 +251,12 @@ func (s *Worker) Run() error { // Set a non-zero exit code for the container (both in memory, and in repo) exitCode := 1 + + serr, ok := err.(*types.ExitCodeError) + if ok { + exitCode = serr.ExitCode + } + err := s.containerRepo.SetContainerExitCode(containerId, exitCode) if err != nil { log.Printf("<%s> - failed to set exit code: %v\n", containerId, err)