-
Notifications
You must be signed in to change notification settings - Fork 23
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
wait_for_value_interface_change #582
base: main
Are you sure you want to change the base?
Conversation
3751b80
to
caadaca
Compare
|
||
# Get the initial value from the monitor to make sure we've created it | ||
current_value = await anext(values_gen) | ||
|
||
status = set_signal.set(set_value, timeout=set_timeout) | ||
set_signal.set(set_value, timeout=set_timeout) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@DominicOram this is what @ZohebShaikh was alluding to in #402 (comment).
If we choose A, then we need to decide what to do with the Status. Something needs to keep track of it an await it, we can't drop it on the floor as we are doing here or we get teardown errors in the tests, so we have to return it. The calling code then looks like:
status = await set_and_wait_for_other_value(dev.acquire, 1, dev.acquire_rbv, 1)
# do something that you can do as soon as the thing is acquiring
await status
# now the device has finished acquiring
If we choose B, then we keep a track of status
here, then await it at the end of the function. The calling code becomes:
await set_and_wait_for_other_value(dev.acquire, 1, dev.acquire_rbv, 1)
# now the device has finished acquiring,
What was your actual use case for this? Will the write_signal ever take significantly longer to caput-callback than the read_signal will take to change to the match_value? I know we have this case for the areaDetector acquire PV above, but we decided to not use this function as it was clearer to write:
arm_status = dev.acquire.set(1)
await wait_for_value(dev.acquire_rbv, 1)
# do something we can do when the device is armed
await arm_status
# now the device is disarmed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My original usecase for this was the actually the areaDetector one. We tried the code as suggested:
arm_status = dev.acquire.set(1)
await wait_for_value(dev.acquire_rbv, 1)
await arm_status
But found that the wait_for_value
was failing. This is because for this particular configuration the detector immediately takes data and does so so quickly that it has finished by the time we start monitoring the RBV. See #453 (comment). My main motivation for putting this into ophyd-async
was to provide a function that would always protect against the potential race condition so we're less like to see it with people taking the naive approach.
I believe for my use case the callback has already returned on the set before the RBV goes to the expected value but I would have to check. Either way I think what I would expect to happen is that set_and_wait_for_other_value
returns a gather of both statuses. If you want to do some in between these then you need to do that yourself. Maybe we should have a chat about it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please can we have a test with 2 soft signals updating and making callbacks?
src/ophyd_async/core/_signal.py
Outdated
async for value in observe_value(sig): | ||
do_something_with(value) | ||
""" | ||
q: asyncio.Queue[tuple[SignalR[T], T | Status]] = asyncio.Queue() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
q: asyncio.Queue[tuple[SignalR[T], T | Status]] = asyncio.Queue() | |
q: asyncio.Queue[tuple[SignalR[T], T] | Status] = asyncio.Queue() |
src/ophyd_async/core/_signal.py
Outdated
def wrapped_signal_put(signal: SignalR[T]): | ||
def queue_value(value: T): | ||
q.put_nowait((signal, value)) | ||
|
||
def queue_status(status: Status): | ||
q.put_nowait((signal, status)) | ||
|
||
def clear_signals(): | ||
signal.clear_sub(queue_value) | ||
signal.clear_sub(queue_status) | ||
|
||
return queue_value, queue_status, clear_signals | ||
|
||
clear_signals = [] | ||
for signal in signals: | ||
queue_value, queue_status, clear_signal = wrapped_signal_put(signal) | ||
clear_signals.append(clear_signal) | ||
if done_status is not None: | ||
done_status.add_callback(queue_status) | ||
signal.subscribe_value(queue_value) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def wrapped_signal_put(signal: SignalR[T]): | |
def queue_value(value: T): | |
q.put_nowait((signal, value)) | |
def queue_status(status: Status): | |
q.put_nowait((signal, status)) | |
def clear_signals(): | |
signal.clear_sub(queue_value) | |
signal.clear_sub(queue_status) | |
return queue_value, queue_status, clear_signals | |
clear_signals = [] | |
for signal in signals: | |
queue_value, queue_status, clear_signal = wrapped_signal_put(signal) | |
clear_signals.append(clear_signal) | |
if done_status is not None: | |
done_status.add_callback(queue_status) | |
signal.subscribe_value(queue_value) | |
cbs: Dict[SignalR, Callback] = {} | |
for signal in signals: | |
def queue_value(value: T, signal=signal): | |
q.put_nowait((signal, value)) | |
cbs[signal] = queue_value | |
signal.subscribe_value(queue_value) | |
if done_status is not None: | |
done_status.add_callback(q.put_nowait) |
src/ophyd_async/core/_signal.py
Outdated
for clear_signal in clear_signals: | ||
clear_signal() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for clear_signal in clear_signals: | |
clear_signal() | |
for signal, cb in cbs.items(): | |
signal.clear_sub(cb) |
@@ -490,6 +490,75 @@ async def get_value(): | |||
signal.clear_sub(q.put_nowait) | |||
|
|||
|
|||
async def observe_signals_values( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After the above changes are made, can we share any code with observe_value
above?
read_signal: SignalR[S], | ||
read_value: S, | ||
match_signal: SignalR[S], | ||
match_value: S | Callable[[S], bool], | ||
timeout: float = DEFAULT_TIMEOUT, | ||
set_timeout: float | None = None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We decided to add wait_for_set_completion = True
(or something like that) to both functions and always return the status, but then pass False
from the areaDetector utility function
closes #461