diff --git a/CMakeLists.txt b/CMakeLists.txt index ae8a29bed..90ee1d496 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -34,7 +34,8 @@ option(UAGENT_P2P_PROFILE "Build P2P discovery profile." ON) option(UAGENT_LOGGER_PROFILE "Build logger profile." ON) option(UAGENT_SECURITY_PROFILE "Build security profile." OFF) option(UAGENT_CLI_PROFILE "Build CLI profile." OFF) -option(UAGENT_BUILD_EXECUTABLE "Build MicroXRCE-DDS Agent provided executable." ON) +option(UAGENT_BUILD_EXECUTABLE "Build Micro XRCE-DDS Agent provided executable." ON) +option(UAGENT_BUILD_USAGE_EXAMPLES "Build Micro XRCE-DDS Agent built-in usage examples" OFF) set(UAGENT_P2P_CLIENT_VERSION 1.2.5 CACHE STRING "Sets Micro XRCE-DDS client version for P2P") set(UAGENT_P2P_CLIENT_TAG develop CACHE STRING "Sets Micro XRCE-DDS client tag for P2P") @@ -194,7 +195,8 @@ set(SRCS src/cpp/message/OutputMessage.cpp $<$>:src/cpp/utils/ArgumentParser.cpp> src/cpp/transport/Server.cpp - src/cpp/transport/serial/SerialProtocol.cpp + src/cpp/transport/stream_framing/StreamFramingProtocol.cpp + src/cpp/transport/custom/CustomAgent.cpp ${TRANSPORT_SRCS} $<$:src/cpp/transport/discovery/DiscoveryServer.cpp> $<$:src/cpp/types/TopicPubSubType.cpp> @@ -328,6 +330,11 @@ if(UAGENT_BUILD_EXECUTABLE) ) endif() +# Examples +if(UAGENT_BUILD_USAGE_EXAMPLES) + add_subdirectory(examples/custom_agent) +endif() + # XML default profile used to launch exec in the building folder file(COPY ${PROJECT_SOURCE_DIR}/agent.refs DESTINATION ${PROJECT_BINARY_DIR} diff --git a/examples/custom_agent/CMakeLists.txt b/examples/custom_agent/CMakeLists.txt new file mode 100644 index 000000000..2dd21d0bc --- /dev/null +++ b/examples/custom_agent/CMakeLists.txt @@ -0,0 +1,30 @@ +# Copyright 2021-present Proyectos y Sistemas de Mantenimiento SL (eProsima). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +if (UAGENT_BUILD_USAGE_EXAMPLES) + add_executable(CustomXRCEAgent custom_agent.cpp) + target_link_libraries(CustomXRCEAgent + PRIVATE + ${PROJECT_NAME} + $<$>:rt> + $<$>:dl> + ) + + set_target_properties(CustomXRCEAgent + PROPERTIES + CXX_STANDARD + 11 + CXX_STANDARD_REQUIRED + YES + ) +endif() \ No newline at end of file diff --git a/examples/custom_agent/custom_agent.cpp b/examples/custom_agent/custom_agent.cpp new file mode 100644 index 000000000..745be33c7 --- /dev/null +++ b/examples/custom_agent/custom_agent.cpp @@ -0,0 +1,251 @@ +// Copyright 2021-present Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include +#include +#include +#include +#include +#include + +/** + * This custom XRCE Agent example attempts to show how easy is for the user to define a custom + * Micro XRCE-DDS Agent behaviour, in terms of transport initialization and closing, and also + * regarding read and write operations. + * For this simple case, an UDP socket is opened on port 8888. Additionally, some information + * messages are being printed to demonstrate the custom behaviour. + * As the endpoint is already defined, we are using the provided + * `eprosima::uxr::IPv4EndPoint` by the library. + * Other transport protocols might need to implement their own endpoint struct. + */ + +int main(int argc, char** argv) +{ + eprosima::uxr::Middleware::Kind mw_kind(eprosima::uxr::Middleware::Kind::FASTDDS); + uint16_t agent_port(8888); + struct pollfd poll_fd; + + /** + * @brief Agent's initialization behaviour description. + */ + eprosima::uxr::CustomAgent::InitFunction init_function = [&]() -> bool + { + bool rv = false; + poll_fd.fd = socket(PF_INET, SOCK_DGRAM, 0); + + if (-1 != poll_fd.fd) + { + struct sockaddr_in address{}; + + address.sin_family = AF_INET; + address.sin_port = htons(agent_port); + address.sin_addr.s_addr = INADDR_ANY; + memset(address.sin_zero, '\0', sizeof(address.sin_zero)); + + if (-1 != bind(poll_fd.fd, + reinterpret_cast(&address), + sizeof(address))) + { + poll_fd.events = POLLIN; + rv = true; + + UXR_AGENT_LOG_INFO( + UXR_DECORATE_GREEN( + "This is an example of a custom Micro XRCE-DDS Agent INIT function"), + "port: {}", + agent_port); + } + } + + return rv; + }; + + /** + * @brief Agent's destruction actions. + */ + eprosima::uxr::CustomAgent::FiniFunction fini_function = [&]() -> bool + { + if (-1 == poll_fd.fd) + { + return true; + } + + if (0 == ::close(poll_fd.fd)) + { + poll_fd.fd = -1; + + UXR_AGENT_LOG_INFO( + UXR_DECORATE_GREEN( + "This is an example of a custom Micro XRCE-DDS Agent FINI function"), + "port: {}", + agent_port); + + return true; + } + else + { + return false; + } + }; + + /** + * @brief Agent's incoming data functionality. + */ + eprosima::uxr::CustomAgent::RecvMsgFunction recv_msg_function = [&]( + eprosima::uxr::CustomEndPoint* source_endpoint, + uint8_t* buffer, + size_t buffer_length, + int timeout, + eprosima::uxr::TransportRc& transport_rc) -> ssize_t + { + struct sockaddr_in client_addr{}; + socklen_t client_addr_len = sizeof(struct sockaddr_in); + ssize_t bytes_received = -1; + + int poll_rv = poll(&poll_fd, 1, timeout); + + if (0 < poll_rv) + { + bytes_received = recvfrom( + poll_fd.fd, + buffer, + buffer_length, + 0, + reinterpret_cast(&client_addr), + &client_addr_len); + + transport_rc = (-1 != bytes_received) + ? eprosima::uxr::TransportRc::ok + : eprosima::uxr::TransportRc::server_error; + } + else + { + transport_rc = (0 == poll_rv) + ? eprosima::uxr::TransportRc::timeout_error + : eprosima::uxr::TransportRc::server_error; + bytes_received = 0; + } + + if (eprosima::uxr::TransportRc::ok == transport_rc) + { + UXR_AGENT_LOG_INFO( + UXR_DECORATE_GREEN( + "This is an example of a custom Micro XRCE-DDS Agent RECV_MSG function"), + "port: {}", + agent_port); + source_endpoint->set_member_value("address", + static_cast(client_addr.sin_addr.s_addr)); + source_endpoint->set_member_value("port", + static_cast(client_addr.sin_port)); + } + + + return bytes_received; + }; + + /** + * @brief Agent's outcoming data flow definition. + */ + eprosima::uxr::CustomAgent::SendMsgFunction send_msg_function = [&]( + const eprosima::uxr::CustomEndPoint* destination_endpoint, + uint8_t* buffer, + size_t message_length, + eprosima::uxr::TransportRc& transport_rc) -> ssize_t + { + struct sockaddr_in client_addr{}; + + memset(&client_addr, 0, sizeof(client_addr)); + client_addr.sin_family = AF_INET; + client_addr.sin_port = destination_endpoint->get_member("port"); + client_addr.sin_addr.s_addr = destination_endpoint->get_member("address"); + + ssize_t bytes_sent = + sendto( + poll_fd.fd, + buffer, + message_length, + 0, + reinterpret_cast(&client_addr), + sizeof(client_addr)); + + transport_rc = (-1 != bytes_sent) + ? eprosima::uxr::TransportRc::ok + : eprosima::uxr::TransportRc::server_error; + + if (eprosima::uxr::TransportRc::ok == transport_rc) + { + UXR_AGENT_LOG_INFO( + UXR_DECORATE_GREEN( + "This is an example of a custom Micro XRCE-DDS Agent SEND_MSG function"), + "port: {}", + agent_port); + } + + return bytes_sent; + }; + + /** + * Run the main application. + */ + try + { + /** + * EndPoint definition for this transport. We define an address and a port. + */ + eprosima::uxr::CustomEndPoint custom_endpoint; + custom_endpoint.add_member("address"); + custom_endpoint.add_member("port"); + + /** + * Create a custom agent instance. + */ + eprosima::uxr::CustomAgent custom_agent( + "UDPv4_CUSTOM", + &custom_endpoint, + mw_kind, + false, + init_function, + fini_function, + send_msg_function, + recv_msg_function); + + /** + * Set verbosity level + */ + custom_agent.set_verbose_level(6); + + /** + * Run agent and wait until receiving an stop signal. + */ + custom_agent.start(); + + int n_signal = 0; + sigset_t signals; + sigwait(&signals, &n_signal); + + /** + * Stop agent, and exit. + */ + custom_agent.stop(); + return 0; + } + catch (const std::exception& e) + { + std::cout << e.what() << std::endl; + return 1; + } +} \ No newline at end of file diff --git a/include/uxr/agent/transport/custom/CustomAgent.hpp b/include/uxr/agent/transport/custom/CustomAgent.hpp new file mode 100644 index 000000000..e145d75d6 --- /dev/null +++ b/include/uxr/agent/transport/custom/CustomAgent.hpp @@ -0,0 +1,199 @@ +// Copyright 2021-present Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef UXR_AGENT_TRANSPORT_CUSTOM_AGENT_HPP_ +#define UXR_AGENT_TRANSPORT_CUSTOM_AGENT_HPP_ + +#include +#include +#include + +#include +#include + +#ifdef _WIN32 +#include +typedef SSIZE_T ssize_t; +#endif + +namespace eprosima { +namespace uxr { + +/** + * @brief This class allows final users to easily implement a custom + * Micro XRCE-DDS Agent, in terms of transport behaviour. + * To do so, several methods must be implemented, in order to init + * an agent instance, close it, and send and receive messages using + * the desired transport mechanism. + */ + +class CustomAgent : public Server +{ +public: + /** + * @brief Init function signature, to be implemented by final users. + * return true if successful transport initialization; false otherwise. + */ + using InitFunction = std::function; + + /** + * @brief Fini function signature, to be implemented by final users. + * return true if successful transport shutdown; false otherwise. + */ + using FiniFunction = std::function; + + /** + * @brief Receive message function signature, to be implemented by final users. + * @param source_endpoint User-defined, it should be filled accordingly + * with the source metadata acquired from receiving a new packet/ + * byte stream through the middleware. + * @param buffer Pointer to octec buffer used to receive the information. + * @param buffer_length Reception buffer size. + * @param timeout Connection timeout for receiving a new message. + * @param transport_rc Transport return code, to be filled by the user. + * @return ssize_t Number of received bytes. + */ + using RecvMsgFunction = std::function; + /** + * @brief Send message function signature, to be implemented by final users. + * @param destination_endpoint Allows to retrieve the required endpoint + * information to send the message back to the client. + * @param buffer Holds the message to be sent back to the client. + * @param message_length Number of bytes to be sent. + * @param transport_rc Transport return code, to be filled by the user. + * @return ssize_t Number of sent bytes. + */ + using SendMsgFunction = std::function; + + /** + * @brief Constructor. + * @param name Name of the middleware to be implemented by this CustomAgent. + * @param middleware_kind The middleware selected to represent the XRCE entities + * in the DDS world (FastDDS, FastRTPS, CED...) + * @param framing Whether this agent transport shall use framing or not. + * @param init_function Custom user-defined function, called during initialization. + * @param fini_function Custom user-defined function, called upon agent's destruction. + * @param recv_msg_function Custom user-defined function, called when receiving some data. + * @param send_msg_function Custom user-defined function, called when sending some information. + */ + UXR_AGENT_EXPORT CustomAgent( + const std::string& name, + CustomEndPoint* endpoint, + Middleware::Kind middleware_kind, + bool framing, + InitFunction& init_function, + FiniFunction& fini_function, + SendMsgFunction& send_msg_function, + RecvMsgFunction& recv_msg_function); + + /** + * @brief Destructor. + */ + UXR_AGENT_EXPORT ~CustomAgent() final; + +private: + /** + * @brief Override virtual Server operations. + */ + bool init() final; + + bool fini() final; + +#ifdef UAGENT_DISCOVERY_PROFILE + inline bool init_discovery( + uint16_t /*discovery_port*/) final + { + return false; + } + + inline bool fini_discovery() final + { + return true; + } +#endif // UAGENT_DISCOVERY_PROFILE + +#ifdef UAGENT_P2P_PROFILE + inline bool init_p2p( + uint16_t /*p2p_port*/) final + { + return false; + } + + inline bool fini_p2p() final + { + return true; + } +#endif // UAGENT_P2P_PROFILE + + bool recv_message( + InputPacket& input_packet, + int timeout, + TransportRc& transport_rc) final; + + bool send_message( + OutputPacket output_packet, + TransportRc& transport_rc) final; + + bool handle_error( + TransportRc transport_rc) final; + + /** + * @brief Internal buffer used for receiving messages. + */ + uint8_t buffer_[SERVER_BUFFER_SIZE]; + + /** + * @brief Custom agent middleware's name. + */ + const std::string name_; + + /** + * @brief Pointers to this custom agent's endpoint definition. + * They are used for receive and send operations, respectively. + */ + CustomEndPoint* recv_endpoint_; + CustomEndPoint* send_endpoint_; + + /** + * @brief Reference to user-defined operations for the custom agent server. + */ + InitFunction& custom_init_func_; + FiniFunction& custom_fini_func_; + SendMsgFunction& custom_send_msg_func_; + RecvMsgFunction& custom_recv_msg_func_; + + /** + * @brief Indicates the usage or non-usage of framing for R/W operations. + */ + bool framing_; + + /** + * @brief Holds the framing logics, if framing is used. + */ + FramingIO framing_io_; +}; + +} // namespace uxr +} // namespace eprosima + +#endif // UXR_AGENT_TRANSPORT_CUSTOM_AGENT_HPP_ \ No newline at end of file diff --git a/include/uxr/agent/transport/endpoint/CustomEndPoint.hpp b/include/uxr/agent/transport/endpoint/CustomEndPoint.hpp new file mode 100644 index 000000000..96483a4d1 --- /dev/null +++ b/include/uxr/agent/transport/endpoint/CustomEndPoint.hpp @@ -0,0 +1,507 @@ +// Copyright 2021 - present Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef UXR_AGENT_TRANSPORT_ENDPOINT_CUSTOM_ENDPOINT_HPP_ +#define UXR_AGENT_TRANSPORT_ENDPOINT_CUSTOM_ENDPOINT_HPP_ + +#include +#include +#include + +namespace eprosima { +namespace uxr { + +/** + * @brief Class to allow user to easily create an endpoint for their Custom Agent + * implementation, if applicable. + * A certain set of values are permitted, including unsigned integers and + * strings, which usually are more than enough to characterize an endpoint. + */ +class CustomEndPoint +{ +private: + /** + * @brief Enum class which holds all the available type kinds for an endpoint member. + */ + enum class MemberKind + { + UINT8, + UINT16, + UINT32, + UINT64, +#ifdef __SIZEOF_UINT128__ + UINT128, +#endif // __SIZEOF_UINT128__ + STRING + }; + + /** + * @brief Struct defining a member in terms of its kind and an opaque pointer, + * containing the member data itself. + */ + typedef struct Member + { + MemberKind kind; + std::shared_ptr data; + } Member; + + /** + * @brief Exception to be launched when trying to insert two elements + * with the same key on the CustomEndPoint map. + */ + class SameKeyException : public std::exception + { + public: + SameKeyException( + const char* file, + int line, + const char* func, + const std::string& key) + { + std::stringstream what; + what << file << ":" << line << ":" << func + << ": Cannot add two members with the key '" + << key << "'."; + message_ = what.str(); + } + + const char* what() const noexcept + { + return message_.c_str(); + } + + private: + std::string message_; + }; + + class NoExistingMemberException : public std::exception + { + public: + NoExistingMemberException( + const char* file, + int line, + const char* func, + const std::string& key) + { + std::stringstream what; + what << file << ":" << line << ":" << func + << ": Member '" + << key << "' does not exist."; + message_ = what.str(); + } + + const char* what() const noexcept + { + return message_.c_str(); + } + + private: + std::string message_; + }; + + class EmptyMemberException : public std::exception + { + public: + EmptyMemberException( + const std::string& member_name) + { + std::stringstream what; + what << ": Member '" + << member_name + << "' is empty."; + message_ = what.str(); + } + + const char* what() const noexcept + { + return message_.c_str(); + } + + private: + std::string message_; + }; +public: + /** + * @brief Default constructor. + */ + CustomEndPoint() = default; + + /** + * @brief Default destructor. + */ + ~CustomEndPoint() = default; + + /** + * @brief Adds a member to the member list. + * The member must not already exist in the EndPoint. + * @param name The member name, + * @param kind The member kind. + * @throw SameKeyException if member already exists. + * @return true if the insert was successful, or false otherwise. + */ + bool add_member( + const std::string& name, + const MemberKind& kind) + { + if (members_.end() != members_.find(name)) + { + throw SameKeyException(__FILE__, __LINE__, __FUNCTION__, name); + } + + Member member; + member.kind = kind; + member.data.reset(); + + auto res = members_.emplace(name, std::move(member)); + return res.second; + } + + /** + * @brief Allows to add a member without specifying the MemberKind. + * This template must be specialized for every supported type. + * @param name The new member name. + * @returns true if the insert was successful, false otherwise. + */ + template + bool add_member( + const std::string& name); + + /** + * @brief Helper method to reset all the contained data within members. + */ + void reset() + { + for (auto& member : members_) + { + member.second.data.reset(); + } + } + + /** + * @brief Allows setting a value for a given member. + * @param name Key value to be searched in the members map. + * @param value A const reference to the value to be set. + * @throw NoExistingMemberException if trying to set a value not registered in the map. + */ + template + void set_member_value( + const std::string& name, + const T& value) + { + if (members_.end() == members_.find(name)) + { + throw NoExistingMemberException(__FILE__, __LINE__, __FUNCTION__, name); + } + else + { + members_.at(name).data.reset(new T(value)); + } + } + + /** + * @brief Allows setting a value for a given member. + * @param name Key value to be searched in the members map. + * @param value A movable reference to the value to be set. + * @throw NoExistingMemberException if trying to set a value not registered in the map. + */ + template + void set_member_value( + const std::string& name, + T&& value) + { + if (members_.end() == members_.find(name)) + { + throw NoExistingMemberException(__FILE__, __LINE__, __FUNCTION__, name); + } + else + { + members_.at(name).data.reset(new T(std::move(value))); + } + } + + /** + * + */ + void check_non_empty_members() + { + for (const auto& member : members_) + { + if (nullptr == member.second.data.get()) + { + throw EmptyMemberException(member.first); + } + } + } + + /** + * @brief Helper static method that allows to compare two members of the same type. + * @param first First member to be compared. + * @param second Second member to be compared against the first. + * @return 0 if the members are equal, -1 if first < second, 1 if second > first. + */ + template + static int8_t less_than_members( + const Member& first, + const Member& second) + { + T first_data = *static_cast(first.data.get()); + T second_data = *static_cast(second.data.get()); + + if (first_data == second_data) + { + return 0; + } + else if (first_data < second_data) + { + return -1; + } + else + { + return 1; + } + } + + /** + * @brief Operator < overload. + * @param other The CustomEndPoint to be checked against this one. + * @return True if this < other, false otherwise. + */ + bool operator <( + const CustomEndPoint& other) const + { + for (const auto& member : members_) + { + const std::string& member_name = member.first; + + auto other_member = other.members_.at(member_name); + switch (member.second.kind) + { + case MemberKind::UINT8: + { + int8_t res = less_than_members(member.second, other_member); + if (0 == res) + { + break; + } + return (res < 0) ? true : false; + } + case MemberKind::UINT16: + { + int8_t res = less_than_members(member.second, other_member); + if (0 == res) + { + break; + } + return (res < 0) ? true : false; + } + case MemberKind::UINT32: + { + int8_t res = less_than_members(member.second, other_member); + if (0 == res) + { + break; + } + return (res < 0) ? true : false; + } + case MemberKind::UINT64: + { + int8_t res = less_than_members(member.second, other_member); + if (0 == res) + { + break; + } + return (res < 0) ? true : false; + } +#ifdef __SIZEOF_UINT128__ + case MemberKind::UINT128: + { + int8_t res = less_than_members(member.second, other_member); + if (0 == res) + { + break; + } + return (res < 0) ? true : false; + } +#endif // __SIZEOF_UINT128__ + case MemberKind::STRING: + { + int8_t res = less_than_members(member.second, other_member); + if (0 == res) + { + break; + } + return (res < 0) ? true : false; + } + } + } + return false; + } + + /** + * @brief Operator << overload for ostream operations. + * @param os The ostream object to which the output is sent. + * @param endpoint The CustomEndPoint to be sent to the ostream. + * @return Reference to the modified ostream. + */ + friend std::ostream& operator <<( + std::ostream& os, + const CustomEndPoint& endpoint) + { + for (const auto& member : endpoint.members_) + { + os << member.first << ": "; + + if (nullptr == member.second.data.get()) + { + os << ""; + } + else + { + switch (member.second.kind) + { + case MemberKind::UINT8: + { + os << *static_cast(member.second.data.get()); + break; + } + case MemberKind::UINT16: + { + os << *static_cast(member.second.data.get()); + break; + } + case MemberKind::UINT32: + { + os << *static_cast(member.second.data.get()); + break; + } + case MemberKind::UINT64: + { + os << *static_cast(member.second.data.get()); + break; + } +#ifdef __SIZEOF_UINT128__ + case MemberKind::UINT128: + { + os << *static_cast(member.second.data.get()); + break; + } +#endif // __SIZEOF_UINT128__ + case MemberKind::STRING: + { + os << "'" + << *static_cast(member.second.data.get()) + << "'"; + break; + } + } + } + + if (&(*endpoint.members_.rbegin()) != &member) + { + os << ", "; + } + } + + return os; + } + + /** + * @brief Get a member's value, given its key. + * @param key The member's key. + * @throw NoExistingMemberException if the member is not found. + * @return Const reference to the requested value. + */ + template + const T& get_member( + const char* key) const + { + if (members_.end() == members_.find(key)) + { + throw NoExistingMemberException(__FILE__, __LINE__, __FUNCTION__, key); + } + else + { + return *static_cast(members_.at(key).data.get()); + } + } + + /** + * @brief Get a member's value, given its key. + * @param key The member's key. + * @throw NoExistingMemberException if the member is not found. + * @return Const reference to the requested value. + */ + template + const T& get_member( + const std::string& key) const + { + return this->get_member(key.c_str()); + } + +private: + std::map members_; +}; + +/** + * @brief CustomEndPoint::add_member template method specializations, + * for each of the available MemberKind types. + */ +template <> +inline bool CustomEndPoint::add_member( + const std::string& name) +{ + return add_member(name, MemberKind::UINT8); +} + +template <> +inline bool CustomEndPoint::add_member( + const std::string& name) +{ + return add_member(name, MemberKind::UINT16); +} + +template <> +inline bool CustomEndPoint::add_member( + const std::string& name) +{ + return add_member(name, MemberKind::UINT32); +} + +template <> +inline bool CustomEndPoint::add_member( + const std::string& name) +{ + return add_member(name, MemberKind::UINT64); +} + +#ifdef __SIZEOF_UINT128__ +template <> +inline bool CustomEndPoint::add_member( + const std::string& name) +{ + return add_member(name, MemberKind::UINT128); +} +#endif // __SIZEOF_UINT128__ + +template <> +inline bool CustomEndPoint::add_member( + const std::string& name) +{ + return add_member(name, MemberKind::STRING); +} + +} // namespace uxr +} // namespace eprosima + +#endif // UXR_AGENT_TRANSPORT_ENDPOINT_IPV4_ENDPOINT_HPP_ diff --git a/include/uxr/agent/transport/serial/SerialAgentLinux.hpp b/include/uxr/agent/transport/serial/SerialAgentLinux.hpp index 478889bc4..a296a4d6c 100644 --- a/include/uxr/agent/transport/serial/SerialAgentLinux.hpp +++ b/include/uxr/agent/transport/serial/SerialAgentLinux.hpp @@ -17,7 +17,7 @@ #include #include -#include +#include #include #include @@ -61,12 +61,12 @@ class SerialAgent : public Server OutputPacket output_packet, TransportRc& transport_rc) final; - size_t write_data( + ssize_t write_data( uint8_t* buf, size_t len, TransportRc& transport_rc); - size_t read_data( + ssize_t read_data( uint8_t* buf, size_t len, int timeout, @@ -76,7 +76,7 @@ class SerialAgent : public Server const uint8_t addr_; struct pollfd poll_fd_; uint8_t buffer_[SERVER_BUFFER_SIZE]; - SerialIO serial_io_; + FramingIO framing_io_; }; } // namespace uxr diff --git a/include/uxr/agent/transport/serial/SerialProtocol.hpp b/include/uxr/agent/transport/serial/SerialProtocol.hpp deleted file mode 100644 index fbfe4ad8b..000000000 --- a/include/uxr/agent/transport/serial/SerialProtocol.hpp +++ /dev/null @@ -1,532 +0,0 @@ -// Copyright 2019 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef UXR_AGENT_TRANSPORT_SERIAL_PROTOCOL_HPP_ -#define UXR_AGENT_TRANSPORT_SERIAL_PROTOCOL_HPP_ - -#include - -#include -#include -#include - -namespace eprosima { -namespace uxr { - -typedef const std::function WriteCallback; -typedef const std::function ReadCallback; - -class SerialIO -{ - static constexpr uint16_t crc16_table[256] = { - 0x0000, 0xC0C1, 0xC181, 0x0140, 0xC301, 0x03C0, 0x0280, 0xC241, - 0xC601, 0x06C0, 0x0780, 0xC741, 0x0500, 0xC5C1, 0xC481, 0x0440, - 0xCC01, 0x0CC0, 0x0D80, 0xCD41, 0x0F00, 0xCFC1, 0xCE81, 0x0E40, - 0x0A00, 0xCAC1, 0xCB81, 0x0B40, 0xC901, 0x09C0, 0x0880, 0xC841, - 0xD801, 0x18C0, 0x1980, 0xD941, 0x1B00, 0xDBC1, 0xDA81, 0x1A40, - 0x1E00, 0xDEC1, 0xDF81, 0x1F40, 0xDD01, 0x1DC0, 0x1C80, 0xDC41, - 0x1400, 0xD4C1, 0xD581, 0x1540, 0xD701, 0x17C0, 0x1680, 0xD641, - 0xD201, 0x12C0, 0x1380, 0xD341, 0x1100, 0xD1C1, 0xD081, 0x1040, - 0xF001, 0x30C0, 0x3180, 0xF141, 0x3300, 0xF3C1, 0xF281, 0x3240, - 0x3600, 0xF6C1, 0xF781, 0x3740, 0xF501, 0x35C0, 0x3480, 0xF441, - 0x3C00, 0xFCC1, 0xFD81, 0x3D40, 0xFF01, 0x3FC0, 0x3E80, 0xFE41, - 0xFA01, 0x3AC0, 0x3B80, 0xFB41, 0x3900, 0xF9C1, 0xF881, 0x3840, - 0x2800, 0xE8C1, 0xE981, 0x2940, 0xEB01, 0x2BC0, 0x2A80, 0xEA41, - 0xEE01, 0x2EC0, 0x2F80, 0xEF41, 0x2D00, 0xEDC1, 0xEC81, 0x2C40, - 0xE401, 0x24C0, 0x2580, 0xE541, 0x2700, 0xE7C1, 0xE681, 0x2640, - 0x2200, 0xE2C1, 0xE381, 0x2340, 0xE101, 0x21C0, 0x2080, 0xE041, - 0xA001, 0x60C0, 0x6180, 0xA141, 0x6300, 0xA3C1, 0xA281, 0x6240, - 0x6600, 0xA6C1, 0xA781, 0x6740, 0xA501, 0x65C0, 0x6480, 0xA441, - 0x6C00, 0xACC1, 0xAD81, 0x6D40, 0xAF01, 0x6FC0, 0x6E80, 0xAE41, - 0xAA01, 0x6AC0, 0x6B80, 0xAB41, 0x6900, 0xA9C1, 0xA881, 0x6840, - 0x7800, 0xB8C1, 0xB981, 0x7940, 0xBB01, 0x7BC0, 0x7A80, 0xBA41, - 0xBE01, 0x7EC0, 0x7F80, 0xBF41, 0x7D00, 0xBDC1, 0xBC81, 0x7C40, - 0xB401, 0x74C0, 0x7580, 0xB541, 0x7700, 0xB7C1, 0xB681, 0x7640, - 0x7200, 0xB2C1, 0xB381, 0x7340, 0xB101, 0x71C0, 0x7080, 0xB041, - 0x5000, 0x90C1, 0x9181, 0x5140, 0x9301, 0x53C0, 0x5280, 0x9241, - 0x9601, 0x56C0, 0x5780, 0x9741, 0x5500, 0x95C1, 0x9481, 0x5440, - 0x9C01, 0x5CC0, 0x5D80, 0x9D41, 0x5F00, 0x9FC1, 0x9E81, 0x5E40, - 0x5A00, 0x9AC1, 0x9B81, 0x5B40, 0x9901, 0x59C0, 0x5880, 0x9841, - 0x8801, 0x48C0, 0x4980, 0x8941, 0x4B00, 0x8BC1, 0x8A81, 0x4A40, - 0x4E00, 0x8EC1, 0x8F81, 0x4F40, 0x8D01, 0x4DC0, 0x4C80, 0x8C41, - 0x4400, 0x84C1, 0x8581, 0x4540, 0x8701, 0x47C0, 0x4680, 0x8641, - 0x8201, 0x42C0, 0x4380, 0x8341, 0x4100, 0x81C1, 0x8081, 0x4040 - }; - static constexpr uint8_t framing_begin_flag = 0x7E; - static constexpr uint8_t framing_esc_flag = 0x7D; - static constexpr uint8_t framing_xor_flag = 0x20; - - enum class InputState : uint8_t - { - UXR_SERIAL_UNINITIALIZED, - UXR_SERIAL_READING_SRC_ADDR, - UXR_SERIAL_READING_DST_ADDR, - UXR_SERIAL_READING_LEN_LSB, - UXR_SERIAL_READING_LEN_MSB, - UXR_SERIAL_READING_PAYLOAD, - UXR_SERIAL_READING_CRC_LSB, - UXR_SERIAL_READING_CRC_MSB, - - }; - -public: - SerialIO( - uint8_t local_addr, - WriteCallback write_callback, - ReadCallback read_callback); - - size_t write_msg( - const uint8_t* buf, - size_t len, - uint8_t remote_addr, - TransportRc& transport_rc); - - size_t read_msg( - uint8_t* buf, - size_t len, - uint8_t& remote_addr, - int timeout, - TransportRc& transport_rc); - -private: - static void update_crc( - uint16_t& crc, - const uint8_t data); - - bool get_next_octet( - uint8_t& octet); - - bool add_next_octet( - uint8_t octet); - -private: - InputState state_; - uint8_t local_addr_; - uint8_t read_buffer_[42]; - uint8_t read_buffer_head_; - uint8_t read_buffer_tail_; - uint8_t remote_addr_; - uint16_t msg_len_; - uint16_t msg_pos_; - uint16_t msg_crc_; - uint16_t cmp_crc_; - uint8_t write_buffer_[42]; - uint8_t write_buffer_pos_; - WriteCallback write_callback_; - ReadCallback read_callback_; -}; - -inline SerialIO::SerialIO( - uint8_t local_addr, - WriteCallback write_callback, - ReadCallback read_callback) - : state_{InputState::UXR_SERIAL_UNINITIALIZED} - , local_addr_{local_addr} - , write_callback_{write_callback} - , read_callback_{read_callback} -{} - -inline void SerialIO::update_crc( - uint16_t& crc, - const uint8_t data) -{ - crc = (crc >> 8) ^ SerialIO::crc16_table[(crc ^ data) & 0xFF]; -} - -inline bool SerialIO::get_next_octet( - uint8_t& octet) -{ - bool rv = false; - octet = 0; - if (read_buffer_head_ != read_buffer_tail_) - { - if (framing_esc_flag != read_buffer_[read_buffer_tail_]) - { - octet = read_buffer_[read_buffer_tail_]; - read_buffer_tail_ = uint8_t(size_t(read_buffer_tail_ + 1) % sizeof(read_buffer_)); - rv = (framing_begin_flag != octet); - } - else - { - uint8_t temp_tail = uint8_t(size_t(read_buffer_tail_ + 1) % sizeof(read_buffer_)); - if (temp_tail != read_buffer_head_) - { - octet = read_buffer_[temp_tail]; - read_buffer_tail_ = uint8_t(size_t(read_buffer_tail_ + 2) % sizeof(read_buffer_)); - if (framing_begin_flag != octet) - { - octet ^= framing_xor_flag; - rv = true; - } - } - } - } - return rv; -} - -inline bool SerialIO::add_next_octet( - uint8_t octet) -{ - bool rv = false; - - if (framing_begin_flag == octet || framing_esc_flag == octet) - { - if (uint8_t(write_buffer_pos_ + 1) < sizeof(write_buffer_)) - { - write_buffer_[write_buffer_pos_] = framing_esc_flag; - write_buffer_[write_buffer_pos_ + 1] = octet ^ framing_xor_flag; - write_buffer_pos_ += 2; - rv = true; - } - } - else - { - if (write_buffer_pos_ < sizeof(write_buffer_)) - { - write_buffer_[write_buffer_pos_] = octet; - ++write_buffer_pos_; - rv = true; - } - } - - return rv; -} - -inline size_t SerialIO::write_msg( - const uint8_t *buf, - size_t len, - uint8_t remote_addr, - TransportRc& transport_rc) -{ - /* Buffer being flag. */ - write_buffer_[0] = framing_begin_flag; - write_buffer_pos_ = 1; - - /* Buffer header. */ - add_next_octet(local_addr_); - add_next_octet(remote_addr); - add_next_octet(uint8_t(len & 0xFF)); - add_next_octet(uint8_t(len >> 8)); - - /* Write payload. */ - uint8_t octet = 0; - uint16_t written_len = 0; - uint16_t crc = 0; - bool cond = true; - while (written_len < len && cond) - { - octet = *(buf + written_len); - if (add_next_octet(octet)) - { - update_crc(crc, octet); - ++written_len; - } - else - { - size_t bytes_written = write_callback_(write_buffer_, write_buffer_pos_, transport_rc); - if (0 < bytes_written) - { - cond = true; - write_buffer_pos_ = uint8_t(write_buffer_pos_ - bytes_written); - } - else - { - cond = false; - } - } - } - - /* Write CRC. */ - uint8_t tmp_crc[2]; - tmp_crc[0] = uint8_t(crc & 0xFF); - tmp_crc[1] = uint8_t(crc >> 8); - written_len = 0; - while (written_len < sizeof(tmp_crc) && cond) - { - octet = *(tmp_crc + written_len); - if (add_next_octet(octet)) - { - update_crc(crc, octet); - ++written_len; - } - else - { - size_t bytes_written = write_callback_(write_buffer_, write_buffer_pos_, transport_rc); - if (0 < bytes_written) - { - cond = true; - write_buffer_pos_ -= (uint8_t)bytes_written; - } - else - { - cond = false; - } - } - } - - /* Flus write buffer. */ - if (cond && (0 < write_buffer_pos_)) - { - size_t bytes_written = write_callback_(write_buffer_, write_buffer_pos_, transport_rc); - if (0 < bytes_written) - { - cond = true; - write_buffer_pos_ -= (uint8_t)bytes_written; - } - else - { - cond = false; - } - } - - return cond ? uint16_t(len) : 0; -} - -inline size_t SerialIO::read_msg( - uint8_t* buf, - size_t len, - uint8_t& remote_addr, - int timeout, - TransportRc& transport_rc) -{ - size_t rv = 0; - - /* Compute read-buffer available size. */ - uint8_t av_len[2] = {0, 0}; - if (read_buffer_head_ == read_buffer_tail_) - { - read_buffer_head_ = 0; - read_buffer_tail_ = 0; - av_len[0] = sizeof(read_buffer_) - 1; - } - else if (read_buffer_head_ > read_buffer_tail_) - { - if (0 < read_buffer_tail_) - { - av_len[0] = uint8_t(sizeof(read_buffer_) - read_buffer_head_); - av_len[1] = uint8_t(read_buffer_tail_ - 1); - } - else - { - av_len[0] = uint8_t(sizeof(read_buffer_) - read_buffer_head_ - 1); - } - } - else - { - av_len[0] = uint8_t(read_buffer_tail_ - read_buffer_head_ - 1); - } - - /* Read from serial. */ - size_t bytes_read[2] = {0}; - if (0 < av_len[0]) - { - bytes_read[0] = read_callback_(&read_buffer_[read_buffer_head_], av_len[0], timeout, transport_rc); - read_buffer_head_ = uint8_t(size_t(read_buffer_head_ + bytes_read[0]) % sizeof(read_buffer_)); - if (0 < bytes_read[0]) - { - if ((bytes_read[0] == av_len[0]) && (0 < av_len[1])) - { - bytes_read[1] = read_callback_(&read_buffer_[read_buffer_head_], av_len[1], 0, transport_rc); - read_buffer_head_ = uint8_t(size_t(read_buffer_head_ + bytes_read[1]) % sizeof(read_buffer_)); - } - } - } - - if (0 < (bytes_read[0] + bytes_read[1]) || (read_buffer_tail_ != read_buffer_head_)) - { - /* State Machine. */ - bool exit_cond = false; - while (!exit_cond) - { - uint8_t octet = 0; - switch (state_) - { - case InputState::UXR_SERIAL_UNINITIALIZED: - { - octet = 0; - while ((framing_begin_flag != octet) && (read_buffer_head_ != read_buffer_tail_)) - { - octet = read_buffer_[read_buffer_tail_]; - read_buffer_tail_ = uint8_t(size_t(read_buffer_tail_ + 1) % sizeof(read_buffer_)); - } - - if (framing_begin_flag == octet) - { - state_ = InputState::UXR_SERIAL_READING_SRC_ADDR; - } - else - { - exit_cond = true; - } - break; - } - case InputState::UXR_SERIAL_READING_SRC_ADDR: - { - if (get_next_octet(remote_addr_)) - { - state_ = InputState::UXR_SERIAL_READING_DST_ADDR; - } - else - { - if (framing_begin_flag != remote_addr_) - { - exit_cond = true; - } - } - break; - } - case InputState::UXR_SERIAL_READING_DST_ADDR: - if (get_next_octet(octet)) - { - state_ = (octet == local_addr_) - ? InputState::UXR_SERIAL_READING_LEN_LSB - : InputState::UXR_SERIAL_UNINITIALIZED; - } - else - { - if (framing_begin_flag == octet) - { - state_ = InputState::UXR_SERIAL_READING_SRC_ADDR; - } - else - { - exit_cond = true; - } - } - break; - case InputState::UXR_SERIAL_READING_LEN_LSB: - if (get_next_octet(octet)) - { - msg_len_ = octet; - state_ = InputState::UXR_SERIAL_READING_LEN_MSB; - } - else - { - if (framing_begin_flag == octet) - { - state_ = InputState::UXR_SERIAL_READING_SRC_ADDR; - } - else - { - exit_cond = true; - } - } - break; - case InputState::UXR_SERIAL_READING_LEN_MSB: - if (get_next_octet(octet)) - { - msg_len_ += (octet << 8); - msg_pos_ = 0; - cmp_crc_ = 0; - if (len < msg_len_) - { - state_ = InputState::UXR_SERIAL_UNINITIALIZED; - exit_cond = true; - } - else - { - state_ = InputState::UXR_SERIAL_READING_PAYLOAD; - } - } - else - { - if (framing_begin_flag == octet) - { - state_ = InputState::UXR_SERIAL_READING_SRC_ADDR; - } - else - { - exit_cond = true; - } - } - break; - case InputState::UXR_SERIAL_READING_PAYLOAD: - { - while ((msg_pos_ < msg_len_) && get_next_octet(octet)) - { - buf[size_t(msg_pos_)] = octet; - ++msg_pos_; - update_crc(cmp_crc_, octet); - } - - if (msg_pos_ == msg_len_) - { - state_ = InputState::UXR_SERIAL_READING_CRC_LSB; - } - else - { - if (framing_begin_flag == octet) - { - state_ = InputState::UXR_SERIAL_READING_SRC_ADDR; - } - else - { - exit_cond = true; - } - } - break; - } - case InputState::UXR_SERIAL_READING_CRC_LSB: - if (get_next_octet(octet)) - { - msg_crc_ = octet; - state_ = InputState::UXR_SERIAL_READING_CRC_MSB; - } - else - { - if (framing_begin_flag == octet) - { - state_ = InputState::UXR_SERIAL_READING_SRC_ADDR; - } - else - { - exit_cond = true; - } - } - break; - case InputState::UXR_SERIAL_READING_CRC_MSB: - if (get_next_octet(octet)) - { - msg_crc_ += (octet << 8); - state_ = InputState::UXR_SERIAL_UNINITIALIZED; - if (cmp_crc_ == msg_crc_) - { - remote_addr = remote_addr_; - rv = msg_len_; - } - exit_cond = true; - } - else - { - if (framing_begin_flag == octet) - { - state_ = InputState::UXR_SERIAL_READING_SRC_ADDR; - } - else - { - exit_cond = true; - } - } - break; - } - } - } - - return rv; -} - -} // namespace uxr -} // namespace eprosima - -#endif // UXR_AGENT_TRANSPORT_SERIAL_PROTOCOL_HPP_ diff --git a/include/uxr/agent/transport/stream_framing/StreamFramingProtocol.hpp b/include/uxr/agent/transport/stream_framing/StreamFramingProtocol.hpp new file mode 100644 index 000000000..8e402b415 --- /dev/null +++ b/include/uxr/agent/transport/stream_framing/StreamFramingProtocol.hpp @@ -0,0 +1,223 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef UXR_AGENT_TRANSPORT_STREAM_FRAMING_PROTOCOL_HPP_ +#define UXR_AGENT_TRANSPORT_STREAM_FRAMING_PROTOCOL_HPP_ + +#include + +#include +#include +#include + +#ifdef _WIN32 +#include +typedef SSIZE_T ssize_t; +#endif + +namespace eprosima { +namespace uxr { + +class FramingIO +{ +private: + static constexpr uint16_t crc16_table[256] = + { + 0x0000, 0xC0C1, 0xC181, 0x0140, 0xC301, 0x03C0, 0x0280, 0xC241, + 0xC601, 0x06C0, 0x0780, 0xC741, 0x0500, 0xC5C1, 0xC481, 0x0440, + 0xCC01, 0x0CC0, 0x0D80, 0xCD41, 0x0F00, 0xCFC1, 0xCE81, 0x0E40, + 0x0A00, 0xCAC1, 0xCB81, 0x0B40, 0xC901, 0x09C0, 0x0880, 0xC841, + 0xD801, 0x18C0, 0x1980, 0xD941, 0x1B00, 0xDBC1, 0xDA81, 0x1A40, + 0x1E00, 0xDEC1, 0xDF81, 0x1F40, 0xDD01, 0x1DC0, 0x1C80, 0xDC41, + 0x1400, 0xD4C1, 0xD581, 0x1540, 0xD701, 0x17C0, 0x1680, 0xD641, + 0xD201, 0x12C0, 0x1380, 0xD341, 0x1100, 0xD1C1, 0xD081, 0x1040, + 0xF001, 0x30C0, 0x3180, 0xF141, 0x3300, 0xF3C1, 0xF281, 0x3240, + 0x3600, 0xF6C1, 0xF781, 0x3740, 0xF501, 0x35C0, 0x3480, 0xF441, + 0x3C00, 0xFCC1, 0xFD81, 0x3D40, 0xFF01, 0x3FC0, 0x3E80, 0xFE41, + 0xFA01, 0x3AC0, 0x3B80, 0xFB41, 0x3900, 0xF9C1, 0xF881, 0x3840, + 0x2800, 0xE8C1, 0xE981, 0x2940, 0xEB01, 0x2BC0, 0x2A80, 0xEA41, + 0xEE01, 0x2EC0, 0x2F80, 0xEF41, 0x2D00, 0xEDC1, 0xEC81, 0x2C40, + 0xE401, 0x24C0, 0x2580, 0xE541, 0x2700, 0xE7C1, 0xE681, 0x2640, + 0x2200, 0xE2C1, 0xE381, 0x2340, 0xE101, 0x21C0, 0x2080, 0xE041, + 0xA001, 0x60C0, 0x6180, 0xA141, 0x6300, 0xA3C1, 0xA281, 0x6240, + 0x6600, 0xA6C1, 0xA781, 0x6740, 0xA501, 0x65C0, 0x6480, 0xA441, + 0x6C00, 0xACC1, 0xAD81, 0x6D40, 0xAF01, 0x6FC0, 0x6E80, 0xAE41, + 0xAA01, 0x6AC0, 0x6B80, 0xAB41, 0x6900, 0xA9C1, 0xA881, 0x6840, + 0x7800, 0xB8C1, 0xB981, 0x7940, 0xBB01, 0x7BC0, 0x7A80, 0xBA41, + 0xBE01, 0x7EC0, 0x7F80, 0xBF41, 0x7D00, 0xBDC1, 0xBC81, 0x7C40, + 0xB401, 0x74C0, 0x7580, 0xB541, 0x7700, 0xB7C1, 0xB681, 0x7640, + 0x7200, 0xB2C1, 0xB381, 0x7340, 0xB101, 0x71C0, 0x7080, 0xB041, + 0x5000, 0x90C1, 0x9181, 0x5140, 0x9301, 0x53C0, 0x5280, 0x9241, + 0x9601, 0x56C0, 0x5780, 0x9741, 0x5500, 0x95C1, 0x9481, 0x5440, + 0x9C01, 0x5CC0, 0x5D80, 0x9D41, 0x5F00, 0x9FC1, 0x9E81, 0x5E40, + 0x5A00, 0x9AC1, 0x9B81, 0x5B40, 0x9901, 0x59C0, 0x5880, 0x9841, + 0x8801, 0x48C0, 0x4980, 0x8941, 0x4B00, 0x8BC1, 0x8A81, 0x4A40, + 0x4E00, 0x8EC1, 0x8F81, 0x4F40, 0x8D01, 0x4DC0, 0x4C80, 0x8C41, + 0x4400, 0x84C1, 0x8581, 0x4540, 0x8701, 0x47C0, 0x4680, 0x8641, + 0x8201, 0x42C0, 0x4380, 0x8341, 0x4100, 0x81C1, 0x8081, 0x4040 + }; + + static constexpr uint8_t framing_begin_flag = 0x7E; + static constexpr uint8_t framing_esc_flag = 0x7D; + static constexpr uint8_t framing_xor_flag = 0x20; + + /** + * @brief Possible states for the framing protocol. + */ + enum class InputState : uint8_t + { + UXR_FRAMING_UNINITIALIZED, + UXR_FRAMING_READING_SRC_ADDR, + UXR_FRAMING_READING_DST_ADDR, + UXR_FRAMING_READING_LEN_LSB, + UXR_FRAMING_READING_LEN_MSB, + UXR_FRAMING_READING_PAYLOAD, + UXR_FRAMING_READING_CRC_LSB, + UXR_FRAMING_READING_CRC_MSB, + }; + +public: + /** + * @brief Write callback function signature. + * @param buffer Raw octet buffer to write into. + * @param message_length Number of bytes to be written. + * @param transport_rc Return code to be set by the callback logic. + * @return size_t Number of written bytes. + */ + using WriteCallback = std::function; + + /** + * @brief Read callback function signature. + * @param buffer Raw octet buffer to read data from. + * @param buffer_length Length of the buffer. + * @param timeout Read timeout. + * @param transport_rc Return code to be set by the callback logic. + * @return size_t Number of read bytes. + */ + using ReadCallback = std::function; + + FramingIO( + uint8_t local_addr, + WriteCallback write_callback, + ReadCallback read_callback); + + /** + * @brief Write message using the stream framing protocol + * and the previously user provided WriteCallback method. + * @param buf Buffer to write data into. + * @param len Length of the message to be written. + * @param remote_addr Remote address to where the message will be sent. + * @param transport_rc Return code of the write operation. + * @return size_t Number of written bytes. + */ + size_t write_framed_msg( + const uint8_t* buf, + size_t len, + uint8_t remote_addr, + TransportRc& transport_rc); + + /** + * @brief Read message using the stream framing protocol + * and the previously user provided ReadCallback method. + * @param buf Buffer to read data from. + * @param len Length of the buffer. + * @param remote_addr Remote address tfrom which the message will be read. + * @param timeout Timeout in milliseconds. + * @param transport_rc Return code of the read operation. + * @return size_t Number of read bytes. + */ + size_t read_framed_msg( + uint8_t* buf, + size_t len, + uint8_t& remote_addr, + int timeout, + TransportRc& transport_rc); + +private: + /** + * @brief Static method to update CRC. + * @param crc CRC code to be updated. + * @param data New data to be loaded into the CRC code. + */ + static void update_crc( + uint16_t& crc, + const uint8_t data); + + /** + * @brief Get next octet from the read buffer. + * @param octet Octet to which the data will be written. + * @return True if successful read, false otherwise. + */ + bool get_next_octet( + uint8_t& octet); + + /** + * @brief Add next octet to the write buffer. + * @param octet Data to be written. + * @return true if successful write, false otherwise. + */ + bool add_next_octet( + uint8_t octet); + + /** + * @brief Internal write method implemented using circular write buffer. + * @param transport_rc Return code of the write operation. + * @return True if success, false otherwise. + */ + bool transport_write( + TransportRc& transport_rc); + + /** + * @brief Internal read method implemented using circular read buffer. + * @param timeout Read timeout in milliseconds. + * @param transport_rc Return code of the read operation. + * @return Number of Bytes read. + */ + size_t transport_read( + int& timeout, + TransportRc& transport_rc); + + InputState state_; + + uint8_t local_addr_; + uint8_t remote_addr_; + + uint8_t read_buffer_[42]; + uint8_t read_buffer_head_; + uint8_t read_buffer_tail_; + + ReadCallback read_callback_; + + uint16_t msg_len_; + uint16_t msg_pos_; + uint16_t msg_crc_; + uint16_t cmp_crc_; + + uint8_t write_buffer_[42]; + uint8_t write_buffer_pos_; + + WriteCallback write_callback_; +}; + +} // namespace uxr +} // namespace eprosima + +#endif // UXR_AGENT_TRANSPORT_STREAM_FRAMING_PROTOCOL_HPP_ \ No newline at end of file diff --git a/src/cpp/processor/Processor.cpp b/src/cpp/processor/Processor.cpp index 6bfc6fdd9..8bc63f96a 100644 --- a/src/cpp/processor/Processor.cpp +++ b/src/cpp/processor/Processor.cpp @@ -24,6 +24,7 @@ #include #include #include +#include namespace eprosima { namespace uxr { @@ -895,6 +896,7 @@ void Processor::check_heartbeats() template class Processor; template class Processor; template class Processor; +template class Processor; } // namespace uxr } // namespace eprosima diff --git a/src/cpp/transport/Server.cpp b/src/cpp/transport/Server.cpp index 03a2ea76e..1df335c71 100644 --- a/src/cpp/transport/Server.cpp +++ b/src/cpp/transport/Server.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include @@ -32,6 +33,7 @@ namespace uxr { extern template class Processor; extern template class Processor; extern template class Processor; +extern template class Processor; template Server::Server(Middleware::Kind middleware_kind) @@ -112,6 +114,7 @@ bool Server::stop() /* Close servers. */ bool rv = true; + // TODO: check at run time if P2P and discovery are implemented #ifdef UAGENT_DISCOVERY_PROFILE rv = fini_discovery() && rv; #endif @@ -262,6 +265,7 @@ void Server::error_handler_loop() template class Server; template class Server; template class Server; +template class Server; } // namespace uxr } // namespace eprosima diff --git a/src/cpp/transport/custom/CustomAgent.cpp b/src/cpp/transport/custom/CustomAgent.cpp new file mode 100644 index 000000000..86425beb0 --- /dev/null +++ b/src/cpp/transport/custom/CustomAgent.cpp @@ -0,0 +1,333 @@ +// Copyright 2021-present Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +namespace eprosima { +namespace uxr { + +namespace { +const std::string transport_rc_to_str( + const TransportRc& transport_rc) +{ + switch (transport_rc) + { + case TransportRc::connection_error: + { + return std::string("connection error"); + } + case TransportRc::timeout_error: + { + return std::string("timeout error"); + } + case TransportRc::server_error: + { + return std::string("server error"); + } + default: + { + return std::string(); + } + } +} + +} // anonymous namespace + +CustomAgent::CustomAgent( + const std::string& name, + CustomEndPoint* endpoint, + Middleware::Kind middleware_kind, + bool framing, + InitFunction& init_function, + FiniFunction& fini_function, + SendMsgFunction& send_msg_function, + RecvMsgFunction& recv_msg_function) + : Server(middleware_kind) + , name_(name) + , recv_endpoint_(endpoint) + , send_endpoint_(nullptr) + , custom_init_func_(init_function) + , custom_fini_func_(fini_function) + , custom_send_msg_func_(send_msg_function) + , custom_recv_msg_func_(recv_msg_function) + , framing_(framing) + , framing_io_(0x00, + [&]( + uint8_t* buffer, + size_t message_length, + TransportRc& transport_rc) -> ssize_t + { + return custom_send_msg_func_( + send_endpoint_, + buffer, + message_length, + transport_rc); + }, + [&]( + uint8_t* buffer, + size_t buffer_length, + int timeout, + TransportRc& transport_rc) -> ssize_t + { + return custom_recv_msg_func_( + recv_endpoint_, + buffer, + buffer_length, + timeout, + transport_rc); + }) +{ +} + +CustomAgent::~CustomAgent() +{ + try + { + this->stop(); + } + catch (const std::exception& e) + { + UXR_AGENT_LOG_CRITICAL( + UXR_DECORATE_RED("Error stopping custom agent server"), + "{} agent exception: {}", + name_, e.what()); + } +} + +bool CustomAgent::init() +{ + try + { + bool user_init_res = custom_init_func_(); + + if (user_init_res) + { + UXR_AGENT_LOG_INFO( + UXR_DECORATE_GREEN("Custom agent status: opened"), + "{} agent running", + name_); + } + else + { + UXR_AGENT_LOG_ERROR( + UXR_DECORATE_RED("Error initializing custom agent server"), + "{} agent error", + name_); + } + + return user_init_res; + } + catch (const std::exception& e) + { + UXR_AGENT_LOG_CRITICAL( + UXR_DECORATE_RED("Error initializing custom agent server"), + "{} agent exception: {}", + name_, e.what()); + + return false; + } +} + +bool CustomAgent::fini() +{ + try + { + bool user_fini_res = custom_fini_func_(); + + if (user_fini_res) + { + UXR_AGENT_LOG_INFO( + UXR_DECORATE_GREEN("Custom agent status: closed"), + "{} agent closed", + name_); + } + else + { + UXR_AGENT_LOG_ERROR( + UXR_DECORATE_RED("Error terminating custom agent server"), + "{} agent error", + name_); + } + + return user_fini_res; + } + catch (const std::exception& e) + { + UXR_AGENT_LOG_CRITICAL( + UXR_DECORATE_RED("Error terminating custom agent server"), + "{} agent exception: {}", + name_, e.what()); + + return false; + } +} + +bool CustomAgent::recv_message( + InputPacket& input_packet, + int timeout, + TransportRc& transport_rc) +{ + // Reset recv_endpoint_ members before receiving a new message. + recv_endpoint_->reset(); + + try + { + ssize_t recv_bytes; + if (framing_) + { + uint8_t remote_addr = 0x00; + recv_bytes = framing_io_.read_framed_msg( + buffer_, SERVER_BUFFER_SIZE, remote_addr, timeout, transport_rc); + } + else + { + recv_bytes = custom_recv_msg_func_( + recv_endpoint_, buffer_, SERVER_BUFFER_SIZE, timeout, transport_rc); + } + + bool success = (0 <= recv_bytes && TransportRc::ok == transport_rc); + if (success) + { + // User must have filled all the members of the endpoint. + recv_endpoint_->check_non_empty_members(); + + input_packet.message.reset( + new eprosima::uxr::InputMessage( + buffer_, static_cast(recv_bytes))); + input_packet.source = *recv_endpoint_; + + uint32_t raw_client_key = 0u; + this->get_client_key(input_packet.source, raw_client_key); + + std::stringstream ss; + ss << UXR_COLOR_YELLOW << "[==>> " << name_ << " <<==]" << UXR_COLOR_RESET; + UXR_AGENT_LOG_MESSAGE( + ss.str(), + raw_client_key, + input_packet.message->get_buf(), + input_packet.message->get_len()); + } + else if (TransportRc::timeout_error != transport_rc) + { + // Printing a trace for timeout_error would fill the log with too much messages. + std::stringstream ss; + ss << UXR_COLOR_RED << "Error while receiving message: " + << transport_rc_to_str(transport_rc) << UXR_COLOR_RESET; + UXR_AGENT_LOG_ERROR( + ss.str(), + "{} agent error", + name_); + } + + return success; + } + catch (const std::exception& e) + { + UXR_AGENT_LOG_ERROR( + UXR_DECORATE_RED("Error while receiving message"), + "custom {} agent, exception: {}", + name_, e.what()); + + return false; + } +} + +bool CustomAgent::send_message( + OutputPacket output_packet, + TransportRc& transport_rc) +{ + try + { + ssize_t sent_bytes; + if (framing_) + { + send_endpoint_ = &output_packet.destination; + sent_bytes = framing_io_.write_framed_msg( + output_packet.message->get_buf(), + output_packet.message->get_len(), + 0x00, + transport_rc); + send_endpoint_ = nullptr; + } + else + { + sent_bytes = custom_send_msg_func_( + &output_packet.destination, + output_packet.message->get_buf(), + output_packet.message->get_len(), + transport_rc); + } + + bool success = (output_packet.message->get_len() == static_cast(sent_bytes)); + if (success) + { + uint32_t raw_client_key = 0u; + this->get_client_key(output_packet.destination, raw_client_key); + + std::stringstream ss; + ss << UXR_COLOR_YELLOW << "[** <<" << name_ << ">> **]" << UXR_COLOR_RESET; + UXR_AGENT_LOG_MESSAGE( + ss.str(), + raw_client_key, + output_packet.message->get_buf(), + output_packet.message->get_len()); + } + else + { + std::stringstream ss; + ss << UXR_COLOR_RED + << "Error while sending message: " + << transport_rc_to_str(transport_rc) + << ". Expected to send " + << output_packet.message->get_len() + << " bytes, but sent " + << sent_bytes + << "instead" + << UXR_COLOR_RESET; + UXR_AGENT_LOG_ERROR( + ss.str(), + "{} agent error", + name_); + } + + return success; + } + catch (const std::exception& e) + { + UXR_AGENT_LOG_ERROR( + UXR_DECORATE_RED("Error while sending message"), + "custom {} agent, exception: {}", + name_, e.what()); + + return false; + } +} + +bool CustomAgent::handle_error( + TransportRc transport_rc) +{ + std::stringstream ss; + ss << UXR_COLOR_RED << "Recovering from error: " + << transport_rc_to_str(transport_rc) << UXR_COLOR_RESET; + UXR_AGENT_LOG_ERROR( + ss.str(), + "{} agent error", + name_); + return fini() && init(); +} + +} // namespace uxr +} // namespace eprosima \ No newline at end of file diff --git a/src/cpp/transport/serial/SerialAgentLinux.cpp b/src/cpp/transport/serial/SerialAgentLinux.cpp index 29f9aaa74..bc0082ea9 100644 --- a/src/cpp/transport/serial/SerialAgentLinux.cpp +++ b/src/cpp/transport/serial/SerialAgentLinux.cpp @@ -28,13 +28,13 @@ SerialAgent::SerialAgent( , addr_{addr} , poll_fd_{} , buffer_{0} - , serial_io_( + , framing_io_( addr, std::bind(&SerialAgent::write_data, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3), std::bind(&SerialAgent::read_data, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4)) {} -size_t SerialAgent::write_data( +ssize_t SerialAgent::write_data( uint8_t* buf, size_t len, TransportRc& transport_rc) @@ -52,22 +52,18 @@ size_t SerialAgent::write_data( return rv; } -size_t SerialAgent::read_data( +ssize_t SerialAgent::read_data( uint8_t* buf, size_t len, int timeout, TransportRc& transport_rc) { - size_t rv = 0; + ssize_t bytes_read = 0; int poll_rv = poll(&poll_fd_, 1, timeout); if (0 < poll_rv) { - ssize_t bytes_read = read(poll_fd_.fd, buf, len); - if (0 < bytes_read) - { - rv = size_t(bytes_read); - } - else + bytes_read = read(poll_fd_.fd, buf, len); + if (0 > bytes_read) { transport_rc = TransportRc::server_error; } @@ -76,7 +72,7 @@ size_t SerialAgent::read_data( { transport_rc = (poll_rv == 0) ? TransportRc::timeout_error : TransportRc::server_error; } - return rv; + return bytes_read; } bool SerialAgent::recv_message( @@ -86,7 +82,7 @@ bool SerialAgent::recv_message( { bool rv = false; uint8_t remote_addr; - size_t bytes_read = serial_io_.read_msg(buffer_,sizeof (buffer_), remote_addr, timeout, transport_rc); + ssize_t bytes_read = framing_io_.read_framed_msg(buffer_,sizeof (buffer_), remote_addr, timeout, transport_rc); if (0 < bytes_read) { input_packet.message.reset(new InputMessage(buffer_, static_cast(bytes_read))); @@ -111,13 +107,14 @@ bool SerialAgent::send_message( TransportRc& transport_rc) { bool rv = false; - size_t bytes_written = - serial_io_.write_msg( + ssize_t bytes_written = + framing_io_.write_framed_msg( output_packet.message->get_buf(), output_packet.message->get_len(), output_packet.destination.get_addr(), transport_rc); - if ((0 < bytes_written) && (bytes_written == output_packet.message->get_len())) + if ((0 < bytes_written) && ( + static_cast(bytes_written) == output_packet.message->get_len())) { rv = true; diff --git a/src/cpp/transport/serial/SerialProtocol.cpp b/src/cpp/transport/serial/SerialProtocol.cpp deleted file mode 100644 index 32a2d5063..000000000 --- a/src/cpp/transport/serial/SerialProtocol.cpp +++ /dev/null @@ -1,9 +0,0 @@ -#include - -namespace eprosima { -namespace uxr { - -constexpr uint16_t SerialIO::crc16_table[256]; - -} // namespace uxr -} // namespace eprosima diff --git a/src/cpp/transport/stream_framing/StreamFramingProtocol.cpp b/src/cpp/transport/stream_framing/StreamFramingProtocol.cpp new file mode 100644 index 000000000..5833f76de --- /dev/null +++ b/src/cpp/transport/stream_framing/StreamFramingProtocol.cpp @@ -0,0 +1,493 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +namespace eprosima { +namespace uxr { + +constexpr uint16_t FramingIO::crc16_table[256]; + +FramingIO::FramingIO( + uint8_t local_addr, + WriteCallback write_callback, + ReadCallback read_callback) + : state_(InputState::UXR_FRAMING_UNINITIALIZED) + , local_addr_(local_addr) + , remote_addr_(0) + , read_buffer_() + , read_buffer_head_(0) + , read_buffer_tail_(0) + , read_callback_(read_callback) + , msg_len_(0) + , msg_pos_(0) + , msg_crc_(0) + , cmp_crc_(0) + , write_buffer_() + , write_buffer_pos_(0) + , write_callback_(write_callback) +{ +} + +size_t FramingIO::write_framed_msg( + const uint8_t* buf, + size_t len, + uint8_t remote_addr, + TransportRc& transport_rc) +{ + /* Buffer being flag. */ + write_buffer_[0] = framing_begin_flag; + write_buffer_pos_ = 1; + + /* Buffer header. */ + add_next_octet(local_addr_); + add_next_octet(remote_addr); + add_next_octet(static_cast(len & 0xFF)); + add_next_octet(static_cast(len >> 8)); + + /* Write payload. */ + uint8_t octet = 0; + uint16_t written_len = 0; + uint16_t crc = 0; + bool cond = true; + while (written_len < len && cond) + { + octet = static_cast(*(buf + written_len)); + if (add_next_octet(octet)) + { + update_crc(crc, octet); + ++written_len; + } + else + { + cond = transport_write(transport_rc); + } + } + + /* Write CRC. */ + uint8_t tmp_crc[2]; + tmp_crc[0] = static_cast(crc & 0xFF); + tmp_crc[1] = static_cast(crc >> 8); + written_len = 0; + while (written_len < sizeof(tmp_crc) && cond) + { + octet = *(tmp_crc + written_len); + if (add_next_octet(octet)) + { + update_crc(crc, octet); + ++written_len; + } + else + { + cond = transport_write(transport_rc); + } + } + + /* Flus write buffer. */ + if (cond && (0 < write_buffer_pos_)) + { + cond = transport_write(transport_rc); + } + + return cond ? len : 0; +} + +size_t FramingIO::read_framed_msg( + uint8_t* buf, + size_t len, + uint8_t& remote_addr, + int timeout, + TransportRc& transport_rc) +{ + size_t rv = 0; + + size_t read_bytes = transport_read(timeout, transport_rc); + + if (0 < read_bytes || (read_buffer_tail_ != read_buffer_head_)) + { + /** + * State Machine. + */ + bool exit_cond = false; + while (!exit_cond) + { + uint8_t octet = 0; + switch (state_) + { + case InputState::UXR_FRAMING_UNINITIALIZED: + { + octet = 0; + while ((framing_begin_flag != octet) && + (read_buffer_head_ != read_buffer_tail_)) + { + octet = read_buffer_[read_buffer_tail_]; + read_buffer_tail_ = + static_cast( + static_cast( + read_buffer_tail_ + 1) % + sizeof(read_buffer_)); + } + + if (framing_begin_flag == octet) + { + state_ = InputState::UXR_FRAMING_READING_SRC_ADDR; + } + else + { + exit_cond = true; + } + break; + } + case InputState::UXR_FRAMING_READING_SRC_ADDR: + { + if (get_next_octet(remote_addr_)) + { + state_ = InputState::UXR_FRAMING_READING_DST_ADDR; + } + else + { + if (framing_begin_flag != remote_addr_) + { + exit_cond = true; + } + } + break; + } + case InputState::UXR_FRAMING_READING_DST_ADDR: + { + if (get_next_octet(octet)) + { + state_ = (octet == local_addr_) + ? InputState::UXR_FRAMING_READING_LEN_LSB + : InputState::UXR_FRAMING_UNINITIALIZED; + } + else + { + if (framing_begin_flag == octet) + { + state_ = InputState::UXR_FRAMING_READING_SRC_ADDR; + } + else + { + exit_cond = true; + } + } + break; + } + case InputState::UXR_FRAMING_READING_LEN_LSB: + { + if (get_next_octet(octet)) + { + msg_len_ = octet; + state_ = InputState::UXR_FRAMING_READING_LEN_MSB; + } + else + { + if (framing_begin_flag == octet) + { + state_ = InputState::UXR_FRAMING_READING_SRC_ADDR; + } + else + { + exit_cond = true; + } + } + break; + } + case InputState::UXR_FRAMING_READING_LEN_MSB: + { + if (get_next_octet(octet)) + { + msg_len_ += (octet << 8); + msg_pos_ = 0; + cmp_crc_ = 0; + if (len < msg_len_) + { + state_ = InputState::UXR_FRAMING_UNINITIALIZED; + exit_cond = true; + } + else + { + state_ = InputState::UXR_FRAMING_READING_PAYLOAD; + } + } + else + { + if (framing_begin_flag == octet) + { + state_ = InputState::UXR_FRAMING_READING_SRC_ADDR; + } + else + { + exit_cond = true; + } + } + break; + } + case InputState::UXR_FRAMING_READING_PAYLOAD: + { + while ((msg_pos_ < msg_len_) && get_next_octet(octet)) + { + buf[static_cast(msg_pos_)] = octet; + ++msg_pos_; + update_crc(cmp_crc_, octet); + } + + if (msg_pos_ == msg_len_) + { + state_ = InputState::UXR_FRAMING_READING_CRC_LSB; + } + else + { + if (framing_begin_flag == octet) + { + state_ = InputState::UXR_FRAMING_READING_SRC_ADDR; + } + else if (0 < transport_read(timeout, transport_rc)) + { + /* Do nothing */ + } + else + { + exit_cond = true; + } + } + break; + } + case InputState::UXR_FRAMING_READING_CRC_LSB: + { + if (get_next_octet(octet)) + { + msg_crc_ = octet; + state_ = InputState::UXR_FRAMING_READING_CRC_MSB; + } + else + { + if (framing_begin_flag == octet) + { + state_ = InputState::UXR_FRAMING_READING_SRC_ADDR; + } + else + { + exit_cond = true; + } + } + break; + } + case InputState::UXR_FRAMING_READING_CRC_MSB: + if (get_next_octet(octet)) + { + msg_crc_ += (octet << 8); + state_ = InputState::UXR_FRAMING_UNINITIALIZED; + if (cmp_crc_ == msg_crc_) + { + remote_addr = remote_addr_; + rv = msg_len_; + } + exit_cond = true; + } + else + { + if (framing_begin_flag == octet) + { + state_ = InputState::UXR_FRAMING_READING_SRC_ADDR; + } + else + { + exit_cond = true; + } + } + break; + } + } + } + + return rv; +} + +void FramingIO::update_crc( + uint16_t& crc, + const uint8_t data) +{ + crc = (crc >> 8) ^ crc16_table[(crc ^ data) & 0xFF]; +} + +bool FramingIO::get_next_octet( + uint8_t& octet) +{ + bool rv = false; + octet = 0; + + if (read_buffer_head_ != read_buffer_tail_) + { + if (framing_esc_flag != read_buffer_[read_buffer_tail_]) + { + octet = read_buffer_[read_buffer_tail_]; + read_buffer_tail_ = static_cast( + static_cast(read_buffer_tail_ + 1) % sizeof(read_buffer_)); + + rv = (framing_begin_flag != octet); + } + else + { + uint8_t temp_tail = static_cast( + static_cast(read_buffer_tail_ + 1) % sizeof(read_buffer_)); + + if (temp_tail != read_buffer_head_) + { + octet = read_buffer_[temp_tail]; + read_buffer_tail_ = static_cast( + static_cast(read_buffer_tail_ + 2) % sizeof(read_buffer_)); + + if (framing_begin_flag != octet) + { + octet ^= framing_xor_flag; + rv = true; + } + } + } + } + + return rv; +} + +bool FramingIO::add_next_octet( + uint8_t octet) +{ + bool rv = false; + + if (framing_begin_flag == octet || framing_esc_flag == octet) + { + if (static_cast(write_buffer_pos_ + 1) < sizeof(write_buffer_)) + { + write_buffer_[write_buffer_pos_] = framing_esc_flag; + write_buffer_[write_buffer_pos_ + 1] = octet ^ framing_xor_flag; + write_buffer_pos_ += 2; + rv = true; + } + } + else if (write_buffer_pos_ < sizeof(write_buffer_)) + { + write_buffer_[write_buffer_pos_] = octet; + ++write_buffer_pos_; + rv = true; + } + + return rv; +} + +bool FramingIO::transport_write( + TransportRc& transport_rc) +{ + size_t bytes_written = 0; + size_t last_written = 0; + + do + { + ssize_t write_res = write_callback_(write_buffer_, write_buffer_pos_, transport_rc); + last_written = (0 < write_res) ? write_res : 0; + bytes_written += last_written; + } while (bytes_written < write_buffer_pos_ && 0 < last_written); + + if (write_buffer_pos_ == bytes_written) + { + write_buffer_pos_ = 0; + return true; + } + + return false; +} + +size_t FramingIO::transport_read( + int& timeout, + TransportRc& transport_rc) +{ + const auto time_init = std::chrono::system_clock::now(); + + /** + * Compute read-buffer available size. + * We keep track of two possible fragments to handle the case of + * some intermediate section of the circular buffer being written, + * that is, head > tail. + */ + uint8_t available_length[2] = {0, 0}; + if (read_buffer_head_ == read_buffer_tail_) + { + read_buffer_head_ = 0; + read_buffer_tail_ = 0; + available_length[0] = sizeof(read_buffer_) - 1; + } + else if (read_buffer_head_ > read_buffer_tail_) + { + if (0 < read_buffer_tail_) + { + available_length[0] = + static_cast(sizeof(read_buffer_) - read_buffer_head_); + available_length[1] = + static_cast(read_buffer_tail_ - 1); + } + else + { + available_length[0] = + static_cast(sizeof(read_buffer_) - read_buffer_head_ - 1); + } + } + else + { + available_length[0] = + static_cast(read_buffer_tail_ - read_buffer_head_ - 1); + } + + /** + * Read from serial, according to the fragment size previously calculated. + */ + size_t bytes_read[2] = {0, 0}; + if (0 < available_length[0]) + { + ssize_t read_res = read_callback_(&read_buffer_[read_buffer_head_], + available_length[0], + timeout, + transport_rc); + bytes_read[0] = (0 < read_res) ? read_res : 0; + + read_buffer_head_ = static_cast( + static_cast(read_buffer_head_ + bytes_read[0]) % sizeof(read_buffer_)); + + if (0 < bytes_read[0]) + { + if ((bytes_read[0] == available_length[0]) && (0 < available_length[1])) + { + read_res = read_callback_(&read_buffer_[read_buffer_head_], + available_length[1], + 0, + transport_rc); + bytes_read[1] = (0 < read_res) ? read_res : 0; + + read_buffer_head_ = static_cast( + static_cast(read_buffer_head_ + bytes_read[1]) % sizeof(read_buffer_)); + } + } + } + + timeout -= static_cast( + std::chrono::duration_cast( + time_init - std::chrono::system_clock::now()) + .count()); + + return bytes_read[0] + bytes_read[1]; +} + +} // namespace uxr +} // namespace eprosima \ No newline at end of file