From 7e5517370c8ad5589fcd7d2955f61825ca813916 Mon Sep 17 00:00:00 2001 From: John Date: Tue, 9 Jan 2024 11:07:30 -0700 Subject: [PATCH 01/18] sdk test cases 1 --- sdk/src/beam/abstractions/queue.py | 2 +- sdk/src/beam/cli/main.py | 3 + sdk/tests/test_build.py | 31 - sdk/tests/test_map.py | 142 +++++ sdk/tests/test_parameters.py | 885 ----------------------------- sdk/tests/test_parsers.py | 43 -- sdk/tests/test_queue.py | 119 ++++ sdk/tests/test_task_queue.py | 0 sdk/tests/test_validators.py | 22 - sdk/tests/test_volumes.py | 42 ++ sdk/tests/utils.py | 5 + 11 files changed, 312 insertions(+), 982 deletions(-) delete mode 100644 sdk/tests/test_build.py create mode 100644 sdk/tests/test_map.py delete mode 100644 sdk/tests/test_parameters.py delete mode 100644 sdk/tests/test_parsers.py create mode 100644 sdk/tests/test_queue.py create mode 100644 sdk/tests/test_task_queue.py delete mode 100644 sdk/tests/test_validators.py create mode 100644 sdk/tests/test_volumes.py create mode 100644 sdk/tests/utils.py diff --git a/sdk/src/beam/abstractions/queue.py b/sdk/src/beam/abstractions/queue.py index e00786412..a025f193b 100644 --- a/sdk/src/beam/abstractions/queue.py +++ b/sdk/src/beam/abstractions/queue.py @@ -36,7 +36,7 @@ def put(self, value: Any) -> bool: def pop(self) -> Any: r = self.run_sync(self.stub.pop(name=self.name)) if not r.ok: - return SimpleQueueInternalServerError + raise SimpleQueueInternalServerError if len(r.value) > 0: return cloudpickle.loads(r.value) diff --git a/sdk/src/beam/cli/main.py b/sdk/src/beam/cli/main.py index 5cbef07de..6c288a632 100644 --- a/sdk/src/beam/cli/main.py +++ b/sdk/src/beam/cli/main.py @@ -9,3 +9,6 @@ cli = click.Group() cli.add_command(configure.configure) cli.add_command(tasks.cli) + +if __name__ == "__main__": + cli() diff --git a/sdk/tests/test_build.py b/sdk/tests/test_build.py deleted file mode 100644 index 7020486aa..000000000 --- a/sdk/tests/test_build.py +++ /dev/null @@ -1,31 +0,0 @@ -import os -import sys - -import 
pytest -from tests.conftest import get_app_dirs - -from beam.utils.build import AppBuilder - - -@pytest.fixture(scope="function") -def change_cwd_to_app_dir(request): - original_cwd = os.getcwd() - - # Add app directory to sys path and chdir - app_dir_absolute_path = os.path.abspath(request.param) - - # Change directory to the app directory - os.chdir(app_dir_absolute_path) - sys.path.insert(0, app_dir_absolute_path) - - yield - - # Revert back to normal - sys.path.remove(app_dir_absolute_path) - - os.chdir(original_cwd) - - -@pytest.mark.parametrize("change_cwd_to_app_dir", get_app_dirs(), indirect=True) -def test_app_build(change_cwd_to_app_dir): - AppBuilder.build(module_path="app.py", func_or_app_name=None) diff --git a/sdk/tests/test_map.py b/sdk/tests/test_map.py new file mode 100644 index 000000000..cc334e734 --- /dev/null +++ b/sdk/tests/test_map.py @@ -0,0 +1,142 @@ +from unittest import TestCase +from unittest.mock import MagicMock + +import cloudpickle + +from beam.abstractions.map import Map +from beam.clients.map import ( + MapCountResponse, + MapDeleteResponse, + MapGetResponse, + MapKeysResponse, + MapSetResponse, +) + +from .utils import override_run_sync + + +class TestMap(TestCase): + def setUp(self): + pass + + def test_set(self): + mock_stub = MagicMock() + + mock_stub.map_set.return_value = MapSetResponse(ok=True) + + map = Map(name="test") + map.stub = mock_stub + map.run_sync = override_run_sync + + self.assertTrue(map.set("test", "test")) + + mock_stub.map_set.return_value = MapSetResponse(ok=False) + + map = Map(name="test") + map.stub = mock_stub + map.run_sync = override_run_sync + + self.assertFalse(map.set("test", "test")) + + def test_get(self): + mock_stub = MagicMock() + + pickled_value = cloudpickle.dumps("test") + + mock_stub.map_get.return_value = MapGetResponse(ok=True, value=pickled_value) + + map = Map(name="test") + map.stub = mock_stub + map.run_sync = override_run_sync + + self.assertEqual(map.get("test"), "test") + + 
mock_stub.map_get.return_value = MapGetResponse(ok=False, value=b"") + + map = Map(name="test") + map.stub = mock_stub + map.run_sync = override_run_sync + + self.assertEqual(map.get("test"), None) + + def test_delitem(self): + mock_stub = MagicMock() + + mock_stub.map_delete.return_value = MapDeleteResponse(ok=True) + + map = Map(name="test") + map.stub = mock_stub + map.run_sync = override_run_sync + + del map["test"] + + mock_stub.map_delete.return_value = MapDeleteResponse(ok=False) + + map = Map(name="test") + map.stub = mock_stub + map.run_sync = override_run_sync + + def _del(): + del map["test"] + + self.assertRaises(KeyError, _del) + + def test_len(self): + mock_stub = MagicMock() + + mock_stub.map_count.return_value = MapCountResponse(ok=True, count=1) + + map = Map(name="test") + map.stub = mock_stub + map.run_sync = override_run_sync + + self.assertEqual(len(map), 1) + + mock_stub.map_count.return_value = MapCountResponse(ok=False, count=1) + + map = Map(name="test") + map.stub = mock_stub + map.run_sync = override_run_sync + + self.assertEqual(len(map), 0) + + def test_iter(self): + mock_stub = MagicMock() + + mock_stub.map_keys.return_value = MapKeysResponse(ok=True, keys=["test"]) + + map = Map(name="test") + map.stub = mock_stub + map.run_sync = override_run_sync + + self.assertEqual(list(map), ["test"]) + + mock_stub.map_keys.return_value = MapKeysResponse(ok=False, keys=[]) + + map = Map(name="test") + map.stub = mock_stub + map.run_sync = override_run_sync + + self.assertEqual(list(map), []) + + def test_items(self): + mock_stub = MagicMock() + + pickled_value = cloudpickle.dumps("test") + + mock_stub.map_keys.return_value = MapKeysResponse(ok=True, keys=["test"]) + mock_stub.map_get.return_value = MapGetResponse(ok=True, value=pickled_value) + + map = Map(name="test") + map.stub = mock_stub + map.run_sync = override_run_sync + + self.assertEqual(list(map.items()), [("test", "test")]) + + mock_stub.map_keys.return_value = 
MapKeysResponse(ok=False, keys=[]) + + map = Map(name="test") + map.stub = mock_stub + map.run_sync = override_run_sync + + self.assertEqual(list(map.items()), []) diff --git a/sdk/tests/test_parameters.py b/sdk/tests/test_parameters.py deleted file mode 100644 index 1633c5004..000000000 --- a/sdk/tests/test_parameters.py +++ /dev/null @@ -1,885 +0,0 @@ -import copy -import unittest -from collections import namedtuple -from typing import Any - -import pytest -from marshmallow import ValidationError - -from beam import ( - App, - Autoscaling, - Image, - Output, - PythonVersion, - RequestLatencyAutoscaler, - Runtime, - Volume, - build_config, -) -from beam.utils.parse import compose_cpu - -try: - from importlib.metadata import version # type: ignore -except ImportError: - from importlib_metadata import version - - -beam_sdk_version = version("beam-sdk") - - -class TestParameters(unittest.TestCase): - def __init__(self, methodName: str = "runTest") -> None: - super().__init__(methodName) - self.maxDiff = None - self._default_config = { - "app_spec_version": "v3", - "name": "test", - "sdk_version": beam_sdk_version, - "mounts": [], - "runtime": Runtime().data, - "triggers": [ - { - "handler": "tests/test_parameters.py:test", - "callback_url": None, - "runtime": None, - "outputs": [], - "autoscaling": None, - "autoscaler": None, - "task_policy": { - "max_retries": 3, - "timeout": 3600, - }, - "workers": 1, - "authorized": True, - }, - ], - "run": None, - } - - self._default_rest_api_config = { - **self._default_config, - "triggers": [ - { - **self._default_config["triggers"][0], - "path": "/test", - "loader": None, - "trigger_type": "rest_api", - "max_pending_tasks": 100, - "keep_warm_seconds": 90, - "method": "POST", - } - ], - } - - self._default_task_queue_config = { - **self._default_config, - "triggers": [ - { - **self._default_rest_api_config["triggers"][0], - "trigger_type": "webhook", - "keep_warm_seconds": 10, - } - ], - } - - self._default_schedule_config = { 
- **self._default_config, - "triggers": [ - { - **self._default_config["triggers"][0], - "trigger_type": "cron_job", - "when": "every 1m", - } - ], - } - - self._default_run_config = { - "app_spec_version": "v3", - "sdk_version": beam_sdk_version, - "name": "test", - "mounts": [], - "triggers": [], - "runtime": Runtime().data, - "run": { - "handler": "tests/test_parameters.py:test", - "callback_url": None, - "outputs": [], - "runtime": None, - "task_policy": { - "max_retries": 3, - "timeout": 3600, - }, - }, - } - - @property - def default_config(self) -> dict: - return copy.deepcopy(self._default_config) - - @property - def default_rest_api_config(self) -> dict: - return copy.deepcopy(self._default_rest_api_config) - - @property - def default_task_queue_config(self) -> dict: - return copy.deepcopy(self._default_task_queue_config) - - @property - def default_schedule_config(self) -> dict: - return copy.deepcopy(self._default_schedule_config) - - @property - def default_run_config(self) -> dict: - return copy.deepcopy(self._default_run_config) - - def _run_single_trigger_subtest( - self, - app_config: dict, - expected_overall_config: dict, - trigger_type: str = "rest_api", - trigger_kwargs: dict = {}, - ): - with self.subTest( - app_config=app_config, - expected_config=expected_overall_config, - trigger_type=trigger_type, - trigger_kwargs=trigger_kwargs, - ): - app = App(**{"name": "test", "runtime": Runtime(), **app_config}) - - if trigger_type == "rest_api": - - @app.rest_api(**trigger_kwargs) - def test(): - pass - - elif trigger_type == "webhook": - - @app.task_queue(**trigger_kwargs) - def test(): - pass - - elif trigger_type == "cron_job": - - @app.schedule(**trigger_kwargs) - def test(): - pass - - config_dict = test() - print("provided:", trigger_kwargs) - print("returned:", config_dict) - print("expected:", expected_overall_config) - self.assertDictEqual(config_dict, expected_overall_config) - - def _run_single_run_subtest(self, app_config, 
expected_overall_config, run_kwargs={}): - with self.subTest(app_config=app_config, expected_config=expected_overall_config): - app = App(**{"name": "test", "runtime": Runtime(), **app_config}) - - @app.run(**run_kwargs) - def test(): - pass - - config_dict = test() - self.assertDictEqual(config_dict, expected_overall_config) - - def test_default_trigger_and_app(self): - self._run_single_trigger_subtest({}, self.default_rest_api_config) - expected_config = self.default_task_queue_config - expected_config["triggers"][0]["trigger_type"] = "webhook" - self._run_single_trigger_subtest( - {}, expected_overall_config=expected_config, trigger_type="webhook" - ) - - expected_config = self.default_schedule_config - - self._run_single_trigger_subtest( - {}, - expected_overall_config=expected_config, - trigger_type="cron_job", - trigger_kwargs={"when": "every 1m"}, - ) - - def test_runtime_app_parameters(self): - runtimes: list[dict] = [ - { - "memory": "2Gi", - }, - {"cpu": compose_cpu(2), "memory": "2Gi", "gpu": "A10G"}, - ] - - for i in range(len(runtimes)): - expected_config = self.default_rest_api_config - expected_config["runtime"] = Runtime(**runtimes[i]).data - expected_config["triggers"][0]["runtime"] = None - - self._run_single_trigger_subtest( - app_config={"runtime": runtimes[i]}, - expected_overall_config=expected_config, - ) - - self._run_single_trigger_subtest( - app_config={"runtime": Runtime(**runtimes[i])}, - expected_overall_config=expected_config, - ) - - def test_volumes_app_parameters(self): - volumes = [ - { - "input": [ - {"name": "test", "path": "./test", "volume_type": "shared"}, - ], - "expected": [{"name": "test", "app_path": "./test", "mount_type": "shared"}], - }, - { - "input": [ - { - "name": "test", - "path": "./test", - "volume_type": "persistent", - }, - { - "name": "test2", - "path": "./test2", - "volume_type": "shared", - }, - ], - "expected": [ - {"name": "test", "app_path": "./test", "mount_type": "persistent"}, - {"name": "test2", 
"app_path": "./test2", "mount_type": "shared"}, - ], - }, - { - "input": [ - { - "name": "test", - "path": "./test", - } - ], - "expected": [{"name": "test", "app_path": "./test", "mount_type": "shared"}], - }, - ] - - for i in range(len(volumes)): - expected_config = self.default_rest_api_config - expected_config["mounts"] = volumes[i]["expected"] - - self._run_single_trigger_subtest( - app_config={"volumes": volumes[i]["input"]}, - expected_overall_config=expected_config, - ) - - self._run_single_trigger_subtest( - app_config={"volumes": [Volume(**volume) for volume in volumes[i]["input"]]}, - expected_overall_config=expected_config, - ) - - def _create_parameter_permutations(self, parameters, current_param_set, permute_list=[]): - for i in range(len(parameters)): - self._create_parameter_permutations( - parameters[i + 1 :], current_param_set + [parameters[i]], permute_list - ) - - self._create_parameter_permutations( - parameters[i + 1 :], current_param_set, permute_list - ) - - if len(parameters) == 0: - combined_params = {} - for param in current_param_set: - combined_params = {**combined_params, **param} - - permute_list.append(combined_params) - - return permute_list - - def test_rest_api_and_task_queue_parameters(self): - parameters = [ - { - "outputs": [ - {"path": "./test"}, - {"path": "./test2"}, - ] - }, - { - "autoscaling": { - "max_replicas": 10, - "desired_latency": 500, - "autoscaling_type": "max_request_latency", - } - }, - {"max_pending_tasks": 100000}, - {"keep_warm_seconds": 100}, - {"loader": "handler.py:handler"}, - ] - - # Create permutations of parameters - parameter_permutations = self._create_parameter_permutations(parameters, [], []) - - for param in parameter_permutations: - expected_task_queue_config = self.default_task_queue_config - expected_rest_api_config = self.default_rest_api_config - - expected_task_queue_config["triggers"][0] = { - **expected_task_queue_config["triggers"][0], - **param, - } - - 
expected_rest_api_config["triggers"][0] = { - **expected_rest_api_config["triggers"][0], - **param, - } - - self._run_single_trigger_subtest( - {}, - expected_overall_config=expected_task_queue_config, - trigger_type="webhook", - trigger_kwargs=param, - ) - - self._run_single_trigger_subtest( - {}, - expected_overall_config=expected_rest_api_config, - trigger_kwargs=param, - ) - - # Test with classes - input_param_with_classes = { - **param, - "outputs": [ - Output(**output) for output in (param["outputs"] if "outputs" in param else []) - ], - "autoscaling": Autoscaling(**param["autoscaling"]) - if "autoscaling" in param - else None, - "runtime": Runtime(**param["runtime"]) if "runtime" in param else None, - } - - self._run_single_trigger_subtest( - {}, - expected_overall_config=expected_task_queue_config, - trigger_type="webhook", - trigger_kwargs=input_param_with_classes, - ) - - self._run_single_trigger_subtest( - {}, - expected_overall_config=expected_rest_api_config, - trigger_kwargs=input_param_with_classes, - ) - - def test_schedule_parameters(self): - parameters = [ - { - "when": "0 0 * * *", - }, - { - "when": "every 1s", - }, - { - "when": "every 1m", - }, - { - "when": "every 1h", - }, - { - "outputs": [ - {"path": "./test"}, - {"path": "./test2"}, - ] - }, - { - "runtime": { - "cpu": "2000m", - "memory": "10Gi", - "gpu": "", - "image": build_config(Image(), Image), - }, - }, - ] - - # Create permutations of parameters - parameter_permutations = self._create_parameter_permutations(parameters, [], []) - - for param in parameter_permutations: - default_param = {"when": "0 0 * * *", "runtime": Runtime().data} - - param = {**default_param, **param} - - expected_schedule_config = self.default_schedule_config - - if "runtime" in param: - expected_schedule_config["runtime"] = param["runtime"] - - expected_schedule_config["triggers"][0] = { - **expected_schedule_config["triggers"][0], - **param, - } - - self._run_single_trigger_subtest( - {}, - 
expected_overall_config=expected_schedule_config, - trigger_type="cron_job", - trigger_kwargs={**param}, - ) - - # Test with classes - input_param_with_classes = { - **param, - "outputs": [ - Output(**output) for output in (param["outputs"] if "outputs" in param else []) - ], - "runtime": Runtime(**param["runtime"]) if "runtime" in param else Runtime(), - } - - self._run_single_trigger_subtest( - {}, - expected_overall_config=expected_schedule_config, - trigger_type="cron_job", - trigger_kwargs=input_param_with_classes, - ) - - def test_outputs_parameters(self): - parameters = [ - {"outputs": []}, - { - "outputs": [ - {"path": "./testdir"}, - {"path": "./testfile"}, - ] - }, - ] - - for param in parameters: - for config in [ - self.default_task_queue_config, - self.default_rest_api_config, - self.default_schedule_config, - ]: - trigger_type = config["triggers"][0].get("trigger_type") - config["triggers"][0] = { - **config["triggers"][0], - **param, - } - - param = { - **param, - **( - {"when": config["triggers"][0]["when"]} - if trigger_type == "cron_job" - else {} - ), - } - - self._run_single_trigger_subtest( - {}, - expected_overall_config=config, - trigger_type=trigger_type, - trigger_kwargs=param, - ) - - input_class_params = { - **param, - "outputs": [ - Output(**output) - for output in (param["outputs"] if "outputs" in param else []) - ], - } - - self._run_single_trigger_subtest( - {}, - expected_overall_config=config, - trigger_type=trigger_type, - trigger_kwargs=input_class_params, - ) - - def test_autoscaling_parameters(self): - default_autoscaling_params = { - "max_replicas": 1, - "desired_latency": 100, - "autoscaling_type": "max_request_latency", - } - - parameters = [ - {"autoscaling": {"max_replicas": 1, "desired_latency": 1000}}, - { - "autoscaling": { - "max_replicas": 2, - } - }, - {"autoscaling": {"desired_latency": 1000}}, - ] - - param: dict[str, Any] - for param in parameters: - expected_task_queue_config = self.default_task_queue_config - 
expected_rest_api_config = self.default_rest_api_config - - for config in [expected_task_queue_config, expected_rest_api_config]: - trigger_type = config["triggers"][0].get("trigger_type") - config["triggers"][0] = { - **config["triggers"][0], - "autoscaling": { - **default_autoscaling_params, - **param["autoscaling"], - }, - } - - self._run_single_trigger_subtest( - {}, - expected_overall_config=config, - trigger_type=trigger_type, - trigger_kwargs=param, - ) - - input_class_params = { - "autoscaling": Autoscaling(**param["autoscaling"]) - if "autoscaling" in param - else None, - } - - self._run_single_trigger_subtest( - {}, - expected_overall_config=config, - trigger_type=trigger_type, - trigger_kwargs=input_class_params, - ) - - def test_image_parameters(self): - beam_sdk_version = "beam-sdk==0.0.0" - - parameters = [ - { - "image": { - "python_version": PythonVersion.Python310, - } - }, - { - "image": { - "python_version": PythonVersion.Python310, - "python_packages": ["numpy", "pandas", beam_sdk_version], - } - }, - { - "image": { - "python_version": PythonVersion.Python310, - "python_packages": ["numpy", "pandas", beam_sdk_version], - "commands": ["echo hi"], - } - }, - { - "image": { - "python_version": PythonVersion.Python310, - "python_packages": ["numpy", "pandas", beam_sdk_version], - "commands": ["echo hi"], - "base_image": "docker.io/beamcloud/custom-img-test:latest", - } - }, - ] - - # Test params for trigger level runtime image - for param in parameters: - expected_config = self.default_task_queue_config - expected_config["triggers"][0]["runtime"] = Runtime().data - trigger = expected_config["triggers"][0] - trigger_type = trigger.get("trigger_type") - - input_dict_params = { - "runtime": { - **trigger.get("runtime"), - "image": { - **trigger.get("runtime").get("image"), - **param["image"], - }, - } - } - - expected_config["triggers"][0] = { - **expected_config["triggers"][0], - **input_dict_params, - } - expected_config["runtime"] = 
input_dict_params["runtime"] - - self._run_single_trigger_subtest( - {}, - expected_overall_config=expected_config, - trigger_type=trigger_type, - trigger_kwargs=input_dict_params, - ) - - input_class_params = { - "runtime": { - **trigger.get("runtime"), - "image": Image(**{**trigger.get("runtime").get("image"), **param["image"]}), - } - } - - self._run_single_trigger_subtest( - {}, - expected_overall_config=expected_config, - trigger_type=trigger_type, - trigger_kwargs=input_class_params, - ) - - # Test params for app level runtime image - for param in parameters: - expected_config = self.default_task_queue_config - expected_config["runtime"] = Runtime(image=Image(**param["image"])).data - trigger = expected_config["triggers"][0] - - input_dict_params = { - "runtime": { - **expected_config["runtime"], - "image": { - **expected_config["runtime"]["image"], - **param["image"], - }, - } - } - - self._run_single_trigger_subtest( - input_dict_params, - expected_overall_config=expected_config, - trigger_type="webhook", - ) - - input_class_params = { - "runtime": { - **expected_config["runtime"], - "image": Image(**{**expected_config["runtime"]["image"], **param["image"]}), - } - } - - self._run_single_trigger_subtest( - input_class_params, - expected_overall_config=expected_config, - trigger_type="webhook", - ) - - def test_run_parameters(self): - parameters = [ - { - "outputs": [ - {"path": "./tmp1"}, - {"path": "./tmp2"}, - ] - }, - { - "runtime": build_config(Runtime(cpu=3, memory="1Gi"), Runtime), - }, - { - "callback_url": "http://test.com", - }, - ] - - parameter_permutations = self._create_parameter_permutations(parameters, [], []) - - for param in parameter_permutations: - expected_config = self.default_run_config - expected_config["run"] = {**expected_config["run"], **param} - - if "runtime" in param: - expected_config["runtime"] = param["runtime"] - - self._run_single_run_subtest( - {}, - expected_overall_config=expected_config, - run_kwargs=param, - ) - - def 
test_loaders(self): - def some_function(): - pass - - expected_func_path = "tests/test_parameters.py:some_function" - - expected_config = self.default_task_queue_config - expected_config["triggers"][0]["loader"] = expected_func_path - - # test the passing in of a function - self._run_single_trigger_subtest( - {}, - expected_overall_config=expected_config, - trigger_type="webhook", - trigger_kwargs={"loader": some_function}, - ) - - # test the passing in of a string - self._run_single_trigger_subtest( - {}, - expected_overall_config=expected_config, - trigger_type="webhook", - trigger_kwargs={"loader": expected_func_path}, - ) - - def test_autoscalers(self): - Test = namedtuple("Test", ["trigger", "autoscaler", "autoscaler_expected"]) - - tests = [ - # TODO: Re-enable when queue depth autoscaler is implemented. - # Test( - # "webhook", - # QueueDepthAutoscaler(max_tasks_per_replica=20), - # {"queue_depth": {"max_tasks_per_replica": 20, "max_replicas": 1}}, - # ), - # Test( - # "rest_api", - # QueueDepthAutoscaler(max_tasks_per_replica=1000), - # {"queue_depth": {"max_tasks_per_replica": 1000, "max_replicas": 1}}, - # ), - Test( - "webhook", - RequestLatencyAutoscaler(desired_latency=100), - {"request_latency": {"desired_latency": 100, "max_replicas": 1}}, - ), - Test( - "webhook", - { - "request_latency": {"desired_latency": 100, "max_replicas": 2}, - "queue_depth": {"max_tasks_per_replica": 11}, - }, - None, - ), - Test( - "rest_api", - RequestLatencyAutoscaler(desired_latency=60), - {"request_latency": {"desired_latency": 60, "max_replicas": 1}}, - ), - ] - - for test in tests: - expected_config = self.default_config - if test.trigger == "rest_api": - expected_config = self.default_rest_api_config - elif test.trigger == "webhook": - expected_config = self.default_task_queue_config - - expected_config["triggers"][0]["autoscaler"] = test.autoscaler_expected - expected_config["triggers"][0]["trigger_type"] = test.trigger - - subtest_kwargs = dict( - app_config={}, - 
expected_overall_config=expected_config, - trigger_type=test.trigger, - trigger_kwargs={"autoscaler": test.autoscaler}, - ) - - if isinstance(test.autoscaler, dict) and len(test.autoscaler.keys()) > 1: - with pytest.raises(ValidationError): - self._run_single_trigger_subtest(**subtest_kwargs) - else: - self._run_single_trigger_subtest(**subtest_kwargs) - - def test_memory_limits_should_fail(self): - tests = [ - { - "memory": "65Gi", - } - ] - - for test in tests: - with pytest.raises(ValueError): - runtime = self.default_config["runtime"] - runtime["memory"] = test["memory"] - - self._run_single_run_subtest( - {"runtime": runtime}, - expected_overall_config=self.default_run_config, - ) - - self._run_single_trigger_subtest( - {"runtime": Runtime(**runtime)}, - expected_overall_config=self.default_run_config, - ) - - def test_task_policy(self): - Test = namedtuple("Test", ["trigger", "task_policy", "task_policy_expected"]) - - tests = [ - Test( - "webhook", - { - "max_retries": 0, - }, - { - "max_retries": 0, - "timeout": 3600, - }, - ), - Test( - "rest_api", - { - "max_retries": 1, - "timeout": 4200, - }, - { - "max_retries": 1, - "timeout": 4200, - }, - ), - Test( - "cron_job", - {}, - { - "max_retries": 3, - "timeout": 3600, - }, - ), - ] - - for test in tests: - expected_trigger_config = self.default_config - kwargs = {} - - if test.trigger == "webhook": - expected_trigger_config = self.default_task_queue_config - elif test.trigger == "rest_api": - expected_trigger_config = self.default_rest_api_config - elif test.trigger == "cron_job": - expected_trigger_config = self.default_schedule_config - kwargs["when"] = self.default_schedule_config["triggers"][0]["when"] - - expected_trigger_config["triggers"][0]["task_policy"] = test.task_policy_expected - expected_trigger_config["triggers"][0]["trigger_type"] = test.trigger - - self._run_single_trigger_subtest( - {}, - trigger_type=test.trigger, - trigger_kwargs={"task_policy": test.task_policy, **kwargs}, - 
expected_overall_config=expected_trigger_config, - ) - - # Make sure it works for run as well - expected_run_config = self.default_run_config - expected_run_config["run"]["task_policy"] = test.task_policy_expected - - self._run_single_run_subtest( - {}, - run_kwargs={ - "task_policy": test.task_policy, - }, - expected_overall_config=expected_run_config, - ) - - # Now we want to test flatten params - self._run_single_trigger_subtest( - {}, - trigger_type=test.trigger, - trigger_kwargs={**test.task_policy, **kwargs}, - expected_overall_config=expected_trigger_config, - ) - - # Make sure it works for run as well - run_kwargs = {} - - if test.task_policy.get("timeout"): - run_kwargs["timeout"] = test.task_policy["timeout"] - - # Runs cant set max_retries, so it will always be 3 (ignored in backend) - expected_run_config["run"]["task_policy"]["max_retries"] = 3 - - self._run_single_run_subtest( - {}, - run_kwargs=run_kwargs, - expected_overall_config=expected_run_config, - ) diff --git a/sdk/tests/test_parsers.py b/sdk/tests/test_parsers.py deleted file mode 100644 index 1a44c52d5..000000000 --- a/sdk/tests/test_parsers.py +++ /dev/null @@ -1,43 +0,0 @@ -import pytest - -from beam.utils.parse import compose_cpu, compose_memory - - -class TestParsers: - def test_compose_cpu(self): - assert compose_cpu(1) == "1000m" - assert compose_cpu(1.999) == "1999m" - assert compose_cpu("1000m") == "1000m" - - def test_compose_memory(self): - assert compose_memory("10gi") == "10Gi" - assert compose_memory("10 gi ") == "10Gi" - assert compose_memory("10 Gi ") == "10Gi" - - # Raises if Gi is > 256 - with pytest.raises(ValueError): - compose_memory("10000Gi") - - assert compose_memory("256mi") == "256Mi" - assert compose_memory("256 mi ") == "256Mi" - assert compose_memory("256 Mi ") == "256Mi" - - # Raises if Mi is < 128 or >= 1000 - with pytest.raises(ValueError): - compose_memory("127Mi") - - with pytest.raises(ValueError): - compose_memory("1000Mi") - - # Test invalid formats - with 
pytest.raises(ValueError): - compose_memory("1000ti") - - with pytest.raises(ValueError): - compose_memory("1000.0") - - with pytest.raises(ValueError): - compose_memory("1000") - - with pytest.raises(ValueError): - compose_memory("2 gigabytes") diff --git a/sdk/tests/test_queue.py b/sdk/tests/test_queue.py new file mode 100644 index 000000000..7172ae6da --- /dev/null +++ b/sdk/tests/test_queue.py @@ -0,0 +1,119 @@ +from unittest import TestCase +from unittest.mock import MagicMock + +import cloudpickle + +from beam.abstractions.queue import SimpleQueue, SimpleQueueInternalServerError +from beam.clients.simplequeue import ( + SimpleQueueEmptyResponse, + SimpleQueuePeekResponse, + SimpleQueuePopResponse, + SimpleQueuePutResponse, + SimpleQueueSizeResponse, +) + +from .utils import override_run_sync + + +class TestSimpleQueue(TestCase): + def setUp(self): + pass + + def test_put(self): + mock_stub = MagicMock() + + mock_stub.put.return_value = SimpleQueuePutResponse(ok=True) + + queue = SimpleQueue(name="test") + queue.stub = mock_stub + queue.run_sync = override_run_sync + + self.assertTrue(queue.put("test")) + + mock_stub.put.return_value = SimpleQueuePutResponse(ok=False) + + queue = SimpleQueue(name="test") + queue.stub = mock_stub + queue.run_sync = override_run_sync + + self.assertRaises(SimpleQueueInternalServerError, queue.put, "test") + + def test_pop(self): + mock_stub = MagicMock() + + pickled_value = cloudpickle.dumps("test") + + mock_stub.pop.return_value = SimpleQueuePopResponse(ok=True, value=pickled_value) + + queue = SimpleQueue(name="test") + queue.stub = mock_stub + queue.run_sync = override_run_sync + + self.assertEqual(queue.pop(), "test") + + mock_stub.pop.return_value = SimpleQueuePopResponse(ok=False, value=b"") + + queue = SimpleQueue(name="test") + queue.stub = mock_stub + queue.run_sync = override_run_sync + + self.assertRaises(SimpleQueueInternalServerError, queue.pop) + + def test_peek(self): + mock_stub = MagicMock() + + pickled_value = 
cloudpickle.dumps("test") + + mock_stub.peek.return_value = SimpleQueuePeekResponse(ok=True, value=pickled_value) + + queue = SimpleQueue(name="test") + queue.stub = mock_stub + queue.run_sync = override_run_sync + + self.assertEqual(queue.peek(), "test") + + mock_stub.peek.return_value = SimpleQueuePeekResponse(ok=False, value=b"") + + queue = SimpleQueue(name="test") + queue.stub = mock_stub + queue.run_sync = override_run_sync + + self.assertRaises(SimpleQueueInternalServerError, queue.peek) + + def test_empty(self): + mock_stub = MagicMock() + + mock_stub.empty.return_value = SimpleQueueEmptyResponse(ok=True, empty=True) + + queue = SimpleQueue(name="test") + queue.stub = mock_stub + queue.run_sync = override_run_sync + + self.assertTrue(queue.empty()) + + mock_stub.empty.return_value = SimpleQueueEmptyResponse(ok=False, empty=True) + + queue = SimpleQueue(name="test") + queue.stub = mock_stub + queue.run_sync = override_run_sync + + self.assertRaises(SimpleQueueInternalServerError, queue.empty) + + def test_size(self): + mock_stub = MagicMock() + + mock_stub.size.return_value = SimpleQueueSizeResponse(ok=True, size=1) + + queue = SimpleQueue(name="test") + queue.stub = mock_stub + queue.run_sync = override_run_sync + + self.assertEqual(len(queue), 1) + + mock_stub.size.return_value = SimpleQueueSizeResponse(ok=False, size=1) + + queue = SimpleQueue(name="test") + queue.stub = mock_stub + queue.run_sync = override_run_sync + + self.assertEqual(len(queue), 0) diff --git a/sdk/tests/test_task_queue.py b/sdk/tests/test_task_queue.py new file mode 100644 index 000000000..e69de29bb diff --git a/sdk/tests/test_validators.py b/sdk/tests/test_validators.py deleted file mode 100644 index a11151192..000000000 --- a/sdk/tests/test_validators.py +++ /dev/null @@ -1,22 +0,0 @@ -import pytest - -from beam import validators - - -def test_every(): - with pytest.raises(validators.ValidationError): - validators.IsValidEvery()("every") - - with 
pytest.raises(validators.ValidationError): - validators.IsValidEvery()("every 1") - - with pytest.raises(validators.ValidationError): - validators.IsValidEvery()("every 1 s") - - with pytest.raises(validators.ValidationError): - validators.IsValidEvery()("every 1 unknown-s") - - for time_values in [1, 10, 100]: - for unit in validators.IsValidEvery.time_units: - # should not fail - validators.IsValidEvery()(f"every {time_values}{unit}") diff --git a/sdk/tests/test_volumes.py b/sdk/tests/test_volumes.py new file mode 100644 index 000000000..2396863dc --- /dev/null +++ b/sdk/tests/test_volumes.py @@ -0,0 +1,42 @@ +from unittest import TestCase +from unittest.mock import MagicMock + +from beam.abstractions.volume import Volume +from beam.clients.volume import GetOrCreateVolumeResponse + +from .utils import override_run_sync + + +class TestVolumes(TestCase): + def setUp(self): + pass + + def test_get_or_create(self): + mock_stub = MagicMock() + + # Test that a valid grpc response sets the volume id and ready flag + mock_stub.get_or_create_volume.return_value = GetOrCreateVolumeResponse( + ok=True, volume_id="1234" + ) + + volume = Volume(name="test", mount_path="/test") + volume.stub = mock_stub + volume.run_sync = override_run_sync + + self.assertFalse(volume.ready) + self.assertTrue(volume.get_or_create()) + self.assertTrue(volume.ready) + self.assertEqual(volume.volume_id, "1234") + + # Test that an invalid grpc response does not set the volume id or ready flag + mock_stub.get_or_create_volume.return_value = GetOrCreateVolumeResponse( + ok=False, volume_id="" + ) + + volume = Volume(name="test", mount_path="/test") + volume.stub = mock_stub + volume.run_sync = override_run_sync + + self.assertFalse(volume.get_or_create()) + self.assertFalse(volume.ready) + self.assertEqual(volume.volume_id, None) diff --git a/sdk/tests/utils.py b/sdk/tests/utils.py new file mode 100644 index 000000000..9fd740b0c --- /dev/null +++ b/sdk/tests/utils.py @@ -0,0 +1,5 @@ +from typing 
import Callable + + +def override_run_sync(f: Callable): + return f From f3ef98fb9adeee3eaab8f58a18ef8d0642b55a8f Mon Sep 17 00:00:00 2001 From: John Date: Tue, 9 Jan 2024 12:31:26 -0700 Subject: [PATCH 02/18] add task queue tests --- sdk/tests/test_task_queue.py | 72 ++++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/sdk/tests/test_task_queue.py b/sdk/tests/test_task_queue.py index e69de29bb..4a6c97e3f 100644 --- a/sdk/tests/test_task_queue.py +++ b/sdk/tests/test_task_queue.py @@ -0,0 +1,72 @@ +from unittest import TestCase +from unittest.mock import MagicMock + +from beam import Image +from beam.abstractions.taskqueue import TaskQueue +from beam.clients.taskqueue import TaskQueuePutResponse + +from .utils import override_run_sync + + +class TestTaskQueue(TestCase): + def test_init(self): + mock_stub = MagicMock() + + queue = TaskQueue(Image(python_version="python3.7"), cpu=100, memory=128) + queue.stub = mock_stub + + self.assertEqual(queue.image.python_version, "python3.7") + self.assertEqual(queue.cpu, 100) + self.assertEqual(queue.memory, 128) + + def test_run_local(self): + @TaskQueue(Image(python_version="python3.7"), cpu=100, memory=128) + def test_func(): + return 1 + + resp = test_func.local() + + self.assertEqual(resp, 1) + + def test_put(self): + @TaskQueue(Image(python_version="python3.7"), cpu=100, memory=128) + def test_func(): + return 1 + + test_func.parent.taskqueue_stub = MagicMock() + test_func.parent.taskqueue_stub.task_queue_put.return_value = TaskQueuePutResponse( + ok=True, task_id="1234" + ) + test_func.parent.prepare_runtime = MagicMock(return_value=True) + test_func.parent.run_sync = override_run_sync + + test_func.put() + + test_func.parent.taskqueue_stub.task_queue_put.return_value = TaskQueuePutResponse( + ok=False, task_id="" + ) + + self.assertRaises(SystemExit, test_func.put) + + def test__call__(self): + @TaskQueue(Image(python_version="python3.7"), cpu=100, memory=128) + def test_func(): + return 1 
+ + test_func.parent.taskqueue_stub = MagicMock() + test_func.parent.taskqueue_stub.task_queue_put.return_value = TaskQueuePutResponse( + ok=True, task_id="1234" + ) + test_func.parent.prepare_runtime = MagicMock(return_value=True) + test_func.parent.run_sync = override_run_sync + + test_func() + + test_func.parent.taskqueue_stub.task_queue_put.return_value = TaskQueuePutResponse( + ok=False, task_id="" + ) + + import os + + os.environ["CONTAINER_ID"] = "1234" + self.assertEqual(test_func(), 1) From 1d74d5fa0065b9c47b4a48cad26a22083294034c Mon Sep 17 00:00:00 2001 From: John Date: Tue, 9 Jan 2024 12:58:57 -0700 Subject: [PATCH 03/18] Add tests for function --- sdk/src/beam/abstractions/function.py | 2 +- sdk/tests/test_function.py | 70 +++++++++++++++++++++++++++ sdk/tests/utils.py | 9 +++- 3 files changed, 78 insertions(+), 3 deletions(-) create mode 100644 sdk/tests/test_function.py diff --git a/sdk/src/beam/abstractions/function.py b/sdk/src/beam/abstractions/function.py index 2644678a2..4dde50c9a 100644 --- a/sdk/src/beam/abstractions/function.py +++ b/sdk/src/beam/abstractions/function.py @@ -70,7 +70,7 @@ async def _call_remote(self, *args, **kwargs) -> Any: if r.output != "": terminal.detail(r.output) - if r.done: + if r.done or r.exit_code != 0: last_response = r break diff --git a/sdk/tests/test_function.py b/sdk/tests/test_function.py new file mode 100644 index 000000000..040be6150 --- /dev/null +++ b/sdk/tests/test_function.py @@ -0,0 +1,70 @@ +from unittest import TestCase +from unittest.mock import MagicMock + +import cloudpickle + +from beam import Image +from beam.abstractions.function import Function +from beam.clients.function import FunctionInvokeResponse + +from .utils import override_run_sync + + +class AsyncIterator: + def __init__(self, seq): + self.iter = iter(seq) + + def __aiter__(self): + return self + + async def __anext__(self): + try: + return next(self.iter) + except StopIteration: + raise StopAsyncIteration + + +class 
TestTaskQueue(TestCase): + def test_init(self): + mock_stub = MagicMock() + + queue = Function(Image(python_version="python3.7"), cpu=100, memory=128) + queue.stub = mock_stub + + self.assertEqual(queue.image.python_version, "python3.7") + self.assertEqual(queue.cpu, 100) + self.assertEqual(queue.memory, 128) + + def test_run_local(self): + @Function(Image(python_version="python3.7"), cpu=100, memory=128) + def test_func(): + return 1 + + resp = test_func.local() + + self.assertEqual(resp, 1) + + def test_function_invoke(self): + @Function(Image(python_version="python3.7"), cpu=100, memory=128) + def test_func(*args, **kwargs): + return 1998 + + pickled_value = cloudpickle.dumps(1998) + + test_func.parent.function_stub = MagicMock() + test_func.parent.syncer = MagicMock() + + test_func.parent.function_stub.function_invoke.return_value = AsyncIterator( + [FunctionInvokeResponse(done=True, exit_code=0, result=pickled_value)] + ) + + test_func.parent.prepare_runtime = MagicMock(return_value=True) + test_func.parent.run_sync = override_run_sync + + self.assertEqual(test_func(), 1998) + + test_func.parent.function_stub.function_invoke.return_value = AsyncIterator( + [FunctionInvokeResponse(done=False, exit_code=1, result=b"")] + ) + + self.assertRaises(SystemExit, test_func) diff --git a/sdk/tests/utils.py b/sdk/tests/utils.py index 9fd740b0c..0a6a0cbcf 100644 --- a/sdk/tests/utils.py +++ b/sdk/tests/utils.py @@ -1,5 +1,10 @@ -from typing import Callable +import asyncio +import inspect -def override_run_sync(f: Callable): +def override_run_sync(f): + if inspect.isawaitable(f): + loop = asyncio.get_event_loop() + return loop.run_until_complete(f) + return f From 0854b4a80fe0f5e064f060ee99c0963aaec2dc94 Mon Sep 17 00:00:00 2001 From: John Date: Tue, 9 Jan 2024 13:13:15 -0700 Subject: [PATCH 04/18] Add test for function map --- sdk/src/beam/abstractions/taskqueue.py | 9 ++++----- sdk/tests/test_function.py | 24 ++++++++++++++++++++++++ sdk/tests/test_task_queue.py | 8 
++++---- 3 files changed, 32 insertions(+), 9 deletions(-) diff --git a/sdk/src/beam/abstractions/taskqueue.py b/sdk/src/beam/abstractions/taskqueue.py index 242629c43..695eec819 100644 --- a/sdk/src/beam/abstractions/taskqueue.py +++ b/sdk/src/beam/abstractions/taskqueue.py @@ -51,11 +51,10 @@ def __call__(self, *args, **kwargs) -> Any: if container_id is not None: return self.local(*args, **kwargs) - if not self.parent.prepare_runtime( - func=self.func, - stub_type=TASKQUEUE_STUB_TYPE, - ): - return + raise NotImplementedError( + "Direct calls to TaskQueues are not yet supported." + + " To enqueue items use .put(*args, **kwargs)" + ) def local(self, *args, **kwargs) -> Any: return self.func(*args, **kwargs) diff --git a/sdk/tests/test_function.py b/sdk/tests/test_function.py index 040be6150..b18e155e7 100644 --- a/sdk/tests/test_function.py +++ b/sdk/tests/test_function.py @@ -68,3 +68,27 @@ def test_func(*args, **kwargs): ) self.assertRaises(SystemExit, test_func) + + def test_map(self): + @Function(Image(python_version="python3.7"), cpu=100, memory=128) + def test_func(*args, **kwargs): + return 1998 + + pickled_value = cloudpickle.dumps(1998) + + test_func.parent.function_stub = MagicMock() + test_func.parent.syncer = MagicMock() + + test_func.parent.function_stub.function_invoke.return_value = AsyncIterator( + [ + FunctionInvokeResponse(done=True, exit_code=0, result=pickled_value), + FunctionInvokeResponse(done=True, exit_code=0, result=pickled_value), + FunctionInvokeResponse(done=True, exit_code=0, result=pickled_value), + ] + ) + + test_func.parent.prepare_runtime = MagicMock(return_value=True) + test_func.parent.run_sync = override_run_sync + + for val in test_func.map([1, 2, 3]): + self.assertEqual(val, 1998) diff --git a/sdk/tests/test_task_queue.py b/sdk/tests/test_task_queue.py index 4a6c97e3f..083b6ac07 100644 --- a/sdk/tests/test_task_queue.py +++ b/sdk/tests/test_task_queue.py @@ -60,12 +60,12 @@ def test_func(): test_func.parent.prepare_runtime = 
MagicMock(return_value=True)
         test_func.parent.run_sync = override_run_sync
 
-        test_func()
-
-        test_func.parent.taskqueue_stub.task_queue_put.return_value = TaskQueuePutResponse(
-            ok=False, task_id=""
+        self.assertRaises(
+            NotImplementedError,
+            test_func,
         )
 
+        # Test calling in container
         import os
 
         os.environ["CONTAINER_ID"] = "1234"
         self.assertEqual(test_func(), 1)

From de7b9a8cf79bb71ff092e6a09378f076a637d1f7 Mon Sep 17 00:00:00 2001
From: John
Date: Tue, 9 Jan 2024 13:14:44 -0700
Subject: [PATCH 05/18] add comment explanation

---
 sdk/tests/test_function.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/sdk/tests/test_function.py b/sdk/tests/test_function.py
index 040be6150..c271f5d77 100644
--- a/sdk/tests/test_function.py
+++ b/sdk/tests/test_function.py
@@ -79,6 +79,10 @@ def test_func(*args, **kwargs):
         test_func.parent.function_stub = MagicMock()
         test_func.parent.syncer = MagicMock()
 
+        # Since the return value is a reference to this same async iterator, every time
+        # it is called it will iterate to the next value. This iterator in testing is persisted across
+        # multiple calls to the function, so we can simulate multiple responses.
+ # (ONLY HAPPENS DURING TESTING) test_func.parent.function_stub.function_invoke.return_value = AsyncIterator( [ FunctionInvokeResponse(done=True, exit_code=0, result=pickled_value), From c14e4708b2a00c768963b1c2fac3855b02f8ed4f Mon Sep 17 00:00:00 2001 From: John Date: Tue, 9 Jan 2024 19:12:38 -0700 Subject: [PATCH 06/18] change function to use mock_coroutine_with_result --- sdk/src/beam/cli/main.py | 3 --- sdk/tests/test_function.py | 4 ---- sdk/tests/test_map.py | 43 +++++++++++++----------------------- sdk/tests/test_queue.py | 36 +++++++++++++----------------- sdk/tests/test_task_queue.py | 20 ++++++++--------- sdk/tests/test_volumes.py | 12 +++++----- sdk/tests/utils.py | 15 +++++-------- 7 files changed, 50 insertions(+), 83 deletions(-) diff --git a/sdk/src/beam/cli/main.py b/sdk/src/beam/cli/main.py index 6c288a632..5cbef07de 100644 --- a/sdk/src/beam/cli/main.py +++ b/sdk/src/beam/cli/main.py @@ -9,6 +9,3 @@ cli = click.Group() cli.add_command(configure.configure) cli.add_command(tasks.cli) - -if __name__ == "__main__": - cli() diff --git a/sdk/tests/test_function.py b/sdk/tests/test_function.py index c271f5d77..94ac6ad50 100644 --- a/sdk/tests/test_function.py +++ b/sdk/tests/test_function.py @@ -7,8 +7,6 @@ from beam.abstractions.function import Function from beam.clients.function import FunctionInvokeResponse -from .utils import override_run_sync - class AsyncIterator: def __init__(self, seq): @@ -59,7 +57,6 @@ def test_func(*args, **kwargs): ) test_func.parent.prepare_runtime = MagicMock(return_value=True) - test_func.parent.run_sync = override_run_sync self.assertEqual(test_func(), 1998) @@ -92,7 +89,6 @@ def test_func(*args, **kwargs): ) test_func.parent.prepare_runtime = MagicMock(return_value=True) - test_func.parent.run_sync = override_run_sync for val in test_func.map([1, 2, 3]): self.assertEqual(val, 1998) diff --git a/sdk/tests/test_map.py b/sdk/tests/test_map.py index cc334e734..b8e930005 100644 --- a/sdk/tests/test_map.py +++ 
b/sdk/tests/test_map.py @@ -12,7 +12,7 @@ MapSetResponse, ) -from .utils import override_run_sync +from .utils import mock_coroutine_with_result class TestMap(TestCase): @@ -22,19 +22,17 @@ def setUp(self): def test_set(self): mock_stub = MagicMock() - mock_stub.map_set.return_value = MapSetResponse(ok=True) + mock_stub.map_set = mock_coroutine_with_result(MapSetResponse(ok=True)) map = Map(name="test") map.stub = mock_stub - map.run_sync = override_run_sync self.assertTrue(map.set("test", "test")) - mock_stub.map_set.return_value = MapSetResponse(ok=False) + mock_stub.map_set = mock_coroutine_with_result(MapSetResponse(ok=False)) map = Map(name="test") map.stub = mock_stub - map.run_sync = override_run_sync self.assertFalse(map.set("test", "test")) @@ -43,38 +41,34 @@ def test_get(self): pickled_value = cloudpickle.dumps("test") - mock_stub.map_get.return_value = MapGetResponse(ok=True, value=pickled_value) + mock_stub.map_get = mock_coroutine_with_result(MapGetResponse(ok=True, value=pickled_value)) map = Map(name="test") map.stub = mock_stub - map.run_sync = override_run_sync self.assertEqual(map.get("test"), "test") - mock_stub.map_get.return_value = MapGetResponse(ok=False, value=b"") + mock_stub.map_get = mock_coroutine_with_result(MapGetResponse(ok=False, value=b"")) map = Map(name="test") map.stub = mock_stub - map.run_sync = override_run_sync self.assertEqual(map.get("test"), None) def test_delitem(self): mock_stub = MagicMock() - mock_stub.map_delete.return_value = MapDeleteResponse(ok=True) + mock_stub.map_delete = mock_coroutine_with_result(MapDeleteResponse(ok=True)) map = Map(name="test") map.stub = mock_stub - map.run_sync = override_run_sync del map["test"] - mock_stub.map_delete.return_value = MapDeleteResponse(ok=False) + mock_stub.map_delete = mock_coroutine_with_result(MapDeleteResponse(ok=False)) map = Map(name="test") map.stub = mock_stub - map.run_sync = override_run_sync def _del(): del map["test"] @@ -84,38 +78,34 @@ def _del(): def 
test_len(self): mock_stub = MagicMock() - mock_stub.map_count.return_value = MapCountResponse(ok=True, count=1) + mock_stub.map_count = mock_coroutine_with_result(MapCountResponse(ok=True, count=1)) map = Map(name="test") map.stub = mock_stub - map.run_sync = override_run_sync self.assertEqual(len(map), 1) - mock_stub.map_count.return_value = MapCountResponse(ok=False, count=1) + mock_stub.map_count = mock_coroutine_with_result(MapCountResponse(ok=False, count=1)) map = Map(name="test") map.stub = mock_stub - map.run_sync = override_run_sync self.assertEqual(len(map), 0) def test_iter(self): mock_stub = MagicMock() - mock_stub.map_keys.return_value = MapKeysResponse(ok=True, keys=["test"]) + mock_stub.map_keys = mock_coroutine_with_result(MapKeysResponse(ok=True, keys=["test"])) map = Map(name="test") map.stub = mock_stub - map.run_sync = override_run_sync self.assertEqual(list(map), ["test"]) - mock_stub.map_keys.return_value = MapKeysResponse(ok=False, keys=[]) + mock_stub.map_keys = mock_coroutine_with_result(MapKeysResponse(ok=False, keys=[])) map = Map(name="test") map.stub = mock_stub - map.run_sync = override_run_sync self.assertEqual(list(map), []) @@ -124,19 +114,16 @@ def test_items(self): pickled_value = cloudpickle.dumps("test") - mock_stub.map_keys.return_value = MapKeysResponse(ok=True, keys=["test"]) - mock_stub.map_get.return_value = MapGetResponse(ok=True, value=pickled_value) + mock_stub.map_keys = mock_coroutine_with_result(MapKeysResponse(ok=True, keys=["test"])) + mock_stub.map_get = mock_coroutine_with_result(MapGetResponse(ok=True, value=pickled_value)) map = Map(name="test") map.stub = mock_stub - map.run_sync = override_run_sync + self.assertListEqual(list(map.items()), [("test", "test")]) - self.assertEqual(list(map.items()), [("test", "test")]) - - mock_stub.map_keys.return_value = MapKeysResponse(ok=False, keys=[]) + mock_stub.map_keys = mock_coroutine_with_result(MapKeysResponse(ok=False, keys=[])) map = Map(name="test") map.stub = 
mock_stub - map.run_sync = override_run_sync self.assertEqual(list(map.items()), []) diff --git a/sdk/tests/test_queue.py b/sdk/tests/test_queue.py index 7172ae6da..a1419110b 100644 --- a/sdk/tests/test_queue.py +++ b/sdk/tests/test_queue.py @@ -12,7 +12,7 @@ SimpleQueueSizeResponse, ) -from .utils import override_run_sync +from .utils import mock_coroutine_with_result class TestSimpleQueue(TestCase): @@ -22,19 +22,17 @@ def setUp(self): def test_put(self): mock_stub = MagicMock() - mock_stub.put.return_value = SimpleQueuePutResponse(ok=True) + mock_stub.put = mock_coroutine_with_result(SimpleQueuePutResponse(ok=True)) queue = SimpleQueue(name="test") queue.stub = mock_stub - queue.run_sync = override_run_sync self.assertTrue(queue.put("test")) - mock_stub.put.return_value = SimpleQueuePutResponse(ok=False) + mock_stub.put = mock_coroutine_with_result(SimpleQueuePutResponse(ok=False)) queue = SimpleQueue(name="test") queue.stub = mock_stub - queue.run_sync = override_run_sync self.assertRaises(SimpleQueueInternalServerError, queue.put, "test") @@ -43,19 +41,19 @@ def test_pop(self): pickled_value = cloudpickle.dumps("test") - mock_stub.pop.return_value = SimpleQueuePopResponse(ok=True, value=pickled_value) + mock_stub.pop = mock_coroutine_with_result( + SimpleQueuePopResponse(ok=True, value=pickled_value) + ) queue = SimpleQueue(name="test") queue.stub = mock_stub - queue.run_sync = override_run_sync self.assertEqual(queue.pop(), "test") - mock_stub.pop.return_value = SimpleQueuePopResponse(ok=False, value=b"") + mock_stub.pop = mock_coroutine_with_result(SimpleQueuePopResponse(ok=False, value=b"")) queue = SimpleQueue(name="test") queue.stub = mock_stub - queue.run_sync = override_run_sync self.assertRaises(SimpleQueueInternalServerError, queue.pop) @@ -64,56 +62,52 @@ def test_peek(self): pickled_value = cloudpickle.dumps("test") - mock_stub.peek.return_value = SimpleQueuePeekResponse(ok=True, value=pickled_value) + mock_stub.peek = mock_coroutine_with_result( + 
SimpleQueuePeekResponse(ok=True, value=pickled_value) + ) queue = SimpleQueue(name="test") queue.stub = mock_stub - queue.run_sync = override_run_sync self.assertEqual(queue.peek(), "test") - mock_stub.peek.return_value = SimpleQueuePeekResponse(ok=False, value=b"") + mock_stub.peek = mock_coroutine_with_result(SimpleQueuePeekResponse(ok=False, value=b"")) queue = SimpleQueue(name="test") queue.stub = mock_stub - queue.run_sync = override_run_sync self.assertRaises(SimpleQueueInternalServerError, queue.peek) def test_empty(self): mock_stub = MagicMock() - mock_stub.empty.return_value = SimpleQueueEmptyResponse(ok=True, empty=True) + mock_stub.empty = mock_coroutine_with_result(SimpleQueueEmptyResponse(ok=True, empty=True)) queue = SimpleQueue(name="test") queue.stub = mock_stub - queue.run_sync = override_run_sync self.assertTrue(queue.empty()) - mock_stub.empty.return_value = SimpleQueueEmptyResponse(ok=False, empty=True) + mock_stub.empty = mock_coroutine_with_result(SimpleQueueEmptyResponse(ok=False, empty=True)) queue = SimpleQueue(name="test") queue.stub = mock_stub - queue.run_sync = override_run_sync self.assertRaises(SimpleQueueInternalServerError, queue.empty) def test_size(self): mock_stub = MagicMock() - mock_stub.size.return_value = SimpleQueueSizeResponse(ok=True, size=1) + mock_stub.size = mock_coroutine_with_result(SimpleQueueSizeResponse(ok=True, size=1)) queue = SimpleQueue(name="test") queue.stub = mock_stub - queue.run_sync = override_run_sync self.assertEqual(len(queue), 1) - mock_stub.size.return_value = SimpleQueueSizeResponse(ok=False, size=1) + mock_stub.size = mock_coroutine_with_result(SimpleQueueSizeResponse(ok=False, size=1)) queue = SimpleQueue(name="test") queue.stub = mock_stub - queue.run_sync = override_run_sync self.assertEqual(len(queue), 0) diff --git a/sdk/tests/test_task_queue.py b/sdk/tests/test_task_queue.py index 083b6ac07..08ff72c76 100644 --- a/sdk/tests/test_task_queue.py +++ b/sdk/tests/test_task_queue.py @@ -1,3 +1,4 @@ 
+import os from unittest import TestCase from unittest.mock import MagicMock @@ -5,7 +6,7 @@ from beam.abstractions.taskqueue import TaskQueue from beam.clients.taskqueue import TaskQueuePutResponse -from .utils import override_run_sync +from .utils import mock_coroutine_with_result class TestTaskQueue(TestCase): @@ -34,16 +35,15 @@ def test_func(): return 1 test_func.parent.taskqueue_stub = MagicMock() - test_func.parent.taskqueue_stub.task_queue_put.return_value = TaskQueuePutResponse( - ok=True, task_id="1234" + test_func.parent.taskqueue_stub.task_queue_put = mock_coroutine_with_result( + TaskQueuePutResponse(ok=True, task_id="1234") ) test_func.parent.prepare_runtime = MagicMock(return_value=True) - test_func.parent.run_sync = override_run_sync test_func.put() - test_func.parent.taskqueue_stub.task_queue_put.return_value = TaskQueuePutResponse( - ok=False, task_id="" + test_func.parent.taskqueue_stub.task_queue_put = mock_coroutine_with_result( + TaskQueuePutResponse(ok=False, task_id="") ) self.assertRaises(SystemExit, test_func.put) @@ -54,11 +54,11 @@ def test_func(): return 1 test_func.parent.taskqueue_stub = MagicMock() - test_func.parent.taskqueue_stub.task_queue_put.return_value = TaskQueuePutResponse( - ok=True, task_id="1234" + test_func.parent.taskqueue_stub.task_queue_put = mock_coroutine_with_result( + TaskQueuePutResponse(ok=True, task_id="1234") ) + test_func.parent.prepare_runtime = MagicMock(return_value=True) - test_func.parent.run_sync = override_run_sync self.assertRaises( NotImplementedError, @@ -66,7 +66,5 @@ def test_func(): ) # Test calling in container - import os - os.environ["CONTAINER_ID"] = "1234" self.assertEqual(test_func(), 1) diff --git a/sdk/tests/test_volumes.py b/sdk/tests/test_volumes.py index 2396863dc..12d7211e0 100644 --- a/sdk/tests/test_volumes.py +++ b/sdk/tests/test_volumes.py @@ -4,7 +4,7 @@ from beam.abstractions.volume import Volume from beam.clients.volume import GetOrCreateVolumeResponse -from .utils import 
override_run_sync +from .utils import mock_coroutine_with_result class TestVolumes(TestCase): @@ -15,13 +15,12 @@ def test_get_or_create(self): mock_stub = MagicMock() # Test that a valid grpc response sets the volume id and ready flag - mock_stub.get_or_create_volume.return_value = GetOrCreateVolumeResponse( - ok=True, volume_id="1234" + mock_stub.get_or_create_volume = mock_coroutine_with_result( + GetOrCreateVolumeResponse(ok=True, volume_id="1234") ) volume = Volume(name="test", mount_path="/test") volume.stub = mock_stub - volume.run_sync = override_run_sync self.assertFalse(volume.ready) self.assertTrue(volume.get_or_create()) @@ -29,13 +28,12 @@ def test_get_or_create(self): self.assertEqual(volume.volume_id, "1234") # Test that an invalid grpc response does not set the volume id or ready flag - mock_stub.get_or_create_volume.return_value = GetOrCreateVolumeResponse( - ok=False, volume_id="" + mock_stub.get_or_create_volume = mock_coroutine_with_result( + GetOrCreateVolumeResponse(ok=False, volume_id="") ) volume = Volume(name="test", mount_path="/test") volume.stub = mock_stub - volume.run_sync = override_run_sync self.assertFalse(volume.get_or_create()) self.assertFalse(volume.ready) diff --git a/sdk/tests/utils.py b/sdk/tests/utils.py index 0a6a0cbcf..a2bd77d60 100644 --- a/sdk/tests/utils.py +++ b/sdk/tests/utils.py @@ -1,10 +1,7 @@ -import asyncio -import inspect +# Helps mock out results that should return from a coroutine function +# that you want to override +def mock_coroutine_with_result(result): + async def _result(*args, **kwargs): + return result - -def override_run_sync(f): - if inspect.isawaitable(f): - loop = asyncio.get_event_loop() - return loop.run_until_complete(f) - - return f + return _result From 84eadc54f5277a8a703532c0da2556372bc7f8bc Mon Sep 17 00:00:00 2001 From: John Date: Tue, 9 Jan 2024 19:13:44 -0700 Subject: [PATCH 07/18] updated for review --- sdk/tests/test_function.py | 10 +++++----- sdk/tests/test_task_queue.py | 10 
+++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/sdk/tests/test_function.py b/sdk/tests/test_function.py index 94ac6ad50..4b9f771be 100644 --- a/sdk/tests/test_function.py +++ b/sdk/tests/test_function.py @@ -26,15 +26,15 @@ class TestTaskQueue(TestCase): def test_init(self): mock_stub = MagicMock() - queue = Function(Image(python_version="python3.7"), cpu=100, memory=128) + queue = Function(Image(python_version="python3.8"), cpu=100, memory=128) queue.stub = mock_stub - self.assertEqual(queue.image.python_version, "python3.7") + self.assertEqual(queue.image.python_version, "python3.8") self.assertEqual(queue.cpu, 100) self.assertEqual(queue.memory, 128) def test_run_local(self): - @Function(Image(python_version="python3.7"), cpu=100, memory=128) + @Function(Image(python_version="python3.8"), cpu=100, memory=128) def test_func(): return 1 @@ -43,7 +43,7 @@ def test_func(): self.assertEqual(resp, 1) def test_function_invoke(self): - @Function(Image(python_version="python3.7"), cpu=100, memory=128) + @Function(Image(python_version="python3.8"), cpu=100, memory=128) def test_func(*args, **kwargs): return 1998 @@ -67,7 +67,7 @@ def test_func(*args, **kwargs): self.assertRaises(SystemExit, test_func) def test_map(self): - @Function(Image(python_version="python3.7"), cpu=100, memory=128) + @Function(Image(python_version="python3.8"), cpu=100, memory=128) def test_func(*args, **kwargs): return 1998 diff --git a/sdk/tests/test_task_queue.py b/sdk/tests/test_task_queue.py index 08ff72c76..a9bad615a 100644 --- a/sdk/tests/test_task_queue.py +++ b/sdk/tests/test_task_queue.py @@ -13,15 +13,15 @@ class TestTaskQueue(TestCase): def test_init(self): mock_stub = MagicMock() - queue = TaskQueue(Image(python_version="python3.7"), cpu=100, memory=128) + queue = TaskQueue(Image(python_version="python3.8"), cpu=100, memory=128) queue.stub = mock_stub - self.assertEqual(queue.image.python_version, "python3.7") + self.assertEqual(queue.image.python_version, 
"python3.8") self.assertEqual(queue.cpu, 100) self.assertEqual(queue.memory, 128) def test_run_local(self): - @TaskQueue(Image(python_version="python3.7"), cpu=100, memory=128) + @TaskQueue(Image(python_version="python3.8"), cpu=100, memory=128) def test_func(): return 1 @@ -30,7 +30,7 @@ def test_func(): self.assertEqual(resp, 1) def test_put(self): - @TaskQueue(Image(python_version="python3.7"), cpu=100, memory=128) + @TaskQueue(Image(python_version="python3.8"), cpu=100, memory=128) def test_func(): return 1 @@ -49,7 +49,7 @@ def test_func(): self.assertRaises(SystemExit, test_func.put) def test__call__(self): - @TaskQueue(Image(python_version="python3.7"), cpu=100, memory=128) + @TaskQueue(Image(python_version="python3.8"), cpu=100, memory=128) def test_func(): return 1 From 008599f517c30f7ea1739922385b4d140db7f820 Mon Sep 17 00:00:00 2001 From: John Date: Wed, 10 Jan 2024 14:13:56 -0700 Subject: [PATCH 08/18] add github actions --- .github/workflows/ci.yml | 43 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 .github/workflows/ci.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 000000000..c71df2c0d --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,43 @@ +name: CI + +on: [push] + +defaults: + run: + working-directory: ./sdk + +jobs: + lint_and_test: + runs-on: ubuntu-latest + strategy: + max-parallel: 4 + + steps: + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: "3.11" + + - name: Set up Poetry + env: + ACTIONS_ALLOW_UNSECURE_COMMANDS: true + uses: snok/install-poetry@v1 + with: + version: 1.5.1 + virtualenvs-create: true + virtualenvs-in-project: true + installer-parallel: true + + - name: Install dependencies + run: poetry install + if: steps.cache.outputs.cache-hit != 'true' + + - name: Code formatting + run: poetry run black . 
--check + + # todo: remove exit-zero once ruff issues are fixed + - name: Code linting + run: poetry run ruff . --exit-zero + + - name: Run tests + run: make test From 5af75c3d49a89eac2f02a8a99a9771a3c5a112cb Mon Sep 17 00:00:00 2001 From: John Date: Wed, 10 Jan 2024 14:23:24 -0700 Subject: [PATCH 09/18] patch actions --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c71df2c0d..6e10f1fbc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -4,7 +4,7 @@ on: [push] defaults: run: - working-directory: ./sdk + working-directory: sdk jobs: lint_and_test: From cf0ab6f78117d053ad196c2bda0184aa20171a16 Mon Sep 17 00:00:00 2001 From: John Date: Wed, 10 Jan 2024 14:29:41 -0700 Subject: [PATCH 10/18] poetry --- .github/workflows/ci.yml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6e10f1fbc..70bb9ae27 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -28,6 +28,13 @@ jobs: virtualenvs-in-project: true installer-parallel: true + - name: Load cached venv + id: cached-poetry-dependencies + uses: actions/cache@v3 + with: + path: .venv + key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }} + - name: Install dependencies run: poetry install if: steps.cache.outputs.cache-hit != 'true' From 9022f38554e0dd65a1d8cd77a3ffe43e65c46fb1 Mon Sep 17 00:00:00 2001 From: John Date: Wed, 10 Jan 2024 14:33:27 -0700 Subject: [PATCH 11/18] poetry --- .github/workflows/ci.yml | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 70bb9ae27..28726b12a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -2,9 +2,9 @@ name: CI on: [push] -defaults: - run: - working-directory: sdk +# defaults: +# run: +# working-directory: sdk jobs: 
lint_and_test: @@ -36,8 +36,10 @@ jobs: key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }} - name: Install dependencies - run: poetry install - if: steps.cache.outputs.cache-hit != 'true' + run: | + cd sdk + poetry install + if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true' - name: Code formatting run: poetry run black . --check From 55ae69ac9a25136a6f8499da71e9ce04179a8674 Mon Sep 17 00:00:00 2001 From: John Date: Wed, 10 Jan 2024 14:41:21 -0700 Subject: [PATCH 12/18] forgot ot checkut --- .github/workflows/ci.yml | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 28726b12a..ecaa32ff9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -2,9 +2,9 @@ name: CI on: [push] -# defaults: -# run: -# working-directory: sdk +defaults: + run: + working-directory: sdk jobs: lint_and_test: @@ -13,6 +13,9 @@ jobs: max-parallel: 4 steps: + - name: Check out repository + uses: actions/checkout@v3 + - name: Set up Python uses: actions/setup-python@v4 with: @@ -36,9 +39,7 @@ jobs: key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }} - name: Install dependencies - run: | - cd sdk - poetry install + run: poetry install if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true' - name: Code formatting From 73db7e811718b29c6ead35f681dc2a93d9ea2daf Mon Sep 17 00:00:00 2001 From: John Date: Wed, 10 Jan 2024 14:56:07 -0700 Subject: [PATCH 13/18] fix ruff --- sdk/src/beam/cli/configure.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/sdk/src/beam/cli/configure.py b/sdk/src/beam/cli/configure.py index 3d961db55..d5be4d913 100644 --- a/sdk/src/beam/cli/configure.py +++ b/sdk/src/beam/cli/configure.py @@ -1,7 +1,11 @@ import click from beam import terminal -from beam.config import configure_gateway_credentials, 
load_config_from_file, save_config_to_file +from beam.config import ( + configure_gateway_credentials, + load_config_from_file, + save_config_to_file, +) @click.command() @@ -13,7 +17,11 @@ def configure(name: str, token: str, gateway_host: str, gateway_port: str): config = load_config_from_file() config = configure_gateway_credentials( - config, name=name, gateway_host=gateway_host, gateway_port=gateway_port, token=token + config, + name=name, + gateway_host=gateway_host, + gateway_port=gateway_port, + token=token, ) save_config_to_file( From a91350a5bf1047f8b18fe296482590e4a62860de Mon Sep 17 00:00:00 2001 From: John Date: Wed, 10 Jan 2024 14:56:18 -0700 Subject: [PATCH 14/18] fix ruff --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ecaa32ff9..fa81d9008 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -43,7 +43,7 @@ jobs: if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true' - name: Code formatting - run: poetry run black . --check + run: poetry run ruff format . --exit-zero # todo: remove exit-zero once ruff issues are fixed - name: Code linting From 6f8f4827a3978eb50e5a8b2f3abacc3d385b5007 Mon Sep 17 00:00:00 2001 From: John Date: Wed, 10 Jan 2024 15:00:48 -0700 Subject: [PATCH 15/18] fix ruff --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fa81d9008..c131a3267 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -43,11 +43,11 @@ jobs: if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true' - name: Code formatting - run: poetry run ruff format . --exit-zero + run: poetry run ruff format . # todo: remove exit-zero once ruff issues are fixed - name: Code linting - run: poetry run ruff . --exit-zero + run: poetry run ruff check . 
- name: Run tests run: make test From c3ddd50b46f1fa96210119a88b8a69810b187436 Mon Sep 17 00:00:00 2001 From: John Date: Wed, 10 Jan 2024 15:06:09 -0700 Subject: [PATCH 16/18] last step --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c131a3267..c9e7de594 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -50,4 +50,4 @@ jobs: run: poetry run ruff check . - name: Run tests - run: make test + run: make tests From 472e38d4502d158653e3a5b5aca52c1b5c18c758 Mon Sep 17 00:00:00 2001 From: John Date: Wed, 10 Jan 2024 15:16:16 -0700 Subject: [PATCH 17/18] ignore auth for ci --- sdk/src/beam/abstractions/base.py | 1 + sdk/src/beam/config.py | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/sdk/src/beam/abstractions/base.py b/sdk/src/beam/abstractions/base.py index 35542c06a..a8647ddce 100644 --- a/sdk/src/beam/abstractions/base.py +++ b/sdk/src/beam/abstractions/base.py @@ -11,6 +11,7 @@ class BaseAbstraction(ABC): def __init__(self) -> None: self.loop: AbstractEventLoop = asyncio.get_event_loop() + self.channel: Channel = get_gateway_channel() def run_sync(self, coroutine: Coroutine) -> Any: diff --git a/sdk/src/beam/config.py b/sdk/src/beam/config.py index bcf00237a..d4f405a56 100644 --- a/sdk/src/beam/config.py +++ b/sdk/src/beam/config.py @@ -140,6 +140,10 @@ def configure_gateway_credentials( def get_gateway_channel() -> Channel: + if os.getenv("CI"): + # Ignore auth for CI + return Channel(host="localhost", port=50051, ssl=False) + config: GatewayConfig = get_gateway_config() channel: Union[AuthenticatedChannel, None] = None From fba252b897a013b5c5a5c210b408ed0812c68380 Mon Sep 17 00:00:00 2001 From: John Date: Wed, 10 Jan 2024 15:16:22 -0700 Subject: [PATCH 18/18] ignore auth for ci --- .github/workflows/ci.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c9e7de594..1aeee0351 
100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -50,4 +50,6 @@ jobs: run: poetry run ruff check . - name: Run tests + env: + CI: true run: make tests