Skip to content
This repository has been archived by the owner on Sep 3, 2024. It is now read-only.

Control and reuse streams #98

Merged
merged 17 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ all: ${BUILD_DIR}
cmake --build ${BUILD_DIR} --parallel 8

${BUILD_DIR}: CMakeLists.txt cmd/CMakeLists.txt
cmake -B${BUILD_DIR} -DBUILD_TESTING=TRUE -DQTRANSPORT_BUILD_TESTS=TRUE -DCMAKE_BUILD_TYPE=Release .
cmake -B${BUILD_DIR} -DBUILD_TESTING=ON -DQTRANSPORT_BUILD_TESTS=ON -DCMAKE_BUILD_TYPE=Debug .

clean:
cmake --build ${BUILD_DIR} --target clean
Expand Down
63 changes: 47 additions & 16 deletions cmd/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ struct Delegate : public ITransport::TransportDelegate
private:
std::shared_ptr<ITransport> client;
uint64_t msgcount;
TransportContextId tcid;
TransportConnId conn_id;
cantina::LoggerPointer logger;

public:
Delegate(const cantina::LoggerPointer& logger)
: logger(std::make_shared<cantina::Logger>("CMD", logger))
{
msgcount = 0;
tcid = 0;
conn_id = 0;
}

void stop() {
Expand All @@ -34,29 +34,35 @@ struct Delegate : public ITransport::TransportDelegate

void setClientTransport(std::shared_ptr<ITransport> client) { this->client = client; }

TransportContextId getContextId() { return tcid; }
TransportConnId getContextId() const { return conn_id; }

void on_connection_status(const TransportContextId& context_id, const TransportStatus status)
void on_connection_status(const TransportConnId& conn_id, const TransportStatus status)
{
tcid = context_id;
logger->info << "Connection state change context: " << context_id << ", " << int(status) << std::flush;
logger->info << "Connection state change conn_id: " << conn_id << ", " << int(status) << std::flush;
}
void on_new_connection(const TransportContextId& /* context_id */, const TransportRemote& /* remote */) {}

void on_recv_notify(const TransportContextId& context_id, const StreamId& streamId)
void on_new_connection(const TransportConnId& , const TransportRemote&) {}

void on_recv_notify(const TransportConnId& conn_id, const DataContextId& data_ctx_id, [[maybe_unused]] const bool is_bidir)
{
static uint32_t prev_msg_num = 0;
static uint32_t prev_msgcount = 0;

while (true) {
auto data = client->dequeue(context_id, streamId);
auto data = client->dequeue(conn_id, data_ctx_id);

if (data.has_value()) {
msgcount++;

if (msgcount % 2000 == 0 && prev_msgcount != msgcount) {
logger->info << "conn_id: " << conn_id << " data_ctx_id: " << data_ctx_id << " msgcount: " << msgcount << std::flush;
}

uint32_t* msg_num = (uint32_t*)data.value().data();
if (msg_num == NULL) break;

if (prev_msg_num && (*msg_num - prev_msg_num) > 1) {
logger->info << "cid: " << context_id << " sid: " << streamId << " length: " << data->size()
logger->info << "conn_id: " << conn_id << " data_ctx_id: " << data_ctx_id << " length: " << data->size()
<< " RecvMsg (" << msgcount << ")"
<< " msg_num: " << *msg_num << " prev_num: " << prev_msg_num << "("
<< *msg_num - prev_msg_num << ")" << std::flush;
Expand All @@ -69,7 +75,9 @@ struct Delegate : public ITransport::TransportDelegate
}
}
}
void on_new_stream(const TransportContextId& /* context_id */, const StreamId& /* streamId */) {}


void on_new_data_context(const TransportConnId&, const DataContextId&) {}
};

cantina::LoggerPointer logger = std::make_shared<cantina::Logger>();
Expand All @@ -95,31 +103,52 @@ main()
if ((envVar = getenv("RELAY_PORT")))
server.port = atoi(envVar);

bool bidir = true;
if (getenv("RELAY_UNIDIR"))
bidir = false;

auto client = ITransport::make_client_transport(server, tconfig, d, logger);

logger->info << "bidir is " << (bidir ? "True" : "False") << std::flush;
logger->info << "client use_count: " << client.use_count() << std::flush;

d.setClientTransport(client);
logger->info << "after set client transport client use_count: " << client.use_count() << std::flush;

auto tcid = client->start();
auto conn_id = client->start();
uint8_t data_buf[4200]{ 0 };

while (client->status() != TransportStatus::Ready) {
logger->Log("Waiting for client to be ready");
std::this_thread::sleep_for(std::chrono::milliseconds(250));
}

StreamId stream_id = client->createStream(tcid, true);
DataContextId data_ctx_id = client->createDataContext(conn_id, true, 1, bidir);

uint32_t* msg_num = (uint32_t*)&data_buf;
int period_count = 0;

while (true) {
ITransport::EnqueueFlags encode_flags { .new_stream = true, .clear_tx_queue = true, .use_reset = true};

while (client->status() != TransportStatus::Shutdown && client->status() != TransportStatus::Disconnected) {
period_count++;
for (int i = 0; i < 10; i++) {
(*msg_num)++;
auto data = bytes(data_buf, data_buf + sizeof(data_buf));

client->enqueue(tcid, server.proto == TransportProtocol::UDP ? 1 : stream_id, std::move(data));
if (period_count > 2000) {
GhostofCookie marked this conversation as resolved.
Show resolved Hide resolved
period_count = 0;
client->enqueue(conn_id,
server.proto == TransportProtocol::UDP ? 1 : data_ctx_id,
std::move(data),
1,
350,
encode_flags);
}
else {
client->enqueue(conn_id, server.proto == TransportProtocol::UDP ? 1 : data_ctx_id, std::move(data));
}

}

// Increase delay if using UDP, need to pace more
Expand All @@ -128,9 +157,11 @@ main()
} else {
std::this_thread::sleep_for(std::chrono::milliseconds(2));
}


}

client->closeStream(tcid, stream_id);
client->deleteDataContext(conn_id, data_ctx_id);

logger->Log("Done with transport, closing");
client.reset();
Expand Down
52 changes: 36 additions & 16 deletions cmd/echoServer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,17 @@ using namespace qtransport;
struct Delegate : public ITransport::TransportDelegate {
private:
std::shared_ptr<ITransport> server;
uint64_t msgcount;
cantina::LoggerPointer logger;

uint64_t msgcount {0};
uint64_t prev_msgcount {0};
uint32_t prev_msg_num {0};
DataContextId out_data_ctx {0};

public:
Delegate(const cantina::LoggerPointer& logger)
: logger(std::make_shared<cantina::Logger>("ECHO", logger))
{
msgcount = 0;
}

void stop() {
Expand All @@ -29,48 +32,65 @@ struct Delegate : public ITransport::TransportDelegate {
this->server = server;
}

void on_connection_status(const TransportContextId &context_id,
void on_connection_status(const TransportConnId &conn_id,
const TransportStatus status) {
logger->info << "Connection state change context: " << context_id << ", "
logger->info << "Connection state change conn_id: " << conn_id << ", "
<< int(status) << std::flush;
}

void on_new_connection(const TransportContextId &context_id,
void on_new_connection(const TransportConnId &conn_id,
const TransportRemote &remote) {
logger->info << "New connection cid: " << context_id << " from "
logger->info << "New connection conn_id: " << conn_id << " from "
<< remote.host_or_ip << ":" << remote.port << std::flush;

out_data_ctx = this->server->createDataContext(conn_id, true, 10);
}

void on_recv_notify(const TransportContextId &context_id,
const StreamId &streamId) {
static uint32_t prev_msg_num = 0;
void on_recv_notify(const TransportConnId &conn_id,
const DataContextId &data_ctx_id, const bool is_bidir) {

while (true) {
auto data = server->dequeue(context_id, streamId);
auto data = server->dequeue(conn_id, data_ctx_id);

if (data.has_value()) {
msgcount++;

if (msgcount % 2000 == 0 && prev_msgcount != msgcount) {
GhostofCookie marked this conversation as resolved.
Show resolved Hide resolved
prev_msgcount = msgcount;
logger->info << "conn_id: " << conn_id << " data_ctx_id: " << data_ctx_id << " msgcount: " << msgcount << std::flush;
}

uint32_t *msg_num = (uint32_t *)data->data();
if (msg_num == nullptr)
continue;

if (prev_msg_num && (*msg_num - prev_msg_num) > 1) {
logger->info << "cid: " << context_id << " sid: " << streamId << " length: " << data->size()
logger->info << "conn_id: " << conn_id << " data_ctx_id: " << data_ctx_id << " length: " << data->size()
<< " RecvMsg (" << msgcount << ")"
<< " msg_num: " << *msg_num << " prev_num: " << prev_msg_num << "("
<< *msg_num - prev_msg_num << ")" << std::flush;
}

prev_msg_num = *msg_num;

server->enqueue(context_id, streamId, std::move(data.value()));
if (is_bidir) {
server->enqueue(conn_id, data_ctx_id, std::move(data.value()));

} else {
server->enqueue(conn_id, out_data_ctx, std::move(data.value()));
}

} else {
break;
}
}
}

void on_new_stream(const TransportContextId & /* context_id */,
const StreamId & /* streamId */) {}
void on_new_data_context(const TransportConnId& conn_id, const DataContextId& data_ctx_id)
{
logger->info << "Callback for new data context conn_id: " << conn_id << " data_ctx_id: " << data_ctx_id
<< std::flush;
}
};

int main() {
Expand All @@ -94,8 +114,8 @@ int main() {

d.setServerTransport(server);

while (true) {
std::this_thread::sleep_for(std::chrono::seconds(60));
while (server->status() != TransportStatus::Shutdown) {
std::this_thread::sleep_for(std::chrono::seconds(3));
}

server.reset();
Expand Down
13 changes: 13 additions & 0 deletions include/transport/priority_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,19 @@ namespace qtransport {
}
}

/**
* @brief Clear queue
*/
void clear() {
std::lock_guard<std::mutex> _(_mutex);

for (auto& tqueue: _queue) {
if (tqueue && !tqueue->empty()) {
tqueue->clear();
}
}
}

// TODO: Consider changing empty/size to look at timeQueue sizes - maybe support blocking pops
size_t size() const
{
Expand Down
20 changes: 18 additions & 2 deletions include/transport/safe_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,6 @@ class safe_queue
pop_front_internal();
}



/**
* @brief Block waiting for data in queue, then remove the first object from
* queue (oldest object)
Expand Down Expand Up @@ -145,6 +143,15 @@ class safe_queue
return queue.size();
}

/**
* @brief Clear the queue
*/
void clear() {
std::lock_guard<std::mutex> _(mutex);
std::queue<T> empty;
std::swap(queue, empty);
}

/**
* @brief Check if queue is empty
*
Expand Down Expand Up @@ -189,6 +196,10 @@ class safe_queue
auto elem = queue.front();
queue.pop();

if (queue.empty()) {
_empty = true;
}

return std::move(elem);
}

Expand All @@ -200,10 +211,15 @@ class safe_queue
void pop_front_internal()
{
if (queue.empty()) {
_empty = true;
return;
}

queue.pop();

if (queue.empty()) {
_empty = true;
}
}

std::atomic<bool> _empty { true };
Expand Down
6 changes: 5 additions & 1 deletion include/transport/time_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ namespace qtransport {
size_t size() const noexcept { return _queue.size() - _queue_index; }
bool empty() const noexcept { return _queue.empty() || _queue_index >= _queue.size(); }

private:
/**
* @brief Clear/reset the queue to no objects
*/
Expand All @@ -322,6 +321,9 @@ namespace qtransport {
}
}

private:


/**
* @brief Based on current time, adjust and move the bucket index with time
* (sliding window)
Expand Down Expand Up @@ -367,6 +369,8 @@ namespace qtransport {
{
if (ttl > _duration) {
throw std::invalid_argument("TTL is greater than max duration");
} else if (ttl == 0) {
ttl = _duration;
}

ttl = ttl / _interval;
Expand Down
Loading