diff --git a/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp b/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp index f09caf33..55a3a0b9 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp @@ -33,10 +33,6 @@ #include "rmw/rmw.h" #include "rosidl_typesupport_cpp/message_type_support.hpp" -#ifndef DDS_GUID_INITIALIZER -#define DDS_GUID_INITIALIZER DDS_GUID_DEFAULT -#endif /* DDS_GUID_INITIALIZER */ - class RMW_Connext_MessageTypeSupport; class RMW_Connext_Publisher; class RMW_Connext_Subscriber; @@ -296,9 +292,4 @@ rmw_connextdds_get_cft_filter_expression( rcutils_allocator_t * const allocator, rmw_subscription_content_filter_options_t * const options); -rmw_ret_t -rmw_connextdds_guid_to_instance_handle( - const struct DDS_GUID_t * const guid, - DDS_InstanceHandle_t * const instanceHandle); - #endif // RMW_CONNEXTDDS__DDS_API_HPP_ diff --git a/rmw_connextdds_common/include/rmw_connextdds/resource_limits.hpp b/rmw_connextdds_common/include/rmw_connextdds/resource_limits.hpp index f7bd4ae9..f091e2b2 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/resource_limits.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/resource_limits.hpp @@ -55,13 +55,4 @@ #define RMW_CONNEXT_LIMIT_KEEP_ALL_SAMPLES 1000 #endif /* RMW_CONNEXT_LIMIT_SAMPLES_MAX */ -#ifndef RMW_CONNEXT_LIMIT_DEFAULT_BLOCKING_TIME_DEFAULT -#define RMW_CONNEXT_LIMIT_DEFAULT_BLOCKING_TIME_DEFAULT 100000 /* 100ms */ -#endif /* RMW_CONNEXT_LIMIT_DEFAULT_BLOCKING_TIME_DEFAULT */ - -#ifndef RMW_CONNEXT_LIMIT_DEFAULT_BLOCKING_TIME_INFINITE -#define RMW_CONNEXT_LIMIT_DEFAULT_BLOCKING_TIME_INFINITE (31536000ull * 1000000ull) /* 1 year */ -#endif /* RMW_CONNEXT_LIMIT_DEFAULT_BLOCKING_TIME_INFINITE */ - - #endif // RMW_CONNEXTDDS__RESOURCE_LIMITS_HPP_ diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp index 943d5f8b..56fffdad 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp @@ -21,7 +21,6 @@ #include #include #include -#include #include "rmw_connextdds/context.hpp" #include "rmw_connextdds/type_support.hpp" @@ -71,28 +70,6 @@ bool rmw_connextdds_find_string_in_list( DDS_Duration_t rmw_connextdds_duration_from_ros_time( const rmw_time_t * const ros_time); -class RMW_Connext_OrderedGid -{ -public: - explicit RMW_Connext_OrderedGid(const rmw_gid_t & value) - : value(value) - { - } - - bool operator<(const RMW_Connext_OrderedGid & other) const - { - return memcmp(value.data, other.value.data, RMW_GID_STORAGE_SIZE) < 0; - } - - bool operator==(const RMW_Connext_OrderedGid & other) const - { - return memcmp(value.data, other.value.data, RMW_GID_STORAGE_SIZE) == 0; - } - -private: - rmw_gid_t value; -}; - /****************************************************************************** * WaitSet wrapper ******************************************************************************/ @@ -180,7 +157,7 @@ class RMW_Connext_Publisher RMW_Connext_WriteParams * const params); rmw_ret_t - enable() + enable() const { if (DDS_RETCODE_OK != DDS_Entity_enable( @@ -206,8 +183,6 @@ class RMW_Connext_Publisher return RMW_RET_ERROR; } - max_blocking_time = load_max_blocking_time(); - return RMW_RET_OK; } @@ -240,12 +215,6 @@ class RMW_Connext_Publisher DDS_SampleIdentity_t * const sample_identity, DDS_SampleIdentity_t * const related_sample_identity); - rmw_ret_t - wait_for_subscription( - rmw_gid_t & reader_gid, - bool & unmatched, - rmw_gid_t & related_writer_gid); - DDS_Topic * dds_topic() const { return DDS_DataWriter_get_topic(this->dds_writer); @@ -263,82 +232,19 @@ class RMW_Connext_Publisher return DDS_Publisher_get_participant(pub); } - void - push_related_endpoints(const rmw_gid_t & endpoint, const rmw_gid_t & related) - { - std::lock_guard lock(matched_mutex); - known_endpoints.emplace(RMW_Connext_OrderedGid(endpoint), related); - known_endpoints.emplace(RMW_Connext_OrderedGid(related), endpoint); - } - - void - on_publication_matched( - const DDS_PublicationMatchedStatus * const status) - { - std::lock_guard lock(matched_mutex); - DDS_ReturnCode_t dds_rc = - DDS_DataWriter_get_matched_subscriptions(writer(), &matched_subscriptions); - if (DDS_RETCODE_OK != dds_rc) { - RMW_CONNEXT_LOG_ERROR_A_SET("failed to list matched subscriptions: dds_rc=%d", dds_rc) - } - if (status->current_count_change < 0) { - rmw_gid_t unmatched_gid; - rmw_connextdds_ih_to_gid(status->last_subscription_handle, unmatched_gid); - pop_related_endpoints(unmatched_gid); - } - matched_cv.notify_all(); - } - - void - on_related_subscription_matched( - RMW_Connext_Subscriber * const sub, - const DDS_SubscriptionMatchedStatus * const status) - { - UNUSED_ARG(sub); - std::lock_guard lock(matched_mutex); - if (status->current_count_change < 0) { - rmw_gid_t unmatched_gid; - rmw_connextdds_ih_to_gid(status->last_publication_handle, unmatched_gid); - pop_related_endpoints(unmatched_gid); - } - matched_cv.notify_all(); - } - - rmw_context_impl_t * const ctx; - - ~RMW_Connext_Publisher(); - private: + rmw_context_impl_t * ctx; DDS_DataWriter * dds_writer; RMW_Connext_MessageTypeSupport * type_support; const bool created_topic; rmw_gid_t ros_gid; RMW_Connext_PublisherStatusCondition status_condition; - std::mutex matched_mutex; - std::condition_variable matched_cv; - std::chrono::microseconds max_blocking_time; - std::map known_endpoints; - DDS_InstanceHandleSeq matched_subscriptions; RMW_Connext_Publisher( rmw_context_impl_t * const ctx, DDS_DataWriter * const dds_writer, RMW_Connext_MessageTypeSupport * const type_support, const bool created_topic); - - void - pop_related_endpoints(const rmw_gid_t & endpoint) - { - auto endpoint_entry = known_endpoints.find(RMW_Connext_OrderedGid(endpoint)); - if (endpoint_entry == known_endpoints.end()) { - return; - } - known_endpoints.erase(RMW_Connext_OrderedGid(endpoint)); - known_endpoints.erase(RMW_Connext_OrderedGid(endpoint_entry->second)); - } - - std::chrono::microseconds - load_max_blocking_time() const; }; rmw_publisher_t * @@ -380,8 +286,7 @@ class RMW_Connext_Subscriber const bool intro_members_cpp = false, std::string * const type_name = nullptr, const char * const cft_name = nullptr, - const char * const cft_filter = nullptr, - RMW_Connext_Publisher * const related_pub = nullptr); + const char * const cft_filter = nullptr); rmw_ret_t finalize(); @@ -616,7 +521,7 @@ class RMW_Connext_Subscriber const bool ignore_local; private: - rmw_context_impl_t * const ctx; + rmw_context_impl_t * ctx; DDS_DataReader * dds_reader; DDS_Topic * dds_topic; DDS_TopicDescription * dds_topic_cft; @@ -630,7 +535,6 @@ class RMW_Connext_Subscriber size_t loan_len; size_t loan_next; std::mutex loan_mutex; - RMW_Connext_Publisher * const related_pub; RMW_Connext_Subscriber( rmw_context_impl_t * const ctx, @@ -641,8 +545,7 @@ class RMW_Connext_Subscriber const bool created_topic, DDS_TopicDescription * const dds_topic_cft, const char * const cft_expression, - const bool internal, - RMW_Connext_Publisher * const related_pub); + const bool internal); friend class RMW_Connext_SubscriberStatusCondition; }; diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp index 878b2698..45bcedf6 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp @@ -680,9 +680,7 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition DDS_DataReader * reader); rmw_ret_t - install( - RMW_Connext_Subscriber * const sub, - RMW_Connext_Publisher * const related_pub = nullptr); + install(RMW_Connext_Subscriber * const sub); void on_requested_deadline_missed( @@ -911,8 +909,7 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition DDS_SampleLostStatus status_sample_lost_last; DDS_SubscriptionMatchedStatus status_matched_last; - RMW_Connext_Subscriber * sub{nullptr}; - RMW_Connext_Publisher * related_pub{nullptr}; + RMW_Connext_Subscriber * sub; std::mutex new_data_event_mutex_; rmw_event_callback_t new_data_event_cb_{nullptr}; diff --git a/rmw_connextdds_common/include/rmw_connextdds/type_support.hpp b/rmw_connextdds_common/include/rmw_connextdds/type_support.hpp index 09b8ae5f..98c3bad2 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/type_support.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/type_support.hpp @@ -45,7 +45,6 @@ struct RMW_Connext_RequestReplyMessage { bool request; rmw_gid_t gid; - rmw_gid_t writer_gid; int64_t sn; void * payload; }; diff --git a/rmw_connextdds_common/src/common/rmw_impl.cpp b/rmw_connextdds_common/src/common/rmw_impl.cpp index a6c82e67..3deb1f14 100644 --- a/rmw_connextdds_common/src/common/rmw_impl.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl.cpp @@ -658,8 +658,7 @@ RMW_Connext_Publisher::RMW_Connext_Publisher( dds_writer(dds_writer), type_support(type_support), created_topic(created_topic), - status_condition(dds_writer), - matched_subscriptions(DDS_SEQUENCE_INITIALIZER) + status_condition(dds_writer) { rmw_connextdds_get_entity_gid(this->dds_writer, this->ros_gid); if (RMW_RET_OK != this->status_condition.install(this)) { @@ -668,11 +667,6 @@ RMW_Connext_Publisher::RMW_Connext_Publisher( } } -RMW_Connext_Publisher::~RMW_Connext_Publisher() -{ - DDS_InstanceHandleSeq_finalize(&matched_subscriptions); -} - RMW_Connext_Publisher * RMW_Connext_Publisher::create( rmw_context_impl_t * const ctx, @@ -890,6 +884,10 @@ RMW_Connext_Publisher::finalize() return RMW_RET_OK; } +#ifndef DDS_GUID_INITIALIZER +#define DDS_GUID_INITIALIZER DDS_GUID_DEFAULT +#endif /* DDS_GUID_INITIALIZER */ + rmw_ret_t RMW_Connext_Publisher::requestreply_header_to_dds( const RMW_Connext_RequestReplyMessage * const rr_msg, @@ -1022,97 +1020,6 @@ RMW_Connext_Publisher::qos(rmw_qos_profile_t * const qos) return rc; } -std::chrono::microseconds -RMW_Connext_Publisher::load_max_blocking_time() const -{ -#if !RMW_CONNEXT_DDS_API_PRO_LEGACY - DDS_DataWriterQos dw_qos = DDS_DataWriterQos_INITIALIZER; -#else - DDS_DataWriterQos dw_qos; - if (DDS_RETCODE_OK != DDS_DataWriterQos_initialize(&dw_qos)) { - RMW_CONNEXT_LOG_ERROR_SET("failed to initialize datawriter qos") - return std::chrono::microseconds(0); - } -#endif /* !RMW_CONNEXT_DDS_API_PRO_LEGACY */ - - auto scope_exit_dw_qos_delete = - rcpputils::make_scope_exit( - [&dw_qos]() - { - if (DDS_RETCODE_OK != DDS_DataWriterQos_finalize(&dw_qos)) { - RMW_CONNEXT_LOG_ERROR_SET("failed to finalize DataWriterQoS") - } - }); - - if (DDS_DataWriter_get_qos(writer(), &dw_qos)) { - RMW_CONNEXT_LOG_ERROR_SET("failed to get datawriter qos") - return std::chrono::microseconds(RMW_CONNEXT_LIMIT_DEFAULT_BLOCKING_TIME_DEFAULT); - } - if (DDS_Duration_is_auto(&dw_qos.reliability.max_blocking_time)) { - return std::chrono::microseconds(RMW_CONNEXT_LIMIT_DEFAULT_BLOCKING_TIME_DEFAULT); - } - if (DDS_Duration_is_infinite(&dw_qos.reliability.max_blocking_time) || - DDS_Duration_is_auto(&dw_qos.reliability.max_blocking_time)) - { - return std::chrono::microseconds(RMW_CONNEXT_LIMIT_DEFAULT_BLOCKING_TIME_INFINITE); - } - if (DDS_Duration_is_zero(&dw_qos.reliability.max_blocking_time)) { - return std::chrono::microseconds(0); - } - std::chrono::microseconds max_blocking_time = - std::chrono::duration_cast( - std::chrono::seconds(dw_qos.reliability.max_blocking_time.sec) + - std::chrono::nanoseconds(dw_qos.reliability.max_blocking_time.nanosec)); - - return max_blocking_time; -} - -rmw_ret_t -RMW_Connext_Publisher::wait_for_subscription( - rmw_gid_t & reader_gid, - bool & unmatched, - rmw_gid_t & related_writer_gid) -{ - unmatched = false; - - struct DDS_GUID_t reader_guid = DDS_GUID_INITIALIZER; - rmw_ret_t rc = RMW_RET_ERROR; - rc = rmw_connextdds_gid_to_guid(reader_gid, reader_guid); - if (RMW_RET_OK != rc) { - return rc; - } - - std::unique_lock lock(matched_mutex); - auto endpoint_entry = known_endpoints.find(RMW_Connext_OrderedGid(reader_gid)); - if (endpoint_entry == known_endpoints.end()) { - unmatched = true; - return RMW_RET_OK; - } - related_writer_gid = endpoint_entry->second; - - DDS_InstanceHandle_t reader_ih = DDS_HANDLE_NIL; - rc = rmw_connextdds_guid_to_instance_handle(&reader_guid, &reader_ih); - if (RMW_RET_OK != rc) { - return rc; - } - auto done_waiting = [this, &reader_ih]() { - bool matched = false; - const DDS_Long subs_len = DDS_InstanceHandleSeq_get_length(&matched_subscriptions); - for (DDS_Long i = 0; i < subs_len && !matched; i++) { - DDS_InstanceHandle_t * const matched_ih = - DDS_InstanceHandleSeq_get_reference(&matched_subscriptions, i); - matched = DDS_InstanceHandle_compare(matched_ih, &reader_ih) == 0; - } - return matched; - }; - - if (!matched_cv.wait_for(lock, this->max_blocking_time, done_waiting)) { - return RMW_RET_TIMEOUT; - } - - return RMW_RET_OK; -} - rmw_publisher_t * rmw_connextdds_create_publisher( rmw_context_impl_t * const ctx, @@ -1252,8 +1159,7 @@ RMW_Connext_Subscriber::RMW_Connext_Subscriber( const bool created_topic, DDS_TopicDescription * const dds_topic_cft, const char * const cft_expression, - const bool internal, - RMW_Connext_Publisher * const related_pub) + const bool internal) : internal(internal), ignore_local(ignore_local), ctx(ctx), @@ -1263,8 +1169,7 @@ RMW_Connext_Subscriber::RMW_Connext_Subscriber( cft_expression(cft_expression), type_support(type_support), created_topic(created_topic), - status_condition(dds_reader, ignore_local, internal), - related_pub(related_pub) + status_condition(dds_reader, ignore_local, internal) { rmw_connextdds_get_entity_gid(this->dds_reader, this->ros_gid); @@ -1275,7 +1180,7 @@ RMW_Connext_Subscriber::RMW_Connext_Subscriber( this->loan_info = def_info_seq; this->loan_len = 0; this->loan_next = 0; - if (RMW_RET_OK != this->status_condition.install(this, this->related_pub)) { + if (RMW_RET_OK != this->status_condition.install(this)) { RMW_CONNEXT_LOG_ERROR("failed to install condition on reader") throw std::runtime_error("failed to install condition on reader"); } @@ -1296,8 +1201,7 @@ RMW_Connext_Subscriber::create( const bool intro_members_cpp, std::string * const type_name, const char * const cft_name, - const char * const cft_filter, - RMW_Connext_Publisher * const related_pub) + const char * const cft_filter) { RMW_Connext_MessageTypeSupport * const type_support = RMW_Connext_MessageTypeSupport::register_type_support( @@ -1486,8 +1390,7 @@ RMW_Connext_Subscriber::create( topic_created, cft_topic, sub_cft_expr, - internal, - related_pub); + internal); if (nullptr == rmw_sub_impl) { RMW_CONNEXT_LOG_ERROR_SET("failed to allocate RMW subscriber") @@ -1764,21 +1667,17 @@ RMW_Connext_Subscriber::requestreply_header_from_dds( { const struct DDS_GUID_t * src_guid = nullptr; const struct DDS_SequenceNumber_t * src_sn = nullptr; - const struct DDS_GUID_t * writer_guid = nullptr; if (rr_msg->request) { - src_guid = &related_sample_identity->writer_guid; + src_guid = &sample_identity->writer_guid; src_sn = &sample_identity->sequence_number; - writer_guid = &sample_identity->writer_guid; } else { src_guid = &related_sample_identity->writer_guid; src_sn = &related_sample_identity->sequence_number; - writer_guid = &sample_identity->writer_guid; } rmw_connextdds_guid_to_gid(*src_guid, rr_msg->gid); rmw_connextdds_sn_dds_to_ros(*src_sn, rr_msg->sn); - rmw_connextdds_guid_to_gid(*writer_guid, rr_msg->writer_gid); } rmw_ret_t @@ -2616,7 +2515,7 @@ RMW_Connext_Client::create( char * cft_name = nullptr, *cft_filter = nullptr; auto scope_exit_cft_name = rcpputils::make_scope_exit( - [&cft_name, &cft_filter]() + [cft_name, cft_filter]() { if (nullptr != cft_name) { DDS_String_free(cft_name); @@ -2763,11 +2662,9 @@ RMW_Connext_Client::take_response( if (taken_msg) { request_header->request_id.sequence_number = rr_msg.sn; - /* (asorbini) although messages are correlated using the reply DataReader's GUID, - we report the writer's GUID to upper layers. */ memcpy( request_header->request_id.writer_guid, - this->request_pub->gid()->data, + rr_msg.gid.data, 16); if (this->ctx->cyclone_compatible) { @@ -2809,15 +2706,10 @@ RMW_Connext_Client::send_request( if (this->ctx->request_reply_mapping == RMW_Connext_RequestReplyMapping::Basic) { *sequence_id = ++this->next_request_id; rr_msg.sn = *sequence_id; - rr_msg.gid = *this->request_pub->gid(); } else { rr_msg.sn = -1; - /* (asorbini) use the reply DataReader's GUID to correlate request and reply, instead of the - request DataWriter's GUID, so that the service may try to wait for the DataReader to be - matched before writing the reply. */ - rr_msg.gid = *this->reply_sub->gid(); } - + rr_msg.gid = *this->request_pub->gid(); rr_msg.payload = const_cast(ros_request); RMW_CONNEXT_LOG_DEBUG_A( @@ -3031,13 +2923,7 @@ RMW_Connext_Service::create( RMW_CONNEXT_MESSAGE_REQUEST, svc_members_req, svc_members_req_cpp, - &request_type, - nullptr /* cft_name */, - nullptr /* cft_filter */, - /* If we are using the extended RPC mapping, then we cache the reply writer - so that we can notify it of "subscription_match" events. */ - (ctx->request_reply_mapping == RMW_Connext_RequestReplyMapping::Extended ? - svc_impl->reply_pub : nullptr)); + &request_type); if (nullptr == svc_impl->request_sub) { RMW_CONNEXT_LOG_ERROR("failed to create service requester") @@ -3051,14 +2937,14 @@ RMW_Connext_Service::create( rmw_ret_t RMW_Connext_Service::enable() { - rmw_ret_t rc = this->request_sub->enable(); + rmw_ret_t rc = this->reply_pub->enable(); if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR("failed to enable service's subscription") + RMW_CONNEXT_LOG_ERROR("failed to enable service's publisher") return rc; } - rc = this->reply_pub->enable(); + rc = this->request_sub->enable(); if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR("failed to enable service's publisher") + RMW_CONNEXT_LOG_ERROR("failed to enable service's subscription") return rc; } return RMW_RET_OK; @@ -3098,11 +2984,6 @@ RMW_Connext_Service::take_request( request_header->source_timestamp = message_info.source_timestamp; request_header->received_timestamp = message_info.received_timestamp; - if (ctx->request_reply_mapping == RMW_Connext_RequestReplyMapping::Extended) { - /* Cache the writer/reader GUIDs */ - reply_pub->push_related_endpoints(rr_msg.gid, rr_msg.writer_gid); - } - *taken = true; RMW_CONNEXT_LOG_DEBUG_A( @@ -3154,59 +3035,6 @@ RMW_Connext_Service::send_response( reinterpret_cast(rr_msg.gid.data)[3], rr_msg.sn) - /* (asorbini) The following logic tries to partially work around some race conditions that exists - in the way request/reply interactions between clients and services are mapped to DDS topics - and endpoints. - - It is possible for the service's request DataReader to receive a request from the client, - handle it, and try to write the response before the service's reply DataWriter has actually - matched the client's reply DataReader. If this is the case, the response will be lost - (unless the endpoints have been configured with "transient local" or higher Durability QoS, - but this is not the default configuration, and it is unlikely to be used in practice...). - The data race and loss of reply message may also occur in the case of an - "asymmetric discovery" between the client's reply DataReader and the service's reply - DataWriter, where the latter has discovered and matched the former, but not viceversa. - - The only way to fully resolve these data races is to have the underlying implementation force - a "match ordering" when dealing with pairs of endpoints used for RPC exchanges. The solution - is described by the OMG DDS-RPC 1.0 specification, section 7.6.2 (Enhanced Service Mapping). - It requires the DDS implementation to prevent the DataWriter from matching its remote - counterpart before the associated DataReader has matched. This behavior must be enforced - by both service and client for all data races to be resolved. - - This partial workaround relies on (ab)using the "related sample identity" field to communicate - the client's reply DataReader's GUID to the service. Normally, this field should would be - used to communicate the client's request DataWriter's GUID. - - When the service detects the presence of an unusual DataReader GUID, it will enter a - (time-bounded) loop to query the reply DataWriter's list of matched subscriptions, and wait - for the GUID to appear. - - If the reader is not matched within the timeout (HistoryQosPolicy::max_blocking_time), the - service will return RMW_RET_TIMEOUT to the upper layers. Otherwise, the response is written - out as normal. - - It is still possible for the response to be lost by the client, in case of an asymmetric - discovery, but this issue partially mitigated by the implementation of - Client::is_service_available. - */ - if (ctx->request_reply_mapping == RMW_Connext_RequestReplyMapping::Extended) { - struct DDS_GUID_t src_guid = DDS_GUID_INITIALIZER; - rmw_ret_t rc = RMW_RET_ERROR; - rc = rmw_connextdds_gid_to_guid(rr_msg.gid, src_guid); - if (RMW_RET_OK != rc) { - return rc; - } - DDS_RTPS_GUID_t * const rtps_guid = DDS_GUID_as_rtps_guid(&src_guid); - if (rtps_guid->entityId.entityKind & 0x04) { - bool unmatched = false; - rc = reply_pub->wait_for_subscription(rr_msg.gid, unmatched, rr_msg.writer_gid); - if (RMW_RET_OK != rc || unmatched) { - return rc; - } - } - } - return this->reply_pub->write(&rr_msg, false /* serialized */, &write_params); } diff --git a/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp b/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp index 6f7f0405..d9a4759b 100644 --- a/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp @@ -614,8 +614,7 @@ RMW_Connext_StatusCondition::update_status_inconsistent_topic( rmw_ret_t RMW_Connext_SubscriberStatusCondition::install( - RMW_Connext_Subscriber * const sub, - RMW_Connext_Publisher * const related_pub) + RMW_Connext_Subscriber * const sub) { DDS_DataReaderListener listener = DDS_DataReaderListener_INITIALIZER; DDS_StatusMask listener_mask = DDS_STATUS_MASK_NONE; @@ -645,9 +644,6 @@ RMW_Connext_SubscriberStatusCondition::install( rmw_connextdds_configure_subscriber_condition_listener( this, &listener, &listener_mask); - this->sub = sub; - this->related_pub = related_pub; - if (DDS_RETCODE_OK != DDS_DataReader_set_listener(sub->reader(), &listener, listener_mask)) { @@ -666,6 +662,8 @@ RMW_Connext_SubscriberStatusCondition::install( return RMW_RET_ERROR; } + this->sub = sub; + return RMW_RET_OK; } @@ -885,10 +883,6 @@ RMW_Connext_SubscriberStatusCondition::update_status_matched( this->status_matched.current_count_change = this->status_matched.current_count - this->status_matched_last.current_count; - if (nullptr != this->sub && nullptr != this->related_pub) { - this->related_pub->on_related_subscription_matched(this->sub, status); - } - this->notify_new_event(RMW_EVENT_SUBSCRIPTION_MATCHED); } @@ -915,8 +909,6 @@ RMW_Connext_PublisherStatusCondition::install( DDS_LIVELINESS_LOST_STATUS | DDS_PUBLICATION_MATCHED_STATUS; - this->pub = pub; - if (DDS_RETCODE_OK != DDS_DataWriter_set_listener( pub->writer(), &listener, listener_mask)) @@ -942,6 +934,8 @@ RMW_Connext_PublisherStatusCondition::install( return RMW_RET_ERROR; } + this->pub = pub; + return RMW_RET_OK; } @@ -1081,9 +1075,5 @@ RMW_Connext_PublisherStatusCondition::update_status_matched( this->status_matched.current_count_change = this->status_matched.current_count - this->status_matched_last.current_count; - if (nullptr != this->pub) { - this->pub->on_publication_matched(status); - } - this->notify_new_event(RMW_EVENT_PUBLICATION_MATCHED); } diff --git a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp index d9fcd91d..42d6ad6b 100644 --- a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp +++ b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp @@ -30,8 +30,6 @@ #include "rmw_connextdds/rmw_impl.hpp" #include "rmw_connextdds/graph_cache.hpp" -#include "dds_c/dds_c_infrastructure_impl.h" - const char * const RMW_CONNEXTDDS_ID = "rmw_connextdds"; const char * const RMW_CONNEXTDDS_SERIALIZATION_FORMAT = "cdr"; @@ -752,20 +750,22 @@ rmw_connextdds_write_message( const RMW_Connext_RequestReplyMessage * const rr_msg = reinterpret_cast(message->user_data); - // Propagate the request's sample identity via the related_sample_identity field - int64_t sn_ros = rr_msg->sn >= 0 ? rr_msg->sn : 0; - rmw_connextdds_sn_ros_to_dds( - sn_ros, - write_params.related_sample_identity.sequence_number); - - rmw_ret_t rc = rmw_connextdds_gid_to_guid( - rr_msg->request ? rr_msg->gid : rr_msg->writer_gid, - write_params.related_sample_identity.writer_guid); - if (RMW_RET_OK != rc) { - return rc; - } + if (!rr_msg->request) { + /* If this is a reply, propagate the request's sample identity + via the related_sample_identity field */ + rmw_ret_t rc = RMW_RET_ERROR; + + rmw_connextdds_sn_ros_to_dds( + rr_msg->sn, + write_params.related_sample_identity.sequence_number); - if (rr_msg->request) { + rc = rmw_connextdds_gid_to_guid( + rr_msg->gid, + write_params.related_sample_identity.writer_guid); + if (RMW_RET_OK != rc) { + return rc; + } + } else { // enable WriteParams::replace_auto to retrieve SN of published message write_params.replace_auto = DDS_BOOLEAN_TRUE; } @@ -1599,12 +1599,3 @@ rmw_connextdds_get_cft_filter_expression( return RMW_RET_OK; } - -rmw_ret_t -rmw_connextdds_guid_to_instance_handle( - const struct DDS_GUID_t * const guid, - DDS_InstanceHandle_t * const instance_handle) -{ - DDS_GUID_to_instance_handle(guid, instance_handle); - return RMW_RET_OK; -} diff --git a/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp b/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp index 332a8fc5..d0372f9a 100644 --- a/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp +++ b/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp @@ -2041,13 +2041,3 @@ rmw_connextdds_get_cft_filter_expression( UNUSED_ARG(options); return RMW_RET_UNSUPPORTED; } - -rmw_ret_t -rmw_connextdds_guid_to_instance_handle( - const struct DDS_GUID_t * const guid, - DDS_InstanceHandle_t * const instance_handle) -{ - UNUSED_ARG(guid); - UNUSED_ARG(instance_handle); - return RMW_RET_UNSUPPORTED; -}