diff --git a/docs/extending.rst b/docs/extending.rst index f224d90..a7385b9 100644 --- a/docs/extending.rst +++ b/docs/extending.rst @@ -10,4 +10,5 @@ Extending ExEngine extending/add_devices extending/add_events extending/add_notifications - extending/add_storage \ No newline at end of file + extending/add_storage + extending/threading \ No newline at end of file diff --git a/docs/extending/add_devices.rst b/docs/extending/add_devices.rst index c6b16b0..b2ed1ef 100644 --- a/docs/extending/add_devices.rst +++ b/docs/extending/add_devices.rst @@ -199,6 +199,17 @@ then open ``_build/html/index.html`` in a web browser to view the documentation. Advanced Topics ----------------- +Thread Safety and Execution Control +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +ExEngine provides powerful threading capabilities for device implementations, ensuring thread safety and allowing fine-grained control over execution threads. Key features include: + +Automatic thread safety for device methods and attribute access. +The ability to specify execution threads for devices, methods, or events using the @on_thread decorator. +Options to bypass the executor for non-hardware-interacting methods or attributes. + +For a comprehensive guide on ExEngine's threading capabilities, including detailed explanations and usage examples, please refer to the :ref:threading section. + + What inheritance from ``Device`` provides ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/docs/usage/threading.rst b/docs/extending/threading.rst similarity index 51% rename from docs/usage/threading.rst rename to docs/extending/threading.rst index e879b2b..cfe79b7 100644 --- a/docs/usage/threading.rst +++ b/docs/extending/threading.rst @@ -1,3 +1,5 @@ +.. _threading: + Threading in ExEngine ===================== @@ -24,7 +26,7 @@ Common solutions like single-threaded event loops can limit performance, while i ExEngine's Solution for Thread-Safe Device Control -------------------------------------------------- -ExEngine addresses the challenge of thread-safe device control by routing all method calls and attribute accesses of `Device` objects through a common thread pool managed by the `ExecutionEngine`. In other words, when a user calls a method on a device, sets an attribute, or gets an attribute, the call is automatically routed to the `ExecutionEngine` for execution. +ExEngine addresses the challenge of thread-safe device control by routing all method calls and attribute accesses of ``Device`` objects through a common thread pool managed by the ``ExecutionEngine``. In other words, when a user calls a method on a device, sets an attribute, or gets an attribute, the call is automatically routed to the ``ExecutionEngine`` for execution. This allows for simple, seemingly single-threaded user code. That is, users can methods and set attributes in the normal way (e.g. ``device.some_method()``, ``device.some_attribute = value``), from any thread, but the actual execution happens on a thread managed by the executor. @@ -36,9 +38,9 @@ This approach ensures thread safety when using devices from multiple contexts wi While understanding the underlying mechanics isn't essential for regular usage, here's a brief overview: - The core of this solution lies in the `DeviceMetaclass`, which wraps all methods and set/get operations on attributes classes inheriting from `Device` subclasses. + The core of this solution lies in the ``DeviceMetaclass``, which wraps all methods and set/get operations on attributes classes inheriting from ``Device`` subclasses. - When a method is called or an attribute is accessed, instead of executing directly, a corresponding event (like `MethodCallEvent` or `GetAttrEvent`) is created and submitted to the `ExecutionEngine`. The calling thread blocks until the event execution is complete, maintaining the illusion of synchronous operation. + When a method is called or an attribute is accessed, instead of executing directly, a corresponding event (like ``MethodCallEvent`` or ``GetAttrEvent``) is created and submitted to the ``ExecutionEngine``. The calling thread blocks until the event execution is complete, maintaining the illusion of synchronous operation. In other words, calling a function like: @@ -46,7 +48,7 @@ This approach ensures thread safety when using devices from multiple contexts wi some_device.some_method(arg1, arg2) - Gets automatically transformed into a ``MethodCallEvent`` object, which is then submitted to the `ExecutionEngine` for execution, and its result is returned to the calling thread. + Gets automatically transformed into a ``MethodCallEvent`` object, which is then submitted to the ``ExecutionEngine`` for execution, and its result is returned to the calling thread. .. code-block:: python @@ -60,13 +62,13 @@ This approach ensures thread safety when using devices from multiple contexts wi - On an executor thread, the event's `execute` method is called: + On an executor thread, the event's ``execute`` method is called: - .. code-block:: python + .. code-block:: python - def execute(self): - method = getattr(self.instance, self.method_name) - return method(*self.args, **self.kwargs) + def execute(self): + method = getattr(self.instance, self.method_name) + return method(*self.args, **self.kwargs) This process ensures that all device interactions occur on managed threads, preventing concurrent access issues while maintaining a simple API for users. @@ -90,9 +92,101 @@ ExEngine also supports named threads for task-specific execution: engine.submit(readout_event, thread_name="DetectorThread") engine.submit(control_event, thread_name="HardwareControlThread") + +The ExecutionEngine automatically creates the specified threads as needed. You don't need to manually create or manage these threads. + This feature enables logical separation of asynchronous tasks. For instance: - One thread can be dedicated to detector readouts - Another can manage starting, stopping, and controlling other hardware -Using named threads enhances organization and can improve performance in multi-task scenarios. \ No newline at end of file +Using named threads enhances organization and can improve performance in multi-task scenarios. + + + +Using the @on_thread Decorator +------------------------------ + +ExEngine provides a powerful ``@on_thread`` decorator that allows you to specify which thread should execute a particular event, device, or method. This feature gives you fine-grained control over thread assignment without complicating your code. + +Importing the Decorator +^^^^^^^^^^^^^^^^^^^^^^^ + +To use the ``@on_thread`` decorator, import it from ExEngine: + +```python +from exengine import on_thread +``` + +Decorating Events +^^^^^^^^^^^^^^^^^ + +You can use ``@on_thread`` to specify which thread should execute an event: + +.. code-block:: python + + @on_thread("CustomEventThread") + class MyEvent(ExecutorEvent): + def execute(self): + # This will always run on "CustomEventThread" + ... + +Decorating Devices +^^^^^^^^^^^^^^^^^^ + +When applied to a device class, ``@on_thread`` sets the default thread for all methods of that device: + +.. code-block:: python + + @on_thread("DeviceThread") + class MyDevice(Device): + def method1(self): + # This will run on "DeviceThread" + ... + + def method2(self): + # This will also run on "DeviceThread" + ... + +Decorating Methods +^^^^^^^^^^^^^^^^^^ + +You can also apply ``@on_thread`` to individual methods within a device: + +.. code-block:: python + + class MyDevice(Device): + @on_thread("Method1Thread") + def method1(self): + # This will run on "Method1Thread" + ... + + @on_thread("Method2Thread") + def method2(self): + # This will run on "Method2Thread" + ... + + +Priority and Overriding +^^^^^^^^^^^^^^^^^^^^^^^ + +When both a class and a method have ``@on_thread`` decorators, the method-level decorator takes precedence: + +.. code-block:: python + + @on_thread("DeviceThread") + class MyDevice(Device): + def method1(self): + # This will run on "DeviceThread" + ... + + @on_thread("SpecialThread") + def method2(self): + # This will run on "SpecialThread", overriding the class-level decorator + ... + + + +While ``@on_thread`` provides great flexibility, be mindful of potential overhead from excessive thread switching. Use it judiciously, especially for frequently called methods. + + diff --git a/docs/index.rst b/docs/index.rst index e9526df..297d874 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -12,9 +12,9 @@ Key Features: 2. **Adaptable to Multiple Frontends**: Compatible with GUIs, scripts, networked automated labs, and AI-integrated microscopy -3. **Powerful Threading Capabilities**: Utilities for parallelization, asynchronous execution, and complex, multi-device workflows. +3. :ref:`Powerful Threading Capabilities `: Utilities for parallelization, asynchronous execution, and complex, multi-device workflows. -4. **Modality Agnostic**: Adaptable to diverse microscopy techniques thanks to general purpose design. +4. **Modality Agnosticism**: Adaptable to diverse microscopy techniques thanks to general purpose design. 5. **Modular, Reusable Device Instructions**: Building blocks that can be combined to create complex workflows, in order to promote code reuse and simplify experiment design diff --git a/docs/usage.rst b/docs/usage.rst index b62c330..ce62f01 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -10,4 +10,3 @@ Usage usage/installation usage/backends usage/key_features - usage/threading \ No newline at end of file diff --git a/src/exengine/__init__.py b/src/exengine/__init__.py index 1b3cc6d..d393708 100644 --- a/src/exengine/__init__.py +++ b/src/exengine/__init__.py @@ -6,3 +6,4 @@ from ._version import __version__, version_info from .kernel.executor import ExecutionEngine +from .kernel.threading_decorator import on_thread diff --git a/src/exengine/integration_tests/test_imports.py b/src/exengine/integration_tests/test_imports.py index 87eb27c..3fb5af5 100644 --- a/src/exengine/integration_tests/test_imports.py +++ b/src/exengine/integration_tests/test_imports.py @@ -36,6 +36,11 @@ def test_mm_imports(): except ImportError as e: pytest.fail(f"Import failed for MicroManagerDevice: {e}") +def test_onthread_import(): + try: + from exengine import on_thread + except ImportError as e: + pytest.fail(f"Import failed for MicroManagerDevice: {e}") def test_ndstorage_imports(): try: diff --git a/src/exengine/integration_tests/test_preferred_thread_annotations.py b/src/exengine/integration_tests/test_preferred_thread_annotations.py new file mode 100644 index 0000000..808143d --- /dev/null +++ b/src/exengine/integration_tests/test_preferred_thread_annotations.py @@ -0,0 +1,218 @@ +import pytest +import threading +import functools +from exengine.kernel.device import Device +from exengine.kernel.ex_event_base import ExecutorEvent +from exengine.kernel.executor import ExecutionEngine +from exengine import on_thread +from exengine.kernel.executor import _MAIN_THREAD_NAME + + + +class ThreadRecordingEvent(ExecutorEvent): + def execute(self): + return threading.current_thread().name + + +@on_thread("CustomEventThread") +class DecoratedEvent(ThreadRecordingEvent): + pass + + +class TestDevice(Device): + + def __init__(self, name): + super().__init__(name, no_executor_attrs=('_attribute', 'set_attribute_thread', + 'get_attribute_thread', 'regular_method_thread', + 'decorated_method_thread')) + self._attribute = 123 + + @property + def attribute(self): + self.get_attribute_thread = threading.current_thread().name + return self._attribute + + @attribute.setter + def attribute(self, value): + self.set_attribute_thread = threading.current_thread().name + self._attribute = value + + def regular_method(self): + self.regular_method_thread = threading.current_thread().name + + @on_thread("CustomMethodThread") + def decorated_method(self): + self.decorated_method_thread = threading.current_thread().name + + +@on_thread("CustomDeviceThread") +class CustomThreadTestDevice(Device): + + def __init__(self, name): + super().__init__(name, no_executor_attrs=('_attribute', + 'set_attribute_thread', 'get_attribute_thread', + 'regular_method_thread', 'decorated_method_thread')) + self._attribute = 123 + + @property + def attribute(self): + self.get_attribute_thread = threading.current_thread().name + return self._attribute + + @attribute.setter + def attribute(self, value): + self.set_attribute_thread = threading.current_thread().name + self._attribute = value + + def regular_method(self): + self.regular_method_thread = threading.current_thread().name + +@pytest.fixture() +def engine(): + engine = ExecutionEngine() + yield engine + engine.shutdown() + +############################################################ +# Event tests +############################################################ + +def test_undecorated_event(engine): + """ + Test that an undecorated event executes on the main executor thread. + """ + event = ThreadRecordingEvent() + future = engine.submit(event) + result = future.await_execution() + assert result == _MAIN_THREAD_NAME + +def test_decorated_event(engine): + """ + Test that a decorated event executes on the specified custom thread. + """ + event = DecoratedEvent() + future = engine.submit(event) + result = future.await_execution() + assert result == "CustomEventThread" + + +############################################################ +# Device tests +############################################################ +def test_device_attribute_access(engine): + """ + Test that device attribute access runs on the main thread when nothing else specified. + """ + device = TestDevice("TestDevice") + device.attribute = 'something' + assert device.set_attribute_thread == _MAIN_THREAD_NAME + +def test_device_regular_method_access(engine): + """ + Test that device method access runs on the main thread when nothing else specified. + """ + device = TestDevice("TestDevice") + device.regular_method() + assert device.regular_method_thread == _MAIN_THREAD_NAME + +def test_device_decorated_method_access(engine): + """ + Test that device method access runs on the main thread when nothing else specified. + """ + device = TestDevice("TestDevice") + device.decorated_method() + assert device.decorated_method_thread == "CustomMethodThread" + +def test_custom_thread_device_attribute_access(engine): + """ + Test that device attribute access runs on the custom thread when specified. + """ + custom_device = CustomThreadTestDevice("CustomDevice") + custom_device.attribute = 'something' + assert custom_device.set_attribute_thread == "CustomDeviceThread" + +def test_custom_thread_device_property_access(engine): + """ + Test that device property access runs on the custom thread when specified. + """ + custom_device = CustomThreadTestDevice("CustomDevice") + custom_device.attribute = 'something' + assert custom_device.set_attribute_thread == "CustomDeviceThread" + + f = custom_device.attribute + assert custom_device.get_attribute_thread == "CustomDeviceThread" + + +@on_thread("OuterThread") +class OuterThreadDevice(Device): + def __init__(self, name, inner_device): + super().__init__(name) + self.inner_device = inner_device + self.outer_thread = None + + def outer_method(self): + self.outer_thread = threading.current_thread().name + self.inner_device.inner_method() + + +@on_thread("InnerThread") +class InnerThreadDevice(Device): + def __init__(self, name): + super().__init__(name) + self.inner_thread = None + + def inner_method(self): + self.inner_thread = threading.current_thread().name + + +def test_nested_thread_switch(engine): + """ + Test that nested calls to methods with different thread specifications + result in correct thread switches at each level. + """ + inner_device = InnerThreadDevice("InnerDevice") + outer_device = OuterThreadDevice("OuterDevice", inner_device) + + class OuterEvent(ExecutorEvent): + def execute(self): + outer_device.outer_method() + + event = OuterEvent() + + engine.submit(event).await_execution() + + assert outer_device.outer_thread == "OuterThread" + assert inner_device.inner_thread == "InnerThread" + + +def another_decorator(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + + return wrapper + + +class MultiDecoratedDevice(Device): + @on_thread("Thread1") + @another_decorator + def method1(self): + return threading.current_thread().name + + @another_decorator + @on_thread("Thread2") + def method2(self): + return threading.current_thread().name + + +def test_multiple_decorators(engine): + """ + Test that the thread decorator works correctly when combined with other decorators. + """ + device = MultiDecoratedDevice("MultiDevice") + + class MultiEvent(ExecutorEvent): + def execute(self): + return device.method1(), device.method2() + + assert engine.submit(MultiEvent()).await_execution() == ("Thread1", "Thread2") \ No newline at end of file diff --git a/src/exengine/integration_tests/work_in_progress/sandbox_test_micromanager_device.py b/src/exengine/integration_tests/work_in_progress/sandbox_test_micromanager_device.py deleted file mode 100644 index a632f62..0000000 --- a/src/exengine/integration_tests/work_in_progress/sandbox_test_micromanager_device.py +++ /dev/null @@ -1,42 +0,0 @@ -from exengine.kernel.data_coords import DataCoordinates -import os -from exengine.kernel.executor import ExecutionEngine -from exengine.kernel.ex_event_base import DataHandler -from exengine.backends.micromanager.mm_device_implementations import MicroManagerCamera -from storage_backends.ndtiff_and_ndram.NDTiffandRAM import NDRAMStorage -from exengine.events.detector_events import StartCapture, ReadoutData -from mmpycorex import create_core_instance, terminate_core_instances, get_default_install_location - -mm_install_dir = get_default_install_location() -config_file = os.path.join(mm_install_dir, 'MMConfig_demo.cfg') -create_core_instance(mm_install_dir, config_file, - buffer_size_mb=1024, max_memory_mb=1024, # set these low for github actions - python_backend=True, - debug=False) - - -executor = ExecutionEngine() - - - -camera = MicroManagerCamera() - -num_images = 100 -# Create a data handle to manage the handoff of data from the camera to the storage backend -storage = NDRAMStorage() -data_handler = DataHandler(storage=storage) - -start_capture_event = StartCapture(num_images=num_images, camera=camera) -readout_images_event = ReadoutData(number=num_images, camera=camera, - data_coordinate_iterator=[DataCoordinates(time=t) for t in range(num_images)], - data_handler=data_handler) -executor.submit(start_capture_event) -future = executor.submit(readout_images_event) - -# Wait for all images to be readout -future.await_execution() -executor.check_exceptions() - -data_handler.finish() -executor.shutdown() -terminate_core_instances() diff --git a/src/exengine/integration_tests/work_in_progress/sbox.py b/src/exengine/integration_tests/work_in_progress/sbox.py deleted file mode 100644 index cb27667..0000000 --- a/src/exengine/integration_tests/work_in_progress/sbox.py +++ /dev/null @@ -1,48 +0,0 @@ -from mmpycorex import create_core_instance, terminate_core_instances, get_default_install_location -from exengine.kernel.data_coords import DataCoordinates -from exengine.backends.micromanager.mm_device_implementations import MicroManagerCamera -import os -from exengine.kernel.executor import ExecutionEngine -from exengine.kernel.ex_event_base import DataHandler -from storage_backends.ndtiff_and_ndram.NDTiffandRAM import NDRAMStorage -from exengine.events.detector_events import StartCapture, ReadoutData - -mm_install_dir = get_default_install_location() -config_file = os.path.join(mm_install_dir, 'MMConfig_demo.cfg') -create_core_instance(mm_install_dir, config_file, - buffer_size_mb=1024, max_memory_mb=1024, # set these low for github actions - python_backend=True, - debug=False) - - -executor = ExecutionEngine() - - - -camera = MicroManagerCamera() - -num_images = 100 -data_handler = DataHandler(storage=NDRAMStorage()) - -start_capture_event = StartCapture(num_images=num_images, camera=camera) -readout_images_event = ReadoutData(number=num_images, camera=camera, - image_coordinate_iterator=[DataCoordinates(time=t) for t in range(num_images)], - data_handler=data_handler) -executor.submit(start_capture_event) -future = executor.submit(readout_images_event) - -future.await_execution() - -data_handler.finish() - -executor.shutdown() -terminate_core_instances() - - - -# # print all threads that are still a -# import threading -# -# for thread in threading.enumerate(): -# print(thread) -# pass \ No newline at end of file diff --git a/src/exengine/kernel/device.py b/src/exengine/kernel/device.py index 735788f..c932e76 100644 --- a/src/exengine/kernel/device.py +++ b/src/exengine/kernel/device.py @@ -6,6 +6,7 @@ from typing import Any, Dict, Callable, Sequence, Optional, List, Tuple, Iterable, Union from weakref import WeakSet from dataclasses import dataclass +import types from .ex_event_base import ExecutorEvent from .executor import ExecutionEngine @@ -54,7 +55,7 @@ def _thread_start(self, *args, **kwargs): # Call this function to initialize the thread patching if not hasattr(threading.Thread, '_monkey_patched_start'): _python_debugger_active, _within_executor_threads, _original_thread_start = _initialize_thread_patching() - _no_executor_attrs = ['_name', '_no_executor', '_no_executor_attrs'] + _no_executor_attrs = ['_name', '_no_executor', '_no_executor_attrs', '_thread_name'] @dataclass @@ -108,14 +109,29 @@ def wrap_for_executor(attr_name, attr_value): if hasattr(attr_value, '_wrapped_for_executor'): return attr_value + # Add this block to handle properties + if isinstance(attr_value, property): + return property( + fget=DeviceMetaclass.wrap_for_executor(f"{attr_name}_getter", attr_value.fget) if attr_value.fget else None, + fset=DeviceMetaclass.wrap_for_executor(f"{attr_name}_setter", attr_value.fset) if attr_value.fset else None, + fdel=DeviceMetaclass.wrap_for_executor(f"{attr_name}_deleter", attr_value.fdel) if attr_value.fdel else None, + doc=attr_value.__doc__ + ) + @wraps(attr_value) def wrapper(self: 'Device', *args: Any, **kwargs: Any) -> Any: if attr_name in _no_executor_attrs or self._no_executor: return attr_value(self, *args, **kwargs) if DeviceMetaclass._is_reroute_exempted_thread(): return attr_value(self, *args, **kwargs) + # check for method-level preferred thread name first, then class-level + thread_name = getattr(attr_value, '_thread_name', None) or getattr(self, '_thread_name', None) + if ExecutionEngine.on_any_executor_thread(): + # check for device-level preferred thread + if thread_name is None or threading.current_thread().name == thread_name: + return attr_value(self, *args, **kwargs) event = MethodCallEvent(method_name=attr_name, args=args, kwargs=kwargs, instance=self) - return ExecutionEngine.get_instance().submit(event).await_execution() + return ExecutionEngine.get_instance().submit(event, thread_name=thread_name).await_execution() wrapper._wrapped_for_executor = True return wrapper @@ -133,8 +149,7 @@ def is_debugger_thread(): @staticmethod def _is_reroute_exempted_thread() -> bool: - return (DeviceMetaclass.is_debugger_thread() or ExecutionEngine.on_any_executor_thread() or - threading.current_thread() in _within_executor_threads) + return (DeviceMetaclass.is_debugger_thread() or threading.current_thread() in _within_executor_threads) @staticmethod def find_in_bases(bases, method_name): @@ -147,10 +162,12 @@ def __new__(mcs, name: str, bases: tuple, attrs: dict) -> Any: new_attrs = {} for attr_name, attr_value in attrs.items(): if not attr_name.startswith('_'): - if callable(attr_value): + if isinstance(attr_value, property): # Property + new_attrs[attr_name] = mcs.wrap_for_executor(attr_name, attr_value) + elif callable(attr_value): # Regular method new_attrs[attr_name] = mcs.wrap_for_executor(attr_name, attr_value) - else: - pass + else: # Attribute + new_attrs[attr_name] = attr_value else: new_attrs[attr_name] = attr_value @@ -174,18 +191,26 @@ def __getattribute__(self: 'Device', name: str) -> Any: return object.__getattribute__(self, name) if DeviceMetaclass._is_reroute_exempted_thread(): return getattribute_with_fallback(self, name) - else: - event = GetAttrEvent(attr_name=name, instance=self, method=getattribute_with_fallback) - return ExecutionEngine.get_instance().submit(event).await_execution() + thread_name = getattr(self, '_thread_name', None) + if ExecutionEngine.on_any_executor_thread(): + # check for device-level preferred thread + if thread_name is None or threading.current_thread().name == thread_name: + return getattribute_with_fallback(self, name) + event = GetAttrEvent(attr_name=name, instance=self, method=getattribute_with_fallback) + return ExecutionEngine.get_instance().submit(event, thread_name=thread_name).await_execution() def __setattr__(self: 'Device', name: str, value: Any) -> None: if name in _no_executor_attrs or self._no_executor: - original_setattr(self, name, value) - elif DeviceMetaclass._is_reroute_exempted_thread(): - original_setattr(self, name, value) - else: - event = SetAttrEvent(attr_name=name, value=value, instance=self, method=original_setattr) - ExecutionEngine.get_instance().submit(event).await_execution() + return original_setattr(self, name, value) + if DeviceMetaclass._is_reroute_exempted_thread(): + return original_setattr(self, name, value) + thread_name = getattr(self, '_thread_name', None) + if ExecutionEngine.on_any_executor_thread(): + # Check for device-level preferred thread + if thread_name is None or threading.current_thread().name == thread_name: + return original_setattr(self, name, value) + event = SetAttrEvent(attr_name=name, value=value, instance=self, method=original_setattr) + ExecutionEngine.get_instance().submit(event, thread_name=thread_name).await_execution() new_attrs['__getattribute__'] = __getattribute__ new_attrs['__setattr__'] = __setattr__ diff --git a/src/exengine/kernel/ex_event_base.py b/src/exengine/kernel/ex_event_base.py index 49f5114..5cd0d1c 100644 --- a/src/exengine/kernel/ex_event_base.py +++ b/src/exengine/kernel/ex_event_base.py @@ -42,12 +42,15 @@ class ExecutorEvent(ABC, metaclass=_ExecutorEventMeta): # Base events just have an event executed event. Subclasses can also add their own lists # of notifications types, and the metaclass will merge them into one big list notification_types: ClassVar[List[Type[Notification]]] = [EventExecutedNotification] + _thread_name: Optional[str] = None def __init__(self, *args, **kwargs): super().__init__() self._num_retries_on_exception = 0 self._finished = False self._initialized = False + # Check for method-level preferred thread name first, then class-level + self._thread_name = getattr(self.execute, '_thread_name', None) or getattr(self.__class__, '_thread_name', None) def _pre_execution(self, engine) -> ExecutionFuture: """ diff --git a/src/exengine/kernel/executor.py b/src/exengine/kernel/executor.py index cda63f0..7defde8 100644 --- a/src/exengine/kernel/executor.py +++ b/src/exengine/kernel/executor.py @@ -236,7 +236,8 @@ def submit(self, event_or_events: Union[ExecutorEvent, Iterable[ExecutorEvent]], # TODO: transpile events pass - futures = tuple(self._submit_single_event(event, thread_name, use_free_thread, prioritize) + futures = tuple(self._submit_single_event(event, thread_name or getattr(event_or_events[0], '_thread_name', None), + use_free_thread, prioritize) for event in event_or_events) if len(futures) == 1: return futures[0] @@ -290,7 +291,6 @@ def shutdown(self): thread.shutdown() for thread in self._thread_managers.values(): thread.join() - self._thread_managers = None # Make sure the notification thread is stopped if self._notification_thread is not None: diff --git a/src/exengine/kernel/test/test_executor.py b/src/exengine/kernel/test/test_executor.py index 05baafd..9689cdb 100644 --- a/src/exengine/kernel/test/test_executor.py +++ b/src/exengine/kernel/test/test_executor.py @@ -22,11 +22,13 @@ def execution_engine(): # Tests for automated rerouting of method calls to the ExecutionEngine to executor threads ############################################################################################# counter = 1 -class MockDevice(Device): +class TestDevice(Device): def __init__(self): global counter - super().__init__(name=f'mock_device_{counter}') + super().__init__(name=f'mock_device_{counter}', no_executor_attrs=('property_getter_monitor', 'property_setter_monitor')) counter += 1 + self.property_getter_monitor = False + self.property_setter_monitor = False self._test_attribute = None def test_method(self): @@ -44,27 +46,40 @@ def get_test_attribute(self): assert threading.current_thread().execution_engine_thread return self._test_attribute + @property + def test_property(self): + assert ExecutionEngine.on_any_executor_thread() + self.property_getter_monitor = True + return self._test_attribute + + @test_property.setter + def test_property(self, value): + assert ExecutionEngine.on_any_executor_thread() + self.property_setter_monitor = True + self._test_attribute = value + + def test_device_method_execution(execution_engine): - mock_device = MockDevice() + mock_device = TestDevice() result = mock_device.test_method() assert result is True def test_device_attribute_setting(execution_engine): - mock_device = MockDevice() + mock_device = TestDevice() mock_device.set_test_attribute("test_value") result = mock_device.get_test_attribute() assert result == "test_value" def test_device_attribute_direct_setting(execution_engine): - mock_device = MockDevice() + mock_device = TestDevice() mock_device.direct_set_attribute = "direct_test_value" assert mock_device.direct_set_attribute == "direct_test_value" def test_multiple_method_calls(execution_engine): - mock_device = MockDevice() + mock_device = TestDevice() result1 = mock_device.test_method() mock_device.set_test_attribute("test_value") @@ -73,6 +88,17 @@ def test_multiple_method_calls(execution_engine): assert result1 is True assert result2 == "test_value" +def test_device_property_getter(execution_engine): + mock_device = TestDevice() + + _ = mock_device.test_property + assert mock_device.property_getter_monitor + +def test_device_property_setter(execution_engine): + mock_device = TestDevice() + + mock_device.test_property = "test_value" + assert mock_device.property_setter_monitor ####################################################### # Tests for internal threads in Devices @@ -204,9 +230,11 @@ def create_sync_event(start_event, finish_event): event.executed = False event.executed_time = None event.execute_count = 0 + event.executed_thread_name = None + event._thread_name = None def execute(): - event.thread_name = threading.current_thread().name + event.executed_thread_name = threading.current_thread().name start_event.set() # Signal that the execution has started finish_event.wait() # Wait for the signal to finish event.executed_time = time.time() @@ -390,7 +418,7 @@ def test_submit_to_main_thread(execution_engine): start_event.wait() finish_event.set() - assert event.thread_name == _MAIN_THREAD_NAME + assert event.executed_thread_name == _MAIN_THREAD_NAME def test_submit_to_new_anonymous_thread(execution_engine): """ @@ -415,8 +443,8 @@ def test_submit_to_new_anonymous_thread(execution_engine): finish_event1.set() finish_event2.set() - assert event1.thread_name == _MAIN_THREAD_NAME - assert event2.thread_name.startswith(_ANONYMOUS_THREAD_NAME) + assert event1.executed_thread_name == _MAIN_THREAD_NAME + assert event2.executed_thread_name.startswith(_ANONYMOUS_THREAD_NAME) assert len(execution_engine._thread_managers) == 2 # Main thread + 1 anonymous thread def test_multiple_anonymous_threads(execution_engine): @@ -444,7 +472,7 @@ def test_multiple_anonymous_threads(execution_engine): for finish_event in finish_events: finish_event.set() - thread_names = set(event.thread_name for event in events) + thread_names = set(event.executed_thread_name for event in events) assert len(thread_names) == num_events # Each event should be on a different thread assert all(name.startswith(_ANONYMOUS_THREAD_NAME) or name == _MAIN_THREAD_NAME for name in thread_names) assert len(execution_engine._thread_managers) == num_events # num_events anonymous threads @@ -475,5 +503,5 @@ def test_reuse_named_thread(execution_engine): for start_event in start_events: start_event.wait() - assert all(event.thread_name == thread_name for event in events) + assert all(event.executed_thread_name == thread_name for event in events) assert len(execution_engine._thread_managers) == 2 # Main thread + 1 custom named thread \ No newline at end of file diff --git a/src/exengine/kernel/threading_decorator.py b/src/exengine/kernel/threading_decorator.py new file mode 100644 index 0000000..3a4f420 --- /dev/null +++ b/src/exengine/kernel/threading_decorator.py @@ -0,0 +1,21 @@ +import functools +from typing import Optional, Union, Type, Callable + +def on_thread(thread_name: Optional[str] = None): + """ + Decorator to specify the preferred thread name for a class or method. + """ + def decorator(obj: Union[Type, Callable]): + if isinstance(obj, type): + # It's a class + obj._thread_name = thread_name + return obj + else: + # It's a method or function + @functools.wraps(obj) + def wrapper(*args, **kwargs): + return obj(*args, **kwargs) + wrapper._thread_name = thread_name + return wrapper + + return decorator \ No newline at end of file