Skip to content

Commit

Permalink
Merge pull request #25 from henrypinkard/main
Browse files Browse the repository at this point in the history
Callables as events
  • Loading branch information
henrypinkard authored Aug 26, 2024
2 parents 79c83a5 + f79925d commit 7b49751
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 82 deletions.
4 changes: 3 additions & 1 deletion docs/apis.rst
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ Micro-Manager Devices
:members:


Implemented Events
.. _standard_events:

Standard Events
==================

Detector Events
Expand Down
39 changes: 26 additions & 13 deletions docs/usage/events.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,50 @@
Events
======

Events in ExEngine are the fundamental units of experimental workflows. They represent discrete tasks or operations that can be submitted to the ExecutionEngine for execution. Events provide a flexible and modular way to construct complex experimental workflows, ranging from simple hardware commands to sophisticated multi-step procedures that may involve data analysis.

Events in ExEngine are the fundamental units of experimental workflows. They represent discrete tasks or operations that can be submitted to the ExecutionEngine for processing. Events provide a flexible and modular way to construct complex experimental workflows, ranging from simple hardware commands to sophisticated multi-step procedures that may involve data analysis.
ExEngine supports two types of events:

The work of an event is fully contained in its execute method. This method can be called directly to run the event on the current thread:
- ``Callable`` objects (methods/functions/lambdas) for simple tasks
- ``ExecutorEvent`` subclasses for complex operations


Simple Events: Callable Objects
-------------------------------
For straightforward tasks, you can submit a callable object directly:

.. code-block:: python
from exengine.events.positioner_events import SetPosition2DEvent
def simple_task():
do_something()
engine.submit(simple_task)
# Create an event
move_event = SetPosition2DEvent(device=xy_stage, position=(10.0, 20.0))
# Execute the event directly on the current thread
move_event.execute()
More commonly, events are submitted to the execution engine to be executed asynchronously:
ExecutorEvent Objects
----------------------

For more complex operations, use ExecutorEvent subclasses. These provide additional capabilities like notifications and data handling:

.. code-block:: python
from exengine import ExecutionEngine
from exengine.events.positioner_events import SetPosition2DEvent
move_event = SetPosition2DEvent(device=xy_stage, position=(10.0, 20.0))
future = engine.submit(move_event)
engine = ExecutionEngine.get_instance()
# Submit the event to the execution engine
.. code-block:: python
# Asynchronous execution
future = engine.submit(move_event)
# This returns immediately, allowing other operations to continue
The power of this approach lies in its ability to separate the definition of what takes place from the details of how it is executed. While the event defines the operation to be performed, the execution engine manages the scheduling and execution of events across multiple threads. This separation allows for complex workflows to be built up from simple, reusable components, while the execution engine manages the details of scheduling execution, and error handling.

The power of this approach lies in its ability to separate the definition of what takes place from the details of how it is executed. While the event defines the operation to be performed, the execution engine manages the scheduling and execution of events across multiple threads. This separation allows for complex workflows to be built up from simple, reusable components, while the execution engine manages the details of scheduling and resource allocation.
A list of available events can be found in the :ref:`standard_events` section.

Monitoring Event Progress
--------------------------
Expand Down
2 changes: 1 addition & 1 deletion src/exengine/_version.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
version_info = (0, 1, 1)
version_info = (0, 2, 0)
__version__ = ".".join(map(str, version_info))
34 changes: 28 additions & 6 deletions src/exengine/kernel/ex_event_base.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import warnings
from typing import Optional, Any,ClassVar, Type, List, Dict, Union, Iterable
from typing import Optional, Any,ClassVar, Type, List, Dict, Union, Iterable, Callable
from abc import ABC, abstractmethod, ABCMeta
import weakref
from .notification_base import Notification
import inspect
from functools import partial

from .notification_base import Notification
from .notification_base import EventExecutedNotification

# if TYPE_CHECKING: # avoid circular imports
from .ex_future import ExecutionFuture


Expand Down Expand Up @@ -103,8 +103,30 @@ def _post_execution(self, return_value: Optional[Any] = None, exception: Optiona
if self._future_weakref is None:
raise Exception("Future not set for event")
future = self._future_weakref()
if future is not None:
future._notify_execution_complete(return_value, exception)
self.finished = True
self._engine.publish_notification(EventExecutedNotification(payload=exception))
if future is not None:
future._notify_execution_complete(return_value, exception)



class AnonymousCallableEvent(ExecutorEvent):
"""
An event that wraps a callable object and calls it when the event is executed.
The callable object should take no arguments and optionally return a value.
"""
def __init__(self, callable_obj: Callable[[], Any]):
super().__init__()
self.callable_obj = callable_obj
# Check if the callable has no parameters (except for 'self' in case of methods)
if not callable(callable_obj):
raise TypeError("Callable object must be a function or method")
signature = inspect.signature(callable_obj)
if not all(param.default != param.empty or param.kind == param.VAR_POSITIONAL for param in
signature.parameters.values()):
raise TypeError("Callable object must take no arguments")


def execute(self):
return self.callable_obj()
26 changes: 16 additions & 10 deletions src/exengine/kernel/ex_future.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,16 +120,22 @@ def await_data(self, coordinates: Optional[Union[DataCoordinates, Dict[str, Unio
this function is called before the data is acquired, the data may have already been saved and not readily
available in RAM. In this case, the data will be read from disk.
:param coordinates: A single DataCoordinates object/dictionary, or Sequence (i.e. list or tuple) of
DataCoordinates objects/dictionaries. If None, this function will block until the next data is
acquired/processed/saved
:param return_data: whether to return the data
:param return_metadata: whether to return the metadata
:param processed: whether to wait until data has been processed. If not data processor is in use, then this
parameter has no effect
:param stored: whether to wait for data that has been stored. If the call to await data occurs before the data
gets passed off to the storage_backends class, then it will be stored in memory and returned immediately without having to retrieve
Parameters:
------------
coordinates: Union[DataCoordinates, Dict[str, Union[int, str]], DataCoordinatesIterator, Sequence[DataCoordinates], Sequence[Dict[str, Union[int, str]]]
A single DataCoordinates object/dictionary, or Sequence (i.e. list or tuple) of
DataCoordinates objects/dictionaries. If None, this function will block until the next data is
acquired/processed/saved
return_data: bool
whether to return the data
return_metadata: bool
whether to return the metadata
processed: bool
whether to wait until data has been processed. If not data processor is in use, then this
parameter has no effect
stored: bool
whether to wait for data that has been stored. If the call to await data occurs before the data
gets passed off to the storage_backends class, then it will be stored in memory and returned immediately without having to retrieve
"""

coordinates_iterator = DataCoordinatesIterator.create(coordinates)
Expand Down
57 changes: 27 additions & 30 deletions src/exengine/kernel/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
import traceback
from typing import Union, Iterable, List, Callable, Any, Type
import queue
import inspect

from .notification_base import Notification, NotificationCategory
from .ex_event_base import ExecutorEvent
from .ex_event_base import ExecutorEvent, AnonymousCallableEvent
from .ex_future import ExecutionFuture

from .data_handler import DataHandler
Expand Down Expand Up @@ -178,32 +179,23 @@ def check_exceptions(self):
raise MultipleExceptions(exceptions)

def submit(self, event_or_events: Union[ExecutorEvent, Iterable[ExecutorEvent]], thread_name=None,
transpile: bool = True, prioritize: bool = False, use_free_thread: bool = False,
data_handler: DataHandler = None) -> Union[ExecutionFuture, Iterable[ExecutionFuture]]:
prioritize: bool = False, use_free_thread: bool = False) -> Union[ExecutionFuture, Iterable[ExecutionFuture]]:
"""
Submit one or more acquisition events for execution.
Submit one or more acquisition events or callable objects for execution.
This method handles the submission of acquisition events to be executed on active threads. It provides
options for event prioritization, thread allocation, and performance optimization.
This method handles the submission of acquisition events or callable objects to be executed on active threads.
It provides options for event prioritization, thread allocation, and performance optimization.
Execution Behavior:
- By default, all events are executed on a single thread in submission order to prevent concurrency issues.
- Events can be parallelized across different threads using the 'use_free_thread' parameter.
- Priority execution can be requested using the 'prioritize' parameter.
Parameters:
-----------
event_or_events : Union[ExecutorEvent, Iterable[ExecutorEvent]]
A single ExecutorEvent or an iterable of ExecutorEvents to be submitted.
event_or_events : Union[ExecutorEvent, Iterable[ExecutorEvent], Callable[[], Any], Iterable[Callable[[], Any]]]
A single ExecutorEvent, an iterable of ExecutorEvents, or a callable object with no arguments.
thread_name : str, optional (default=None)
Name of the thread to submit the event to. If None, the thread is determined by the
'use_free_thread' parameter.
transpile : bool, optional (default=True)
If True and multiple events are submitted, attempt to optimize them for better performance.
This may result in events being combined or reorganized.
prioritize : bool, optional (default=False)
If True, execute the event(s) before any others in the queue on its assigned thread.
Useful for system-wide changes affecting other events, like hardware adjustments.
Expand All @@ -213,32 +205,37 @@ def submit(self, event_or_events: Union[ExecutorEvent, Iterable[ExecutorEvent]],
Useful for operations like cancelling or stopping events awaiting signals.
If False, execute on the primary thread.
data_handler : DataHandler, optional (default=None)
Object to handle data and metadata produced by DataProducingExecutorEvents.
Returns:
--------
Union[AcquisitionFuture, Iterable[AcquisitionFuture]]
For a single event: returns a single AcquisitionFuture.
For multiple events: returns an Iterable of AcquisitionFutures.
Note: The number of returned futures may differ from the input if transpilation occurs.
For a single event or callable: returns a single ExecutionFuture.
For multiple events: returns an Iterable of ExecutionFutures.
Notes:
------
- Transpilation may optimize multiple events, potentially altering their number or structure.
- Use 'prioritize' for critical system changes that should occur before other queued events.
- 'use_free_thread' is essential for operations that need to run independently, like cancellation events.
- If a callable object with no arguments is submitted, it will be automatically wrapped in a AnonymousCallableEvent.
"""
if isinstance(event_or_events, ExecutorEvent):
# Auto convert single callable to event
if callable(event_or_events) and len(inspect.signature(event_or_events).parameters) == 0:
event_or_events = AnonymousCallableEvent(event_or_events)

if isinstance(event_or_events, (ExecutorEvent, Callable)):
event_or_events = [event_or_events]

if transpile:
# TODO: transpile events
pass
events = []
for event in event_or_events:
if callable(event):
events.append(AnonymousCallableEvent(event))
elif isinstance(event, ExecutorEvent):
events.append(event)
else:
raise TypeError(f"Invalid event type: {type(event)}. "
f"Expected ExecutorEvent or callable with no arguments.")

futures = tuple(self._submit_single_event(event, thread_name or getattr(event_or_events[0], '_thread_name', None),
use_free_thread, prioritize)
for event in event_or_events)
futures = tuple(self._submit_single_event(event, thread_name or getattr(event, '_thread_name', None),
use_free_thread, prioritize) for event in events)
if len(futures) == 1:
return futures[0]
return futures
Expand Down
80 changes: 59 additions & 21 deletions src/exengine/kernel/test/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,28 +222,28 @@ def test_device_threadpool_executor(execution_engine):
#######################################################
# Tests for other ExecutionEngine functionalities
#######################################################

class SyncEvent(ExecutorEvent):

def __init__(self, start_event, finish_event):
super().__init__()
self.executed = False
self.executed_time = None
self.execute_count = 0
self.executed_thread_name = None
self.start_event = start_event
self.finish_event = finish_event

def execute(self):
self.executed_thread_name = threading.current_thread().name
self.start_event.set() # Signal that the execution has started
self.finish_event.wait() # Wait for the signal to finish
self.executed_time = time.time()
self.execute_count += 1
self.executed = True

def create_sync_event(start_event, finish_event):
event = MagicMock(spec=ExecutorEvent)
event._finished = False
event._initialized = False
event.num_retries_on_exception = 0
event.executed = False
event.executed_time = None
event.execute_count = 0
event.executed_thread_name = None
event._thread_name = None

def execute():
event.executed_thread_name = threading.current_thread().name
start_event.set() # Signal that the execution has started
finish_event.wait() # Wait for the signal to finish
event.executed_time = time.time()
event.execute_count += 1
event.executed = True

event.execute.side_effect = execute
event._post_execution = MagicMock()
return event
return SyncEvent(start_event, finish_event)


def test_submit_single_event(execution_engine):
Expand Down Expand Up @@ -397,6 +397,44 @@ def test_single_execution_with_free_thread(execution_engine):
assert event1.execute_count == 1
assert event2.execute_count == 1

#### Callable submission tests ####
def test_submit_callable(execution_engine):
def simple_function():
return 42

future = execution_engine.submit(simple_function)
result = future.await_execution()
assert result == 42

def test_submit_lambda(execution_engine):
future = execution_engine.submit(lambda: "Hello, World!")
result = future.await_execution()
assert result == "Hello, World!"

def test_class_method(execution_engine):
class TestClass:
def test_method(self):
return "Test method executed"

future = execution_engine.submit(TestClass().test_method)
result = future.await_execution()
assert result == "Test method executed"

def test_submit_mixed(execution_engine):
class TestEvent(ExecutorEvent):
def execute(self):
return "Event executed"

futures = execution_engine.submit([TestEvent(), lambda: 42, lambda: "Lambda"])
results = [future.await_execution() for future in futures]
assert results == ["Event executed", 42, "Lambda"]

def test_submit_invalid(execution_engine):
with pytest.raises(TypeError):
execution_engine.submit(lambda x: x + 1) # Callable with arguments should raise TypeError

with pytest.raises(TypeError):
execution_engine.submit("Not a callable") # Non-callable, non-ExecutorEvent should raise TypeError

#######################################################
# Tests for named thread functionalities ##############
Expand Down

0 comments on commit 7b49751

Please sign in to comment.