Skip to content

Commit

Permalink
Merge pull request #408 from husarion/ros2-lights-integration-tests
Browse files Browse the repository at this point in the history
Ros2 lights integration tests
  • Loading branch information
KmakD authored Sep 13, 2024
2 parents 86a26f6 + 962719b commit fb78242
Show file tree
Hide file tree
Showing 15 changed files with 254 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

# Copyright 2022 TIER IV, Inc.
# Copyright 2024 Husarion sp. z o.o.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
34 changes: 22 additions & 12 deletions panther_lights/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,9 @@ if(BUILD_TESTING)
find_package(ament_cmake_gmock REQUIRED)
find_package(ros_testing REQUIRED)

# Unit tests
ament_add_gtest(${PROJECT_NAME}_test_animation
test/animation/test_animation.cpp)
test/unit/animation/test_animation.cpp)
target_include_directories(
${PROJECT_NAME}_test_animation
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
Expand All @@ -103,7 +104,7 @@ if(BUILD_TESTING)

ament_add_gtest(
${PROJECT_NAME}_test_charging_animation
test/animation/test_charging_animation.cpp
test/unit/animation/test_charging_animation.cpp
src/animation/charging_animation.cpp)
target_include_directories(
${PROJECT_NAME}_test_charging_animation
Expand All @@ -115,7 +116,8 @@ if(BUILD_TESTING)

ament_add_gtest(
${PROJECT_NAME}_test_image_animation
test/animation/test_image_animation.cpp src/animation/image_animation.cpp)
test/unit/animation/test_image_animation.cpp
src/animation/image_animation.cpp)
target_include_directories(
${PROJECT_NAME}_test_image_animation
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
Expand All @@ -125,15 +127,16 @@ if(BUILD_TESTING)
target_link_libraries(${PROJECT_NAME}_test_image_animation png yaml-cpp)

ament_add_gtest(
${PROJECT_NAME}_test_led_panel test/led_components/test_led_panel.cpp
${PROJECT_NAME}_test_led_panel test/unit/led_components/test_led_panel.cpp
src/led_components/led_panel.cpp)
target_include_directories(
${PROJECT_NAME}_test_led_panel
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>)

ament_add_gtest(
${PROJECT_NAME}_test_led_segment test/led_components/test_led_segment.cpp
${PROJECT_NAME}_test_led_segment
test/unit/led_components/test_led_segment.cpp
src/led_components/led_segment.cpp)
target_include_directories(
${PROJECT_NAME}_test_led_segment
Expand All @@ -145,7 +148,7 @@ if(BUILD_TESTING)

ament_add_gtest(
${PROJECT_NAME}_test_segment_converter
test/led_components/test_segment_converter.cpp
test/unit/led_components/test_segment_converter.cpp
src/led_components/segment_converter.cpp
src/led_components/led_panel.cpp
src/led_components/led_segment.cpp)
Expand All @@ -159,7 +162,7 @@ if(BUILD_TESTING)

ament_add_gtest(
${PROJECT_NAME}_test_led_animation
test/led_components/test_led_animation.cpp
test/unit/led_components/test_led_animation.cpp
src/led_components/led_panel.cpp
src/led_components/led_segment.cpp
src/led_components/led_animations_queue.cpp)
Expand All @@ -173,7 +176,7 @@ if(BUILD_TESTING)

ament_add_gtest(
${PROJECT_NAME}_test_led_animations_queue
test/led_components/test_led_animations_queue.cpp
test/unit/led_components/test_led_animations_queue.cpp
src/led_components/led_panel.cpp
src/led_components/led_segment.cpp
src/led_components/led_animations_queue.cpp)
Expand All @@ -185,16 +188,17 @@ if(BUILD_TESTING)
panther_utils rclcpp)
target_link_libraries(${PROJECT_NAME}_test_led_animations_queue yaml-cpp)

ament_add_gmock(${PROJECT_NAME}_test_apa102 test/test_apa102.cpp
ament_add_gmock(${PROJECT_NAME}_test_apa102 test/unit/test_apa102.cpp
src/apa102.cpp)
target_include_directories(
${PROJECT_NAME}_test_apa102
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>)

ament_add_gmock(
${PROJECT_NAME}_test_lights_driver_node test/test_lights_driver_node.cpp
src/apa102.cpp src/lights_driver_node.cpp)
${PROJECT_NAME}_test_lights_driver_node
test/unit/test_lights_driver_node.cpp src/apa102.cpp
src/lights_driver_node.cpp)
target_include_directories(
${PROJECT_NAME}_test_lights_driver_node
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
Expand All @@ -206,7 +210,7 @@ if(BUILD_TESTING)

ament_add_gtest(
${PROJECT_NAME}_test_lights_controller_node
test/test_lights_controller_node.cpp
test/unit/test_lights_controller_node.cpp
src/lights_controller_node.cpp
src/led_components/led_segment.cpp
src/led_components/led_panel.cpp
Expand All @@ -221,6 +225,12 @@ if(BUILD_TESTING)
target_link_libraries(${PROJECT_NAME}_test_lights_controller_node
lights_controller_node_component yaml-cpp)

# Integration tests
option(TEST_INTEGRATION "Run integration tests" OFF)
if(TEST_INTEGRATION)
add_ros_test(test/integration/panther_lights.test.py)
endif()

endif()

ament_export_include_directories(include)
Expand Down
150 changes: 150 additions & 0 deletions panther_lights/test/integration/panther_lights.test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
#!/usr/bin/env python3

# Copyright 2024 Husarion sp. z o.o.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import unittest

import launch
import launch_testing
import panther_utils.integration_test_utils as test_utils
import rclpy
import rclpy.qos
from diagnostic_msgs.msg import DiagnosticArray
from launch import LaunchDescription
from launch.substitutions import PathJoinSubstitution
from launch_ros.actions import Node
from launch_ros.substitutions import FindPackageShare
from launch_testing_ros import WaitForTopics
from panther_msgs.msg import LEDAnimation
from panther_msgs.srv import SetLEDAnimation
from sensor_msgs.msg import Image
from std_srvs.srv import SetBool


def generate_test_description():

led_config_file = (
PathJoinSubstitution([FindPackageShare("panther_lights"), "config", "led_config.yaml"]),
)

lights_controller_node = Node(
package="panther_lights",
executable="lights_controller_node",
parameters=[{"led_config_file": led_config_file}],
)

lights_driver_node = Node(
package="panther_lights",
executable="lights_driver_node",
)

# Start test after 1s
delay_timer = launch.actions.TimerAction(
period=1.0, actions=[launch_testing.actions.ReadyToTest()]
)

actions = [lights_controller_node, lights_driver_node, delay_timer]

context = {}

return (
LaunchDescription(actions),
context,
)


class TestNodesIntegration(unittest.TestCase):

@classmethod
def setUpClass(cls):
rclpy.init()

@classmethod
def tearDownClass(cls):
rclpy.shutdown()

def setUp(self):
self._test_node = rclpy.create_node("test_node")
self._led_control_requested = None

self._led_control_enable_srv = self._test_node.create_service(
srv_type=SetBool,
srv_name="hardware/led_control_enable",
callback=self._led_control_enable_cb,
qos_profile=rclpy.qos.qos_profile_services_default,
)

self._set_led_animation_client = self._test_node.create_client(
srv_type=SetLEDAnimation,
srv_name="lights/set_animation",
qos_profile=rclpy.qos.qos_profile_services_default,
)

def tearDown(self):
self._test_node.destroy_node()

def _led_control_enable_cb(self, request, response):
self._led_control_requested = request.data
response.success = True
response.message = "LED control enabled"
return response

def test_initialization(self, proc_output):
rclpy.spin_until_future_complete(self._test_node, rclpy.task.Future(), timeout_sec=2.0)

self.assertTrue(self._led_control_requested)

# Controller initialization
proc_output.assertWaitFor("[lights_controller]: Loaded default animations.")
proc_output.assertWaitFor("[lights_controller]: Initialized successfully.")
# Driver initialization
proc_output.assertWaitFor("[lights_driver]: Node constructed successfully.")
proc_output.assertWaitFor("[lights_driver]: LED control granted.")

def _request_error_animation(self):
request = SetLEDAnimation.Request()
request.animation = LEDAnimation(id=LEDAnimation.ERROR)
request.repeating = False

self._set_led_animation_client.wait_for_service(timeout_sec=1.0)
self._set_led_animation_client.call_async(request)

def test_msg_publishers(self):
self._request_error_animation()

topic_list = [
("lights/channel_1_frame", Image),
("lights/channel_2_frame", Image),
("diagnostics", DiagnosticArray),
]

with WaitForTopics(topic_list, timeout=5.0) as wait_for_topics:
received_topics_str = ", ".join(wait_for_topics.topics_received())
print("Received messages from the following topics: [" + received_topics_str + "]")

def test_msg_subscribers(self):
node_info = test_utils.get_node_info("/lights_driver")

self.assertTrue("/lights/channel_1_frame" in node_info.subscribers)
self.assertTrue("/lights/channel_2_frame" in node_info.subscribers)


@launch_testing.post_shutdown_test()
class TestProcessOutput(unittest.TestCase):

def test_exit_code(self, proc_info):
# Check that process exits with code 0: no error
launch_testing.asserts.assertExitCodes(proc_info)
File renamed without changes.
81 changes: 81 additions & 0 deletions panther_utils/panther_utils/integration_test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#!/usr/bin/env python3

# Copyright 2024 Husarion sp. z o.o.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import subprocess
from typing import List


class ROSNodeInfo:
"""
Class representing the ROS node info.
"""

def __init__(self):
self.subscribers: List[str] = []
self.publishers: List[str] = []
self.service_servers: List[str] = []
self.service_clients: List[str] = []
self.action_servers: List[str] = []
self.action_clients: List[str] = []


def get_node_info(node_name: str) -> ROSNodeInfo:
"""
Executes the command 'ros2 node info <node_name>' and returns the ROSNodeInfo object.
Args:
node_name (str): The name of the ROS 2 node to get information about.
Returns:
ROSNodeInfo: An object representing a complete node info.
Raises:
RuntimeError: If the command execution fails.
"""
node_info = ROSNodeInfo()

section_map = {
"Subscribers:": node_info.subscribers,
"Publishers:": node_info.publishers,
"Service Servers:": node_info.service_servers,
"Service Clients:": node_info.service_clients,
"Action Servers:": node_info.action_servers,
"Action Clients:": node_info.action_clients,
}

try:
# Execute the `ros2 node info` command
result = subprocess.run(
["ros2", "node", "info", node_name], capture_output=True, text=True, check=True
)

# Parse the output
lines = result.stdout.splitlines()
current_section = None

for line in lines:
line = line.strip()
if line in section_map:
current_section = section_map[line]
elif line and current_section is not None:
current_section.append(line.split(":")[0].strip())
else:
current_section = None

except subprocess.CalledProcessError as e:
raise RuntimeError(f"Error executing command: {e}. stderr: {e.stderr}") from e

return node_info

0 comments on commit fb78242

Please sign in to comment.