Skip to content

Commit

Permalink
Merge pull request #40 from andreacasalino/NonBlocking
Browse files Browse the repository at this point in the history
non blocking semantics
  • Loading branch information
andreacasalino authored Mar 31, 2024
2 parents 4964820 + 1342de9 commit 9ba186f
Show file tree
Hide file tree
Showing 59 changed files with 2,061 additions and 1,208 deletions.
127 changes: 95 additions & 32 deletions README.md

Large diffs are not rendered by default.

76 changes: 66 additions & 10 deletions samples/README.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
#include <MinimalSocket/tcp/TcpServer.h>
int main() {
MinimalSocket::Port port = 15768; // the port to bind
MinimalSocket::tcp::TcpServer tcp_server(port,
MinimalSocket::AddressFamily::IP_V4);
MinimalSocket::tcp::TcpServer<true> tcp_server(
port, MinimalSocket::AddressFamily::IP_V4);

// Open the server. This will bind the port and the server will start to
// listen for connection requests.
bool success = tcp_server.open();

// accepts the next client that will ask the connection
MinimalSocket::tcp::TcpConnection accepted_connection =
MinimalSocket::tcp::TcpConnectionBlocking accepted_connection =
tcp_server.acceptNewClient(); // blocking till a client actually asks the
// connection

Expand All @@ -28,7 +28,7 @@ int main() {
int main() {
MinimalSocket::Port server_port = 15768;
std::string server_address = "192.168.125.85";
MinimalSocket::tcp::TcpClient tcp_client(
MinimalSocket::tcp::TcpClient<true> tcp_client(
MinimalSocket::Address{server_address, server_port});

// Open the server. Here, the client will ask the connection to specified
Expand All @@ -49,7 +49,7 @@ int main() {
#include <MinimalSocket/udp/UdpSocket.h>
int main() {
MinimalSocket::Port this_socket_port = 15768;
MinimalSocket::udp::UdpBinded udp_socket(this_socket_port,
MinimalSocket::udp::Udp<true> udp_socket(this_socket_port,
MinimalSocket::AddressFamily::IP_V6);

// Open the server. This will bind the specified port.
Expand All @@ -71,11 +71,13 @@ int main() {

MinimalSocket::Address permanent_sender_udp =
MinimalSocket::Address{"192.168.125.85", 15768};
MinimalSocket::udp::UdpConnected udp_connected_socket = udp_socket.connect(
permanent_sender_udp); // ownership of the underlying socket is transfered
// from udp_socket to udp_connected_socket, meaning
// that you can't use anymore udp_socket (unless
// you re-open it)
MinimalSocket::udp::UdpConnected<true> udp_connected_socket =
udp_socket.connect(
permanent_sender_udp); // ownership of the underlying socket is
// transfered from udp_socket to
// udp_connected_socket, meaning that you can't
// use anymore udp_socket (unless you re-open
// it)

// receive a message
std::size_t message_max_size = 1000;
Expand All @@ -85,3 +87,57 @@ int main() {
// send a message
udp_connected_socket.send("a message to send");
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

// tcp server, non blocking
int main() {
MinimalSocket::Port port = 15768; // the port to bind
MinimalSocket::tcp::TcpServer<false> tcp_server(
port, MinimalSocket::AddressFamily::IP_V4);
tcp_server.open();

// check if a client asked for the connection. If no, the function immediately
// returns a nullopt. On the contrary, the returned optional contains the
// handler to the connected client
std::optional<MinimalSocket::tcp::TcpConnectionBlocking>
maybe_accepted_connection = tcp_server.acceptNewClient();

MinimalSocket::tcp::TcpConnectionNonBlocking accepted_connection_nn_block =
maybe_accepted_connection->turnToNonBlocking();
}

// tcp client, non blocking
int main() {
MinimalSocket::Port server_port = 15768;
std::string server_address = "192.168.125.85";
MinimalSocket::tcp::TcpClient<false> tcp_client(
MinimalSocket::Address{server_address, server_port});
tcp_client.open();

std::size_t message_max_size = 1000;
// non blocking receive: returns immediately with an empty message in case no
// new data were available, or with a non empty message in the contrary case
std::string received_message = tcp_client.receive(message_max_size);
}

// udp socket, non blocking
int main() {
MinimalSocket::Port this_socket_port = 15768;
MinimalSocket::udp::Udp<false> udp_socket(
this_socket_port, MinimalSocket::AddressFamily::IP_V6);
udp_socket.open();

std::size_t message_max_size = 1000;
// non blocking receive: returns immediately with an empty message in case no
// new data were available, or with a non empty message in the contrary case
//
// struct ReceiveStringResult {
// Address sender;
// std::string received_message;
// };
std::optional<MinimalSocket::ReceiveStringResult> received_message =
udp_socket.receive(message_max_size);
}
4 changes: 4 additions & 0 deletions samples/tcp/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
MakeApp(TcpClient)
MakeApp(TcpServer)
MakeApp(TcpServerNonBlocking)
MakeApp(TcpRepeater)

MakeSample(Sample01_server_client Tcp)
Expand All @@ -10,3 +11,6 @@ add_dependencies(TcpSample02_server_2_clients TcpClient TcpServer)

MakeSample(Sample03_chain_with_2_repeaters Tcp)
add_dependencies(TcpSample03_chain_with_2_repeaters TcpClient TcpServer TcpRepeater)

MakeSample(Sample04_server_nn_block_2_clients Tcp)
add_dependencies(TcpSample04_server_nn_block_2_clients TcpClient TcpServerNonBlocking)
32 changes: 32 additions & 0 deletions samples/tcp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,36 @@ The above classes of samples can be described as follows:
TcpRepeater2->>TcpClient: forawrd response 1
```

- **TcpSample04_server_nn_block_2_clients** is an example of non blocking tcp server. The application uses one single thread to spin multiple connections. More in detail:
- related config file is [Sample04_server_nn_block_2_clients](./Sample04_server_nn_block_2_clients)
- runs **TcpServerNonBlocking**, creating a tcp server that binds and listen to a specified port
- runs **TcpClient**, creating a first tcp client that connections to the previous server, exchanging messages with it.
- runs **TcpClient**, creating a second tcp client that connections to the previous server, exchanging messages with it with a different frequency.
- the following sequence diagram summarizes this sample
```mermaid
sequenceDiagram
TcpServer->>TcpServer: bind a port
TcpClient1->>TcpServer: ask for connection
TcpServer->>TcpClient1: connection done
TcpClient2->>TcpServer: ask for connection
TcpServer->>TcpClient2: connection done
TcpServer->>TcpServer: has something arrived from client 1? if so send response
TcpServer->>TcpServer: has something arrived from client 2? if so send response
TcpClient1->>TcpServer: request 1
TcpServer->>TcpServer: has something arrived from client 1? if so send response
TcpServer->>TcpServer: has something arrived from client 2? if so send response
TcpServer->>TcpClient1: response 1
TcpServer->>TcpServer: has something arrived from client 1? if so send response
TcpServer->>TcpServer: has something arrived from client 2? if so send response
TcpClient2->>TcpServer: request 1
TcpServer->>TcpServer: has something arrived from client 1? if so send response
TcpServer->>TcpServer: has something arrived from client 2? if so send response
TcpServer->>TcpClient2: response 1
TcpServer->>TcpServer: has something arrived from client 1? if so send response
TcpServer->>TcpServer: has something arrived from client 2? if so send response
TcpClient1->>TcpServer: request 2
TcpServer->>TcpServer: has something arrived from client 1? if so send response
TcpServer->>TcpServer: has something arrived from client 2? if so send response
TcpServer->>TcpClient1: response 2
**TcpServer** and **TcpClient** can be also used as stand alone processes, in order to check connections locally or on a different host.
3 changes: 3 additions & 0 deletions samples/tcp/Sample04_server_nn_block_2_clients
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
TcpServerNonBlocking --port 35998 --clients 2
TcpClient --port 35998
TcpClient --port 35998 --rate 400
14 changes: 7 additions & 7 deletions samples/tcp/TcpClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@ int main(const int argc, const char **argv) {
cout << "----------------------- Client -----------------------" << endl;
PARSE_ARGS

const auto server_host = options->getValue("host", "127.0.0.1");
const auto server_port =
static_cast<MinimalSocket::Port>(options->getIntValue("port"));
const auto rate =
std::chrono::milliseconds{options->getIntValue<250>("rate")};
const auto server_host = options->getValue<std::string>("host", "127.0.0.1");
const auto server_port = options->getValue<MinimalSocket::Port>("port");
const auto rate = options->getValue<std::chrono::milliseconds>(
"rate", std::chrono::milliseconds{250});

const MinimalSocket::Address server_address(server_host, server_port);
MinimalSocket::tcp::TcpClient client(server_address);
MinimalSocket::tcp::TcpClient<true> client(server_address);

cout << "Connecting to " << MinimalSocket::to_string(server_address) << endl;
if (!client.open()) {
Expand All @@ -38,7 +37,8 @@ int main(const int argc, const char **argv) {
}
cout << "Connected" << endl;

MinimalSocket::samples::ask(client, rate, options->getIntValue<5>("cycles"));
MinimalSocket::samples::ask(client, rate,
options->getValue<int>("cycles", 5));

// the connection will be close when destroying the client object
return EXIT_SUCCESS;
Expand Down
19 changes: 10 additions & 9 deletions samples/tcp/TcpRepeater.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
#include <iostream>
using namespace std;

void repeat(MinimalSocket::tcp::TcpConnection &preceding,
MinimalSocket::tcp::TcpClient &following) {
void repeat(MinimalSocket::tcp::TcpConnectionBlocking &preceding,
MinimalSocket::tcp::TcpClient<true> &following) {
while (true) {
auto request = preceding.receive(500, std::chrono::seconds{5});
if (request.empty()) {
Expand All @@ -45,25 +45,26 @@ int main(const int argc, const char **argv) {
cout << "----------------------- Repeater -----------------------" << endl;
PARSE_ARGS

const auto following_host = options->getValue("host", "127.0.0.1");
const auto following_host =
options->getValue<std::string>("host", "127.0.0.1");
const auto following_port =
static_cast<MinimalSocket::Port>(options->getIntValue("next_port"));
options->getValue<MinimalSocket::Port>("next_port");
MinimalSocket::Address following_address(following_host, following_port);

const auto port_to_reserve =
static_cast<MinimalSocket::Port>(options->getIntValue("port"));
const auto port_to_reserve = options->getValue<MinimalSocket::Port>("port");

// reserve port
MinimalSocket::tcp::TcpServer acceptor(port_to_reserve,
following_address.getFamily());
MinimalSocket::tcp::TcpServer<true> acceptor(port_to_reserve,
following_address.getFamily());
if (!acceptor.open()) {
cerr << "Failed to bind and listen to specified port" << endl;
return EXIT_FAILURE;
}
cout << "Listening on port " << port_to_reserve << endl;

// ask connection to follower
MinimalSocket::tcp::TcpClient connection_to_following(following_address);
MinimalSocket::tcp::TcpClient<true> connection_to_following(
following_address);
cout << "Connecting to next on chain at "
<< MinimalSocket::to_string(following_address) << endl;
if (!connection_to_following.open()) {
Expand Down
15 changes: 7 additions & 8 deletions samples/tcp/TcpServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
#include <vector>
using namespace std;

std::thread accept_new_client(MinimalSocket::tcp::TcpServer &server) {
MinimalSocket::tcp::TcpConnection accepted_connection =
std::thread accept_new_client(MinimalSocket::tcp::TcpServer<true> &server) {
MinimalSocket::tcp::TcpConnectionBlocking accepted_connection =
server.acceptNewClient();
cout << "New client accepted" << endl;
return std::thread([connection = std::move(accepted_connection)]() mutable {
Expand All @@ -33,13 +33,12 @@ int main(const int argc, const char **argv) {
cout << "----------------------- Server -----------------------" << endl;
PARSE_ARGS

const auto server_port =
static_cast<MinimalSocket::Port>(options->getIntValue("port"));
const auto max_clients = options->getIntValue("clients");
const auto family =
MinimalSocket::samples::to_family(options->getValue("family", "v4"));
const auto server_port = options->getValue<MinimalSocket::Port>("port");
const auto max_clients = options->getValue<int>("clients", 0);
const auto family = options->getValue<MinimalSocket::AddressFamily>(
"family", MinimalSocket::AddressFamily::IP_V4);

MinimalSocket::tcp::TcpServer server(server_port, family);
MinimalSocket::tcp::TcpServer<true> server(server_port, family);

if (!server.open()) {
cerr << "Failed to bind and listen to specified port" << endl;
Expand Down
94 changes: 94 additions & 0 deletions samples/tcp/TcpServerNonBlocking.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/**
* Author: Andrea Casalino
* Created: 16.05.2019
*
* report any bug to [email protected].
**/

///////////////////////////////////////////////////////////////////////////
// Have a look to README.md //
///////////////////////////////////////////////////////////////////////////

// elements from the MinimalSocket library
#include <MinimalSocket/tcp/TcpServer.h>

// just a bunch of utilities
#include <Args.h>
#include <Pollables.h>
#include <Respond.h>
#include <TimeOfDay.h>

#include <functional>
#include <list>
using namespace std;

int main(const int argc, const char **argv) {
cout << "----------------------- Server -----------------------" << endl;
PARSE_ARGS

const auto server_port = options->getValue<MinimalSocket::Port>("port");
const auto max_clients = options->getValue<int>("clients", 0);
const auto family = options->getValue<MinimalSocket::AddressFamily>(
"family", MinimalSocket::AddressFamily::IP_V4);

MinimalSocket::tcp::TcpServer<false> server(server_port, family);

if (!server.open()) {
cerr << "Failed to bind and listen to specified port" << endl;
return EXIT_FAILURE;
}
cout << "Listening for new clients on port " << server_port << endl;

std::size_t connected = 0;
std::list<MinimalSocket::tcp::TcpConnectionNonBlocking> connections;
MinimalSocket::samples::Pollables pollables;

auto create_pollable_connection =
[&](MinimalSocket::tcp::TcpConnectionNonBlocking &&connection) {
auto &conn = connections.emplace_back(
std::forward<MinimalSocket::tcp::TcpConnectionNonBlocking>(
connection));
return [conn = &conn]() {
// poll the connection by doing a non blocking receive
try {
auto request = conn->receive(500);
if (request.empty()) {
return MinimalSocket::samples::PollableStatus::NOT_ADVANCED;
}
const auto &response =
MinimalSocket::samples::NamesCircularIterator::NAMES_SURNAMES
.find(request)
->second;
cout << MinimalSocket::samples::TimeOfDay{}
<< " received: " << request << " ; sending: " << response
<< endl;
conn->send(response);
} catch (const MinimalSocket::SocketError &) {
// if here the connection was closed
return MinimalSocket::samples::PollableStatus::COMPLETED;
}
return MinimalSocket::samples::PollableStatus::ADVANCED;
};
};

pollables.emplace([&]() {
// poll the acceptor by trying to accept a new client
auto maybe_new_connection = server.acceptNewNonBlockingClient();
if (maybe_new_connection.has_value()) {
cout << MinimalSocket::samples::TimeOfDay{}
<< " connected a new client from "
<< MinimalSocket::to_string(maybe_new_connection->getRemoteAddress())
<< endl;
pollables.emplace(
create_pollable_connection(std::move(maybe_new_connection.value())));
return (max_clients != 0 && ++connected == max_clients)
? MinimalSocket::samples::PollableStatus::COMPLETED
: MinimalSocket::samples::PollableStatus::ADVANCED;
}
return MinimalSocket::samples::PollableStatus::NOT_ADVANCED;
});

pollables.loop(std::chrono::seconds{5});

return EXIT_SUCCESS;
}
4 changes: 4 additions & 0 deletions samples/udp/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
MakeApp(UdpAsker)
MakeApp(UdpResponder)
MakeApp(UdpResponderNonBlocking)

MakeSample(Sample01_asker_responder Udp)
add_dependencies(UdpSample01_asker_responder UdpAsker UdpResponder)
Expand All @@ -9,3 +10,6 @@ add_dependencies(UdpSample02_asker_connected_responer UdpAsker UdpResponder)

MakeSample(Sample03_2_askers_responder Udp)
add_dependencies(UdpSample03_2_askers_responder UdpAsker UdpResponder)

MakeSample(Sample04_2_askers_2_nn_block_responders Udp)
add_dependencies(UdpSample04_2_askers_2_nn_block_responders UdpAsker UdpResponderNonBlocking)
Loading

0 comments on commit 9ba186f

Please sign in to comment.