Skip to content

Commit

Permalink
fix: result returned from perun decorator
Browse files Browse the repository at this point in the history
  • Loading branch information
JuanPedroGHM committed Oct 18, 2024
1 parent 17067e1 commit 238746c
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 19 deletions.
3 changes: 3 additions & 0 deletions docs/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ The decorator takes the same options as the monitor command, and can be set usin
.. caution::
If the decorated function is run multiple times, perun will behave as if it was run multiple times, initializing everything multiple times. To avoid this overhead, ensure the decorated function is called a single time. If information about a particular function which runs multiple times is needed, check out the :ref:`monitoring functions` section.

.. caution::
If due to configuration options, perun is setup to run for multiple rounds, and the decorated function retuns a value, only the result of the last run will be returned.


Application Name and Run ID
~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down
17 changes: 14 additions & 3 deletions perun/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,13 +281,18 @@ def mark_event(self, region_id: str):
def monitor_application(
self,
app: Application,
):
) -> Any:
"""Execute coordination, monitoring, post-processing, and reporting steps, in that order.
Parameters
----------
app : Path
App script file path
Returns
-------
Any
Last result of the application execution, only when the perun decorator is used.
"""
log.debug(f"Rank {self.comm.Get_rank()} Backends: {pp.pformat(self.backends)}")

Expand All @@ -314,7 +319,9 @@ def monitor_application(
self.warmup_round = True
for i in range(self.config.getint("benchmarking", "warmup_rounds")):
log.info(f"Warmup run: {i}")
status, _ = self._monitor.run_application(str(i), record=False)
status, _, last_result = self._monitor.run_application(
str(i), record=False
)
if (
status == MonitorStatus.FILE_NOT_FOUND
or status == MonitorStatus.SCRIPT_ERROR
Expand All @@ -325,7 +332,9 @@ def monitor_application(
multirun_nodes: Dict[str, DataNode] = {}
self.warmup_round = False
for i in range(self.config.getint("benchmarking", "rounds")):
status, runNode = self._monitor.run_application(str(i), record=True)
status, runNode, last_result = self._monitor.run_application(
str(i), record=True
)

if status == MonitorStatus.SCRIPT_ERROR:
if runNode is not None:
Expand Down Expand Up @@ -357,6 +366,8 @@ def monitor_application(
self._export_multirun(multirun_node)
self._run_postprocess_callbacks(multirun_node)

return last_result

def _export_multirun(self, multirun_node: DataNode):
data_out = Path(self.config.get("output", "data_out"))
app_name = self.config.get("output", "app_name")
Expand Down
9 changes: 5 additions & 4 deletions perun/monitoring/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import subprocess
from configparser import ConfigParser
from pathlib import Path
from typing import Callable, Dict, Union
from typing import Any, Callable, Dict, Union

log = logging.getLogger("perun")

Expand Down Expand Up @@ -123,9 +123,9 @@ def _cleanup(self):
for i in range(3):
gc.collect(i)

def run(self):
def run(self) -> Any:
"""
Execute the application.
Execute the application. If callable, returns the function result.
Raises
------
Expand All @@ -141,8 +141,9 @@ def run(self):
)
self._cleanup()
elif callable(self._app):
self._app(*self._args, **self._kwargs)
result = self._app(*self._args, **self._kwargs)
self._cleanup()
return result
else:
raise ValueError("Application not found")

Expand Down
28 changes: 16 additions & 12 deletions perun/monitoring/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from multiprocessing import Event, Process, Queue
from multiprocessing.synchronize import Event as EventClass
from subprocess import Popen
from typing import Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple

from perun.backend.backend import Backend
from perun.comm import Comm
Expand Down Expand Up @@ -109,7 +109,7 @@ def run_application(
self,
run_id: str,
record: bool = True,
) -> Tuple[MonitorStatus, Optional[DataNode]]:
) -> Tuple[MonitorStatus, Optional[DataNode], Any]:
"""
Run the application and returns the monitor status and data node.
Expand All @@ -122,8 +122,8 @@ def run_application(
Returns
-------
Tuple[MonitorStatus, Optional[DataNode]]
A tuple containing the monitor status and the data node.
Tuple[MonitorStatus, Optional[DataNode], Any]
A tuple containing the monitor status and the data node, and the application result.
Raises
------
Expand Down Expand Up @@ -152,7 +152,7 @@ def run_application(
else:
try:
self.status = MonitorStatus.RUNNING
self._app.run()
result = self._app.run()
self.status = MonitorStatus.PROCESSING
except SystemExit:
self.status = MonitorStatus.PROCESSING
Expand All @@ -167,9 +167,11 @@ def run_application(
s, r = getattr(e, "message", str(e)), getattr(e, "message", repr(e))
log.error(f"Rank {self._comm.Get_rank()}: {s}")
log.error(f"Rank {self._comm.Get_rank()}: {r}")
return self.status, None
return self.status, None, result

def _run_python_app(self, run_id: str) -> Tuple[MonitorStatus, Optional[DataNode]]:
def _run_python_app(
self, run_id: str
) -> Tuple[MonitorStatus, Optional[DataNode], Any]:
# 1) Get sensor configuration
self.sp_ready_event = Event()
self.start_event = Event()
Expand Down Expand Up @@ -212,7 +214,7 @@ def _run_python_app(self, run_id: str) -> Tuple[MonitorStatus, Optional[DataNode
starttime_ns = time.time_ns()
self.status = MonitorStatus.RUNNING
try:
self._app.run()
app_result = self._app.run()
except SystemExit:
log.info(
"The application exited using exit(), quit() or sys.exit(). This is not the recommended way to exit an application, as it complicates the data collection process. Please refactor your code."
Expand All @@ -231,17 +233,19 @@ def _run_python_app(self, run_id: str) -> Tuple[MonitorStatus, Optional[DataNode
f"Rank {self._comm.Get_rank()}: Set start and stop event forcefully"
)
recoveredNodes = self._handle_failed_run()
return self.status, recoveredNodes
return self.status, recoveredNodes, None

self.status = MonitorStatus.PROCESSING
# run_stoptime = datetime.utcnow()
log.info(f"Rank {self._comm.Get_rank()}: App Stopped")
self.stop_event.set() # type: ignore

# 4) App finished, stop subrocess and get data
return self.status, self._process_single_run(run_id, starttime_ns)
return self.status, self._process_single_run(run_id, starttime_ns), app_result

def _run_binary_app(self, run_id: str) -> Tuple[MonitorStatus, Optional[DataNode]]:
def _run_binary_app(
self, run_id: str
) -> Tuple[MonitorStatus, Optional[DataNode], Any]:
# 1) Prepare sensors
(
timesteps,
Expand Down Expand Up @@ -287,7 +291,7 @@ def _run_binary_app(self, run_id: str) -> Tuple[MonitorStatus, Optional[DataNode
runNode = DataNode(id=run_id, type=NodeType.RUN, nodes={hostNode.id: hostNode})
runNode.addRegionData(globalRegions, starttime_ns)

return MonitorStatus.SUCCESS, runNode
return MonitorStatus.SUCCESS, runNode, None

def _handle_failed_run(self) -> Optional[DataNode]:
availableRanks = self._comm.check_available_ranks()
Expand Down

0 comments on commit 238746c

Please sign in to comment.