Skip to content

Commit

Permalink
Move load_parameter_file implementation to api, add test_verb_load
Browse files Browse the repository at this point in the history
Signed-off-by: Victor Lopez <[email protected]>
  • Loading branch information
Victor Lopez committed Feb 9, 2021
1 parent 857724d commit 6e06195
Show file tree
Hide file tree
Showing 3 changed files with 305 additions and 29 deletions.
19 changes: 19 additions & 0 deletions ros2param/ros2param/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,25 @@ def load_parameter_dict(*, node, node_name, parameter_dict):
print(msg, file=sys.stderr)


def load_parameter_file(*, node, node_name, parameter_file):
# Remove leading slash and namespaces
internal_node_name = node_name.split('/')[-1]
with open(parameter_file, "r") as f:
param_file = yaml.safe_load(f)
if internal_node_name not in param_file:
raise RuntimeError("Param file doesn't contain parameters for {}, "
" only for namespaces: {}" .format(internal_node_name,
param_file.keys()))

value = param_file[internal_node_name]
if type(value) != dict or "ros__parameters" not in value:
raise RuntimeError("Invalid structure of parameter file in namespace {}"
"expected same format as provided by ros2 param dump"
.format(internal_node_name))
load_parameter_dict(node=node, node_name=node_name,
parameter_dict=value["ros__parameters"])


def call_describe_parameters(*, node, node_name, parameter_names=None):
# create client
client = node.create_client(
Expand Down
31 changes: 2 additions & 29 deletions ros2param/ros2param/verb/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@
from ros2node.api import get_absolute_node_name
from ros2node.api import get_node_names
from ros2node.api import NodeNameCompleter
from ros2param.api import load_parameter_dict
from ros2param.api import load_parameter_file
from ros2param.verb import VerbExtension

import yaml


class LoadVerb(VerbExtension):
"""Load parameter file for a node."""
Expand All @@ -38,9 +36,6 @@ def add_arguments(self, parser, cli_name): # noqa: D102
help='Consider hidden nodes as well')
arg = parser.add_argument(
'parameter_file', help='Parameter file')
parser.add_argument(
'--use-wildcard', action='store_true',
help='Load parameters in the \'/**\' namespace into the node')

def main(self, *, args): # noqa: D102
with NodeStrategy(args) as node:
Expand All @@ -50,28 +45,6 @@ def main(self, *, args): # noqa: D102
node_name = get_absolute_node_name(args.node_name)
if node_name not in {n.full_name for n in node_names}:
return 'Node not found'
# Remove leading slash
node_namespace = node_name[1:]

with DirectNode(args) as node:
with open(args.parameter_file, "r") as f:
param_file = yaml.safe_load(f)
param_namespaces = []
if args.use_wildcard and "/**" in param_file:
param_namespaces.append("/**")
if node_namespace in param_file:
param_namespaces.append(node_namespace)

if param_namespaces == []:
raise RuntimeError("Param file doesn't contain parameters for {}, "
" only for namespaces: {}" .format(node_namespace,
param_file.keys()))

for ns in param_namespaces:
value = param_file[ns]
if type(value) != dict or "ros__parameters" not in value:
raise RuntimeError("Invalid structure of parameter file in namespace {}"
"expected same format as provided by ros2 param dump"
.format(ns))
load_parameter_dict(node=node, node_name=node_name,
parameter_dict=value["ros__parameters"])
load_parameter_file(node=node, node_name=node_name, parameter_file=args.parameter_file)
284 changes: 284 additions & 0 deletions ros2param/test/test_verb_load.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,284 @@
# Copyright 2021 PAL Robotics S.L.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import contextlib
import os
import sys
import tempfile
import time
import unittest
import xmlrpc

from launch import LaunchDescription
from launch.actions import ExecuteProcess
from launch_ros.actions import Node
import launch_testing
import launch_testing.actions
import launch_testing.asserts
import launch_testing.markers
import launch_testing.tools
import launch_testing_ros.tools

import pytest

import rclpy
from rclpy.utilities import get_available_rmw_implementations

from ros2cli.node.strategy import NodeStrategy

TEST_NODE = 'test_node'
TEST_NAMESPACE = '/foo'

TEST_TIMEOUT = 20.0

INPUT_PARAMETER_FILE = (
f'{TEST_NODE}:\n'
' ros__parameters:\n'
' bool_array_param:\n'
' - true\n'
' - false\n'
' - false\n'
' bool_param: false\n'
' double_array_param:\n'
' - 2.125\n'
' - 1.25\n'
' - 2.5\n'
' double_param: 1.3\n'
' foo:\n'
' bar:\n'
' str_param: foo_bar\n'
' str_param: foobar\n'
' int_array_param:\n'
' - 42\n'
' - 3\n'
' - 3\n'
' int_param: -42\n'
' str_array_param:\n'
' - a_foo\n'
' - a_bar\n'
' - baz\n'
' str_param: Bye World\n'
' use_sim_time: false\n'
)

# Skip cli tests on Windows while they exhibit pathological behavior
# https://github.com/ros2/build_farmer/issues/248
if sys.platform.startswith('win'):
pytest.skip(
'CLI tests can block for a pathological amount of time on Windows.',
allow_module_level=True)


@pytest.mark.rostest
@launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations())
def generate_test_description(rmw_implementation):
path_to_fixtures = os.path.join(os.path.dirname(__file__), 'fixtures')
additional_env = {'RMW_IMPLEMENTATION': rmw_implementation}

# Parameter node test fixture
path_to_parameter_node_script = os.path.join(path_to_fixtures, 'parameter_node.py')
parameter_node = Node(
executable=sys.executable,
name=TEST_NODE,
namespace=TEST_NAMESPACE,
arguments=[path_to_parameter_node_script],
additional_env=additional_env
)

return LaunchDescription([
# TODO(jacobperron): Provide a common RestartCliDaemon launch action in ros2cli
ExecuteProcess(
cmd=['ros2', 'daemon', 'stop'],
name='daemon-stop',
on_exit=[
ExecuteProcess(
cmd=['ros2', 'daemon', 'start'],
name='daemon-start',
on_exit=[
parameter_node,
launch_testing.actions.ReadyToTest(),
],
additional_env=additional_env
)
]
),
])


class TestVerbDump(unittest.TestCase):

@classmethod
def setUpClass(
cls,
launch_service,
proc_info,
proc_output,
rmw_implementation
):
rmw_implementation_filter = launch_testing_ros.tools.basic_output_filter(
filtered_rmw_implementation=rmw_implementation
)

@contextlib.contextmanager
def launch_param_load_command(self, arguments):
param_load_command_action = ExecuteProcess(
cmd=['ros2', 'param', 'load', *arguments],
additional_env={
'RMW_IMPLEMENTATION': rmw_implementation,
},
name='ros2param-load-cli',
output='screen'
)
with launch_testing.tools.launch_process(
launch_service, param_load_command_action, proc_info, proc_output,
output_filter=rmw_implementation_filter
) as param_load_command:
yield param_load_command
cls.launch_param_load_command = launch_param_load_command

@contextlib.contextmanager
def launch_param_dump_command(self, arguments):
param_dump_command_action = ExecuteProcess(
cmd=['ros2', 'param', 'dump', *arguments],
additional_env={
'RMW_IMPLEMENTATION': rmw_implementation,
},
name='ros2param-dump-cli',
output='screen'
)
with launch_testing.tools.launch_process(
launch_service, param_dump_command_action, proc_info, proc_output,
output_filter=rmw_implementation_filter
) as param_dump_command:
yield param_dump_command
cls.launch_param_dump_command = launch_param_dump_command

def setUp(self):
# Ensure the daemon node is running and discovers the test node
start_time = time.time()
timed_out = True
with NodeStrategy(None) as node:
while (time.time() - start_time) < TEST_TIMEOUT:
# TODO(jacobperron): Create a generic 'CliNodeError' so we can treat errors
# from DirectNode and DaemonNode the same
try:
services = node.get_service_names_and_types_by_node(TEST_NODE, TEST_NAMESPACE)
except rclpy.node.NodeNameNonExistentError:
continue
except xmlrpc.client.Fault as e:
if 'NodeNameNonExistentError' in e.faultString:
continue
raise

service_names = [name_type_tuple[0] for name_type_tuple in services]
if (
len(service_names) > 0
and f'{TEST_NAMESPACE}/{TEST_NODE}/get_parameters' in service_names
):
timed_out = False
break
if timed_out:
self.fail(f'CLI daemon failed to find test node after {TEST_TIMEOUT} seconds')

def _write_param_file(self, tmpdir, filename):
yaml_path = os.path.join(tmpdir, filename)
with open(yaml_path, 'w') as f:
f.write(INPUT_PARAMETER_FILE)
return yaml_path

def _output_file(self):
return f'{TEST_NAMESPACE}/{TEST_NODE}'.lstrip('/').replace('/', '__') + '.yaml'

def test_verb_load_missing_args(self):
with self.launch_param_load_command(arguments=[]) as param_load_command:
assert param_load_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
assert param_load_command.exit_code != launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=['ros2 param load: error: the following arguments are required: '
'node_name, parameter_file'],
text=param_load_command.output,
strict=False
)
with self.launch_param_load_command(arguments=['some_node']) as param_load_command:
assert param_load_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
assert param_load_command.exit_code != launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=['ros2 param load: error: the following arguments are required: '
'parameter_file'],
text=param_load_command.output,
strict=False
)

# TODO
def test_verb_load_invalid_node(self):
with tempfile.TemporaryDirectory() as tmpdir:
filepath = self._write_param_file(tmpdir, 'params.yaml')
with self.launch_param_load_command(
arguments=['invalid_node', filepath]
) as param_load_command:
assert param_load_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
assert param_load_command.exit_code != launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=['Node not found'],
text=param_load_command.output,
strict=True
)
with self.launch_param_load_command(
arguments=[f'invalid_ns/{TEST_NODE}', filepath]
) as param_load_command:
assert param_load_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
assert param_load_command.exit_code != launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=['Node not found'],
text=param_load_command.output,
strict=True
)

# TODO
def test_verb_load_invalid_path(self):
with self.launch_param_load_command(
arguments=[f'{TEST_NAMESPACE}/{TEST_NODE}', 'invalid_path']
) as param_load_command:
assert param_load_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
assert param_load_command.exit_code != launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=['No such file or directory'],
text=param_load_command.output,
strict=False
)

def test_verb_load(self):
with tempfile.TemporaryDirectory() as tmpdir:
filepath = self._write_param_file(tmpdir, 'params.yaml')
with self.launch_param_load_command(
arguments=[f'{TEST_NAMESPACE}/{TEST_NODE}', filepath]
) as param_load_command:
assert param_load_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
assert param_load_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=[''],
text=param_load_command.output,
strict=True
)
# Dump with ros2 param dump and compare that output matches input file
with self.launch_param_dump_command(
arguments=[f'{TEST_NAMESPACE}/{TEST_NODE}', '--print']
) as param_dump_command:
assert param_dump_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
assert param_dump_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_text=INPUT_PARAMETER_FILE + '\n',
text=param_dump_command.output,
strict=True
)

0 comments on commit 6e06195

Please sign in to comment.