-
Notifications
You must be signed in to change notification settings - Fork 25
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Reduce boilerplate in StandardReadable
This reduces the amount of duplication and repetition when adding signals to a StandardReadable. As part of this, classes defining the types of signal have been created, which control the behaviour of the Signal being registered Signals must be registered either using the "add_children_as_readables" contextmanager, or the "add_readables" function.
- Loading branch information
1 parent
669dc7d
commit a0675e5
Showing
10 changed files
with
209 additions
and
77 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,74 +1,168 @@ | ||
from typing import Dict, Sequence, Tuple | ||
from contextlib import contextmanager | ||
from typing import Dict, Generator, List, Optional, Sequence, Type, Union | ||
|
||
from bluesky.protocols import Descriptor, Reading, Stageable | ||
from bluesky.protocols import Descriptor, HasHints, Hints, Reading, Stageable | ||
|
||
from ophyd_async.protocols import AsyncConfigurable, AsyncReadable | ||
from ophyd_async.protocols import AsyncConfigurable, AsyncReadable, AsyncStageable | ||
|
||
from .async_status import AsyncStatus | ||
from .device import Device | ||
from .signal import SignalR | ||
from .utils import merge_gathered_dicts | ||
|
||
|
||
class StandardReadable(Device, AsyncReadable, AsyncConfigurable, Stageable): | ||
class StandardReadable( | ||
Device, AsyncReadable, AsyncConfigurable, AsyncStageable, HasHints | ||
): | ||
"""Device that owns its children and provides useful default behavior. | ||
- When its name is set it renames child Devices | ||
- Signals can be registered for read() and read_configuration() | ||
- These signals will be subscribed for read() between stage() and unstage() | ||
""" | ||
|
||
_read_signals: Tuple[SignalR, ...] = () | ||
_configuration_signals: Tuple[SignalR, ...] = () | ||
_read_uncached_signals: Tuple[SignalR, ...] = () | ||
_readables: List[AsyncReadable] = [] | ||
_configurables: List[AsyncConfigurable] = [] | ||
_stageables: List[AsyncStageable] = [] | ||
|
||
def set_readable_signals( | ||
self, | ||
read: Sequence[SignalR] = (), | ||
config: Sequence[SignalR] = (), | ||
read_uncached: Sequence[SignalR] = (), | ||
): | ||
""" | ||
Parameters | ||
---------- | ||
read: | ||
Signals to make up :meth:`~StandardReadable.read` | ||
conf: | ||
Signals to make up :meth:`~StandardReadable.read_configuration` | ||
read_uncached: | ||
Signals to make up :meth:`~StandardReadable.read` that won't be cached | ||
""" | ||
self._read_signals = tuple(read) | ||
self._configuration_signals = tuple(config) | ||
self._read_uncached_signals = tuple(read_uncached) | ||
_hints: Hints = {} | ||
|
||
@AsyncStatus.wrap | ||
async def stage(self) -> None: | ||
for sig in self._read_signals + self._configuration_signals: | ||
for sig in self._stageables: | ||
await sig.stage().task | ||
|
||
@AsyncStatus.wrap | ||
async def unstage(self) -> None: | ||
for sig in self._read_signals + self._configuration_signals: | ||
for sig in self._stageables: | ||
await sig.unstage().task | ||
|
||
async def describe_configuration(self) -> Dict[str, Descriptor]: | ||
return await merge_gathered_dicts( | ||
[sig.describe() for sig in self._configuration_signals] | ||
[sig.describe_configuration() for sig in self._configurables] | ||
) | ||
|
||
async def read_configuration(self) -> Dict[str, Reading]: | ||
return await merge_gathered_dicts( | ||
[sig.read() for sig in self._configuration_signals] | ||
[sig.read_configuration() for sig in self._configurables] | ||
) | ||
|
||
async def describe(self) -> Dict[str, Descriptor]: | ||
return await merge_gathered_dicts( | ||
[sig.describe() for sig in self._read_signals + self._read_uncached_signals] | ||
) | ||
return await merge_gathered_dicts([sig.describe() for sig in self._readables]) | ||
|
||
async def read(self) -> Dict[str, Reading]: | ||
return await merge_gathered_dicts( | ||
[sig.read() for sig in self._read_signals] | ||
+ [sig.read(cached=False) for sig in self._read_uncached_signals] | ||
) | ||
return await merge_gathered_dicts([sig.read() for sig in self._readables]) | ||
|
||
@property | ||
def hints(self) -> Hints: | ||
return self._hints | ||
|
||
@contextmanager | ||
def add_children_as_readables( | ||
self, | ||
wrapper: Optional[Type[Union["ConfigSignal", "HintedSignal"]]] = None, | ||
) -> Generator[None, None, None]: | ||
dict_copy = self.__dict__.copy() | ||
|
||
yield | ||
|
||
# Set symmetric difference operator gives all newly added items | ||
new_attributes = dict_copy.items() ^ self.__dict__.items() | ||
new_signals: List[SignalR] = [x[1] for x in new_attributes] | ||
|
||
self._wrap_signals(wrapper, new_signals) | ||
|
||
def add_readables( | ||
self, | ||
wrapper: Type[Union["ConfigSignal", "HintedSignal"]], | ||
*signals: SignalR, | ||
) -> None: | ||
|
||
self._wrap_signals(wrapper, signals) | ||
|
||
def _wrap_signals( | ||
self, | ||
wrapper: Optional[Type[Union["ConfigSignal", "HintedSignal"]]], | ||
signals: Sequence[SignalR], | ||
): | ||
|
||
for signal in signals: | ||
obj: Union[SignalR, "ConfigSignal", "HintedSignal"] = signal | ||
if wrapper: | ||
obj = wrapper(signal) | ||
|
||
if isinstance(obj, AsyncReadable): | ||
self._readables.append(obj) | ||
|
||
if isinstance(obj, AsyncConfigurable): | ||
self._configurables.append(obj) | ||
|
||
if isinstance(obj, AsyncStageable): | ||
self._stageables.append(obj) | ||
|
||
if isinstance(obj, HasHints): | ||
new_hint = obj.hints | ||
|
||
# Merge the existing and new hints, based on the type of the value. | ||
# This avoids default dict merge behaviour that overrided the values; | ||
# we want to combine them when they are Sequences, and ensure they are | ||
# identical when string values. | ||
for key, value in new_hint.items(): | ||
if isinstance(value, Sequence): | ||
if key in self._hints: | ||
self._hints[key] = ( # type: ignore[literal-required] | ||
self._hints[key] # type: ignore[literal-required] | ||
+ value | ||
) | ||
else: | ||
self._hints[key] = value # type: ignore[literal-required] | ||
elif isinstance(value, str): | ||
if key in self._hints: | ||
assert ( | ||
self._hints[key] # type: ignore[literal-required] | ||
== value | ||
), "Hints value may not be overridden" | ||
else: | ||
self._hints[key] = value # type: ignore[literal-required] | ||
else: | ||
raise AssertionError("Unknown type in Hints dictionary") | ||
|
||
|
||
class ConfigSignal(AsyncConfigurable): | ||
|
||
def __init__(self, signal: SignalR) -> None: | ||
self.signal = signal | ||
|
||
async def read_configuration(self) -> Dict[str, Reading]: | ||
return await self.signal.read() | ||
|
||
async def describe_configuration(self) -> Dict[str, Descriptor]: | ||
return await self.signal.describe() | ||
|
||
|
||
class HintedSignal(HasHints, AsyncReadable): | ||
|
||
def __init__(self, signal: SignalR, cached: Optional[bool] = None) -> None: | ||
self.signal = signal | ||
self.cached = cached | ||
if cached: | ||
self.stage = signal.stage | ||
self.unstage = signal.unstage | ||
|
||
async def read(self) -> Dict[str, Reading]: | ||
return await self.signal.read(cached=self.cached) | ||
|
||
async def describe(self) -> Dict[str, Descriptor]: | ||
return await self.signal.describe() | ||
|
||
@property | ||
def name(self) -> str: | ||
return self.signal.name | ||
|
||
@property | ||
def hints(self) -> Hints: | ||
return {"fields": [self.signal.name]} | ||
|
||
@classmethod | ||
def uncached(cls, signal: SignalR): | ||
return cls(signal, cached=False) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,3 @@ | ||
from .protocols import AsyncConfigurable, AsyncPausable, AsyncReadable | ||
from .protocols import AsyncConfigurable, AsyncPausable, AsyncReadable, AsyncStageable | ||
|
||
__all__ = ["AsyncReadable", "AsyncConfigurable", "AsyncPausable"] | ||
__all__ = ["AsyncReadable", "AsyncConfigurable", "AsyncPausable", "AsyncStageable"] |
Oops, something went wrong.