Skip to content

Commit

Permalink
non blocking semantics for receiving soeckts and the tcp acceptor
Browse files Browse the repository at this point in the history
tests updated

documentation updated and improved
  • Loading branch information
Foo committed Mar 31, 2024
1 parent 4964820 commit 1342de9
Show file tree
Hide file tree
Showing 59 changed files with 2,061 additions and 1,208 deletions.
127 changes: 95 additions & 32 deletions README.md

Large diffs are not rendered by default.

76 changes: 66 additions & 10 deletions samples/README.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
#include <MinimalSocket/tcp/TcpServer.h>
int main() {
MinimalSocket::Port port = 15768; // the port to bind
MinimalSocket::tcp::TcpServer tcp_server(port,
MinimalSocket::AddressFamily::IP_V4);
MinimalSocket::tcp::TcpServer<true> tcp_server(
port, MinimalSocket::AddressFamily::IP_V4);

// Open the server. This will bind the port and the server will start to
// listen for connection requests.
bool success = tcp_server.open();

// accepts the next client that will ask the connection
MinimalSocket::tcp::TcpConnection accepted_connection =
MinimalSocket::tcp::TcpConnectionBlocking accepted_connection =
tcp_server.acceptNewClient(); // blocking till a client actually asks the
// connection

Expand All @@ -28,7 +28,7 @@ int main() {
int main() {
MinimalSocket::Port server_port = 15768;
std::string server_address = "192.168.125.85";
MinimalSocket::tcp::TcpClient tcp_client(
MinimalSocket::tcp::TcpClient<true> tcp_client(
MinimalSocket::Address{server_address, server_port});

// Open the server. Here, the client will ask the connection to specified
Expand All @@ -49,7 +49,7 @@ int main() {
#include <MinimalSocket/udp/UdpSocket.h>
int main() {
MinimalSocket::Port this_socket_port = 15768;
MinimalSocket::udp::UdpBinded udp_socket(this_socket_port,
MinimalSocket::udp::Udp<true> udp_socket(this_socket_port,
MinimalSocket::AddressFamily::IP_V6);

// Open the server. This will bind the specified port.
Expand All @@ -71,11 +71,13 @@ int main() {

MinimalSocket::Address permanent_sender_udp =
MinimalSocket::Address{"192.168.125.85", 15768};
MinimalSocket::udp::UdpConnected udp_connected_socket = udp_socket.connect(
permanent_sender_udp); // ownership of the underlying socket is transfered
// from udp_socket to udp_connected_socket, meaning
// that you can't use anymore udp_socket (unless
// you re-open it)
MinimalSocket::udp::UdpConnected<true> udp_connected_socket =
udp_socket.connect(
permanent_sender_udp); // ownership of the underlying socket is
// transfered from udp_socket to
// udp_connected_socket, meaning that you can't
// use anymore udp_socket (unless you re-open
// it)

// receive a message
std::size_t message_max_size = 1000;
Expand All @@ -85,3 +87,57 @@ int main() {
// send a message
udp_connected_socket.send("a message to send");
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

// tcp server, non blocking
int main() {
MinimalSocket::Port port = 15768; // the port to bind
MinimalSocket::tcp::TcpServer<false> tcp_server(
port, MinimalSocket::AddressFamily::IP_V4);
tcp_server.open();

// check if a client asked for the connection. If no, the function immediately
// returns a nullopt. On the contrary, the returned optional contains the
// handler to the connected client
std::optional<MinimalSocket::tcp::TcpConnectionBlocking>
maybe_accepted_connection = tcp_server.acceptNewClient();

MinimalSocket::tcp::TcpConnectionNonBlocking accepted_connection_nn_block =
maybe_accepted_connection->turnToNonBlocking();
}

// tcp client, non blocking
int main() {
MinimalSocket::Port server_port = 15768;
std::string server_address = "192.168.125.85";
MinimalSocket::tcp::TcpClient<false> tcp_client(
MinimalSocket::Address{server_address, server_port});
tcp_client.open();

std::size_t message_max_size = 1000;
// non blocking receive: returns immediately with an empty message in case no
// new data were available, or with a non empty message in the contrary case
std::string received_message = tcp_client.receive(message_max_size);
}

// udp socket, non blocking
int main() {
MinimalSocket::Port this_socket_port = 15768;
MinimalSocket::udp::Udp<false> udp_socket(
this_socket_port, MinimalSocket::AddressFamily::IP_V6);
udp_socket.open();

std::size_t message_max_size = 1000;
// non blocking receive: returns immediately with an empty message in case no
// new data were available, or with a non empty message in the contrary case
//
// struct ReceiveStringResult {
// Address sender;
// std::string received_message;
// };
std::optional<MinimalSocket::ReceiveStringResult> received_message =
udp_socket.receive(message_max_size);
}
4 changes: 4 additions & 0 deletions samples/tcp/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
MakeApp(TcpClient)
MakeApp(TcpServer)
MakeApp(TcpServerNonBlocking)
MakeApp(TcpRepeater)

MakeSample(Sample01_server_client Tcp)
Expand All @@ -10,3 +11,6 @@ add_dependencies(TcpSample02_server_2_clients TcpClient TcpServer)

MakeSample(Sample03_chain_with_2_repeaters Tcp)
add_dependencies(TcpSample03_chain_with_2_repeaters TcpClient TcpServer TcpRepeater)

MakeSample(Sample04_server_nn_block_2_clients Tcp)
add_dependencies(TcpSample04_server_nn_block_2_clients TcpClient TcpServerNonBlocking)
32 changes: 32 additions & 0 deletions samples/tcp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,36 @@ The above classes of samples can be described as follows:
TcpRepeater2->>TcpClient: forawrd response 1
```

- **TcpSample04_server_nn_block_2_clients** is an example of non blocking tcp server. The application uses one single thread to spin multiple connections. More in detail:
- related config file is [Sample04_server_nn_block_2_clients](./Sample04_server_nn_block_2_clients)
- runs **TcpServerNonBlocking**, creating a tcp server that binds and listen to a specified port
- runs **TcpClient**, creating a first tcp client that connections to the previous server, exchanging messages with it.
- runs **TcpClient**, creating a second tcp client that connections to the previous server, exchanging messages with it with a different frequency.
- the following sequence diagram summarizes this sample
```mermaid
sequenceDiagram
TcpServer->>TcpServer: bind a port
TcpClient1->>TcpServer: ask for connection
TcpServer->>TcpClient1: connection done
TcpClient2->>TcpServer: ask for connection
TcpServer->>TcpClient2: connection done
TcpServer->>TcpServer: has something arrived from client 1? if so send response
TcpServer->>TcpServer: has something arrived from client 2? if so send response
TcpClient1->>TcpServer: request 1
TcpServer->>TcpServer: has something arrived from client 1? if so send response
TcpServer->>TcpServer: has something arrived from client 2? if so send response
TcpServer->>TcpClient1: response 1
TcpServer->>TcpServer: has something arrived from client 1? if so send response
TcpServer->>TcpServer: has something arrived from client 2? if so send response
TcpClient2->>TcpServer: request 1
TcpServer->>TcpServer: has something arrived from client 1? if so send response
TcpServer->>TcpServer: has something arrived from client 2? if so send response
TcpServer->>TcpClient2: response 1
TcpServer->>TcpServer: has something arrived from client 1? if so send response
TcpServer->>TcpServer: has something arrived from client 2? if so send response
TcpClient1->>TcpServer: request 2
TcpServer->>TcpServer: has something arrived from client 1? if so send response
TcpServer->>TcpServer: has something arrived from client 2? if so send response
TcpServer->>TcpClient1: response 2
**TcpServer** and **TcpClient** can be also used as stand alone processes, in order to check connections locally or on a different host.
3 changes: 3 additions & 0 deletions samples/tcp/Sample04_server_nn_block_2_clients
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
TcpServerNonBlocking --port 35998 --clients 2
TcpClient --port 35998
TcpClient --port 35998 --rate 400
14 changes: 7 additions & 7 deletions samples/tcp/TcpClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@ int main(const int argc, const char **argv) {
cout << "----------------------- Client -----------------------" << endl;
PARSE_ARGS

const auto server_host = options->getValue("host", "127.0.0.1");
const auto server_port =
static_cast<MinimalSocket::Port>(options->getIntValue("port"));
const auto rate =
std::chrono::milliseconds{options->getIntValue<250>("rate")};
const auto server_host = options->getValue<std::string>("host", "127.0.0.1");
const auto server_port = options->getValue<MinimalSocket::Port>("port");
const auto rate = options->getValue<std::chrono::milliseconds>(
"rate", std::chrono::milliseconds{250});

const MinimalSocket::Address server_address(server_host, server_port);
MinimalSocket::tcp::TcpClient client(server_address);
MinimalSocket::tcp::TcpClient<true> client(server_address);

cout << "Connecting to " << MinimalSocket::to_string(server_address) << endl;
if (!client.open()) {
Expand All @@ -38,7 +37,8 @@ int main(const int argc, const char **argv) {
}
cout << "Connected" << endl;

MinimalSocket::samples::ask(client, rate, options->getIntValue<5>("cycles"));
MinimalSocket::samples::ask(client, rate,
options->getValue<int>("cycles", 5));

// the connection will be close when destroying the client object
return EXIT_SUCCESS;
Expand Down
19 changes: 10 additions & 9 deletions samples/tcp/TcpRepeater.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
#include <iostream>
using namespace std;

void repeat(MinimalSocket::tcp::TcpConnection &preceding,
MinimalSocket::tcp::TcpClient &following) {
void repeat(MinimalSocket::tcp::TcpConnectionBlocking &preceding,
MinimalSocket::tcp::TcpClient<true> &following) {
while (true) {
auto request = preceding.receive(500, std::chrono::seconds{5});
if (request.empty()) {
Expand All @@ -45,25 +45,26 @@ int main(const int argc, const char **argv) {
cout << "----------------------- Repeater -----------------------" << endl;
PARSE_ARGS

const auto following_host = options->getValue("host", "127.0.0.1");
const auto following_host =
options->getValue<std::string>("host", "127.0.0.1");
const auto following_port =
static_cast<MinimalSocket::Port>(options->getIntValue("next_port"));
options->getValue<MinimalSocket::Port>("next_port");
MinimalSocket::Address following_address(following_host, following_port);

const auto port_to_reserve =
static_cast<MinimalSocket::Port>(options->getIntValue("port"));
const auto port_to_reserve = options->getValue<MinimalSocket::Port>("port");

// reserve port
MinimalSocket::tcp::TcpServer acceptor(port_to_reserve,
following_address.getFamily());
MinimalSocket::tcp::TcpServer<true> acceptor(port_to_reserve,
following_address.getFamily());
if (!acceptor.open()) {
cerr << "Failed to bind and listen to specified port" << endl;
return EXIT_FAILURE;
}
cout << "Listening on port " << port_to_reserve << endl;

// ask connection to follower
MinimalSocket::tcp::TcpClient connection_to_following(following_address);
MinimalSocket::tcp::TcpClient<true> connection_to_following(
following_address);
cout << "Connecting to next on chain at "
<< MinimalSocket::to_string(following_address) << endl;
if (!connection_to_following.open()) {
Expand Down
15 changes: 7 additions & 8 deletions samples/tcp/TcpServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
#include <vector>
using namespace std;

std::thread accept_new_client(MinimalSocket::tcp::TcpServer &server) {
MinimalSocket::tcp::TcpConnection accepted_connection =
std::thread accept_new_client(MinimalSocket::tcp::TcpServer<true> &server) {
MinimalSocket::tcp::TcpConnectionBlocking accepted_connection =
server.acceptNewClient();
cout << "New client accepted" << endl;
return std::thread([connection = std::move(accepted_connection)]() mutable {
Expand All @@ -33,13 +33,12 @@ int main(const int argc, const char **argv) {
cout << "----------------------- Server -----------------------" << endl;
PARSE_ARGS

const auto server_port =
static_cast<MinimalSocket::Port>(options->getIntValue("port"));
const auto max_clients = options->getIntValue("clients");
const auto family =
MinimalSocket::samples::to_family(options->getValue("family", "v4"));
const auto server_port = options->getValue<MinimalSocket::Port>("port");
const auto max_clients = options->getValue<int>("clients", 0);
const auto family = options->getValue<MinimalSocket::AddressFamily>(
"family", MinimalSocket::AddressFamily::IP_V4);

MinimalSocket::tcp::TcpServer server(server_port, family);
MinimalSocket::tcp::TcpServer<true> server(server_port, family);

if (!server.open()) {
cerr << "Failed to bind and listen to specified port" << endl;
Expand Down
94 changes: 94 additions & 0 deletions samples/tcp/TcpServerNonBlocking.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/**
* Author: Andrea Casalino
* Created: 16.05.2019
*
* report any bug to [email protected].
**/

///////////////////////////////////////////////////////////////////////////
// Have a look to README.md //
///////////////////////////////////////////////////////////////////////////

// elements from the MinimalSocket library
#include <MinimalSocket/tcp/TcpServer.h>

// just a bunch of utilities
#include <Args.h>
#include <Pollables.h>
#include <Respond.h>
#include <TimeOfDay.h>

#include <functional>
#include <list>
using namespace std;

int main(const int argc, const char **argv) {
cout << "----------------------- Server -----------------------" << endl;
PARSE_ARGS

const auto server_port = options->getValue<MinimalSocket::Port>("port");
const auto max_clients = options->getValue<int>("clients", 0);
const auto family = options->getValue<MinimalSocket::AddressFamily>(
"family", MinimalSocket::AddressFamily::IP_V4);

MinimalSocket::tcp::TcpServer<false> server(server_port, family);

if (!server.open()) {
cerr << "Failed to bind and listen to specified port" << endl;
return EXIT_FAILURE;
}
cout << "Listening for new clients on port " << server_port << endl;

std::size_t connected = 0;
std::list<MinimalSocket::tcp::TcpConnectionNonBlocking> connections;
MinimalSocket::samples::Pollables pollables;

auto create_pollable_connection =
[&](MinimalSocket::tcp::TcpConnectionNonBlocking &&connection) {
auto &conn = connections.emplace_back(
std::forward<MinimalSocket::tcp::TcpConnectionNonBlocking>(
connection));
return [conn = &conn]() {
// poll the connection by doing a non blocking receive
try {
auto request = conn->receive(500);
if (request.empty()) {
return MinimalSocket::samples::PollableStatus::NOT_ADVANCED;
}
const auto &response =
MinimalSocket::samples::NamesCircularIterator::NAMES_SURNAMES
.find(request)
->second;
cout << MinimalSocket::samples::TimeOfDay{}
<< " received: " << request << " ; sending: " << response
<< endl;
conn->send(response);
} catch (const MinimalSocket::SocketError &) {
// if here the connection was closed
return MinimalSocket::samples::PollableStatus::COMPLETED;
}
return MinimalSocket::samples::PollableStatus::ADVANCED;
};
};

pollables.emplace([&]() {
// poll the acceptor by trying to accept a new client
auto maybe_new_connection = server.acceptNewNonBlockingClient();
if (maybe_new_connection.has_value()) {
cout << MinimalSocket::samples::TimeOfDay{}
<< " connected a new client from "
<< MinimalSocket::to_string(maybe_new_connection->getRemoteAddress())
<< endl;
pollables.emplace(
create_pollable_connection(std::move(maybe_new_connection.value())));
return (max_clients != 0 && ++connected == max_clients)
? MinimalSocket::samples::PollableStatus::COMPLETED
: MinimalSocket::samples::PollableStatus::ADVANCED;
}
return MinimalSocket::samples::PollableStatus::NOT_ADVANCED;
});

pollables.loop(std::chrono::seconds{5});

return EXIT_SUCCESS;
}
4 changes: 4 additions & 0 deletions samples/udp/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
MakeApp(UdpAsker)
MakeApp(UdpResponder)
MakeApp(UdpResponderNonBlocking)

MakeSample(Sample01_asker_responder Udp)
add_dependencies(UdpSample01_asker_responder UdpAsker UdpResponder)
Expand All @@ -9,3 +10,6 @@ add_dependencies(UdpSample02_asker_connected_responer UdpAsker UdpResponder)

MakeSample(Sample03_2_askers_responder Udp)
add_dependencies(UdpSample03_2_askers_responder UdpAsker UdpResponder)

MakeSample(Sample04_2_askers_2_nn_block_responders Udp)
add_dependencies(UdpSample04_2_askers_2_nn_block_responders UdpAsker UdpResponderNonBlocking)
Loading

0 comments on commit 1342de9

Please sign in to comment.