From 67ca7ab7e29863343d7984b441d7b44f7acc0e70 Mon Sep 17 00:00:00 2001 From: tabdar-khan <71217662+tabdar-khan@users.noreply.github.com> Date: Thu, 28 Mar 2024 17:01:26 +0100 Subject: [PATCH 1/2] Add a pre-commit hook (#3150) --- .pre-commit-config.yaml | 18 +++++++++++++ ...-tutorial-get-started-as-a-contributor.rst | 27 +++++++++++++++++++ pyproject.toml | 1 + 3 files changed, 46 insertions(+) create mode 100644 .pre-commit-config.yaml diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 000000000000..ad6cb69f3052 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,18 @@ +repos: + - repo: local + hooks: + - id: format-code + name: Format Code + entry: ./dev/format.sh + language: script + # Run the script without passing the staged filenames as arguments: + pass_filenames: false + stages: [commit] + + - id: run-tests + name: Run Tests + entry: ./dev/test.sh + language: script + # Run the script without passing the staged filenames as arguments: + pass_filenames: false + stages: [commit] diff --git a/doc/source/contributor-tutorial-get-started-as-a-contributor.rst b/doc/source/contributor-tutorial-get-started-as-a-contributor.rst index 9136fea96bf6..43f9739987ac 100644 --- a/doc/source/contributor-tutorial-get-started-as-a-contributor.rst +++ b/doc/source/contributor-tutorial-get-started-as-a-contributor.rst @@ -102,6 +102,33 @@ Run Linters and Tests $ ./dev/test.sh +Add a pre-commit hook +~~~~~~~~~~~~~~~~~~~~~ + +Developers may integrate a pre-commit hook into their workflow utilizing the `pre-commit <https://pre-commit.com/#intro>`_ library. The pre-commit hook is configured to execute two primary operations: ``./dev/format.sh`` and ``./dev/test.sh`` scripts. + +There are multiple ways developers can use this: + +1. Install the pre-commit hook to your local git directory by simply running: + + :: + + $ pre-commit install + + - Each ``git commit`` will trigger the execution of formatting and linting/test scripts. 
+ - If in a hurry, bypass the hook using ``--no-verify`` with the ``git commit`` command. + :: + + $ git commit --no-verify -m "Add new feature" + +2. For developers who prefer not to install the hook permanently, it is possible to execute a one-time check prior to committing changes by using the following command: + + :: + + $ pre-commit run --all-files + + This executes the formatting and linting checks/tests on all the files without modifying the default behavior of ``git commit``. + Run Github Actions (CI) locally ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/pyproject.toml b/pyproject.toml index dc8b293bc880..3c211e9cf8d8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -127,6 +127,7 @@ check-wheel-contents = "==0.4.0" GitPython = "==3.1.32" PyGithub = "==2.1.1" licensecheck = "==2024" +pre-commit = "==3.5.0" [tool.isort] line_length = 88 From 531e0e31991aede82389b6342986836b568989f7 Mon Sep 17 00:00:00 2001 From: Javier Date: Thu, 28 Mar 2024 18:05:12 +0000 Subject: [PATCH 2/2] Handle `ClientApp` exception simulation (#3075) --- .../superlink/fleet/vce/backend/raybackend.py | 9 +-- .../server/superlink/fleet/vce/vce_api.py | 66 ++++++++++++------- .../superlink/fleet/vce/vce_api_test.py | 31 +-------- .../simulation/ray_transport/ray_actor.py | 8 ++- 4 files changed, 55 insertions(+), 59 deletions(-) diff --git a/src/py/flwr/server/superlink/fleet/vce/backend/raybackend.py b/src/py/flwr/server/superlink/fleet/vce/backend/raybackend.py index 8ef0d54622ae..9bede09edf09 100644 --- a/src/py/flwr/server/superlink/fleet/vce/backend/raybackend.py +++ b/src/py/flwr/server/superlink/fleet/vce/backend/raybackend.py @@ -20,7 +20,7 @@ import ray -from flwr.client.client_app import ClientApp, LoadClientAppError +from flwr.client.client_app import ClientApp from flwr.common.context import Context from flwr.common.logger import log from flwr.common.message import Message @@ -151,7 +151,6 @@ async def process_message( ) await future - # Fetch result ( out_mssg, @@ -160,13 
+159,15 @@ async def process_message( return out_mssg, updated_context - except LoadClientAppError as load_ex: + except Exception as ex: log( ERROR, "An exception was raised when processing a message by %s", self.__class__.__name__, ) - raise load_ex + # add actor back into pool + await self.pool.add_actor_back_to_pool(future) + raise ex async def terminate(self) -> None: """Terminate all actors in actor pool.""" diff --git a/src/py/flwr/server/superlink/fleet/vce/vce_api.py b/src/py/flwr/server/superlink/fleet/vce/vce_api.py index a693c968d0e8..9736ae0fb57f 100644 --- a/src/py/flwr/server/superlink/fleet/vce/vce_api.py +++ b/src/py/flwr/server/superlink/fleet/vce/vce_api.py @@ -14,9 +14,10 @@ # ============================================================================== """Fleet Simulation Engine API.""" - import asyncio import json +import sys +import time import traceback from logging import DEBUG, ERROR, INFO, WARN from typing import Callable, Dict, List, Optional @@ -24,6 +25,7 @@ from flwr.client.client_app import ClientApp, LoadClientAppError from flwr.client.node_state import NodeState from flwr.common.logger import log +from flwr.common.message import Error from flwr.common.object_ref import load_app from flwr.common.serde import message_from_taskins, message_to_taskres from flwr.proto.task_pb2 import TaskIns # pylint: disable=E0611 @@ -59,6 +61,7 @@ async def worker( """Get TaskIns from queue and pass it to an actor in the pool to execute it.""" state = state_factory.state() while True: + out_mssg = None try: task_ins: TaskIns = await queue.get() node_id = task_ins.task.consumer.node_id @@ -82,24 +85,25 @@ async def worker( task_ins.run_id, context=updated_context ) - # Convert to TaskRes - task_res = message_to_taskres(out_mssg) - # Store TaskRes in state - state.store_task_res(task_res) - except asyncio.CancelledError as e: - log(DEBUG, "Async worker: %s", e) + log(DEBUG, "Terminating async worker: %s", e) break - except LoadClientAppError as app_ex: 
- log(ERROR, "Async worker: %s", app_ex) - log(ERROR, traceback.format_exc()) - raise - + # Exceptions aren't raised but reported as an error message except Exception as ex: # pylint: disable=broad-exception-caught log(ERROR, ex) log(ERROR, traceback.format_exc()) - break + reason = str(type(ex)) + ":<'" + str(ex) + "'>" + error = Error(code=0, reason=reason) + out_mssg = message.create_error_reply(error=error) + + finally: + if out_mssg: + # Convert to TaskRes + task_res = message_to_taskres(out_mssg) + # Store TaskRes in state + task_res.task.pushed_at = time.time() + state.store_task_res(task_res) async def add_taskins_to_queue( @@ -218,7 +222,7 @@ async def run( await backend.terminate() -# pylint: disable=too-many-arguments,unused-argument,too-many-locals +# pylint: disable=too-many-arguments,unused-argument,too-many-locals,too-many-branches def start_vce( backend_name: str, backend_config_json_stream: str, @@ -300,12 +304,14 @@ def backend_fn() -> Backend: """Instantiate a Backend.""" return backend_type(backend_config, work_dir=app_dir) - log(INFO, "client_app_attr = %s", client_app_attr) - # Load ClientApp if needed def _load() -> ClientApp: if client_app_attr: + + if app_dir is not None: + sys.path.insert(0, app_dir) + app: ClientApp = load_app(client_app_attr, LoadClientAppError) if not isinstance(app, ClientApp): @@ -319,13 +325,23 @@ def _load() -> ClientApp: app_fn = _load - asyncio.run( - run( - app_fn, - backend_fn, - nodes_mapping, - state_factory, - node_states, - f_stop, + try: + # Test if ClientApp can be loaded + _ = app_fn() + + # Run main simulation loop + asyncio.run( + run( + app_fn, + backend_fn, + nodes_mapping, + state_factory, + node_states, + f_stop, + ) ) - ) + except LoadClientAppError as loadapp_ex: + f_stop.set() # set termination event + raise loadapp_ex + except Exception as ex: + raise ex diff --git a/src/py/flwr/server/superlink/fleet/vce/vce_api_test.py b/src/py/flwr/server/superlink/fleet/vce/vce_api_test.py index 
9e063e33ff81..66c3c21326d5 100644 --- a/src/py/flwr/server/superlink/fleet/vce/vce_api_test.py +++ b/src/py/flwr/server/superlink/fleet/vce/vce_api_test.py @@ -27,6 +27,7 @@ from unittest import IsolatedAsyncioTestCase from uuid import UUID +from flwr.client.client_app import LoadClientAppError from flwr.common import ( DEFAULT_TTL, GetPropertiesIns, @@ -53,7 +54,6 @@ def terminate_simulation(f_stop: asyncio.Event, sleep_duration: int) -> None: def init_state_factory_nodes_mapping( num_nodes: int, num_messages: int, - erroneous_message: Optional[bool] = False, ) -> Tuple[StateFactory, NodeToPartitionMapping, Dict[UUID, float]]: """Instatiate StateFactory, register nodes and pre-insert messages in the state.""" # Register a state and a run_id in it @@ -68,7 +68,6 @@ def init_state_factory_nodes_mapping( nodes_mapping=nodes_mapping, run_id=run_id, num_messages=num_messages, - erroneous_message=erroneous_message, ) return state_factory, nodes_mapping, expected_results @@ -79,7 +78,6 @@ def register_messages_into_state( nodes_mapping: NodeToPartitionMapping, run_id: int, num_messages: int, - erroneous_message: Optional[bool] = False, ) -> Dict[UUID, float]: """Register `num_messages` into the state factory.""" state: InMemoryState = state_factory.state() # type: ignore @@ -105,11 +103,7 @@ def register_messages_into_state( dst_node_id=dst_node_id, # indicate destination node reply_to_message="", ttl=DEFAULT_TTL, - message_type=( - "a bad message" - if erroneous_message - else MessageTypeLegacy.GET_PROPERTIES - ), + message_type=MessageTypeLegacy.GET_PROPERTIES, ), ) # Convert Message to TaskIns @@ -200,32 +194,13 @@ def test_erroneous_client_app_attr(self) -> None: state_factory, nodes_mapping, _ = init_state_factory_nodes_mapping( num_nodes=num_nodes, num_messages=num_messages ) - with self.assertRaises(RuntimeError): + with self.assertRaises(LoadClientAppError): start_and_shutdown( client_app_attr="totally_fictitious_app:client", state_factory=state_factory, 
nodes_mapping=nodes_mapping, ) - def test_erroneous_messages(self) -> None: - """Test handling of error in async worker (consumer). - - We register messages which will trigger an error when handling, triggering an - error. - """ - num_messages = 100 - num_nodes = 59 - - state_factory, nodes_mapping, _ = init_state_factory_nodes_mapping( - num_nodes=num_nodes, num_messages=num_messages, erroneous_message=True - ) - - with self.assertRaises(RuntimeError): - start_and_shutdown( - state_factory=state_factory, - nodes_mapping=nodes_mapping, - ) - def test_erroneous_backend_config(self) -> None: """Backend Config should be a JSON stream.""" with self.assertRaises(JSONDecodeError): diff --git a/src/py/flwr/simulation/ray_transport/ray_actor.py b/src/py/flwr/simulation/ray_transport/ray_actor.py index 08d0576e39f0..9773203628ab 100644 --- a/src/py/flwr/simulation/ray_transport/ray_actor.py +++ b/src/py/flwr/simulation/ray_transport/ray_actor.py @@ -493,13 +493,17 @@ async def submit( self._future_to_actor[future] = actor return future + async def add_actor_back_to_pool(self, future: Any) -> None: + """Add actor assigned to run future back into the pool.""" + actor = self._future_to_actor.pop(future) + await self.pool.put(actor) + async def fetch_result_and_return_actor_to_pool( self, future: Any ) -> Tuple[Message, Context]: """Pull result given a future and add actor back to pool.""" # Get actor that ran job - actor = self._future_to_actor.pop(future) - await self.pool.put(actor) + await self.add_actor_back_to_pool(future) # Retrieve result for object store # Instead of doing ray.get(future) we await it _, out_mssg, updated_context = await future