diff --git a/.github/workflows/release-runner.yml b/.github/workflows/release-runner.yml index 5f172dd1c..f3205022a 100644 --- a/.github/workflows/release-runner.yml +++ b/.github/workflows/release-runner.yml @@ -11,7 +11,29 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: ['py38', 'py39', 'py310', 'py311', 'py312'] + include: + # Python versions + - version: '3.8' + target: 'py38' + - version: '3.9' + target: 'py39' + - version: '3.10' + target: 'py310' + - version: '3.11' + target: 'py311' + - version: '3.12' + target: 'py312' + # Micromamba versions + - version: '3.8' + target: 'micromamba' + - version: '3.9' + target: 'micromamba' + - version: '3.10' + target: 'micromamba' + - version: '3.11' + target: 'micromamba' + - version: '3.12' + target: 'micromamba' steps: - name: Checkout code @@ -34,16 +56,19 @@ jobs: - name: Extract tag name run: echo "RUNNER_TAG=${GITHUB_REF#refs/tags/runner-}" >> $GITHUB_ENV - - name: Build and push Docker image for ${{ matrix.python-version }} + - name: Build and push Docker image uses: docker/build-push-action@v4 with: context: . file: ./docker/Dockerfile.runner push: true - tags: ${{ secrets.ECR_REGISTRY }}/beta9-runner:${{ matrix.python-version }}-${{ env.RUNNER_TAG }},${{ secrets.ECR_REGISTRY }}/beta9-runner:${{ matrix.python-version }}-latest - target: ${{ matrix.python-version }} + tags: ${{ secrets.ECR_REGISTRY }}/beta9-runner:${{ matrix.target == 'micromamba' && format('micromamba{0}', matrix.version) || matrix.target }}-${{ env.RUNNER_TAG }},${{ secrets.ECR_REGISTRY }}/beta9-runner:${{ matrix.target == 'micromamba' && format('micromamba{0}', matrix.version) || matrix.target }}-latest + target: ${{ matrix.target }} + build-args: | + ${{ matrix.target == 'micromamba' && format('PYTHON_VERSION={0}', matrix.version) || '' }} + platforms: linux/amd64 load: false registry: ${{ steps.login-ecr.outputs.registry }} repository: ${{ steps.login-ecr.outputs.registry }}/beta9-runner add_git_labels: true - tag_with_ref: true + tag_with_ref: true \ No newline at end of file diff --git a/Makefile b/Makefile index 6e1782827..d9c3ffa36 100644 --- a/Makefile +++ b/Makefile @@ -42,9 +42,13 @@ proxy: runner: for target in py312 py311 py310 py39 py38; do \ - docker build . --no-cache --target $$target --platform=linux/amd64 -f ./docker/Dockerfile.runner -t localhost:5001/beta9-runner:$$target-$(runnerTag); \ + docker build . --target $$target --platform=linux/amd64 -f ./docker/Dockerfile.runner -t localhost:5001/beta9-runner:$$target-$(runnerTag); \ docker push localhost:5001/beta9-runner:$$target-$(runnerTag); \ done + for version in "3.12" "3.11" "3.10" "3.9" "3.8"; do \ + docker build . --build-arg PYTHON_VERSION=$$version --target micromamba --platform=linux/amd64 -f ./docker/Dockerfile.runner -t localhost:5001/beta9-runner:micromamba$$version-$(runnerTag); \ + docker push localhost:5001/beta9-runner:micromamba$$version-$(runnerTag); \ + done start: cd hack && okteto up --file okteto.yaml diff --git a/README.md b/README.md index 045ab26e0..b83bef348 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,9 @@ --- -### Run GPU Workloads Across Multiple Clouds +### Cross-Cloud Serverless Engine + +Run serverless workloads with fast cold starts on bare-metal servers, anywhere in the world

@@ -30,26 +32,55 @@ -## What is Beta9? - -Beta9 is an open source container orchestrator, designed for running GPU workloads across different cloud environments in different regions. +## Features -- Connect VMs to your cluster with a single cURL command +- Run serverless workloads using a friendly Python interface +- Autoscaling and automatic scale-to-zero - Read large files at the edge using distributed, cross-region storage -- Manage your fleet of GPUs using a Tailscale-powered service mesh +- Connect bare-metal nodes to your cluster with a single cURL command +- Manage your fleet of servers using a Tailscale-powered service mesh - Securely run workloads with end-to-end encryption through WireGuard -- Run workloads using a friendly Python interface -## How does it work? +## How Does It Work? + +### Run Serverless Workloads in Pure Python + +Add an `endpoint` decorator to your code, and you'll get a load-balanced HTTP endpoint (with auth!) to invoke your code. -### Provision GPUs Anywhere +You can also run long-running functions with `@function`, deploy task queues using `@task_queue`, and schedule jobs with `@schedule`: + +```python +from beta9 import endpoint + + +# This will run on a remote A100-40 in your cluster +@endpoint(cpu=1, memory=128, gpu="A100-40") +def square(i: int): + return i**2 +``` + +Deploy with a single command: + +``` +$ beta9 deploy app.py:square --name inference +=> Building image +=> Using cached image +=> Deployed 🎉 + +curl -X POST 'https://inference.beam.cloud/v1' \ +-H 'Authorization: Bearer [YOUR_AUTH_TOKEN]' \ +-H 'Content-Type: application/json' \ +-d '{}' +``` + +### Run on Bare-Metal Servers Around the World Connect any GPU to your cluster with one CLI command and a cURL. ```sh $ beta9 machine create --pool lambda-a100-40 -=> Created machine with ID: '9541cbd2'. Use the following command to setup the node: +=> Created machine with ID: '9541cbd2'. Use the following command to set up the node: #!/bin/bash sudo curl -L -o agent https://release.beam.cloud/agent/agent && \ @@ -64,9 +95,9 @@ sudo ./agent --token "AUTH_TOKEN" \ You can run this install script on your VM to connect it to your cluster. -### Manage Your GPU Fleet +### Manage Your CPU or GPU Fleet -Manage your distributed cross-region GPU cluster using a centralized control plane. +Manage your distributed cross-region cluster using a centralized control plane. ```sh $ beta9 machine list @@ -78,20 +109,6 @@ $ beta9 machine list ``` -### Run Workloads in Python - -Offload any workload to your remote machines by adding a Python decorator to your code. - -```python -from beta9 import function - - -# This will run on a remote A100-40 in your cluster -@function(cpu=1, memory=128, gpu="A100-40") -def square(i: int): - return i**2 -``` - # Local Installation You can run Beta9 locally, or in an existing Kubernetes cluster using our [Helm chart](https://github.com/beam-cloud/beta9/tree/main/deploy/charts/beta9). @@ -102,9 +119,6 @@ k3d is used for local development. You'll need Docker to get started. To use our fully automated setup, run the `setup` make target. -> [!NOTE] -> This will overwrite some of the tools you may already have installed. Review the [setup.sh](bin/setup.sh) to learn more. - ```bash make setup ``` @@ -113,9 +127,6 @@ make setup The SDK is written in Python. You'll need Python 3.8 or higher. Use the `setup-sdk` make target to get started. -> [!NOTE] -> This will install the Poetry package manager. 
- ```bash make setup-sdk ``` @@ -137,7 +148,7 @@ If you need support, you can reach out through any of these channels: - [Slack](https://join.slack.com/t/beam-cloud/shared_invite/zt-2f16bwiiq-oP8weCLWNrf_9lJZIDf0Fg) \(Chat live with maintainers and community members\) - [GitHub issues](https://github.com/beam-cloud/issues) \(Bug reports, feature requests, and anything roadmap related) -- [Twitter](https://twitter.com/beam_cloud) \(Updates on releases and stuff) +- [Twitter](https://twitter.com/beam_cloud) \(Updates on releases and more) ## Thanks to Our Contributors diff --git a/docker/Dockerfile.runner b/docker/Dockerfile.runner index 996825b92..1e155182e 100644 --- a/docker/Dockerfile.runner +++ b/docker/Dockerfile.runner @@ -195,3 +195,76 @@ rm -rf /usr/lib/python2.7 && rm -rf /usr/lib/python3.6 EOT VOLUME ["/volumes", "/snapshot"] + +# Micromamba-base +# ======================== +FROM base as micromamba-base + +WORKDIR /workspace + +ENV DEBIAN_FRONTEND=noninteractive \ + NVIDIA_VISIBLE_DEVICES=all \ + PATH=/micromamba/bin:$PATH \ + MAMBA_ROOT_PREFIX=/micromamba \ + MAMBA_EXE=/bin/micromamba \ + SHELL=/bin/bash + +COPY ./pkg/abstractions/image/base_requirements.txt /workspace/requirements.txt + +SHELL ["/bin/bash", "-c"] +RUN < /micromamba/mpy +chmod +x /micromamba/mpy + +apt-get clean -y +apt-get autoremove -y +rm -rf /var/lib/apt/lists/* +rm -rf /usr/share/doc +rm -rf /usr/share/man +rm -rf /usr/share/locale +rm -rf /root/.cache/* +rm -rf /tmp/* +EOT + + +VOLUME ["/volumes", "/snapshot"] + +# Micromamba final +# ======================== +ARG PYTHON_VERSION=3.10 +FROM micromamba-base as micromamba + +WORKDIR /workspace + +COPY --from=micromamba-base /micromamba /micromamba + +RUN < 0 { - gpuCount = 1 - } - gpuRequest := types.GpuTypesToStrings(stubConfig.Runtime.Gpus) if stubConfig.Runtime.Gpu != "" { gpuRequest = append(gpuRequest, stubConfig.Runtime.Gpu.String()) } + gpuCount := 0 + if len(gpuRequest) > 0 { + gpuCount = 1 + } + err = cs.scheduler.Run(&types.ContainerRequest{ ContainerId: containerId, Env: env, diff --git a/pkg/abstractions/endpoint/autoscaler.go b/pkg/abstractions/endpoint/autoscaler.go index c1cd74bba..a466a8e40 100644 --- a/pkg/abstractions/endpoint/autoscaler.go +++ b/pkg/abstractions/endpoint/autoscaler.go @@ -50,8 +50,8 @@ func endpointDeploymentScaleFunc(i *endpointInstance, s *endpointAutoscalerSampl desiredContainers += 1 } - // Limit max replicas to either what was set in autoscaler config, or our default of MaxReplicas (whichever is lower) - maxReplicas := math.Min(float64(i.StubConfig.Autoscaler.MaxContainers), float64(abstractions.MaxReplicas)) + // Limit max replicas to either what was set in autoscaler config, or the limit specified on the gateway config (whichever is lower) + maxReplicas := math.Min(float64(i.StubConfig.Autoscaler.MaxContainers), float64(i.AppConfig.GatewayService.StubLimits.MaxReplicas)) desiredContainers = int(math.Min(maxReplicas, float64(desiredContainers))) } diff --git a/pkg/abstractions/endpoint/autoscaler_test.go b/pkg/abstractions/endpoint/autoscaler_test.go index b3ccd0930..d5b34c1b3 100644 --- a/pkg/abstractions/endpoint/autoscaler_test.go +++ b/pkg/abstractions/endpoint/autoscaler_test.go @@ -17,6 +17,12 @@ func TestDeploymentScaleFuncWithDefaults(t *testing.T) { ExternalId: "test", }, }, + AppConfig: types.AppConfig{}, + } + autoscaledInstance.AppConfig.GatewayService = types.GatewayServiceConfig{ + StubLimits: types.StubLimits{ + MaxReplicas: 10, + }, } autoscaledInstance.StubConfig = &types.StubConfigV1{} 
autoscaledInstance.StubConfig.Autoscaler = &types.Autoscaler{ @@ -64,6 +70,13 @@ func TestDeploymentScaleFuncWithMaxTasksPerContainer(t *testing.T) { ExternalId: "test", }, }, + AppConfig: types.AppConfig{}, + } + + autoscaledInstance.AppConfig.GatewayService = types.GatewayServiceConfig{ + StubLimits: types.StubLimits{ + MaxReplicas: 10, + }, } autoscaledInstance.StubConfig = &types.StubConfigV1{} autoscaledInstance.StubConfig.Autoscaler = &types.Autoscaler{ diff --git a/pkg/abstractions/endpoint/buffer.go b/pkg/abstractions/endpoint/buffer.go index 604a2ba08..5d50818fd 100644 --- a/pkg/abstractions/endpoint/buffer.go +++ b/pkg/abstractions/endpoint/buffer.go @@ -558,6 +558,11 @@ func (rb *RequestBuffer) proxyWebsocketConnection(r *request, c container, diale } func forwardWSConn(src net.Conn, dst net.Conn) { + defer func() { + src.Close() + dst.Close() + }() + _, err := io.Copy(src, dst) if err != nil { return diff --git a/pkg/abstractions/endpoint/instance.go b/pkg/abstractions/endpoint/instance.go index d266b2257..5e81c54bd 100644 --- a/pkg/abstractions/endpoint/instance.go +++ b/pkg/abstractions/endpoint/instance.go @@ -63,16 +63,16 @@ func (i *endpointInstance) startContainers(containersToRun int) error { env = append(secrets, env...) - gpuCount := 0 - if len(i.StubConfig.Runtime.Gpus) > 0 { - gpuCount = 1 - } - gpuRequest := types.GpuTypesToStrings(i.StubConfig.Runtime.Gpus) if i.StubConfig.Runtime.Gpu != "" { gpuRequest = append(gpuRequest, i.StubConfig.Runtime.Gpu.String()) } + gpuCount := 0 + if len(gpuRequest) > 0 { + gpuCount = 1 + } + for c := 0; c < containersToRun; c++ { containerId := i.genContainerId() @@ -125,7 +125,7 @@ func (i *endpointInstance) stopContainers(containersToStop int) error { idx := rnd.Intn(len(containerIds)) containerId := containerIds[idx] - err := i.Scheduler.Stop(containerId) + err := i.Scheduler.Stop(&types.StopContainerArgs{ContainerId: containerId}) if err != nil { log.Printf("<%s> unable to stop container: %v", i.Name, err) return err diff --git a/pkg/abstractions/function/task.go b/pkg/abstractions/function/task.go index 845336cd1..b409c7245 100644 --- a/pkg/abstractions/function/task.go +++ b/pkg/abstractions/function/task.go @@ -130,11 +130,6 @@ func (t *FunctionTask) run(ctx context.Context, stub *types.StubWithRelated) err return err } - gpuCount := 0 - if len(stubConfig.Runtime.Gpus) > 0 { - gpuCount = 1 - } - env := []string{ fmt.Sprintf("TASK_ID=%s", t.msg.TaskId), fmt.Sprintf("HANDLER=%s", stubConfig.Handler), @@ -150,6 +145,11 @@ func (t *FunctionTask) run(ctx context.Context, stub *types.StubWithRelated) err gpuRequest = append(gpuRequest, stubConfig.Runtime.Gpu.String()) } + gpuCount := 0 + if len(gpuRequest) > 0 { + gpuCount = 1 + } + err = t.fs.scheduler.Run(&types.ContainerRequest{ ContainerId: t.containerId, Env: env, diff --git a/pkg/abstractions/image/build.go b/pkg/abstractions/image/build.go index dddeb5392..9f81b9325 100644 --- a/pkg/abstractions/image/build.go +++ b/pkg/abstractions/image/build.go @@ -29,8 +29,9 @@ const ( defaultBuildContainerMemory int64 = 1024 defaultContainerSpinupTimeout time.Duration = 180 * time.Second - pipCommandType string = "pip" - shellCommandType string = "shell" + pipCommandType string = "pip" + shellCommandType string = "shell" + micromambaCommandType string = "micromamba" ) type Builder struct { @@ -266,30 +267,28 @@ func (b *Builder) Build(ctx context.Context, opts *BuildOpts, outputChan chan co // Generate the pip install command and prepend it to the commands list if 
len(opts.PythonPackages) > 0 { - pipInstallCmd := b.generatePipInstallCommand(opts.PythonPackages, opts.PythonVersion) + pipInstallCmd := generatePipInstallCommand(opts.PythonPackages, opts.PythonVersion) opts.Commands = append([]string{pipInstallCmd}, opts.Commands...) } log.Printf("container <%v> building with options: %s\n", containerId, opts) startTime := time.Now() + micromambaEnv := strings.Contains(opts.PythonVersion, "micromamba") + if micromambaEnv { + client.Exec(containerId, "micromamba config set use_lockfiles False") + } + // Detect if python3.x is installed in the container, if not install it checkPythonVersionCmd := fmt.Sprintf("%s --version", opts.PythonVersion) - if resp, err := client.Exec(containerId, checkPythonVersionCmd); err != nil || !resp.Ok { + if resp, err := client.Exec(containerId, checkPythonVersionCmd); (err != nil || !resp.Ok) && !micromambaEnv { outputChan <- common.OutputMsg{Done: false, Success: false, Msg: fmt.Sprintf("%s not detected, installing it for you...\n", opts.PythonVersion)} - installCmd := b.getPythonInstallCommand(opts.PythonVersion) + installCmd := getPythonInstallCommand(opts.PythonVersion) opts.Commands = append([]string{installCmd}, opts.Commands...) } // Generate the commands to run in the container - for _, step := range opts.BuildSteps { - switch step.Type { - case shellCommandType: - opts.Commands = append(opts.Commands, step.Command) - case pipCommandType: - opts.Commands = append(opts.Commands, b.generatePipInstallCommand([]string{step.Command}, opts.PythonVersion)) - } - } + opts.Commands = parseBuildSteps(opts.BuildSteps, opts.PythonVersion) for _, cmd := range opts.Commands { if cmd == "" { @@ -311,14 +310,13 @@ func (b *Builder) Build(ctx context.Context, opts *BuildOpts, outputChan chan co } log.Printf("container <%v> build took %v\n", containerId, time.Since(startTime)) - outputChan <- common.OutputMsg{Done: false, Success: false, Msg: "\nSaving image, this may take a few minutes...\n"} err = client.Archive(ctx, containerId, imageId, outputChan) if err != nil { - outputChan <- common.OutputMsg{Done: true, Success: false, Msg: err.Error() + "\n"} + outputChan <- common.OutputMsg{Done: true, Archiving: true, Success: false, Msg: err.Error() + "\n"} return err } - outputChan <- common.OutputMsg{Done: true, Success: true, ImageId: imageId} + outputChan <- common.OutputMsg{Done: true, Archiving: true, Success: true, ImageId: imageId} return nil } @@ -364,6 +362,7 @@ func (b *Builder) extractPackageName(pkg string) string { return strings.FieldsFunc(pkg, func(c rune) bool { return c == '=' || c == '>' || c == '<' || c == '[' || c == ';' })[0] } +// handleCustomBaseImage validates the custom base image and parses its details into build options func (b *Builder) handleCustomBaseImage(opts *BuildOpts, outputChan chan common.OutputMsg) error { if outputChan != nil { outputChan <- common.OutputMsg{Done: false, Success: false, Msg: fmt.Sprintf("Using custom base image: %s\n", opts.ExistingImageUri)} @@ -422,57 +421,20 @@ func (b *Builder) Exists(ctx context.Context, imageId string) bool { return b.registry.Exists(ctx, imageId) } -func (b *Builder) getPythonInstallCommand(pythonVersion string) string { - baseCmd := "apt-get update -q && apt-get install -q -y software-properties-common gcc curl git" - components := []string{ - "python3-future", - pythonVersion, - fmt.Sprintf("%s-distutils", pythonVersion), - fmt.Sprintf("%s-dev", pythonVersion), - } - - installCmd := strings.Join(components, " ") - installPipCmd := fmt.Sprintf("curl -sS 
https://bootstrap.pypa.io/get-pip.py | %s", pythonVersion) - postInstallCmd := fmt.Sprintf("rm -f /usr/bin/python && rm -f /usr/bin/python3 && ln -s /usr/bin/%s /usr/bin/python && ln -s /usr/bin/%s /usr/bin/python3 && %s", pythonVersion, pythonVersion, installPipCmd) - - return fmt.Sprintf("%s && add-apt-repository ppa:deadsnakes/ppa && apt-get update && apt-get install -q -y %s && %s", baseCmd, installCmd, postInstallCmd) -} - -func (b *Builder) generatePipInstallCommand(pythonPackages []string, pythonVersion string) string { - var flagLines []string - var packages []string - var flags = []string{"--", "-"} - - for _, pkg := range pythonPackages { - if hasAnyPrefix(pkg, flags) { - flagLines = append(flagLines, pkg) - } else { - packages = append(packages, fmt.Sprintf("%q", pkg)) - } - } - - command := fmt.Sprintf("%s -m pip install --root-user-action=ignore", pythonVersion) - if len(flagLines) > 0 { - command += " " + strings.Join(flagLines, " ") - } - if len(packages) > 0 { - command += " " + strings.Join(packages, " ") - } - - return command -} - var imageNamePattern = regexp.MustCompile( `^` + // Assert position at the start of the string `(?:(?P(?:(?:localhost|[\w.-]+(?:\.[\w.-]+)+)(?::\d+)?)|[\w]+:\d+)\/)?` + // Optional registry, which can be localhost, a domain with optional port, or a simple registry with port - `(?P(?:[a-z0-9]+(?:(?:[._]|__|[-]*)[a-z0-9]+)*)\/)*` + // Optional namespace, which can contain multiple segments separated by slashes - `(?P[a-z0-9][-a-z0-9._]+)` + // Required repository name, must start with alphanumeric and can contain alphanumerics, hyphens, dots, and underscores + `(?P(?:[\w][\w.-]*(?:/[\w][\w.-]*)*))?` + // Full repository path including namespace `(?::(?P[\w][\w.-]{0,127}))?` + // Optional tag, which starts with a word character and can contain word characters, dots, and hyphens `(?:@(?P[A-Za-z][A-Za-z0-9]*(?:[-_+.][A-Za-z][A-Za-z0-9]*)*:[0-9A-Fa-f]{32,}))?` + // Optional digest, which is a hash algorithm followed by a colon and a hexadecimal hash `$`, // Assert position at the end of the string ) func ExtractImageNameAndTag(imageRef string) (BaseImage, error) { + if imageRef == "" { + return BaseImage{}, errors.New("invalid image URI format") + } + matches := imageNamePattern.FindStringSubmatch(imageRef) if matches == nil { return BaseImage{}, errors.New("invalid image URI format") @@ -490,7 +452,11 @@ func ExtractImageNameAndTag(imageRef string) (BaseImage, error) { registry = "docker.io" } - repo := result["Namespace"] + result["Repo"] + repo := result["Repo"] + if repo == "" { + return BaseImage{}, errors.New("invalid image URI format") + } + tag, digest := result["Tag"], result["Digest"] if tag == "" && digest == "" { tag = "latest" @@ -504,6 +470,50 @@ func ExtractImageNameAndTag(imageRef string) (BaseImage, error) { }, nil } +func getPythonInstallCommand(pythonVersion string) string { + baseCmd := "apt-get update -q && apt-get install -q -y software-properties-common gcc curl git" + components := []string{ + "python3-future", + pythonVersion, + fmt.Sprintf("%s-distutils", pythonVersion), + fmt.Sprintf("%s-dev", pythonVersion), + } + + installCmd := strings.Join(components, " ") + installPipCmd := fmt.Sprintf("curl -sS https://bootstrap.pypa.io/get-pip.py | %s", pythonVersion) + postInstallCmd := fmt.Sprintf("rm -f /usr/bin/python && rm -f /usr/bin/python3 && ln -s /usr/bin/%s /usr/bin/python && ln -s /usr/bin/%s /usr/bin/python3 && %s", pythonVersion, pythonVersion, installPipCmd) + + return fmt.Sprintf("%s && add-apt-repository 
ppa:deadsnakes/ppa && apt-get update && apt-get install -q -y %s && %s", baseCmd, installCmd, postInstallCmd) +} + +func generatePipInstallCommand(pythonPackages []string, pythonVersion string) string { + flagLines, packages := parseFlagLinesAndPackages(pythonPackages) + + command := fmt.Sprintf("%s -m pip install --root-user-action=ignore", pythonVersion) + if len(flagLines) > 0 { + command += " " + strings.Join(flagLines, " ") + } + if len(packages) > 0 { + command += " " + strings.Join(packages, " ") + } + + return command +} + +func generateMicromambaInstallCommand(pythonPackages []string) string { + flagLines, packages := parseFlagLinesAndPackages(pythonPackages) + + command := fmt.Sprintf("%s install -y -n beta9", micromambaCommandType) + if len(flagLines) > 0 { + command += " " + strings.Join(flagLines, " ") + } + if len(packages) > 0 { + command += " " + strings.Join(packages, " ") + } + + return command +} + func hasAnyPrefix(s string, prefixes []string) bool { for _, prefix := range prefixes { if strings.HasPrefix(s, prefix) { @@ -512,3 +522,75 @@ func hasAnyPrefix(s string, prefixes []string) bool { } return false } + +func parseFlagLinesAndPackages(pythonPackages []string) ([]string, []string) { + var flagLines []string + var packages []string + var flags = []string{"--", "-"} + + for _, pkg := range pythonPackages { + if hasAnyPrefix(pkg, flags) { + flagLines = append(flagLines, pkg) + } else { + packages = append(packages, fmt.Sprintf("%q", pkg)) + } + } + return flagLines, packages +} + +// Generate the commands to run in the container. This function will coalesce pip and mamba commands +// into a single command if they are adjacent to each other. +func parseBuildSteps(buildSteps []BuildStep, pythonVersion string) []string { + commands := []string{} + var ( + mambaStart int = -1 + mambaGroup []string + pipStart int = -1 + pipGroup []string + ) + + for _, step := range buildSteps { + if step.Type == shellCommandType { + commands = append(commands, step.Command) + } + + if step.Type == pipCommandType { + if pipStart == -1 { + pipStart = len(commands) + commands = append(commands, "") + } + pipGroup = append(pipGroup, step.Command) + } + + if step.Type == micromambaCommandType { + if mambaStart == -1 { + mambaStart = len(commands) + commands = append(commands, "") + } + mambaGroup = append(mambaGroup, step.Command) + } + + // Flush any pending pip or mamba groups + if pipStart != -1 && step.Type != pipCommandType { + commands[pipStart] = generatePipInstallCommand(pipGroup, pythonVersion) + pipStart = -1 + pipGroup = nil + } + + if mambaStart != -1 && step.Type != micromambaCommandType { + commands[mambaStart] = generateMicromambaInstallCommand(mambaGroup) + mambaStart = -1 + mambaGroup = nil + } + } + + if mambaStart != -1 { + commands[mambaStart] = generateMicromambaInstallCommand(mambaGroup) + } + + if pipStart != -1 { + commands[pipStart] = generatePipInstallCommand(pipGroup, pythonVersion) + } + + return commands +} diff --git a/pkg/abstractions/image/build_test.go b/pkg/abstractions/image/build_test.go index 0efd0d709..fc42b6b0f 100644 --- a/pkg/abstractions/image/build_test.go +++ b/pkg/abstractions/image/build_test.go @@ -80,9 +80,15 @@ func TestExtractImageNameAndTag(t *testing.T) { { ref: "nvcr.io/nim/meta/llama-3.1-8b-instruct:1.1.0", wantTag: "1.1.0", - wantRepo: "meta/llama-3.1-8b-instruct", + wantRepo: "nim/meta/llama-3.1-8b-instruct", wantRegistry: "nvcr.io", }, + { + ref: "ghcr.io/gis-ops/docker-valhalla/valhalla:latest", + wantTag: "latest", + wantRepo: 
"gis-ops/docker-valhalla/valhalla", + wantRegistry: "ghcr.io", + }, } for _, test := range tests { @@ -145,8 +151,76 @@ func TestGeneratePipInstallCommand(t *testing.T) { } for _, tc := range testCases { - b := &Builder{} - cmd := b.generatePipInstallCommand(tc.opts.PythonPackages, tc.opts.PythonVersion) + cmd := generatePipInstallCommand(tc.opts.PythonPackages, tc.opts.PythonVersion) assert.Equal(t, tc.want, cmd) } } + +func TestParseBuildSteps(t *testing.T) { + testCases := []struct { + steps []BuildStep + want []string + }{ + { + steps: []BuildStep{}, + want: []string{}, + }, + { + steps: []BuildStep{ + {Type: shellCommandType, Command: "echo 'hello'"}, + {Type: shellCommandType, Command: "echo 'world'"}, + }, + want: []string{"echo 'hello'", "echo 'world'"}, + }, + { + steps: []BuildStep{ + {Type: pipCommandType, Command: "numpy"}, + {Type: pipCommandType, Command: "pandas"}, + }, + want: []string{"micromamba3.10 -m pip install --root-user-action=ignore \"numpy\" \"pandas\""}, + }, + { + steps: []BuildStep{ + {Type: micromambaCommandType, Command: "torch"}, + {Type: micromambaCommandType, Command: "vllm"}, + }, + want: []string{"micromamba install -y -n beta9 \"torch\" \"vllm\""}, + }, + { + steps: []BuildStep{ + {Type: shellCommandType, Command: "echo 'start'"}, + {Type: pipCommandType, Command: "numpy"}, + {Type: micromambaCommandType, Command: "torch"}, + {Type: shellCommandType, Command: "echo 'end'"}, + }, + want: []string{"echo 'start'", "micromamba3.10 -m pip install --root-user-action=ignore \"numpy\"", "micromamba install -y -n beta9 \"torch\"", "echo 'end'"}, + }, + { + steps: []BuildStep{ + {Type: shellCommandType, Command: "echo 'hello'"}, + {Type: pipCommandType, Command: "numpy"}, + {Type: pipCommandType, Command: "pandas"}, + {Type: micromambaCommandType, Command: "torch"}, + {Type: micromambaCommandType, Command: "vllm"}, + }, + want: []string{"echo 'hello'", "micromamba3.10 -m pip install --root-user-action=ignore \"numpy\" \"pandas\"", "micromamba install -y -n beta9 \"torch\" \"vllm\""}, + }, + { + steps: []BuildStep{ + {Type: shellCommandType, Command: "echo 'hello'"}, + {Type: pipCommandType, Command: "numpy"}, + {Type: pipCommandType, Command: "pandas"}, + {Type: micromambaCommandType, Command: "torch"}, + {Type: micromambaCommandType, Command: "vllm"}, + {Type: shellCommandType, Command: "apt install -y ffmpeg"}, + {Type: micromambaCommandType, Command: "ffmpeg"}, + }, + want: []string{"echo 'hello'", "micromamba3.10 -m pip install --root-user-action=ignore \"numpy\" \"pandas\"", "micromamba install -y -n beta9 \"torch\" \"vllm\"", "apt install -y ffmpeg", "micromamba install -y -n beta9 \"ffmpeg\""}, + }, + } + + for _, tc := range testCases { + got := parseBuildSteps(tc.steps, "micromamba3.10") + assert.Equal(t, tc.want, got) + } +} diff --git a/pkg/abstractions/image/image.go b/pkg/abstractions/image/image.go index 1349a0822..8443c1030 100644 --- a/pkg/abstractions/image/image.go +++ b/pkg/abstractions/image/image.go @@ -107,14 +107,24 @@ func (is *RuncImageService) BuildImage(in *pb.BuildImageRequest, stream pb.Image go is.builder.Build(ctx, buildOptions, outputChan) + // This is a switch to stop sending build log messages once the archiving stage is reached + archivingStage := false var lastMessage common.OutputMsg for o := range outputChan { + if archivingStage && !o.Archiving { + continue + } + if err := stream.Send(&pb.BuildImageResponse{Msg: o.Msg, Done: o.Done, Success: o.Success, ImageId: o.ImageId}); err != nil { log.Println("failed to complete build: 
", err) lastMessage = o break } + if o.Archiving { + archivingStage = true + } + if o.Done { lastMessage = o break diff --git a/pkg/abstractions/taskqueue/autoscaler.go b/pkg/abstractions/taskqueue/autoscaler.go index d8b19dae0..4b8ae4d63 100644 --- a/pkg/abstractions/taskqueue/autoscaler.go +++ b/pkg/abstractions/taskqueue/autoscaler.go @@ -66,8 +66,8 @@ func taskQueueScaleFunc(i *taskQueueInstance, s *taskQueueAutoscalerSample) *abs desiredContainers += 1 } - // Limit max replicas to either what was set in autoscaler config, or our default of MaxReplicas (whichever is lower) - maxReplicas := math.Min(float64(i.StubConfig.Autoscaler.MaxContainers), float64(abstractions.MaxReplicas)) + // Limit max replicas to either what was set in autoscaler config, or the limit specified on the gateway config (whichever is lower) + maxReplicas := math.Min(float64(i.StubConfig.Autoscaler.MaxContainers), float64(i.AppConfig.GatewayService.StubLimits.MaxReplicas)) desiredContainers = int(math.Min(maxReplicas, float64(desiredContainers))) } diff --git a/pkg/abstractions/taskqueue/autoscaler_test.go b/pkg/abstractions/taskqueue/autoscaler_test.go index 2622fa676..2c2d26a20 100644 --- a/pkg/abstractions/taskqueue/autoscaler_test.go +++ b/pkg/abstractions/taskqueue/autoscaler_test.go @@ -16,6 +16,12 @@ func TestDeploymentScaleFuncWithDefaults(t *testing.T) { MaxContainers: 1, TasksPerContainer: 1, } + autoscaledInstance.AppConfig = types.AppConfig{} + autoscaledInstance.AppConfig.GatewayService = types.GatewayServiceConfig{ + StubLimits: types.StubLimits{ + MaxReplicas: 10, + }, + } instance := &taskQueueInstance{} instance.AutoscaledInstance = autoscaledInstance @@ -56,6 +62,12 @@ func TestDeploymentScaleFuncWithMaxTasksPerContainer(t *testing.T) { MaxContainers: 3, TasksPerContainer: 1, } + autoscaledInstance.AppConfig = types.AppConfig{} + autoscaledInstance.AppConfig.GatewayService = types.GatewayServiceConfig{ + StubLimits: types.StubLimits{ + MaxReplicas: 10, + }, + } // Make sure we scale up to max containers instance := &taskQueueInstance{} diff --git a/pkg/abstractions/taskqueue/instance.go b/pkg/abstractions/taskqueue/instance.go index bd92556b8..7b20fe709 100644 --- a/pkg/abstractions/taskqueue/instance.go +++ b/pkg/abstractions/taskqueue/instance.go @@ -60,16 +60,16 @@ func (i *taskQueueInstance) startContainers(containersToRun int) error { env = append(secrets, env...) 
- gpuCount := 0 - if len(i.StubConfig.Runtime.Gpus) > 0 { - gpuCount = 1 - } - gpuRequest := types.GpuTypesToStrings(i.StubConfig.Runtime.Gpus) if i.StubConfig.Runtime.Gpu != "" { gpuRequest = append(gpuRequest, i.StubConfig.Runtime.Gpu.String()) } + gpuCount := 0 + if len(gpuRequest) > 0 { + gpuCount = 1 + } + for c := 0; c < containersToRun; c++ { runRequest := &types.ContainerRequest{ ContainerId: i.genContainerId(), @@ -112,7 +112,7 @@ func (i *taskQueueInstance) stopContainers(containersToStop int) error { idx := rnd.Intn(len(containerIds)) containerId := containerIds[idx] - err := i.Scheduler.Stop(containerId) + err := i.Scheduler.Stop(&types.StopContainerArgs{ContainerId: containerId}) if err != nil { log.Printf("<%s> unable to stop container: %v", i.Name, err) return err diff --git a/pkg/api/v1/container.go b/pkg/api/v1/container.go index 6ec5fda36..e4bf8375e 100644 --- a/pkg/api/v1/container.go +++ b/pkg/api/v1/container.go @@ -76,7 +76,7 @@ func (c *ContainerGroup) StopAllWorkspaceContainers(ctx echo.Context) error { } for _, state := range containerStates { - err := c.scheduler.Stop(state.ContainerId) + err := c.scheduler.Stop(&types.StopContainerArgs{ContainerId: state.ContainerId}) if err != nil { log.Println("failed to stop container", state.ContainerId, err) } @@ -90,6 +90,7 @@ func (c *ContainerGroup) StopAllWorkspaceContainers(ctx echo.Context) error { func (c *ContainerGroup) StopContainer(ctx echo.Context) error { workspaceId := ctx.Param("workspaceId") containerId := ctx.Param("containerId") + force := ctx.QueryParam("force") == "true" state, err := c.containerRepo.GetContainerState(containerId) if err != nil { @@ -100,7 +101,7 @@ func (c *ContainerGroup) StopContainer(ctx echo.Context) error { return HTTPBadRequest("Invalid workspace id") } - err = c.scheduler.Stop(containerId) + err = c.scheduler.Stop(&types.StopContainerArgs{ContainerId: containerId, Force: force}) if err != nil { if strings.Contains(err.Error(), "event already exists") { return HTTPConflict("Container is already stopping") diff --git a/pkg/api/v1/deployment.go b/pkg/api/v1/deployment.go index ffb1a21df..0ed78459b 100644 --- a/pkg/api/v1/deployment.go +++ b/pkg/api/v1/deployment.go @@ -216,7 +216,7 @@ func (g *DeploymentGroup) stopDeployments(deployments []types.DeploymentWithRela containers, err := g.containerRepo.GetActiveContainersByStubId(deployment.Stub.ExternalId) if err == nil { for _, container := range containers { - g.scheduler.Stop(container.ContainerId) + g.scheduler.Stop(&types.StopContainerArgs{ContainerId: container.ContainerId}) } } diff --git a/pkg/common/config.default.yaml b/pkg/common/config.default.yaml index d3f93c055..4eac184e9 100644 --- a/pkg/common/config.default.yaml +++ b/pkg/common/config.default.yaml @@ -48,6 +48,7 @@ gateway: shutdownTimeout: 180s stubLimits: memory: 32768 + maxReplicas: 10 imageService: localCacheEnabled: true registryStore: local @@ -72,6 +73,11 @@ imageService: python3.10: py310-latest python3.11: py311-latest python3.12: py312-latest + micromamba3.8: micromamba3.8-latest + micromamba3.9: micromamba3.9-latest + micromamba3.10: micromamba3.10-latest + micromamba3.11: micromamba3.11-latest + micromamba3.12: micromamba3.12-latest worker: pools: default: diff --git a/pkg/common/io.go b/pkg/common/io.go index b8351f401..e24ce2588 100644 --- a/pkg/common/io.go +++ b/pkg/common/io.go @@ -1,9 +1,11 @@ package common type OutputMsg struct { - Msg string - Done bool - Success bool + Msg string + Done bool + Success bool + Archiving bool + ImageId string } diff 
--git a/pkg/common/runc_client.go b/pkg/common/runc_client.go index 190c18632..957ff7be6 100644 --- a/pkg/common/runc_client.go +++ b/pkg/common/runc_client.go @@ -176,6 +176,7 @@ func generateProgressBar(progress int, total int) string { } func (c *RunCClient) Archive(ctx context.Context, containerId, imageId string, outputChan chan OutputMsg) error { + outputChan <- OutputMsg{Archiving: true, Done: false, Success: false, Msg: "\nSaving image, this may take a few minutes...\n"} stream, err := c.client.RunCArchive(ctx, &pb.RunCArchiveRequest{ContainerId: containerId, ImageId: imageId}) if err != nil { @@ -198,12 +199,12 @@ func (c *RunCClient) Archive(ctx context.Context, containerId, imageId string, o } if resp.ErrorMsg != "" { - outputChan <- OutputMsg{Msg: resp.ErrorMsg + "\n", Done: false} + outputChan <- OutputMsg{Msg: resp.ErrorMsg + "\n", Done: false, Archiving: true} } if !resp.Done && resp.ErrorMsg == "" { progressBar := generateProgressBar(int(resp.Progress), 100) - outputChan <- OutputMsg{Msg: progressBar, Done: false} + outputChan <- OutputMsg{Msg: progressBar, Done: false, Archiving: true} } if resp.Done && resp.Success { diff --git a/pkg/gateway/services/container.go b/pkg/gateway/services/container.go index beac6e2d4..a40ed7d90 100644 --- a/pkg/gateway/services/container.go +++ b/pkg/gateway/services/container.go @@ -5,6 +5,7 @@ import ( "time" "github.com/beam-cloud/beta9/pkg/auth" + "github.com/beam-cloud/beta9/pkg/types" pb "github.com/beam-cloud/beta9/proto" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -60,7 +61,7 @@ func (gws GatewayService) StopContainer(ctx context.Context, in *pb.StopContaine }, nil } - err = gws.scheduler.Stop(in.ContainerId) + err = gws.scheduler.Stop(&types.StopContainerArgs{ContainerId: in.ContainerId}) if err != nil { return &pb.StopContainerResponse{ Ok: false, diff --git a/pkg/gateway/services/deployment.go b/pkg/gateway/services/deployment.go index 7aa1c1817..02bb26d71 100644 --- a/pkg/gateway/services/deployment.go +++ b/pkg/gateway/services/deployment.go @@ -164,7 +164,7 @@ func (gws *GatewayService) stopDeployments(deployments []types.DeploymentWithRel containers, err := gws.containerRepo.GetActiveContainersByStubId(deployment.Stub.ExternalId) if err == nil { for _, container := range containers { - gws.scheduler.Stop(container.ContainerId) + gws.scheduler.Stop(&types.StopContainerArgs{ContainerId: container.ContainerId}) } } diff --git a/pkg/gateway/services/worker.go b/pkg/gateway/services/worker.go index 4c304ff72..96c6eec0a 100644 --- a/pkg/gateway/services/worker.go +++ b/pkg/gateway/services/worker.go @@ -185,7 +185,7 @@ func (gws *GatewayService) DrainWorker(ctx context.Context, in *pb.DrainWorkerRe var group errgroup.Group for _, container := range containers { group.Go(func() error { - return gws.scheduler.Stop(container.ContainerId) + return gws.scheduler.Stop(&types.StopContainerArgs{ContainerId: container.ContainerId}) }) } if err := group.Wait(); err != nil { diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 723700c5a..a3a96285a 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -138,19 +138,22 @@ func (s *Scheduler) getConcurrencyLimit(request *types.ContainerRequest) (*types return quota, nil } -func (s *Scheduler) Stop(containerId string) error { - log.Printf("Received STOP request: %s\n", containerId) +func (s *Scheduler) Stop(stopArgs *types.StopContainerArgs) error { + log.Printf("Received STOP request: %+v\n", stopArgs) - err := 
s.containerRepo.UpdateContainerStatus(containerId, types.ContainerStatusStopping, time.Duration(types.ContainerStateTtlSWhilePending)*time.Second) + err := s.containerRepo.UpdateContainerStatus(stopArgs.ContainerId, types.ContainerStatusStopping, time.Duration(types.ContainerStateTtlSWhilePending)*time.Second) + if err != nil { + return err + } + + eventArgs, err := stopArgs.ToMap() if err != nil { return err } _, err = s.eventBus.Send(&common.Event{ - Type: common.EventTypeStopContainer, - Args: map[string]any{ - "container_id": containerId, - }, + Type: common.EventTypeStopContainer, + Args: eventArgs, LockAndDelete: false, }) if err != nil { @@ -254,14 +257,14 @@ func (s *Scheduler) StartProcessingRequests() { } func (s *Scheduler) scheduleRequest(worker *types.Worker, request *types.ContainerRequest) error { - go s.schedulerMetrics.CounterIncContainerScheduled(request) - go s.eventRepo.PushContainerScheduledEvent(request.ContainerId, worker.Id, request) - if err := s.containerRepo.UpdateAssignedContainerGPU(request.ContainerId, worker.Gpu); err != nil { return err } request.Gpu = worker.Gpu + + go s.schedulerMetrics.CounterIncContainerScheduled(request) + go s.eventRepo.PushContainerScheduledEvent(request.ContainerId, worker.Id, request) return s.workerRepo.ScheduleContainerRequest(worker, request) } @@ -277,14 +280,17 @@ func filterWorkersByPoolSelector(workers []*types.Worker, request *types.Contain } func filterWorkersByResources(workers []*types.Worker, request *types.ContainerRequest) []*types.Worker { + filteredWorkers := []*types.Worker{} gpuRequestsMap := map[string]int{} + requiresGPU := request.RequiresGPU() for index, gpu := range request.GpuRequest { gpuRequestsMap[gpu] = index } - filteredWorkers := []*types.Worker{} for _, worker := range workers { + isGpuWorker := worker.Gpu != "" + // Check if the worker has enough free cpu and memory to run the container if worker.FreeCpu < int64(request.Cpu) || worker.FreeMemory < int64(request.Memory) { continue @@ -295,7 +301,13 @@ func filterWorkersByResources(workers []*types.Worker, request *types.ContainerR continue } - if len(gpuRequestsMap) > 0 { + if (requiresGPU && !isGpuWorker) || (!requiresGPU && isGpuWorker) { + // If the worker doesn't have a GPU and the request requires one, skip + // Likewise, if the worker has a GPU and the request doesn't require one, skip + continue + } + + if requiresGPU { // Validate GPU resource availability priorityModifier, validGpu := gpuRequestsMap[worker.Gpu] @@ -374,7 +386,7 @@ const maxScheduleRetryCount = 3 const maxScheduleRetryDuration = 10 * time.Minute func (s *Scheduler) addRequestToBacklog(request *types.ContainerRequest) error { - if request.Gpu != "" && request.GpuCount <= 0 { + if request.RequiresGPU() && request.GpuCount <= 0 { request.GpuCount = 1 } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index d762a2e22..a47dd2509 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -375,6 +375,7 @@ func TestSelectGPUWorker(t *testing.T) { assert.NotNil(t, wb) newWorker := &types.Worker{ + Id: uuid.New().String(), Status: types.WorkerStatusPending, FreeCpu: 1000, FreeMemory: 1000, @@ -385,24 +386,36 @@ func TestSelectGPUWorker(t *testing.T) { err = wb.workerRepo.AddWorker(newWorker) assert.Nil(t, err) - firstRequest := &types.ContainerRequest{ + cpuRequest := &types.ContainerRequest{ Cpu: 1000, Memory: 1000, - Gpu: "A10G", + } + + firstRequest := &types.ContainerRequest{ + Cpu: 1000, + Memory: 1000, + GpuRequest: 
[]string{"A10G"}, } secondRequest := &types.ContainerRequest{ - Cpu: 1000, - Memory: 1000, - Gpu: "A10G", + Cpu: 1000, + Memory: 1000, + GpuRequest: []string{"T4"}, } thirdRequest := &types.ContainerRequest{ - Cpu: 1000, - Memory: 1000, - Gpu: "any", + Cpu: 1000, + Memory: 1000, + GpuRequest: []string{"any"}, } + // CPU request should not be able to select a GPU worker + _, err = wb.selectWorker(cpuRequest) + assert.Error(t, err) + + _, ok := err.(*types.ErrNoSuitableWorkerFound) + assert.True(t, ok) + // Select a worker for the request worker, err := wb.selectWorker(firstRequest) assert.Nil(t, err) @@ -419,10 +432,11 @@ func TestSelectGPUWorker(t *testing.T) { _, err = wb.selectWorker(secondRequest) assert.Error(t, err) - _, ok := err.(*types.ErrNoSuitableWorkerFound) + _, ok = err.(*types.ErrNoSuitableWorkerFound) assert.True(t, ok) newWorkerAnyGpu := &types.Worker{ + Id: uuid.New().String(), Status: types.WorkerStatusPending, FreeCpu: 1000, FreeMemory: 1000, @@ -451,16 +465,34 @@ func TestSelectCPUWorker(t *testing.T) { assert.NotNil(t, wb) newWorker := &types.Worker{ + Id: uuid.New().String(), Status: types.WorkerStatusPending, FreeCpu: 2000, FreeMemory: 2000, Gpu: "", } + newWorker2 := &types.Worker{ + Id: uuid.New().String(), + Status: types.WorkerStatusPending, + FreeCpu: 1000, + FreeMemory: 1000, + Gpu: "", + } + // Create a new worker err = wb.workerRepo.AddWorker(newWorker) assert.Nil(t, err) + err = wb.workerRepo.AddWorker(newWorker2) + assert.Nil(t, err) + + gpuRequest := &types.ContainerRequest{ + Cpu: 1000, + Memory: 1000, + GpuRequest: []string{"A10G"}, + } + firstRequest := &types.ContainerRequest{ Cpu: 1000, Memory: 1000, @@ -473,6 +505,31 @@ func TestSelectCPUWorker(t *testing.T) { Gpu: "", } + thirdRequest := &types.ContainerRequest{ + Cpu: 1000, + Memory: 1000, + Gpu: "", + } + + // GPU request should not be able to select a CPU worker + _, err = wb.selectWorker(gpuRequest) + assert.Error(t, err) + + _, ok := err.(*types.ErrNoSuitableWorkerFound) + assert.True(t, ok) + + // Add GPU worker to test that CPU workers won't select it + gpuWorker := &types.Worker{ + Id: uuid.New().String(), + Status: types.WorkerStatusPending, + FreeCpu: 1000, + FreeMemory: 1000, + Gpu: "A10G", + } + + err = wb.workerRepo.AddWorker(gpuWorker) + assert.Nil(t, err) + // Select a worker for the request worker, err := wb.selectWorker(firstRequest) assert.Nil(t, err) @@ -483,11 +540,18 @@ func TestSelectCPUWorker(t *testing.T) { worker, err = wb.selectWorker(secondRequest) assert.Nil(t, err) - assert.Equal(t, newWorker.Gpu, worker.Gpu) + assert.Equal(t, "", worker.Gpu) err = wb.scheduleRequest(worker, secondRequest) assert.Nil(t, err) + worker, err = wb.selectWorker(thirdRequest) + assert.Nil(t, err) + assert.Equal(t, newWorker.Gpu, worker.Gpu) + + err = wb.scheduleRequest(worker, thirdRequest) + assert.Nil(t, err) + updatedWorker, err := wb.workerRepo.GetWorkerById(newWorker.Id) assert.Nil(t, err) assert.Equal(t, int64(0), updatedWorker.FreeCpu) @@ -495,25 +559,6 @@ func TestSelectCPUWorker(t *testing.T) { assert.Equal(t, "", updatedWorker.Gpu) assert.Equal(t, types.WorkerStatusPending, updatedWorker.Status) - newWorker2 := &types.Worker{ - Status: types.WorkerStatusPending, - FreeCpu: 1000, - FreeMemory: 1000, - Gpu: "", - } - - thirdRequest := &types.ContainerRequest{ - Cpu: 1000, - Memory: 1000, - } - - // Create a new worker - err = wb.workerRepo.AddWorker(newWorker2) - assert.Nil(t, err) - - worker, err = wb.selectWorker(thirdRequest) - assert.Nil(t, err) - assert.Equal(t, "", worker.Gpu) } 
func stringInSlice(a string, list []string) bool { diff --git a/pkg/types/config.go b/pkg/types/config.go index 3b6e310d0..4c4628989 100644 --- a/pkg/types/config.go +++ b/pkg/types/config.go @@ -84,7 +84,8 @@ type CORSConfig struct { } type StubLimits struct { - Memory uint64 `key:"memory" json:"memory"` + Memory uint64 `key:"memory" json:"memory"` + MaxReplicas uint64 `key:"maxReplicas" json:"max_replicas"` } type GatewayServiceConfig struct { diff --git a/pkg/types/scheduler.go b/pkg/types/scheduler.go index 1a810ae3b..15da4351f 100644 --- a/pkg/types/scheduler.go +++ b/pkg/types/scheduler.go @@ -1,6 +1,7 @@ package types import ( + "encoding/json" "fmt" "time" ) @@ -89,6 +90,10 @@ type ContainerRequest struct { PoolSelector string `json:"pool_selector"` } +func (c *ContainerRequest) RequiresGPU() bool { + return len(c.GpuRequest) > 0 +} + const ContainerExitCodeTtlS int = 300 const ( @@ -164,3 +169,36 @@ type QuotaDoesNotExistError struct{} func (e *QuotaDoesNotExistError) Error() string { return "quota_does_not_exist" } + +type StopContainerArgs struct { + ContainerId string `json:"container_id"` + Force bool `json:"force"` +} + +func (a StopContainerArgs) ToMap() (map[string]any, error) { + data, err := json.Marshal(a) + if err != nil { + return nil, err + } + + var result map[string]any + if err := json.Unmarshal(data, &result); err != nil { + return nil, err + } + + return result, nil +} + +func ToStopContainerArgs(m map[string]any) (*StopContainerArgs, error) { + data, err := json.Marshal(m) + if err != nil { + return nil, err + } + + var result StopContainerArgs + if err = json.Unmarshal(data, &result); err != nil { + return nil, err + } + + return &result, nil +} diff --git a/pkg/worker/base_runc_config.json b/pkg/worker/base_runc_config.json index 6323bd5ba..c4dec6d27 100644 --- a/pkg/worker/base_runc_config.json +++ b/pkg/worker/base_runc_config.json @@ -15,7 +15,8 @@ "PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:$PATH", "LD_LIBRARY_PATH=/usr/local/nvidia/lib64:/usr/lib/x86_64-linux-gnu:/usr/lib/worker/x86_64-linux-gnu:$LD_LIBRARY_PATH", "TERM=xterm", - "LAMBDA_TASK_ROOT=/workspace" + "LAMBDA_TASK_ROOT=/workspace", + "MAMBA_ROOT_PREFIX=/micromamba" ], "cwd": "/", "capabilities": { @@ -422,6 +423,126 @@ "nosuid", "nodev" ] + }, + { + "destination": "/micromamba/envs/beta9/lib/python3.8/site-packages/beam", + "type": "bind", + "source": "/workspace/sdk", + "options": [ + "ro", + "rbind", + "rprivate", + "nosuid", + "nodev" + ] + }, + { + "destination": "/micromamba/envs/beta9/lib/python3.8/site-packages/beta9", + "type": "bind", + "source": "/workspace/sdk", + "options": [ + "ro", + "rbind", + "rprivate", + "nosuid", + "nodev" + ] + }, + { + "destination": "/micromamba/envs/beta9/lib/python3.9/site-packages/beam", + "type": "bind", + "source": "/workspace/sdk", + "options": [ + "ro", + "rbind", + "rprivate", + "nosuid", + "nodev" + ] + }, + { + "destination": "/micromamba/envs/beta9/lib/python3.9/site-packages/beta9", + "type": "bind", + "source": "/workspace/sdk", + "options": [ + "ro", + "rbind", + "rprivate", + "nosuid", + "nodev" + ] + }, + { + "destination": "/micromamba/envs/beta9/lib/python3.10/site-packages/beam", + "type": "bind", + "source": "/workspace/sdk", + "options": [ + "ro", + "rbind", + "rprivate", + "nosuid", + "nodev" + ] + }, + { + "destination": "/micromamba/envs/beta9/lib/python3.10/site-packages/beta9", + "type": "bind", + "source": "/workspace/sdk", + "options": [ + "ro", + "rbind", + "rprivate", + "nosuid", + "nodev" + ] + }, + { + 
"destination": "/micromamba/envs/beta9/lib/python3.11/site-packages/beam", + "type": "bind", + "source": "/workspace/sdk", + "options": [ + "ro", + "rbind", + "rprivate", + "nosuid", + "nodev" + ] + }, + { + "destination": "/micromamba/envs/beta9/lib/python3.11/site-packages/beta9", + "type": "bind", + "source": "/workspace/sdk", + "options": [ + "ro", + "rbind", + "rprivate", + "nosuid", + "nodev" + ] + }, + { + "destination": "/micromamba/envs/beta9/lib/python3.12/site-packages/beam", + "type": "bind", + "source": "/workspace/sdk", + "options": [ + "ro", + "rbind", + "rprivate", + "nosuid", + "nodev" + ] + }, + { + "destination": "/micromamba/envs/beta9/lib/python3.12/site-packages/beta9", + "type": "bind", + "source": "/workspace/sdk", + "options": [ + "ro", + "rbind", + "rprivate", + "nosuid", + "nodev" + ] } ], "hooks": { diff --git a/pkg/worker/events.go b/pkg/worker/events.go index b8d79eb55..4642c3e81 100644 --- a/pkg/worker/events.go +++ b/pkg/worker/events.go @@ -12,12 +12,7 @@ import ( "github.com/shirou/gopsutil/v4/process" ) -func (w *Worker) collectAndSendContainerMetrics(ctx context.Context, request *types.ContainerRequest, spec *specs.Spec, pidChan <-chan int) { - containerPid := <-pidChan - if containerPid == 0 { - return - } - +func (w *Worker) collectAndSendContainerMetrics(ctx context.Context, request *types.ContainerRequest, spec *specs.Spec, containerPid int) { ticker := time.NewTicker(w.config.Monitoring.ContainerMetricsInterval) defer ticker.Stop() diff --git a/pkg/worker/lifecycle.go b/pkg/worker/lifecycle.go index 6e1c13323..4216f20cc 100644 --- a/pkg/worker/lifecycle.go +++ b/pkg/worker/lifecycle.go @@ -38,19 +38,21 @@ func (s *Worker) handleStopContainerEvent(event *common.Event) bool { s.containerLock.Lock() defer s.containerLock.Unlock() - containerId := event.Args["container_id"].(string) + stopArgs, err := types.ToStopContainerArgs(event.Args) + if err != nil { + log.Printf("failed to parse stop container args: %v\n", err) + return false + } - if _, exists := s.containerInstances.Get(containerId); exists { - log.Printf("<%s> - received stop container event.\n", containerId) - s.stopContainerChan <- stopContainerEvent{ContainerId: containerId, Kill: false} + if _, exists := s.containerInstances.Get(stopArgs.ContainerId); exists { + log.Printf("<%s> - received stop container event.\n", stopArgs.ContainerId) + s.stopContainerChan <- stopContainerEvent{ContainerId: stopArgs.ContainerId, Kill: stopArgs.Force} } return true } -// stopContainer stops a runc container. -// It will remove the container state if the contaienr is forcefully killed. -// Otherwise, the container state will be removed after the termination grace period in clearContainer(). +// stopContainer stops a runc container. When force is true, a SIGKILL signal is sent to the container. 
func (s *Worker) stopContainer(containerId string, kill bool) error { log.Printf("<%s> - stopping container.\n", containerId) @@ -63,6 +65,7 @@ func (s *Worker) stopContainer(containerId string, kill bool) error { if kill { signal = int(syscall.SIGKILL) s.containerRepo.DeleteContainerState(containerId) + defer s.containerInstances.Delete(containerId) } err := s.runcHandle.Kill(context.Background(), containerId, signal, &runc.KillOpts{All: true}) @@ -468,9 +471,19 @@ func (s *Worker) spawn(request *types.ContainerRequest, spec *specs.Spec, output go s.eventRepo.PushContainerStartedEvent(containerId, s.workerId, request) defer func() { go s.eventRepo.PushContainerStoppedEvent(containerId, s.workerId, request) }() - // Capture resource usage (cpu/mem/gpu) + pid := 0 pidChan := make(chan int, 1) - go s.collectAndSendContainerMetrics(ctx, request, spec, pidChan) + + go func() { + // Wait for runc to start the container + pid = <-pidChan + + // Capture resource usage (cpu/mem/gpu) + go s.collectAndSendContainerMetrics(ctx, request, spec, pid) + + // Watch for OOM events + go s.watchOOMEvents(ctx, containerId, outputChan) + }() // Invoke runc process (launch the container) exitCode, err = s.runcHandle.Run(s.ctx, containerId, opts.BundlePath, &runc.CreateOpts{ @@ -507,3 +520,52 @@ func (s *Worker) createOverlay(request *types.ContainerRequest, bundlePath strin func (s *Worker) isBuildRequest(request *types.ContainerRequest) bool { return request.SourceImage != nil } + +func (s *Worker) watchOOMEvents(ctx context.Context, containerId string, output chan common.OutputMsg) { + seenEvents := make(map[string]struct{}) + + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + ch, err := s.runcHandle.Events(ctx, containerId, time.Second) + if err != nil { + return + } + + maxTries, tries := 5, 0 // Used for re-opening the channel if it's closed + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + seenEvents = make(map[string]struct{}) + default: + event, ok := <-ch + if !ok { // If the channel is closed, try to re-open it + if tries == maxTries-1 { + output <- common.OutputMsg{ + Msg: fmt.Sprintln("[WARNING] Unable to watch for OOM events."), + } + return + } + + tries++ + time.Sleep(time.Second) + ch, _ = s.runcHandle.Events(ctx, containerId, time.Second) + continue + } + + if _, ok := seenEvents[event.Type]; ok { + continue + } + + seenEvents[event.Type] = struct{}{} + + if event.Type == "oom" { + output <- common.OutputMsg{ + Msg: fmt.Sprintln("[ERROR] A process in the container was killed due to out-of-memory conditions."), + } + } + } + } +} diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 51deeb69d..e79137c3a 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -311,8 +311,6 @@ func (s *Worker) updateContainerStatus(request *types.ContainerRequest) error { return nil } - log.Printf("<%s> - container still running: %s\n", request.ContainerId, request.ImageId) - // Stop container if it is "orphaned" - meaning it's running but has no associated state state, err := s.containerRepo.GetContainerState(request.ContainerId) if err != nil { @@ -325,6 +323,8 @@ func (s *Worker) updateContainerStatus(request *types.ContainerRequest) error { continue } + log.Printf("<%s> - container still running: %s\n", request.ContainerId, request.ImageId) + err = s.containerRepo.UpdateContainerStatus(request.ContainerId, state.Status, time.Duration(types.ContainerStateTtlS)*time.Second) if err != nil { log.Printf("<%s> - unable to update container state: %v\n", 
request.ContainerId, err) diff --git a/sdk/pyproject.toml b/sdk/pyproject.toml index a2ab799d2..7926418ab 100644 --- a/sdk/pyproject.toml +++ b/sdk/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "beta9" -version = "0.1.96" +version = "0.1.100" description = "" authors = ["beam.cloud "] packages = [ diff --git a/sdk/src/beta9/abstractions/container.py b/sdk/src/beta9/abstractions/container.py index 1fa707959..e9f981c5c 100644 --- a/sdk/src/beta9/abstractions/container.py +++ b/sdk/src/beta9/abstractions/container.py @@ -30,8 +30,8 @@ class Container(RunnerAbstraction): gpu (Union[GpuTypeAlias, List[GpuTypeAlias]]): The type or name of the GPU device to be used for GPU-accelerated tasks. If not applicable or no GPU required, leave it empty. - You can specify multiple GPUs by providing a list of GpuTypeAlias. If you specify multiple - GPUs, the container will load balance across them with equal priority. + You can specify multiple GPUs by providing a list of GpuTypeAlias. If you specify several GPUs, + the scheduler prioritizes their selection based on their order in the list. image (Union[Image, dict]): The container image used for the task execution. Default is [Image](#image). volumes (Optional[List[Volume]]): diff --git a/sdk/src/beta9/abstractions/endpoint.py b/sdk/src/beta9/abstractions/endpoint.py index 886663687..c5818d701 100644 --- a/sdk/src/beta9/abstractions/endpoint.py +++ b/sdk/src/beta9/abstractions/endpoint.py @@ -44,8 +44,8 @@ class Endpoint(RunnerAbstraction): gpu (Union[GpuTypeAlias, List[GpuTypeAlias]]): The type or name of the GPU device to be used for GPU-accelerated tasks. If not applicable or no GPU required, leave it empty. - You can specify multiple GPUs by providing a list of GpuTypeAlias. If you specify multiple - GPUs, the container will load balance across them with equal priority. + You can specify multiple GPUs by providing a list of GpuTypeAlias. If you specify several GPUs, + the scheduler prioritizes their selection based on their order in the list. image (Union[Image, dict]): The container image used for the task execution. Default is [Image](#image). volumes (Optional[List[Volume]]): diff --git a/sdk/src/beta9/abstractions/function.py b/sdk/src/beta9/abstractions/function.py index 9e8d2ba17..4e3cb85a4 100644 --- a/sdk/src/beta9/abstractions/function.py +++ b/sdk/src/beta9/abstractions/function.py @@ -42,8 +42,8 @@ class Function(RunnerAbstraction): gpu (Union[GpuTypeAlias, List[GpuTypeAlias]]): The type or name of the GPU device to be used for GPU-accelerated tasks. If not applicable or no GPU required, leave it empty. - You can specify multiple GPUs by providing a list of GpuTypeAlias. If you specify multiple - GPUs, the container will load balance across them with equal priority. + You can specify multiple GPUs by providing a list of GpuTypeAlias. If you specify several GPUs, + the scheduler prioritizes their selection based on their order in the list. image (Union[Image, dict]): The container image used for the task execution. Default is [Image](#image). timeout (Optional[int]): diff --git a/sdk/src/beta9/abstractions/image.py b/sdk/src/beta9/abstractions/image.py index 4311c651f..74c0326a1 100644 --- a/sdk/src/beta9/abstractions/image.py +++ b/sdk/src/beta9/abstractions/image.py @@ -217,6 +217,43 @@ def get_credentials_from_env(self) -> Dict[str, str]: raise ImageCredentialValueNotFound(key) return creds + def micromamba(self) -> "Image": + """ + Use micromamba to manage python packages. 
+ """ + self.python_version = self.python_version.replace("python", "micromamba") + return self + + def add_micromamba_packages( + self, packages: Union[Sequence[str], str], channels: Optional[Sequence[str]] = [] + ) -> "Image": + """ + Add micromamba packages that will be installed when building the image. + + These will be executed at the end of the image build and in the + order they are added. If a single string is provided, it will be + interpreted as a path to a requirements.txt file. + + Parameters: + packages: The micromamba packages to add or the path to a requirements.txt file. + channels: The micromamba channels to use. + """ + # Error if micromamba is not enabled + if not self.python_version.startswith("micromamba"): + raise ValueError("Micromamba must be enabled to use this method.") + + # Check if we were given a .txt requirement file + if isinstance(packages, str): + packages = self._sanitize_python_packages(self._load_requirements_file(packages)) + + for package in packages: + self.build_steps.append(BuildStep(command=package, type="micromamba")) + + for channel in channels: + self.build_steps.append(BuildStep(command=f"-c {channel}", type="micromamba")) + + return self + def add_commands(self, commands: Sequence[str]) -> "Image": """ Add shell commands that will be executed when building the image. diff --git a/sdk/src/beta9/abstractions/taskqueue.py b/sdk/src/beta9/abstractions/taskqueue.py index 9b771e2e0..5c53a82eb 100644 --- a/sdk/src/beta9/abstractions/taskqueue.py +++ b/sdk/src/beta9/abstractions/taskqueue.py @@ -42,8 +42,8 @@ class TaskQueue(RunnerAbstraction): gpu (Union[GpuTypeAlias, List[GpuTypeAlias]]): The type or name of the GPU device to be used for GPU-accelerated tasks. If not applicable or no GPU required, leave it empty. - You can specify multiple GPUs by providing a list of GpuTypeAlias. If you specify multiple - GPUs, the container will load balance across them with equal priority. + You can specify multiple GPUs by providing a list of GpuTypeAlias. If you specify several GPUs, + the scheduler prioritizes their selection based on their order in the list. image (Union[Image, dict]): The container image used for the task execution. Default is [Image](#image). 
timeout (Optional[int]): diff --git a/sdk/src/beta9/cli/config.py b/sdk/src/beta9/cli/config.py index f477ef90c..c73ef0358 100644 --- a/sdk/src/beta9/cli/config.py +++ b/sdk/src/beta9/cli/config.py @@ -146,7 +146,7 @@ def create_context(config_path: Path, **kwargs): contexts[name] = context save_config(contexts=contexts, path=config_path) - terminal.success("Added new context 🎉!") + terminal.success(f"Added new context to {config_path}") @management.command( diff --git a/sdk/src/beta9/cli/volume.py b/sdk/src/beta9/cli/volume.py index 0a11d3366..e29ba46c5 100644 --- a/sdk/src/beta9/cli/volume.py +++ b/sdk/src/beta9/cli/volume.py @@ -162,7 +162,7 @@ def cp(service: ServiceClient, local_path: str, remote_path: str): dst = (remote_path / file.relative_to(Path.cwd())).as_posix() req = ( CopyPathRequest(path=dst, content=chunk) - for chunk in read_with_progress(file, desc_width=desc_width) + for chunk in read_with_progress(file, max_desc_width=desc_width) ) res: CopyPathResponse = service.volume.copy_path_stream(req) @@ -173,10 +173,15 @@ def cp(service: ServiceClient, local_path: str, remote_path: str): def read_with_progress( path: Union[Path, str], chunk_size: int = 1024 * 256, - desc_width: int = 20, + max_desc_width: int = 30, ) -> Iterable[bytes]: path = Path(path) - desc = path.name[: min(len(path.name), desc_width)].ljust(desc_width) + name = "/".join(path.relative_to(Path.cwd()).parts[-(len(path.parts)) :]) + + if len(name) > max_desc_width: + desc = f"...{name[-(max_desc_width - 3):]}" + else: + desc = name.ljust(max_desc_width) with terminal.progress_open(path, "rb", description=desc) as file: while chunk := file.read(chunk_size): diff --git a/sdk/src/beta9/sync.py b/sdk/src/beta9/sync.py index 8c1e8575c..e23d79927 100644 --- a/sdk/src/beta9/sync.py +++ b/sdk/src/beta9/sync.py @@ -47,24 +47,6 @@ def get_workspace_object_id() -> str: IGNORE_FILE_NAME = f".{_settings.name}ignore".lower() IGNORE_FILE_CONTENTS = f"""# Generated by {_settings.name} SDK .{_settings.name.lower()}ignore -* -!*.py -!*.json -!*.yaml -!*.yml -!*.toml -!*.ini -!*.cfg -!*.md -!*.rst -!*.txt -!*.csv -!*.tsv -!*.parquet -!*.jsonl -!*.pickle -!*.npy - pyproject.toml .git .idea @@ -78,6 +60,7 @@ def get_workspace_object_id() -> str: drive/MyDrive .coverage .pytest_cache +.ipynb .ruff_cache .dockerignore .ipynb_checkpoints @@ -87,6 +70,8 @@ def get_workspace_object_id() -> str: **/.pytest_cache/ **/node_modules/ *.pyc +.next/ +.circleci """
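
For context on the new SDK surface introduced above, here is a minimal usage sketch that combines the micromamba image builder with a priority-ordered GPU list. It is a sketch under assumptions, not part of the diff: it assumes `Image` and `function` are importable from the top-level `beta9` package (as in the README snippets) and that `Image()` with its default Python version is acceptable; exact constructor arguments may differ.

```python
from beta9 import Image, function  # assumption: both are exported at the package root

# Switch the runner to the micromamba variant and add conda packages.
image = (
    Image()
    .micromamba()  # swaps the python3.x runner tag for the matching micromamba3.x image
    .add_micromamba_packages(["ffmpeg", "pytorch"], channels=["conda-forge"])
)


# A list of GPU types is prioritized in the order given (A100-40 before T4).
@function(cpu=1, memory=128, gpu=["A100-40", "T4"], image=image)
def square(i: int) -> int:
    return i**2
```

Per the changes in `image.py` and `build.go`, `micromamba()` only rewrites the `python3.x` version string to `micromamba3.x`, which selects the new micromamba runner image from the image service config; package steps added with `add_micromamba_packages` are then coalesced into a single `micromamba install -y -n beta9 ...` command at build time, with `-c <channel>` entries passed through as flags.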