Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add udp backend #76

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions libcaf_net/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ add_library(libcaf_net_obj OBJECT ${CAF_NET_HEADERS}
src/multiplexer.cpp
src/net/backend/test.cpp
src/net/backend/tcp.cpp
src/net/backend/udp.cpp
src/net/endpoint_manager_queue.cpp
src/net/middleman.cpp
src/net/middleman_backend.cpp
Expand Down Expand Up @@ -149,4 +150,5 @@ caf_incubator_add_test_suites(caf-net-test
udp_datagram_socket
network_socket
net.backend.tcp
net.backend.udp
)
86 changes: 86 additions & 0 deletions libcaf_net/caf/net/backend/udp.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/******************************************************************************
* ____ _ _____ *
* / ___| / \ | ___| C++ *
* | | / _ \ | |_ Actor *
* | |___ / ___ \| _| Framework *
* \____/_/ \_|_| *
* *
* Copyright 2011-2020 Dominik Charousset *
* *
* Distributed under the terms and conditions of the BSD 3-Clause License or *
* (at your option) under the terms and conditions of the Boost Software *
* License 1.0. See accompanying files LICENSE and LICENSE_ALTERNATIVE. *
* *
* If you did not receive a copy of the license files, see *
* http://opensource.org/licenses/BSD-3-Clause and *
* http://www.boost.org/LICENSE_1_0.txt. *
******************************************************************************/

#pragma once

#include <map>
#include <mutex>

#include "caf/detail/net_export.hpp"
#include "caf/error.hpp"
#include "caf/expected.hpp"
#include "caf/net/basp/application.hpp"
#include "caf/net/fwd.hpp"
#include "caf/net/make_endpoint_manager.hpp"
#include "caf/net/middleman.hpp"
#include "caf/net/middleman_backend.hpp"
#include "caf/net/multiplexer.hpp"
#include "caf/net/udp_datagram_socket.hpp"
#include "caf/node_id.hpp"

namespace caf::net::backend {

/// Minimal backend for udp communication.
class CAF_NET_EXPORT udp : public middleman_backend {
public:
// -- constructors, destructors, and assignment operators --------------------

udp(middleman& mm);

~udp() override;

// -- interface functions ----------------------------------------------------

error init() override;

void stop() override;

expected<endpoint_manager_ptr> get_or_connect(const uri& locator) override;

endpoint_manager_ptr peer(const node_id&) override;

void resolve(const uri& locator, const actor& listener) override;

strong_actor_ptr make_proxy(node_id nid, actor_id aid) override;

void set_last_hop(node_id*) override;

// -- properties -------------------------------------------------------------

uint16_t port() const noexcept override {
return listening_port_;
}

expected<endpoint_manager_ptr> emplace(udp_datagram_socket sock,
uint16_t port);

private:
middleman& mm_;

endpoint_manager_ptr ep_manager_;

std::vector<node_id> node_ids_;

proxy_registry proxies_;

uint16_t listening_port_;

std::mutex lock_;
};

} // namespace caf::net::backend
52 changes: 52 additions & 0 deletions libcaf_net/caf/net/basp/datagram_application_factory.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/******************************************************************************
* ____ _ _____ *
* / ___| / \ | ___| C++ *
* | | / _ \ | |_ Actor *
* | |___ / ___ \| _| Framework *
* \____/_/ \_|_| *
* *
* Copyright 2011-2020 Dominik Charousset *
* *
* Distributed under the terms and conditions of the BSD 3-Clause License or *
* (at your option) under the terms and conditions of the Boost Software *
* License 1.0. See accompanying files LICENSE and LICENSE_ALTERNATIVE. *
* *
* If you did not receive a copy of the license files, see *
* http://opensource.org/licenses/BSD-3-Clause and *
* http://www.boost.org/LICENSE_1_0.txt. *
******************************************************************************/

#pragma once

#include "caf/detail/net_export.hpp"
#include "caf/error.hpp"
#include "caf/net/basp/application.hpp"
#include "caf/net/datagram_adaptor.hpp"
#include "caf/proxy_registry.hpp"

namespace caf::net::basp {

/// Factory for datagram oriented basp::applications.
/// @relates transport_worker_dispatcher
class CAF_NET_EXPORT datagram_application_factory {
public:
using application_type = datagram_adaptor<basp::application>;

datagram_application_factory(proxy_registry& proxies) : proxies_(proxies) {
// nop
}

template <class Parent>
error init(Parent&) {
return none;
}

application_type make() const {
return application_type{basp::application{proxies_}};
}

private:
proxy_registry& proxies_;
};

} // namespace caf::net::basp
86 changes: 86 additions & 0 deletions libcaf_net/caf/net/datagram_adaptor.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/******************************************************************************
* ____ _ _____ *
* / ___| / \ | ___| C++ *
* | | / _ \ | |_ Actor *
* | |___ / ___ \| _| Framework *
* \____/_/ \_|_| *
* *
* Copyright 2011-2020 Dominik Charousset *
* *
* Distributed under the terms and conditions of the BSD 3-Clause License or *
* (at your option) under the terms and conditions of the Boost Software *
* License 1.0. See accompanying files LICENSE and LICENSE_ALTERNATIVE. *
* *
* If you did not receive a copy of the license files, see *
* http://opensource.org/licenses/BSD-3-Clause and *
* http://www.boost.org/LICENSE_1_0.txt. *
******************************************************************************/

#pragma once

#include "caf/byte_buffer.hpp"
#include "caf/net/basp/header.hpp"

namespace caf::net {

/// Implements an adaption layer for datagram oriented transport protocols.
template <class Application>
class datagram_adaptor {
public:
datagram_adaptor(Application application)
: application_(std::move(application)) {
// nop
}

template <class Parent>
error init(Parent& parent) {
return application_.init(parent);
}

template <class Parent>
error write_message(Parent& parent,
std::unique_ptr<endpoint_manager_queue::message> msg) {
return application_.write_message(parent, std::move(msg));
}

template <class Parent>
error handle_data(Parent& parent, span<const byte> received) {
if (auto err = application_.handle_data(
parent, make_span(received.data(), basp::header_size)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This hardwires the datagram_adaptor to BASP. Both in terms of constants and in terms of processing logic (header, then payload).

If you see this as a one-shot class to fit BASP into a datagram manager, then we should rename it. If we want to generalize this adaptor abstraction, we should also rethink the interface to the application. We could split handle_data into handle_header and handle_payload for example. Alternatively, we always call handle_data on the application and it dispatches internally (more flexible). In any case we'd also need to propagate how much of the buffer was consumed (if any). Not all protocols have fixed sizes. HTTP, for example, has to read the header until hitting \r\n\r\n. I was already running into this exact problem in another branch I was working on, so maybe should sort that out first? If we'd go with that, the adaptor would simply call handle_data until the application either has consumed the entire datagram or discarded some of it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH I think with the proposal of #81 the datagram_adaptor would be obsolete. We should discuss that issue first before renaming or rethinking the datagram_adaptor for BASP.

return err;
auto data = received.data() + basp::header_size;
auto size = received.size() - basp::header_size;
if (size > 0)
return application_.handle_data(parent, make_span(data, size));
return none;
}

template <class Parent>
void resolve(Parent& parent, string_view path, const actor& listener) {
application_.resolve(parent, path, listener);
}

template <class Parent>
void new_proxy(Parent& parent, actor_id id) {
application_.new_proxy(parent, id);
}

template <class Parent>
void local_actor_down(Parent& parent, actor_id id, error reason) {
application_.local_actor_down(parent, id, std::move(reason));
}

template <class Parent>
void timeout(Parent& parent, std::string tag, uint64_t id) {
application_.timeout(parent, std::move(tag), id);
}

void handle_error(sec error) {
application_.handle_error(error);
}

private:
Application application_;
};

} // namespace caf::net
2 changes: 2 additions & 0 deletions libcaf_net/caf/net/datagram_transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class datagram_transport : public datagram_transport_base<Factory> {

using buffer_cache_type = typename super::buffer_cache_type;

using dispatcher_type = transport_worker_dispatcher<Factory, ip_endpoint>;

// -- constructors, destructors, and assignment operators --------------------

datagram_transport(udp_datagram_socket handle, factory_type factory)
Expand Down
3 changes: 3 additions & 0 deletions libcaf_net/caf/net/defaults.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,7 @@ CAF_NET_EXPORT extern const size_t max_header_buffers;
/// Port to listen on for tcp.
CAF_NET_EXPORT extern const uint16_t tcp_port;

/// Port to listen on for udp.
CAF_NET_EXPORT extern const uint16_t udp_port;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need two constants? I assume this is for exposing BASP. Since we're going to use the locator (URI) as identifier for the CAF node, we must use either TCP or UDP. In other words, one default port would suffice.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that BASP could be used with udp or tcp at the same time, thus both transports require different ports to be accessible from the network.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the new model, the node ID is the locator in order to get rid of the pseudo-DNS CAF is currently doing. This means there must be exactly one valid way to connect to a BASP instance.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot about the fixed node ID.. Still I am not sure if that is the best way to solve this problem. Is it really necessary to allow only one possible way to reach a node via BASP?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By definition, a node can only have a single ID. If we allow multiple IDs pointing to the same BASP, we have the exact same issues we wanted to get rid of with the redesign that @josephnoir initiated.

We could of course offer a "connect there and see who's responding" function, but it would convolute the design quite a bit. Because then, we can't use the ID any longer to identify connections. I honestly don't see an upside here that would be worth the troubles in the first place.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can understand that. I'll remove the duplicate port field when I'm fixing the other remark.


} // namespace caf::defaults::middleman
4 changes: 4 additions & 0 deletions libcaf_net/caf/net/doorman.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ class doorman {
CAF_LOG_ERROR("doorman encounterd error: " << err);
}

error emplace(const uri&) {
return make_error(sec::runtime_error, "function not implemented");
}

private:
net::tcp_accept_socket acceptor_;

Expand Down
68 changes: 46 additions & 22 deletions libcaf_net/caf/net/transport_worker_dispatcher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@

#include <unordered_map>

#include "caf/ip_endpoint.hpp"
#include "caf/logger.hpp"
#include "caf/net/endpoint_manager_queue.hpp"
#include "caf/net/fwd.hpp"
#include "caf/net/ip.hpp"
#include "caf/net/packet_writer_decorator.hpp"
#include "caf/net/transport_worker.hpp"
#include "caf/node_id.hpp"
#include "caf/sec.hpp"
#include "caf/send.hpp"

Expand Down Expand Up @@ -65,9 +68,10 @@ class transport_worker_dispatcher {
error handle_data(Parent& parent, span<const byte> data, id_type id) {
if (auto worker = find_worker(id))
return worker->handle_data(parent, data);
// TODO: Where to get node_id from here?
auto worker = add_new_worker(parent, node_id{}, id);
if (worker)
auto locator = make_uri("udp://" + to_string(id));
if (!locator)
return locator.error();
if (auto worker = add_new_worker(parent, make_node_id(*locator), id))
return (*worker)->handle_data(parent, data);
else
return std::move(worker.error());
Expand All @@ -77,25 +81,33 @@ class transport_worker_dispatcher {
void write_message(Parent& parent,
std::unique_ptr<endpoint_manager_queue::message> msg) {
auto receiver = msg->receiver;
if (!receiver)
if (!receiver) {
CAF_LOG_ERROR("no receiver was specified");
return;
}
auto nid = receiver->node();
if (auto worker = find_worker(nid)) {
auto worker = find_worker(nid);
if (!worker)
CAF_LOG_ERROR("could not find worker for endpoint");
else
worker->write_message(parent, std::move(msg));
return;
}
// TODO: where to get id_type from here?
if (auto worker = add_new_worker(parent, nid, id_type{}))
(*worker)->write_message(parent, std::move(msg));
}

template <class Parent>
void resolve(Parent& parent, const uri& locator, const actor& listener) {
if (auto worker = find_worker(make_node_id(locator)))
worker->resolve(parent, locator.path(), listener);
else
if (auto auth = locator.authority_only()) {
if (auto worker = find_worker(make_node_id(*auth))) {
worker->resolve(parent, locator.path(), listener);
} else {
if (auto ret = emplace(parent, locator))
(*ret)->resolve(parent, locator.path(), listener);
else
anon_send(listener, ret.error());
}
} else {
anon_send(listener,
make_error(sec::runtime_error, "could not resolve node"));
make_error(sec::runtime_error, "could not get authority"));
}
}

template <class Parent>
Expand Down Expand Up @@ -125,15 +137,29 @@ class transport_worker_dispatcher {
}

void handle_error(sec error) {
for (const auto& p : workers_by_id_) {
auto worker = p.second;
worker->handle_error(error);
for (const auto& p : workers_by_id_)
p.second->handle_error(error);
}

template <class Parent>
expected<worker_ptr> emplace(Parent& parent, const uri& locator) {
auto& auth = locator.authority();
ip_address addr;
if (auto hostname = get_if<std::string>(&auth.host)) {
auto addrs = ip::resolve(*hostname);
if (addrs.empty())
return sec::remote_lookup_failed;
addr = addrs.at(0);
} else {
addr = *get_if<ip_address>(&auth.host);
}
return add_new_worker(parent, make_node_id(*locator.authority_only()),
ip_endpoint{addr, auth.port});
}

template <class Parent>
expected<worker_ptr> add_new_worker(Parent& parent, node_id node,
id_type id) {
expected<worker_ptr>
add_new_worker(Parent& parent, node_id node, id_type id) {
CAF_LOG_TRACE(CAF_ARG(node) << CAF_ARG(id));
auto application = factory_.make();
auto worker = std::make_shared<worker_type>(std::move(application), id);
Expand All @@ -156,10 +182,8 @@ class transport_worker_dispatcher {
template <class Key>
worker_ptr find_worker_impl(const std::unordered_map<Key, worker_ptr>& map,
const Key& key) {
if (map.count(key) == 0) {
CAF_LOG_DEBUG("could not find worker: " << CAF_ARG(key));
if (map.count(key) == 0)
return nullptr;
}
return map.at(key);
}

Expand Down
2 changes: 2 additions & 0 deletions libcaf_net/src/defaults.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@ const size_t max_header_buffers = 10;

const uint16_t tcp_port = 0;

const uint16_t udp_port = 0;

} // namespace caf::defaults::middleman
Loading