Skip to content

Commit

Permalink
Metrics now work for concurrent connections.
Browse files Browse the repository at this point in the history
  • Loading branch information
maxsharabayko committed Jul 4, 2024
1 parent 86c35a4 commit 4af5439
Show file tree
Hide file tree
Showing 6 changed files with 291 additions and 122 deletions.
99 changes: 52 additions & 47 deletions xtransmit/metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ bool validate_packet_checksum(const const_buffer& payload)

std:: string validator::stats()
{
std::lock_guard lock(m_mtx);
std::stringstream ss;

auto latency_str = [](long long val, long long na_val) -> string {
Expand Down Expand Up @@ -184,61 +185,65 @@ std:: string validator::stats()
return ss.str();
}

string validator::stats_csv(bool only_header)
string validator::stats_csv_header()
{
stringstream ss;

if (only_header)
{
#ifdef HAS_PUT_TIME
ss << "Timepoint,";
ss << "Timepoint,";
#endif
ss << "usLatencyMin,";
ss << "usLatencyMax,";
ss << "usLatencyAvg,";
ss << "usJitter,";
ss << "usDelayFactor,";
ss << "pktReceived,";
ss << "pktLost,";
ss << "pktReordered,";
ss << "pktReorderDist,";
ss << "pktChecksumError,";
ss << "pktLengthError";
ss << '\n';
}
else
{
ss << "iConn,";
ss << "usLatencyMin,";
ss << "usLatencyMax,";
ss << "usLatencyAvg,";
ss << "usJitter,";
ss << "usDelayFactor,";
ss << "pktReceived,";
ss << "pktLost,";
ss << "pktReordered,";
ss << "pktReorderDist,";
ss << "pktChecksumError,";
ss << "pktLengthError";
ss << '\n';
return ss.str();
}

string validator::stats_csv()
{
stringstream ss;

std::lock_guard lock(m_mtx);
#ifdef HAS_PUT_TIME
ss << print_timestamp_now() << ',';
ss << print_timestamp_now() << ',';
#endif
ss << m_id << ',';

// Empty string (N/A) on default-initialized latency min and max values.
auto latency_str = [](long long val, long long na_val) -> string {
if (val == na_val)
return "";
return to_string(val);
};

// Empty string (N/A) on default-initialized latency min and max values.
auto latency_str = [](long long val, long long na_val) -> string {
if (val == na_val)
return "";
return to_string(val);
};

const auto latency_min = m_latency.get_latency_min();
ss << latency_str(latency_min, numeric_limits<long long>::max()) << ',';
const auto latency_max = m_latency.get_latency_max();
ss << latency_str(latency_max, numeric_limits<long long>::min()) << ',';
ss << latency_str(m_latency.get_latency_avg(), -1) << ',';
ss << m_jitter.get_jitter() << ',';
ss << m_delay_factor.get_delay_factor() << ',';
const auto stats = m_reorder.get_stats();
ss << stats.pkts_processed << ',';
ss << stats.pkts_lost << ',';
ss << stats.pkts_reordered << ',';
ss << stats.reorder_dist << ',';
const auto intgr_stats = m_integrity.get_stats();
ss << intgr_stats.pkts_wrong_checksum << ',';
ss << intgr_stats.pkts_wrong_len;
ss << '\n';

m_latency.reset();
m_delay_factor.reset();
}
const auto latency_min = m_latency.get_latency_min();
ss << latency_str(latency_min, numeric_limits<long long>::max()) << ',';
const auto latency_max = m_latency.get_latency_max();
ss << latency_str(latency_max, numeric_limits<long long>::min()) << ',';
ss << latency_str(m_latency.get_latency_avg(), -1) << ',';
ss << m_jitter.get_jitter() << ',';
ss << m_delay_factor.get_delay_factor() << ',';
const auto stats = m_reorder.get_stats();
ss << stats.pkts_processed << ',';
ss << stats.pkts_lost << ',';
ss << stats.pkts_reordered << ',';
ss << stats.reorder_dist << ',';
const auto intgr_stats = m_integrity.get_stats();
ss << intgr_stats.pkts_wrong_checksum << ',';
ss << intgr_stats.pkts_wrong_len;
ss << '\n';

m_latency.reset();
m_delay_factor.reset();

return ss.str();
}
Expand Down
9 changes: 6 additions & 3 deletions xtransmit/metrics.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ namespace metrics
class validator
{
public:
validator() {}
validator(int id) : m_id(id) {}

public:
inline void validate_packet(const const_buffer& payload)
{
std::lock_guard lock(m_mtx);
const auto sys_time_now = system_clock::now();
const auto std_time_now = steady_clock::now();

Expand All @@ -99,14 +99,17 @@ namespace metrics
}

std::string stats();
std::string stats_csv(bool only_header = false);
std::string stats_csv();
static std::string stats_csv_header();

private:
const int m_id;
latency m_latency;
jitter m_jitter;
delay_factor m_delay_factor;
reorder m_reorder;
integrity m_integrity;
mutable std::mutex m_mtx;
};


Expand Down
156 changes: 156 additions & 0 deletions xtransmit/metrics_writer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
#include <thread>
#include "metrics_writer.hpp"

// submodules
#include "spdlog/spdlog.h"

using namespace std;
using namespace std::chrono;

namespace xtransmit
{
namespace metrics
{

metrics_writer::metrics_writer(const std::string& filename, const std::chrono::milliseconds& interval)
: m_interval(interval)
{
if (!filename.empty())
{
m_file.open(filename, ios::out);
if (!m_file)
{
const auto msg = fmt::format("[METRICS] Failed to open file for output. Path: {0}.", filename);
spdlog::critical(msg);
throw runtime_error(msg);
}
m_file << validator::stats_csv_header() << flush;
}
}

metrics_writer::~metrics_writer() { stop(); }

void metrics_writer::add_validator(shared_validator v, SOCKET id)
{
if (!v)
{
spdlog::error("[METRICS] No validator supplied.");
return;
}

m_lock.lock();
m_validators.emplace(make_pair(id, std::move(v)));
m_lock.unlock();

spdlog::trace("[METRICS] Added validator {}.", id);

if (m_metrics_future.valid())
return;

m_stop_token = false;
m_metrics_future = launch();
}

void metrics_writer::remove_validator(SOCKET id)
{
m_lock.lock();
const size_t n = m_validators.erase(id);
m_lock.unlock();

if (n == 1)
{
spdlog::trace("[METRICS] Removed validator {}.", id);
}
else
{
spdlog::trace("[METRICS] Removing validator {}: not found, num removed: {}.", id, n);
}
}

void metrics_writer::clear()
{
std::lock_guard l(m_lock);
m_validators.clear();
}

void metrics_writer::stop()
{
m_stop_token = true;
if (m_metrics_future.valid())
m_metrics_future.wait();
}

future<void> metrics_writer::launch()
{
auto print_metrics = [](map<SOCKET, shared_validator>& validators, ofstream& fout, mutex& stats_lock)
{
const bool print_to_file = fout.is_open();
scoped_lock<mutex> lock(stats_lock);

for (auto& it : validators)
{
if (!it.second)
{
// Skip empty (will be erased in a separate loop below).
continue;
}

auto* v = it.second.get();
if (v == nullptr)
{
spdlog::warn("[METRICS] Removing validator {}. Reason: nullptr.", it.first);
it.second.reset();
continue;
}

if (print_to_file)
fout << v->stats_csv() << flush;
else
spdlog::info("[METRICS] @{}: {}", it.first, v->stats());
}

auto delete_empty = [&validators]()
{
auto it = find_if(validators.begin(),
validators.end(),
[](pair<int, shared_validator> const& p)
{
return !p.second; // true if empty
});
if (it == validators.end())
return false;

validators.erase(it);
return true;
};

// Check if there are empty sockets to delete.
// Cannot do it in the above for loop because this modifies the container.
while (delete_empty())
{
}

return;
};

auto metrics_func = [&print_metrics](map<SOCKET, shared_validator>& validators,
ofstream& out,
const milliseconds interval,
mutex& stats_lock,
const atomic_bool& stop_stats)
{
while (!stop_stats)
{
print_metrics(validators, out, stats_lock);

// No lock on stats_lock while sleeping
this_thread::sleep_for(interval);
}
};

return async(
::launch::async, metrics_func, ref(m_validators), ref(m_file), m_interval, ref(m_lock), ref(m_stop_token));
}

} // namespace metrics
} // namespace xtransmit
47 changes: 47 additions & 0 deletions xtransmit/metrics_writer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#pragma once
#include <atomic>
#include <chrono>
#include <future>
#include <fstream>
#include <mutex>
#include <string>
#include <vector>
#include <map>

#include "metrics.hpp"
#include "socket.hpp"


namespace xtransmit
{
namespace metrics
{

class metrics_writer
{
public:
metrics_writer(const std::string& filename, const std::chrono::milliseconds& interval);
~metrics_writer();

public:
using shared_sock = std::shared_ptr<socket::isocket>;
using shared_validator = std::shared_ptr<validator>;
void add_validator(shared_validator v, SOCKET id);
void remove_validator(SOCKET id);
void clear();
void stop();

private:
std::future<void> launch();

std::atomic<bool> m_stop_token;
std::ofstream m_file;
std::map<SOCKET, shared_validator> m_validators;
std::future<void> m_metrics_future;
const std::chrono::milliseconds m_interval;
std::mutex m_lock;
};

} // namespace socket
} // namespace xtransmit

Loading

0 comments on commit 4af5439

Please sign in to comment.