Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Properly wait for futures to complete #52

Merged
merged 5 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ RUN rosdep init && rosdep update
ENV PIP_BREAK_SYSTEM_PACKAGES=1

# Need to have setuptools version 64+ for editable installs
RUN pip install --upgrade pip setuptools
# Also fixes: https://github.com/pypa/setuptools/issues/4483#issuecomment-2236339726
RUN pip install --upgrade pip "setuptools>=64.0" "packaging>=22.0"

# Ensure sourced ROS environment at startup
RUN echo 'source /opt/ros/$ROS_DISTRO/setup.bash' >> ~/.bashrc
6 changes: 3 additions & 3 deletions ros2_easy_test/ros2_easy_test/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,9 @@ def wrapper(*args_inner, **kwargs_inner) -> None:
node.destroy_node()

has_finished = executor.shutdown(shutdown_timeout)
assert (
has_finished
), f"Executor shutdown did not complete in {shutdown_timeout} seconds."
assert has_finished, (
f"Executor shutdown did not complete in {shutdown_timeout} seconds."
)

# Make sure that the executor and the node are cleaned up/freed afterwards.
# Cleanup is critical for correctness since subsequent tests may NEVER reference old
Expand Down
37 changes: 23 additions & 14 deletions ros2_easy_test/ros2_easy_test/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from collections import defaultdict
from importlib import import_module
from queue import Empty, SimpleQueue
from threading import RLock
from threading import Event, RLock
from time import monotonic, sleep

# Typing
Expand Down Expand Up @@ -123,9 +123,9 @@ def _get_mailbox_for(self, topic: str) -> "SimpleQueue[RosMessage]":
"""

with self._subscriber_mailboxes_lock:
assert (
topic in self._subscriber_mailboxes
), f"topic {topic} it not being watched: please specify it in the constructor"
assert topic in self._subscriber_mailboxes, (
f"topic {topic} it not being watched: please specify it in the constructor"
)

return self._subscriber_mailboxes[topic]

Expand Down Expand Up @@ -169,7 +169,7 @@ def assert_no_message_published(self, topic: str, time_span: float = 0.5) -> Non
pass # this is what we expect
else:
raise AssertionError(
f"A message was published on topic {topic} although none was expected: " f"{repr(message)}"
f"A message was published on topic {topic} although none was expected: {repr(message)}"
) from None

def assert_message_published(self, topic: str, timeout: Optional[float] = _DEFAULT_TIMEOUT) -> RosMessage:
Expand Down Expand Up @@ -297,14 +297,14 @@ def clear_messages(self, topic: Optional[str] = None) -> None:
self.clear_messages(topic=topic)

else:
# There is not clear() in SimpleQueue
# There is no clear() in SimpleQueue
self.listen_for_messages(topic, time_span=None) # ignore the result

def await_future(self, future: Future, timeout: Optional[float] = 10) -> Any:
"""Waits for the given future to complete.

Args:
future: The future to wait for
future: The ROS Future to wait for
timeout: The timeout to wait for the future

Raises:
Expand All @@ -315,16 +315,25 @@ def await_future(self, future: Future, timeout: Optional[float] = 10) -> Any:
The result of the future
"""

# This does not work with the default executor, so we use the one from the node
assert self.executor, "executor is not set"
# The type ignore is needed due to a bug in ROS2 Humble+
self.executor.spin_until_future_complete(future, timeout_sec=timeout) # type: ignore[arg-type]
# The event trick follows rclpy's service client implementation of the synchronous call()
# https://github.com/ros2/rclpy/blob/2b38e662b3635fffe5b68309f75e39f9e7441602/rclpy/rclpy/client.py#L97-L118
event = Event()

if future.done():
return future.result()
else:
def unblock(_: Future) -> None:
nonlocal event
event.set()

# This immediately calls the callback if the future is already done
future.add_done_callback(unblock)

if not event.wait(timeout):
# Timed out. remove_pending_request() to free resources
self.remove_pending_request(future)
raise TimeoutError(f"Future did not complete within {timeout} seconds")

# Potential exception is raised here
return future.result()

def _get_service_client(self, name: str, request_class: Type) -> Client:
"""Returns the service client for the given service name."""

Expand Down
Loading