diff --git a/temporalio/worker/_activity.py b/temporalio/worker/_activity.py index c55ff245..9c4889c9 100644 --- a/temporalio/worker/_activity.py +++ b/temporalio/worker/_activity.py @@ -508,6 +508,7 @@ async def _run_activity( completion.result.failed.failure.message = ( f"Failed building exception result: {inner_err}" ) + completion.result.failed.failure.application_failure_info.SetInParent() # Do final completion try: diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index bc4f5393..c33796ef 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -442,6 +442,7 @@ def activate( self._current_completion.failed.failure.message = ( f"Failed converting activation exception: {inner_err}" ) + self._current_completion.failed.failure.application_failure_info.SetInParent() def is_completion(command): return ( diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 02ac81e2..facf9b30 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -74,6 +74,7 @@ ) from temporalio.converter import ( DataConverter, + DefaultFailureConverter, DefaultFailureConverterWithEncodedAttributes, DefaultPayloadConverter, PayloadCodec, @@ -84,6 +85,7 @@ ApplicationError, CancelledError, ChildWorkflowError, + FailureError, TemporalError, TimeoutError, WorkflowAlreadyStartedError, @@ -6451,6 +6453,86 @@ async def test_concurrent_sleeps_use_proper_options( await handle.query("__temporal_workflow_metadata") +class BadFailureConverterError(Exception): + pass + + +class BadFailureConverter(DefaultFailureConverter): + def to_failure( + self, + exception: BaseException, + payload_converter: PayloadConverter, + failure: Failure, + ) -> None: + if isinstance(exception, BadFailureConverterError): + raise RuntimeError("Intentional failure conversion error") + super().to_failure(exception, payload_converter, failure) + + +@activity.defn +async def bad_failure_converter_activity() -> None: + raise BadFailureConverterError + + +@workflow.defn(sandboxed=False) +class BadFailureConverterWorkflow: + @workflow.run + async def run(self, fail_workflow_task) -> None: + if fail_workflow_task: + raise BadFailureConverterError + else: + await workflow.execute_activity( + bad_failure_converter_activity, + schedule_to_close_timeout=timedelta(seconds=30), + retry_policy=RetryPolicy(maximum_attempts=1), + ) + + +async def test_bad_failure_converter(client: Client): + config = client.config() + config["data_converter"] = dataclasses.replace( + config["data_converter"], + failure_converter_class=BadFailureConverter, + ) + client = Client(**config) + async with new_worker( + client, BadFailureConverterWorkflow, activities=[bad_failure_converter_activity] + ) as worker: + # Check activity + with pytest.raises(WorkflowFailureError) as err: + await client.execute_workflow( + BadFailureConverterWorkflow.run, + False, + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + assert isinstance(err.value.cause, ActivityError) + assert isinstance(err.value.cause.cause, ApplicationError) + assert ( + err.value.cause.cause.message + == "Failed building exception result: Intentional failure conversion error" + ) + + # Check workflow + handle = await client.start_workflow( + BadFailureConverterWorkflow.run, + True, + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + + async def task_failed_message() -> Optional[str]: + async for e in handle.fetch_history_events(): + if e.HasField("workflow_task_failed_event_attributes"): + return e.workflow_task_failed_event_attributes.failure.message + return None + + await assert_eq_eventually( + "Failed converting activation exception: Intentional failure conversion error", + task_failed_message, # type: ignore + ) + + @workflow.defn class SignalsActivitiesTimersUpdatesTracingWorkflow: """