Skip to content

Commit

Permalink
Fix: Add InvalidCustomImage, InvalidArch, and InvalidOs image build e…
Browse files Browse the repository at this point in the history
…rrors (#632)

During n session where user-client connects to gateway to listen for
logs from a container, there is a likelihood for the container to fail
before it's able to open a port to allow gateway to connect to it.
Because of this, the gateway continues to look for the connection before
timing out for 5 minutes.

This makes the gateway watch for the container exit code before it
connects as well as utilize the exit code for useful information that
can be forwarded to user.

Also added InvalidArch as well as InvalidOs to make sure user is using a
custom image for linux/amd64

---------

Co-authored-by: luke-lombardi <[email protected]>
Co-authored-by: nickpetrovic <[email protected]>
Co-authored-by: Daniel Levi-Minzi <[email protected]>
  • Loading branch information
4 people authored Oct 21, 2024
1 parent 96f01b0 commit 8cd896c
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 11 deletions.
2 changes: 1 addition & 1 deletion pkg/abstractions/common/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type LogStream struct {
}

func (l *LogStream) Stream(ctx context.Context, authInfo *auth.AuthInfo, containerId string) error {
hostname, err := l.containerRepo.GetWorkerAddress(containerId)
hostname, err := l.containerRepo.GetWorkerAddress(ctx, containerId)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/abstractions/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (cs *CmdContainerService) ExecuteCommand(in *pb.CommandExecutionRequest, st
return err
}

hostname, err := cs.containerRepo.GetWorkerAddress(task.ContainerId)
hostname, err := cs.containerRepo.GetWorkerAddress(ctx, task.ContainerId)
if err != nil {
return err
}
Expand Down
27 changes: 26 additions & 1 deletion pkg/abstractions/image/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,11 @@ func (b *Builder) Build(ctx context.Context, opts *BuildOpts, outputChan chan co
return err
}

hostname, err := b.containerRepo.GetWorkerAddress(containerId)
mctx, mcancel := context.WithCancel(ctx)
go b.monitorContainerForPreloadErrors(mctx, containerId, outputChan)

hostname, err := b.containerRepo.GetWorkerAddress(ctx, containerId)
mcancel()
if err != nil {
outputChan <- common.OutputMsg{Done: true, Success: false, Msg: "Failed to connect to build container.\n"}
return err
Expand Down Expand Up @@ -318,6 +322,27 @@ func (b *Builder) Build(ctx context.Context, opts *BuildOpts, outputChan chan co
return nil
}

func (b *Builder) monitorContainerForPreloadErrors(ctx context.Context, containerId string, outputChan chan common.OutputMsg) {
for {
select {
case <-ctx.Done():
return
case <-time.After(1 * time.Second):
if exitCode, err := b.containerRepo.GetContainerExitCode(containerId); err == nil {
if exitCode != 0 {
msg, ok := types.WorkerContainerExitCodes[exitCode]
if !ok {
msg = types.WorkerContainerExitCodes[types.WorkerContainerExitCodeUnknownError]
}

outputChan <- common.OutputMsg{Done: true, Success: false, Msg: fmt.Sprintf("Container exited with error: %s\n", msg)}
}
return
}
}
}
}

func (b *Builder) genContainerId() string {
return fmt.Sprintf("%s%s", types.BuildContainerPrefix, uuid.New().String()[:8])
}
Expand Down
1 change: 0 additions & 1 deletion pkg/abstractions/image/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ func (is *RuncImageService) BuildImage(in *pb.BuildImageRequest, stream pb.Image
}

if !lastMessage.Success {
log.Println("build failed")
return errors.New("build failed")
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/repository/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type ContainerRepository interface {
DeleteContainerState(*types.ContainerRequest) error
SetWorkerAddress(containerId string, addr string) error
SetContainerStateWithConcurrencyLimit(quota *types.ConcurrencyLimit, request *types.ContainerRequest) error
GetWorkerAddress(containerId string) (string, error)
GetWorkerAddress(ctx context.Context, containerId string) (string, error)
GetActiveContainersByStubId(stubId string) ([]types.ContainerState, error)
GetActiveContainersByWorkspaceId(workspaceId string) ([]types.ContainerState, error)
GetActiveContainersByWorkerId(workerId string) ([]types.ContainerState, error)
Expand Down
9 changes: 6 additions & 3 deletions pkg/repository/container_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,8 @@ func (cr *ContainerRedisRepository) SetWorkerAddress(containerId string, addr st
return cr.rdb.Set(context.TODO(), common.RedisKeys.SchedulerWorkerAddress(containerId), addr, 0).Err()
}

func (cr *ContainerRedisRepository) GetWorkerAddress(containerId string) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
func (cr *ContainerRedisRepository) GetWorkerAddress(ctx context.Context, containerId string) (string, error) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()

var hostname string = ""
Expand All @@ -210,7 +210,10 @@ func (cr *ContainerRedisRepository) GetWorkerAddress(containerId string) (string
for {
select {
case <-ctx.Done():
return "", errors.New("timeout reached while trying to get worker addr")
if ctx.Err() == context.DeadlineExceeded {
return "", errors.New("timeout reached while trying to get worker addr")
}
return "", errors.New("context cancelled while trying to get worker addr")
case <-ticker.C:
hostname, err = cr.rdb.Get(ctx, common.RedisKeys.SchedulerWorkerAddress(containerId)).Result()
if err == nil {
Expand Down
29 changes: 28 additions & 1 deletion pkg/types/worker.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package types

import "time"
import (
"fmt"
"time"
)

const (
WorkerLifecycleStatsKey string = "beta9.worker.usage.spawner.lifecycle"
Expand Down Expand Up @@ -32,3 +35,27 @@ type Mount struct {
MountType string `json:"mount_type"`
MountPointConfig *MountPointConfig `json:"mountpoint_config"`
}

type ExitCodeError struct {
ExitCode int
}

func (e *ExitCodeError) Error() string {
return fmt.Sprintf("exit code error: %s", WorkerContainerExitCodes[e.ExitCode])
}

const (
WorkerContainerExitCodeInvalidCustomImage = 555
WorkerContainerExitCodeIncorrectImageArch = 556
WorkerContainerExitCodeIncorrectImageOs = 557
WorkerContainerExitCodeUnknownError = 1
WorkerContainerExitCodeSuccess = 0
)

var WorkerContainerExitCodes = map[int]string{
WorkerContainerExitCodeSuccess: "Success",
WorkerContainerExitCodeUnknownError: "UnknownError",
WorkerContainerExitCodeIncorrectImageArch: "InvalidArch: Image is not amd64/x86_64",
WorkerContainerExitCodeInvalidCustomImage: "InvalidCustomImage: Could not find custom image",
WorkerContainerExitCodeIncorrectImageOs: "InvalidOs: Image is not built for linux",
}
69 changes: 67 additions & 2 deletions pkg/worker/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package worker

import (
"context"
"encoding/json"
"fmt"
"log"
"os"
Expand Down Expand Up @@ -215,19 +216,59 @@ func (c *ImageClient) Cleanup() error {
return nil
}

func (c *ImageClient) InspectAndVerifyImage(ctx context.Context, sourceImage string, creds string) error {
args := []string{"inspect", fmt.Sprintf("docker://%s", sourceImage)}

args = append(args, c.inspectArgs(creds)...)
cmd := exec.CommandContext(ctx, c.pullCommand, args...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr

output, err := exec.CommandContext(ctx, c.pullCommand, args...).Output()
if err != nil {
return &types.ExitCodeError{
ExitCode: types.WorkerContainerExitCodeInvalidCustomImage,
}
}

var imageInfo map[string]interface{}
err = json.Unmarshal(output, &imageInfo)
if err != nil {
return err
}

if imageInfo["Architecture"] != "amd64" {
return &types.ExitCodeError{
ExitCode: types.WorkerContainerExitCodeIncorrectImageArch,
}
}

if imageInfo["Os"] != "linux" {
return &types.ExitCodeError{
ExitCode: types.WorkerContainerExitCodeIncorrectImageOs,
}
}

return nil
}

func (c *ImageClient) PullAndArchiveImage(ctx context.Context, sourceImage string, imageId string, creds string) error {
baseImage, err := image.ExtractImageNameAndTag(sourceImage)
if err != nil {
return err
}

if err := c.InspectAndVerifyImage(ctx, sourceImage, creds); err != nil {
return err
}

baseTmpBundlePath := filepath.Join(c.imageBundlePath, baseImage.Repo)
os.MkdirAll(baseTmpBundlePath, 0755)

dest := fmt.Sprintf("oci:%s:%s", baseImage.Repo, baseImage.Tag)
args := []string{"copy", fmt.Sprintf("docker://%s", sourceImage), dest}

args = append(args, c.args(creds)...)
args = append(args, c.copyArgs(creds)...)
cmd := exec.CommandContext(ctx, c.pullCommand, args...)
cmd.Env = os.Environ()
cmd.Dir = c.imageBundlePath
Expand Down Expand Up @@ -262,7 +303,7 @@ func (c *ImageClient) startCommand(cmd *exec.Cmd) (chan runc.Exit, error) {
return runc.Monitor.Start(cmd)
}

func (c *ImageClient) args(creds string) (out []string) {
func (c *ImageClient) copyArgs(creds string) (out []string) {
if creds != "" {
out = append(out, "--src-creds", creds)
} else if creds == "" {
Expand All @@ -286,6 +327,30 @@ func (c *ImageClient) args(creds string) (out []string) {
return out
}

func (c *ImageClient) inspectArgs(creds string) (out []string) {
if creds != "" {
out = append(out, "--creds", creds)
} else if creds == "" {
out = append(out, "--no-creds")
} else if c.creds != "" {
out = append(out, "--creds", c.creds)
}

if c.commandTimeout > 0 {
out = append(out, "--command-timeout", fmt.Sprintf("%d", c.commandTimeout))
}

if !c.config.ImageService.EnableTLS {
out = append(out, []string{"--tls-verify=false"}...)
}

if c.debug {
out = append(out, "--debug")
}

return out
}

func (c *ImageClient) unpack(baseImageName string, baseImageTag string, bundlePath string) error {
var unpackOptions layer.UnpackOptions
var meta umoci.Meta
Expand Down
6 changes: 6 additions & 0 deletions pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,12 @@ func (s *Worker) Run() error {

// Set a non-zero exit code for the container (both in memory, and in repo)
exitCode := 1

serr, ok := err.(*types.ExitCodeError)
if ok {
exitCode = serr.ExitCode
}

err := s.containerRepo.SetContainerExitCode(containerId, exitCode)
if err != nil {
log.Printf("<%s> - failed to set exit code: %v\n", containerId, err)
Expand Down

0 comments on commit 8cd896c

Please sign in to comment.