diff --git a/flatsurvey/limits.py b/flatsurvey/limits.py index a2a51ec..e7bc7b7 100644 --- a/flatsurvey/limits.py +++ b/flatsurvey/limits.py @@ -1,17 +1,94 @@ +r""" +Utilities to watch resource consumption on Linux machines. + +EXAMPLES: + +Limits can be used directly with the ``check`` method:: + + >>> from flatsurvey.limits import TimeLimit + >>> limit = TimeLimit(TimeLimit.parse_limit('1s')) + >>> limit.check() + True + + >>> import time + >>> time.sleep(1) + >>> limit.check() + False + +Limits can also run in the background in async workflows:: + + >>> from flatsurvey.limits import TimeLimit, LimitChecker + + >>> import asyncio + + >>> async def main(): + ... LimitChecker(TimeLimit(TimeLimit.parse_limit('100ms')), lambda: print("callback executed"), period=1).start() + ... print("working...") + ... await asyncio.sleep(2) + ... print("done.") + + >>> asyncio.run(main()) + working... + callback executed + done. + +""" +# ********************************************************************* +# This file is part of flatsurvey. +# +# Copyright (C) 2024 Julian RĂ¼th +# +# flatsurvey is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# flatsurvey is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with flatsurvey. If not, see . +# ********************************************************************* import logging logger = logging.getLogger() class Limit: + r""" + Abstract base class for a limited resource that can be checked. + """ def __init__(self, limit): self._limit = limit def check(self): + r""" + Return whether the limit has been reached. + + EXAMPLES:: + + >>> from flatsurvey.limits import TimeLimit + >>> limit = TimeLimit(TimeLimit.parse_limit('1ms')) + >>> limit.check() + True + + >>> import time + >>> time.sleep(.001) + >>> limit.check() + False + + """ raise NotImplementedError("this Limit does not implement check() yet") class LimitChecker: + r""" + Schedules a ``callback`` once the ``limit`` has been exceeded. + + This can only be used in an ``async`` environment. + """ def __init__(self, limit, callback, period=30): self._limit = limit self._callback = callback @@ -20,6 +97,10 @@ def __init__(self, limit, callback, period=30): self._task = None def start(self): + r""" + Schedule the ``limit`` to be checked every ``period`` seconds in the + async event loop. + """ assert self._task is None import asyncio @@ -28,22 +109,23 @@ def start(self): self._task = loop.create_task(self._run()) def stop(self): + r""" + Do not schedule this limit check any further. + """ if self._task is not None: self._task.cancel() self._task = None - def check(self): - if not self._limit.check(): - self._callback() - self.stop() - async def _run(self): try: while True: - self.check() + assert self._task is not None - import asyncio + if not self._limit.check(): + self._callback() + self.stop() + import asyncio await asyncio.sleep(self._period) except Exception as e: import traceback @@ -54,6 +136,9 @@ async def _run(self): class TimeLimit(Limit): + r""" + A wall time limit. + """ def __init__(self, limit): super().__init__(limit) @@ -61,11 +146,43 @@ def __init__(self, limit): @staticmethod def parse_limit(limit): + r""" + Helper method to parse ``limit`` into a Python time delta. + + EXAMPLES:: + + >>> from flatsurvey.limits import TimeLimit + >>> TimeLimit.parse_limit('100ms') + datetime.timedelta(microseconds=100000) + + """ import pandas return pandas.Timedelta(limit).to_pytimedelta() def check(self): + r""" + Return whether the wall time has advanced by the configured limit since + this method was called for the first time. + + EXAMPLES:: + + >>> from flatsurvey.limits import TimeLimit + >>> limit = TimeLimit(TimeLimit.parse_limit('10ms')) + + >>> import time + >>> time.sleep(.01) + + >>> limit.check() + True + + >>> import time + >>> time.sleep(.01) + + >>> limit.check() + False + + """ import time now = time.time() @@ -82,6 +199,24 @@ def check(self): class MemoryLimit(Limit): + r""" + A limit on the total RAM and swap memory used by this process. + + EXMAPLES:: + + >>> from flatsurvey.limits import MemoryLimit + >>> limit = MemoryLimit(2**28) + >>> limit.check() + True + + Each int uses four bytes, so we exceed the memory limit by allocating that + many ints:: + + >>> A = [0] * 2**26 + >>> limit.check() + False + + """ @staticmethod def parse_limit(limit): import psutil