Skip to content

Commit

Permalink
Improve graphql expected vs unexpected err handling
Browse files Browse the repository at this point in the history
Don't unnecessarily process graphql errors
Tidy resolvers
  • Loading branch information
MetRonnie committed Aug 10, 2022
1 parent 53a04bc commit d7f5a0a
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 51 deletions.
16 changes: 14 additions & 2 deletions cylc/flow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,15 @@ def __str__(self):

class ClientError(CylcError):

def __init__(self, message: str, traceback: Optional[str] = None):
def __init__(
self,
message: str,
traceback: Optional[str] = None,
workflow: Optional[str] = None
):
self.message = message
self.traceback = traceback
self.workflow = workflow

def __str__(self) -> str:
ret = self.message
Expand All @@ -277,7 +283,13 @@ def __str__(self):


class ClientTimeout(CylcError):
pass

def __init__(self, message: str, workflow: Optional[str] = None):
self.message = message
self.workflow = workflow

def __str__(self) -> str:
return self.message


class CyclingError(CylcError):
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/network/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ async def async_request(
return response.content
if response.err:
raise ClientError(*response.err)
raise ClientError(f"Received invalid response: {response}")
raise ClientError("No response from server. Check the workflow log.")

def serial_request(
self,
Expand Down
72 changes: 41 additions & 31 deletions cylc/flow/network/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,9 +335,17 @@ async def get_workflows_data(self, args: Dict[str, Any]):
async def get_workflows(self, args):
"""Return workflow elements."""
return sort_elements(
[flow[WORKFLOW]
for flow in await self.get_workflows_data(args)],
args)
[workflow[WORKFLOW]
for workflow in await self.get_workflows_data(args)],
args
)

async def get_workflow_ids(self, w_args: Dict[str, Any]) -> List[str]:
"""Return workflow ids for matching workflows in the data store."""
return [
workflow[WORKFLOW].id
for workflow in await self.get_workflows_data(w_args)
]

# nodes
def get_node_state(self, node, node_type):
Expand Down Expand Up @@ -622,6 +630,13 @@ async def flow_delta_processed(self, context, op_id):
sub_id, w_id = context['ops_queue'][op_id].get(False)
self.delta_processing_flows[sub_id].remove(w_id)

@property
def _no_matching_workflows_response(self) -> GenericResponse:
workflows = list(self.data_store_mgr.data.keys())
return GenericResponse(
success=False, message=f'No matching workflow in {workflows}'
)

@abstractmethod
async def mutator(
self,
Expand All @@ -631,6 +646,7 @@ async def mutator(
kwargs: Dict[str, Any],
meta: Dict[str, Any]
) -> List[GenericResponse]:
"""Method for mutating workflow."""
...


Expand All @@ -653,43 +669,37 @@ async def mutator(
meta: Dict[str, Any]
) -> List[GenericResponse]:
"""Mutate workflow."""
w_ids = [flow[WORKFLOW].id
for flow in await self.get_workflows_data(w_args)]
w_ids = await self.get_workflow_ids(w_args)
if not w_ids:
workflows = list(self.data_store_mgr.data.keys())
ret = GenericResponse(
success=False, message=f'No matching workflow in {workflows}'
)
return [ret]
return [self._no_matching_workflows_response]
w_id = w_ids[0]
result = await self._mutation_mapper(command, kwargs, meta)
if result is None:
result = (True, 'Command queued')
ret = GenericResponse(w_id, *result)
return [ret]
return [GenericResponse(w_id, *result)]

async def _mutation_mapper(
self, command: str, kwargs: Dict[str, Any], meta: Dict[str, Any]
) -> Optional[Tuple[bool, str]]:
) -> Tuple[bool, str]:
"""Map between GraphQL resolvers and internal command interface."""
result: Optional[Tuple[bool, str]] = None
method = getattr(self, command, None)
if method is not None:
return method(**kwargs)
try:
self.schd.get_command_method(command)
except AttributeError:
raise ValueError(f"Command '{command}' not found")
if command != "put_messages":
log_msg = f"[command] {command}"
user = meta.get('auth_user', self.schd.owner)
if user != self.schd.owner:
log_msg += (f" (issued by {user})")
LOG.info(log_msg)
self.schd.queue_command(
command,
kwargs
)
return None
result = method(**kwargs)
else:
try:
self.schd.get_command_method(command)
except AttributeError:
raise ValueError(f"Command '{command}' not found")
if command != "put_messages":
log_msg = f"[command] {command}"
user = meta.get('auth_user', self.schd.owner)
if user != self.schd.owner:
log_msg += (f" (issued by {user})")
LOG.info(log_msg)
self.schd.queue_command(
command,
kwargs
)
return result or (True, "Command queued")

def broadcast(
self,
Expand Down
17 changes: 15 additions & 2 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import logging
from operator import attrgetter
from textwrap import dedent
import traceback
from typing import (
TYPE_CHECKING,
AsyncGenerator,
Expand All @@ -38,6 +39,7 @@
)
from graphene.types.generic import GenericScalar
from graphene.utils.str_converters import to_snake_case
from graphql.error import GraphQLError

from cylc.flow.broadcast_mgr import ALL_CYCLE_POINTS_STRS, addict
from cylc.flow.flow_mgr import FLOW_ALL, FLOW_NEW, FLOW_NONE
Expand Down Expand Up @@ -1221,7 +1223,7 @@ async def mutator(
workflows: Optional[List[str]] = None,
exworkflows: Optional[List[str]] = None,
**kwargs: Any
) -> List[GenericResponse]:
) -> Mutation:
"""Call the resolver method that act on the workflow service
via the internal command queue.
Expand Down Expand Up @@ -1252,7 +1254,18 @@ async def mutator(
info.context.get('resolvers') # type: ignore[union-attr]
)
meta = info.context.get('meta') # type: ignore[union-attr]
res = await resolvers.mutator(info, command, w_args, kwargs, meta)
try:
res = await resolvers.mutator(info, command, w_args, kwargs, meta)
except Exception as exc: # Unexpected exception
if isinstance(exc, GraphQLError):
raise exc
# Wrap exception to make it easier to debug
raise GraphQLError(
f"{type(exc).__name__}: {exc}",
extensions={
'traceback': traceback.format_tb(exc.__traceback__)
}
)
return info.return_type.graphene_type( # type: ignore[union-attr]
results=res
)
Expand Down
14 changes: 0 additions & 14 deletions cylc/flow/network/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,20 +382,6 @@ def graphql(
)
except Exception as exc:
raise GraphQLError(f"ERROR: GraphQL execution error \n{exc}")
if executed.errors:
for i, excp in enumerate(executed.errors):
if isinstance(excp, GraphQLError):
error = excp
else:
error = GraphQLError(message=str(excp))
if hasattr(excp, '__traceback__'):
import traceback
extensions = error.extensions or {}
extensions['traceback'] = traceback.format_exception(
excp.__class__, excp, excp.__traceback__
)
error.extensions = extensions
executed.errors[i] = error
return format_execution_result(executed)

# UIServer Data Commands
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ async def test_mutation_mapper(mock_flow):
"""Test the mapping of mutations to internal command methods."""
meta = {}
response = await mock_flow.resolvers._mutation_mapper('pause', {}, meta)
assert response is None
assert response == (True, 'Command queued')
with pytest.raises(ValueError):
await mock_flow.resolvers._mutation_mapper('non_exist', {}, meta)

Expand Down

0 comments on commit d7f5a0a

Please sign in to comment.