From 55d870413eb298e3967d173fa5c1fb4564179bf0 Mon Sep 17 00:00:00 2001 From: Nick Petrovic <4001122+nickpetrovic@users.noreply.github.com> Date: Wed, 24 Jan 2024 09:36:41 -0500 Subject: [PATCH 1/4] Add gRPC msg size, better image name parsing, update a label (#64) --- internal/common/config.default.yaml | 2 + internal/gateway/gateway.go | 2 + internal/scheduler/pool_k8s.go | 2 +- internal/types/config.go | 6 ++- internal/worker/image.go | 59 ++++++++++++++--------------- internal/worker/image_test.go | 58 ++++++++++++++++++++++++++++ manifests/k3d/beta9.yaml | 4 +- 7 files changed, 97 insertions(+), 36 deletions(-) create mode 100644 internal/worker/image_test.go diff --git a/internal/common/config.default.yaml b/internal/common/config.default.yaml index 6ed82277d..0d25c6cbf 100644 --- a/internal/common/config.default.yaml +++ b/internal/common/config.default.yaml @@ -27,6 +27,8 @@ storage: gateway: host: gateway.beta9 port: 1993 + max_recv_msg_size_in_mb: 1024 + max_send_msg_size_in_mb: 1024 imageService: cacheURL: registryStore: local diff --git a/internal/gateway/gateway.go b/internal/gateway/gateway.go index e4c6be5b2..f8e762992 100644 --- a/internal/gateway/gateway.go +++ b/internal/gateway/gateway.go @@ -117,6 +117,8 @@ func (g *Gateway) initGrpc() error { serverOptions := []grpc.ServerOption{ grpc.UnaryInterceptor(authInterceptor.Unary()), grpc.StreamInterceptor(authInterceptor.Stream()), + grpc.MaxRecvMsgSize(g.config.GatewayService.MaxRecvMsgSize * 1024 * 1024), + grpc.MaxSendMsgSize(g.config.GatewayService.MaxSendMsgSize * 1024 * 1024), } g.grpcServer = grpc.NewServer( diff --git a/internal/scheduler/pool_k8s.go b/internal/scheduler/pool_k8s.go index 3bbab560b..f620ddc44 100644 --- a/internal/scheduler/pool_k8s.go +++ b/internal/scheduler/pool_k8s.go @@ -149,7 +149,7 @@ func (wpc *KubernetesWorkerPoolController) addWorkerWithId(workerId string, cpu func (wpc *KubernetesWorkerPoolController) createWorkerJob(workerId string, cpu int64, memory int64, gpuType string) (*batchv1.Job, *types.Worker) { jobName := fmt.Sprintf("%s-%s-%s", Beta9WorkerJobPrefix, wpc.name, workerId) labels := map[string]string{ - "app": "beta9-" + Beta9WorkerLabelValue, + "app": Beta9WorkerLabelValue, Beta9WorkerLabelKey: Beta9WorkerLabelValue, } diff --git a/internal/types/config.go b/internal/types/config.go index 03d5f202e..5ab67599e 100644 --- a/internal/types/config.go +++ b/internal/types/config.go @@ -55,8 +55,10 @@ type PostgresConfig struct { } type GatewayServiceConfig struct { - Host string `key:"host"` - Port int `key:"port"` + Host string `key:"host"` + Port int `key:"port"` + MaxRecvMsgSize int `key:"max_recv_msg_size_in_mb"` + MaxSendMsgSize int `key:"max_send_msg_size_in_mb"` } type ImageServiceConfig struct { diff --git a/internal/worker/image.go b/internal/worker/image.go index ad07d9794..f81a1309a 100644 --- a/internal/worker/image.go +++ b/internal/worker/image.go @@ -143,7 +143,7 @@ func (c *ImageClient) PullLazy(imageId string) error { } func (i *ImageClient) PullAndArchiveImage(ctx context.Context, sourceImage string, imageId string, creds *string) error { - baseImage, err := i.extractImageNameAndTag(sourceImage) + baseImage, err := extractImageNameAndTag(sourceImage) if err != nil { return err } @@ -165,13 +165,13 @@ func (i *ImageClient) PullAndArchiveImage(ctx context.Context, sourceImage strin status, err := runc.Monitor.Wait(cmd, ec) if err == nil && status != 0 { - err = fmt.Errorf("unable to pull base image: %s", sourceImage) + log.Printf("unable to copy base image: %v -> %v", sourceImage, dest) } bundlePath := filepath.Join(imagePath, imageId) err = i.unpack(baseImage.ImageName, baseImage.ImageTag, bundlePath) if err != nil { - return err + return fmt.Errorf("unable to unpack image: %v", err) } defer func() { @@ -188,34 +188,6 @@ func (i *ImageClient) startCommand(cmd *exec.Cmd) (chan runc.Exit, error) { return runc.Monitor.Start(cmd) } -func (i *ImageClient) extractImageNameAndTag(sourceImage string) (image.BaseImage, error) { - re := regexp.MustCompile(`^(([^/]+/[^/]+)/)?([^:]+):?(.*)$`) - matches := re.FindStringSubmatch(sourceImage) - - if matches == nil { - return image.BaseImage{}, errors.New("invalid image URI format") - } - - // Use default source registry if not specified - sourceRegistry := "docker.io" - if matches[2] != "" { - sourceRegistry = matches[2] - } - - imageName := matches[3] - imageTag := "latest" - - if matches[4] != "" { - imageTag = matches[4] - } - - return image.BaseImage{ - SourceRegistry: sourceRegistry, - ImageName: imageName, - ImageTag: imageTag, - }, nil -} - func (i *ImageClient) args(creds *string) (out []string) { if creds != nil && *creds != "" { out = append(out, "--src-creds", *creds) @@ -323,3 +295,28 @@ func (i *ImageClient) Archive(ctx context.Context, bundlePath string, imageId st log.Printf("Image <%v> push took %v\n", imageId, time.Since(startTime)) return nil } + +var imageNamePattern = regexp.MustCompile(`^(?:(.*?)\/)?(?:([^\/:]+)\/)?([^\/:]+)(?::([^\/:]+))?$`) + +func extractImageNameAndTag(sourceImage string) (image.BaseImage, error) { + matches := imageNamePattern.FindStringSubmatch(sourceImage) + if matches == nil { + return image.BaseImage{}, errors.New("invalid image URI format") + } + + registry, name, tag := matches[1], matches[3], matches[4] + + if registry == "" { + registry = "docker.io" + } + + if tag == "" { + tag = "latest" + } + + return image.BaseImage{ + SourceRegistry: registry, + ImageName: name, + ImageTag: tag, + }, nil +} diff --git a/internal/worker/image_test.go b/internal/worker/image_test.go new file mode 100644 index 000000000..36ca5f302 --- /dev/null +++ b/internal/worker/image_test.go @@ -0,0 +1,58 @@ +package worker + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestExtractImageNameAndTag(t *testing.T) { + tests := []struct { + image string + wantTag string + wantName string + wantRegistry string + }{ + { + image: "nginx", + wantTag: "latest", + wantName: "nginx", + wantRegistry: "docker.io", + }, + { + image: "docker.io/nginx", + wantTag: "latest", + wantName: "nginx", + wantRegistry: "docker.io", + }, + { + image: "docker.io/nginx:1.25.3", + wantTag: "1.25.3", + wantName: "nginx", + wantRegistry: "docker.io", + }, + { + image: "docker.io/nginx:latest", + wantTag: "latest", + wantName: "nginx", + wantRegistry: "docker.io", + }, + { + image: "registry.localhost:5000/beta9-runner:py311-latest", + wantTag: "py311-latest", + wantName: "beta9-runner", + wantRegistry: "registry.localhost:5000", + }, + } + + for _, test := range tests { + t.Run(test.image, func(t *testing.T) { + image, err := extractImageNameAndTag(test.image) + assert.NoError(t, err) + + assert.Equal(t, test.wantTag, image.ImageTag) + assert.Equal(t, test.wantName, image.ImageName) + assert.Equal(t, test.wantRegistry, image.SourceRegistry) + }) + } +} diff --git a/manifests/k3d/beta9.yaml b/manifests/k3d/beta9.yaml index 26bfb1d70..a6f2bba24 100644 --- a/manifests/k3d/beta9.yaml +++ b/manifests/k3d/beta9.yaml @@ -71,7 +71,7 @@ spec: memory: 1Gi limits: cpu: 1000m - memory: 1Gi + memory: 2Gi volumes: - name: images persistentVolumeClaim: @@ -178,4 +178,4 @@ spec: enabled: true persistence: enabled: true - size: 1Gi \ No newline at end of file + size: 1Gi From e60ea4342f3b3139843edd247835b20eb0e301cb Mon Sep 17 00:00:00 2001 From: Nick Petrovic <4001122+nickpetrovic@users.noreply.github.com> Date: Wed, 24 Jan 2024 11:16:41 -0500 Subject: [PATCH 2/4] Update readme (#65) --- Makefile | 4 ++++ README.md | 32 ++++++++++++++++++++++++++++++-- bin/setup.sh | 6 ++++++ 3 files changed, 40 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 1a2007fcb..be613f4fe 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,11 @@ setup: kubectl delete pod -l app=gateway setup-sdk: + curl -sSL https://install.python-poetry.org | python3 - + export PATH="$$HOME/.local/bin:$$PATH" + poetry config virtualenvs.in-project true poetry install -C sdk + poetry shell -C sdk k3d-up: bash bin/k3d.sh up diff --git a/README.md b/README.md index 252b72ba3..3988a1da5 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ Beta9 is an open-source platform for running remote containers directly from Python. It supports GPU/CUDA acceleration, allows you to scale out arbitrary Python code to hundreds of machines, easily deploy functions and task queues, and distribute workloads across various cloud providers (including bare metal providers). -We use beta9 internally at [Beam](https://beam.cloud) to run AI applications for users at scale. +We use beta9 internally at [Beam](https://beam.cloud) to run AI applications for users at scale. ## Features @@ -47,7 +47,35 @@ Beta9 is designed for launching remote serverless containers very quickly. There ## Local development -[TODO] +### Setting up the server + +k3d is used for local development. You'll need Docker and Make to get started. + +To use our fully automated setup, run the `setup` make target. + +> [!NOTE] +> This will overwrite some of the tools you may already have installed. Review the [setup.sh](bin/setup.sh) to learn more. + +``` +make setup +``` + +### Setting up the SDK + +The SDK is written in Python. You'll need Python 3.8 or higher. Use the `setup-sdk` make target to get started. + +> [!NOTE] +> This will install the Poetry package manager. + +``` +make setup-sdk +``` + +### Using the SDK + +After you've setup the server and SDK, you can now start accessing the system. + +TODO: Add walk through. Should this be a separate readme? ## Community & Support diff --git a/bin/setup.sh b/bin/setup.sh index fcb39c0a5..4856b2be5 100644 --- a/bin/setup.sh +++ b/bin/setup.sh @@ -1,5 +1,11 @@ #!/usr/bin/env bash +# ---------------------------------------------- +# This script is used to setup the environment for Kubernetes development. +# It installs kubectl, stern, okteto, and k3d on the machine. +# It determines the operating system and architecture of the machine to download the appropriate binaries. +# ---------------------------------------------- + set +xeu os=$(uname -s | tr '[:upper:]' '[:lower:]') From b151892fa9848fe5ae052c6a8bae923043bce960 Mon Sep 17 00:00:00 2001 From: Nick Petrovic <4001122+nickpetrovic@users.noreply.github.com> Date: Wed, 24 Jan 2024 13:02:35 -0500 Subject: [PATCH 3/4] Init ignore file when one doesn't exist (#66) --- README.md | 4 +--- sdk/src/beta9/sync.py | 42 +++++++++++++++++++++++++++++++++--------- 2 files changed, 34 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 3988a1da5..6807e6ddc 100644 --- a/README.md +++ b/README.md @@ -73,9 +73,7 @@ make setup-sdk ### Using the SDK -After you've setup the server and SDK, you can now start accessing the system. - -TODO: Add walk through. Should this be a separate readme? +After you've setup the server and SDK, check out the SDK readme [here](sdk/README.md). ## Community & Support diff --git a/sdk/src/beta9/sync.py b/sdk/src/beta9/sync.py index e6bba216e..0bc68878c 100644 --- a/sdk/src/beta9/sync.py +++ b/sdk/src/beta9/sync.py @@ -4,7 +4,8 @@ import os import uuid import zipfile -from typing import Generator, NamedTuple, Union +from pathlib import Path +from typing import Generator, NamedTuple, Optional from beta9 import terminal from beta9.clients.gateway import ( @@ -15,6 +16,17 @@ ) IGNORE_FILE_NAME = ".beta9ignore" +IGNORE_FILE_CONTENTS = """# Generated by Beta9 SDK +.beta9ignore +.git +.idea +.python-version +.vscode +.venv +venv +*.pyc +__pycache__ +""" class FileSyncResult(NamedTuple): @@ -29,17 +41,28 @@ def __init__( root_dir=".", ): self.loop = asyncio.get_event_loop() - self.root_dir = os.path.abspath(root_dir) + self.root_dir = Path(root_dir).absolute() self.gateway_stub: GatewayServiceStub = gateway_stub + @property + def ignore_file_path(self) -> Path: + return self.root_dir / IGNORE_FILE_NAME + + def _init_ignore_file(self) -> None: + if self.ignore_file_path.exists(): + return + + terminal.detail(f"Writing {IGNORE_FILE_NAME} file") + with self.ignore_file_path.open(mode="w") as f: + f.writelines(IGNORE_FILE_CONTENTS) + def _read_ignore_file(self) -> list: terminal.detail(f"Reading {IGNORE_FILE_NAME} file") - ignore_file = os.path.join(self.root_dir, IGNORE_FILE_NAME) patterns = [] - if os.path.isfile(ignore_file): - with open(ignore_file, "r") as file: + if self.ignore_file_path.is_file(): + with self.ignore_file_path.open() as file: patterns = [line.strip() for line in file.readlines() if line.strip()] return patterns @@ -70,6 +93,7 @@ def _collect_files(self) -> Generator[str, None, None]: def sync(self) -> FileSyncResult: terminal.header("Syncing files") + self._init_ignore_file() self.ignore_patterns = self._read_ignore_file() temp_zip_name = f"/tmp/{uuid.uuid4()}" @@ -90,12 +114,12 @@ def sync(self) -> FileSyncResult: head_response: HeadObjectResponse = self.loop.run_until_complete( self.gateway_stub.head_object(hash=hash) ) - put_response: Union[PutObjectResponse, None] = None + put_response: Optional[PutObjectResponse] = None if not head_response.exists: metadata = ObjectMetadata(name=hash, size=size) with terminal.progress("Uploading"): - put_response: PutObjectResponse = self.loop.run_until_complete( + put_response = self.loop.run_until_complete( self.gateway_stub.put_object( object_content=object_content, object_metadata=metadata, @@ -109,8 +133,8 @@ def sync(self) -> FileSyncResult: os.remove(temp_zip_name) - if not put_response.ok: + if put_response is None or not put_response.ok: terminal.error("File sync failed ☠️") terminal.header("Files synced") - return FileSyncResult(success=True, object_id=put_response.object_id) + return FileSyncResult(success=True, object_id=put_response.object_id) # pyright: ignore[reportOptionalMemberAccess] From 59c63738e5dd148a2409d21ed17ae1a658ee3572 Mon Sep 17 00:00:00 2001 From: Nick Petrovic <4001122+nickpetrovic@users.noreply.github.com> Date: Wed, 24 Jan 2024 17:58:34 -0500 Subject: [PATCH 4/4] Improve request concurrency (#67) --- internal/common/keys.go | 5 +++++ internal/repository/base.go | 2 ++ internal/repository/container_redis.go | 2 +- internal/repository/worker_redis.go | 9 +++++++++ internal/worker/image.go | 25 +++++++++++++++---------- internal/worker/worker.go | 11 +++++------ sdk/src/beta9/abstractions/function.py | 5 +++-- 7 files changed, 40 insertions(+), 19 deletions(-) diff --git a/internal/common/keys.go b/internal/common/keys.go index dcce7f64f..bec43381d 100644 --- a/internal/common/keys.go +++ b/internal/common/keys.go @@ -28,6 +28,7 @@ var ( var ( workerPrefix string = "worker" + workerImageLock string = "worker:%s:image:%s:lock" workerContainerRequest string = "worker:%s:container:%s:request" workerContainerResourceUsage string = "worker:%s:container:%s:resource_usage" ) @@ -127,6 +128,10 @@ func (rk *redisKeys) WorkerContainerResourceUsage(workerId string, containerId s return fmt.Sprintf(workerContainerResourceUsage, workerId, containerId) } +func (rk *redisKeys) WorkerImageLock(workerId string, imageId string) string { + return fmt.Sprintf(workerImageLock, workerId, imageId) +} + // WorkerPool keys func (rk *redisKeys) WorkerPoolLock(poolId string) string { return fmt.Sprintf(workerPoolLock, poolId) diff --git a/internal/repository/base.go b/internal/repository/base.go index 411347ff7..2b230d9c0 100644 --- a/internal/repository/base.go +++ b/internal/repository/base.go @@ -23,6 +23,8 @@ type WorkerRepository interface { AddContainerRequestToWorker(workerId string, containerId string, request *types.ContainerRequest) error RemoveContainerRequestFromWorker(workerId string, containerId string) error SetContainerResourceValues(workerId string, containerId string, usage types.ContainerResourceUsage) error + SetImagePullLock(workerId, imageId string) error + RemoveImagePullLock(workerId, imageId string) error } type ContainerRepository interface { diff --git a/internal/repository/container_redis.go b/internal/repository/container_redis.go index 42bd3c42d..a1b53db2b 100644 --- a/internal/repository/container_redis.go +++ b/internal/repository/container_redis.go @@ -70,7 +70,7 @@ func (cr *ContainerRedisRepository) SetContainerState(containerId string, info * } func (cr *ContainerRedisRepository) SetContainerExitCode(containerId string, exitCode int) error { - err := cr.lock.Acquire(context.TODO(), common.RedisKeys.SchedulerContainerLock(containerId), common.RedisLockOptions{TtlS: 10, Retries: 0}) + err := cr.lock.Acquire(context.TODO(), common.RedisKeys.SchedulerContainerLock(containerId), common.RedisLockOptions{TtlS: 10, Retries: 1}) if err != nil { return err } diff --git a/internal/repository/worker_redis.go b/internal/repository/worker_redis.go index c747bd6d6..6e99ac663 100644 --- a/internal/repository/worker_redis.go +++ b/internal/repository/worker_redis.go @@ -7,6 +7,7 @@ import ( "fmt" "log" "strings" + "time" "github.com/beam-cloud/beta9/internal/common" "github.com/beam-cloud/beta9/internal/types" @@ -355,3 +356,11 @@ func (r *WorkerRedisRepository) SetContainerResourceValues(workerId string, cont return nil } + +func (r *WorkerRedisRepository) SetImagePullLock(workerId, imageId string) error { + return r.rdb.Set(context.TODO(), common.RedisKeys.WorkerImageLock(workerId, imageId), true, 5*time.Minute).Err() +} + +func (r *WorkerRedisRepository) RemoveImagePullLock(workerId, imageId string) error { + return r.rdb.Del(context.TODO(), common.RedisKeys.WorkerImageLock(workerId, imageId)).Err() +} diff --git a/internal/worker/image.go b/internal/worker/image.go index f81a1309a..4d958a859 100644 --- a/internal/worker/image.go +++ b/internal/worker/image.go @@ -6,7 +6,6 @@ import ( "log" "os" "os/exec" - "path" "path/filepath" "regexp" "syscall" @@ -14,6 +13,7 @@ import ( "github.com/beam-cloud/beta9/internal/abstractions/image" common "github.com/beam-cloud/beta9/internal/common" + "github.com/beam-cloud/beta9/internal/repository" types "github.com/beam-cloud/beta9/internal/types" "github.com/beam-cloud/clip/pkg/clip" clipCommon "github.com/beam-cloud/clip/pkg/common" @@ -46,9 +46,11 @@ type ImageClient struct { Debug bool Creds string config types.ImageServiceConfig + workerId string + workerRepo repository.WorkerRepository } -func NewImageClient(config types.ImageServiceConfig) (*ImageClient, error) { +func NewImageClient(config types.ImageServiceConfig, workerId string, workerRepo repository.WorkerRepository) (*ImageClient, error) { var provider CredentialProvider // Configure image registry credentials switch config.RegistryCredentialProviderName { @@ -78,8 +80,10 @@ func NewImageClient(config types.ImageServiceConfig) (*ImageClient, error) { } } - baseImagePath := filepath.Join(imageCachePath) - os.MkdirAll(baseImagePath, os.ModePerm) + err = os.MkdirAll(imageCachePath, os.ModePerm) + if err != nil { + return nil, err + } creds, err := provider.GetAuthString() if err != nil { @@ -90,11 +94,13 @@ func NewImageClient(config types.ImageServiceConfig) (*ImageClient, error) { config: config, registry: registry, cacheClient: cacheClient, - ImagePath: baseImagePath, + ImagePath: imageCachePath, PullCommand: imagePullCommand, CommandTimeout: -1, Debug: false, Creds: creds, + workerId: workerId, + workerRepo: workerRepo, }, nil } @@ -121,13 +127,12 @@ func (c *ImageClient) PullLazy(imageId string) error { return nil } - // Attempt to acquire the lock - fileLock := NewFileLock(path.Join(imagePath, fmt.Sprintf("%s_%s", imageId, imageMountLockFilename))) - if err := fileLock.Acquire(); err != nil { - fmt.Printf("Unable to acquire mount lock: %v\n", err) + // Get lock on image mount + err = c.workerRepo.SetImagePullLock(c.workerId, imageId) + if err != nil { return err } - defer fileLock.Release() + defer c.workerRepo.RemoveImagePullLock(c.workerId, imageId) startServer, _, err := clip.MountArchive(*mountOptions) if err != nil { diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 7464e49a5..897a807fd 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -116,7 +116,11 @@ func NewWorker() (*Worker, error) { return nil, err } - imageClient, err := NewImageClient(config.ImageService) + containerRepo := repo.NewContainerRedisRepository(redisClient) + workerRepo := repo.NewWorkerRedisRepository(redisClient) + statsdRepo := repo.NewMetricsStatsdRepository() + + imageClient, err := NewImageClient(config.ImageService, workerId, workerRepo) if err != nil { return nil, err } @@ -137,11 +141,6 @@ func NewWorker() (*Worker, error) { } ctx, cancel := context.WithCancel(context.Background()) - - containerRepo := repo.NewContainerRedisRepository(redisClient) - workerRepo := repo.NewWorkerRedisRepository(redisClient) - statsdRepo := repo.NewMetricsStatsdRepository() - workerMetrics := NewWorkerMetrics(ctx, podHostName, statsdRepo, workerRepo, repo.NewMetricsStreamRepository(ctx, config.Metrics)) return &Worker{ diff --git a/sdk/src/beta9/abstractions/function.py b/sdk/src/beta9/abstractions/function.py index 6beea2a0f..0cbfd8afb 100644 --- a/sdk/src/beta9/abstractions/function.py +++ b/sdk/src/beta9/abstractions/function.py @@ -3,6 +3,7 @@ from typing import Any, Callable, Iterator, List, Optional, Sequence, Union import cloudpickle + from beta9 import terminal from beta9.abstractions.base.runner import ( FUNCTION_DEPLOYMENT_STUB_TYPE, @@ -101,7 +102,7 @@ async def _call_remote(self, *args, **kwargs) -> Any: ) terminal.header("Running function") - last_response: Union[None, FunctionInvokeResponse] = None + last_response: Optional[FunctionInvokeResponse] = None async for r in self.parent.function_stub.function_invoke( stub_id=self.parent.stub_id, @@ -114,7 +115,7 @@ async def _call_remote(self, *args, **kwargs) -> Any: last_response = r break - if not last_response.done or last_response.exit_code != 0: + if last_response is None or not last_response.done or last_response.exit_code != 0: terminal.error("Function failed ☠️") return None