Skip to content

Commit

Permalink
Add gRPC msg size, better image name parsing, update a label (#64)
Browse files Browse the repository at this point in the history
  • Loading branch information
nickpetrovic authored Jan 24, 2024
1 parent c959d24 commit 55d8704
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 36 deletions.
2 changes: 2 additions & 0 deletions internal/common/config.default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ storage:
gateway:
host: gateway.beta9
port: 1993
max_recv_msg_size_in_mb: 1024
max_send_msg_size_in_mb: 1024
imageService:
cacheURL:
registryStore: local
Expand Down
2 changes: 2 additions & 0 deletions internal/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ func (g *Gateway) initGrpc() error {
serverOptions := []grpc.ServerOption{
grpc.UnaryInterceptor(authInterceptor.Unary()),
grpc.StreamInterceptor(authInterceptor.Stream()),
grpc.MaxRecvMsgSize(g.config.GatewayService.MaxRecvMsgSize * 1024 * 1024),
grpc.MaxSendMsgSize(g.config.GatewayService.MaxSendMsgSize * 1024 * 1024),
}

g.grpcServer = grpc.NewServer(
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/pool_k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (wpc *KubernetesWorkerPoolController) addWorkerWithId(workerId string, cpu
func (wpc *KubernetesWorkerPoolController) createWorkerJob(workerId string, cpu int64, memory int64, gpuType string) (*batchv1.Job, *types.Worker) {
jobName := fmt.Sprintf("%s-%s-%s", Beta9WorkerJobPrefix, wpc.name, workerId)
labels := map[string]string{
"app": "beta9-" + Beta9WorkerLabelValue,
"app": Beta9WorkerLabelValue,
Beta9WorkerLabelKey: Beta9WorkerLabelValue,
}

Expand Down
6 changes: 4 additions & 2 deletions internal/types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@ type PostgresConfig struct {
}

type GatewayServiceConfig struct {
Host string `key:"host"`
Port int `key:"port"`
Host string `key:"host"`
Port int `key:"port"`
MaxRecvMsgSize int `key:"max_recv_msg_size_in_mb"`
MaxSendMsgSize int `key:"max_send_msg_size_in_mb"`
}

type ImageServiceConfig struct {
Expand Down
59 changes: 28 additions & 31 deletions internal/worker/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (c *ImageClient) PullLazy(imageId string) error {
}

func (i *ImageClient) PullAndArchiveImage(ctx context.Context, sourceImage string, imageId string, creds *string) error {
baseImage, err := i.extractImageNameAndTag(sourceImage)
baseImage, err := extractImageNameAndTag(sourceImage)
if err != nil {
return err
}
Expand All @@ -165,13 +165,13 @@ func (i *ImageClient) PullAndArchiveImage(ctx context.Context, sourceImage strin

status, err := runc.Monitor.Wait(cmd, ec)
if err == nil && status != 0 {
err = fmt.Errorf("unable to pull base image: %s", sourceImage)
log.Printf("unable to copy base image: %v -> %v", sourceImage, dest)
}

bundlePath := filepath.Join(imagePath, imageId)
err = i.unpack(baseImage.ImageName, baseImage.ImageTag, bundlePath)
if err != nil {
return err
return fmt.Errorf("unable to unpack image: %v", err)
}

defer func() {
Expand All @@ -188,34 +188,6 @@ func (i *ImageClient) startCommand(cmd *exec.Cmd) (chan runc.Exit, error) {
return runc.Monitor.Start(cmd)
}

func (i *ImageClient) extractImageNameAndTag(sourceImage string) (image.BaseImage, error) {
re := regexp.MustCompile(`^(([^/]+/[^/]+)/)?([^:]+):?(.*)$`)
matches := re.FindStringSubmatch(sourceImage)

if matches == nil {
return image.BaseImage{}, errors.New("invalid image URI format")
}

// Use default source registry if not specified
sourceRegistry := "docker.io"
if matches[2] != "" {
sourceRegistry = matches[2]
}

imageName := matches[3]
imageTag := "latest"

if matches[4] != "" {
imageTag = matches[4]
}

return image.BaseImage{
SourceRegistry: sourceRegistry,
ImageName: imageName,
ImageTag: imageTag,
}, nil
}

func (i *ImageClient) args(creds *string) (out []string) {
if creds != nil && *creds != "" {
out = append(out, "--src-creds", *creds)
Expand Down Expand Up @@ -323,3 +295,28 @@ func (i *ImageClient) Archive(ctx context.Context, bundlePath string, imageId st
log.Printf("Image <%v> push took %v\n", imageId, time.Since(startTime))
return nil
}

var imageNamePattern = regexp.MustCompile(`^(?:(.*?)\/)?(?:([^\/:]+)\/)?([^\/:]+)(?::([^\/:]+))?$`)

func extractImageNameAndTag(sourceImage string) (image.BaseImage, error) {
matches := imageNamePattern.FindStringSubmatch(sourceImage)
if matches == nil {
return image.BaseImage{}, errors.New("invalid image URI format")
}

registry, name, tag := matches[1], matches[3], matches[4]

if registry == "" {
registry = "docker.io"
}

if tag == "" {
tag = "latest"
}

return image.BaseImage{
SourceRegistry: registry,
ImageName: name,
ImageTag: tag,
}, nil
}
58 changes: 58 additions & 0 deletions internal/worker/image_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package worker

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestExtractImageNameAndTag(t *testing.T) {
tests := []struct {
image string
wantTag string
wantName string
wantRegistry string
}{
{
image: "nginx",
wantTag: "latest",
wantName: "nginx",
wantRegistry: "docker.io",
},
{
image: "docker.io/nginx",
wantTag: "latest",
wantName: "nginx",
wantRegistry: "docker.io",
},
{
image: "docker.io/nginx:1.25.3",
wantTag: "1.25.3",
wantName: "nginx",
wantRegistry: "docker.io",
},
{
image: "docker.io/nginx:latest",
wantTag: "latest",
wantName: "nginx",
wantRegistry: "docker.io",
},
{
image: "registry.localhost:5000/beta9-runner:py311-latest",
wantTag: "py311-latest",
wantName: "beta9-runner",
wantRegistry: "registry.localhost:5000",
},
}

for _, test := range tests {
t.Run(test.image, func(t *testing.T) {
image, err := extractImageNameAndTag(test.image)
assert.NoError(t, err)

assert.Equal(t, test.wantTag, image.ImageTag)
assert.Equal(t, test.wantName, image.ImageName)
assert.Equal(t, test.wantRegistry, image.SourceRegistry)
})
}
}
4 changes: 2 additions & 2 deletions manifests/k3d/beta9.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ spec:
memory: 1Gi
limits:
cpu: 1000m
memory: 1Gi
memory: 2Gi
volumes:
- name: images
persistentVolumeClaim:
Expand Down Expand Up @@ -178,4 +178,4 @@ spec:
enabled: true
persistence:
enabled: true
size: 1Gi
size: 1Gi

0 comments on commit 55d8704

Please sign in to comment.