From d7f5a0af76adcbff257ad4b133a93740c51b9c83 Mon Sep 17 00:00:00 2001 From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Date: Mon, 31 Jan 2022 16:00:37 +0000 Subject: [PATCH] Improve graphql expected vs unexpected err handling Don't unnecessarily process graphql errors Tidy resolvers --- cylc/flow/exceptions.py | 16 ++++++- cylc/flow/network/client.py | 2 +- cylc/flow/network/resolvers.py | 72 ++++++++++++++++------------- cylc/flow/network/schema.py | 17 ++++++- cylc/flow/network/server.py | 14 ------ tests/integration/test_resolvers.py | 2 +- 6 files changed, 72 insertions(+), 51 deletions(-) diff --git a/cylc/flow/exceptions.py b/cylc/flow/exceptions.py index d280d0e7f0d..fb78b539981 100644 --- a/cylc/flow/exceptions.py +++ b/cylc/flow/exceptions.py @@ -254,9 +254,15 @@ def __str__(self): class ClientError(CylcError): - def __init__(self, message: str, traceback: Optional[str] = None): + def __init__( + self, + message: str, + traceback: Optional[str] = None, + workflow: Optional[str] = None + ): self.message = message self.traceback = traceback + self.workflow = workflow def __str__(self) -> str: ret = self.message @@ -277,7 +283,13 @@ def __str__(self): class ClientTimeout(CylcError): - pass + + def __init__(self, message: str, workflow: Optional[str] = None): + self.message = message + self.workflow = workflow + + def __str__(self) -> str: + return self.message class CyclingError(CylcError): diff --git a/cylc/flow/network/client.py b/cylc/flow/network/client.py index ea5b6aad51f..a3876279ddc 100644 --- a/cylc/flow/network/client.py +++ b/cylc/flow/network/client.py @@ -217,7 +217,7 @@ async def async_request( return response.content if response.err: raise ClientError(*response.err) - raise ClientError(f"Received invalid response: {response}") + raise ClientError("No response from server. Check the workflow log.") def serial_request( self, diff --git a/cylc/flow/network/resolvers.py b/cylc/flow/network/resolvers.py index cb6633b60b8..39b8c639518 100644 --- a/cylc/flow/network/resolvers.py +++ b/cylc/flow/network/resolvers.py @@ -335,9 +335,17 @@ async def get_workflows_data(self, args: Dict[str, Any]): async def get_workflows(self, args): """Return workflow elements.""" return sort_elements( - [flow[WORKFLOW] - for flow in await self.get_workflows_data(args)], - args) + [workflow[WORKFLOW] + for workflow in await self.get_workflows_data(args)], + args + ) + + async def get_workflow_ids(self, w_args: Dict[str, Any]) -> List[str]: + """Return workflow ids for matching workflows in the data store.""" + return [ + workflow[WORKFLOW].id + for workflow in await self.get_workflows_data(w_args) + ] # nodes def get_node_state(self, node, node_type): @@ -622,6 +630,13 @@ async def flow_delta_processed(self, context, op_id): sub_id, w_id = context['ops_queue'][op_id].get(False) self.delta_processing_flows[sub_id].remove(w_id) + @property + def _no_matching_workflows_response(self) -> GenericResponse: + workflows = list(self.data_store_mgr.data.keys()) + return GenericResponse( + success=False, message=f'No matching workflow in {workflows}' + ) + @abstractmethod async def mutator( self, @@ -631,6 +646,7 @@ async def mutator( kwargs: Dict[str, Any], meta: Dict[str, Any] ) -> List[GenericResponse]: + """Method for mutating workflow.""" ... @@ -653,43 +669,37 @@ async def mutator( meta: Dict[str, Any] ) -> List[GenericResponse]: """Mutate workflow.""" - w_ids = [flow[WORKFLOW].id - for flow in await self.get_workflows_data(w_args)] + w_ids = await self.get_workflow_ids(w_args) if not w_ids: - workflows = list(self.data_store_mgr.data.keys()) - ret = GenericResponse( - success=False, message=f'No matching workflow in {workflows}' - ) - return [ret] + return [self._no_matching_workflows_response] w_id = w_ids[0] result = await self._mutation_mapper(command, kwargs, meta) - if result is None: - result = (True, 'Command queued') - ret = GenericResponse(w_id, *result) - return [ret] + return [GenericResponse(w_id, *result)] async def _mutation_mapper( self, command: str, kwargs: Dict[str, Any], meta: Dict[str, Any] - ) -> Optional[Tuple[bool, str]]: + ) -> Tuple[bool, str]: """Map between GraphQL resolvers and internal command interface.""" + result: Optional[Tuple[bool, str]] = None method = getattr(self, command, None) if method is not None: - return method(**kwargs) - try: - self.schd.get_command_method(command) - except AttributeError: - raise ValueError(f"Command '{command}' not found") - if command != "put_messages": - log_msg = f"[command] {command}" - user = meta.get('auth_user', self.schd.owner) - if user != self.schd.owner: - log_msg += (f" (issued by {user})") - LOG.info(log_msg) - self.schd.queue_command( - command, - kwargs - ) - return None + result = method(**kwargs) + else: + try: + self.schd.get_command_method(command) + except AttributeError: + raise ValueError(f"Command '{command}' not found") + if command != "put_messages": + log_msg = f"[command] {command}" + user = meta.get('auth_user', self.schd.owner) + if user != self.schd.owner: + log_msg += (f" (issued by {user})") + LOG.info(log_msg) + self.schd.queue_command( + command, + kwargs + ) + return result or (True, "Command queued") def broadcast( self, diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index 16a464fbae1..46e3a29156a 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -22,6 +22,7 @@ import logging from operator import attrgetter from textwrap import dedent +import traceback from typing import ( TYPE_CHECKING, AsyncGenerator, @@ -38,6 +39,7 @@ ) from graphene.types.generic import GenericScalar from graphene.utils.str_converters import to_snake_case +from graphql.error import GraphQLError from cylc.flow.broadcast_mgr import ALL_CYCLE_POINTS_STRS, addict from cylc.flow.flow_mgr import FLOW_ALL, FLOW_NEW, FLOW_NONE @@ -1221,7 +1223,7 @@ async def mutator( workflows: Optional[List[str]] = None, exworkflows: Optional[List[str]] = None, **kwargs: Any -) -> List[GenericResponse]: +) -> Mutation: """Call the resolver method that act on the workflow service via the internal command queue. @@ -1252,7 +1254,18 @@ async def mutator( info.context.get('resolvers') # type: ignore[union-attr] ) meta = info.context.get('meta') # type: ignore[union-attr] - res = await resolvers.mutator(info, command, w_args, kwargs, meta) + try: + res = await resolvers.mutator(info, command, w_args, kwargs, meta) + except Exception as exc: # Unexpected exception + if isinstance(exc, GraphQLError): + raise exc + # Wrap exception to make it easier to debug + raise GraphQLError( + f"{type(exc).__name__}: {exc}", + extensions={ + 'traceback': traceback.format_tb(exc.__traceback__) + } + ) return info.return_type.graphene_type( # type: ignore[union-attr] results=res ) diff --git a/cylc/flow/network/server.py b/cylc/flow/network/server.py index 1649345864e..ed5e36a7805 100644 --- a/cylc/flow/network/server.py +++ b/cylc/flow/network/server.py @@ -382,20 +382,6 @@ def graphql( ) except Exception as exc: raise GraphQLError(f"ERROR: GraphQL execution error \n{exc}") - if executed.errors: - for i, excp in enumerate(executed.errors): - if isinstance(excp, GraphQLError): - error = excp - else: - error = GraphQLError(message=str(excp)) - if hasattr(excp, '__traceback__'): - import traceback - extensions = error.extensions or {} - extensions['traceback'] = traceback.format_exception( - excp.__class__, excp, excp.__traceback__ - ) - error.extensions = extensions - executed.errors[i] = error return format_execution_result(executed) # UIServer Data Commands diff --git a/tests/integration/test_resolvers.py b/tests/integration/test_resolvers.py index d58c6b89212..6eafc2dfc39 100644 --- a/tests/integration/test_resolvers.py +++ b/tests/integration/test_resolvers.py @@ -217,7 +217,7 @@ async def test_mutation_mapper(mock_flow): """Test the mapping of mutations to internal command methods.""" meta = {} response = await mock_flow.resolvers._mutation_mapper('pause', {}, meta) - assert response is None + assert response == (True, 'Command queued') with pytest.raises(ValueError): await mock_flow.resolvers._mutation_mapper('non_exist', {}, meta)