Skip to content

Commit

Permalink
ENH: wrap incoming data in EpicsData dataclass for consistency
Browse files Browse the repository at this point in the history
  • Loading branch information
tangkong committed Aug 5, 2024
1 parent 682830a commit e081276
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 7 deletions.
1 change: 1 addition & 0 deletions superscore/control_layers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from ._base_shim import EpicsData # noqa
from .core import ControlLayer # noqa
from .status import TaskStatus # noqa
9 changes: 6 additions & 3 deletions superscore/control_layers/_aioca.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@
from typing import Any, Callable

from aioca import CANothing, caget, camonitor, caput
from epicscorelibs.ca import dbr

from superscore.control_layers._base_shim import _BaseShim
from superscore.control_layers._base_shim import EpicsData, _BaseShim
from superscore.errors import CommunicationError

logger = logging.getLogger(__name__)


class AiocaShim(_BaseShim):
"""async compatible EPICS channel access shim layer"""
async def get(self, address: str) -> Any:
async def get(self, address: str) -> EpicsData:
"""
Get the value at the PV: ``address``.
Expand All @@ -34,11 +35,13 @@ async def get(self, address: str) -> Any:
If the caget operation fails for any reason.
"""
try:
return await caget(address)
value = await caget(address, format=dbr.FORMAT_TIME)
except CANothing as ex:
logger.debug(f"CA get failed {ex.__repr__()}")
raise CommunicationError(f'CA get failed for {ex}')

return EpicsData.from_aioca(value)

async def put(self, address: str, value: Any) -> None:
"""
Put ``value`` to the PV ``address``.
Expand Down
46 changes: 46 additions & 0 deletions superscore/control_layers/_base_shim.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
"""
Base shim abstract base class
"""
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Callable

from aioca.types import AugmentedValue

from superscore.model import Severity, Status
from superscore.type_hints import AnyEpicsType
from superscore.utils import utcnow


class _BaseShim:
Expand All @@ -15,3 +23,41 @@ async def put(self, address: str, value: Any):

def monitor(self, address: str, callback: Callable):
raise NotImplementedError


@dataclass
class EpicsData:
"""Unified EPICS data type for holding data and relevant metadata"""
data: AnyEpicsType
status: Severity = Status.UDF
severity: Status = Severity.INVALID
timestamp: datetime = field(default_factory=utcnow)

@classmethod
def from_aioca(cls, value: AugmentedValue) -> EpicsData:
"""
Creates an EpicsData instance from an aioca provided AugmentedValue
Assumes the augmented value was collected with FORMAT_TIME qualifier.
AugmentedValue subclasses primitive datatypes, so they can be used as
data directly.
Parameters
----------
value : AugmentedValue
The value to repackage
Returns
-------
EpicsData
The filled EpicsData instance
"""
severity = Severity(value.severity)
status = Status(value.status)
timestamp = value.timestamp

return EpicsData(
data=value,
status=status,
severity=severity,
timestamp=timestamp
)
9 changes: 5 additions & 4 deletions superscore/control_layers/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from functools import singledispatchmethod
from typing import Any, Callable, Dict, List, Optional, Union

from superscore.control_layers._base_shim import EpicsData
from superscore.control_layers.status import TaskStatus

from ._aioca import AiocaShim
Expand Down Expand Up @@ -74,7 +75,7 @@ def shim_from_pv(self, address: str) -> _BaseShim:
return shim

@singledispatchmethod
def get(self, address: Union[str, list[str]]) -> Any:
def get(self, address: Union[str, list[str]]) -> Union[EpicsData, list[EpicsData]]:
"""
Get the value(s) in ``address``.
If a single pv is provided, will return a single value.
Expand All @@ -87,20 +88,20 @@ def get(self, address: Union[str, list[str]]) -> Any:
Returns
-------
Any
Union[EpicsData, list[EpicsData]]
The requested data
"""
# Dispatches to _get_single and _get_list depending on type
print(f"PV is of an unsupported type: {type(address)}. Provide either "
"a string or list of strings")

@get.register
def _get_single(self, address: str) -> Any:
def _get_single(self, address: str) -> EpicsData:
"""Synchronously get a single ``address``"""
return asyncio.run(self._get_one(address))

@get.register
def _get_list(self, address: Iterable) -> Any:
def _get_list(self, address: Iterable) -> Iterable[EpicsData]:
"""Synchronously get a list of ``address``"""
async def gathered_coros():
coros = []
Expand Down

0 comments on commit e081276

Please sign in to comment.