diff --git a/nslsii/__init__.py b/nslsii/__init__.py index b5d85732..34e757cd 100644 --- a/nslsii/__init__.py +++ b/nslsii/__init__.py @@ -13,7 +13,7 @@ def public(name): def configure_base(user_ns, broker_name, *, bec=True, epics_context=False, magics=True, mpl=True, - ophyd_logging=True, pbar=True): + ophyd_logging=True, pbar=True, baton=None): """ Perform base setup and instantiation of important objects. @@ -61,6 +61,12 @@ def configure_base(user_ns, broker_name, *, ophyd. pbar : boolean, optional True by default. Set false to skip ProgressBarManager. + baton : nslsii.baton.Baton, optional + Device to manage a baton and status IOC. Expected to have: + + - baton.acquire_baton + - baton.doc_callback + - baton.state_callback Returns ------- @@ -74,16 +80,49 @@ def configure_base(user_ns, broker_name, *, >>>> configure_base(get_ipython().user_ns, 'chx'); """ ns = {} # We will update user_ns with this at the end. + import bluesky # Set up a RunEngine and use metadata backed by a sqlite file. from bluesky import RunEngine - from bluesky.utils import get_history # if RunEngine already defined grab it # useful when users make their own custom RunEngine if 'RE' in user_ns: RE = user_ns['RE'] else: - RE = RunEngine(get_history()) + kwargs = {} + if bluesky.__version__ < '1.6': + from bluesky.utils import get_history + md = get_history() + else: + from bluesky.utils import PersistentDict + + from pathlib import Path + import os + SEARCH_PATH = [] + ENV_VAR = 'BLUESKY_HISTORY_PATH' + if ENV_VAR in os.environ: + SEARCH_PATH.append(Path(os.environ[ENV_VAR]).expanduser()) + SEARCH_PATH.extend([ + Path('~/.config/bluesky/bluesky_history').expanduser(), + Path('/etc/bluesky/bluesky_history')]) + for path in SEARCH_PATH: + if path.exists(): + break + else: + path = SEARCH_PATH[0] + md = PersistentDict(path) + + if baton is not None: + kwargs['acquire_baton'] = baton.acquire_baton + + RE = RunEngine(md, **kwargs) + + if baton is not None: + for d in ['start']: + tok = RE.subscribe(baton.doc_callback, d) + baton.tokens.append(tok) + RE.state_hook = baton.state_callback + ns['RE'] = RE # Set up SupplementalData. @@ -129,10 +168,10 @@ def configure_base(user_ns, broker_name, *, import matplotlib.pyplot as plt ns['plt'] = plt plt.ion() - - # Make plots update live while scans run. - from bluesky.utils import install_kicker - install_kicker() + if bluesky.__version__ < '1.6': + # Make plots update live while scans run. + from bluesky.utils import install_kicker + install_kicker() if epics_context: # Create a context in the underlying EPICS client. diff --git a/nslsii/baton.py b/nslsii/baton.py new file mode 100644 index 00000000..136fb49c --- /dev/null +++ b/nslsii/baton.py @@ -0,0 +1,105 @@ +import platform +import os +import uuid +import atexit +from ophyd import Device, Component as Cpt, EpicsSignal, EpicsSignalRO + + +class Baton(Device): + """ + Ophyd object to wrap the "baton" IOC + + This object has the methods that the RE needs to + install the baton and check it on every use, and update + the state while running. + + Examples + -------- + + >>>> b = Baton(PREFX, name='baton') + >>>> ip = get_ipython() + >>>> configure_base(ip.user_ns, 'temp', baton=b) + + """ + + baton = Cpt(EpicsSignal, "baton", string=True) + host = Cpt(EpicsSignal, "host", string=True) + pid = Cpt(EpicsSignal, "pid") + current_uid = Cpt(EpicsSignal, "current_uid", string=True) + current_scanid = Cpt(EpicsSignal, "current_scanid") + state = Cpt(EpicsSignal, "state", string=True) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._clear_baton = None + self.tokens = [] + + def acquire_baton(self, steal_baton=False): + existing_baton = self.baton.get() + if existing_baton and not steal_baton: + old_host = self.host.get() + old_pid = self.pid.get() + raise RuntimeError( + f"There is already a RE claiming the baton. " + f"It was running on {old_host}:{old_pid}." + ) + + new_baton = str(uuid.uuid4()) + self.baton.put(new_baton) + self.host.put(platform.node()) + self.pid.put(os.getpid()) + + def check_baton(): + ioc_baton = self.baton.get() + if ioc_baton != new_baton: + ioc_host = self.host.get() + ioc_pid = self.pid.get() + raise RuntimeError( + f"This RE installed {new_baton} but the " + f"IOC has {ioc_baton}. " + f"The baton was intalled by {ioc_host}:{ioc_pid}" + ) + + self.install_clear_baton() + return check_baton + + def install_clear_baton(self): + if self._clear_baton is not None: + return + + def clear_baton(baton): + try: + self.baton.put("") + self.host.put("") + self.pid.put(0) + self.state.put("unknown") + except Exception: + # if we fail in tear down 🤷 + pass + + atexit.register(clear_baton, self) + self._clear_baton = clear_baton + + def doc_callback(self, name, doc): + if name == "start": + self.current_uid.put(doc["uid"]) + self.current_scanid.put(doc.get("scan_id", -1)) + + def state_callback(self, new, old): + self.state.put(new) + + +class BatonDisplay(Device): + """ + Read-only Ophyd object to wrap the "baton" IOC. + + This is to populate a typhon screen. + """ + + baton = Cpt(EpicsSignalRO, "baton", string=True, kind="config") + host = Cpt(EpicsSignalRO, "host", string=True, kind="config") + pid = Cpt(EpicsSignalRO, "pid", kind="config") + + current_uid = Cpt(EpicsSignalRO, "current_uid", string=True) + current_scanid = Cpt(EpicsSignalRO, "current_scanid") + state = Cpt(EpicsSignalRO, "state", string=True, kind="hinted") diff --git a/nslsii/iocs/baton.py b/nslsii/iocs/baton.py new file mode 100644 index 00000000..2e7a649a --- /dev/null +++ b/nslsii/iocs/baton.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python3 +from caproto.server import pvproperty, PVGroup +from caproto.server import ioc_arg_parser, run +from caproto import ChannelType +from textwrap import dedent + + +class IOC(PVGroup): + """ + A Baton IOC for managing + + """ + + baton = pvproperty( + value="", + dtype=ChannelType.STRING, + doc='"baton" for running RE', + mock_record="ai", + ) + host = pvproperty( + value="", + dtype=ChannelType.STRING, + doc="host name of computer running RE", + mock_record="ai", + ) + pid = pvproperty( + value=0, doc="pid of running RE on host", mock_record="ai" + ) + + current_uid = pvproperty( + value="", + dtype=ChannelType.STRING, + doc="Last finished uid.", + mock_record="ai", + ) + + current_scanid = pvproperty( + value=0, doc="Last finished scanid.", mock_record="ai" + ) + + state = pvproperty( + value="unknown", + doc="current state of RE", + enum_strings=[ + "unknown", + "idle", + "running", + "pausing", + "paused", + "halting", + "stopping", + "aborting", + "suspending", + "panicked", + ], + dtype=ChannelType.ENUM, + mock_record="ai", + ) + + +def main(): + ioc_options, run_options = ioc_arg_parser( + default_prefix="XF31ID:", desc=dedent(IOC.__doc__) + ) + + ioc = IOC(**ioc_options) + + run(ioc.pvdb, **run_options) + + +if __name__ == "__main__": + main() diff --git a/nslsii/tests/test_baton.py b/nslsii/tests/test_baton.py new file mode 100644 index 00000000..76612099 --- /dev/null +++ b/nslsii/tests/test_baton.py @@ -0,0 +1,84 @@ +import uuid +import asyncio +from bluesky import RunEngine +from bluesky import plans as bps +from nslsii.baton import Baton +from nslsii import configure_base +import pytest +import subprocess +import os + + +@pytest.fixture(scope="function") +def baton_ioc(request): + stdout = subprocess.PIPE + stdin = None + prefix = f"{str(uuid.uuid4())[:6]}:" + # Start up an IOC based on the thermo_sim device in caproto.ioc_examples + ioc_process = subprocess.Popen( + ["baton-ioc", "--prefix", prefix, "--list-pvs"], + stdout=stdout, + stdin=stdin, + env=os.environ, + ) + + def kill_ioc(): + ioc_process.terminate() + + request.addfinalizer(kill_ioc) + return prefix + + +def test_baton(baton_ioc): + b = Baton(baton_ioc, name="b") + b.wait_for_connection(timeout=5) + b.read() + + assert b.baton.get() == "" + cb = b.acquire_baton() + assert b.baton.get() != "" + cb() + cb() + + with pytest.raises(RuntimeError): + b.acquire_baton() + + cb2 = b.acquire_baton(steal_baton=True) + with pytest.raises(RuntimeError): + cb() + + cb2() + + +def _inner_test(RE, b): + assert b.baton.get() != "" + for _ in range(5): + uid, = RE(bps.count([])) + assert b.current_uid.get() == uid + assert b.current_scanid.get() == RE.md["scan_id"] + + b.baton.put("") + with pytest.raises(RuntimeError): + RE([]) + + +def test_baton_RE(baton_ioc): + b = Baton(baton_ioc, name="b") + b.wait_for_connection(timeout=5) + assert b.baton.get() == "" + loop = asyncio.new_event_loop() + loop.set_debug(True) + RE = RunEngine({}, loop=loop, acquire_baton=b.acquire_baton) + RE.subscribe(b.doc_callback, "start") + RE.state_hook = b.state_callback + _inner_test(RE, b) + + +def test_configure_base(baton_ioc): + out = {} + b = Baton(baton_ioc, name="b") + b.wait_for_connection(timeout=5) + configure_base(out, "temp", baton=b, magics=False) + RE = out["RE"] + + _inner_test(RE, b) diff --git a/setup.py b/setup.py index 31de79ae..1674f086 100644 --- a/setup.py +++ b/setup.py @@ -27,4 +27,9 @@ description='Tools for data collection and analysis at NSLS-II', author='Brookhaven National Laboratory', install_requires=requirements, + entry_points={ + 'console_scripts': [ + 'baton-ioc = nslsii.iocs.baton:main', + ] + } )