diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
new file mode 100644
index 000000000..1aeee0351
--- /dev/null
+++ b/.github/workflows/ci.yml
@@ -0,0 +1,54 @@
+name: CI
+
+on: [push]
+
+defaults:
+  run:
+    working-directory: sdk
+
+jobs:
+  lint_and_test:
+    runs-on: ubuntu-latest
+
+    steps:
+      - name: Check out repository
+        uses: actions/checkout@v3
+
+      - name: Set up Python
+        id: setup-python
+        uses: actions/setup-python@v4
+        with:
+          python-version: "3.11"
+
+      - name: Set up Poetry
+        uses: snok/install-poetry@v1
+        with:
+          version: 1.5.1
+          virtualenvs-create: true
+          virtualenvs-in-project: true
+          installer-parallel: true
+
+      - name: Load cached venv
+        id: cached-poetry-dependencies
+        uses: actions/cache@v3
+        with:
+          path: .venv
+          key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}
+
+      - name: Install dependencies
+        run: poetry install
+        if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
+
+      - name: Code formatting
+        # --check fails the job on unformatted code instead of rewriting it
+        run: poetry run ruff format --check .
+
+      - name: Code linting
+        run: poetry run ruff check .
+
+      - name: Run tests
+        env:
+          CI: true
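+          # CI=true makes get_gateway_channel() return an unauthenticated
+          # localhost:50051 channel (see sdk/src/beam/config.py below)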
+ + " To enqueue items use .put(*args, **kwargs)" + ) def local(self, *args, **kwargs) -> Any: return self.func(*args, **kwargs) diff --git a/sdk/src/beam/cli/configure.py b/sdk/src/beam/cli/configure.py index 3d961db55..d5be4d913 100644 --- a/sdk/src/beam/cli/configure.py +++ b/sdk/src/beam/cli/configure.py @@ -1,7 +1,11 @@ import click from beam import terminal -from beam.config import configure_gateway_credentials, load_config_from_file, save_config_to_file +from beam.config import ( + configure_gateway_credentials, + load_config_from_file, + save_config_to_file, +) @click.command() @@ -13,7 +17,11 @@ def configure(name: str, token: str, gateway_host: str, gateway_port: str): config = load_config_from_file() config = configure_gateway_credentials( - config, name=name, gateway_host=gateway_host, gateway_port=gateway_port, token=token + config, + name=name, + gateway_host=gateway_host, + gateway_port=gateway_port, + token=token, ) save_config_to_file( diff --git a/sdk/src/beam/config.py b/sdk/src/beam/config.py index bcf00237a..d4f405a56 100644 --- a/sdk/src/beam/config.py +++ b/sdk/src/beam/config.py @@ -140,6 +140,10 @@ def configure_gateway_credentials( def get_gateway_channel() -> Channel: + if os.getenv("CI"): + # Ignore auth for CI + return Channel(host="localhost", port=50051, ssl=False) + config: GatewayConfig = get_gateway_config() channel: Union[AuthenticatedChannel, None] = None diff --git a/sdk/tests/test_build.py b/sdk/tests/test_build.py deleted file mode 100644 index 7020486aa..000000000 --- a/sdk/tests/test_build.py +++ /dev/null @@ -1,31 +0,0 @@ -import os -import sys - -import pytest -from tests.conftest import get_app_dirs - -from beam.utils.build import AppBuilder - - -@pytest.fixture(scope="function") -def change_cwd_to_app_dir(request): - original_cwd = os.getcwd() - - # Add app directory to sys path and chdir - app_dir_absolute_path = os.path.abspath(request.param) - - # Change directory to the app directory - os.chdir(app_dir_absolute_path) - sys.path.insert(0, app_dir_absolute_path) - - yield - - # Revert back to normal - sys.path.remove(app_dir_absolute_path) - - os.chdir(original_cwd) - - -@pytest.mark.parametrize("change_cwd_to_app_dir", get_app_dirs(), indirect=True) -def test_app_build(change_cwd_to_app_dir): - AppBuilder.build(module_path="app.py", func_or_app_name=None) diff --git a/sdk/tests/test_function.py b/sdk/tests/test_function.py new file mode 100644 index 000000000..4b9f771be --- /dev/null +++ b/sdk/tests/test_function.py @@ -0,0 +1,94 @@ +from unittest import TestCase +from unittest.mock import MagicMock + +import cloudpickle + +from beam import Image +from beam.abstractions.function import Function +from beam.clients.function import FunctionInvokeResponse + + +class AsyncIterator: + def __init__(self, seq): + self.iter = iter(seq) + + def __aiter__(self): + return self + + async def __anext__(self): + try: + return next(self.iter) + except StopIteration: + raise StopAsyncIteration + + +class TestTaskQueue(TestCase): + def test_init(self): + mock_stub = MagicMock() + + queue = Function(Image(python_version="python3.8"), cpu=100, memory=128) + queue.stub = mock_stub + + self.assertEqual(queue.image.python_version, "python3.8") + self.assertEqual(queue.cpu, 100) + self.assertEqual(queue.memory, 128) + + def test_run_local(self): + @Function(Image(python_version="python3.8"), cpu=100, memory=128) + def test_func(): + return 1 + + resp = test_func.local() + + self.assertEqual(resp, 1) + + def test_function_invoke(self): + 
+        @Function(Image(python_version="python3.8"), cpu=100, memory=128)
+        def test_func(*args, **kwargs):
+            return 1998
+
+        pickled_value = cloudpickle.dumps(1998)
+
+        test_func.parent.function_stub = MagicMock()
+        test_func.parent.syncer = MagicMock()
+
+        test_func.parent.function_stub.function_invoke.return_value = AsyncIterator(
+            [FunctionInvokeResponse(done=True, exit_code=0, result=pickled_value)]
+        )
+
+        test_func.parent.prepare_runtime = MagicMock(return_value=True)
+
+        self.assertEqual(test_func(), 1998)
+
+        test_func.parent.function_stub.function_invoke.return_value = AsyncIterator(
+            [FunctionInvokeResponse(done=False, exit_code=1, result=b"")]
+        )
+
+        self.assertRaises(SystemExit, test_func)
+
+    def test_map(self):
+        @Function(Image(python_version="python3.8"), cpu=100, memory=128)
+        def test_func(*args, **kwargs):
+            return 1998
+
+        pickled_value = cloudpickle.dumps(1998)
+
+        test_func.parent.function_stub = MagicMock()
+        test_func.parent.syncer = MagicMock()
+
+        # The return value is a reference to this same async iterator, so each
+        # call advances it to the next response. The iterator persists across
+        # calls only in this test setup, which lets us simulate one response
+        # per mapped input.
+        test_func.parent.function_stub.function_invoke.return_value = AsyncIterator(
+            [
+                FunctionInvokeResponse(done=True, exit_code=0, result=pickled_value),
+                FunctionInvokeResponse(done=True, exit_code=0, result=pickled_value),
+                FunctionInvokeResponse(done=True, exit_code=0, result=pickled_value),
+            ]
+        )
+
+        test_func.parent.prepare_runtime = MagicMock(return_value=True)
+
+        for val in test_func.map([1, 2, 3]):
+            self.assertEqual(val, 1998)
diff --git a/sdk/tests/test_map.py b/sdk/tests/test_map.py
new file mode 100644
index 000000000..b8e930005
--- /dev/null
+++ b/sdk/tests/test_map.py
@@ -0,0 +1,131 @@
+from unittest import TestCase
+from unittest.mock import MagicMock
+
+import cloudpickle
+
+from beam.abstractions.map import Map
+from beam.clients.map import (
+    MapCountResponse,
+    MapDeleteResponse,
+    MapGetResponse,
+    MapKeysResponse,
+    MapSetResponse,
+)
+
+from .utils import mock_coroutine_with_result
+
+
+class TestMap(TestCase):
+    def setUp(self):
+        pass
+
+    def test_set(self):
+        mock_stub = MagicMock()
+
+        mock_stub.map_set = mock_coroutine_with_result(MapSetResponse(ok=True))
+
+        map = Map(name="test")
+        map.stub = mock_stub
+
+        self.assertTrue(map.set("test", "test"))
+
+        mock_stub.map_set = mock_coroutine_with_result(MapSetResponse(ok=False))
+
+        map = Map(name="test")
+        map.stub = mock_stub
+
+        self.assertFalse(map.set("test", "test"))
+
+    def test_get(self):
+        mock_stub = MagicMock()
+
+        pickled_value = cloudpickle.dumps("test")
+
+        mock_stub.map_get = mock_coroutine_with_result(MapGetResponse(ok=True, value=pickled_value))
+
+        map = Map(name="test")
+        map.stub = mock_stub
+
+        self.assertEqual(map.get("test"), "test")
+
+        mock_stub.map_get = mock_coroutine_with_result(MapGetResponse(ok=False, value=b""))
+
+        map = Map(name="test")
+        map.stub = mock_stub
+
+        self.assertEqual(map.get("test"), None)
+
+    def test_delitem(self):
+        mock_stub = MagicMock()
+
+        mock_stub.map_delete = mock_coroutine_with_result(MapDeleteResponse(ok=True))
+
+        map = Map(name="test")
+        map.stub = mock_stub
+
+        del map["test"]
+
+        mock_stub.map_delete = mock_coroutine_with_result(MapDeleteResponse(ok=False))
+
+        map = Map(name="test")
+        map.stub = mock_stub
+
+        def _del():
+            del map["test"]
+
+        self.assertRaises(KeyError, _del)
+
+    def test_len(self):
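+        # An ok=False response is surfaced as an empty map (len 0)
+        # rather than an error.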
+        mock_stub = MagicMock()
+
+        mock_stub.map_count = mock_coroutine_with_result(MapCountResponse(ok=True, count=1))
+
+        map = Map(name="test")
+        map.stub = mock_stub
+
+        self.assertEqual(len(map), 1)
+
+        mock_stub.map_count = mock_coroutine_with_result(MapCountResponse(ok=False, count=1))
+
+        map = Map(name="test")
+        map.stub = mock_stub
+
+        self.assertEqual(len(map), 0)
+
+    def test_iter(self):
+        mock_stub = MagicMock()
+
+        mock_stub.map_keys = mock_coroutine_with_result(MapKeysResponse(ok=True, keys=["test"]))
+
+        map = Map(name="test")
+        map.stub = mock_stub
+
+        self.assertEqual(list(map), ["test"])
+
+        mock_stub.map_keys = mock_coroutine_with_result(MapKeysResponse(ok=False, keys=[]))
+
+        map = Map(name="test")
+        map.stub = mock_stub
+
+        self.assertEqual(list(map), [])
+
+    def test_items(self):
+        mock_stub = MagicMock()
+
+        pickled_value = cloudpickle.dumps("test")
+
+        mock_stub.map_keys = mock_coroutine_with_result(MapKeysResponse(ok=True, keys=["test"]))
+        mock_stub.map_get = mock_coroutine_with_result(MapGetResponse(ok=True, value=pickled_value))
+
+        map = Map(name="test")
+        map.stub = mock_stub
+        self.assertListEqual(list(map.items()), [("test", "test")])
+
+        mock_stub.map_keys = mock_coroutine_with_result(MapKeysResponse(ok=False, keys=[]))
+
+        map = Map(name="test")
+        map.stub = mock_stub
+
+        self.assertEqual(list(map.items()), [])
diff --git a/sdk/tests/test_parameters.py b/sdk/tests/test_parameters.py
deleted file mode 100644
index 1633c5004..000000000
--- a/sdk/tests/test_parameters.py
+++ /dev/null
@@ -1,885 +0,0 @@
-import copy
-import unittest
-from collections import namedtuple
-from typing import Any
-
-import pytest
-from marshmallow import ValidationError
-
-from beam import (
-    App,
-    Autoscaling,
-    Image,
-    Output,
-    PythonVersion,
-    RequestLatencyAutoscaler,
-    Runtime,
-    Volume,
-    build_config,
-)
-from beam.utils.parse import compose_cpu
-
-try:
-    from importlib.metadata import version  # type: ignore
-except ImportError:
-    from importlib_metadata import version
-
-
-beam_sdk_version = version("beam-sdk")
-
-
-class TestParameters(unittest.TestCase):
-    def __init__(self, methodName: str = "runTest") -> None:
-        super().__init__(methodName)
-        self.maxDiff = None
-        self._default_config = {
-            "app_spec_version": "v3",
-            "name": "test",
-            "sdk_version": beam_sdk_version,
-            "mounts": [],
-            "runtime": Runtime().data,
-            "triggers": [
-                {
-                    "handler": "tests/test_parameters.py:test",
-                    "callback_url": None,
-                    "runtime": None,
-                    "outputs": [],
-                    "autoscaling": None,
-                    "autoscaler": None,
-                    "task_policy": {
-                        "max_retries": 3,
-                        "timeout": 3600,
-                    },
-                    "workers": 1,
-                    "authorized": True,
-                },
-            ],
-            "run": None,
-        }
-
-        self._default_rest_api_config = {
-            **self._default_config,
-            "triggers": [
-                {
-                    **self._default_config["triggers"][0],
-                    "path": "/test",
-                    "loader": None,
-                    "trigger_type": "rest_api",
-                    "max_pending_tasks": 100,
-                    "keep_warm_seconds": 90,
-                    "method": "POST",
-                }
-            ],
-        }
-
-        self._default_task_queue_config = {
-            **self._default_config,
-            "triggers": [
-                {
-                    **self._default_rest_api_config["triggers"][0],
-                    "trigger_type": "webhook",
-                    "keep_warm_seconds": 10,
-                }
-            ],
-        }
-
-        self._default_schedule_config = {
-            **self._default_config,
-            "triggers": [
-                {
-                    **self._default_config["triggers"][0],
-                    "trigger_type": "cron_job",
-                    "when": "every 1m",
-                }
-            ],
-        }
-
-        self._default_run_config = {
-            "app_spec_version": "v3",
-            "sdk_version": beam_sdk_version,
-            "name": "test",
-            "mounts": [],
-            "triggers": [],
-            "runtime": Runtime().data,
-            "run": {
-                "handler": "tests/test_parameters.py:test",
"tests/test_parameters.py:test", - "callback_url": None, - "outputs": [], - "runtime": None, - "task_policy": { - "max_retries": 3, - "timeout": 3600, - }, - }, - } - - @property - def default_config(self) -> dict: - return copy.deepcopy(self._default_config) - - @property - def default_rest_api_config(self) -> dict: - return copy.deepcopy(self._default_rest_api_config) - - @property - def default_task_queue_config(self) -> dict: - return copy.deepcopy(self._default_task_queue_config) - - @property - def default_schedule_config(self) -> dict: - return copy.deepcopy(self._default_schedule_config) - - @property - def default_run_config(self) -> dict: - return copy.deepcopy(self._default_run_config) - - def _run_single_trigger_subtest( - self, - app_config: dict, - expected_overall_config: dict, - trigger_type: str = "rest_api", - trigger_kwargs: dict = {}, - ): - with self.subTest( - app_config=app_config, - expected_config=expected_overall_config, - trigger_type=trigger_type, - trigger_kwargs=trigger_kwargs, - ): - app = App(**{"name": "test", "runtime": Runtime(), **app_config}) - - if trigger_type == "rest_api": - - @app.rest_api(**trigger_kwargs) - def test(): - pass - - elif trigger_type == "webhook": - - @app.task_queue(**trigger_kwargs) - def test(): - pass - - elif trigger_type == "cron_job": - - @app.schedule(**trigger_kwargs) - def test(): - pass - - config_dict = test() - print("provided:", trigger_kwargs) - print("returned:", config_dict) - print("expected:", expected_overall_config) - self.assertDictEqual(config_dict, expected_overall_config) - - def _run_single_run_subtest(self, app_config, expected_overall_config, run_kwargs={}): - with self.subTest(app_config=app_config, expected_config=expected_overall_config): - app = App(**{"name": "test", "runtime": Runtime(), **app_config}) - - @app.run(**run_kwargs) - def test(): - pass - - config_dict = test() - self.assertDictEqual(config_dict, expected_overall_config) - - def test_default_trigger_and_app(self): - self._run_single_trigger_subtest({}, self.default_rest_api_config) - expected_config = self.default_task_queue_config - expected_config["triggers"][0]["trigger_type"] = "webhook" - self._run_single_trigger_subtest( - {}, expected_overall_config=expected_config, trigger_type="webhook" - ) - - expected_config = self.default_schedule_config - - self._run_single_trigger_subtest( - {}, - expected_overall_config=expected_config, - trigger_type="cron_job", - trigger_kwargs={"when": "every 1m"}, - ) - - def test_runtime_app_parameters(self): - runtimes: list[dict] = [ - { - "memory": "2Gi", - }, - {"cpu": compose_cpu(2), "memory": "2Gi", "gpu": "A10G"}, - ] - - for i in range(len(runtimes)): - expected_config = self.default_rest_api_config - expected_config["runtime"] = Runtime(**runtimes[i]).data - expected_config["triggers"][0]["runtime"] = None - - self._run_single_trigger_subtest( - app_config={"runtime": runtimes[i]}, - expected_overall_config=expected_config, - ) - - self._run_single_trigger_subtest( - app_config={"runtime": Runtime(**runtimes[i])}, - expected_overall_config=expected_config, - ) - - def test_volumes_app_parameters(self): - volumes = [ - { - "input": [ - {"name": "test", "path": "./test", "volume_type": "shared"}, - ], - "expected": [{"name": "test", "app_path": "./test", "mount_type": "shared"}], - }, - { - "input": [ - { - "name": "test", - "path": "./test", - "volume_type": "persistent", - }, - { - "name": "test2", - "path": "./test2", - "volume_type": "shared", - }, - ], - "expected": [ - {"name": "test", 
"app_path": "./test", "mount_type": "persistent"}, - {"name": "test2", "app_path": "./test2", "mount_type": "shared"}, - ], - }, - { - "input": [ - { - "name": "test", - "path": "./test", - } - ], - "expected": [{"name": "test", "app_path": "./test", "mount_type": "shared"}], - }, - ] - - for i in range(len(volumes)): - expected_config = self.default_rest_api_config - expected_config["mounts"] = volumes[i]["expected"] - - self._run_single_trigger_subtest( - app_config={"volumes": volumes[i]["input"]}, - expected_overall_config=expected_config, - ) - - self._run_single_trigger_subtest( - app_config={"volumes": [Volume(**volume) for volume in volumes[i]["input"]]}, - expected_overall_config=expected_config, - ) - - def _create_parameter_permutations(self, parameters, current_param_set, permute_list=[]): - for i in range(len(parameters)): - self._create_parameter_permutations( - parameters[i + 1 :], current_param_set + [parameters[i]], permute_list - ) - - self._create_parameter_permutations( - parameters[i + 1 :], current_param_set, permute_list - ) - - if len(parameters) == 0: - combined_params = {} - for param in current_param_set: - combined_params = {**combined_params, **param} - - permute_list.append(combined_params) - - return permute_list - - def test_rest_api_and_task_queue_parameters(self): - parameters = [ - { - "outputs": [ - {"path": "./test"}, - {"path": "./test2"}, - ] - }, - { - "autoscaling": { - "max_replicas": 10, - "desired_latency": 500, - "autoscaling_type": "max_request_latency", - } - }, - {"max_pending_tasks": 100000}, - {"keep_warm_seconds": 100}, - {"loader": "handler.py:handler"}, - ] - - # Create permutations of parameters - parameter_permutations = self._create_parameter_permutations(parameters, [], []) - - for param in parameter_permutations: - expected_task_queue_config = self.default_task_queue_config - expected_rest_api_config = self.default_rest_api_config - - expected_task_queue_config["triggers"][0] = { - **expected_task_queue_config["triggers"][0], - **param, - } - - expected_rest_api_config["triggers"][0] = { - **expected_rest_api_config["triggers"][0], - **param, - } - - self._run_single_trigger_subtest( - {}, - expected_overall_config=expected_task_queue_config, - trigger_type="webhook", - trigger_kwargs=param, - ) - - self._run_single_trigger_subtest( - {}, - expected_overall_config=expected_rest_api_config, - trigger_kwargs=param, - ) - - # Test with classes - input_param_with_classes = { - **param, - "outputs": [ - Output(**output) for output in (param["outputs"] if "outputs" in param else []) - ], - "autoscaling": Autoscaling(**param["autoscaling"]) - if "autoscaling" in param - else None, - "runtime": Runtime(**param["runtime"]) if "runtime" in param else None, - } - - self._run_single_trigger_subtest( - {}, - expected_overall_config=expected_task_queue_config, - trigger_type="webhook", - trigger_kwargs=input_param_with_classes, - ) - - self._run_single_trigger_subtest( - {}, - expected_overall_config=expected_rest_api_config, - trigger_kwargs=input_param_with_classes, - ) - - def test_schedule_parameters(self): - parameters = [ - { - "when": "0 0 * * *", - }, - { - "when": "every 1s", - }, - { - "when": "every 1m", - }, - { - "when": "every 1h", - }, - { - "outputs": [ - {"path": "./test"}, - {"path": "./test2"}, - ] - }, - { - "runtime": { - "cpu": "2000m", - "memory": "10Gi", - "gpu": "", - "image": build_config(Image(), Image), - }, - }, - ] - - # Create permutations of parameters - parameter_permutations = 
-
-        for param in parameter_permutations:
-            default_param = {"when": "0 0 * * *", "runtime": Runtime().data}
-
-            param = {**default_param, **param}
-
-            expected_schedule_config = self.default_schedule_config
-
-            if "runtime" in param:
-                expected_schedule_config["runtime"] = param["runtime"]
-
-            expected_schedule_config["triggers"][0] = {
-                **expected_schedule_config["triggers"][0],
-                **param,
-            }
-
-            self._run_single_trigger_subtest(
-                {},
-                expected_overall_config=expected_schedule_config,
-                trigger_type="cron_job",
-                trigger_kwargs={**param},
-            )
-
-            # Test with classes
-            input_param_with_classes = {
-                **param,
-                "outputs": [
-                    Output(**output) for output in (param["outputs"] if "outputs" in param else [])
-                ],
-                "runtime": Runtime(**param["runtime"]) if "runtime" in param else Runtime(),
-            }
-
-            self._run_single_trigger_subtest(
-                {},
-                expected_overall_config=expected_schedule_config,
-                trigger_type="cron_job",
-                trigger_kwargs=input_param_with_classes,
-            )
-
-    def test_outputs_parameters(self):
-        parameters = [
-            {"outputs": []},
-            {
-                "outputs": [
-                    {"path": "./testdir"},
-                    {"path": "./testfile"},
-                ]
-            },
-        ]
-
-        for param in parameters:
-            for config in [
-                self.default_task_queue_config,
-                self.default_rest_api_config,
-                self.default_schedule_config,
-            ]:
-                trigger_type = config["triggers"][0].get("trigger_type")
-                config["triggers"][0] = {
-                    **config["triggers"][0],
-                    **param,
-                }
-
-                param = {
-                    **param,
-                    **(
-                        {"when": config["triggers"][0]["when"]}
-                        if trigger_type == "cron_job"
-                        else {}
-                    ),
-                }
-
-                self._run_single_trigger_subtest(
-                    {},
-                    expected_overall_config=config,
-                    trigger_type=trigger_type,
-                    trigger_kwargs=param,
-                )
-
-                input_class_params = {
-                    **param,
-                    "outputs": [
-                        Output(**output)
-                        for output in (param["outputs"] if "outputs" in param else [])
-                    ],
-                }
-
-                self._run_single_trigger_subtest(
-                    {},
-                    expected_overall_config=config,
-                    trigger_type=trigger_type,
-                    trigger_kwargs=input_class_params,
-                )
-
-    def test_autoscaling_parameters(self):
-        default_autoscaling_params = {
-            "max_replicas": 1,
-            "desired_latency": 100,
-            "autoscaling_type": "max_request_latency",
-        }
-
-        parameters = [
-            {"autoscaling": {"max_replicas": 1, "desired_latency": 1000}},
-            {
-                "autoscaling": {
-                    "max_replicas": 2,
-                }
-            },
-            {"autoscaling": {"desired_latency": 1000}},
-        ]
-
-        param: dict[str, Any]
-        for param in parameters:
-            expected_task_queue_config = self.default_task_queue_config
-            expected_rest_api_config = self.default_rest_api_config
-
-            for config in [expected_task_queue_config, expected_rest_api_config]:
-                trigger_type = config["triggers"][0].get("trigger_type")
-                config["triggers"][0] = {
-                    **config["triggers"][0],
-                    "autoscaling": {
-                        **default_autoscaling_params,
-                        **param["autoscaling"],
-                    },
-                }
-
-                self._run_single_trigger_subtest(
-                    {},
-                    expected_overall_config=config,
-                    trigger_type=trigger_type,
-                    trigger_kwargs=param,
-                )
-
-                input_class_params = {
-                    "autoscaling": Autoscaling(**param["autoscaling"])
-                    if "autoscaling" in param
-                    else None,
-                }
-
-                self._run_single_trigger_subtest(
-                    {},
-                    expected_overall_config=config,
-                    trigger_type=trigger_type,
-                    trigger_kwargs=input_class_params,
-                )
-
-    def test_image_parameters(self):
-        beam_sdk_version = "beam-sdk==0.0.0"
-
-        parameters = [
-            {
-                "image": {
-                    "python_version": PythonVersion.Python310,
-                }
-            },
-            {
-                "image": {
-                    "python_version": PythonVersion.Python310,
-                    "python_packages": ["numpy", "pandas", beam_sdk_version],
-                }
-            },
-            {
-                "image": {
"python_version": PythonVersion.Python310, - "python_packages": ["numpy", "pandas", beam_sdk_version], - "commands": ["echo hi"], - } - }, - { - "image": { - "python_version": PythonVersion.Python310, - "python_packages": ["numpy", "pandas", beam_sdk_version], - "commands": ["echo hi"], - "base_image": "docker.io/beamcloud/custom-img-test:latest", - } - }, - ] - - # Test params for trigger level runtime image - for param in parameters: - expected_config = self.default_task_queue_config - expected_config["triggers"][0]["runtime"] = Runtime().data - trigger = expected_config["triggers"][0] - trigger_type = trigger.get("trigger_type") - - input_dict_params = { - "runtime": { - **trigger.get("runtime"), - "image": { - **trigger.get("runtime").get("image"), - **param["image"], - }, - } - } - - expected_config["triggers"][0] = { - **expected_config["triggers"][0], - **input_dict_params, - } - expected_config["runtime"] = input_dict_params["runtime"] - - self._run_single_trigger_subtest( - {}, - expected_overall_config=expected_config, - trigger_type=trigger_type, - trigger_kwargs=input_dict_params, - ) - - input_class_params = { - "runtime": { - **trigger.get("runtime"), - "image": Image(**{**trigger.get("runtime").get("image"), **param["image"]}), - } - } - - self._run_single_trigger_subtest( - {}, - expected_overall_config=expected_config, - trigger_type=trigger_type, - trigger_kwargs=input_class_params, - ) - - # Test params for app level runtime image - for param in parameters: - expected_config = self.default_task_queue_config - expected_config["runtime"] = Runtime(image=Image(**param["image"])).data - trigger = expected_config["triggers"][0] - - input_dict_params = { - "runtime": { - **expected_config["runtime"], - "image": { - **expected_config["runtime"]["image"], - **param["image"], - }, - } - } - - self._run_single_trigger_subtest( - input_dict_params, - expected_overall_config=expected_config, - trigger_type="webhook", - ) - - input_class_params = { - "runtime": { - **expected_config["runtime"], - "image": Image(**{**expected_config["runtime"]["image"], **param["image"]}), - } - } - - self._run_single_trigger_subtest( - input_class_params, - expected_overall_config=expected_config, - trigger_type="webhook", - ) - - def test_run_parameters(self): - parameters = [ - { - "outputs": [ - {"path": "./tmp1"}, - {"path": "./tmp2"}, - ] - }, - { - "runtime": build_config(Runtime(cpu=3, memory="1Gi"), Runtime), - }, - { - "callback_url": "http://test.com", - }, - ] - - parameter_permutations = self._create_parameter_permutations(parameters, [], []) - - for param in parameter_permutations: - expected_config = self.default_run_config - expected_config["run"] = {**expected_config["run"], **param} - - if "runtime" in param: - expected_config["runtime"] = param["runtime"] - - self._run_single_run_subtest( - {}, - expected_overall_config=expected_config, - run_kwargs=param, - ) - - def test_loaders(self): - def some_function(): - pass - - expected_func_path = "tests/test_parameters.py:some_function" - - expected_config = self.default_task_queue_config - expected_config["triggers"][0]["loader"] = expected_func_path - - # test the passing in of a function - self._run_single_trigger_subtest( - {}, - expected_overall_config=expected_config, - trigger_type="webhook", - trigger_kwargs={"loader": some_function}, - ) - - # test the passing in of a string - self._run_single_trigger_subtest( - {}, - expected_overall_config=expected_config, - trigger_type="webhook", - trigger_kwargs={"loader": 
-
-    def test_autoscalers(self):
-        Test = namedtuple("Test", ["trigger", "autoscaler", "autoscaler_expected"])
-
-        tests = [
-            # TODO: Re-enable when queue depth autoscaler is implemented.
-            # Test(
-            #     "webhook",
-            #     QueueDepthAutoscaler(max_tasks_per_replica=20),
-            #     {"queue_depth": {"max_tasks_per_replica": 20, "max_replicas": 1}},
-            # ),
-            # Test(
-            #     "rest_api",
-            #     QueueDepthAutoscaler(max_tasks_per_replica=1000),
-            #     {"queue_depth": {"max_tasks_per_replica": 1000, "max_replicas": 1}},
-            # ),
-            Test(
-                "webhook",
-                RequestLatencyAutoscaler(desired_latency=100),
-                {"request_latency": {"desired_latency": 100, "max_replicas": 1}},
-            ),
-            Test(
-                "webhook",
-                {
-                    "request_latency": {"desired_latency": 100, "max_replicas": 2},
-                    "queue_depth": {"max_tasks_per_replica": 11},
-                },
-                None,
-            ),
-            Test(
-                "rest_api",
-                RequestLatencyAutoscaler(desired_latency=60),
-                {"request_latency": {"desired_latency": 60, "max_replicas": 1}},
-            ),
-        ]
-
-        for test in tests:
-            expected_config = self.default_config
-            if test.trigger == "rest_api":
-                expected_config = self.default_rest_api_config
-            elif test.trigger == "webhook":
-                expected_config = self.default_task_queue_config
-
-            expected_config["triggers"][0]["autoscaler"] = test.autoscaler_expected
-            expected_config["triggers"][0]["trigger_type"] = test.trigger
-
-            subtest_kwargs = dict(
-                app_config={},
-                expected_overall_config=expected_config,
-                trigger_type=test.trigger,
-                trigger_kwargs={"autoscaler": test.autoscaler},
-            )
-
-            if isinstance(test.autoscaler, dict) and len(test.autoscaler.keys()) > 1:
-                with pytest.raises(ValidationError):
-                    self._run_single_trigger_subtest(**subtest_kwargs)
-            else:
-                self._run_single_trigger_subtest(**subtest_kwargs)
-
-    def test_memory_limits_should_fail(self):
-        tests = [
-            {
-                "memory": "65Gi",
-            }
-        ]
-
-        for test in tests:
-            with pytest.raises(ValueError):
-                runtime = self.default_config["runtime"]
-                runtime["memory"] = test["memory"]
-
-                self._run_single_run_subtest(
-                    {"runtime": runtime},
-                    expected_overall_config=self.default_run_config,
-                )
-
-                self._run_single_trigger_subtest(
-                    {"runtime": Runtime(**runtime)},
-                    expected_overall_config=self.default_run_config,
-                )
-
-    def test_task_policy(self):
-        Test = namedtuple("Test", ["trigger", "task_policy", "task_policy_expected"])
-
-        tests = [
-            Test(
-                "webhook",
-                {
-                    "max_retries": 0,
-                },
-                {
-                    "max_retries": 0,
-                    "timeout": 3600,
-                },
-            ),
-            Test(
-                "rest_api",
-                {
-                    "max_retries": 1,
-                    "timeout": 4200,
-                },
-                {
-                    "max_retries": 1,
-                    "timeout": 4200,
-                },
-            ),
-            Test(
-                "cron_job",
-                {},
-                {
-                    "max_retries": 3,
-                    "timeout": 3600,
-                },
-            ),
-        ]
-
-        for test in tests:
-            expected_trigger_config = self.default_config
-            kwargs = {}
-
-            if test.trigger == "webhook":
-                expected_trigger_config = self.default_task_queue_config
-            elif test.trigger == "rest_api":
-                expected_trigger_config = self.default_rest_api_config
-            elif test.trigger == "cron_job":
-                expected_trigger_config = self.default_schedule_config
-                kwargs["when"] = self.default_schedule_config["triggers"][0]["when"]
-
-            expected_trigger_config["triggers"][0]["task_policy"] = test.task_policy_expected
-            expected_trigger_config["triggers"][0]["trigger_type"] = test.trigger
-
-            self._run_single_trigger_subtest(
-                {},
-                trigger_type=test.trigger,
-                trigger_kwargs={"task_policy": test.task_policy, **kwargs},
-                expected_overall_config=expected_trigger_config,
-            )
-
-            # Make sure it works for run as well
-            expected_run_config = self.default_run_config
expected_run_config["run"]["task_policy"] = test.task_policy_expected - - self._run_single_run_subtest( - {}, - run_kwargs={ - "task_policy": test.task_policy, - }, - expected_overall_config=expected_run_config, - ) - - # Now we want to test flatten params - self._run_single_trigger_subtest( - {}, - trigger_type=test.trigger, - trigger_kwargs={**test.task_policy, **kwargs}, - expected_overall_config=expected_trigger_config, - ) - - # Make sure it works for run as well - run_kwargs = {} - - if test.task_policy.get("timeout"): - run_kwargs["timeout"] = test.task_policy["timeout"] - - # Runs cant set max_retries, so it will always be 3 (ignored in backend) - expected_run_config["run"]["task_policy"]["max_retries"] = 3 - - self._run_single_run_subtest( - {}, - run_kwargs=run_kwargs, - expected_overall_config=expected_run_config, - ) diff --git a/sdk/tests/test_parsers.py b/sdk/tests/test_parsers.py deleted file mode 100644 index 1a44c52d5..000000000 --- a/sdk/tests/test_parsers.py +++ /dev/null @@ -1,43 +0,0 @@ -import pytest - -from beam.utils.parse import compose_cpu, compose_memory - - -class TestParsers: - def test_compose_cpu(self): - assert compose_cpu(1) == "1000m" - assert compose_cpu(1.999) == "1999m" - assert compose_cpu("1000m") == "1000m" - - def test_compose_memory(self): - assert compose_memory("10gi") == "10Gi" - assert compose_memory("10 gi ") == "10Gi" - assert compose_memory("10 Gi ") == "10Gi" - - # Raises if Gi is > 256 - with pytest.raises(ValueError): - compose_memory("10000Gi") - - assert compose_memory("256mi") == "256Mi" - assert compose_memory("256 mi ") == "256Mi" - assert compose_memory("256 Mi ") == "256Mi" - - # Raises if Mi is < 128 or >= 1000 - with pytest.raises(ValueError): - compose_memory("127Mi") - - with pytest.raises(ValueError): - compose_memory("1000Mi") - - # Test invalid formats - with pytest.raises(ValueError): - compose_memory("1000ti") - - with pytest.raises(ValueError): - compose_memory("1000.0") - - with pytest.raises(ValueError): - compose_memory("1000") - - with pytest.raises(ValueError): - compose_memory("2 gigabytes") diff --git a/sdk/tests/test_queue.py b/sdk/tests/test_queue.py new file mode 100644 index 000000000..a1419110b --- /dev/null +++ b/sdk/tests/test_queue.py @@ -0,0 +1,113 @@ +from unittest import TestCase +from unittest.mock import MagicMock + +import cloudpickle + +from beam.abstractions.queue import SimpleQueue, SimpleQueueInternalServerError +from beam.clients.simplequeue import ( + SimpleQueueEmptyResponse, + SimpleQueuePeekResponse, + SimpleQueuePopResponse, + SimpleQueuePutResponse, + SimpleQueueSizeResponse, +) + +from .utils import mock_coroutine_with_result + + +class TestSimpleQueue(TestCase): + def setUp(self): + pass + + def test_put(self): + mock_stub = MagicMock() + + mock_stub.put = mock_coroutine_with_result(SimpleQueuePutResponse(ok=True)) + + queue = SimpleQueue(name="test") + queue.stub = mock_stub + + self.assertTrue(queue.put("test")) + + mock_stub.put = mock_coroutine_with_result(SimpleQueuePutResponse(ok=False)) + + queue = SimpleQueue(name="test") + queue.stub = mock_stub + + self.assertRaises(SimpleQueueInternalServerError, queue.put, "test") + + def test_pop(self): + mock_stub = MagicMock() + + pickled_value = cloudpickle.dumps("test") + + mock_stub.pop = mock_coroutine_with_result( + SimpleQueuePopResponse(ok=True, value=pickled_value) + ) + + queue = SimpleQueue(name="test") + queue.stub = mock_stub + + self.assertEqual(queue.pop(), "test") + + mock_stub.pop = 
+        mock_stub.pop = mock_coroutine_with_result(SimpleQueuePopResponse(ok=False, value=b""))
+
+        queue = SimpleQueue(name="test")
+        queue.stub = mock_stub
+
+        self.assertRaises(SimpleQueueInternalServerError, queue.pop)
+
+    def test_peek(self):
+        mock_stub = MagicMock()
+
+        pickled_value = cloudpickle.dumps("test")
+
+        mock_stub.peek = mock_coroutine_with_result(
+            SimpleQueuePeekResponse(ok=True, value=pickled_value)
+        )
+
+        queue = SimpleQueue(name="test")
+        queue.stub = mock_stub
+
+        self.assertEqual(queue.peek(), "test")
+
+        mock_stub.peek = mock_coroutine_with_result(SimpleQueuePeekResponse(ok=False, value=b""))
+
+        queue = SimpleQueue(name="test")
+        queue.stub = mock_stub
+
+        self.assertRaises(SimpleQueueInternalServerError, queue.peek)
+
+    def test_empty(self):
+        mock_stub = MagicMock()
+
+        mock_stub.empty = mock_coroutine_with_result(SimpleQueueEmptyResponse(ok=True, empty=True))
+
+        queue = SimpleQueue(name="test")
+        queue.stub = mock_stub
+
+        self.assertTrue(queue.empty())
+
+        mock_stub.empty = mock_coroutine_with_result(SimpleQueueEmptyResponse(ok=False, empty=True))
+
+        queue = SimpleQueue(name="test")
+        queue.stub = mock_stub
+
+        self.assertRaises(SimpleQueueInternalServerError, queue.empty)
+
+    def test_size(self):
+        mock_stub = MagicMock()
+
+        mock_stub.size = mock_coroutine_with_result(SimpleQueueSizeResponse(ok=True, size=1))
+
+        queue = SimpleQueue(name="test")
+        queue.stub = mock_stub
+
+        self.assertEqual(len(queue), 1)
+
+        mock_stub.size = mock_coroutine_with_result(SimpleQueueSizeResponse(ok=False, size=1))
+
+        queue = SimpleQueue(name="test")
+        queue.stub = mock_stub
+
+        self.assertEqual(len(queue), 0)
diff --git a/sdk/tests/test_task_queue.py b/sdk/tests/test_task_queue.py
new file mode 100644
index 000000000..a9bad615a
--- /dev/null
+++ b/sdk/tests/test_task_queue.py
@@ -0,0 +1,73 @@
+import os
+from unittest import TestCase
+from unittest.mock import MagicMock
+
+from beam import Image
+from beam.abstractions.taskqueue import TaskQueue
+from beam.clients.taskqueue import TaskQueuePutResponse
+
+from .utils import mock_coroutine_with_result
+
+
+class TestTaskQueue(TestCase):
+    def test_init(self):
+        mock_stub = MagicMock()
+
+        queue = TaskQueue(Image(python_version="python3.8"), cpu=100, memory=128)
+        queue.stub = mock_stub
+
+        self.assertEqual(queue.image.python_version, "python3.8")
+        self.assertEqual(queue.cpu, 100)
+        self.assertEqual(queue.memory, 128)
+
+    def test_run_local(self):
+        @TaskQueue(Image(python_version="python3.8"), cpu=100, memory=128)
+        def test_func():
+            return 1
+
+        resp = test_func.local()
+
+        self.assertEqual(resp, 1)
+
+    def test_put(self):
+        @TaskQueue(Image(python_version="python3.8"), cpu=100, memory=128)
+        def test_func():
+            return 1
+
+        test_func.parent.taskqueue_stub = MagicMock()
+        test_func.parent.taskqueue_stub.task_queue_put = mock_coroutine_with_result(
+            TaskQueuePutResponse(ok=True, task_id="1234")
+        )
+        test_func.parent.prepare_runtime = MagicMock(return_value=True)
+
+        test_func.put()
+
+        test_func.parent.taskqueue_stub.task_queue_put = mock_coroutine_with_result(
+            TaskQueuePutResponse(ok=False, task_id="")
+        )
+
+        self.assertRaises(SystemExit, test_func.put)
+
+    def test__call__(self):
+        @TaskQueue(Image(python_version="python3.8"), cpu=100, memory=128)
+        def test_func():
+            return 1
+
+        test_func.parent.taskqueue_stub = MagicMock()
+        test_func.parent.taskqueue_stub.task_queue_put = mock_coroutine_with_result(
+            TaskQueuePutResponse(ok=True, task_id="1234")
+        )
+
+        test_func.parent.prepare_runtime = MagicMock(return_value=True)
+
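+        # Direct calls outside a container raise NotImplementedError;
+        # .put() is the supported enqueue path (see taskqueue.py above).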
+        self.assertRaises(
+            NotImplementedError,
+            test_func,
+        )
+
+        # Test calling in container
+        os.environ["CONTAINER_ID"] = "1234"
+        self.assertEqual(test_func(), 1)
+        del os.environ["CONTAINER_ID"]  # avoid leaking container state into other tests
diff --git a/sdk/tests/test_validators.py b/sdk/tests/test_validators.py
deleted file mode 100644
index a11151192..000000000
--- a/sdk/tests/test_validators.py
+++ /dev/null
@@ -1,22 +0,0 @@
-import pytest
-
-from beam import validators
-
-
-def test_every():
-    with pytest.raises(validators.ValidationError):
-        validators.IsValidEvery()("every")
-
-    with pytest.raises(validators.ValidationError):
-        validators.IsValidEvery()("every 1")
-
-    with pytest.raises(validators.ValidationError):
-        validators.IsValidEvery()("every 1 s")
-
-    with pytest.raises(validators.ValidationError):
-        validators.IsValidEvery()("every 1 unknown-s")
-
-    for time_values in [1, 10, 100]:
-        for unit in validators.IsValidEvery.time_units:
-            # should not fail
-            validators.IsValidEvery()(f"every {time_values}{unit}")
diff --git a/sdk/tests/test_volumes.py b/sdk/tests/test_volumes.py
new file mode 100644
index 000000000..12d7211e0
--- /dev/null
+++ b/sdk/tests/test_volumes.py
@@ -0,0 +1,40 @@
+from unittest import TestCase
+from unittest.mock import MagicMock
+
+from beam.abstractions.volume import Volume
+from beam.clients.volume import GetOrCreateVolumeResponse
+
+from .utils import mock_coroutine_with_result
+
+
+class TestVolumes(TestCase):
+    def setUp(self):
+        pass
+
+    def test_get_or_create(self):
+        mock_stub = MagicMock()
+
+        # Test that a valid grpc response sets the volume id and ready flag
+        mock_stub.get_or_create_volume = mock_coroutine_with_result(
+            GetOrCreateVolumeResponse(ok=True, volume_id="1234")
+        )
+
+        volume = Volume(name="test", mount_path="/test")
+        volume.stub = mock_stub
+
+        self.assertFalse(volume.ready)
+        self.assertTrue(volume.get_or_create())
+        self.assertTrue(volume.ready)
+        self.assertEqual(volume.volume_id, "1234")
+
+        # Test that an invalid grpc response does not set the volume id or ready flag
+        mock_stub.get_or_create_volume = mock_coroutine_with_result(
+            GetOrCreateVolumeResponse(ok=False, volume_id="")
+        )
+
+        volume = Volume(name="test", mount_path="/test")
+        volume.stub = mock_stub
+
+        self.assertFalse(volume.get_or_create())
+        self.assertFalse(volume.ready)
+        self.assertEqual(volume.volume_id, None)
diff --git a/sdk/tests/utils.py b/sdk/tests/utils.py
new file mode 100644
index 000000000..a2bd77d60
--- /dev/null
+++ b/sdk/tests/utils.py
@@ -0,0 +1,17 @@
+# Helps mock out results that should be returned by a coroutine function
+# that you want to override
+def mock_coroutine_with_result(result):
+    async def _result(*args, **kwargs):
+        return result
+
+    return _result
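+
+# Example usage (mirrors sdk/tests/test_queue.py): swap a stub's coroutine
+# method for a canned response so the abstraction's run_sync() call resolves
+# to it without a real gateway connection.
+#
+#   mock_stub = MagicMock()
+#   mock_stub.put = mock_coroutine_with_result(SimpleQueuePutResponse(ok=True))
+#   queue = SimpleQueue(name="test")
+#   queue.stub = mock_stub
+#   assert queue.put("test")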