Skip to content

Commit

Permalink
refactor(sink/tcp): export TCP declaration
Browse files Browse the repository at this point in the history
In the future we’d probably better to return an unique pointer from
factories instead of completely defined types to be able to hide real
sink implementations.
  • Loading branch information
3Hren committed Feb 18, 2016
1 parent 5da1a74 commit dc71ba1
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 121 deletions.
113 changes: 0 additions & 113 deletions include/blackhole/detail/sink/socket/tcp.hpp

This file was deleted.

23 changes: 22 additions & 1 deletion include/blackhole/sink/socket/tcp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include <memory>

#include <blackhole/sink.hpp>

namespace blackhole {
inline namespace v1 {

Expand Down Expand Up @@ -30,7 +32,26 @@ namespace socket {
/// and port.
///
/// \remark All methods of this class are thread-safe.
class tcp_t;
class tcp_t : public sink_t {
struct data_t;
std::unique_ptr<data_t> data;

public:
tcp_t(std::string host, std::uint16_t port);

tcp_t(const tcp_t& other) = delete;
tcp_t(tcp_t&& other) noexcept;

~tcp_t();

auto operator=(const tcp_t& other) -> tcp_t& = delete;
auto operator=(tcp_t&& other) noexcept -> tcp_t&;

auto host() const noexcept -> const std::string&;
auto port() const noexcept -> std::uint16_t;

auto emit(const record_t& record, const string_view& message) -> void;
};

} // namespace socket
} // namespace sink
Expand Down
4 changes: 0 additions & 4 deletions src/registry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@
#include "blackhole/sink.hpp"
#include "blackhole/sink/console.hpp"
#include "blackhole/sink/null.hpp"
#include "blackhole/sink/socket/tcp.hpp"

#include "blackhole/detail/sink/socket/tcp.hpp"

namespace blackhole {
inline namespace v1 {
Expand Down Expand Up @@ -116,7 +113,6 @@ auto registry_t::configured() -> registry_t {

registry.add<sink::console_t>();
registry.add<sink::null_t>();
registry.add<sink::socket::tcp_t>();

registry.add<handler::blocking_t>();

Expand Down
109 changes: 108 additions & 1 deletion src/sink/socket/tcp.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,115 @@
#include <mutex>

#include <boost/asio/connect.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/write.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/optional/optional.hpp>

#include "blackhole/config/node.hpp"
#include "blackhole/config/option.hpp"
#include "blackhole/cpp17/string_view.hpp"
#include "blackhole/extensions/format.hpp"
#include "blackhole/sink.hpp"
#include "blackhole/sink/socket/tcp.hpp"

#include "blackhole/detail/sink/socket/tcp.hpp"
#include "blackhole/detail/util/optional.hpp"

namespace blackhole {
inline namespace v1 {
namespace sink {
namespace socket {

typedef boost::asio::ip::tcp protocol_type;
typedef protocol_type::socket socket_type;
typedef protocol_type::endpoint endpoint_type;

struct tcp_t::data_t {
std::string host;
std::uint16_t port;

boost::asio::io_service io_service;
std::unique_ptr<socket_type> socket;

mutable std::mutex mutex;
};

namespace {

/// Resolves specified host and tries to connect to the socket.
template<typename Protocol>
auto connect(boost::asio::io_service& ev, typename Protocol::socket& socket,
const std::string& host, std::uint16_t port) -> void
{
typename Protocol::resolver::iterator endpoint;

try {
typename Protocol::resolver resolver(ev);
typename Protocol::resolver::query query(host, boost::lexical_cast<std::string>(port),
Protocol::resolver::query::flags::numeric_service);
endpoint = resolver.resolve(query);
} catch (const boost::system::system_error& err) {
throw std::system_error(err.code().value(), std::system_category(),
fmt::format("failed to resolve {}:{} - {}", host, port, err.what()));
}

try {
boost::asio::connect(socket, endpoint);
} catch (const boost::system::system_error& err) {
throw std::system_error(err.code().value(), std::system_category(),
fmt::format("failed to connect to {}:{} - {}", host, port, err.what()));
}
}

auto reconnect(boost::asio::io_service& io_service, const std::string& host, std::uint16_t port) ->
std::unique_ptr<socket_type>
{
auto socket = std::unique_ptr<socket_type>(new socket_type(io_service));
connect<protocol_type>(io_service, *socket, host, port);

return socket;
}

} // namespace

tcp_t::tcp_t(std::string host, std::uint16_t port) :
data(new data_t)
{
data->host = std::move(host);
data->port = port;
}

tcp_t::~tcp_t() = default;

auto tcp_t::host() const noexcept -> const std::string& {
return data->host;
}

auto tcp_t::port() const noexcept -> std::uint16_t {
return data->port;
}

auto tcp_t::emit(const record_t&, const string_view& message) -> void {
std::lock_guard<std::mutex> lock(data->mutex);

if (!data->socket) {
data->socket = reconnect(data->io_service, data->host, data->port);
}

try {
boost::asio::write(*data->socket, boost::asio::buffer(message.data(), message.size()));
} catch (const boost::system::system_error&) {
data->socket.reset();
std::rethrow_exception(std::current_exception());
}
}

} // namespace socket
} // namespace sink
} // namespace v1
} // namespace blackhole

namespace blackhole {
inline namespace v1 {

Expand Down
3 changes: 1 addition & 2 deletions tests/sink/tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@

#include <blackhole/attribute.hpp>
#include <blackhole/record.hpp>

#include <blackhole/detail/sink/socket/tcp.hpp>
#include <blackhole/sink/socket/tcp.hpp>

#include "mocks/node.hpp"

Expand Down

0 comments on commit dc71ba1

Please sign in to comment.