Skip to content

Commit

Permalink
DataReader closing (#130) (#133)
Browse files Browse the repository at this point in the history
* Refs #7017. Fix DataReader issue.

* Refs #7017. Complete ProxyClient mock.

* Fix Root::reset function.
  • Loading branch information
Julián Bermúdez Ortega authored and BorjaOuterelo committed Dec 17, 2019
1 parent cb1d053 commit 60b08dc
Show file tree
Hide file tree
Showing 18 changed files with 93 additions and 89 deletions.
6 changes: 5 additions & 1 deletion include/uxr/agent/client/ProxyClient.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
namespace eprosima {
namespace uxr {

class ProxyClient
class ProxyClient : public std::enable_shared_from_this<ProxyClient>
{
public:
enum class State : uint8_t
Expand Down Expand Up @@ -64,12 +64,16 @@ class ProxyClient

dds::xrce::SessionId get_session_id() const { return representation_.session_id(); }

void release();

Session& session();

State get_state();

void update_state();

Middleware& get_middleware() { return *middleware_ ; };

private:
bool create_object(
const dds::xrce::ObjectId& object_id,
Expand Down
5 changes: 2 additions & 3 deletions include/uxr/agent/datareader/DataReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,16 @@
namespace eprosima {
namespace uxr {

class ProxyClient;
class Subscriber;
class Topic;
class Middleware;

/**
* Callback data structure.
*/
struct ReadCallbackArgs
{
std::shared_ptr<ProxyClient> client;
dds::xrce::ClientKey client_key;
dds::xrce::StreamId stream_id;
dds::xrce::ObjectId object_id;
Expand Down Expand Up @@ -68,8 +69,6 @@ class DataReader : public XRCEObject
bool matched(
const dds::xrce::ObjectVariant& new_object_rep) const override;

Middleware& get_middleware() const override;

bool read(
const dds::xrce::READ_DATA_Payload& read_data,
read_callback read_cb,
Expand Down
1 change: 0 additions & 1 deletion include/uxr/agent/datawriter/DataWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ class DataWriter : public XRCEObject

void release(ObjectContainer&) override {}
bool matched(const dds::xrce::ObjectVariant& new_object_rep) const override;
Middleware& get_middleware() const override;

bool write(dds::xrce::WRITE_DATA_Payload_Data& write_data);
bool write(const std::vector<uint8_t>& data);
Expand Down
1 change: 0 additions & 1 deletion include/uxr/agent/object/XRCEObject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ class XRCEObject
uint16_t get_raw_id() const { return conversion::objectid_to_raw(id_); }
virtual bool matched(const dds::xrce::ObjectVariant& new_object_rep) const = 0;
virtual void release(ObjectContainer& root_objects) = 0;
virtual Middleware& get_middleware() const = 0;

private:
dds::xrce::ObjectId id_;
Expand Down
16 changes: 9 additions & 7 deletions include/uxr/agent/participant/Participant.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@
namespace eprosima {
namespace uxr {

class Middleware;
class ProxyClient;

class Participant : public XRCEObject
{
public:
static std::unique_ptr<Participant> create(const dds::xrce::ObjectId& object_id,
const dds::xrce::OBJK_PARTICIPANT_Representation& representation,
Middleware& middleware);
static std::unique_ptr<Participant> create(
const dds::xrce::ObjectId& object_id,
const std::shared_ptr<ProxyClient>& proxy_client,
const dds::xrce::OBJK_PARTICIPANT_Representation& representation);

virtual ~Participant() override;

Expand All @@ -43,16 +44,17 @@ class Participant : public XRCEObject
void tie_object(const dds::xrce::ObjectId& object_id) { tied_objects_.insert(object_id); }
void untie_object(const dds::xrce::ObjectId& object_id) { tied_objects_.erase(object_id); }
bool matched(const dds::xrce::ObjectVariant& new_object_rep) const override;
Middleware& get_middleware() const override { return middleware_; }

const std::shared_ptr<ProxyClient>& get_proxy_client() { return proxy_client_; };

private:
Participant(
const dds::xrce::ObjectId& id,
Middleware& middleware);
const std::shared_ptr<ProxyClient>& proxy_client);

private:
std::shared_ptr<ProxyClient> proxy_client_;
std::set<dds::xrce::ObjectId> tied_objects_;
Middleware& middleware_;
};

} // namespace uxr
Expand Down
1 change: 0 additions & 1 deletion include/uxr/agent/publisher/Publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ class Publisher : public XRCEObject
void tie_object(const dds::xrce::ObjectId& object_id) { tied_objects_.insert(object_id); }
void untie_object(const dds::xrce::ObjectId& object_id) { tied_objects_.erase(object_id); }
bool matched(const dds::xrce::ObjectVariant& ) const override { return true; }
Middleware& get_middleware() const override;

const std::shared_ptr<Participant>& get_participant() { return participant_; }

Expand Down
1 change: 0 additions & 1 deletion include/uxr/agent/subscriber/Subscriber.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ class Subscriber : public XRCEObject
void tie_object(const dds::xrce::ObjectId& object_id) { tied_objects_.insert(object_id); }
void untie_object(const dds::xrce::ObjectId& object_id) { tied_objects_.erase(object_id); }
bool matched(const dds::xrce::ObjectVariant& ) const override { return true; }
Middleware& get_middleware() const override;

const std::shared_ptr<Participant>& get_participant() { return participant_; }

Expand Down
1 change: 0 additions & 1 deletion include/uxr/agent/topic/Topic.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ class Topic : public XRCEObject
void tie_object(const dds::xrce::ObjectId& object_id) { tied_objects_.insert(object_id); }
void untie_object(const dds::xrce::ObjectId& object_id) { tied_objects_.erase(object_id); }
bool matched(const dds::xrce::ObjectVariant& new_object_rep) const override;
Middleware& get_middleware() const override;

private:
Topic(
Expand Down
17 changes: 15 additions & 2 deletions src/cpp/Root.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,14 @@ Root::Root()
}

/* It must be here instead of the hpp because the forward declaration of Middleware in the hpp. */
Root::~Root() = default;
Root::~Root()
{
for (auto it = clients_.begin(); it != clients_.end(); )
{
it->second->release();
it = clients_.erase(it);
}
}

dds::xrce::ResultStatus Root::create_client(
const dds::xrce::CLIENT_Representation& client_representation,
Expand Down Expand Up @@ -157,13 +164,14 @@ dds::xrce::ResultStatus Root::get_info(dds::xrce::ObjectInfo& agent_info)
dds::xrce::ResultStatus Root::delete_client(const dds::xrce::ClientKey& client_key)
{
dds::xrce::ResultStatus result_status;
if (get_client(client_key))
if (std::shared_ptr<ProxyClient> client = get_client(client_key))
{
std::lock_guard<std::mutex> lock(mtx_);
if (current_client_ != clients_.end() && client_key == current_client_->first)
{
++current_client_;
}
client->release();
clients_.erase(client_key);
result_status.status(dds::xrce::STATUS_OK);
UXR_AGENT_LOG_INFO(
Expand Down Expand Up @@ -283,6 +291,11 @@ void Root::set_verbose_level(uint8_t verbose_level)
void Root::reset()
{
std::lock_guard<std::mutex> lock(mtx_);
for (auto it = clients_.begin(); it != clients_.end(); )
{
it->second->release();
it = clients_.erase(it);
}
clients_.clear();
current_client_ = clients_.begin();
}
Expand Down
7 changes: 6 additions & 1 deletion src/cpp/client/ProxyClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,11 @@ std::shared_ptr<XRCEObject> ProxyClient::get_object(const dds::xrce::ObjectId& o
return object;
}

void ProxyClient::release()
{
objects_.clear();
}

Session& ProxyClient::session()
{
return session_;
Expand Down Expand Up @@ -248,7 +253,7 @@ bool ProxyClient::create_participant(
{
bool rv = false;

if (std::unique_ptr<Participant> participant = Participant::create(object_id, representation, *middleware_))
if (std::unique_ptr<Participant> participant = Participant::create(object_id, shared_from_this(), representation))
{
if (objects_.emplace(object_id, std::move(participant)).second)
{
Expand Down
35 changes: 20 additions & 15 deletions src/cpp/datareader/DataReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#include <uxr/agent/subscriber/Subscriber.hpp>
#include <uxr/agent/participant/Participant.hpp>
#include <uxr/agent/topic/Topic.hpp>
#include <uxr/agent/middleware/Middleware.hpp>
#include <uxr/agent/client/ProxyClient.hpp>
#include <uxr/agent/utils/TokenBucket.hpp>
#include <uxr/agent/logger/Logger.hpp>

Expand Down Expand Up @@ -64,7 +64,7 @@ std::unique_ptr<DataReader> DataReader::create(
uint16_t raw_object_id = conversion::objectid_to_raw(object_id);
std::shared_ptr<Topic> topic;

Middleware& middleware = subscriber->get_middleware();
Middleware& middleware = subscriber->get_participant()->get_proxy_client()->get_middleware();
switch (representation.representation()._d())
{
case dds::xrce::REPRESENTATION_BY_REFERENCE:
Expand Down Expand Up @@ -118,7 +118,7 @@ DataReader::~DataReader() noexcept
stop_read();
subscriber_->untie_object(get_id());
topic_->untie_object(get_id());
get_middleware().delete_datareader(get_raw_id());
subscriber_->get_participant()->get_proxy_client()->get_middleware().delete_datareader(get_raw_id());
}

bool DataReader::matched(
Expand All @@ -136,13 +136,13 @@ bool DataReader::matched(
case dds::xrce::REPRESENTATION_BY_REFERENCE:
{
const std::string& ref = new_object_rep.data_reader().representation().object_reference();
rv = get_middleware().matched_datareader_from_ref(get_raw_id(), ref);
rv = subscriber_->get_participant()->get_proxy_client()->get_middleware().matched_datareader_from_ref(get_raw_id(), ref);
break;
}
case dds::xrce::REPRESENTATION_AS_XML_STRING:
{
const std::string& xml = new_object_rep.data_reader().representation().xml_string_representation();
rv = get_middleware().matched_datareader_from_xml(get_raw_id(), xml);
rv = subscriber_->get_participant()->get_proxy_client()->get_middleware().matched_datareader_from_xml(get_raw_id(), xml);
break;
}
default:
Expand All @@ -151,11 +151,6 @@ bool DataReader::matched(
return rv;
}

Middleware& DataReader::get_middleware() const
{
return subscriber_->get_middleware();
}

bool DataReader::read(
const dds::xrce::READ_DATA_Payload& read_data,
read_callback read_cb,
Expand Down Expand Up @@ -206,21 +201,31 @@ bool DataReader::start_read(

bool DataReader::stop_read()
{
bool rv = true;
std::lock_guard<std::mutex> lock(mtx_);
running_cond_ = false;

if (read_thread_.joinable())
if (running_cond_)
{
read_thread_.join();
running_cond_ = false;

if (read_thread_.joinable())
{
read_thread_.join();
}
else
{
rv = false;
}
}
return true;
return rv;
}

void DataReader::read_task(
dds::xrce::DataDeliveryControl delivery_control,
read_callback read_cb,
ReadCallbackArgs cb_args)
{
cb_args.client = subscriber_->get_participant()->get_proxy_client();
size_t rate = (MAX_BYTES_PER_SECOND_UNLIMITED == delivery_control.max_bytes_per_second())
? SIZE_MAX
: delivery_control.max_bytes_per_second();
Expand All @@ -235,7 +240,7 @@ void DataReader::read_task(
while (running_cond_ && !stop_cond)
{
std::chrono::milliseconds read_timeout = get_read_timeout(final_time, delivery_control.max_elapsed_time());
if (get_middleware().read_data(get_raw_id(), data, read_timeout))
if (subscriber_->get_participant()->get_proxy_client()->get_middleware().read_data(get_raw_id(), data, read_timeout))
{
bool submessage_pushed = false;
do
Expand Down
20 changes: 7 additions & 13 deletions src/cpp/datawriter/DataWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#include <uxr/agent/publisher/Publisher.hpp>
#include <uxr/agent/participant/Participant.hpp>
#include <uxr/agent/topic/Topic.hpp>
#include <uxr/agent/middleware/Middleware.hpp>
#include <uxr/agent/client/ProxyClient.hpp>
#include <uxr/agent/logger/Logger.hpp>

namespace eprosima {
Expand All @@ -32,7 +32,7 @@ std::unique_ptr<DataWriter> DataWriter::create(
uint16_t raw_object_id = conversion::objectid_to_raw(object_id);
std::shared_ptr<Topic> topic;

Middleware& middleware = publisher->get_middleware();
Middleware& middleware = publisher->get_participant()->get_proxy_client()->get_middleware();
switch (representation.representation()._d())
{
case dds::xrce::REPRESENTATION_BY_REFERENCE:
Expand Down Expand Up @@ -81,7 +81,7 @@ DataWriter::~DataWriter()
{
publisher_->untie_object(get_id());
topic_->untie_object(get_id());
get_middleware().delete_datawriter(get_raw_id());
publisher_->get_participant()->get_proxy_client()->get_middleware().delete_datawriter(get_raw_id());
}

bool DataWriter::matched(const dds::xrce::ObjectVariant& new_object_rep) const
Expand All @@ -98,13 +98,13 @@ bool DataWriter::matched(const dds::xrce::ObjectVariant& new_object_rep) const
case dds::xrce::REPRESENTATION_BY_REFERENCE:
{
const std::string& ref = new_object_rep.data_writer().representation().object_reference();
rv = get_middleware().matched_datawriter_from_ref(get_raw_id(), ref);
rv = publisher_->get_participant()->get_proxy_client()->get_middleware().matched_datawriter_from_ref(get_raw_id(), ref);
break;
}
case dds::xrce::REPRESENTATION_AS_XML_STRING:
{
const std::string& xml = new_object_rep.data_writer().representation().xml_string_representation();
rv = get_middleware().matched_datawriter_from_xml(get_raw_id(), xml);
rv = publisher_->get_participant()->get_proxy_client()->get_middleware().matched_datawriter_from_xml(get_raw_id(), xml);
break;
}
default:
Expand All @@ -116,7 +116,7 @@ bool DataWriter::matched(const dds::xrce::ObjectVariant& new_object_rep) const
bool DataWriter::write(dds::xrce::WRITE_DATA_Payload_Data& write_data)
{
bool rv = false;
if (get_middleware().write_data(get_raw_id(), write_data.data().serialized_data()))
if (publisher_->get_participant()->get_proxy_client()->get_middleware().write_data(get_raw_id(), write_data.data().serialized_data()))
{
UXR_AGENT_LOG_MESSAGE(
UXR_DECORATE_YELLOW("[** <<DDS>> **]"),
Expand All @@ -131,7 +131,7 @@ bool DataWriter::write(dds::xrce::WRITE_DATA_Payload_Data& write_data)
bool DataWriter::write(const std::vector<uint8_t>& data)
{
bool rv = false;
if (get_middleware().write_data(get_raw_id(), data))
if (publisher_->get_participant()->get_proxy_client()->get_middleware().write_data(get_raw_id(), data))
{
UXR_AGENT_LOG_MESSAGE(
UXR_DECORATE_YELLOW("[** <<DDS>> **]"),
Expand All @@ -143,11 +143,5 @@ bool DataWriter::write(const std::vector<uint8_t>& data)
return rv;
}

Middleware& DataWriter::get_middleware() const
{
return publisher_->get_middleware();
}


} // namespace uxr
} // namespace eprosima
Loading

0 comments on commit 60b08dc

Please sign in to comment.