Skip to content

Commit

Permalink
update get controller logic, runc hook
Browse files Browse the repository at this point in the history
  • Loading branch information
nickpetrovic committed Jan 18, 2024
1 parent fcd4468 commit 73f82b7
Show file tree
Hide file tree
Showing 15 changed files with 137 additions and 29 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ workerTag := latest
runnerTag := latest

# Bootstrap a local development environment:
#   1. install the required CLI tools via bin/setup.sh,
#   2. run the k3d-up, beam-runner, beam-worker and beam targets,
#   3. delete the pods labelled app=beam (presumably so they are recreated
#      from the freshly built images -- confirm against the deployment).
# $(MAKE) is used for the recursive invocation so command-line flags and the
# jobserver are passed on to the sub-make.
setup:
	bash bin/setup.sh
	$(MAKE) k3d-up beam-runner beam-worker beam
	kubectl delete pod -l app=beam

Expand Down
28 changes: 28 additions & 0 deletions bin/setup.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/usr/bin/env bash
#
# Installs the command-line tools used for local development:
# kubectl, stern, okteto and k3d.
#
# kubectl and stern are written to /usr/local/bin, so the script needs write
# access to that directory.

# Abort on the first failed command, on use of an unset variable, and when any
# stage of a pipeline fails (the downloads below are piped into tar/sh).
set -euo pipefail

# Operating system name in lowercase, as used in the release download URLs.
os=$(uname -s | tr '[:upper:]' '[:lower:]')

# Translate the machine hardware name into the architecture label used in the
# release download URLs. Linux reports 64-bit ARM as "aarch64", macOS as
# "arm64". Anything unrecognised fails loudly instead of producing download
# URLs with an empty architecture.
machine=$(uname -m)
case "${machine}" in
  arm64 | aarch64)
    arch="arm64"
    ;;
  x86_64)
    arch="amd64"
    ;;
  *)
    echo "Unsupported architecture: ${machine}" >&2
    exit 1
    ;;
esac

# kubectl tracks the latest stable Kubernetes release; stern is pinned.
k8s_version=$(curl -sSfL https://dl.k8s.io/release/stable.txt)
stern_version="1.28.0"

echo "Installing kubectl"
# Use -o rather than a shell redirect so an existing kubectl is not truncated
# before the download has actually started.
curl -sSfL -o /usr/local/bin/kubectl "https://dl.k8s.io/release/${k8s_version}/bin/${os}/${arch}/kubectl"
chmod +x /usr/local/bin/kubectl

echo "Installing stern"
# Extract only the stern binary from the release tarball.
curl -sSfL "https://github.com/stern/stern/releases/download/v${stern_version}/stern_${stern_version}_${os}_${arch}.tar.gz" | tar -xz -C /usr/local/bin stern
chmod +x /usr/local/bin/stern

# NOTE(review): the two installers below are fetched unpinned and executed
# directly; consider pinning a version/commit and verifying a checksum.
echo "Installing okteto"
curl -sSfL https://get.okteto.com | sh

echo "Installing k3d"
curl -sSfL https://raw.githubusercontent.com/k3d-io/k3d/main/install.sh | bash
27 changes: 23 additions & 4 deletions docker/Dockerfile.worker
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,21 @@ FROM release AS dev

FROM ${BASE_STAGE} AS final

ENV DEBIAN_FRONTEND="noninteractive"
WORKDIR /workspace

RUN apt-get update && \
apt-get install -y --no-install-recommends --no-install-recommends curl gpg fuse3
# Install the OS packages needed by the later steps of this stage.
# - The CUDA apt source is removed before `apt-get update` (presumably so the
#   update does not depend on that repository); `rm -f` keeps the build
#   working when the base image does not ship the file.
# - Packages are listed once each, one per line and sorted, to keep diffs
#   readable.
# - The apt package lists are removed in the same layer so they do not end up
#   in the image; any later install step must run its own `apt-get update`.
# NOTE(review): `--no-install-recommends` is intentionally not added here; it
# could drop recommended packages that the builds depend on. Evaluate
# separately.
RUN rm -f /etc/apt/sources.list.d/cuda.list && \
    apt-get update && \
    apt-get install -y \
        asciidoc \
        build-essential \
        curl \
        fuse3 \
        git \
        gpg \
        iproute2 \
        iptables \
        libaio-dev \
        libbsd-dev \
        libcap-dev \
        libnet1-dev \
        libnftnl-dev \
        libnl-3-dev \
        libprotobuf-c-dev \
        libprotobuf-dev \
        libseccomp-dev \
        libsndfile1 \
        libsndfile1-dev \
        libzmq3-dev \
        pkg-config \
        protobuf-c-compiler \
        protobuf-compiler \
        python3 \
        python3-distutils \
        python3-pip \
        python3.8 \
        wget \
        xmlto && \
    rm -rf /var/lib/apt/lists/*

COPY --from=golang /usr/local/go/ /usr/local/go/
ENV PATH="/usr/local/go/bin:${PATH}"

RUN <<EOT
set -eux
Expand All @@ -79,8 +90,16 @@ set -eux
curl -sSL https://d.juicefs.com/install | sh -

# nvidia-container-toolkit repo
curl -fsSL https://nvidia.github.io/libnvidia-container/gpgkey | gpg --dearmor -o /usr/share/keyrings/nvidia-container-toolkit-keyring.gpg
echo 'deb [signed-by=/usr/share/keyrings/nvidia-container-toolkit-keyring.gpg] https://nvidia.github.io/libnvidia-container/stable/deb/$(ARCH) /' > /etc/apt/sources.list.d/nvidia-container-toolkit.list
# curl -fsSL https://nvidia.github.io/libnvidia-container/gpgkey | gpg --dearmor -o /usr/share/keyrings/nvidia-container-toolkit-keyring.gpg
# echo 'deb [signed-by=/usr/share/keyrings/nvidia-container-toolkit-keyring.gpg] https://nvidia.github.io/libnvidia-container/stable/deb/$(ARCH) /' > /etc/apt/sources.list.d/nvidia-container-toolkit.list
curl -s -L https://nvidia.github.io/nvidia-container-runtime/gpgkey | apt-key add -
curl -s -L https://nvidia.github.io/nvidia-container-runtime/ubuntu20.04/nvidia-container-runtime.list | tee /etc/apt/sources.list.d/nvidia-container-runtime.list
apt-get update && apt-get install -y nvidia-container-runtime fuse3 libfuse3-dev

# Build and install custom nvidia-container-toolkit
git clone https://github.com/beam-cloud/nvidia-container-toolkit.git
cd /workspace/nvidia-container-toolkit && make build && make binaries
cp /workspace/nvidia-container-toolkit/nvidia-container-runtime* /usr/bin/

# criu repo
curl -fsSL https://download.opensuse.org/repositories/devel:/tools:/criu/xUbuntu_20.04/Release.key | gpg --dearmor -o /usr/share/keyrings/criu.gpg
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/ecr v1.24.4
github.com/aws/aws-sdk-go-v2/service/kinesis v1.20.0
github.com/aws/aws-sdk-go-v2/service/s3 v1.38.3
github.com/beam-cloud/blobcache v0.0.0-20240111193326-cae10a611c3f
github.com/beam-cloud/clip v0.0.0-20230822004255-4ecd939864f9
github.com/beam-cloud/go-runc v0.0.0-20231222221338-b89899f33170
github.com/bsm/redislock v0.9.4
Expand Down Expand Up @@ -67,7 +68,6 @@ require (
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.15.3 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.21.3 // indirect
github.com/aws/smithy-go v1.19.0 // indirect
github.com/beam-cloud/blobcache v0.0.0-20240111193326-cae10a611c3f // indirect
github.com/briandowns/spinner v1.23.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/console v1.0.3 // indirect
Expand Down
2 changes: 2 additions & 0 deletions hack/k3d.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,5 @@ options:
kubeconfig:
updateDefaultKubeconfig: true
switchCurrentContext: true
# runtime:
# gpuRequest: all
13 changes: 12 additions & 1 deletion internal/common/config.default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ imageService:
python3.12: py312-latest
worker:
pools:
beam-cpu:
default:
jobSpec:
nodeSelector: {}
poolSizing:
Expand All @@ -62,6 +62,17 @@ worker:
minFreeCpu: 1000m
minFreeGpu: 0
minFreeMemory: 1Gi
nvidia:
gpuType: A40
jobSpec:
nodeSelector: {}
poolSizing:
defaultWorkerCpu: 1000m
defaultWorkerGpuType: ""
defaultWorkerMemory: 1Gi
minFreeCpu:
minFreeGpu:
minFreeMemory:
# global pool attributes
hostNetwork: false
imageTag: latest
Expand Down
2 changes: 1 addition & 1 deletion internal/repository/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type BackendRepository interface {
type WorkerPoolRepository interface {
GetPool(name string) (*types.WorkerPoolConfig, error)
GetPools() ([]types.WorkerPoolConfig, error)
SetPool(name string, pool *types.WorkerPoolConfig) error
SetPool(name string, pool types.WorkerPoolConfig) error
RemovePool(name string) error
}

Expand Down
2 changes: 1 addition & 1 deletion internal/repository/worker_pool_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (r *WorkerPoolRedisRepository) GetPools() ([]types.WorkerPoolConfig, error)
return pools, nil
}

func (r *WorkerPoolRedisRepository) SetPool(name string, pool *types.WorkerPoolConfig) error {
func (r *WorkerPoolRedisRepository) SetPool(name string, pool types.WorkerPoolConfig) error {
lockKey := common.RedisKeys.WorkerPoolLock(name)
if err := r.lock.Acquire(context.TODO(), lockKey, r.lockOptions); err != nil {
return err
Expand Down
17 changes: 15 additions & 2 deletions internal/scheduler/pool_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
// and a controller responsible for managing its worker instances.
type WorkerPool struct {
Name string
Config *types.WorkerPoolConfig
Config types.WorkerPoolConfig
Controller WorkerPoolController
}

Expand Down Expand Up @@ -50,9 +50,22 @@ func (m *WorkerPoolManager) GetPool(name string) (*WorkerPool, bool) {
return pool, true
}

// GetPoolByGPU looks up a WorkerPool by the GPU type set in its config.
//
// It returns the matching pool and true, or (nil, false) when no registered
// pool has a Config.GPUType equal to gpuType. The lookup holds the manager's
// read lock for its whole duration. If more than one pool carries the same
// GPU type, the one returned depends on the iteration order of m.pools.
func (m *WorkerPoolManager) GetPoolByGPU(gpuType string) (*WorkerPool, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	for key := range m.pools {
		candidate := m.pools[key]
		if candidate.Config.GPUType != gpuType {
			continue
		}
		return candidate, true
	}

	return nil, false
}

// Set/add WorkerPool.
// This will overwrite any existing WorkerPools with the same name defined in WorkerPoolResource.
func (m *WorkerPoolManager) SetPool(name string, config *types.WorkerPoolConfig, controller WorkerPoolController) error {
func (m *WorkerPoolManager) SetPool(name string, config types.WorkerPoolConfig, controller WorkerPoolController) error {
m.mu.Lock()
defer m.mu.Unlock()

Expand Down
25 changes: 9 additions & 16 deletions internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package scheduler

import (
"errors"
"fmt"
"log"
"sort"
"strings"
"time"

"github.com/beam-cloud/beam/internal/common"
Expand Down Expand Up @@ -36,7 +34,7 @@ func NewScheduler(config types.AppConfig, redisClient *common.RedisClient) (*Sch
workerPoolManager := NewWorkerPoolManager(workerPoolRepo)
for name, pool := range config.Worker.Pools {
controller, _ := NewKubernetesWorkerPoolController(config, name, workerRepo)
workerPoolManager.SetPool(name, &pool, controller)
workerPoolManager.SetPool(name, pool, controller)
}

return &Scheduler{
Expand Down Expand Up @@ -102,22 +100,17 @@ func (s *Scheduler) Stop(containerId string) error {
}

func (s *Scheduler) getController(request *types.ContainerRequest) (WorkerPoolController, error) {
poolName := "beam-cpu"

if request.Gpu != "" {
switch types.GPUType(request.Gpu) {
case types.GPU_T4, types.GPU_A10G:
poolName = fmt.Sprintf("beam-%s", strings.ToLower(request.Gpu))
case types.GPU_L4, types.GPU_A100_40, types.GPU_A100_80:
poolName = fmt.Sprintf("beam-%s-gcp", strings.ToLower(request.Gpu))
default:
return nil, errors.New("unsupported gpu")
}
var ok bool
var workerPool *WorkerPool

if request.Gpu == "" {
workerPool, ok = s.workerPoolManager.GetPool("default")
} else {
workerPool, ok = s.workerPoolManager.GetPoolByGPU(request.Gpu)
}

workerPool, ok := s.workerPoolManager.GetPool(poolName)
if !ok {
return nil, fmt.Errorf("no controller found for worker pool name: %s", poolName)
return nil, errors.New("no controller found for request")
}

return workerPool.Controller, nil
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func NewSchedulerForTest() (*Scheduler, error) {

workerPoolManager := NewWorkerPoolManager(repo.NewWorkerPoolRedisRepository(rdb))
for name, pool := range config.Worker.Pools {
workerPoolManager.SetPool(name, &pool, &WorkerPoolControllerForTest{
workerPoolManager.SetPool(name, pool, &WorkerPoolControllerForTest{
name: name,
config: config,
workerRepo: workerRepo,
Expand Down
1 change: 1 addition & 0 deletions internal/types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ type WorkerConfig struct {
}

// WorkerPoolConfig is the configuration of a single worker pool. The `key`
// struct tags name the configuration keys each field is loaded from.
type WorkerPoolConfig struct {
	// GPUType is the GPU type served by this pool (config key "gpuType").
	GPUType string `key:"gpuType"`
	// JobSpec holds the pool's job spec settings (config key "jobSpec").
	JobSpec WorkerPoolJobSpecConfig `key:"jobSpec"`
	// PoolSizing holds the pool's sizing settings (config key "poolSizing").
	PoolSizing WorkerPoolJobSpecPoolSizingConfig `key:"poolSizing"`
}
Expand Down
2 changes: 1 addition & 1 deletion internal/worker/base_runc_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@
"path": "/usr/bin/nvidia-container-runtime-hook",
"args": [
"/usr/bin/nvidia-container-runtime-hook",
"-config"
"-ociconfig"
]
}
]
Expand Down
40 changes: 40 additions & 0 deletions manifests/k3d/nvidia-device-plugin.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# DaemonSet that runs the NVIDIA Kubernetes device plugin in kube-system.
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: nvidia-device-plugin-daemonset
  namespace: kube-system
spec:
  selector:
    matchLabels:
      name: nvidia-device-plugin-ds
  updateStrategy:
    type: RollingUpdate
  template:
    metadata:
      labels:
        name: nvidia-device-plugin-ds
      annotations:
        # NOTE(review): legacy critical-pod marker; believed to be ignored by
        # current Kubernetes releases in favour of priorityClassName below.
        # Confirm, then consider dropping it.
        scheduler.alpha.kubernetes.io/critical-pod: ""
    spec:
      tolerations:
        # Allow scheduling onto nodes tainted with nvidia.com/gpu:NoSchedule.
        - key: nvidia.com/gpu
          operator: Exists
          effect: NoSchedule
      priorityClassName: system-node-critical
      containers:
        - image: nvcr.io/nvidia/k8s-device-plugin:v0.14.3
          name: nvidia-device-plugin-ctr
          env:
            - name: FAIL_ON_INIT_ERROR
              value: "false"
          # Run with no extra privileges: no escalation, all capabilities dropped.
          securityContext:
            allowPrivilegeEscalation: false
            capabilities:
              drop: ["ALL"]
          volumeMounts:
            - name: device-plugin
              mountPath: /var/lib/kubelet/device-plugins
      volumes:
        # Host directory shared with the container at the same path.
        - name: device-plugin
          hostPath:
            path: /var/lib/kubelet/device-plugins
2 changes: 1 addition & 1 deletion sdk/src/beam/abstractions/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,6 @@ def map(self, inputs: Sequence[Any]) -> Iterator[Any]:
func=self.func,
stub_type=FUNCTION_STUB_TYPE,
):
return # type: ignore
terminal.error("Function failed to prepare runtime ☠️")

return self._gather_and_yield_results(inputs)

0 comments on commit 73f82b7

Please sign in to comment.