diff --git a/docs/usage.rst b/docs/usage.rst index f224957..d3aa78d 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -86,6 +86,9 @@ The decorator takes the same options as the monitor command, and can be set usin .. caution:: If the decorated function is run multiple times, perun will behave as if it was run multiple times, initializing everything multiple times. To avoid this overhead, ensure the decorated function is called a single time. If information about a particular function which runs multiple times is needed, check out the :ref:`monitoring functions` section. +.. caution:: + If, due to configuration options, perun is set up to run for multiple rounds and the decorated function returns a value, only the result of the last run will be returned. + Application Name and Run ID ~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/perun/core.py b/perun/core.py index 27ab1f3..5439ff6 100644 --- a/perun/core.py +++ b/perun/core.py @@ -281,13 +281,18 @@ def mark_event(self, region_id: str): def monitor_application( self, app: Application, - ): + ) -> Any: """Execute coordination, monitoring, post-processing, and reporting steps, in that order. Parameters ---------- app : Path App script file path + + Returns + ------- + Any + Last result of the application execution, only when the perun decorator is used. 
""" log.debug(f"Rank {self.comm.Get_rank()} Backends: {pp.pformat(self.backends)}") @@ -314,7 +319,9 @@ def monitor_application( self.warmup_round = True for i in range(self.config.getint("benchmarking", "warmup_rounds")): log.info(f"Warmup run: {i}") - status, _ = self._monitor.run_application(str(i), record=False) + status, _, last_result = self._monitor.run_application( + str(i), record=False + ) if ( status == MonitorStatus.FILE_NOT_FOUND or status == MonitorStatus.SCRIPT_ERROR @@ -325,7 +332,9 @@ def monitor_application( multirun_nodes: Dict[str, DataNode] = {} self.warmup_round = False for i in range(self.config.getint("benchmarking", "rounds")): - status, runNode = self._monitor.run_application(str(i), record=True) + status, runNode, last_result = self._monitor.run_application( + str(i), record=True + ) if status == MonitorStatus.SCRIPT_ERROR: if runNode is not None: @@ -357,6 +366,8 @@ def monitor_application( self._export_multirun(multirun_node) self._run_postprocess_callbacks(multirun_node) + return last_result + def _export_multirun(self, multirun_node: DataNode): data_out = Path(self.config.get("output", "data_out")) app_name = self.config.get("output", "app_name") diff --git a/perun/monitoring/application.py b/perun/monitoring/application.py index cc568bb..07a54dc 100644 --- a/perun/monitoring/application.py +++ b/perun/monitoring/application.py @@ -7,7 +7,7 @@ import subprocess from configparser import ConfigParser from pathlib import Path -from typing import Callable, Dict, Union +from typing import Any, Callable, Dict, Union log = logging.getLogger("perun") @@ -123,9 +123,9 @@ def _cleanup(self): for i in range(3): gc.collect(i) - def run(self): + def run(self) -> Any: """ - Execute the application. + Execute the application. If callable, returns the function result. 
Raises ------ @@ -141,8 +141,9 @@ def run(self): ) self._cleanup() elif callable(self._app): - self._app(*self._args, **self._kwargs) + result = self._app(*self._args, **self._kwargs) self._cleanup() + return result else: raise ValueError("Application not found") diff --git a/perun/monitoring/monitor.py b/perun/monitoring/monitor.py index 3bc0674..a15c7bb 100644 --- a/perun/monitoring/monitor.py +++ b/perun/monitoring/monitor.py @@ -8,7 +8,7 @@ from multiprocessing import Event, Process, Queue from multiprocessing.synchronize import Event as EventClass from subprocess import Popen -from typing import Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple from perun.backend.backend import Backend from perun.comm import Comm @@ -109,7 +109,7 @@ def run_application( self, run_id: str, record: bool = True, - ) -> Tuple[MonitorStatus, Optional[DataNode]]: + ) -> Tuple[MonitorStatus, Optional[DataNode], Any]: """ Run the application and returns the monitor status and data node. @@ -122,8 +122,8 @@ def run_application( Returns ------- - Tuple[MonitorStatus, Optional[DataNode]] - A tuple containing the monitor status and the data node. + Tuple[MonitorStatus, Optional[DataNode], Any] + A tuple containing the monitor status and the data node, and the application result. 
Raises ------ @@ -152,7 +152,7 @@ def run_application( else: try: self.status = MonitorStatus.RUNNING - self._app.run() + result = self._app.run() self.status = MonitorStatus.PROCESSING except SystemExit: self.status = MonitorStatus.PROCESSING @@ -167,9 +167,11 @@ def run_application( s, r = getattr(e, "message", str(e)), getattr(e, "message", repr(e)) log.error(f"Rank {self._comm.Get_rank()}: {s}") log.error(f"Rank {self._comm.Get_rank()}: {r}") - return self.status, None + return self.status, None, result - def _run_python_app(self, run_id: str) -> Tuple[MonitorStatus, Optional[DataNode]]: + def _run_python_app( + self, run_id: str + ) -> Tuple[MonitorStatus, Optional[DataNode], Any]: # 1) Get sensor configuration self.sp_ready_event = Event() self.start_event = Event() @@ -212,7 +214,7 @@ def _run_python_app(self, run_id: str) -> Tuple[MonitorStatus, Optional[DataNode starttime_ns = time.time_ns() self.status = MonitorStatus.RUNNING try: - self._app.run() + app_result = self._app.run() except SystemExit: log.info( "The application exited using exit(), quit() or sys.exit(). This is not the recommended way to exit an application, as it complicates the data collection process. Please refactor your code." 
@@ -231,7 +233,7 @@ def _run_python_app(self, run_id: str) -> Tuple[MonitorStatus, Optional[DataNode f"Rank {self._comm.Get_rank()}: Set start and stop event forcefully" ) recoveredNodes = self._handle_failed_run() - return self.status, recoveredNodes + return self.status, recoveredNodes, None self.status = MonitorStatus.PROCESSING # run_stoptime = datetime.utcnow() @@ -239,9 +241,11 @@ def _run_python_app(self, run_id: str) -> Tuple[MonitorStatus, Optional[DataNode self.stop_event.set() # type: ignore # 4) App finished, stop subrocess and get data - return self.status, self._process_single_run(run_id, starttime_ns) + return self.status, self._process_single_run(run_id, starttime_ns), app_result - def _run_binary_app(self, run_id: str) -> Tuple[MonitorStatus, Optional[DataNode]]: + def _run_binary_app( + self, run_id: str + ) -> Tuple[MonitorStatus, Optional[DataNode], Any]: # 1) Prepare sensors ( timesteps, @@ -287,7 +291,7 @@ def _run_binary_app(self, run_id: str) -> Tuple[MonitorStatus, Optional[DataNode runNode = DataNode(id=run_id, type=NodeType.RUN, nodes={hostNode.id: hostNode}) runNode.addRegionData(globalRegions, starttime_ns) - return MonitorStatus.SUCCESS, runNode + return MonitorStatus.SUCCESS, runNode, None def _handle_failed_run(self) -> Optional[DataNode]: availableRanks = self._comm.check_available_ranks()