diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b4e1df586..3ea3a8050 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -19,7 +19,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: ["3.7", "3.8", "3.9", "3.10", "3.11", "pypy-3.9"] + python-version: ["3.7", "3.8", "3.9", "3.10", "3.11", "3.12", "3.13.0-beta.4", "pypy-3.9"] steps: - uses: actions/checkout@v3 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index a454a0223..060f7462e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -2,7 +2,7 @@ # See https://pre-commit.com/hooks.html for more hooks repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.4.0 + rev: v4.6.0 hooks: - id: trailing-whitespace - id: end-of-file-fixer @@ -13,7 +13,7 @@ repos: - repo: https://github.com/astral-sh/ruff-pre-commit # Ruff version. - rev: v0.4.8 + rev: v0.5.5 hooks: # Run the linter. - id: ruff diff --git a/pyproject.toml b/pyproject.toml index 924adee03..2a627e66f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,6 +27,7 @@ classifiers = [ "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", ] dynamic = ["version"] @@ -50,10 +51,11 @@ dependencies = [ [project.optional-dependencies] dev = [ "boto3-stubs[s3,swf]", + "cffi==v1.17.0rc1; python_full_version=='3.13.0b4'", # via cryptography via moto, secretstorage "flaky", "hatch==1.7.0", "invoke", - "moto<3.0.0", + "moto>=4.2.8,<5.0.0", "packaging", "pre-commit", "pytest", diff --git a/script/test b/script/test index 12f539aeb..f5af7afb3 100755 --- a/script/test +++ b/script/test @@ -1,5 +1,9 @@ #!/bin/bash +# not needed, but harmless, in CI/container +find . -name '*.pyc' -print0 | xargs -0 rm +export PYTHONDONTWRITEBYTECODE=1 + # The AWS_DEFAULT_REGION parameter determines the region used for SWF # Leaving it to a value different than "us-east-1" would break moto, # because moto.swf only mocks calls to us-east-1 region for now. @@ -26,7 +30,7 @@ export SIMPLEFLOW_VCR_RECORD_MODE=none # Disable jumbo fields export SIMPLEFLOW_JUMBO_FIELDS_BUCKET="" -# Prevent Travis from overriding boto configuration +# Prevent CI from overriding boto configuration export BOTO_CONFIG=/dev/null PYTHON=${PYTHON:-python} diff --git a/simpleflow/swf/mapper/actors/decider.py b/simpleflow/swf/mapper/actors/decider.py index b0b3d2067..4171b53ba 100644 --- a/simpleflow/swf/mapper/actors/decider.py +++ b/simpleflow/swf/mapper/actors/decider.py @@ -61,8 +61,8 @@ def complete( raise DoesNotExistError( f"Unable to complete decision task with token={task_token}", message, - ) - raise ResponseError(message) + ) from e + raise ResponseError(message, error_code=error_code) from e finally: logging_context.reset() @@ -117,9 +117,9 @@ def poll(self, task_list: str | None = None, identity: str | None = None, **kwar raise DoesNotExistError( "Unable to poll decision task", message, - ) + ) from e - raise ResponseError(message) + raise ResponseError(message, error_code=error_code) from e token = task.get("taskToken") if not token: diff --git a/simpleflow/swf/mapper/actors/worker.py b/simpleflow/swf/mapper/actors/worker.py index 92428cf9c..bbae3980e 100644 --- a/simpleflow/swf/mapper/actors/worker.py +++ b/simpleflow/swf/mapper/actors/worker.py @@ -64,8 +64,8 @@ def cancel(self, task_token: str, details: str | None = None) -> dict[str, Any] raise DoesNotExistError( f"Unable to cancel activity task with token={task_token}", message, - ) - raise ResponseError(message) + ) from e + raise ResponseError(message, error_code=error_code) from e finally: logging_context.reset() @@ -87,9 +87,9 @@ def complete(self, task_token: str, result: Any = None) -> dict[str, Any] | None raise DoesNotExistError( f"Unable to complete activity task with token={task_token}", message, - ) + ) from e - raise ResponseError(message) + raise ResponseError(message, error_code=error_code) from e except JumboTooLargeError as e: return self.respond_activity_task_failed(task_token, reason=format_exc(e)) @@ -113,9 +113,9 @@ def fail(self, task_token: str, details: str | None = None, reason: str | None = raise DoesNotExistError( f"Unable to fail activity task with token={task_token}", message, - ) + ) from e - raise ResponseError(message) + raise ResponseError(message, error_code=error_code) from e except JumboTooLargeError as e: return self.respond_activity_task_failed(task_token, reason=format_exc(e)) @@ -137,15 +137,15 @@ def heartbeat(self, task_token: str, details: str | None = None) -> dict[str, An raise DoesNotExistError( f"Unable to send heartbeat with token={task_token}", message, - ) + ) from e if error_code == "ThrottlingException": raise RateLimitExceededError( f"Rate exceeded when sending heartbeat with token={task_token}", message, - ) + ) from e - raise ResponseError(message) + raise ResponseError(message, error_code=error_code) from e def poll(self, task_list: str | None = None, identity: str | None = None) -> Response: """Polls for an activity task to process from current @@ -184,9 +184,9 @@ def poll(self, task_list: str | None = None, identity: str | None = None) -> Res raise DoesNotExistError( "Unable to poll activity task", message, - ) + ) from e - raise ResponseError(message) + raise ResponseError(message, error_code=error_code) from e if not task.get("taskToken"): raise PollTimeout("Activity Worker poll timed out") diff --git a/simpleflow/swf/mapper/exceptions.py b/simpleflow/swf/mapper/exceptions.py index b7c8a9404..5ca6c362a 100644 --- a/simpleflow/swf/mapper/exceptions.py +++ b/simpleflow/swf/mapper/exceptions.py @@ -3,13 +3,13 @@ import re from collections.abc import Sequence from functools import partial, wraps -from typing import Any, Callable +from typing import Any, Callable, Pattern from botocore.exceptions import ClientError class SWFError(Exception): - def __init__(self, message: str, raw_error: str = "", *args) -> None: + def __init__(self, message: str = "", raw_error: str = "", error_code: str = "", *args) -> None: """ Examples: @@ -46,9 +46,16 @@ def __init__(self, message: str, raw_error: str = "", *args) -> None: 'kind' >>> error.details 'details' + >>> error = SWFError('message', error_code='FooFault') + >>> error.message + 'message' + >>> error.error_code + 'FooFault' + >>> error.details + '' """ - Exception.__init__(self, message, *args) + super().__init__(message, *args) values = raw_error.split(":", 1) @@ -59,6 +66,7 @@ def __init__(self, message: str, raw_error: str = "", *args) -> None: self.kind = values[0].strip() self.type_ = self.kind.lower().strip().replace(" ", "_") if self.kind else None + self.error_code = error_code @property def message(self): @@ -105,6 +113,10 @@ class RateLimitExceededError(SWFError): pass +class WorkflowExecutionAlreadyStartedError(SWFError): + pass + + def ignore(*args, **kwargs): return @@ -113,18 +125,15 @@ def ignore(*args, **kwargs): REGEX_NESTED_RESOURCE = re.compile(r"Unknown (?:type|execution)[:,]\s*([^ =]+)\s*=") -def match_equals(regex, string, values): +def match_equals(regex: Pattern, string: str | None, values: str | Sequence[str]) -> bool: """ Extract a value from a string with a regex and compare it. :param regex: to extract the value to check. - :type regex: _sre.SRE_Pattern (compiled regex) :param string: that contains the value to extract. - :type string: str :param values: to compare with. - :type values: [str] """ if string is None: @@ -139,12 +148,11 @@ def match_equals(regex, string, values): return matched[0] in values -def is_unknown_resource_raised(error, *args, **kwargs): +def is_unknown_resource_raised(error: Exception, *args, **kwargs) -> bool: """ Handler that checks if *error* is an unknown resource fault. :param error: is the exception to check. - :type error: Exception """ if not isinstance(error, ClientError): @@ -153,7 +161,7 @@ def is_unknown_resource_raised(error, *args, **kwargs): return extract_error_code(error) == "UnknownResourceFault" -def is_unknown(resource: str | Sequence[str]): +def is_unknown(resource: str | Sequence[str]) -> Callable: """ Return a function that checks if *error* is an unknown *resource* fault. @@ -185,7 +193,7 @@ def wrapped(error, *args, **kwargs): return wrapped -def always(value): +def always(value: Any) -> Callable: """ Always return *value* whatever arguments it got. @@ -212,7 +220,7 @@ def wrapped(*args, **kwargs): return wrapped -def generate_resource_not_found_message(error): +def generate_resource_not_found_message(error: Exception) -> str: error_code = extract_error_code(error) if error_code != "UnknownResourceFault": raise ValueError(f"cannot extract resource from {error}") @@ -222,37 +230,43 @@ def generate_resource_not_found_message(error): return f"Resource {resource[0] if resource else 'unknown'} does not exist" -def raises(exception, when, extract: Callable[[Any], str] = str): +def raises( + exception: type[Exception] | type[SWFError], + when: Callable[[Exception, tuple, dict], bool], + extract: Callable[[Any], str] = str, +): """ :param exception: to raise when the predicate is True. - :type exception: type(Exception) - :param when: predicate to apply. - :type when: (error, *args, **kwargs) -> bool + :param extract: function to extract the value from the exception. """ @wraps(raises) def raises_closure(error, *args, **kwargs): if when(error, *args, **kwargs) is True: - raise exception(extract(error)) - raise error + if isinstance(getattr(error, "response", None), dict) and issubclass(exception, SWFError): + raise exception(extract_message(error), error_code=extract_error_code(error)) from error + + raise exception(extract(error)) from error + raise error from None return raises_closure -def catch(exceptions, handle_with=None, log=False): +def catch( + exceptions: type[Exception] | Sequence[type[Exception]] | tuple[type[Exception]], + handle_with: Callable[[Exception, tuple, dict], Any] | None = None, + log: bool = False, +): """ Catch *exceptions*, then eventually handle and log them. :param exceptions: sequence of exceptions to catch. - :type exceptions: Exception | (Exception, ) :param handle_with: handle the exceptions (if handle_with is not None) or raise them again. - :type handle_with: function(err, *args, **kwargs) :param log: the exception with default logger. - :type log: bool Examples: @@ -307,8 +321,10 @@ def translate(exceptions, to): """ - def throw(err, *args, **kwargs): - raise to(extract_message(err)) + def throw(err: Exception, *args, **kwargs): + if isinstance(getattr(err, "response", None), dict) and issubclass(to, SWFError): + raise to(extract_message(err), error_code=extract_error_code(err)) from err + raise to(extract_message(err)) from err return catch(exceptions, handle_with=throw) diff --git a/simpleflow/swf/mapper/models/activity.py b/simpleflow/swf/mapper/models/activity.py index ff147ca99..83065ff7c 100644 --- a/simpleflow/swf/mapper/models/activity.py +++ b/simpleflow/swf/mapper/models/activity.py @@ -121,9 +121,9 @@ def _diff(self, ignore_fields: list[str] | None = None) -> ModelDiff: error_code = extract_error_code(e) message = extract_message(e) if error_code == "UnknownResourceFault": - raise DoesNotExistError("Remote ActivityType does not exist") + raise DoesNotExistError("Remote ActivityType does not exist") from e - raise ResponseError(message) + raise ResponseError(message, error_code=error_code) from e info = description["typeInfo"] config = description["configuration"] @@ -192,9 +192,9 @@ def save(self): error_code = extract_error_code(e) message = extract_message(e) if error_code == "TypeAlreadyExistsFault": - raise AlreadyExistsError(f"{self} already exists") + raise AlreadyExistsError(f"{self} already exists") from e if error_code in ("UnknownResourceFault", "TypeDeprecatedFault"): - raise DoesNotExistError(f"{error_code}: {message}") + raise DoesNotExistError(f"{error_code}: {message}") from e raise @exceptions.catch( diff --git a/simpleflow/swf/mapper/models/domain.py b/simpleflow/swf/mapper/models/domain.py index e1ccb37f6..5442f5f9b 100644 --- a/simpleflow/swf/mapper/models/domain.py +++ b/simpleflow/swf/mapper/models/domain.py @@ -86,9 +86,9 @@ def _diff(self, ignore_fields: list[str] | None = None) -> ModelDiff: except ClientError as e: error_code = e.response["Error"]["Code"] if error_code == "UnknownResourceFault": - raise DoesNotExistError("Remote Domain does not exist") + raise DoesNotExistError("Remote Domain does not exist") from e - raise ResponseError(e.args[0]) + raise ResponseError(e.args[0], error_code=error_code) from e domain_info = description["domainInfo"] domain_config = description["configuration"] @@ -137,8 +137,8 @@ def save(self) -> None: error_code = extract_error_code(e) message = extract_message(e) if error_code == "DomainAlreadyExistsFault": - raise AlreadyExistsError(f"Domain {self.name} already exists amazon-side") - raise ResponseError(message) + raise AlreadyExistsError(f"Domain {self.name} already exists amazon-side") from e + raise ResponseError(message, error_code=error_code) from e @exceptions.translate(ClientError, to=ResponseError) @exceptions.catch( diff --git a/simpleflow/swf/mapper/models/workflow.py b/simpleflow/swf/mapper/models/workflow.py index ae48ce53c..bfc2427d9 100644 --- a/simpleflow/swf/mapper/models/workflow.py +++ b/simpleflow/swf/mapper/models/workflow.py @@ -13,6 +13,7 @@ AlreadyExistsError, DoesNotExistError, ResponseError, + WorkflowExecutionAlreadyStartedError, extract_error_code, extract_message, raises, @@ -132,9 +133,9 @@ def _diff(self, ignore_fields: list[str] | None = None) -> ModelDiff: error_code = extract_error_code(e) message = extract_message(e) if error_code == "UnknownResourceFault": - raise DoesNotExistError("Remote Domain does not exist") + raise DoesNotExistError("Remote Domain does not exist") from e - raise ResponseError(message) + raise ResponseError(message, error_code=error_code) from e workflow_info = description["typeInfo"] workflow_config = description["configuration"] @@ -198,9 +199,9 @@ def save(self) -> None: error_code = extract_error_code(e) message = extract_message(e) if error_code == "TypeAlreadyExistsFault": - raise AlreadyExistsError(f"Workflow type {self.name} already exists amazon-side") + raise AlreadyExistsError(f"Workflow type {self.name} already exists amazon-side") from e if error_code == "UnknownResourceFault": - raise DoesNotExistError(message) + raise DoesNotExistError(message) from e raise def delete(self) -> None: @@ -211,7 +212,7 @@ def delete(self) -> None: error_code = extract_error_code(e) message = extract_message(e) if error_code in ["UnknownResourceFault", "TypeDeprecatedFault"]: - raise DoesNotExistError(message) + raise DoesNotExistError(message) from e def upstream(self) -> WorkflowType: from simpleflow.swf.mapper.querysets.workflow import WorkflowTypeQuerySet @@ -219,6 +220,14 @@ def upstream(self) -> WorkflowType: qs = WorkflowTypeQuerySet(self.domain) return qs.get(self.name, self.version) + @exceptions.catch( + ClientError, + raises( + WorkflowExecutionAlreadyStartedError, + when=lambda error, *args, **kwargs: extract_error_code(error) == "WorkflowExecutionAlreadyStartedFault", + extract=lambda *args, **kwargs: "", + ), + ) def start_execution( self, workflow_id: str | None = None, @@ -391,9 +400,9 @@ def _diff(self, ignore_fields: list[str] | None = None) -> ModelDiff: error_code = extract_error_code(e) message = extract_message(e) if error_code == "UnknownResourceFault": - raise DoesNotExistError("Remote Domain does not exist") + raise DoesNotExistError("Remote Domain does not exist") from e - raise ResponseError(message) + raise ResponseError(message, error_code=error_code) from e execution_info = description["executionInfo"] execution_config = description["executionConfiguration"] diff --git a/simpleflow/swf/mapper/querysets/activity.py b/simpleflow/swf/mapper/querysets/activity.py index 5b37924bc..26bf4c00d 100644 --- a/simpleflow/swf/mapper/querysets/activity.py +++ b/simpleflow/swf/mapper/querysets/activity.py @@ -101,9 +101,9 @@ def get(self, name: str, version: str, *args, **kwargs) -> ActivityType: error_code = extract_error_code(e) message = extract_message(e) if error_code == "UnknownResourceFault": - raise DoesNotExistError(message) + raise DoesNotExistError(message) from e - raise ResponseError(message) + raise ResponseError(message, error_code=error_code) from e activity_info = response[self._infos] activity_config = response["configuration"] diff --git a/simpleflow/swf/mapper/querysets/domain.py b/simpleflow/swf/mapper/querysets/domain.py index b83e07dcb..a47a94e07 100644 --- a/simpleflow/swf/mapper/querysets/domain.py +++ b/simpleflow/swf/mapper/querysets/domain.py @@ -39,9 +39,9 @@ def get(self, name: str, *args, **kwargs) -> Domain: except ClientError as e: error_code = e.response["Error"]["Code"] if error_code == "UnknownResourceFault": - raise DoesNotExistError(f"No such domain: {name}") + raise DoesNotExistError(f"No such domain: {name}") from e # Any other errors should raise - raise ResponseError(e.args[0]) + raise ResponseError(e.args[0], error_code=error_code) from e domain_info = response["domainInfo"] domain_config = response["configuration"] diff --git a/simpleflow/swf/mapper/querysets/workflow.py b/simpleflow/swf/mapper/querysets/workflow.py index 596443e9b..e91de7529 100644 --- a/simpleflow/swf/mapper/querysets/workflow.py +++ b/simpleflow/swf/mapper/querysets/workflow.py @@ -120,9 +120,9 @@ def get(self, name, version, *args, **kwargs): error_code = extract_error_code(e) message = extract_message(e) if error_code == "UnknownResourceFault": - raise DoesNotExistError(message) + raise DoesNotExistError(message) from e - raise ResponseError(message) + raise ResponseError(message, error_code=error_code) from e wt_info = response[self._infos] wt_config = response["configuration"] @@ -475,9 +475,9 @@ def get(self, workflow_id, run_id, *args, **kwargs): error_code = extract_error_code(e) message = extract_message(e) if error_code == "UnknownResourceFault": - raise DoesNotExistError(message) + raise DoesNotExistError(message) from e - raise ResponseError(message) + raise ResponseError(message, error_code=error_code) from e execution_info = response[self._infos] execution_config = response["executionConfiguration"] diff --git a/tests/test_simpleflow/swf/mapper/test_exceptions.py b/tests/test_simpleflow/swf/mapper/test_exceptions.py new file mode 100644 index 000000000..9548fa022 --- /dev/null +++ b/tests/test_simpleflow/swf/mapper/test_exceptions.py @@ -0,0 +1,24 @@ +from __future__ import annotations + +import pytest +from botocore.exceptions import ClientError + +from simpleflow.swf.mapper.exceptions import ResponseError, translate + + +def test_translate(): + def raiser(): + raise ClientError( + error_response={ + "Error": { + "Code": "UnknownError", + "Message": "unknown error", + } + }, + operation_name="Foo", + ) + + error_ = translate(ClientError, to=ResponseError)(raiser) + with pytest.raises(ResponseError) as excinfo: + error_() + assert excinfo.value.error_code == "UnknownError" diff --git a/tests/test_simpleflow/test_canvas.py b/tests/test_simpleflow/test_canvas.py index 380eaeb4d..bc770027f 100644 --- a/tests/test_simpleflow/test_canvas.py +++ b/tests/test_simpleflow/test_canvas.py @@ -410,7 +410,7 @@ def test_handle_not_all(self): agg_ex = AggregateException([ZeroDivisionError(), None, memory_error]) def my_handler(ex, a, b=None): - return type(ex) == ZeroDivisionError + return isinstance(ex, ZeroDivisionError) with self.assertRaises(AggregateException) as new_agg_ex: agg_ex.handle(my_handler, 1, b=True) diff --git a/tests/utils/mock_swf_test_case.py b/tests/utils/mock_swf_test_case.py index 015d20f2d..490f821e1 100644 --- a/tests/utils/mock_swf_test_case.py +++ b/tests/utils/mock_swf_test_case.py @@ -4,7 +4,6 @@ import boto3 from moto import mock_s3, mock_swf -from moto.swf import swf_backend from simpleflow.swf.executor import Executor from simpleflow.swf.mapper.actors import Decider @@ -38,12 +37,6 @@ def setUp(self): self.s3_conn = boto3.client("s3", region_name="us-east-1") self.s3_conn.create_bucket(Bucket="jumbo-bucket") - def tearDown(self): - swf_backend.reset() - assert not self.swf_conn.list_domains(registrationStatus="REGISTERED")[ - "domainInfos" - ], "moto state incorrectly reset!" - def register_activity_type(self, func: str, task_list: str): self.swf_conn.register_activity_type(domain=self.domain.name, name=func, version=task_list)