Skip to content

Commit

Permalink
merged
Browse files Browse the repository at this point in the history
  • Loading branch information
jsun-m committed Jan 24, 2024
2 parents 5ba704b + 59c6373 commit 4fc1efd
Show file tree
Hide file tree
Showing 17 changed files with 208 additions and 67 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ setup:
kubectl delete pod -l app=gateway

# Installs Poetry and the SDK's Python dependencies, then opens a Poetry shell.
#
# Every recipe line runs in its own shell, so an `export PATH=...` on a line by
# itself would not be visible to the commands that follow. The PATH update and
# the poetry invocations are therefore chained into a single shell command.
setup-sdk:
	curl -sSL https://install.python-poetry.org | python3 -
	export PATH="$$HOME/.local/bin:$$PATH" && \
		poetry config virtualenvs.in-project true && \
		poetry install -C sdk && \
		poetry shell -C sdk

k3d-up:
bash bin/k3d.sh up
Expand Down
30 changes: 28 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

Beta9 is an open-source platform for running remote containers directly from Python. It supports GPU/CUDA acceleration, allows you to scale out arbitrary Python code to hundreds of machines, easily deploy functions and task queues, and distribute workloads across various cloud providers (including bare metal providers).

We use beta9 internally at [Beam](https://beam.cloud) to run AI applications for users at scale.
We use beta9 internally at [Beam](https://beam.cloud) to run AI applications for users at scale.

## Features

Expand All @@ -47,7 +47,33 @@ Beta9 is designed for launching remote serverless containers very quickly. There

## Local development

[TODO]
### Setting up the server

k3d is used for local development. You'll need Docker and Make to get started.

To use our fully automated setup, run the `setup` make target.

> [!NOTE]
> This will overwrite some of the tools you may already have installed. Review the [setup.sh](bin/setup.sh) to learn more.
```
make setup
```

### Setting up the SDK

The SDK is written in Python. You'll need Python 3.8 or higher. Use the `setup-sdk` make target to get started.

> [!NOTE]
> This will install the Poetry package manager.
```
make setup-sdk
```

### Using the SDK

After you've set up the server and SDK, check out the SDK readme [here](sdk/README.md).

## Community & Support

Expand Down
6 changes: 6 additions & 0 deletions bin/setup.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
#!/usr/bin/env bash

# ----------------------------------------------
# This script is used to set up the environment for Kubernetes development.
# It installs kubectl, stern, okteto, and k3d on the machine.
# It determines the operating system and architecture of the machine to download the appropriate binaries.
# ----------------------------------------------

set +xeu

os=$(uname -s | tr '[:upper:]' '[:lower:]')
Expand Down
2 changes: 2 additions & 0 deletions internal/common/config.default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ storage:
gateway:
host: gateway.beta9
port: 1993
max_recv_msg_size_in_mb: 1024
max_send_msg_size_in_mb: 1024
imageService:
cacheURL:
registryStore: local
Expand Down
5 changes: 5 additions & 0 deletions internal/common/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var (

// Key templates for per-worker entries. Templates containing %s verbs are
// expanded with fmt.Sprintf by the matching redisKeys helper methods.
var (
workerPrefix string = "worker"
// workerImageLock is formatted with a worker ID followed by an image ID.
workerImageLock string = "worker:%s:image:%s:lock"
// workerContainerRequest presumably takes a worker ID and a container ID,
// like the sibling templates — its helper is not visible here; confirm.
workerContainerRequest string = "worker:%s:container:%s:request"
// workerContainerResourceUsage is formatted with a worker ID followed by a container ID.
workerContainerResourceUsage string = "worker:%s:container:%s:resource_usage"
)
Expand Down Expand Up @@ -127,6 +128,10 @@ func (rk *redisKeys) WorkerContainerResourceUsage(workerId string, containerId s
return fmt.Sprintf(workerContainerResourceUsage, workerId, containerId)
}

// WorkerImageLock builds the key used to mark an image as locked on a given
// worker by expanding the workerImageLock template with the worker ID and
// the image ID, in that order.
func (rk *redisKeys) WorkerImageLock(workerId string, imageId string) string {
	key := fmt.Sprintf(workerImageLock, workerId, imageId)
	return key
}

// WorkerPool keys
func (rk *redisKeys) WorkerPoolLock(poolId string) string {
return fmt.Sprintf(workerPoolLock, poolId)
Expand Down
2 changes: 2 additions & 0 deletions internal/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ func (g *Gateway) initGrpc() error {
serverOptions := []grpc.ServerOption{
grpc.UnaryInterceptor(authInterceptor.Unary()),
grpc.StreamInterceptor(authInterceptor.Stream()),
grpc.MaxRecvMsgSize(g.config.GatewayService.MaxRecvMsgSize * 1024 * 1024),
grpc.MaxSendMsgSize(g.config.GatewayService.MaxSendMsgSize * 1024 * 1024),
}

g.grpcServer = grpc.NewServer(
Expand Down
2 changes: 2 additions & 0 deletions internal/repository/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type WorkerRepository interface {
AddContainerRequestToWorker(workerId string, containerId string, request *types.ContainerRequest) error
RemoveContainerRequestFromWorker(workerId string, containerId string) error
SetContainerResourceValues(workerId string, containerId string, usage types.ContainerResourceUsage) error
SetImagePullLock(workerId, imageId string) error
RemoveImagePullLock(workerId, imageId string) error
}

type ContainerRepository interface {
Expand Down
2 changes: 1 addition & 1 deletion internal/repository/container_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (cr *ContainerRedisRepository) SetContainerState(containerId string, info *
}

func (cr *ContainerRedisRepository) SetContainerExitCode(containerId string, exitCode int) error {
err := cr.lock.Acquire(context.TODO(), common.RedisKeys.SchedulerContainerLock(containerId), common.RedisLockOptions{TtlS: 10, Retries: 0})
err := cr.lock.Acquire(context.TODO(), common.RedisKeys.SchedulerContainerLock(containerId), common.RedisLockOptions{TtlS: 10, Retries: 1})
if err != nil {
return err
}
Expand Down
9 changes: 9 additions & 0 deletions internal/repository/worker_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"log"
"strings"
"time"

"github.com/beam-cloud/beta9/internal/common"
"github.com/beam-cloud/beta9/internal/types"
Expand Down Expand Up @@ -355,3 +356,11 @@ func (r *WorkerRedisRepository) SetContainerResourceValues(workerId string, cont

return nil
}

// SetImagePullLock writes the image lock key for the given worker and image
// with the value true and a five minute expiry, returning any error reported
// by the store.
//
// NOTE(review): this is a plain Set rather than a set-if-absent, so it
// presumably succeeds even when the key already exists — confirm that is the
// intended semantics for callers treating it as a lock.
func (r *WorkerRedisRepository) SetImagePullLock(workerId, imageId string) error {
	const lockTTL = 5 * time.Minute

	key := common.RedisKeys.WorkerImageLock(workerId, imageId)
	result := r.rdb.Set(context.TODO(), key, true, lockTTL)
	return result.Err()
}

// RemoveImagePullLock deletes the image lock key for the given worker and
// image, returning any error reported by the store.
func (r *WorkerRedisRepository) RemoveImagePullLock(workerId, imageId string) error {
	key := common.RedisKeys.WorkerImageLock(workerId, imageId)
	result := r.rdb.Del(context.TODO(), key)
	return result.Err()
}
4 changes: 2 additions & 2 deletions internal/scheduler/pool_k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,9 @@ func (wpc *KubernetesWorkerPoolController) addWorkerWithId(workerId string, cpu
func (wpc *KubernetesWorkerPoolController) createWorkerJob(workerId string, cpu int64, memory int64, gpuType string) (*batchv1.Job, *types.Worker) {
jobName := fmt.Sprintf("%s-%s-%s", Beta9WorkerJobPrefix, wpc.name, workerId)
labels := map[string]string{
"app": "beta9-" + Beta9WorkerLabelValue,
"prometheus.io/scrape": "true",
"app": Beta9WorkerLabelValue,
Beta9WorkerLabelKey: Beta9WorkerLabelValue,
"prometheus.io/scrape": "true",
}

workerCpu := cpu
Expand Down
6 changes: 4 additions & 2 deletions internal/types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@ type PostgresConfig struct {
}

// GatewayServiceConfig holds the gateway service settings. Each field's `key`
// tag names the configuration key it corresponds to.
type GatewayServiceConfig struct {
Host string `key:"host"`
Port int `key:"port"`
Host string `key:"host"`
Port int `key:"port"`
// MaxRecvMsgSize is the gRPC receive message size limit, expressed in
// megabytes (the gateway multiplies it by 1024*1024 before use).
MaxRecvMsgSize int `key:"max_recv_msg_size_in_mb"`
// MaxSendMsgSize is the gRPC send message size limit, expressed in
// megabytes (the gateway multiplies it by 1024*1024 before use).
MaxSendMsgSize int `key:"max_send_msg_size_in_mb"`
}

type ImageServiceConfig struct {
Expand Down
84 changes: 43 additions & 41 deletions internal/worker/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import (
"log"
"os"
"os/exec"
"path"
"path/filepath"
"regexp"
"syscall"
"time"

"github.com/beam-cloud/beta9/internal/abstractions/image"
common "github.com/beam-cloud/beta9/internal/common"
"github.com/beam-cloud/beta9/internal/repository"
types "github.com/beam-cloud/beta9/internal/types"
"github.com/beam-cloud/clip/pkg/clip"
clipCommon "github.com/beam-cloud/clip/pkg/common"
Expand Down Expand Up @@ -46,9 +46,11 @@ type ImageClient struct {
Debug bool
Creds string
config types.ImageServiceConfig
workerId string
workerRepo repository.WorkerRepository
}

func NewImageClient(config types.ImageServiceConfig) (*ImageClient, error) {
func NewImageClient(config types.ImageServiceConfig, workerId string, workerRepo repository.WorkerRepository) (*ImageClient, error) {
var provider CredentialProvider // Configure image registry credentials

switch config.RegistryCredentialProviderName {
Expand Down Expand Up @@ -78,8 +80,10 @@ func NewImageClient(config types.ImageServiceConfig) (*ImageClient, error) {
}
}

baseImagePath := filepath.Join(imageCachePath)
os.MkdirAll(baseImagePath, os.ModePerm)
err = os.MkdirAll(imageCachePath, os.ModePerm)
if err != nil {
return nil, err
}

creds, err := provider.GetAuthString()
if err != nil {
Expand All @@ -90,11 +94,13 @@ func NewImageClient(config types.ImageServiceConfig) (*ImageClient, error) {
config: config,
registry: registry,
cacheClient: cacheClient,
ImagePath: baseImagePath,
ImagePath: imageCachePath,
PullCommand: imagePullCommand,
CommandTimeout: -1,
Debug: false,
Creds: creds,
workerId: workerId,
workerRepo: workerRepo,
}, nil
}

Expand All @@ -121,13 +127,12 @@ func (c *ImageClient) PullLazy(imageId string) error {
return nil
}

// Attempt to acquire the lock
fileLock := NewFileLock(path.Join(imagePath, fmt.Sprintf("%s_%s", imageId, imageMountLockFilename)))
if err := fileLock.Acquire(); err != nil {
fmt.Printf("Unable to acquire mount lock: %v\n", err)
// Get lock on image mount
err = c.workerRepo.SetImagePullLock(c.workerId, imageId)
if err != nil {
return err
}
defer fileLock.Release()
defer c.workerRepo.RemoveImagePullLock(c.workerId, imageId)

startServer, _, err := clip.MountArchive(*mountOptions)
if err != nil {
Expand All @@ -143,7 +148,7 @@ func (c *ImageClient) PullLazy(imageId string) error {
}

func (i *ImageClient) PullAndArchiveImage(ctx context.Context, sourceImage string, imageId string, creds *string) error {
baseImage, err := i.extractImageNameAndTag(sourceImage)
baseImage, err := extractImageNameAndTag(sourceImage)
if err != nil {
return err
}
Expand All @@ -165,13 +170,13 @@ func (i *ImageClient) PullAndArchiveImage(ctx context.Context, sourceImage strin

status, err := runc.Monitor.Wait(cmd, ec)
if err == nil && status != 0 {
err = fmt.Errorf("unable to pull base image: %s", sourceImage)
log.Printf("unable to copy base image: %v -> %v", sourceImage, dest)
}

bundlePath := filepath.Join(imagePath, imageId)
err = i.unpack(baseImage.ImageName, baseImage.ImageTag, bundlePath)
if err != nil {
return err
return fmt.Errorf("unable to unpack image: %v", err)
}

defer func() {
Expand All @@ -188,34 +193,6 @@ func (i *ImageClient) startCommand(cmd *exec.Cmd) (chan runc.Exit, error) {
return runc.Monitor.Start(cmd)
}

// imageURIPattern splits an image URI into an optional two-segment prefix
// ("a/b/"), a name, and an optional tag after the first colon. It is compiled
// once at package scope instead of on every call.
var imageURIPattern = regexp.MustCompile(`^(([^/]+/[^/]+)/)?([^:]+):?(.*)$`)

// extractImageNameAndTag parses sourceImage into its source registry, image
// name and tag.
//
// The registry defaults to "docker.io" and the tag to "latest" when the
// corresponding part of the URI is absent. The registry group only matches
// when the URI has at least two slash-separated segments before the name.
// An error is returned when sourceImage does not match the expected format.
func (i *ImageClient) extractImageNameAndTag(sourceImage string) (image.BaseImage, error) {
	matches := imageURIPattern.FindStringSubmatch(sourceImage)
	if matches == nil {
		return image.BaseImage{}, errors.New("invalid image URI format")
	}

	// Use default source registry if not specified
	sourceRegistry := "docker.io"
	if matches[2] != "" {
		sourceRegistry = matches[2]
	}

	imageTag := "latest"
	if matches[4] != "" {
		imageTag = matches[4]
	}

	return image.BaseImage{
		SourceRegistry: sourceRegistry,
		ImageName:      matches[3],
		ImageTag:       imageTag,
	}, nil
}

func (i *ImageClient) args(creds *string) (out []string) {
if creds != nil && *creds != "" {
out = append(out, "--src-creds", *creds)
Expand Down Expand Up @@ -323,3 +300,28 @@ func (i *ImageClient) Archive(ctx context.Context, bundlePath string, imageId st
log.Printf("Image <%v> push took %v\n", imageId, time.Since(startTime))
return nil
}

// imageNamePattern captures, in order: an optional registry prefix, an
// optional intermediate path segment, the image name, and an optional tag.
var imageNamePattern = regexp.MustCompile(`^(?:(.*?)\/)?(?:([^\/:]+)\/)?([^\/:]+)(?::([^\/:]+))?$`)

// extractImageNameAndTag parses sourceImage into a registry, image name and
// tag. A missing registry falls back to "docker.io" and a missing tag to
// "latest". The intermediate path segment (capture group 2) is matched but
// not used. An error is returned when sourceImage does not match the pattern.
func extractImageNameAndTag(sourceImage string) (image.BaseImage, error) {
	groups := imageNamePattern.FindStringSubmatch(sourceImage)
	if groups == nil {
		return image.BaseImage{}, errors.New("invalid image URI format")
	}

	// Start from the defaults and override whatever the URI specifies.
	parsed := image.BaseImage{
		SourceRegistry: "docker.io",
		ImageName:      groups[3],
		ImageTag:       "latest",
	}

	if groups[1] != "" {
		parsed.SourceRegistry = groups[1]
	}

	if groups[4] != "" {
		parsed.ImageTag = groups[4]
	}

	return parsed, nil
}
58 changes: 58 additions & 0 deletions internal/worker/image_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package worker

import (
"testing"

"github.com/stretchr/testify/assert"
)

// TestExtractImageNameAndTag runs extractImageNameAndTag over a table of image
// references and checks the parsed tag, name and registry for each, one
// subtest per reference.
func TestExtractImageNameAndTag(t *testing.T) {
	type testCase struct {
		image        string
		wantTag      string
		wantName     string
		wantRegistry string
	}

	cases := []testCase{
		{image: "nginx", wantTag: "latest", wantName: "nginx", wantRegistry: "docker.io"},
		{image: "docker.io/nginx", wantTag: "latest", wantName: "nginx", wantRegistry: "docker.io"},
		{image: "docker.io/nginx:1.25.3", wantTag: "1.25.3", wantName: "nginx", wantRegistry: "docker.io"},
		{image: "docker.io/nginx:latest", wantTag: "latest", wantName: "nginx", wantRegistry: "docker.io"},
		{
			image:        "registry.localhost:5000/beta9-runner:py311-latest",
			wantTag:      "py311-latest",
			wantName:     "beta9-runner",
			wantRegistry: "registry.localhost:5000",
		},
	}

	for _, tc := range cases {
		t.Run(tc.image, func(t *testing.T) {
			got, err := extractImageNameAndTag(tc.image)
			assert.NoError(t, err)

			assert.Equal(t, tc.wantTag, got.ImageTag)
			assert.Equal(t, tc.wantName, got.ImageName)
			assert.Equal(t, tc.wantRegistry, got.SourceRegistry)
		})
	}
}
Loading

0 comments on commit 4fc1efd

Please sign in to comment.