Skip to content

Commit

Permalink
Add tests for new QoS getters from raw XML (#189)
Browse files Browse the repository at this point in the history
* Refs #21745. Remove zombie file

Signed-off-by: Juan Lopez Fernandez <[email protected]>

* Refs #21745. Add tests for new API

Signed-off-by: Juan Lopez Fernandez <[email protected]>

* Refs #21745. Apply suggestions

Signed-off-by: Juan Lopez Fernandez <[email protected]>

---------

Signed-off-by: Juan Lopez Fernandez <[email protected]>
  • Loading branch information
juanlofer-eprosima authored Oct 14, 2024
1 parent 9a18286 commit e89a029
Show file tree
Hide file tree
Showing 6 changed files with 529 additions and 47 deletions.
47 changes: 0 additions & 47 deletions fastdds_python/test/api/DEFAULT_FASTRTPS_PROFILES.xml

This file was deleted.

245 changes: 245 additions & 0 deletions fastdds_python/test/api/test_domainparticipant.py
Original file line number Diff line number Diff line change
Expand Up @@ -796,3 +796,248 @@ def second_participant():
factory.delete_participant(participant))
assert(fastdds.RETCODE_OK ==
factory.delete_participant(participant2))

def test_get_publisher_qos_from_xml():

with open("test_xml_profile.xml", "r", encoding="utf-8") as file:
xml_content = file.read()

factory = fastdds.DomainParticipantFactory.get_instance()
participant = factory.create_participant(
0, fastdds.PARTICIPANT_QOS_DEFAULT)

qos = fastdds.PublisherQos()
ret = participant.get_publisher_qos_from_xml(
xml_content, qos, 'test_publisher_profile')
assert(fastdds.RETCODE_OK == ret)

qos_no_name = fastdds.PublisherQos()
ret = participant.get_publisher_qos_from_xml(
xml_content, qos_no_name)
assert(fastdds.RETCODE_OK == ret)

# Non matching name takes the first publisher found (the only one)
assert(qos == qos_no_name)

assert(fastdds.RETCODE_OK ==
factory.delete_participant(participant))

def test_get_default_publisher_qos_from_xml():

with open("test_xml_profile.xml", "r", encoding="utf-8") as file:
xml_content = file.read()

factory = fastdds.DomainParticipantFactory.get_instance()
participant = factory.create_participant(
0, fastdds.PARTICIPANT_QOS_DEFAULT)

default_qos = fastdds.PublisherQos()
ret = participant.get_default_publisher_qos_from_xml(
xml_content, default_qos)
assert(fastdds.RETCODE_OK == ret)

qos = fastdds.PublisherQos()
ret = participant.get_publisher_qos_from_xml(
xml_content, qos, 'test_publisher_profile')
assert(fastdds.RETCODE_OK == ret)

assert(default_qos == qos)

assert(fastdds.RETCODE_OK ==
factory.delete_participant(participant))

def test_get_subscriber_qos_from_xml():

with open("test_xml_profile.xml", "r", encoding="utf-8") as file:
xml_content = file.read()

factory = fastdds.DomainParticipantFactory.get_instance()
participant = factory.create_participant(
0, fastdds.PARTICIPANT_QOS_DEFAULT)

qos = fastdds.SubscriberQos()
ret = participant.get_subscriber_qos_from_xml(
xml_content, qos, 'test_subscriber_profile')
assert(fastdds.RETCODE_OK == ret)

qos_no_name = fastdds.SubscriberQos()
ret = participant.get_subscriber_qos_from_xml(
xml_content, qos_no_name)
assert(fastdds.RETCODE_OK == ret)

# Non matching name takes the first subscriber found (the only one)
assert(qos == qos_no_name)

assert(fastdds.RETCODE_OK ==
factory.delete_participant(participant))

def test_get_default_subscriber_qos_from_xml():

with open("test_xml_profile.xml", "r", encoding="utf-8") as file:
xml_content = file.read()

factory = fastdds.DomainParticipantFactory.get_instance()
participant = factory.create_participant(
0, fastdds.PARTICIPANT_QOS_DEFAULT)

default_qos = fastdds.SubscriberQos()
ret = participant.get_default_subscriber_qos_from_xml(
xml_content, default_qos)
assert(fastdds.RETCODE_OK == ret)

qos = fastdds.SubscriberQos()
ret = participant.get_subscriber_qos_from_xml(
xml_content, qos, 'test_subscriber_profile')
assert(fastdds.RETCODE_OK == ret)

assert(default_qos == qos)

assert(fastdds.RETCODE_OK ==
factory.delete_participant(participant))

def test_get_topic_qos_from_xml():

with open("test_xml_profile.xml", "r", encoding="utf-8") as file:
xml_content = file.read()

factory = fastdds.DomainParticipantFactory.get_instance()
participant = factory.create_participant(
0, fastdds.PARTICIPANT_QOS_DEFAULT)

qos = fastdds.TopicQos()
ret = participant.get_topic_qos_from_xml(
xml_content, qos, 'test_topic_profile')
assert(fastdds.RETCODE_OK == ret)

qos_no_name = fastdds.TopicQos()
ret = participant.get_topic_qos_from_xml(
xml_content, qos_no_name)
assert(fastdds.RETCODE_OK == ret)

# Non matching name takes the first topic found (the only one)
assert(qos == qos_no_name)

assert(fastdds.RETCODE_OK ==
factory.delete_participant(participant))

def test_get_default_topic_qos_from_xml():

with open("test_xml_profile.xml", "r", encoding="utf-8") as file:
xml_content = file.read()

factory = fastdds.DomainParticipantFactory.get_instance()
participant = factory.create_participant(
0, fastdds.PARTICIPANT_QOS_DEFAULT)

default_qos = fastdds.TopicQos()
ret = participant.get_default_topic_qos_from_xml(
xml_content, default_qos)
assert(fastdds.RETCODE_OK == ret)

qos = fastdds.TopicQos()
ret = participant.get_topic_qos_from_xml(
xml_content, qos, 'test_topic_profile')
assert(fastdds.RETCODE_OK == ret)

assert(default_qos == qos)

assert(fastdds.RETCODE_OK ==
factory.delete_participant(participant))

def test_get_requester_qos_from_xml():

with open("test_xml_profile.xml", "r", encoding="utf-8") as file:
xml_content = file.read()

factory = fastdds.DomainParticipantFactory.get_instance()
participant = factory.create_participant(
0, fastdds.PARTICIPANT_QOS_DEFAULT)

qos = fastdds.RequesterQos()
ret = participant.get_requester_qos_from_xml(
xml_content, qos, 'test_requester_profile')
assert(fastdds.RETCODE_OK == ret)

qos_no_name = fastdds.RequesterQos()
ret = participant.get_requester_qos_from_xml(
xml_content, qos_no_name)
assert(fastdds.RETCODE_OK == ret)

# Non matching name takes the first requester found (the only one)
assert(qos == qos_no_name)

assert(fastdds.RETCODE_OK ==
factory.delete_participant(participant))

def test_get_default_requester_qos_from_xml():

with open("test_xml_profile.xml", "r", encoding="utf-8") as file:
xml_content = file.read()

factory = fastdds.DomainParticipantFactory.get_instance()
participant = factory.create_participant(
0, fastdds.PARTICIPANT_QOS_DEFAULT)

default_qos = fastdds.RequesterQos()
ret = participant.get_default_requester_qos_from_xml(
xml_content, default_qos)
assert(fastdds.RETCODE_OK == ret)

qos = fastdds.RequesterQos()
ret = participant.get_requester_qos_from_xml(
xml_content, qos, 'test_requester_profile')
assert(fastdds.RETCODE_OK == ret)

assert(default_qos == qos)

assert(fastdds.RETCODE_OK ==
factory.delete_participant(participant))

def test_get_replier_qos_from_xml():

with open("test_xml_profile.xml", "r", encoding="utf-8") as file:
xml_content = file.read()

factory = fastdds.DomainParticipantFactory.get_instance()
participant = factory.create_participant(
0, fastdds.PARTICIPANT_QOS_DEFAULT)

qos = fastdds.ReplierQos()
ret = participant.get_replier_qos_from_xml(
xml_content, qos, 'test_replier_profile')
assert(fastdds.RETCODE_OK == ret)

qos_no_name = fastdds.ReplierQos()
ret = participant.get_replier_qos_from_xml(
xml_content, qos_no_name)
assert(fastdds.RETCODE_OK == ret)

# Non matching name takes the first replier found (the only one)
assert(qos == qos_no_name)

assert(fastdds.RETCODE_OK ==
factory.delete_participant(participant))

def test_get_default_replier_qos_from_xml():

with open("test_xml_profile.xml", "r", encoding="utf-8") as file:
xml_content = file.read()

factory = fastdds.DomainParticipantFactory.get_instance()
participant = factory.create_participant(
0, fastdds.PARTICIPANT_QOS_DEFAULT)

default_qos = fastdds.ReplierQos()
ret = participant.get_default_replier_qos_from_xml(
xml_content, default_qos)
assert(fastdds.RETCODE_OK == ret)

qos = fastdds.ReplierQos()
ret = participant.get_replier_qos_from_xml(
xml_content, qos, 'test_replier_profile')
assert(fastdds.RETCODE_OK == ret)

assert(default_qos == qos)

assert(fastdds.RETCODE_OK ==
factory.delete_participant(participant))
78 changes: 78 additions & 0 deletions fastdds_python/test/api/test_domainparticipantfactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,81 @@ def test(status_mask, listnr=None):
fastdds.StatusMask.sample_rejected() <<
fastdds.StatusMask.subscription_matched(),
listener)

def test_get_participant_qos_from_xml():

with open("test_xml_profile.xml", "r", encoding="utf-8") as file:
xml_content = file.read()

factory = fastdds.DomainParticipantFactory.get_instance()

qos = fastdds.DomainParticipantQos()
ret = factory.get_participant_qos_from_xml(
xml_content, qos, 'test_participant_profile')
assert(fastdds.RETCODE_OK == ret)

qos_no_name = fastdds.DomainParticipantQos()
ret = factory.get_participant_qos_from_xml(
xml_content, qos_no_name)
assert(fastdds.RETCODE_OK == ret)

# Non matching name takes the first participant found (the only one)
assert(qos == qos_no_name)

def test_get_default_participant_qos_from_xml():

with open("test_xml_profile.xml", "r", encoding="utf-8") as file:
xml_content = file.read()

factory = fastdds.DomainParticipantFactory.get_instance()

default_qos = fastdds.DomainParticipantQos()
ret = factory.get_default_participant_qos_from_xml(
xml_content, default_qos)
assert(fastdds.RETCODE_OK == ret)

qos = fastdds.DomainParticipantQos()
ret = factory.get_participant_qos_from_xml(
xml_content, qos, 'test_participant_profile')
assert(fastdds.RETCODE_OK == ret)

assert(default_qos == qos)

def test_get_participant_extended_qos_from_xml():

with open("test_xml_profile.xml", "r", encoding="utf-8") as file:
xml_content = file.read()

factory = fastdds.DomainParticipantFactory.get_instance()

qos = fastdds.DomainParticipantExtendedQos()
ret = factory.get_participant_extended_qos_from_xml(
xml_content, qos, 'test_participant_profile')
assert(fastdds.RETCODE_OK == ret)

qos_no_name = fastdds.DomainParticipantExtendedQos()
ret = factory.get_participant_extended_qos_from_xml(
xml_content, qos_no_name)
assert(fastdds.RETCODE_OK == ret)

# Non matching name takes the first participant found (the only one)
assert(qos == qos_no_name)

def test_get_default_participant_extended_qos_from_xml():

with open("test_xml_profile.xml", "r", encoding="utf-8") as file:
xml_content = file.read()

factory = fastdds.DomainParticipantFactory.get_instance()

default_qos = fastdds.DomainParticipantExtendedQos()
ret = factory.get_default_participant_extended_qos_from_xml(
xml_content, default_qos)
assert(fastdds.RETCODE_OK == ret)

qos = fastdds.DomainParticipantExtendedQos()
ret = factory.get_participant_extended_qos_from_xml(
xml_content, qos, 'test_participant_profile')
assert(fastdds.RETCODE_OK == ret)

assert(default_qos == qos)
Loading

0 comments on commit e89a029

Please sign in to comment.