Skip to content

Commit

Permalink
non blocking semantics for receiving soeckts and the tcp acceptor
Browse files Browse the repository at this point in the history
tests updated
  • Loading branch information
Foo committed Mar 23, 2024
1 parent 4964820 commit 21742dd
Show file tree
Hide file tree
Showing 28 changed files with 1,188 additions and 656 deletions.
1 change: 1 addition & 0 deletions samples/tcp/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
MakeApp(TcpClient)
MakeApp(TcpServer)
MakeApp(TcpRepeater)
MakeApp(NonBlockingExperiment)

MakeSample(Sample01_server_client Tcp)
add_dependencies(TcpSample01_server_client TcpClient TcpServer)
Expand Down
91 changes: 91 additions & 0 deletions samples/tcp/NonBlockingExperiment.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#include <MinimalSocket/tcp/TcpClient.h>
#include <MinimalSocket/tcp/TcpServer.h>

#include <atomic>
#include <iostream>
#include <mutex>
#include <sstream>
#include <thread>
#include <vector>
using namespace std;

using namespace MinimalSocket;

const std::uint16_t PORT = 44553;
const string MESSAGE = "Hello from the sender";

struct Logger {
Logger(const string &name) : name_{name} {}

template <typename... Args> void log(const Args &...args) {
std::stringstream buff;
(pack(buff, args), ...);
std::scoped_lock lock{getMtx()};
std::cout << '|' << name_ << "|: " << buff.str() << std::endl;
}

private:
template <typename T> static void pack(std::stringstream &recipient, T val) {
recipient << val;
}

static std::mutex &getMtx() {
static std::mutex res = std::mutex{};
return res;
}

string name_;
};

void server_loop(std::atomic_bool &done) {
Logger logger{"Server"};

std::optional<tcp::TcpConnectionBlocking> connection;
{
tcp::TcpServer<true> server{PORT, MinimalSocket::AddressFamily::IP_V4};
if (!server.open())
throw std::runtime_error{"unable to open the server"};
done = true;
logger.log("listening");
connection.emplace(server.acceptNewClient());
}
logger.log("connected");
while (true) {
logger.log("sending");
connection->send(MESSAGE);
logger.log("sleeping ... ");
std::this_thread::sleep_for(std::chrono::milliseconds{1500});
}
}

void client_loop() {
Logger logger{"Client"};

tcp::TcpClient<false> connection{Address{PORT}};

logger.log("connecting");
connection.open();
logger.log("connected");

while (true) {
auto res = connection.receive(MESSAGE.size());
if (res.empty())
logger.log("nothing");
else
logger.log("received `", res, '`');
std::this_thread::sleep_for(std::chrono::milliseconds{100});
}
}

int main() {
std::atomic_bool done = false;
std::thread server([&done]() { server_loop(done); });
while (!done.load()) {
}

client_loop();

server.join();

return EXIT_SUCCESS;
}
2 changes: 1 addition & 1 deletion src/header/MinimalSocket/Error.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,6 @@ class SocketError : public ErrorCodeHolder, public Error {

class TimeOutError : public Error {
public:
TimeOutError() : Error("Timeout"){};
TimeOutError() : Error("Timeout reached"){};
};
} // namespace MinimalSocket
70 changes: 50 additions & 20 deletions src/header/MinimalSocket/core/Receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,11 @@
#include <mutex>

namespace MinimalSocket {
class ReceiverBase : public virtual Socket {
class ReceiverWithTimeout : public virtual Socket {
protected:
template <typename Pred>
void lazyUpdateAndUseTimeout(const Timeout &to, Pred what) {
std::scoped_lock lock{receive_mtx};
updateTimeout_(to);
what(receive_timeout);
}

private:
void updateTimeout_(const Timeout &timeout);

std::mutex receive_mtx;
private:
Timeout receive_timeout = NULL_TIMEOUT;
};

Expand All @@ -36,7 +28,7 @@ class ReceiverBase : public virtual Socket {
* receive, they will be satisfited one at a time, as an internal mutex must be
* locked before starting to receive.
*/
class Receiver : public ReceiverBase {
class ReceiverBlocking : public ReceiverWithTimeout {
public:
/**
* @param message the buffer that will store the received bytes.
Expand All @@ -63,6 +55,33 @@ class Receiver : public ReceiverBase {
*/
std::string receive(std::size_t expected_max_bytes,
const Timeout &timeout = NULL_TIMEOUT);

private:
std::mutex recv_mtx;
};

class ReceiverNonBlocking : public virtual Socket {
public:
std::size_t receive(BufferView message);

std::string receive(std::size_t expected_max_bytes);

private:
std::mutex recv_mtx;
};

template <bool BlockMode> class Receiver {};
template <> class Receiver<true> : public ReceiverBlocking {};
template <> class Receiver<false> : public ReceiverNonBlocking {};

struct ReceiveResult {
Address sender;
std::size_t received_bytes;
};

struct ReceiveStringResult {
Address sender;
std::string received_message;
};

/**
Expand All @@ -72,12 +91,8 @@ class Receiver : public ReceiverBase {
* receive, they will be satisfited one at a time, as an internal mutex must be
* locked before starting to receive.
*/
class ReceiverUnkownSender : public ReceiverBase {
class ReceiverUnkownSenderBlocking : public ReceiverWithTimeout {
public:
struct ReceiveResult {
Address sender;
std::size_t received_bytes;
};
/**
* @param message the buffer that will store the received bytes.
* @param timeout the timeout to consider. A NULL_TIMEOUT means actually to
Expand All @@ -90,10 +105,6 @@ class ReceiverUnkownSender : public ReceiverBase {
std::optional<ReceiveResult> receive(BufferView message,
const Timeout &timeout = NULL_TIMEOUT);

struct ReceiveStringResult {
Address sender;
std::string received_message;
};
/**
* @brief Similar to ReceiverUnkownSender::receive(Buffer &, const Timeout &),
* but internally building the recipient buffer which is converted into a
Expand All @@ -110,5 +121,24 @@ class ReceiverUnkownSender : public ReceiverBase {
std::optional<ReceiveStringResult>
receive(std::size_t expected_max_bytes,
const Timeout &timeout = NULL_TIMEOUT);

private:
std::mutex recv_mtx;
};

class ReceiverUnkownSenderNonBlocking : public virtual Socket {
public:
std::optional<ReceiveResult> receive(BufferView message);

std::optional<ReceiveStringResult> receive(std::size_t expected_max_bytes);

private:
std::mutex recv_mtx;
};

template <bool BlockMode> class ReceiverUnkownSender {};
template <>
class ReceiverUnkownSender<true> : public ReceiverUnkownSenderBlocking {};
template <>
class ReceiverUnkownSender<false> : public ReceiverUnkownSenderNonBlocking {};
} // namespace MinimalSocket
19 changes: 19 additions & 0 deletions src/header/MinimalSocket/core/SocketContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#pragma once

#include <MinimalSocket/core/Address.h>
#include <MinimalSocket/core/Socket.h>

#include <atomic>
#include <mutex>
Expand Down Expand Up @@ -98,4 +99,22 @@ class RemoteAddressFamilyAware {
private:
std::atomic<AddressFamily> remote_address_family;
};

class BlockingMode : virtual public Socket {
public:
BlockingMode &operator=(const BlockingMode &o) {
mode = o.mode;
return *this;
}

bool isBlocking() const { return mode; }

protected:
BlockingMode(bool mode) : mode{mode} {}

void setUp();

private:
bool mode;
};
} // namespace MinimalSocket
40 changes: 29 additions & 11 deletions src/header/MinimalSocket/tcp/TcpClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,48 @@
#include <MinimalSocket/core/SocketContext.h>

namespace MinimalSocket::tcp {
class TcpClient : public NonCopiable,
public Openable,
public Sender,
public Receiver,
public RemoteAddressAware {
public:
TcpClient(TcpClient &&o);
TcpClient &operator=(TcpClient &&o);
class TcpClientBase : public NonCopiable,
public Openable,
public BlockingMode,
public Sender,
public RemoteAddressAware {
protected:
TcpClientBase(TcpClientBase &&o);

void stealBase(TcpClientBase &o);

/**
* @brief The connection to the server is not asked in this c'tor which
* simply initialize this object. Such a connection is tried to be established
* when calling open(...)
* @param server_address the server to reach when opening this socket
*/
TcpClient(const Address &server_address);
TcpClientBase(const Address &server_address, bool block_mode);

protected:
void open_() override;
};

template <bool BlockMode>
class TcpClient : public TcpClientBase, public Receiver<BlockMode> {
public:
TcpClient(const Address &server_address)
: TcpClientBase{server_address, BlockMode} {}

TcpClient(TcpClient<BlockMode> &&o)
: TcpClientBase{std::forward<TcpClientBase>(o)} {}
TcpClient &operator=(TcpClient<BlockMode> &&o) {
this->stealBase(o);
return *this;
}
};

/**
* @return a client ready to ask the connection to the same server.
* Beware that a closed socket is returned, which can be later opened.
*/
TcpClient clone(const TcpClient &o);
template <bool BlockMode>
TcpClient<BlockMode> clone(const TcpClient<BlockMode> &o) {
return TcpClient<BlockMode>{o.getRemoteAddress()};
}

} // namespace MinimalSocket::tcp
Loading

0 comments on commit 21742dd

Please sign in to comment.