Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wait_for_value_interface_change #582

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from

Conversation

ZohebShaikh
Copy link
Contributor

@ZohebShaikh ZohebShaikh commented Sep 17, 2024

closes #461

@ZohebShaikh ZohebShaikh linked an issue Sep 17, 2024 that may be closed by this pull request
@ZohebShaikh ZohebShaikh changed the title initial commit wait_for_value_interface_change Sep 17, 2024
@ZohebShaikh ZohebShaikh force-pushed the 461-rationalise-interface-for-wait_for_value branch from 3751b80 to caadaca Compare September 17, 2024 13:46

# Get the initial value from the monitor to make sure we've created it
current_value = await anext(values_gen)

status = set_signal.set(set_value, timeout=set_timeout)
set_signal.set(set_value, timeout=set_timeout)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DominicOram this is what @ZohebShaikh was alluding to in #402 (comment).

If we choose A, then we need to decide what to do with the Status. Something needs to keep track of it an await it, we can't drop it on the floor as we are doing here or we get teardown errors in the tests, so we have to return it. The calling code then looks like:

status = await set_and_wait_for_other_value(dev.acquire, 1, dev.acquire_rbv, 1)
# do something that you can do as soon as the thing is acquiring
await status
# now the device has finished acquiring

If we choose B, then we keep a track of status here, then await it at the end of the function. The calling code becomes:

await set_and_wait_for_other_value(dev.acquire, 1, dev.acquire_rbv, 1)
# now the device has finished acquiring,

What was your actual use case for this? Will the write_signal ever take significantly longer to caput-callback than the read_signal will take to change to the match_value? I know we have this case for the areaDetector acquire PV above, but we decided to not use this function as it was clearer to write:

arm_status = dev.acquire.set(1)
await wait_for_value(dev.acquire_rbv, 1)
# do something we can do when the device is armed
await arm_status
# now the device is disarmed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My original usecase for this was the actually the areaDetector one. We tried the code as suggested:

arm_status = dev.acquire.set(1)
await wait_for_value(dev.acquire_rbv, 1)
await arm_status

But found that the wait_for_value was failing. This is because for this particular configuration the detector immediately takes data and does so so quickly that it has finished by the time we start monitoring the RBV. See #453 (comment). My main motivation for putting this into ophyd-async was to provide a function that would always protect against the potential race condition so we're less like to see it with people taking the naive approach.

I believe for my use case the callback has already returned on the set before the RBV goes to the expected value but I would have to check. Either way I think what I would expect to happen is that set_and_wait_for_other_value returns a gather of both statuses. If you want to do some in between these then you need to do that yourself. Maybe we should have a chat about it?

Copy link
Collaborator

@coretl coretl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please can we have a test with 2 soft signals updating and making callbacks?

async for value in observe_value(sig):
do_something_with(value)
"""
q: asyncio.Queue[tuple[SignalR[T], T | Status]] = asyncio.Queue()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
q: asyncio.Queue[tuple[SignalR[T], T | Status]] = asyncio.Queue()
q: asyncio.Queue[tuple[SignalR[T], T] | Status] = asyncio.Queue()

Comment on lines 527 to 546
def wrapped_signal_put(signal: SignalR[T]):
def queue_value(value: T):
q.put_nowait((signal, value))

def queue_status(status: Status):
q.put_nowait((signal, status))

def clear_signals():
signal.clear_sub(queue_value)
signal.clear_sub(queue_status)

return queue_value, queue_status, clear_signals

clear_signals = []
for signal in signals:
queue_value, queue_status, clear_signal = wrapped_signal_put(signal)
clear_signals.append(clear_signal)
if done_status is not None:
done_status.add_callback(queue_status)
signal.subscribe_value(queue_value)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def wrapped_signal_put(signal: SignalR[T]):
def queue_value(value: T):
q.put_nowait((signal, value))
def queue_status(status: Status):
q.put_nowait((signal, status))
def clear_signals():
signal.clear_sub(queue_value)
signal.clear_sub(queue_status)
return queue_value, queue_status, clear_signals
clear_signals = []
for signal in signals:
queue_value, queue_status, clear_signal = wrapped_signal_put(signal)
clear_signals.append(clear_signal)
if done_status is not None:
done_status.add_callback(queue_status)
signal.subscribe_value(queue_value)
cbs: Dict[SignalR, Callback] = {}
for signal in signals:
def queue_value(value: T, signal=signal):
q.put_nowait((signal, value))
cbs[signal] = queue_value
signal.subscribe_value(queue_value)
if done_status is not None:
done_status.add_callback(q.put_nowait)

Comment on lines 558 to 559
for clear_signal in clear_signals:
clear_signal()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for clear_signal in clear_signals:
clear_signal()
for signal, cb in cbs.items():
signal.clear_sub(cb)

@@ -490,6 +490,75 @@ async def get_value():
signal.clear_sub(q.put_nowait)


async def observe_signals_values(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After the above changes are made, can we share any code with observe_value above?

read_signal: SignalR[S],
read_value: S,
match_signal: SignalR[S],
match_value: S | Callable[[S], bool],
timeout: float = DEFAULT_TIMEOUT,
set_timeout: float | None = None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We decided to add wait_for_set_completion = True (or something like that) to both functions and always return the status, but then pass False from the areaDetector utility function

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Rationalise interface for *wait_for_value*
3 participants