Skip to content

Commit

Permalink
[1.x] Optimize NACKs handling (see #3471)
Browse files Browse the repository at this point in the history
  • Loading branch information
lminiero committed Dec 12, 2024
1 parent 4419baf commit 417d3f8
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 38 deletions.
5 changes: 3 additions & 2 deletions fuzzers/rtcp_fuzzer.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,11 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
janus_rtcp_remove_nacks((char *)copy_data[idx++], size);
/* Functions that allocate new memory */
char *output_data = janus_rtcp_filter((char *)data, size, &newlen);
GSList *list = janus_rtcp_get_nacks((char *)data, size);
GQueue *queue = g_queue_new();
janus_rtcp_get_nacks((char *)data, size, queue);

/* Free resources */
g_free(output_data);
if (list) g_slist_free(list);
g_queue_free(queue);
return 0;
}
54 changes: 34 additions & 20 deletions src/ice.c
Original file line number Diff line number Diff line change
Expand Up @@ -679,8 +679,9 @@ static void janus_ice_free_queued_packet(janus_ice_queued_packet *pkt) {
/* Minimum and maximum value, in milliseconds, for the NACK queue/retransmissions (default=200ms/1000ms) */
#define DEFAULT_MIN_NACK_QUEUE 200
#define DEFAULT_MAX_NACK_QUEUE 1000
/* Maximum ignore count after retransmission (200ms) */
#define MAX_NACK_IGNORE 200000
/* Min/Max time to rate limit retransmissions of the same packet */
#define MAX_NACK_IGNORE DEFAULT_MAX_NACK_QUEUE*1000
#define MIN_NACK_IGNORE 40000

static gboolean nack_optimizations = FALSE;
void janus_set_nack_optimizations_enabled(gboolean optimize) {
Expand Down Expand Up @@ -1861,6 +1862,8 @@ static void janus_ice_peerconnection_free(const janus_refcount *pc_ref) {
if(pc->rtx_payload_types_rev != NULL)
g_hash_table_destroy(pc->rtx_payload_types_rev);
pc->rtx_payload_types_rev = NULL;
if(pc->nacks_queue != NULL)
g_queue_free(pc->nacks_queue);
g_free(pc);
pc = NULL;
}
Expand Down Expand Up @@ -1963,13 +1966,15 @@ static void janus_ice_peerconnection_medium_free(const janus_refcount *medium_re
if(medium->rtx_nacked[2])
g_hash_table_destroy(medium->rtx_nacked[2]);
medium->rtx_nacked[2] = NULL;
if(medium->pending_nacked_cleanup && g_hash_table_size(medium->pending_nacked_cleanup) > 0) {
GHashTableIter iter;
gpointer val;
g_hash_table_iter_init(&iter, medium->pending_nacked_cleanup);
while(g_hash_table_iter_next(&iter, NULL, &val)) {
GSource *source = val;
g_source_destroy(source);
if(medium->pending_nacked_cleanup != NULL) {
if(g_hash_table_size(medium->pending_nacked_cleanup) > 0) {
GHashTableIter iter;
gpointer val;
g_hash_table_iter_init(&iter, medium->pending_nacked_cleanup);
while(g_hash_table_iter_next(&iter, NULL, &val)) {
GSource *source = val;
g_source_destroy(source);
}
}
g_hash_table_destroy(medium->pending_nacked_cleanup);
}
Expand Down Expand Up @@ -3169,17 +3174,20 @@ static void janus_ice_cb_nice_recv(NiceAgent *agent, guint stream_id, guint comp

/* Now let's see if there are any NACKs to handle */
gint64 now = janus_get_monotonic_time();
GSList *nacks = janus_rtcp_get_nacks(buf, buflen);
guint nacks_count = g_slist_length(nacks);
if(pc->nacks_queue == NULL)
pc->nacks_queue = g_queue_new();
GQueue *nacks = pc->nacks_queue;
janus_rtcp_get_nacks(buf, buflen, nacks);
guint nacks_count = g_queue_get_length(nacks);
if(nacks_count && medium->do_nacks) {
/* Handle NACK */
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Just got some NACKS (%d) we should handle...\n", handle->handle_id, nacks_count);
GHashTable *retransmit_seqs = medium->retransmit_seqs;
GSList *list = (retransmit_seqs != NULL ? nacks : NULL);
GQueue *queue = (retransmit_seqs != NULL ? nacks : NULL);
int retransmits_cnt = 0;
janus_mutex_lock(&medium->mutex);
while(list) {
unsigned int seqnr = GPOINTER_TO_UINT(list->data);
while(g_queue_get_length(queue) > 0) {
unsigned int seqnr = GPOINTER_TO_UINT(g_queue_pop_tail(queue));
JANUS_LOG(LOG_DBG, "[%"SCNu64"] >> %u\n", handle->handle_id, seqnr);
int in_rb = 0;
/* Check if we have the packet */
Expand All @@ -3188,14 +3196,21 @@ static void janus_ice_cb_nice_recv(NiceAgent *agent, guint stream_id, guint comp
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] >> >> Can't retransmit packet %u, we don't have it...\n", handle->handle_id, seqnr);
} else {
/* Should we retransmit this packet? */
if((p->last_retransmit > 0) && (now-p->last_retransmit < MAX_NACK_IGNORE)) {
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] >> >> Packet %u was retransmitted just %"SCNi64"ms ago, skipping\n", handle->handle_id, seqnr, now-p->last_retransmit);
list = list->next;
if((p->last_retransmit > 0) && (now-p->last_retransmit < p->current_backoff)) {
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] >> >> Packet %u was retransmitted just %"SCNi64"us ago, skipping\n", handle->handle_id, seqnr, now-p->last_retransmit);
g_queue_pop_tail(queue);
continue;
}
in_rb = 1;
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] >> >> Scheduling %u for retransmission due to NACK\n", handle->handle_id, seqnr);
p->last_retransmit = now;
if(p->current_backoff == 0) {
p->current_backoff = MIN_NACK_IGNORE;
} else {
p->current_backoff *= 2;
if(p->current_backoff > MAX_NACK_IGNORE)
p->current_backoff = MAX_NACK_IGNORE;
}
retransmits_cnt++;
/* Enqueue it */
janus_ice_queued_packet *pkt = g_malloc(sizeof(janus_ice_queued_packet));
Expand Down Expand Up @@ -3237,16 +3252,14 @@ static void janus_ice_cb_nice_recv(NiceAgent *agent, guint stream_id, guint comp
if(rtcp_ctx != NULL && in_rb) {
g_atomic_int_inc(&rtcp_ctx->nack_count);
}
list = list->next;
g_queue_pop_tail(queue);
}
medium->retransmit_recent_cnt += retransmits_cnt;
/* FIXME Remove the NACK compound packet, we've handled it */
buflen = janus_rtcp_remove_nacks(buf, buflen);
/* Update stats */
medium->in_stats.info[vindex].nacks += nacks_count;
janus_mutex_unlock(&medium->mutex);
g_slist_free(nacks);
nacks = NULL;
}
if(medium->retransmit_recent_cnt &&
now - medium->retransmit_log_ts > 5*G_USEC_PER_SEC) {
Expand Down Expand Up @@ -4954,6 +4967,7 @@ static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janu
}
p->created = janus_get_monotonic_time();
p->last_retransmit = 0;
p->current_backoff = 0;
janus_rtp_header *header = (janus_rtp_header *)pkt->data;
guint16 seq = ntohs(header->seq_number);
if(medium->retransmit_buffer == NULL) {
Expand Down
2 changes: 2 additions & 0 deletions src/ice.h
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,8 @@ struct janus_ice_peerconnection {
GHashTable *rtx_payload_types;
/*! \brief Reverse mapping of rtx payload types to actual media-related packet types */
GHashTable *rtx_payload_types_rev;
/*! \brief Helper queue for storing requested packets from NACKs */
GQueue *nacks_queue;
/*! \brief Helper flag to avoid flooding the console with the same error all over again */
gboolean noerrorlog;
/*! \brief Mutex to lock/unlock this stream */
Expand Down
2 changes: 1 addition & 1 deletion src/plugins/plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ janus_plugin *create(void) {
* Janus instance or it will crash.
*
*/
#define JANUS_PLUGIN_API_VERSION 105
#define JANUS_PLUGIN_API_VERSION 106

/*! \brief Initialization of all plugin properties to NULL
*
Expand Down
21 changes: 8 additions & 13 deletions src/rtcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -1200,12 +1200,11 @@ gboolean janus_rtcp_has_pli(char *packet, int len) {
return FALSE;
}

GSList *janus_rtcp_get_nacks(char *packet, int len) {
if(packet == NULL || len == 0)
return NULL;
void janus_rtcp_get_nacks(char *packet, int len, GQueue *nacks_queue) {
if(packet == NULL || len == 0 || nacks_queue == NULL)
return;
janus_rtcp_header *rtcp = (janus_rtcp_header *)packet;
/* FIXME Get list of sequence numbers we should send again */
GSList *list = NULL;
g_queue_clear(nacks_queue);
int total = len;
gboolean error = FALSE;
while(rtcp) {
Expand Down Expand Up @@ -1237,13 +1236,13 @@ GSList *janus_rtcp_get_nacks(char *packet, int len) {
for(i=0; i< nacks; i++) {
nack = (janus_rtcp_nack *)rtcpfb->fci + i;
pid = ntohs(nack->pid);
list = g_slist_prepend(list, GUINT_TO_POINTER(pid));
g_queue_push_head(nacks_queue, GUINT_TO_POINTER(pid));
blp = ntohs(nack->blp);
memset(bitmask, 0, 20);
for(j=0; j<16; j++) {
bitmask[j] = (blp & ( 1 << j )) >> j ? '1' : '0';
if((blp & ( 1 << j )) >> j)
list = g_slist_prepend(list, GUINT_TO_POINTER(pid+j+1));
g_queue_push_head(nacks_queue, GUINT_TO_POINTER(pid+j+1));
}
bitmask[16] = '\n';
JANUS_LOG(LOG_DBG, "[%d] %"SCNu16" / %s\n", i, pid, bitmask);
Expand All @@ -1261,12 +1260,8 @@ GSList *janus_rtcp_get_nacks(char *packet, int len) {
break;
rtcp = (janus_rtcp_header *)((uint32_t*)rtcp + length + 1);
}
if (error && list) {
g_slist_free(list);
list = NULL;
}
list = g_slist_reverse(list);
return list;
if(error)
g_queue_clear(nacks_queue);
}

int janus_rtcp_remove_nacks(char *packet, int len) {
Expand Down
4 changes: 2 additions & 2 deletions src/rtcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -452,8 +452,8 @@ gboolean janus_rtcp_has_pli(char *packet, int len);
/*! \brief Method to parse an RTCP NACK message
* @param[in] packet The message data
* @param[in] len The message data length in bytes
* @returns A list of janus_nack elements containing the sequence numbers to send again */
GSList *janus_rtcp_get_nacks(char *packet, int len);
* @param[in,out] nacks_queue The queue containing the sequence numbers to send again */
void janus_rtcp_get_nacks(char *packet, int len, GQueue *nacks_queue);

/*! \brief Method to remove an RTCP NACK message
* @param[in] packet The message data
Expand Down
1 change: 1 addition & 0 deletions src/rtp.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ typedef struct janus_rtp_packet {
gint length;
gint64 created;
gint64 last_retransmit;
gint64 current_backoff;
janus_plugin_rtp_extensions extensions;
} janus_rtp_packet;

Expand Down

0 comments on commit 417d3f8

Please sign in to comment.