Skip to content

Commit

Permalink
test: add a mocked Sock that allows inspecting what has been Send() t…
Browse files Browse the repository at this point in the history
…o it

And also allows gradually providing the data to be returned by `Recv()`
and sending and receiving net messages (`CNetMessage`).
  • Loading branch information
vasild committed Nov 18, 2024
1 parent 2f6ce54 commit 5766bbe
Show file tree
Hide file tree
Showing 2 changed files with 322 additions and 0 deletions.
168 changes: 168 additions & 0 deletions src/test/util/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
#include <random.h>
#include <serialize.h>
#include <span.h>
#include <sync.h>

#include <chrono>
#include <optional>
#include <vector>

void ConnmanTestMsg::Handshake(CNode& node,
Expand Down Expand Up @@ -240,3 +243,168 @@ StaticContentsSock& StaticContentsSock::operator=(Sock&& other)
assert(false && "Move of Sock into StaticContentsSock not allowed.");
return *this;
}

ssize_t DynSock::Pipe::GetBytes(void* buf, size_t len, int flags)
{
WAIT_LOCK(m_mutex, lock);

if (m_data.empty()) {
if (m_eof) {
return 0;
}
errno = EAGAIN; // Same as recv(2) on a non-blocking socket.
return -1;
}

const size_t read_bytes{std::min(len, m_data.size())};

std::memcpy(buf, m_data.data(), read_bytes);
if ((flags & MSG_PEEK) == 0) {
m_data.erase(m_data.begin(), m_data.begin() + read_bytes);
}

return read_bytes;
}

std::optional<CNetMessage> DynSock::Pipe::GetNetMsg()
{
V1Transport transport{NodeId{0}};

{
WAIT_LOCK(m_mutex, lock);

WaitForDataOrEof(lock);
if (m_eof && m_data.empty()) {
return std::nullopt;
}

for (;;) {
Span<const uint8_t> s{m_data};
if (!transport.ReceivedBytes(s)) { // Consumed bytes are removed from the front of s.
return std::nullopt;
}
m_data.erase(m_data.begin(), m_data.begin() + m_data.size() - s.size());
if (transport.ReceivedMessageComplete()) {
break;
}
if (m_data.empty()) {
WaitForDataOrEof(lock);
if (m_eof && m_data.empty()) {
return std::nullopt;
}
}
}
}

bool reject{false};
CNetMessage msg{transport.GetReceivedMessage(/*time=*/{}, reject)};
if (reject) {
return std::nullopt;
}
return std::make_optional<CNetMessage>(std::move(msg));
}

void DynSock::Pipe::PushBytes(const void* buf, size_t len)
{
LOCK(m_mutex);
const uint8_t* b = static_cast<const uint8_t*>(buf);
m_data.insert(m_data.end(), b, b + len);
m_cond.notify_all();
}

void DynSock::Pipe::Eof()
{
LOCK(m_mutex);
m_eof = true;
m_cond.notify_all();
}

void DynSock::Pipe::WaitForDataOrEof(UniqueLock<Mutex>& lock)
{
Assert(lock.mutex() == &m_mutex);

m_cond.wait(lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) {
AssertLockHeld(m_mutex);
return !m_data.empty() || m_eof;
});
}

DynSock::DynSock(std::shared_ptr<Pipes> pipes, std::shared_ptr<Queue> accept_sockets)
: m_pipes{pipes}, m_accept_sockets{accept_sockets}
{
}

DynSock::~DynSock()
{
m_pipes->send.Eof();
}

ssize_t DynSock::Recv(void* buf, size_t len, int flags) const
{
return m_pipes->recv.GetBytes(buf, len, flags);
}

ssize_t DynSock::Send(const void* buf, size_t len, int) const
{
m_pipes->send.PushBytes(buf, len);
return len;
}

std::unique_ptr<Sock> DynSock::Accept(sockaddr* addr, socklen_t* addr_len) const
{
ZeroSock::Accept(addr, addr_len);
return m_accept_sockets->Pop().value_or(nullptr);
}

bool DynSock::Wait(std::chrono::milliseconds timeout,
Event requested,
Event* occurred) const
{
EventsPerSock ev;
ev.emplace(this, Events{requested});
const bool ret{WaitMany(timeout, ev)};
if (occurred != nullptr) {
*occurred = ev.begin()->second.occurred;
}
return ret;
}

bool DynSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const
{
const auto deadline = std::chrono::steady_clock::now() + timeout;
bool at_least_one_event_occurred{false};

for (;;) {
// Check all sockets for readiness without waiting.
for (auto& [sock, events] : events_per_sock) {
if ((events.requested & Sock::SEND) != 0) {
// Always ready for Send().
events.occurred |= Sock::SEND;
at_least_one_event_occurred = true;
}

if ((events.requested & Sock::RECV) != 0) {
auto dyn_sock = reinterpret_cast<const DynSock*>(sock.get());
uint8_t b;
if (dyn_sock->m_pipes->recv.GetBytes(&b, 1, MSG_PEEK) == 1 || !dyn_sock->m_accept_sockets->Empty()) {
events.occurred |= Sock::RECV;
at_least_one_event_occurred = true;
}
}
}

if (at_least_one_event_occurred || std::chrono::steady_clock::now() > deadline) {
break;
}

std::this_thread::sleep_for(10ms);
}

return true;
}

DynSock& DynSock::operator=(Sock&&)
{
assert(false && "Move of Sock into DynSock not allowed.");
return *this;
}
154 changes: 154 additions & 0 deletions src/test/util/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#define BITCOIN_TEST_UTIL_NET_H

#include <compat/compat.h>
#include <netmessagemaker.h>
#include <net.h>
#include <net_permissions.h>
#include <net_processing.h>
Expand All @@ -19,9 +20,11 @@
#include <array>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <cstring>
#include <memory>
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>
Expand Down Expand Up @@ -206,4 +209,155 @@ class StaticContentsSock : public ZeroSock
mutable size_t m_consumed{0};
};

/**
* A mocked Sock alternative that allows providing the data to be returned by Recv()
* and inspecting the data that has been supplied to Send().
*/
class DynSock : public ZeroSock
{
public:
/**
* Unidirectional bytes or CNetMessage queue (FIFO).
*/
class Pipe
{
public:
/**
* Get bytes and remove them from the pipe.
* @param[in] buf Destination to write bytes to.
* @param[in] len Write up to this number of bytes.
* @param[in] flags Same as the flags of `recv(2)`. Just `MSG_PEEK` is honored.
* @return The number of bytes written to `buf`. `0` if `Eof()` has been called.
* If no bytes are available then `-1` is returned and `errno` is set to `EAGAIN`.
*/
ssize_t GetBytes(void* buf, size_t len, int flags = 0) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex);

/**
* Deserialize a `CNetMessage` and remove it from the pipe.
* If not enough bytes are available then the function will wait. If parsing fails
* or EOF is signaled to the pipe, then `std::nullopt` is returned.
*/
std::optional<CNetMessage> GetNetMsg() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex);

/**
* Push bytes to the pipe.
*/
void PushBytes(const void* buf, size_t len) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex);

/**
* Construct and push CNetMessage to the pipe.
*/
template <typename... Args>
void PushNetMsg(const std::string& type, Args&&... payload) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex);

/**
* Signal end-of-file on the receiving end (`GetBytes()` or `GetNetMsg()`).
*/
void Eof() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex);

private:
/**
* Return when there is some data to read or EOF has been signaled.
* @param[in,out] lock Unique lock that must have been derived from `m_mutex` by `WAIT_LOCK(m_mutex, lock)`.
*/
void WaitForDataOrEof(UniqueLock<Mutex>& lock) EXCLUSIVE_LOCKS_REQUIRED(m_mutex);

Mutex m_mutex;
std::condition_variable m_cond;
std::vector<uint8_t> m_data GUARDED_BY(m_mutex);
bool m_eof GUARDED_BY(m_mutex){false};
};

struct Pipes {
Pipe recv;
Pipe send;
};

/**
* A basic thread-safe queue, used for queuing sockets to be returned by Accept().
*/
class Queue
{
public:
using S = std::unique_ptr<DynSock>;

void Push(S s) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
LOCK(m_mutex);
m_queue.push(std::move(s));
}

std::optional<S> Pop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
LOCK(m_mutex);
if (m_queue.empty()) {
return std::nullopt;
}
S front{std::move(m_queue.front())};
m_queue.pop();
return front;
}

bool Empty() const EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
LOCK(m_mutex);
return m_queue.empty();
}

private:
mutable Mutex m_mutex;
std::queue<S> m_queue GUARDED_BY(m_mutex);
};

/**
* Create a new mocked sock.
* @param[in] pipes Send/recv pipes used by the Send() and Recv() methods.
* @param[in] accept_sockets Sockets to return by the Accept() method.
*/
explicit DynSock(std::shared_ptr<Pipes> pipes, std::shared_ptr<Queue> accept_sockets);

~DynSock();

ssize_t Recv(void* buf, size_t len, int flags) const override;

ssize_t Send(const void* buf, size_t len, int) const override;

std::unique_ptr<Sock> Accept(sockaddr* addr, socklen_t* addr_len) const override;

bool Wait(std::chrono::milliseconds timeout,
Event requested,
Event* occurred = nullptr) const override;

bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const override;

private:
DynSock& operator=(Sock&&) override;

std::shared_ptr<Pipes> m_pipes;
std::shared_ptr<Queue> m_accept_sockets;
};

template <typename... Args>
void DynSock::Pipe::PushNetMsg(const std::string& type, Args&&... payload)
{
auto msg = NetMsg::Make(type, std::forward<Args>(payload)...);
V1Transport transport{NodeId{0}};

const bool queued{transport.SetMessageToSend(msg)};
assert(queued);

LOCK(m_mutex);

for (;;) {
const auto& [bytes, _more, _msg_type] = transport.GetBytesToSend(/*have_next_message=*/true);
if (bytes.empty()) {
break;
}
m_data.insert(m_data.end(), bytes.begin(), bytes.end());
transport.MarkBytesSent(bytes.size());
}

m_cond.notify_all();
}

#endif // BITCOIN_TEST_UTIL_NET_H

0 comments on commit 5766bbe

Please sign in to comment.