Skip to content

Commit

Permalink
Added Admin API methods to tweak BWE debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
lminiero committed Oct 9, 2023
1 parent c1d5a87 commit 63a65a7
Show file tree
Hide file tree
Showing 5 changed files with 277 additions and 56 deletions.
162 changes: 113 additions & 49 deletions src/bwe.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,15 @@

#include <inttypes.h>
#include <string.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/socket.h>

#include "bwe.h"
#include "debug.h"
#include "utils.h"
#include "ip-utils.h"

#ifdef BWE_DEBUGGING
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/socket.h>
#endif

const char *janus_bwe_twcc_status_description(janus_bwe_twcc_status status) {
switch(status) {
Expand Down Expand Up @@ -66,36 +65,7 @@ janus_bwe_context *janus_bwe_context_create(void) {
bwe->acked = janus_bwe_stream_bitrate_create();
bwe->delays = janus_bwe_delay_tracker_create(0);
bwe->probing_mindex = -1;
#ifdef BWE_DEBUGGING
char filename[256];
g_snprintf(filename, sizeof(filename), "/tmp/bwe-janus-%"SCNi64, janus_get_real_time());
bwe->csv = fopen(filename, "wt");
char line[2048];
g_snprintf(line, sizeof(line), "time,status,estimate,probing_target,bitrate_out,rtx_out,probing_out,bitrate_in,rtx_in,probing_in,acked,lost,loss_ratio,avg_delay,avg_delay_weighted,avg_delay_fb\n");
fwrite(line, sizeof(char), strlen(line), bwe->csv);
bwe->fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if(bwe->fd == -1) {
JANUS_LOG(LOG_ERR, "Error creating socket: %s\n", g_strerror(errno));
} else {
struct sockaddr_in address = { 0 };
address.sin_family = AF_INET;
address.sin_port = htons(0);
address.sin_addr.s_addr = INADDR_ANY;
if(bind(bwe->fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
JANUS_LOG(LOG_ERR, "Error binding socket: %s\n", g_strerror(errno));
close(bwe->fd);
bwe->fd = -1;
} else {
address.sin_port = htons(8878);
address.sin_addr.s_addr = inet_addr("127.0.0.1");
if(connect(bwe->fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
JANUS_LOG(LOG_ERR, "Error connecting socket: %s\n", g_strerror(errno));
close(bwe->fd);
bwe->fd = -1;
}
}
}
#endif
bwe->fd = -1;
return bwe;
}

Expand All @@ -106,11 +76,10 @@ void janus_bwe_context_destroy(janus_bwe_context *bwe) {
janus_bwe_stream_bitrate_destroy(bwe->sent);
janus_bwe_stream_bitrate_destroy(bwe->acked);
janus_bwe_delay_tracker_destroy(bwe->delays);
#ifdef BWE_DEBUGGING
fclose(bwe->csv);
if(bwe->csv != NULL)
fclose(bwe->csv);
if(bwe->fd > -1)
close(bwe->fd);
#endif
g_free(bwe);
}
}
Expand Down Expand Up @@ -280,17 +249,20 @@ void janus_bwe_context_update(janus_bwe_context *bwe) {
now, janus_bwe_status_description(bwe->status),
bitrate_out / 1024, probing_out / 1024, bitrate_in / 1024, probing_in / 1024,
bwe->loss_ratio, bwe->avg_delay, bwe->estimate);
#ifdef BWE_DEBUGGING
/* Save the details to CSV */
char line[2048];
g_snprintf(line, sizeof(line), "%"SCNi64",%d,%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu16",%"SCNu16",%.2f,%.2f,%.2f,%.2f\n",
now - bwe->started, bwe->status,
bwe->estimate, bwe->probing_target, bitrate_out, rtx_out, probing_out, bitrate_in, rtx_in, probing_in,
bwe->received_pkts, bwe->lost_pkts, bwe->loss_ratio, avg_delay, avg_delay_weighted, avg_delay_latest);
fwrite(line, sizeof(char), strlen(line), bwe->csv);
if(bwe->fd > -1)
send(bwe->fd, line, strlen(line), 0);
#endif

/* Save the details to CSV and/or send them externally via UDP, if enabled */
if(bwe->csv != NULL || bwe->fd > -1) {
char line[2048];
g_snprintf(line, sizeof(line), "%"SCNi64",%d,%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu16",%"SCNu16",%.2f,%.2f,%.2f,%.2f\n",
now - bwe->started, bwe->status,
bwe->estimate, bwe->probing_target, bitrate_out, rtx_out, probing_out, bitrate_in, rtx_in, probing_in,
bwe->received_pkts, bwe->lost_pkts, bwe->loss_ratio, avg_delay, avg_delay_weighted, avg_delay_latest);
if(bwe->csv != NULL)
fwrite(line, sizeof(char), strlen(line), bwe->csv);
if(bwe->fd > -1)
send(bwe->fd, line, strlen(line), 0);
}

/* Reset values */
bwe->delay = 0;
bwe->received_pkts = 0;
Expand All @@ -302,6 +274,98 @@ void janus_bwe_context_update(janus_bwe_context *bwe) {
}
}

gboolean janus_bwe_save_csv(janus_bwe_context *bwe, const char *path) {
if(bwe == NULL || path == NULL || bwe->csv != NULL)
return FALSE;
/* Open the CSV file */
bwe->csv = fopen(path, "wt");
if(bwe->csv == NULL) {
JANUS_LOG(LOG_ERR, "Couldn't open CSV file for BWE stats: %s\n", g_strerror(errno));
return FALSE;
}
/* Write a header line with the names of the fields we'll save */
char line[2048];
g_snprintf(line, sizeof(line), "time,status,estimate,probing_target,bitrate_out,rtx_out,probing_out,bitrate_in,rtx_in,probing_in,acked,lost,loss_ratio,avg_delay,avg_delay_weighted,avg_delay_fb\n");
fwrite(line, sizeof(char), strlen(line), bwe->csv);
fflush(bwe->csv);
/* Done */
return TRUE;
}

void janus_bwe_close_csv(janus_bwe_context *bwe) {
if(bwe == NULL || bwe->csv == NULL)
return;
fclose(bwe->csv);
bwe->csv = NULL;
}

gboolean janus_bwe_save_live(janus_bwe_context *bwe, const char *host, uint16_t port) {
if(bwe == NULL || host == NULL || port == 0 || bwe->fd > -1)
return FALSE;
/* Check if we need to resolve this host address */
struct addrinfo *res = NULL, *start = NULL;
janus_network_address addr;
janus_network_address_string_buffer addr_buf;
const char *resolved_host = NULL;
struct addrinfo hints;
memset(&hints, 0, sizeof(hints));
if(getaddrinfo(host, NULL, NULL, &res) == 0) {
start = res;
while(res != NULL) {
if(janus_network_address_from_sockaddr(res->ai_addr, &addr) == 0 &&
janus_network_address_to_string_buffer(&addr, &addr_buf) == 0) {
/* Resolved */
resolved_host = janus_network_address_string_from_buffer(&addr_buf);
freeaddrinfo(start);
start = NULL;
break;
}
res = res->ai_next;
}
}
if(resolved_host == NULL) {
if(start)
freeaddrinfo(start);
JANUS_LOG(LOG_ERR, "Could not resolve address (%s) for BWE stats...\n", host);
return FALSE;
}
host = resolved_host;
/* Create the socket */
bwe->fd = socket(addr.family, SOCK_DGRAM, IPPROTO_UDP);
if(bwe->fd == -1) {
JANUS_LOG(LOG_ERR, "Error creating socket for BWE stats: %s\n", g_strerror(errno));
return FALSE;
}
struct sockaddr_in serv_addr = { 0 };
struct sockaddr_in6 serv_addr6 = { 0 };
if(addr.family == AF_INET6) {
serv_addr6.sin6_family = AF_INET6;
inet_pton(AF_INET6, host, &serv_addr6.sin6_addr);
serv_addr6.sin6_port = htons(port);
} else {
serv_addr.sin_family = AF_INET;
inet_pton(AF_INET, host, &serv_addr.sin_addr);
serv_addr.sin_port = htons(port);
}
/* Connect the socket to the provided address */
struct sockaddr *address = (addr.family == AF_INET ? (struct sockaddr *)&serv_addr : (struct sockaddr *)&serv_addr6);
size_t addrlen = (addr.family == AF_INET ? sizeof(serv_addr) : sizeof(serv_addr6));
if(connect(bwe->fd, (struct sockaddr *)address, addrlen) < 0) {
JANUS_LOG(LOG_ERR, "Error connecting socket for BWE stats: %s\n", g_strerror(errno));
close(bwe->fd);
bwe->fd = -1;
}
/* Done */
return TRUE;
}

void janus_bwe_close_live(janus_bwe_context *bwe) {
if(bwe == NULL || bwe->fd == -1)
return;
close(bwe->fd);
bwe->fd = -1;
}

janus_bwe_stream_bitrate *janus_bwe_stream_bitrate_create(void) {
janus_bwe_stream_bitrate *bwe_sb = g_malloc0(sizeof(janus_bwe_stream_bitrate));
janus_mutex_init(&bwe_sb->mutex);
Expand Down
22 changes: 18 additions & 4 deletions src/bwe.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

#include "mutex.h"

#define BWE_DEBUGGING

/*! \brief Tracker for a stream bitrate (whether it's simulcast/SVC or not) */
typedef struct janus_bwe_stream_bitrate {
/*! \brief Time based queue of packet sizes */
Expand Down Expand Up @@ -181,12 +179,10 @@ typedef struct janus_bwe_context {
gboolean notify_plugin;
/*! \brief When we last notified the plugin */
int64_t last_notified;
#ifdef BWE_DEBUGGING
/*! \brief CSV where we save the debugging information */
FILE *csv;
/*! \brief UDP socket where to send the debugging information */
int fd;
#endif
} janus_bwe_context;
/*! \brief Helper to create a new bandwidth estimation context
* @returns a new janus_bwe_context instance, if successful, or NULL otherwise */
Expand Down Expand Up @@ -216,4 +212,22 @@ void janus_bwe_context_handle_feedback(janus_bwe_context *bwe,
* @param[in] bwe The janus_bwe_context instance to update */
void janus_bwe_context_update(janus_bwe_context *bwe);

/*! \brief Helper method to start saving the stats related to the BWE processing to a CSV file
* @param[in] bwe The janus_bwe_context instance to save
* @param[in] path Path where to save the file to
* @returns TRUE, if successful, or FALSE otherwise */
gboolean janus_bwe_save_csv(janus_bwe_context *bwe, const char *path);
/*! \brief Helper method to stop saving the stats related to the BWE processing to a CSV file
* @param[in] bwe The janus_bwe_context instance to update */
void janus_bwe_close_csv(janus_bwe_context *bwe);
/*! \brief Helper method to relay stats related to the BWE processing to an external UDP address
* @param[in] bwe The janus_bwe_context instance to save
* @param[in] host The address to send stats to
* @param[in] port The port to send stats to
* @returns TRUE, if successful, or FALSE otherwise */
gboolean janus_bwe_save_live(janus_bwe_context *bwe, const char *host, uint16_t port);
/*! \brief Helper method to stop relaying the stats related to the BWE processing
* @param[in] bwe The janus_bwe_context instance to update */
void janus_bwe_close_live(janus_bwe_context *bwe);

#endif
51 changes: 49 additions & 2 deletions src/ice.c
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ static janus_ice_queued_packet
janus_ice_media_stopped,
janus_ice_enable_bwe,
janus_ice_set_bwe_target,
janus_ice_debug_bwe,
janus_ice_disable_bwe,
janus_ice_hangup_peerconnection,
janus_ice_detach_handle,
Expand Down Expand Up @@ -654,7 +655,8 @@ static void janus_ice_free_queued_packet(janus_ice_queued_packet *pkt) {
if(pkt == NULL || pkt == &janus_ice_start_gathering ||
pkt == &janus_ice_add_candidates ||
pkt == &janus_ice_dtls_handshake ||
pkt == &janus_ice_enable_bwe || pkt == &janus_ice_set_bwe_target || pkt == &janus_ice_disable_bwe ||
pkt == &janus_ice_enable_bwe || pkt == &janus_ice_set_bwe_target ||
pkt == &janus_ice_debug_bwe || pkt == &janus_ice_disable_bwe ||
pkt == &janus_ice_media_stopped ||
pkt == &janus_ice_hangup_peerconnection ||
pkt == &janus_ice_detach_handle ||
Expand Down Expand Up @@ -1617,6 +1619,8 @@ static void janus_ice_handle_free(const janus_refcount *handle_ref) {
}
g_free(handle->opaque_id);
g_free(handle->token);
g_free(handle->bwe_csv);
g_free(handle->bwe_host);
g_free(handle);
}

Expand Down Expand Up @@ -3904,6 +3908,18 @@ void janus_ice_handle_set_bwe_target(janus_ice_handle *handle, uint32_t bitrate)
}
}

void janus_ice_handle_debug_bwe(janus_ice_handle *handle) {
JANUS_LOG(LOG_WARN, "[%"SCNu64"] Tweaking debugging of bandwidth estimation\n", handle->handle_id);
if(handle->queued_packets != NULL) {
#if GLIB_CHECK_VERSION(2, 46, 0)
g_async_queue_push_front(handle->queued_packets, &janus_ice_debug_bwe);
#else
g_async_queue_push(handle->queued_packets, &janus_ice_debug_bwe);
#endif
g_main_context_wakeup(handle->mainctx);
}
}

void janus_ice_handle_disable_bwe(janus_ice_handle *handle) {
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Disabling bandwidth estimation\n", handle->handle_id);
if(handle->queued_packets != NULL) {
Expand Down Expand Up @@ -4714,6 +4730,15 @@ static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janu
pc->bwe = janus_bwe_context_create();
janus_mutex_lock(&handle->mutex);
handle->bwe_target = 0;
/* Check if we need debugging */
if(handle->bwe_csv) {
JANUS_LOG(LOG_INFO, "[%"SCNu64"] Enabling CSV debugging of bandwidth estimation\n", handle->handle_id);
janus_bwe_save_csv(pc->bwe, handle->bwe_csv);
}
if(handle->bwe_host && handle->bwe_port > 0) {
JANUS_LOG(LOG_INFO, "[%"SCNu64"] Enabling live debugging of bandwidth estimation\n", handle->handle_id);
janus_bwe_save_live(pc->bwe, handle->bwe_host, handle->bwe_port);
}
janus_mutex_unlock(&handle->mutex);
/* Let's create a source for BWE, for plugin notifications and probing */
handle->bwe_source = g_timeout_source_new(50); /* FIXME */
Expand Down Expand Up @@ -4743,8 +4768,30 @@ static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janu
//~ pc->bwe->probing_buildup_timer = janus_get_monotonic_time();
pc->bwe->probing_deferred = janus_get_monotonic_time() + G_USEC_PER_SEC;
return G_SOURCE_CONTINUE;
} else if(pkt == &janus_ice_debug_bwe) {
/* Enable or disable debugging for the bandwidth estimator */
if(pc == NULL || pc->bwe == NULL)
return G_SOURCE_CONTINUE;
JANUS_LOG(LOG_INFO, "[%"SCNu64"] Tweaking debugging for bandwidth estimation\n", handle->handle_id);
janus_mutex_lock(&handle->mutex);
if(handle->bwe_csv) {
JANUS_LOG(LOG_INFO, "[%"SCNu64"] Enabling CSV debugging of bandwidth estimation\n", handle->handle_id);
janus_bwe_save_csv(pc->bwe, handle->bwe_csv);
} else {
JANUS_LOG(LOG_INFO, "[%"SCNu64"] Disabling CSV debugging of bandwidth estimation\n", handle->handle_id);
janus_bwe_close_csv(pc->bwe);
}
if(handle->bwe_host && handle->bwe_port > 0) {
JANUS_LOG(LOG_INFO, "[%"SCNu64"] Enabling live debugging of bandwidth estimation\n", handle->handle_id);
janus_bwe_save_live(pc->bwe, handle->bwe_host, handle->bwe_port);
} else {
JANUS_LOG(LOG_INFO, "[%"SCNu64"] Disabling live debugging of bandwidth estimation\n", handle->handle_id);
janus_bwe_close_live(pc->bwe);
}
janus_mutex_unlock(&handle->mutex);
return G_SOURCE_CONTINUE;
} else if(pkt == &janus_ice_disable_bwe) {
/* We need to get rif of the bandwidth estimator */
/* We need to get rid of the bandwidth estimator */
if(pc == NULL || pc->bwe == NULL)
return G_SOURCE_CONTINUE;
JANUS_LOG(LOG_INFO, "[%"SCNu64"] Disabling bandwidth estimation\n", handle->handle_id);
Expand Down
9 changes: 9 additions & 0 deletions src/ice.h
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,12 @@ struct janus_ice_handle {
gint last_event_stats;
/*! \brief Bandwidth estimation target, if any, as asked by the plugin */
uint32_t bwe_target;
/*! \brief In case offline BWE debugging is enabled, the CSV file to save to */
char *bwe_csv;
/*! \brief In case live BWE debugging is enabled, the host to send stats to */
char *bwe_host;
/*! \brief In case live BWE debugging is enabled, the port to send stats to */
uint16_t bwe_port;
/*! \brief Flag to decide whether or not packets need to be dumped to a text2pcap file */
volatile gint dump_packets;
/*! \brief In case this session must be saved to text2pcap, the instance to dump packets to */
Expand Down Expand Up @@ -796,6 +802,9 @@ void janus_ice_handle_enable_bwe(janus_ice_handle *handle);
* @param[in] handle The Janus ICE handle this method refers to
* @param[in] bitrate The bitrate to target (will be used to generate probing) */
void janus_ice_handle_set_bwe_target(janus_ice_handle *handle, uint32_t bitrate);
/*! \brief Method to dynamically tweak the bandwidth estimation debugging for a handle
* @param[in] handle The Janus ICE handle this method refers to */
void janus_ice_handle_debug_bwe(janus_ice_handle *handle);
/*! \brief Method to dynamically disable bandwidth estimation for a handle
* @param[in] handle The Janus ICE handle this method refers to */
void janus_ice_handle_disable_bwe(janus_ice_handle *handle);
Expand Down
Loading

0 comments on commit 63a65a7

Please sign in to comment.