diff --git a/src/bluesky/run_engine.py b/src/bluesky/run_engine.py index 0c926a312..f1c4712ca 100644 --- a/src/bluesky/run_engine.py +++ b/src/bluesky/run_engine.py @@ -1638,12 +1638,11 @@ async def _run(self): self._msg_cache.append(msg) # try to look up the coroutine to execute the command - try: - coro = self._command_registry[msg.command] - # replace KeyError with a local sub-class and go - # to top of the loop - except KeyError: - # TODO make this smarter + if ( + coro := self._command_registry.get(msg.command, key_absence_sentinel := object()) + ) is key_absence_sentinel: + # flag invalid command + # and return to the top of the loop new_response = InvalidCommand(msg.command) continue @@ -1865,12 +1864,11 @@ async def _close_run(self, msg): """ # TODO extract this from the Msg run_key = msg.run - try: - current_run = self._run_bundlers[run_key] - except KeyError as ke: - raise IllegalMessageSequence( - "A 'close_run' message was not received before the 'open_run' message" - ) from ke + if ( + current_run := self._run_bundlers.get(run_key, key_absence_sentinel := object) + ) is key_absence_sentinel: + ims_msg = "A 'close_run' message was not received before the 'open_run' message" + raise IllegalMessageSequence(ims_msg) ret = await current_run.close_run(msg) del self._run_bundlers[run_key] return ret @@ -1891,14 +1889,15 @@ async def _create(self, msg): Descriptor document. """ run_key = msg.run - try: - current_run = self._run_bundlers[run_key] - except KeyError as ke: - raise IllegalMessageSequence( + if ( + current_run := self._run_bundlers.get(run_key, key_absence_sentinel := object()) + ) is key_absence_sentinel: + ims_msg = ( "Cannot bundle readings without " "an open run. That is, 'create' must " "be preceded by 'open_run'." - ) from ke + ) + raise IllegalMessageSequence(ims_msg) return await current_run.create(msg) async def _declare_stream(self, msg): @@ -1918,14 +1917,15 @@ async def _declare_stream(self, msg): on declare_stream, rather than `describe`. """ run_key = msg.run - try: - current_run = self._run_bundlers[run_key] - except KeyError as ke: - raise IllegalMessageSequence( + if ( + current_run := self._run_bundlers.get(run_key, key_absence_sentinel := object()) + ) is key_absence_sentinel: + ims_msg = ( "Cannot bundle readings without " "an open run. That is, 'create' must " "be preceded by 'open_run'." - ) from ke + ) + raise IllegalMessageSequence(ims_msg) return await current_run.declare_stream(msg) async def _read(self, msg): @@ -1948,11 +1948,9 @@ async def _read(self, msg): "`read` must return a dictionary." ) run_key = msg.run - try: - current_run = self._run_bundlers[run_key] - except KeyError: - ... - else: + if ( + current_run := self._run_bundlers.get(run_key, key_absence_sentinel := object()) + ) is not key_absence_sentinel: await current_run.read(msg, ret) return ret @@ -1996,11 +1994,13 @@ async def _monitor(self, msg): """ run_key = msg.run - try: - current_run = self._run_bundlers[run_key] - except KeyError as ke: - raise IllegalMessageSequence("A 'monitor' message was sent but no run is open.") from ke - await current_run.monitor(msg) + if ( + current_run := self._run_bundlers.get(run_key, key_absence_sentinel := object()) + ) is key_absence_sentinel: + ims_msg = "A 'monitor' message was sent but no run is open." + raise IllegalMessageSequence(ims_msg) + else: + await current_run.monitor(msg) await self._reset_checkpoint_state_coro() async def _unmonitor(self, msg): @@ -2012,11 +2012,13 @@ async def _unmonitor(self, msg): Msg('unmonitor', obj) """ run_key = msg.run - try: - current_run = self._run_bundlers[run_key] - except KeyError as ke: - raise IllegalMessageSequence("A 'unmonitor' message was sent but no run is open.") from ke - await current_run.unmonitor(msg) + if ( + current_run := self._run_bundlers.get(run_key, key_absence_sentinel := object()) + ) is key_absence_sentinel: + ims_msg = "An 'unmonitor' message was sent but no run is open." + raise IllegalMessageSequence(ims_msg) + else: + await current_run.unmonitor(msg) await self._reset_checkpoint_state_coro() async def _save(self, msg): @@ -2027,14 +2029,15 @@ async def _save(self, msg): Msg('save') """ run_key = msg.run - try: - current_run = self._run_bundlers[run_key] - except KeyError as ke: + if ( + current_run := self._run_bundlers.get(run_key, key_absence_sentinel := object()) + ) is key_absence_sentinel: # sanity check -- this should be caught by 'create' which makes # this code path impossible - raise IllegalMessageSequence("A 'save' message was sent but no run is open.") from ke - - await current_run.save(msg) + ims_msg = "A 'save' message was sent but no run is open." + raise IllegalMessageSequence(ims_msg) + else: + await current_run.save(msg) async def _drop(self, msg): """Drop the event that is currently being bundled @@ -2044,11 +2047,13 @@ async def _drop(self, msg): Msg('drop') """ run_key = msg.run - try: - current_run = self._run_bundlers[run_key] - except KeyError as ke: - raise IllegalMessageSequence("A 'drop' message was sent but no run is open.") from ke - await current_run.drop(msg) + if ( + current_run := self._run_bundlers.get(run_key, key_absence_sentinel := object()) + ) is key_absence_sentinel: + ims_msg = "A 'drop' message was sent but no run is open." + raise IllegalMessageSequence(ims_msg) + else: + await current_run.drop(msg) async def _prepare(self, msg): """Prepare a flyer for a flyscan @@ -2104,10 +2109,11 @@ async def _kickoff(self, msg): Msg('kickoff', flyer_object, start, stop, step, group=) """ run_key = msg.run - try: - current_run = self._run_bundlers[run_key] - except KeyError as ke: - raise IllegalMessageSequence("A 'kickoff' message was sent but no run is open.") from ke + if ( + current_run := self._run_bundlers.get(run_key, key_absence_sentinel := object()) + ) is key_absence_sentinel: + ims_msg = "A 'kickoff' message was sent but no run is open." + raise IllegalMessageSequence(ims_msg) _, obj, args, kwargs, _ = msg obj = check_supports(obj, Flyable) @@ -2190,10 +2196,12 @@ async def _collect(self, msg): Msg('collect', flyer_object, stream=True, return_payload=False, name="a_name") """ run_key = msg.run - try: - current_run = self._run_bundlers[run_key] - except KeyError as ke: - raise IllegalMessageSequence("A 'collect' message was sent but no run is open.") from ke + if ( + current_run := self._run_bundlers.get(run_key, key_absence_sentinel := object()) + ) is key_absence_sentinel: + # TODO add test exercising this path + ims_msg = "A 'collect' message was sent but no run is open." + raise IllegalMessageSequence(ims_msg) return await current_run.collect(msg) @@ -2439,13 +2447,13 @@ async def _configure(self, msg): object.configure(*args, **kwargs) """ run_key = msg.run - try: - current_run = self._run_bundlers[run_key] - except KeyError: + if ( + current_run := self._run_bundlers.get(run_key, key_absence_sentinel := object()) + ) is key_absence_sentinel: current_run = None - else: - if current_run.bundling: - raise IllegalMessageSequence("Cannot configure after 'create' but before 'save' Aborting!") + elif current_run.bundling: + ims_msg = "Cannot configure after 'create' but before 'save' Aborting!" + raise IllegalMessageSequence(ims_msg) _, obj, args, kwargs, _ = msg old, new = obj.configure(*args, **kwargs) @@ -2572,11 +2580,9 @@ async def _unsubscribe(self, msg): where ``TOKEN`` is the return value from ``RunEngine._subscribe()`` """ self.log.debug("Removing subscription %r", msg) - _, obj, args, kwargs, _ = msg - try: - token = kwargs["token"] - except KeyError: - (token,) = args + _, obj, arg, kwargs, _ = msg + if (token := kwargs.get("token", key_absence_sentinel := object())) is key_absence_sentinel: + (token,) = arg self.unsubscribe(token) self._temp_callback_ids.remove(token) await self._reset_checkpoint_state_coro()