Skip to content

Commit

Permalink
Merge pull request bluesky#1718 from CoePaul/blueskyHash1646
Browse files Browse the repository at this point in the history
MNT: replaces KeyError handling logic

close bluesky#1646
  • Loading branch information
tacaswell authored Apr 19, 2024
2 parents df90882 + f9ae328 commit add624e
Showing 1 changed file with 73 additions and 67 deletions.
140 changes: 73 additions & 67 deletions src/bluesky/run_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1638,12 +1638,11 @@ async def _run(self):
self._msg_cache.append(msg)

# try to look up the coroutine to execute the command
try:
coro = self._command_registry[msg.command]
# replace KeyError with a local sub-class and go
# to top of the loop
except KeyError:
# TODO make this smarter
if (
coro := self._command_registry.get(msg.command, key_absence_sentinel := object())
) is key_absence_sentinel:
# flag invalid command
# and return to the top of the loop
new_response = InvalidCommand(msg.command)
continue

Expand Down Expand Up @@ -1865,12 +1864,11 @@ async def _close_run(self, msg):
"""
# TODO extract this from the Msg
run_key = msg.run
try:
current_run = self._run_bundlers[run_key]
except KeyError as ke:
raise IllegalMessageSequence(
"A 'close_run' message was not received before the 'open_run' message"
) from ke
if (
current_run := self._run_bundlers.get(run_key, key_absence_sentinel := object)
) is key_absence_sentinel:
ims_msg = "A 'close_run' message was not received before the 'open_run' message"
raise IllegalMessageSequence(ims_msg)
ret = await current_run.close_run(msg)
del self._run_bundlers[run_key]
return ret
Expand All @@ -1891,14 +1889,15 @@ async def _create(self, msg):
Descriptor document.
"""
run_key = msg.run
try:
current_run = self._run_bundlers[run_key]
except KeyError as ke:
raise IllegalMessageSequence(
if (
current_run := self._run_bundlers.get(run_key, key_absence_sentinel := object())
) is key_absence_sentinel:
ims_msg = (
"Cannot bundle readings without "
"an open run. That is, 'create' must "
"be preceded by 'open_run'."
) from ke
)
raise IllegalMessageSequence(ims_msg)
return await current_run.create(msg)

async def _declare_stream(self, msg):
Expand All @@ -1918,14 +1917,15 @@ async def _declare_stream(self, msg):
on declare_stream, rather than `describe`.
"""
run_key = msg.run
try:
current_run = self._run_bundlers[run_key]
except KeyError as ke:
raise IllegalMessageSequence(
if (
current_run := self._run_bundlers.get(run_key, key_absence_sentinel := object())
) is key_absence_sentinel:
ims_msg = (
"Cannot bundle readings without "
"an open run. That is, 'create' must "
"be preceded by 'open_run'."
) from ke
)
raise IllegalMessageSequence(ims_msg)
return await current_run.declare_stream(msg)

async def _read(self, msg):
Expand All @@ -1948,11 +1948,9 @@ async def _read(self, msg):
"`read` must return a dictionary."
)
run_key = msg.run
try:
current_run = self._run_bundlers[run_key]
except KeyError:
...
else:
if (
current_run := self._run_bundlers.get(run_key, key_absence_sentinel := object())
) is not key_absence_sentinel:
await current_run.read(msg, ret)

return ret
Expand Down Expand Up @@ -1996,11 +1994,13 @@ async def _monitor(self, msg):
"""

run_key = msg.run
try:
current_run = self._run_bundlers[run_key]
except KeyError as ke:
raise IllegalMessageSequence("A 'monitor' message was sent but no run is open.") from ke
await current_run.monitor(msg)
if (
current_run := self._run_bundlers.get(run_key, key_absence_sentinel := object())
) is key_absence_sentinel:
ims_msg = "A 'monitor' message was sent but no run is open."
raise IllegalMessageSequence(ims_msg)
else:
await current_run.monitor(msg)
await self._reset_checkpoint_state_coro()

async def _unmonitor(self, msg):
Expand All @@ -2012,11 +2012,13 @@ async def _unmonitor(self, msg):
Msg('unmonitor', obj)
"""
run_key = msg.run
try:
current_run = self._run_bundlers[run_key]
except KeyError as ke:
raise IllegalMessageSequence("A 'unmonitor' message was sent but no run is open.") from ke
await current_run.unmonitor(msg)
if (
current_run := self._run_bundlers.get(run_key, key_absence_sentinel := object())
) is key_absence_sentinel:
ims_msg = "An 'unmonitor' message was sent but no run is open."
raise IllegalMessageSequence(ims_msg)
else:
await current_run.unmonitor(msg)
await self._reset_checkpoint_state_coro()

async def _save(self, msg):
Expand All @@ -2027,14 +2029,15 @@ async def _save(self, msg):
Msg('save')
"""
run_key = msg.run
try:
current_run = self._run_bundlers[run_key]
except KeyError as ke:
if (
current_run := self._run_bundlers.get(run_key, key_absence_sentinel := object())
) is key_absence_sentinel:
# sanity check -- this should be caught by 'create' which makes
# this code path impossible
raise IllegalMessageSequence("A 'save' message was sent but no run is open.") from ke

await current_run.save(msg)
ims_msg = "A 'save' message was sent but no run is open."
raise IllegalMessageSequence(ims_msg)
else:
await current_run.save(msg)

async def _drop(self, msg):
"""Drop the event that is currently being bundled
Expand All @@ -2044,11 +2047,13 @@ async def _drop(self, msg):
Msg('drop')
"""
run_key = msg.run
try:
current_run = self._run_bundlers[run_key]
except KeyError as ke:
raise IllegalMessageSequence("A 'drop' message was sent but no run is open.") from ke
await current_run.drop(msg)
if (
current_run := self._run_bundlers.get(run_key, key_absence_sentinel := object())
) is key_absence_sentinel:
ims_msg = "A 'drop' message was sent but no run is open."
raise IllegalMessageSequence(ims_msg)
else:
await current_run.drop(msg)

async def _prepare(self, msg):
"""Prepare a flyer for a flyscan
Expand Down Expand Up @@ -2104,10 +2109,11 @@ async def _kickoff(self, msg):
Msg('kickoff', flyer_object, start, stop, step, group=<name>)
"""
run_key = msg.run
try:
current_run = self._run_bundlers[run_key]
except KeyError as ke:
raise IllegalMessageSequence("A 'kickoff' message was sent but no run is open.") from ke
if (
current_run := self._run_bundlers.get(run_key, key_absence_sentinel := object())
) is key_absence_sentinel:
ims_msg = "A 'kickoff' message was sent but no run is open."
raise IllegalMessageSequence(ims_msg)

_, obj, args, kwargs, _ = msg
obj = check_supports(obj, Flyable)
Expand Down Expand Up @@ -2190,10 +2196,12 @@ async def _collect(self, msg):
Msg('collect', flyer_object, stream=True, return_payload=False, name="a_name")
"""
run_key = msg.run
try:
current_run = self._run_bundlers[run_key]
except KeyError as ke:
raise IllegalMessageSequence("A 'collect' message was sent but no run is open.") from ke
if (
current_run := self._run_bundlers.get(run_key, key_absence_sentinel := object())
) is key_absence_sentinel:
# TODO add test exercising this path
ims_msg = "A 'collect' message was sent but no run is open."
raise IllegalMessageSequence(ims_msg)

return await current_run.collect(msg)

Expand Down Expand Up @@ -2439,13 +2447,13 @@ async def _configure(self, msg):
object.configure(*args, **kwargs)
"""
run_key = msg.run
try:
current_run = self._run_bundlers[run_key]
except KeyError:
if (
current_run := self._run_bundlers.get(run_key, key_absence_sentinel := object())
) is key_absence_sentinel:
current_run = None
else:
if current_run.bundling:
raise IllegalMessageSequence("Cannot configure after 'create' but before 'save' Aborting!")
elif current_run.bundling:
ims_msg = "Cannot configure after 'create' but before 'save' Aborting!"
raise IllegalMessageSequence(ims_msg)
_, obj, args, kwargs, _ = msg

old, new = obj.configure(*args, **kwargs)
Expand Down Expand Up @@ -2572,11 +2580,9 @@ async def _unsubscribe(self, msg):
where ``TOKEN`` is the return value from ``RunEngine._subscribe()``
"""
self.log.debug("Removing subscription %r", msg)
_, obj, args, kwargs, _ = msg
try:
token = kwargs["token"]
except KeyError:
(token,) = args
_, obj, arg, kwargs, _ = msg
if (token := kwargs.get("token", key_absence_sentinel := object())) is key_absence_sentinel:
(token,) = arg
self.unsubscribe(token)
self._temp_callback_ids.remove(token)
await self._reset_checkpoint_state_coro()
Expand Down

0 comments on commit add624e

Please sign in to comment.