Skip to content
This repository has been archived by the owner on Sep 3, 2024. It is now read-only.

Control and reuse streams #98

Merged
merged 17 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ all: ${BUILD_DIR}
cmake --build ${BUILD_DIR} --parallel 8

${BUILD_DIR}: CMakeLists.txt cmd/CMakeLists.txt
cmake -B${BUILD_DIR} -DBUILD_TESTING=TRUE -DQTRANSPORT_BUILD_TESTS=TRUE -DCMAKE_BUILD_TYPE=Release .
cmake -B${BUILD_DIR} -DBUILD_TESTING=TRUE -DQTRANSPORT_BUILD_TESTS=TRUE -DCMAKE_BUILD_TYPE=Debug .

clean:
cmake --build ${BUILD_DIR} --target clean
Expand Down
63 changes: 47 additions & 16 deletions cmd/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ struct Delegate : public ITransport::TransportDelegate
private:
std::shared_ptr<ITransport> client;
uint64_t msgcount;
TransportContextId tcid;
TransportConnId conn_id;
cantina::LoggerPointer logger;

public:
Delegate(const cantina::LoggerPointer& logger)
: logger(std::make_shared<cantina::Logger>("CMD", logger))
{
msgcount = 0;
tcid = 0;
conn_id = 0;
}

void stop() {
Expand All @@ -34,29 +34,35 @@ struct Delegate : public ITransport::TransportDelegate

void setClientTransport(std::shared_ptr<ITransport> client) { this->client = client; }

TransportContextId getContextId() { return tcid; }
TransportConnId getContextId() { return conn_id; }
GhostofCookie marked this conversation as resolved.
Show resolved Hide resolved

void on_connection_status(const TransportContextId& context_id, const TransportStatus status)
void on_connection_status(const TransportConnId& conn_id, const TransportStatus status)
{
tcid = context_id;
logger->info << "Connection state change context: " << context_id << ", " << int(status) << std::flush;
logger->info << "Connection state change conn_id: " << conn_id << ", " << int(status) << std::flush;
}
void on_new_connection(const TransportContextId& /* context_id */, const TransportRemote& /* remote */) {}

void on_recv_notify(const TransportContextId& context_id, const StreamId& streamId)
void on_new_connection(const TransportConnId& , const TransportRemote&) {}

void on_recv_notify(const TransportConnId& conn_id, const DataContextId& data_ctx_id, [[maybe_unused]] const bool is_bidir)
{
static uint32_t prev_msg_num = 0;
static uint32_t prev_msgcount = 0;

while (true) {
auto data = client->dequeue(context_id, streamId);
auto data = client->dequeue(conn_id, data_ctx_id);

if (data.has_value()) {
msgcount++;

if (msgcount % 2000 == 0 && prev_msgcount != msgcount) {
logger->info << "conn_id: " << conn_id << " data_ctx_id: " << data_ctx_id << " msgcount: " << msgcount << std::flush;
}

uint32_t* msg_num = (uint32_t*)data.value().data();
if (msg_num == NULL) break;

if (prev_msg_num && (*msg_num - prev_msg_num) > 1) {
logger->info << "cid: " << context_id << " sid: " << streamId << " length: " << data->size()
logger->info << "conn_id: " << conn_id << " data_ctx_id: " << data_ctx_id << " length: " << data->size()
<< " RecvMsg (" << msgcount << ")"
<< " msg_num: " << *msg_num << " prev_num: " << prev_msg_num << "("
<< *msg_num - prev_msg_num << ")" << std::flush;
Expand All @@ -69,7 +75,9 @@ struct Delegate : public ITransport::TransportDelegate
}
}
}
void on_new_stream(const TransportContextId& /* context_id */, const StreamId& /* streamId */) {}


void on_new_data_context(const TransportConnId&, const DataContextId&) {}
};

cantina::LoggerPointer logger = std::make_shared<cantina::Logger>();
Expand All @@ -95,31 +103,52 @@ main()
if ((envVar = getenv("RELAY_PORT")))
server.port = atoi(envVar);

bool bidir = true;
if (getenv("RELAY_UNIDIR"))
bidir = false;

auto client = ITransport::make_client_transport(server, tconfig, d, logger);

logger->info << "bidir is " << (bidir ? "True" : "False") << std::flush;
logger->info << "client use_count: " << client.use_count() << std::flush;

d.setClientTransport(client);
logger->info << "after set client transport client use_count: " << client.use_count() << std::flush;

auto tcid = client->start();
auto conn_id = client->start();
uint8_t data_buf[4200]{ 0 };

while (client->status() != TransportStatus::Ready) {
logger->Log("Waiting for client to be ready");
std::this_thread::sleep_for(std::chrono::milliseconds(250));
}

StreamId stream_id = client->createStream(tcid, true);
DataContextId data_ctx_id = client->createDataContext(conn_id, true, 1, bidir);

uint32_t* msg_num = (uint32_t*)&data_buf;
int period_count = 0;

while (true) {
ITransport::EnqueueFlags encode_flags { .new_stream = true, .clear_tx_queue = true, .use_reset = true};

while (client->status() != TransportStatus::Shutdown && client->status() != TransportStatus::Disconnected) {
period_count++;
for (int i = 0; i < 10; i++) {
(*msg_num)++;
auto data = bytes(data_buf, data_buf + sizeof(data_buf));

client->enqueue(tcid, server.proto == TransportProtocol::UDP ? 1 : stream_id, std::move(data));
if (period_count > 2000) {
GhostofCookie marked this conversation as resolved.
Show resolved Hide resolved
period_count = 0;
client->enqueue(conn_id,
server.proto == TransportProtocol::UDP ? 1 : data_ctx_id,
std::move(data),
1,
350,
encode_flags);
}
else {
client->enqueue(conn_id, server.proto == TransportProtocol::UDP ? 1 : data_ctx_id, std::move(data));
}

}

// Increase delay if using UDP, need to pace more
Expand All @@ -128,9 +157,11 @@ main()
} else {
std::this_thread::sleep_for(std::chrono::milliseconds(2));
}


}

client->closeStream(tcid, stream_id);
client->deleteDataContext(conn_id, data_ctx_id);

logger->Log("Done with transport, closing");
client.reset();
Expand Down
52 changes: 36 additions & 16 deletions cmd/echoServer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,17 @@ using namespace qtransport;
struct Delegate : public ITransport::TransportDelegate {
private:
std::shared_ptr<ITransport> server;
uint64_t msgcount;
cantina::LoggerPointer logger;

uint64_t msgcount {0};
uint64_t prev_msgcount {0};
uint32_t prev_msg_num {0};
DataContextId out_data_ctx {0};

public:
Delegate(const cantina::LoggerPointer& logger)
: logger(std::make_shared<cantina::Logger>("ECHO", logger))
{
msgcount = 0;
}

void stop() {
Expand All @@ -29,48 +32,65 @@ struct Delegate : public ITransport::TransportDelegate {
this->server = server;
}

void on_connection_status(const TransportContextId &context_id,
void on_connection_status(const TransportConnId &conn_id,
const TransportStatus status) {
logger->info << "Connection state change context: " << context_id << ", "
logger->info << "Connection state change conn_id: " << conn_id << ", "
<< int(status) << std::flush;
}

void on_new_connection(const TransportContextId &context_id,
void on_new_connection(const TransportConnId &conn_id,
const TransportRemote &remote) {
logger->info << "New connection cid: " << context_id << " from "
logger->info << "New connection conn_id: " << conn_id << " from "
<< remote.host_or_ip << ":" << remote.port << std::flush;

out_data_ctx = this->server->createDataContext(conn_id, true, 10);
}

void on_recv_notify(const TransportContextId &context_id,
const StreamId &streamId) {
static uint32_t prev_msg_num = 0;
void on_recv_notify(const TransportConnId &conn_id,
const DataContextId &data_ctx_id, const bool is_bidir) {

while (true) {
auto data = server->dequeue(context_id, streamId);
auto data = server->dequeue(conn_id, data_ctx_id);

if (data.has_value()) {
msgcount++;

if (msgcount % 2000 == 0 && prev_msgcount != msgcount) {
GhostofCookie marked this conversation as resolved.
Show resolved Hide resolved
prev_msgcount = msgcount;
logger->info << "conn_id: " << conn_id << " data_ctx_id: " << data_ctx_id << " msgcount: " << msgcount << std::flush;
}

uint32_t *msg_num = (uint32_t *)data->data();
if (msg_num == nullptr)
continue;

if (prev_msg_num && (*msg_num - prev_msg_num) > 1) {
logger->info << "cid: " << context_id << " sid: " << streamId << " length: " << data->size()
logger->info << "conn_id: " << conn_id << " data_ctx_id: " << data_ctx_id << " length: " << data->size()
<< " RecvMsg (" << msgcount << ")"
<< " msg_num: " << *msg_num << " prev_num: " << prev_msg_num << "("
<< *msg_num - prev_msg_num << ")" << std::flush;
}

prev_msg_num = *msg_num;

server->enqueue(context_id, streamId, std::move(data.value()));
if (is_bidir) {
server->enqueue(conn_id, data_ctx_id, std::move(data.value()));

} else {
server->enqueue(conn_id, out_data_ctx, std::move(data.value()));
}

} else {
break;
}
}
}

void on_new_stream(const TransportContextId & /* context_id */,
const StreamId & /* streamId */) {}
void on_new_data_context(const TransportConnId& conn_id, const DataContextId& data_ctx_id)
{
logger->info << "Callback for new data context conn_id: " << conn_id << " data_ctx_id: " << data_ctx_id
<< std::flush;
}
};

int main() {
Expand All @@ -94,8 +114,8 @@ int main() {

d.setServerTransport(server);

while (true) {
std::this_thread::sleep_for(std::chrono::seconds(60));
while (server->status() != TransportStatus::Shutdown) {
std::this_thread::sleep_for(std::chrono::seconds(3));
}

server.reset();
Expand Down
13 changes: 13 additions & 0 deletions include/transport/priority_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,19 @@ namespace qtransport {
}
}

/**
* @brief Clear queue
*/
void clear() {
std::lock_guard<std::mutex> _(_mutex);

for (auto& tqueue: _queue) {
if (tqueue && !tqueue->empty()) {
tqueue->clear();
}
}
}

// TODO: Consider changing empty/size to look at timeQueue sizes - maybe support blocking pops
size_t size() const
{
Expand Down
20 changes: 18 additions & 2 deletions include/transport/safe_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,6 @@ class safe_queue
pop_front_internal();
}



/**
* @brief Block waiting for data in queue, then remove the first object from
* queue (oldest object)
Expand Down Expand Up @@ -145,6 +143,15 @@ class safe_queue
return queue.size();
}

/**
* @brief Clear the queue
*/
void clear() {
std::lock_guard<std::mutex> _(mutex);
std::queue<T> empty;
std::swap(queue, empty);
}

/**
* @brief Check if queue is empty
*
Expand Down Expand Up @@ -189,6 +196,10 @@ class safe_queue
auto elem = queue.front();
queue.pop();

if (queue.empty()) {
_empty = true;
}

return std::move(elem);
}

Expand All @@ -200,10 +211,15 @@ class safe_queue
void pop_front_internal()
{
if (queue.empty()) {
_empty = true;
return;
}

queue.pop();

if (queue.empty()) {
_empty = true;
}
}

std::atomic<bool> _empty { true };
Expand Down
4 changes: 3 additions & 1 deletion include/transport/time_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ namespace qtransport {
size_t size() const noexcept { return _queue.size() - _queue_index; }
bool empty() const noexcept { return _queue.empty() || _queue_index >= _queue.size(); }

private:
/**
* @brief Clear/reset the queue to no objects
*/
Expand All @@ -322,6 +321,9 @@ namespace qtransport {
}
}

private:


/**
* @brief Based on current time, adjust and move the bucket index with time
* (sliding window)
Expand Down
Loading