From 63a65a7390ba55d59a256414c790c91b94b3163e Mon Sep 17 00:00:00 2001 From: Lorenzo Miniero Date: Mon, 9 Oct 2023 12:06:59 +0200 Subject: [PATCH] Added Admin API methods to tweak BWE debugging --- src/bwe.c | 162 ++++++++++++++++++++++++++++++++++++---------------- src/bwe.h | 22 +++++-- src/ice.c | 51 ++++++++++++++++- src/ice.h | 9 +++ src/janus.c | 89 ++++++++++++++++++++++++++++- 5 files changed, 277 insertions(+), 56 deletions(-) diff --git a/src/bwe.c b/src/bwe.c index 35156924a4..4c95c8fbd8 100644 --- a/src/bwe.c +++ b/src/bwe.c @@ -11,16 +11,15 @@ #include #include +#include +#include +#include #include "bwe.h" #include "debug.h" #include "utils.h" +#include "ip-utils.h" -#ifdef BWE_DEBUGGING -#include -#include -#include -#endif const char *janus_bwe_twcc_status_description(janus_bwe_twcc_status status) { switch(status) { @@ -66,36 +65,7 @@ janus_bwe_context *janus_bwe_context_create(void) { bwe->acked = janus_bwe_stream_bitrate_create(); bwe->delays = janus_bwe_delay_tracker_create(0); bwe->probing_mindex = -1; -#ifdef BWE_DEBUGGING - char filename[256]; - g_snprintf(filename, sizeof(filename), "/tmp/bwe-janus-%"SCNi64, janus_get_real_time()); - bwe->csv = fopen(filename, "wt"); - char line[2048]; - g_snprintf(line, sizeof(line), "time,status,estimate,probing_target,bitrate_out,rtx_out,probing_out,bitrate_in,rtx_in,probing_in,acked,lost,loss_ratio,avg_delay,avg_delay_weighted,avg_delay_fb\n"); - fwrite(line, sizeof(char), strlen(line), bwe->csv); - bwe->fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - if(bwe->fd == -1) { - JANUS_LOG(LOG_ERR, "Error creating socket: %s\n", g_strerror(errno)); - } else { - struct sockaddr_in address = { 0 }; - address.sin_family = AF_INET; - address.sin_port = htons(0); - address.sin_addr.s_addr = INADDR_ANY; - if(bind(bwe->fd, (struct sockaddr *)&address, sizeof(address)) < 0) { - JANUS_LOG(LOG_ERR, "Error binding socket: %s\n", g_strerror(errno)); - close(bwe->fd); - bwe->fd = -1; - } else { - address.sin_port = htons(8878); - address.sin_addr.s_addr = inet_addr("127.0.0.1"); - if(connect(bwe->fd, (struct sockaddr *)&address, sizeof(address)) < 0) { - JANUS_LOG(LOG_ERR, "Error connecting socket: %s\n", g_strerror(errno)); - close(bwe->fd); - bwe->fd = -1; - } - } - } -#endif + bwe->fd = -1; return bwe; } @@ -106,11 +76,10 @@ void janus_bwe_context_destroy(janus_bwe_context *bwe) { janus_bwe_stream_bitrate_destroy(bwe->sent); janus_bwe_stream_bitrate_destroy(bwe->acked); janus_bwe_delay_tracker_destroy(bwe->delays); -#ifdef BWE_DEBUGGING - fclose(bwe->csv); + if(bwe->csv != NULL) + fclose(bwe->csv); if(bwe->fd > -1) close(bwe->fd); -#endif g_free(bwe); } } @@ -280,17 +249,20 @@ void janus_bwe_context_update(janus_bwe_context *bwe) { now, janus_bwe_status_description(bwe->status), bitrate_out / 1024, probing_out / 1024, bitrate_in / 1024, probing_in / 1024, bwe->loss_ratio, bwe->avg_delay, bwe->estimate); -#ifdef BWE_DEBUGGING - /* Save the details to CSV */ - char line[2048]; - g_snprintf(line, sizeof(line), "%"SCNi64",%d,%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu16",%"SCNu16",%.2f,%.2f,%.2f,%.2f\n", - now - bwe->started, bwe->status, - bwe->estimate, bwe->probing_target, bitrate_out, rtx_out, probing_out, bitrate_in, rtx_in, probing_in, - bwe->received_pkts, bwe->lost_pkts, bwe->loss_ratio, avg_delay, avg_delay_weighted, avg_delay_latest); - fwrite(line, sizeof(char), strlen(line), bwe->csv); - if(bwe->fd > -1) - send(bwe->fd, line, strlen(line), 0); -#endif + + /* Save the details to CSV and/or send them externally via UDP, if enabled */ + if(bwe->csv != NULL || bwe->fd > -1) { + char line[2048]; + g_snprintf(line, sizeof(line), "%"SCNi64",%d,%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu16",%"SCNu16",%.2f,%.2f,%.2f,%.2f\n", + now - bwe->started, bwe->status, + bwe->estimate, bwe->probing_target, bitrate_out, rtx_out, probing_out, bitrate_in, rtx_in, probing_in, + bwe->received_pkts, bwe->lost_pkts, bwe->loss_ratio, avg_delay, avg_delay_weighted, avg_delay_latest); + if(bwe->csv != NULL) + fwrite(line, sizeof(char), strlen(line), bwe->csv); + if(bwe->fd > -1) + send(bwe->fd, line, strlen(line), 0); + } + /* Reset values */ bwe->delay = 0; bwe->received_pkts = 0; @@ -302,6 +274,98 @@ void janus_bwe_context_update(janus_bwe_context *bwe) { } } +gboolean janus_bwe_save_csv(janus_bwe_context *bwe, const char *path) { + if(bwe == NULL || path == NULL || bwe->csv != NULL) + return FALSE; + /* Open the CSV file */ + bwe->csv = fopen(path, "wt"); + if(bwe->csv == NULL) { + JANUS_LOG(LOG_ERR, "Couldn't open CSV file for BWE stats: %s\n", g_strerror(errno)); + return FALSE; + } + /* Write a header line with the names of the fields we'll save */ + char line[2048]; + g_snprintf(line, sizeof(line), "time,status,estimate,probing_target,bitrate_out,rtx_out,probing_out,bitrate_in,rtx_in,probing_in,acked,lost,loss_ratio,avg_delay,avg_delay_weighted,avg_delay_fb\n"); + fwrite(line, sizeof(char), strlen(line), bwe->csv); + fflush(bwe->csv); + /* Done */ + return TRUE; +} + +void janus_bwe_close_csv(janus_bwe_context *bwe) { + if(bwe == NULL || bwe->csv == NULL) + return; + fclose(bwe->csv); + bwe->csv = NULL; +} + +gboolean janus_bwe_save_live(janus_bwe_context *bwe, const char *host, uint16_t port) { + if(bwe == NULL || host == NULL || port == 0 || bwe->fd > -1) + return FALSE; + /* Check if we need to resolve this host address */ + struct addrinfo *res = NULL, *start = NULL; + janus_network_address addr; + janus_network_address_string_buffer addr_buf; + const char *resolved_host = NULL; + struct addrinfo hints; + memset(&hints, 0, sizeof(hints)); + if(getaddrinfo(host, NULL, NULL, &res) == 0) { + start = res; + while(res != NULL) { + if(janus_network_address_from_sockaddr(res->ai_addr, &addr) == 0 && + janus_network_address_to_string_buffer(&addr, &addr_buf) == 0) { + /* Resolved */ + resolved_host = janus_network_address_string_from_buffer(&addr_buf); + freeaddrinfo(start); + start = NULL; + break; + } + res = res->ai_next; + } + } + if(resolved_host == NULL) { + if(start) + freeaddrinfo(start); + JANUS_LOG(LOG_ERR, "Could not resolve address (%s) for BWE stats...\n", host); + return FALSE; + } + host = resolved_host; + /* Create the socket */ + bwe->fd = socket(addr.family, SOCK_DGRAM, IPPROTO_UDP); + if(bwe->fd == -1) { + JANUS_LOG(LOG_ERR, "Error creating socket for BWE stats: %s\n", g_strerror(errno)); + return FALSE; + } + struct sockaddr_in serv_addr = { 0 }; + struct sockaddr_in6 serv_addr6 = { 0 }; + if(addr.family == AF_INET6) { + serv_addr6.sin6_family = AF_INET6; + inet_pton(AF_INET6, host, &serv_addr6.sin6_addr); + serv_addr6.sin6_port = htons(port); + } else { + serv_addr.sin_family = AF_INET; + inet_pton(AF_INET, host, &serv_addr.sin_addr); + serv_addr.sin_port = htons(port); + } + /* Connect the socket to the provided address */ + struct sockaddr *address = (addr.family == AF_INET ? (struct sockaddr *)&serv_addr : (struct sockaddr *)&serv_addr6); + size_t addrlen = (addr.family == AF_INET ? sizeof(serv_addr) : sizeof(serv_addr6)); + if(connect(bwe->fd, (struct sockaddr *)address, addrlen) < 0) { + JANUS_LOG(LOG_ERR, "Error connecting socket for BWE stats: %s\n", g_strerror(errno)); + close(bwe->fd); + bwe->fd = -1; + } + /* Done */ + return TRUE; +} + +void janus_bwe_close_live(janus_bwe_context *bwe) { + if(bwe == NULL || bwe->fd == -1) + return; + close(bwe->fd); + bwe->fd = -1; +} + janus_bwe_stream_bitrate *janus_bwe_stream_bitrate_create(void) { janus_bwe_stream_bitrate *bwe_sb = g_malloc0(sizeof(janus_bwe_stream_bitrate)); janus_mutex_init(&bwe_sb->mutex); diff --git a/src/bwe.h b/src/bwe.h index bf02921ab5..e77fa7f74e 100644 --- a/src/bwe.h +++ b/src/bwe.h @@ -16,8 +16,6 @@ #include "mutex.h" -#define BWE_DEBUGGING - /*! \brief Tracker for a stream bitrate (whether it's simulcast/SVC or not) */ typedef struct janus_bwe_stream_bitrate { /*! \brief Time based queue of packet sizes */ @@ -181,12 +179,10 @@ typedef struct janus_bwe_context { gboolean notify_plugin; /*! \brief When we last notified the plugin */ int64_t last_notified; -#ifdef BWE_DEBUGGING /*! \brief CSV where we save the debugging information */ FILE *csv; /*! \brief UDP socket where to send the debugging information */ int fd; -#endif } janus_bwe_context; /*! \brief Helper to create a new bandwidth estimation context * @returns a new janus_bwe_context instance, if successful, or NULL otherwise */ @@ -216,4 +212,22 @@ void janus_bwe_context_handle_feedback(janus_bwe_context *bwe, * @param[in] bwe The janus_bwe_context instance to update */ void janus_bwe_context_update(janus_bwe_context *bwe); +/*! \brief Helper method to start saving the stats related to the BWE processing to a CSV file + * @param[in] bwe The janus_bwe_context instance to save + * @param[in] path Path where to save the file to + * @returns TRUE, if successful, or FALSE otherwise */ +gboolean janus_bwe_save_csv(janus_bwe_context *bwe, const char *path); +/*! \brief Helper method to stop saving the stats related to the BWE processing to a CSV file + * @param[in] bwe The janus_bwe_context instance to update */ +void janus_bwe_close_csv(janus_bwe_context *bwe); +/*! \brief Helper method to relay stats related to the BWE processing to an external UDP address + * @param[in] bwe The janus_bwe_context instance to save + * @param[in] host The address to send stats to + * @param[in] port The port to send stats to + * @returns TRUE, if successful, or FALSE otherwise */ +gboolean janus_bwe_save_live(janus_bwe_context *bwe, const char *host, uint16_t port); +/*! \brief Helper method to stop relaying the stats related to the BWE processing + * @param[in] bwe The janus_bwe_context instance to update */ +void janus_bwe_close_live(janus_bwe_context *bwe); + #endif diff --git a/src/ice.c b/src/ice.c index 96f298abf5..a506556313 100644 --- a/src/ice.c +++ b/src/ice.c @@ -475,6 +475,7 @@ static janus_ice_queued_packet janus_ice_media_stopped, janus_ice_enable_bwe, janus_ice_set_bwe_target, + janus_ice_debug_bwe, janus_ice_disable_bwe, janus_ice_hangup_peerconnection, janus_ice_detach_handle, @@ -654,7 +655,8 @@ static void janus_ice_free_queued_packet(janus_ice_queued_packet *pkt) { if(pkt == NULL || pkt == &janus_ice_start_gathering || pkt == &janus_ice_add_candidates || pkt == &janus_ice_dtls_handshake || - pkt == &janus_ice_enable_bwe || pkt == &janus_ice_set_bwe_target || pkt == &janus_ice_disable_bwe || + pkt == &janus_ice_enable_bwe || pkt == &janus_ice_set_bwe_target || + pkt == &janus_ice_debug_bwe || pkt == &janus_ice_disable_bwe || pkt == &janus_ice_media_stopped || pkt == &janus_ice_hangup_peerconnection || pkt == &janus_ice_detach_handle || @@ -1617,6 +1619,8 @@ static void janus_ice_handle_free(const janus_refcount *handle_ref) { } g_free(handle->opaque_id); g_free(handle->token); + g_free(handle->bwe_csv); + g_free(handle->bwe_host); g_free(handle); } @@ -3904,6 +3908,18 @@ void janus_ice_handle_set_bwe_target(janus_ice_handle *handle, uint32_t bitrate) } } +void janus_ice_handle_debug_bwe(janus_ice_handle *handle) { + JANUS_LOG(LOG_WARN, "[%"SCNu64"] Tweaking debugging of bandwidth estimation\n", handle->handle_id); + if(handle->queued_packets != NULL) { +#if GLIB_CHECK_VERSION(2, 46, 0) + g_async_queue_push_front(handle->queued_packets, &janus_ice_debug_bwe); +#else + g_async_queue_push(handle->queued_packets, &janus_ice_debug_bwe); +#endif + g_main_context_wakeup(handle->mainctx); + } +} + void janus_ice_handle_disable_bwe(janus_ice_handle *handle) { JANUS_LOG(LOG_VERB, "[%"SCNu64"] Disabling bandwidth estimation\n", handle->handle_id); if(handle->queued_packets != NULL) { @@ -4714,6 +4730,15 @@ static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janu pc->bwe = janus_bwe_context_create(); janus_mutex_lock(&handle->mutex); handle->bwe_target = 0; + /* Check if we need debugging */ + if(handle->bwe_csv) { + JANUS_LOG(LOG_INFO, "[%"SCNu64"] Enabling CSV debugging of bandwidth estimation\n", handle->handle_id); + janus_bwe_save_csv(pc->bwe, handle->bwe_csv); + } + if(handle->bwe_host && handle->bwe_port > 0) { + JANUS_LOG(LOG_INFO, "[%"SCNu64"] Enabling live debugging of bandwidth estimation\n", handle->handle_id); + janus_bwe_save_live(pc->bwe, handle->bwe_host, handle->bwe_port); + } janus_mutex_unlock(&handle->mutex); /* Let's create a source for BWE, for plugin notifications and probing */ handle->bwe_source = g_timeout_source_new(50); /* FIXME */ @@ -4743,8 +4768,30 @@ static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janu //~ pc->bwe->probing_buildup_timer = janus_get_monotonic_time(); pc->bwe->probing_deferred = janus_get_monotonic_time() + G_USEC_PER_SEC; return G_SOURCE_CONTINUE; + } else if(pkt == &janus_ice_debug_bwe) { + /* Enable or disable debugging for the bandwidth estimator */ + if(pc == NULL || pc->bwe == NULL) + return G_SOURCE_CONTINUE; + JANUS_LOG(LOG_INFO, "[%"SCNu64"] Tweaking debugging for bandwidth estimation\n", handle->handle_id); + janus_mutex_lock(&handle->mutex); + if(handle->bwe_csv) { + JANUS_LOG(LOG_INFO, "[%"SCNu64"] Enabling CSV debugging of bandwidth estimation\n", handle->handle_id); + janus_bwe_save_csv(pc->bwe, handle->bwe_csv); + } else { + JANUS_LOG(LOG_INFO, "[%"SCNu64"] Disabling CSV debugging of bandwidth estimation\n", handle->handle_id); + janus_bwe_close_csv(pc->bwe); + } + if(handle->bwe_host && handle->bwe_port > 0) { + JANUS_LOG(LOG_INFO, "[%"SCNu64"] Enabling live debugging of bandwidth estimation\n", handle->handle_id); + janus_bwe_save_live(pc->bwe, handle->bwe_host, handle->bwe_port); + } else { + JANUS_LOG(LOG_INFO, "[%"SCNu64"] Disabling live debugging of bandwidth estimation\n", handle->handle_id); + janus_bwe_close_live(pc->bwe); + } + janus_mutex_unlock(&handle->mutex); + return G_SOURCE_CONTINUE; } else if(pkt == &janus_ice_disable_bwe) { - /* We need to get rif of the bandwidth estimator */ + /* We need to get rid of the bandwidth estimator */ if(pc == NULL || pc->bwe == NULL) return G_SOURCE_CONTINUE; JANUS_LOG(LOG_INFO, "[%"SCNu64"] Disabling bandwidth estimation\n", handle->handle_id); diff --git a/src/ice.h b/src/ice.h index 41414c59f7..a696de5721 100644 --- a/src/ice.h +++ b/src/ice.h @@ -411,6 +411,12 @@ struct janus_ice_handle { gint last_event_stats; /*! \brief Bandwidth estimation target, if any, as asked by the plugin */ uint32_t bwe_target; + /*! \brief In case offline BWE debugging is enabled, the CSV file to save to */ + char *bwe_csv; + /*! \brief In case live BWE debugging is enabled, the host to send stats to */ + char *bwe_host; + /*! \brief In case live BWE debugging is enabled, the port to send stats to */ + uint16_t bwe_port; /*! \brief Flag to decide whether or not packets need to be dumped to a text2pcap file */ volatile gint dump_packets; /*! \brief In case this session must be saved to text2pcap, the instance to dump packets to */ @@ -796,6 +802,9 @@ void janus_ice_handle_enable_bwe(janus_ice_handle *handle); * @param[in] handle The Janus ICE handle this method refers to * @param[in] bitrate The bitrate to target (will be used to generate probing) */ void janus_ice_handle_set_bwe_target(janus_ice_handle *handle, uint32_t bitrate); +/*! \brief Method to dynamically tweak the bandwidth estimation debugging for a handle + * @param[in] handle The Janus ICE handle this method refers to */ +void janus_ice_handle_debug_bwe(janus_ice_handle *handle); /*! \brief Method to dynamically disable bandwidth estimation for a handle * @param[in] handle The Janus ICE handle this method refers to */ void janus_ice_handle_disable_bwe(janus_ice_handle *handle); diff --git a/src/janus.c b/src/janus.c index ca4351ac53..a5f02ca8d9 100644 --- a/src/janus.c +++ b/src/janus.c @@ -189,6 +189,13 @@ static struct janus_json_parameter text2pcap_parameters[] = { {"filename", JSON_STRING, 0}, {"truncate", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE} }; +static struct janus_json_parameter debug_bwe_parameters[] = { + {"csv", JANUS_JSON_BOOL, 0}, + {"path", JSON_STRING, 0}, + {"live", JANUS_JSON_BOOL, 0}, + {"host", JSON_STRING, 0}, + {"port", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE} +}; static struct janus_json_parameter handleinfo_parameters[] = { {"plugin_only", JANUS_JSON_BOOL, 0} }; @@ -3002,8 +3009,88 @@ int janus_process_incoming_admin_request(janus_request *request) { /* Send the success reply */ ret = janus_process_success(request, reply); goto jsondone; + } else if(!strcasecmp(message_text, "debug_bwe")) { + /* Enable or disable BWE debugging (to CSV and/or external UDP address) */ + JANUS_VALIDATE_JSON_OBJECT(root, debug_bwe_parameters, + error_code, error_cause, FALSE, + JANUS_ERROR_MISSING_MANDATORY_ELEMENT, JANUS_ERROR_INVALID_ELEMENT_TYPE); + json_t *csv = json_object_get(root, "csv"); + json_t *live = json_object_get(root, "live"); + gboolean changes = FALSE; + janus_mutex_lock(&handle->mutex); + if(json_is_true(csv)) { + /* Enable CSV offline debugging */ + if(handle->bwe_csv) { + janus_mutex_unlock(&handle->mutex); + ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_UNKNOWN, + "CSV debugging already configured"); + goto jsondone; + } + json_t *path = json_object_get(root, "path"); + const char *path_value = json_string_value(path); + if(path_value == NULL) { + janus_mutex_unlock(&handle->mutex); + ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_INVALID_ELEMENT_TYPE, + "Missing or invalid element (path)"); + goto jsondone; + } + changes = TRUE; + handle->bwe_csv = g_strdup(path_value); + } else { + /* Disable CSV offline debugging */ + if(handle->bwe_csv) + changes = TRUE; + g_free(handle->bwe_csv); + handle->bwe_csv = NULL; + } + if(json_is_true(live)) { + /* Enable live debugging */ + if(handle->bwe_host) { + janus_mutex_unlock(&handle->mutex); + ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_UNKNOWN, + "Live debugging already configured"); + goto jsondone; + } + json_t *host = json_object_get(root, "host"); + json_t *port = json_object_get(root, "port"); + const char *host_value = json_string_value(host); + uint16_t port_value = json_integer_value(port); + if(host_value == NULL) { + janus_mutex_unlock(&handle->mutex); + ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_INVALID_ELEMENT_TYPE, + "Missing or invalid element (host)"); + goto jsondone; + } + if(port_value == 0) { + janus_mutex_unlock(&handle->mutex); + ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_INVALID_ELEMENT_TYPE, + "Missing or invalid element (port)"); + goto jsondone; + } + changes = TRUE; + handle->bwe_host = g_strdup(host_value); + handle->bwe_port = port_value; + } else { + /* Disable live debugging */ + if(handle->bwe_host) + changes = TRUE; + g_free(handle->bwe_host); + handle->bwe_host = NULL; + handle->bwe_port = 0; + } + janus_mutex_unlock(&handle->mutex); + /* Apply the changes, if needed */ + if(changes) + janus_ice_handle_debug_bwe(handle); + /* Prepare JSON reply */ + json_t *reply = json_object(); + json_object_set_new(reply, "janus", json_string("success")); + json_object_set_new(reply, "transaction", json_string(transaction_text)); + /* Send the success reply */ + ret = janus_process_success(request, reply); + goto jsondone; } - /* If this is not a request to start/stop debugging to text2pcap, it must be a handle_info */ + /* If we git here, it must be a handle_info */ if(strcasecmp(message_text, "handle_info")) { ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_INVALID_REQUEST_PATH, "Unhandled request '%s' at this path", message_text); goto jsondone;