Skip to content

Commit

Permalink
Change registry host port to 5001, add label to worker (#53)
Browse files Browse the repository at this point in the history
  • Loading branch information
nickpetrovic authored Jan 12, 2024
1 parent d9b0416 commit bc1d3de
Show file tree
Hide file tree
Showing 9 changed files with 25 additions and 29 deletions.
16 changes: 9 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
SHELL := /bin/bash
imageVersion := latest
tag := latest
workerTag := latest
runnerTag := latest

setup:
make k3d-up beam-runner beam-worker beam
Expand All @@ -17,18 +19,18 @@ k3d-down:
k3d cluster delete --config hack/k3d.yaml

beam:
docker build . --target build -f ./docker/Dockerfile.beam -t localhost:5000/beam:$(imageVersion)
docker push localhost:5000/beam:$(imageVersion)
docker build . --target build -f ./docker/Dockerfile.beam -t localhost:5001/beam:$(tag)
docker push localhost:5001/beam:$(tag)

beam-worker:
docker build . --target final --build-arg BASE_STAGE=dev -f ./docker/Dockerfile.worker -t localhost:5000/beam-worker:$(imageVersion)
docker push localhost:5000/beam-worker:latest
docker build . --target final --build-arg BASE_STAGE=dev -f ./docker/Dockerfile.worker -t localhost:5001/beam-worker:$(workerTag)
docker push localhost:5001/beam-worker:$(workerTag)
bin/delete_workers.sh

beam-runner:
for target in py312 py311 py310 py39 py38; do \
docker build . --target $$target --platform=linux/amd64 -f ./docker/Dockerfile.runner -t localhost:5000/beam-runner:$$target-latest; \
docker push localhost:5000/beam-runner:$$target-latest; \
docker build . --target $$target --platform=linux/amd64 -f ./docker/Dockerfile.runner -t localhost:5001/beam-runner:$$target-$(runnerTag); \
docker push localhost:5001/beam-runner:$$target-$(runnerTag); \
done

start:
Expand Down
2 changes: 0 additions & 2 deletions docker/Dockerfile.beam
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,4 @@ RUN apt autoclean

COPY --from=build /usr/local/bin/beam /usr/local/bin/

ENV GIN_MODE=release

CMD ["tail", "-f", "/dev/null"]
9 changes: 2 additions & 7 deletions hack/k3d.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,16 @@ volumes:

registries:
create:
name: registry.beam.lan
name: registry.localhost
host: "0.0.0.0"
hostPort: "5000"
hostPort: "5001"
volumes:
- $PWD/.k3d/registry:/var/lib/registry

options:
k3d:
wait: true
timeout: "60s"
k3s:
extraArgs:
- arg: "--tls-san=beam.lan"
nodeFilters:
- server:*
kubeconfig:
updateDefaultKubeconfig: true
switchCurrentContext: true
2 changes: 1 addition & 1 deletion hack/okteto.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
dev:
beam:
image: registry.beam.lan:5000/beam:latest
image: registry.localhost:5000/beam:latest
command:
- /workspace/bin/hotreload.sh
sync:
Expand Down
4 changes: 2 additions & 2 deletions internal/common/config.default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ imageService:
runner:
baseImageTag: latest
baseImageName: beam-runner
baseImageRegistry: registry.beam.lan:5000
baseImageRegistry: registry.localhost:5000
tags:
python3.8: py38-latest
python3.9: py39-latest
Expand All @@ -66,7 +66,7 @@ worker:
hostNetwork: false
imageTag: latest
imageName: beam-worker
imageRegistry: registry.beam.lan:5000
imageRegistry: registry.localhost:5000
imagePullSecrets: []
namespace: beam
serviceAccountName: default
Expand Down
1 change: 1 addition & 0 deletions internal/scheduler/pool_k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func (wpc *KubernetesWorkerPoolController) addWorkerWithId(workerId string, cpu
func (wpc *KubernetesWorkerPoolController) createWorkerJob(workerId string, cpu int64, memory int64, gpuType string) (*batchv1.Job, *types.Worker) {
jobName := fmt.Sprintf("%s-%s-%s", BeamWorkerJobPrefix, wpc.name, workerId)
labels := map[string]string{
"app": "beam-" + BeamWorkerLabelValue,
BeamWorkerLabelKey: BeamWorkerLabelValue,
}

Expand Down
8 changes: 4 additions & 4 deletions internal/worker/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,21 +307,21 @@ func (i *ImageClient) Archive(ctx context.Context, bundlePath string, imageId st
}

if err != nil {
log.Printf("unable to create archive: %v\n", err)
log.Printf("Unable to create archive: %v\n", err)
// outputChan <- common.OutputMsg{Done: true, Success: false, Msg: "Unable to archive image."}
return err
}
log.Printf("container <%v> archive took %v\n", imageId, time.Since(startTime))
log.Printf("Container <%v> archive took %v\n", imageId, time.Since(startTime))

// Push the archive to a registry
startTime = time.Now()
err = i.registry.Push(ctx, archivePath, imageId)
if err != nil {
log.Printf("failed to push image for image <%v>: %v\n", imageId, err)
log.Printf("Failed to push image for image <%v>: %v\n", imageId, err)
// outputChan <- common.OutputMsg{Done: true, Success: false, Msg: "Unable to push image."}
return err
}

log.Printf("container <%v> push took %v\n", imageId, time.Since(startTime))
log.Printf("Container <%v> push took %v\n", imageId, time.Since(startTime))
return nil
}
2 changes: 1 addition & 1 deletion manifests/k3d/beam.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ spec:
spec:
containers:
- name: beam
image: registry.beam.lan:5000/beam:latest
image: registry.localhost:5000/beam:latest
command:
- /workspace/bin/beam
ports:
Expand Down
10 changes: 5 additions & 5 deletions sdk/src/beam/abstractions/function.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
import os
from typing import Any, Callable, Iterable, List, Optional, Union
from typing import Any, Callable, Iterator, List, Optional, Sequence, Union

import cloudpickle

Expand Down Expand Up @@ -75,7 +75,7 @@ def __call__(self, func):


class _CallableWrapper:
def __init__(self, func: Callable, parent: Function):
def __init__(self, func: Callable, parent: Function) -> None:
self.func: Callable = func
self.parent: Function = parent

Expand Down Expand Up @@ -150,7 +150,7 @@ def deploy(self, name: str) -> bool:

return deploy_response.ok

def _gather_and_yield_results(self, inputs: Iterable):
def _gather_and_yield_results(self, inputs: Sequence) -> Iterator[Any]:
container_count = len(inputs)

async def _gather_async():
Expand All @@ -166,11 +166,11 @@ async def _gather_async():
except StopAsyncIteration:
break

def map(self, inputs: Iterable):
def map(self, inputs: Sequence[Any]) -> Iterator[Any]:
if not self.parent.prepare_runtime(
func=self.func,
stub_type=FUNCTION_STUB_TYPE,
):
return
return # type: ignore

return self._gather_and_yield_results(inputs)

0 comments on commit bc1d3de

Please sign in to comment.