diff --git a/Makefile b/Makefile index a78bc3b5f..fda45a075 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,7 @@ SHELL := /bin/bash -imageVersion := latest +tag := latest +workerTag := latest +runnerTag := latest setup: make k3d-up beam-runner beam-worker beam @@ -17,18 +19,18 @@ k3d-down: k3d cluster delete --config hack/k3d.yaml beam: - docker build . --target build -f ./docker/Dockerfile.beam -t localhost:5000/beam:$(imageVersion) - docker push localhost:5000/beam:$(imageVersion) + docker build . --target build -f ./docker/Dockerfile.beam -t localhost:5001/beam:$(tag) + docker push localhost:5001/beam:$(tag) beam-worker: - docker build . --target final --build-arg BASE_STAGE=dev -f ./docker/Dockerfile.worker -t localhost:5000/beam-worker:$(imageVersion) - docker push localhost:5000/beam-worker:latest + docker build . --target final --build-arg BASE_STAGE=dev -f ./docker/Dockerfile.worker -t localhost:5001/beam-worker:$(workerTag) + docker push localhost:5001/beam-worker:$(workerTag) bin/delete_workers.sh beam-runner: for target in py312 py311 py310 py39 py38; do \ - docker build . --target $$target --platform=linux/amd64 -f ./docker/Dockerfile.runner -t localhost:5000/beam-runner:$$target-latest; \ - docker push localhost:5000/beam-runner:$$target-latest; \ + docker build . --target $$target --platform=linux/amd64 -f ./docker/Dockerfile.runner -t localhost:5001/beam-runner:$$target-$(runnerTag); \ + docker push localhost:5001/beam-runner:$$target-$(runnerTag); \ done start: diff --git a/docker/Dockerfile.beam b/docker/Dockerfile.beam index ce5e4eac6..e4d0e7b40 100644 --- a/docker/Dockerfile.beam +++ b/docker/Dockerfile.beam @@ -30,6 +30,4 @@ RUN apt autoclean COPY --from=build /usr/local/bin/beam /usr/local/bin/ -ENV GIN_MODE=release - CMD ["tail", "-f", "/dev/null"] diff --git a/hack/k3d.yaml b/hack/k3d.yaml index d166937c5..983d2a95f 100644 --- a/hack/k3d.yaml +++ b/hack/k3d.yaml @@ -21,9 +21,9 @@ volumes: registries: create: - name: registry.beam.lan + name: registry.localhost host: "0.0.0.0" - hostPort: "5000" + hostPort: "5001" volumes: - $PWD/.k3d/registry:/var/lib/registry @@ -31,11 +31,6 @@ options: k3d: wait: true timeout: "60s" - k3s: - extraArgs: - - arg: "--tls-san=beam.lan" - nodeFilters: - - server:* kubeconfig: updateDefaultKubeconfig: true switchCurrentContext: true diff --git a/hack/okteto.yaml b/hack/okteto.yaml index 803546589..99736e8ac 100644 --- a/hack/okteto.yaml +++ b/hack/okteto.yaml @@ -1,6 +1,6 @@ dev: beam: - image: registry.beam.lan:5000/beam:latest + image: registry.localhost:5000/beam:latest command: - /workspace/bin/hotreload.sh sync: diff --git a/internal/common/config.default.yaml b/internal/common/config.default.yaml index 8cd44b9a7..2aff2f212 100644 --- a/internal/common/config.default.yaml +++ b/internal/common/config.default.yaml @@ -43,7 +43,7 @@ imageService: runner: baseImageTag: latest baseImageName: beam-runner - baseImageRegistry: registry.beam.lan:5000 + baseImageRegistry: registry.localhost:5000 tags: python3.8: py38-latest python3.9: py39-latest @@ -66,7 +66,7 @@ worker: hostNetwork: false imageTag: latest imageName: beam-worker - imageRegistry: registry.beam.lan:5000 + imageRegistry: registry.localhost:5000 imagePullSecrets: [] namespace: beam serviceAccountName: default diff --git a/internal/scheduler/pool_k8s.go b/internal/scheduler/pool_k8s.go index 484c960a2..517b376d9 100644 --- a/internal/scheduler/pool_k8s.go +++ b/internal/scheduler/pool_k8s.go @@ -162,6 +162,7 @@ func (wpc *KubernetesWorkerPoolController) addWorkerWithId(workerId string, cpu func (wpc *KubernetesWorkerPoolController) createWorkerJob(workerId string, cpu int64, memory int64, gpuType string) (*batchv1.Job, *types.Worker) { jobName := fmt.Sprintf("%s-%s-%s", BeamWorkerJobPrefix, wpc.name, workerId) labels := map[string]string{ + "app": "beam-" + BeamWorkerLabelValue, BeamWorkerLabelKey: BeamWorkerLabelValue, } diff --git a/internal/worker/image.go b/internal/worker/image.go index 1337118ce..59e7493b5 100644 --- a/internal/worker/image.go +++ b/internal/worker/image.go @@ -307,21 +307,21 @@ func (i *ImageClient) Archive(ctx context.Context, bundlePath string, imageId st } if err != nil { - log.Printf("unable to create archive: %v\n", err) + log.Printf("Unable to create archive: %v\n", err) // outputChan <- common.OutputMsg{Done: true, Success: false, Msg: "Unable to archive image."} return err } - log.Printf("container <%v> archive took %v\n", imageId, time.Since(startTime)) + log.Printf("Container <%v> archive took %v\n", imageId, time.Since(startTime)) // Push the archive to a registry startTime = time.Now() err = i.registry.Push(ctx, archivePath, imageId) if err != nil { - log.Printf("failed to push image for image <%v>: %v\n", imageId, err) + log.Printf("Failed to push image for image <%v>: %v\n", imageId, err) // outputChan <- common.OutputMsg{Done: true, Success: false, Msg: "Unable to push image."} return err } - log.Printf("container <%v> push took %v\n", imageId, time.Since(startTime)) + log.Printf("Container <%v> push took %v\n", imageId, time.Since(startTime)) return nil } diff --git a/manifests/k3d/beam.yaml b/manifests/k3d/beam.yaml index af2449cfa..d6498d864 100644 --- a/manifests/k3d/beam.yaml +++ b/manifests/k3d/beam.yaml @@ -53,7 +53,7 @@ spec: spec: containers: - name: beam - image: registry.beam.lan:5000/beam:latest + image: registry.localhost:5000/beam:latest command: - /workspace/bin/beam ports: diff --git a/sdk/src/beam/abstractions/function.py b/sdk/src/beam/abstractions/function.py index c31923f18..1e1dc0920 100644 --- a/sdk/src/beam/abstractions/function.py +++ b/sdk/src/beam/abstractions/function.py @@ -1,6 +1,6 @@ import asyncio import os -from typing import Any, Callable, Iterable, List, Optional, Union +from typing import Any, Callable, Iterator, List, Optional, Sequence, Union import cloudpickle @@ -75,7 +75,7 @@ def __call__(self, func): class _CallableWrapper: - def __init__(self, func: Callable, parent: Function): + def __init__(self, func: Callable, parent: Function) -> None: self.func: Callable = func self.parent: Function = parent @@ -150,7 +150,7 @@ def deploy(self, name: str) -> bool: return deploy_response.ok - def _gather_and_yield_results(self, inputs: Iterable): + def _gather_and_yield_results(self, inputs: Sequence) -> Iterator[Any]: container_count = len(inputs) async def _gather_async(): @@ -166,11 +166,11 @@ async def _gather_async(): except StopAsyncIteration: break - def map(self, inputs: Iterable): + def map(self, inputs: Sequence[Any]) -> Iterator[Any]: if not self.parent.prepare_runtime( func=self.func, stub_type=FUNCTION_STUB_TYPE, ): - return + return # type: ignore return self._gather_and_yield_results(inputs)