diff --git a/src/bwe.c b/src/bwe.c index 96be1c944e..f34999f5b7 100644 --- a/src/bwe.c +++ b/src/bwe.c @@ -65,7 +65,7 @@ janus_bwe_context *janus_bwe_context_create(void) { g_snprintf(filename, sizeof(filename), "/tmp/bwe-janus-%"SCNi64, janus_get_real_time()); bwe->csv = fopen(filename, "wt"); char line[2048]; - g_snprintf(line, sizeof(line), "time,status,estimate,bitrate_out,rtx_out,probing_out,bitrate_in,rtx_in,probing_in,acked,lost,loss_ratio,avg_delay,avg_delay_weighted,avg_delay_fb\n"); + g_snprintf(line, sizeof(line), "time,status,estimate,probing_target,bitrate_out,rtx_out,probing_out,bitrate_in,rtx_in,probing_in,acked,lost,loss_ratio,avg_delay,avg_delay_weighted,avg_delay_fb\n"); fwrite(line, sizeof(char), strlen(line), bwe->csv); #endif return bwe; @@ -190,7 +190,9 @@ void janus_bwe_context_update(janus_bwe_context *bwe) { bwe->status = janus_bwe_status_lossy; bwe->status_changed = now; bwe->estimate = estimate; - } else if(avg_delay_weighted > 1.5 && avg_delay_weighted - bwe->avg_delay > 0.1) { + } else if(avg_delay_weighted > 1.2 && avg_delay_weighted - bwe->avg_delay > 0.1) { + JANUS_LOG(LOG_WARN, "[BWE] Congested (delay=%.2f, increase=%.2f)\n", + avg_delay_weighted, avg_delay_weighted - bwe->avg_delay); /* FIXME Delay is increasing */ if(bwe->status != janus_bwe_status_lossy && bwe->status != janus_bwe_status_congested) notify_plugin = TRUE; @@ -215,7 +217,8 @@ void janus_bwe_context_update(janus_bwe_context *bwe) { /* Restore probing but incrementally */ bwe->probing_sent = 0; bwe->probing_portion = 0.0; - bwe->probing_part = 200; /* FIXME */ + bwe->probing_buildup = 0; + bwe->probing_buildup_timer = now; } else { /* FIXME Keep converging to the estimate */ if(estimate > bwe->estimate) @@ -230,22 +233,20 @@ void janus_bwe_context_update(janus_bwe_context *bwe) { bwe->estimate = estimate; //~ else if(now - bwe->status_changed < 10*G_USEC_PER_SEC) //~ bwe->estimate = ((double)bwe->estimate * 1.02); - if(bwe->probing_part > 0) /* FIXME */ - bwe->probing_part--; } } bwe->avg_delay = avg_delay_weighted; bwe->bitrate_ts = now; - JANUS_LOG(LOG_WARN, "[BWE][%"SCNi64"][%s] sent=%"SCNu32"kbps (probing=%"SCNu32"kbps), acked=%"SCNu32"kbps (probing=%"SCNu32"kbps), loss=%.2f%%, avg_delay=%.2fms, estimate=%"SCNu32"\n", + JANUS_LOG(LOG_DBG, "[BWE][%"SCNi64"][%s] sent=%"SCNu32"kbps (probing=%"SCNu32"kbps), acked=%"SCNu32"kbps (probing=%"SCNu32"kbps), loss=%.2f%%, avg_delay=%.2fms, estimate=%"SCNu32"\n", now, janus_bwe_status_description(bwe->status), bitrate_out / 1024, probing_out / 1024, bitrate_in / 1024, probing_in / 1024, bwe->loss_ratio, bwe->avg_delay, bwe->estimate); #ifdef BWE_DEBUGGING /* Save the details to CSV */ char line[2048]; - g_snprintf(line, sizeof(line), "%"SCNi64",%d,%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu16",%"SCNu16",%.2f,%.2f,%.2f,%.2f\n", + g_snprintf(line, sizeof(line), "%"SCNi64",%d,%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu16",%"SCNu16",%.2f,%.2f,%.2f,%.2f\n", now - bwe->started, bwe->status, - bwe->estimate, bitrate_out, rtx_out, probing_out, bitrate_in, rtx_in, probing_in, + bwe->estimate, bwe->probing_target, bitrate_out, rtx_out, probing_out, bitrate_in, rtx_in, probing_in, bwe->received_pkts, bwe->lost_pkts, bwe->loss_ratio, avg_delay, avg_delay_weighted, avg_delay_latest); fwrite(line, sizeof(char), strlen(line), bwe->csv); #endif @@ -254,7 +255,7 @@ void janus_bwe_context_update(janus_bwe_context *bwe) { bwe->received_pkts = 0; bwe->lost_pkts = 0; /* Check if we should notify the plugin about the estimate */ - if(notify_plugin || (now - bwe->last_notified) >= 250000) { + if(bwe->status != janus_bwe_status_start && (notify_plugin || (now - bwe->last_notified) >= 250000)) { bwe->notify_plugin = TRUE; bwe->last_notified = now; } diff --git a/src/bwe.h b/src/bwe.h index 57476352bd..93b32c8444 100644 --- a/src/bwe.h +++ b/src/bwe.h @@ -145,16 +145,16 @@ typedef struct janus_bwe_context { int64_t status_changed; /*! \brief Index of the m-line we're using for probing */ int probing_mindex; - /*! \brief How much we should aim for with out probing (and how much we sent in a second) */ - uint32_t probing_target, probing_sent; + /*! \brief How much we should aim for with out probing (and how much to increase, plus much we sent in a second) */ + uint32_t probing_target, probing_buildup, probing_sent; /*! \brief How many times we went through probing in a second */ uint8_t probing_count; /*! \brief Portion of probing we didn't manage to send the previous round */ double probing_portion; /*! \brief In case probing was deferred, when it shoult restart */ int64_t probing_deferred; - /*! \brief Whether probing should grow incrementally (e.g., after recovery) */ - uint16_t probing_part; + /*! \brief Timer for building up probing */ + int64_t probing_buildup_timer; /*! \brief Monotonic timestamp of the last sent packet */ int64_t last_sent_ts; /*! \brief Last twcc seq number of a received packet */ diff --git a/src/ice.c b/src/ice.c index 9b94c44855..764ac49eda 100644 --- a/src/ice.c +++ b/src/ice.c @@ -474,10 +474,8 @@ static janus_ice_queued_packet janus_ice_dtls_handshake, janus_ice_media_stopped, janus_ice_enable_bwe, + janus_ice_set_bwe_target, janus_ice_disable_bwe, - janus_ice_enable_probing, - janus_ice_defer_probing, - janus_ice_disable_probing, janus_ice_hangup_peerconnection, janus_ice_detach_handle, janus_ice_data_ready; @@ -534,7 +532,6 @@ typedef struct janus_ice_outgoing_traffic { static gboolean janus_ice_outgoing_rtcp_handle(gpointer user_data); static gboolean janus_ice_outgoing_stats_handle(gpointer user_data); static gboolean janus_ice_outgoing_bwe_handle(gpointer user_data); -static gboolean janus_ice_outgoing_probing_handle(gpointer user_data); static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janus_ice_queued_packet *pkt); static gboolean janus_ice_outgoing_traffic_prepare(GSource *source, gint *timeout) { janus_ice_outgoing_traffic *t = (janus_ice_outgoing_traffic *)source; @@ -657,8 +654,7 @@ static void janus_ice_free_queued_packet(janus_ice_queued_packet *pkt) { if(pkt == NULL || pkt == &janus_ice_start_gathering || pkt == &janus_ice_add_candidates || pkt == &janus_ice_dtls_handshake || - pkt == &janus_ice_enable_bwe || pkt == &janus_ice_disable_bwe || - pkt == &janus_ice_enable_probing || pkt == &janus_ice_defer_probing || pkt == &janus_ice_disable_probing || + pkt == &janus_ice_enable_bwe || pkt == &janus_ice_set_bwe_target || pkt == &janus_ice_disable_bwe || pkt == &janus_ice_media_stopped || pkt == &janus_ice_hangup_peerconnection || pkt == &janus_ice_detach_handle || @@ -3892,49 +3888,29 @@ void janus_ice_handle_enable_bwe(janus_ice_handle *handle) { } } -void janus_ice_handle_disable_bwe(janus_ice_handle *handle) { - JANUS_LOG(LOG_VERB, "[%"SCNu64"] Disabling bandwidth estimation\n", handle->handle_id); - if(handle->queued_packets != NULL) { -#if GLIB_CHECK_VERSION(2, 46, 0) - g_async_queue_push_front(handle->queued_packets, &janus_ice_disable_bwe); -#else - g_async_queue_push(handle->queued_packets, &janus_ice_disable_bwe); -#endif - g_main_context_wakeup(handle->mainctx); - } -} - -void janus_ice_handle_enable_probing(janus_ice_handle *handle) { - JANUS_LOG(LOG_VERB, "[%"SCNu64"] Enabling bandwidth probing\n", handle->handle_id); - if(handle->queued_packets != NULL) { -#if GLIB_CHECK_VERSION(2, 46, 0) - g_async_queue_push_front(handle->queued_packets, &janus_ice_enable_probing); -#else - g_async_queue_push(handle->queued_packets, &janus_ice_enable_probing); -#endif - g_main_context_wakeup(handle->mainctx); - } -} - -void janus_ice_handle_defer_probing(janus_ice_handle *handle) { - JANUS_LOG(LOG_VERB, "[%"SCNu64"] Deferring bandwidth probing\n", handle->handle_id); +/* FIXME */ +void janus_ice_handle_set_bwe_target(janus_ice_handle *handle, uint32_t bitrate) { + JANUS_LOG(LOG_VERB, "[%"SCNu64"] Configuring bandwidth estimation target: %"SCNu32"\n", handle->handle_id, bitrate); + janus_mutex_lock(&handle->mutex); + handle->bwe_target = bitrate; + janus_mutex_unlock(&handle->mutex); if(handle->queued_packets != NULL) { #if GLIB_CHECK_VERSION(2, 46, 0) - g_async_queue_push_front(handle->queued_packets, &janus_ice_defer_probing); + g_async_queue_push_front(handle->queued_packets, &janus_ice_set_bwe_target); #else - g_async_queue_push(handle->queued_packets, &janus_ice_defer_probing); + g_async_queue_push(handle->queued_packets, &janus_ice_set_bwe_target); #endif g_main_context_wakeup(handle->mainctx); } } -void janus_ice_handle_disable_probing(janus_ice_handle *handle) { - JANUS_LOG(LOG_VERB, "[%"SCNu64"] Disabling bandwidth probing\n", handle->handle_id); +void janus_ice_handle_disable_bwe(janus_ice_handle *handle) { + JANUS_LOG(LOG_VERB, "[%"SCNu64"] Disabling bandwidth estimation\n", handle->handle_id); if(handle->queued_packets != NULL) { #if GLIB_CHECK_VERSION(2, 46, 0) - g_async_queue_push_front(handle->queued_packets, &janus_ice_disable_probing); + g_async_queue_push_front(handle->queued_packets, &janus_ice_disable_bwe); #else - g_async_queue_push(handle->queued_packets, &janus_ice_disable_probing); + g_async_queue_push(handle->queued_packets, &janus_ice_disable_bwe); #endif g_main_context_wakeup(handle->mainctx); } @@ -4535,24 +4511,20 @@ static gboolean janus_ice_outgoing_bwe_handle(gpointer user_data) { janus_ice_peerconnection *pc = handle->pc; if(pc == NULL || pc->bwe == NULL) return G_SOURCE_CONTINUE; + /* First of all, let's check if we have to notify the plugin */ if(pc->bwe->notify_plugin) { - /* Notify the plugin about the current value */ + /* Notify the plugin about the current estimate */ pc->bwe->notify_plugin = FALSE; janus_plugin *plugin = (janus_plugin *)handle->app; if(plugin && plugin->estimated_bandwidth && janus_plugin_session_is_alive(handle->app_handle) && !g_atomic_int_get(&handle->destroyed)) plugin->estimated_bandwidth(handle->app_handle, pc->bwe->estimate); } - return G_SOURCE_CONTINUE; -} - -static gboolean janus_ice_outgoing_probing_handle(gpointer user_data) { - janus_ice_handle *handle = (janus_ice_handle *)user_data; - janus_ice_peerconnection *pc = handle->pc; - janus_ice_peerconnection_medium *medium = NULL; - if(pc == NULL || pc->bwe == NULL || pc->bwe->probing_target == 0) + /* Now, let's check if there's probing we need to perform */ + uint32_t bitrate_out = (pc->bwe->sent->packets[janus_bwe_packet_type_regular*3] ? pc->bwe->sent->bitrate[janus_bwe_packet_type_regular*3] : 0) + + (pc->bwe->sent->packets[janus_bwe_packet_type_rtx*3] ? pc->bwe->sent->bitrate[janus_bwe_packet_type_rtx*3] : 0); + if(pc->bwe->probing_target == 0 || pc->bwe->probing_target <= bitrate_out) return G_SOURCE_CONTINUE; - /* This callback is for regularly sending probing for the bandwidth estimator */ if(pc->bwe->status != janus_bwe_status_start && pc->bwe->status != janus_bwe_status_regular) { /* The BWE status may be lossy, congested, or recovering: don't probe for now */ return G_SOURCE_CONTINUE; @@ -4566,6 +4538,7 @@ static gboolean janus_ice_outgoing_probing_handle(gpointer user_data) { pc->bwe->probing_deferred = 0; } /* Get the medium instance we'll use for probing */ + janus_ice_peerconnection_medium *medium = NULL; if(pc->bwe->probing_mindex != -1) medium = g_hash_table_lookup(pc->media, GINT_TO_POINTER(pc->bwe->probing_mindex)); if(medium == NULL || !medium->send || medium->rtx_payload_type == -1 || g_queue_is_empty(medium->retransmit_buffer)) { @@ -4597,16 +4570,24 @@ static gboolean janus_ice_outgoing_probing_handle(gpointer user_data) { pc->bwe->probing_portion = 0.0; } pc->bwe->probing_count++; - uint32_t required = pc->bwe->probing_target; - if(pc->bwe->probing_part > 0) - required = required / pc->bwe->probing_part; + /* Check if we need to build up probing */ + uint32_t required = pc->bwe->probing_target - bitrate_out; + if(pc->bwe->probing_buildup_timer > 0) { + if(pc->bwe->probing_buildup == 0) + pc->bwe->probing_buildup = 25000; + if(now - pc->bwe->probing_buildup_timer >= 500000) { + pc->bwe->probing_buildup += 25000; + pc->bwe->probing_buildup_timer = now; + } + if(pc->bwe->probing_buildup >= required) + pc->bwe->probing_buildup = required; + required = pc->bwe->probing_buildup; + } if(pc->bwe->probing_sent >= required) { /* We sent enough for this round */ return G_SOURCE_CONTINUE; } uint32_t required_now = required / (8 * 20); - //~ if(pc->bwe->probing_count == 20) - //~ required_now = (pc->bwe->probing_target - pc->bwe->probing_sent) / 8; double prev_portion = pc->bwe->probing_portion; double portion = (double)required_now / (double)(p->length+SRTP_MAX_TAG_LEN); double new_portion = prev_portion + portion; @@ -4726,12 +4707,36 @@ static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janu return G_SOURCE_CONTINUE; JANUS_LOG(LOG_INFO, "[%"SCNu64"] Enabling bandwidth estimation\n", handle->handle_id); pc->bwe = janus_bwe_context_create(); - /* Let's create a source for BWE, just used to figure out when to notify the plugin */ + janus_mutex_lock(&handle->mutex); + handle->bwe_target = 0; + janus_mutex_unlock(&handle->mutex); + /* Let's create a source for BWE, for plugin notifications and probing */ handle->bwe_source = g_timeout_source_new(50); /* FIXME */ g_source_set_priority(handle->bwe_source, G_PRIORITY_DEFAULT); g_source_set_callback(handle->bwe_source, janus_ice_outgoing_bwe_handle, handle, NULL); g_source_attach(handle->bwe_source, handle->mainctx); return G_SOURCE_CONTINUE; + } else if(pkt == &janus_ice_set_bwe_target) { + /* Enable probing for the BWE context (assuming BWE is active) */ + if(pc == NULL || pc->bwe == NULL) + return G_SOURCE_CONTINUE; + janus_mutex_lock(&handle->mutex); + uint32_t target = handle->bwe_target; + JANUS_LOG(LOG_INFO, "[%"SCNu64"] Configuring bandwidth estimation target: %"SCNu32"\n", handle->handle_id, target); + if(target == pc->bwe->probing_target) { + /* Nothing to do */ + janus_mutex_unlock(&handle->mutex); + return G_SOURCE_CONTINUE; + } + janus_mutex_unlock(&handle->mutex); + /* Let's configure probing accordingly */ + pc->bwe->probing_target = target; + pc->bwe->probing_count = 0; + pc->bwe->probing_sent = 0; + pc->bwe->probing_portion = 0.0; + pc->bwe->probing_buildup = 0; + pc->bwe->probing_buildup_timer = janus_get_monotonic_time(); + return G_SOURCE_CONTINUE; } else if(pkt == &janus_ice_disable_bwe) { /* We need to get rif of the bandwidth estimator */ if(pc == NULL || pc->bwe == NULL) @@ -4742,52 +4747,11 @@ static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janu g_source_unref(handle->bwe_source); handle->bwe_source = NULL; } - if(handle->probing_source) { - g_source_destroy(handle->probing_source); - g_source_unref(handle->probing_source); - handle->probing_source = NULL; - } janus_bwe_context_destroy(pc->bwe); pc->bwe = NULL; - return G_SOURCE_CONTINUE; - } else if(pkt == &janus_ice_enable_probing) { - /* Enable probing for the BWE context (assuming BWE is active) */ - if(pc == NULL || pc->bwe == NULL || handle->probing_source != NULL) - return G_SOURCE_CONTINUE; - JANUS_LOG(LOG_INFO, "[%"SCNu64"] Enabling bandwidth probing\n", handle->handle_id); - /* Let's create a source for probing */ - pc->bwe->probing_target = 200000; /* FIXME */ - pc->bwe->probing_count = 0; - pc->bwe->probing_sent = 0; - pc->bwe->probing_portion = 0.0; - pc->bwe->probing_part = 100; /* FIXME */ - handle->probing_source = g_timeout_source_new(50); - g_source_set_priority(handle->probing_source, G_PRIORITY_DEFAULT); - g_source_set_callback(handle->probing_source, janus_ice_outgoing_probing_handle, handle, NULL); - g_source_attach(handle->probing_source, handle->mainctx); - return G_SOURCE_CONTINUE; - } else if(pkt == &janus_ice_defer_probing) { - /* We need to temporarily pause bandwidth probing */ - if(pc == NULL || pc->bwe == NULL || handle->probing_source == NULL) - return G_SOURCE_CONTINUE; - JANUS_LOG(LOG_INFO, "[%"SCNu64"] Deferring bandwidth probing\n", handle->handle_id); - gint64 deferred = janus_get_monotonic_time() + G_USEC_PER_SEC; - if(deferred > pc->bwe->probing_deferred) - pc->bwe->probing_deferred = deferred; - pc->bwe->probing_sent = 0; - pc->bwe->probing_portion = 0.0; - pc->bwe->probing_part = 100; /* FIXME */ - return G_SOURCE_CONTINUE; - } else if(pkt == &janus_ice_disable_probing) { - /* We need to get rid of the bandwidth probing */ - if(pc == NULL || pc->bwe == NULL) - return G_SOURCE_CONTINUE; - JANUS_LOG(LOG_INFO, "[%"SCNu64"] Disabling bandwidth probing\n", handle->handle_id); - if(handle->probing_source) { - g_source_destroy(handle->probing_source); - g_source_unref(handle->probing_source); - handle->probing_source = NULL; - } + janus_mutex_lock(&handle->mutex); + handle->bwe_target = 0; + janus_mutex_unlock(&handle->mutex); return G_SOURCE_CONTINUE; } else if(pkt == &janus_ice_media_stopped) { /* Some media has been disabled on the way in, so use the callback to notify the peer */ @@ -4838,11 +4802,6 @@ static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janu g_source_unref(handle->bwe_source); handle->bwe_source = NULL; } - if(handle->probing_source) { - g_source_destroy(handle->probing_source); - g_source_unref(handle->probing_source); - handle->probing_source = NULL; - } if(handle->stats_source) { g_source_destroy(handle->stats_source); g_source_unref(handle->stats_source); diff --git a/src/ice.h b/src/ice.h index 07ac82b4c0..41414c59f7 100644 --- a/src/ice.h +++ b/src/ice.h @@ -374,7 +374,7 @@ struct janus_ice_handle { /*! \brief GLib thread for the handle and libnice */ GThread *thread; /*! \brief GLib sources for outgoing traffic, recurring RTCP, and stats (and optionally TWCC/BWE/probing) */ - GSource *rtp_source, *rtcp_source, *stats_source, *twcc_source, *bwe_source, *probing_source; + GSource *rtp_source, *rtcp_source, *stats_source, *twcc_source, *bwe_source; /*! \brief libnice ICE agent */ NiceAgent *agent; /*! \brief Monotonic time of when the ICE agent has been created */ @@ -409,6 +409,8 @@ struct janus_ice_handle { gint last_srtp_error, last_srtp_summary; /*! \brief Count of how many seconds passed since the last stats passed to event handlers */ gint last_event_stats; + /*! \brief Bandwidth estimation target, if any, as asked by the plugin */ + uint32_t bwe_target; /*! \brief Flag to decide whether or not packets need to be dumped to a text2pcap file */ volatile gint dump_packets; /*! \brief In case this session must be saved to text2pcap, the instance to dump packets to */ @@ -524,7 +526,7 @@ struct janus_ice_peerconnection { GHashTable *rtx_payload_types_rev; /*! \brief Helper flag to avoid flooding the console with the same error all over again */ gboolean noerrorlog; - /*! Bandwidth estimation context */ + /*! \brief Bandwidth estimation context */ janus_bwe_context *bwe; /*! \brief Mutex to lock/unlock this stream */ janus_mutex mutex; @@ -790,18 +792,13 @@ void janus_ice_resend_trickles(janus_ice_handle *handle); /*! \brief Method to dynamically enable bandwidth estimation for a handle * @param[in] handle The Janus ICE handle this method refers to */ void janus_ice_handle_enable_bwe(janus_ice_handle *handle); +/*! \brief Method to dynamically tweak the bandwidth estimation target for a handle (for probing) + * @param[in] handle The Janus ICE handle this method refers to + * @param[in] bitrate The bitrate to target (will be used to generate probing) */ +void janus_ice_handle_set_bwe_target(janus_ice_handle *handle, uint32_t bitrate); /*! \brief Method to dynamically disable bandwidth estimation for a handle * @param[in] handle The Janus ICE handle this method refers to */ void janus_ice_handle_disable_bwe(janus_ice_handle *handle); -/*! \brief Method to dynamically enable bandwidth probing for a handle - * @param[in] handle The Janus ICE handle this method refers to */ -void janus_ice_handle_enable_probing(janus_ice_handle *handle); -/*! \brief Method to dynamically defer bandwidth probing for a handle for a few seconds - * @param[in] handle The Janus ICE handle this method refers to */ -void janus_ice_handle_defer_probing(janus_ice_handle *handle); -/*! \brief Method to dynamically disable bandwidth probing for a handle - * @param[in] handle The Janus ICE handle this method refers to */ -void janus_ice_handle_disable_probing(janus_ice_handle *handle); ///@} diff --git a/src/janus.c b/src/janus.c index 5aa82fbef4..ca4351ac53 100644 --- a/src/janus.c +++ b/src/janus.c @@ -612,10 +612,8 @@ void janus_plugin_send_pli(janus_plugin_session *plugin_session); void janus_plugin_send_pli_stream(janus_plugin_session *plugin_session, int mindex); void janus_plugin_send_remb(janus_plugin_session *plugin_session, uint32_t bitrate); void janus_plugin_enable_bwe(janus_plugin_session *plugin_session); +void janus_plugin_set_bwe_target(janus_plugin_session *plugin_session, uint32_t bitrate); void janus_plugin_disable_bwe(janus_plugin_session *plugin_session); -void janus_plugin_enable_probing(janus_plugin_session *plugin_session); -void janus_plugin_defer_probing(janus_plugin_session *plugin_session); -void janus_plugin_disable_probing(janus_plugin_session *plugin_session); void janus_plugin_close_pc(janus_plugin_session *plugin_session); void janus_plugin_end_session(janus_plugin_session *plugin_session); void janus_plugin_notify_event(janus_plugin *plugin, janus_plugin_session *plugin_session, json_t *event); @@ -632,10 +630,8 @@ static janus_callbacks janus_handler_plugin = .send_pli_stream = janus_plugin_send_pli_stream, .send_remb = janus_plugin_send_remb, .enable_bwe = janus_plugin_enable_bwe, + .set_bwe_target = janus_plugin_set_bwe_target, .disable_bwe = janus_plugin_disable_bwe, - .enable_probing = janus_plugin_enable_probing, - .defer_probing = janus_plugin_defer_probing, - .disable_probing = janus_plugin_disable_probing, .close_pc = janus_plugin_close_pc, .end_session = janus_plugin_end_session, .events_is_enabled = janus_events_is_enabled, @@ -4276,36 +4272,20 @@ void janus_plugin_enable_bwe(janus_plugin_session *plugin_session) { janus_ice_handle_enable_bwe(handle); } -void janus_plugin_disable_bwe(janus_plugin_session *plugin_session) { - janus_ice_handle *handle = (janus_ice_handle *)plugin_session->gateway_handle; - if(!handle || janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_STOP) - || janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALERT)) - return; - janus_ice_handle_disable_bwe(handle); -} - -void janus_plugin_enable_probing(janus_plugin_session *plugin_session) { +void janus_plugin_set_bwe_target(janus_plugin_session *plugin_session, uint32_t bitrate) { janus_ice_handle *handle = (janus_ice_handle *)plugin_session->gateway_handle; if(!handle || janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_STOP) || janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALERT)) return; - janus_ice_handle_enable_probing(handle); + janus_ice_handle_set_bwe_target(handle, bitrate); } -void janus_plugin_defer_probing(janus_plugin_session *plugin_session) { - janus_ice_handle *handle = (janus_ice_handle *)plugin_session->gateway_handle; - if(!handle || janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_STOP) - || janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALERT)) - return; - janus_ice_handle_defer_probing(handle); -} - -void janus_plugin_disable_probing(janus_plugin_session *plugin_session) { +void janus_plugin_disable_bwe(janus_plugin_session *plugin_session) { janus_ice_handle *handle = (janus_ice_handle *)plugin_session->gateway_handle; if(!handle || janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_STOP) || janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALERT)) return; - janus_ice_handle_disable_probing(handle); + janus_ice_handle_disable_bwe(handle); } static gboolean janus_plugin_close_pc_internal(gpointer user_data) { diff --git a/src/plugins/janus_videoroom.c b/src/plugins/janus_videoroom.c index eab504cfa6..f567873564 100644 --- a/src/plugins/janus_videoroom.c +++ b/src/plugins/janus_videoroom.c @@ -2237,6 +2237,8 @@ typedef struct janus_videoroom_subscriber_stream { janus_vp8_simulcast_context vp8_context; /* SVC context */ janus_rtp_svc_context svc_context; + /* When we last dropped a layer because of BWE (to implement a cooldown period) */ + gint64 last_bwe_drop; /* Playout delays to enforce when relaying this stream, if the extension has been negotiated */ int16_t min_delay, max_delay; volatile gint ready, destroyed; @@ -8415,12 +8417,15 @@ void janus_videoroom_estimated_bandwidth(janus_plugin_session *handle, uint32_t JANUS_LOG(LOG_WARN, "[videoroom] BWE=%"SCNu32"\n", estimate); janus_mutex_lock(&subscriber->streams_mutex); GList *temp = subscriber->streams; + gint64 now = janus_get_monotonic_time(); while(temp) { janus_videoroom_subscriber_stream *s = (janus_videoroom_subscriber_stream *)temp->data; if(s->type == JANUS_VIDEOROOM_MEDIA_DATA) { temp = temp->next; continue; } + if(s->last_bwe_drop > 0 && s->last_bwe_drop < now) + s->last_bwe_drop = 0; GSList *list = s->publisher_streams; if(list) { janus_videoroom_publisher_stream *ps = list->data; @@ -8436,7 +8441,7 @@ void janus_videoroom_estimated_bandwidth(janus_plugin_session *handle, uint32_t } } else if(s->type == JANUS_VIDEOROOM_MEDIA_VIDEO) { if(ps->simulcast) { - JANUS_LOG(LOG_WARN, "current=%d/%d, target=%d/%d, bwe=%d/%d\n", + JANUS_LOG(LOG_DBG, "current=%d/%d, target=%d/%d, bwe=%d/%d\n", s->sim_context.substream, s->sim_context.templayer, s->sim_context.substream_target, s->sim_context.templayer_target, s->sim_context.substream_target_bwe, s->sim_context.templayer_target_bwe); @@ -8451,19 +8456,40 @@ void janus_videoroom_estimated_bandwidth(janus_plugin_session *handle, uint32_t int target = 3*substream + templayer; if(target > 8) target = 8; - if(ps->bitrates->packets[target] == NULL || estimate < ps->bitrates->bitrate[target]) { + /* Check if the target is actually available */ + while(ps->bitrates->packets[target] == NULL) { + templayer--; + if(templayer < 0) { + substream--; + if(substream < 0) { + substream = 0; + } else { + templayer = 2; + } + } + target = 3*substream + templayer; + } + if(estimate < ps->bitrates->bitrate[target]) { /* Unavailable layer, or we don't have room for these layers, find one below that fits */ if(target == 0) { estimate = 0; JANUS_LOG(LOG_WARN, "Insufficient bandwidth for simulcast stream %d (video)\n", s->mindex); } else { gboolean found = FALSE; + int old_target = target; int new_substream = 0, new_templayer = 0; while(target > 0) { target--; new_substream = target/3; new_templayer = target%3; if(ps->bitrates->packets[target] && estimate >= ps->bitrates->bitrate[target]) { + /* If we're going up, make sure we're not in the cooldown period */ + if(s->sim_context.substream_target_bwe != -1 && + new_substream > s->sim_context.substream_target_bwe && + now < s->last_bwe_drop) { + /* We are, ignore this layer */ + continue; + } found = TRUE; break; } @@ -8474,13 +8500,20 @@ void janus_videoroom_estimated_bandwidth(janus_plugin_session *handle, uint32_t } else { if(s->sim_context.substream_target_bwe == -1 || s->sim_context.substream_target_bwe > new_substream || s->sim_context.templayer_target_bwe == -1 || s->sim_context.templayer_target_bwe > new_templayer) { - JANUS_LOG(LOG_WARN, "Insufficient bandwidth for simulcast %d/%d of stream %d (%"SCNu32" < %"SCNu32"), switching to %d/%d\n", - substream, templayer, s->mindex, estimate, ps->bitrates->bitrate[target], new_substream, new_templayer); - /* FIXME */ + JANUS_LOG(LOG_WARN, "Insufficient bandwidth for simulcast %d/%d of stream %d (%"SCNu32" < %"SCNu32"), switching to %d/%d (%"SCNu32")\n", + substream, templayer, s->mindex, estimate, ps->bitrates->bitrate[old_target], + new_substream, new_templayer, ps->bitrates->bitrate[target]); + /* If BWE made us go down, implement a cooldown period of about 5 seconds */ + if(s->sim_context.substream_target_bwe == -1 || + new_substream < s->sim_context.substream_target_bwe) + s->last_bwe_drop = now + 5*G_USEC_PER_SEC; } estimate -= ps->bitrates->bitrate[target]; s->sim_context.substream_target_bwe = new_substream; s->sim_context.templayer_target_bwe = new_templayer; + /* Request a PLI if needed */ + if(s->sim_context.substream_target_bwe != new_substream) + janus_videoroom_reqpli(ps, "Simulcasting substream change (BWE)"); } } } else { @@ -12452,15 +12485,24 @@ static void janus_videoroom_relay_rtp_packet(gpointer data, gpointer user_data) json_decref(event); } if(stream->sim_context.changed_substream || stream->sim_context.changed_temporal) { - if(stream->sim_context.substream_target_bwe == -1 && stream->sim_context.substream_target_temp == -1 && - stream->sim_context.substream == stream->sim_context.substream_target && - stream->sim_context.templayer == stream->sim_context.templayer_target) { - /* We're receiving what we wanted, stop probing (if it was active) */ - gateway->disable_probing(subscriber->session->handle); - } else { - /* We're not receiving what we need, start probing (if we weren't already) */ - gateway->enable_probing(subscriber->session->handle); - gateway->defer_probing(subscriber->session->handle); + /* FIXME We changed substream, check if we need to reconfigure the bitrate target for probing */ + if(stream->sim_context.substream < stream->sim_context.substream_target || + (stream->sim_context.substream == stream->sim_context.substream_target && + stream->sim_context.templayer < stream->sim_context.templayer_target)) { + /* We're not receiving what we need, aim for a higher layer */ + uint8_t current_layer = stream->sim_context.substream*3 + stream->sim_context.templayer; + uint8_t target_layer = stream->sim_context.substream_target*3 + stream->sim_context.templayer_target; + uint32_t target = 0; + uint8_t i = 0; + JANUS_LOG(LOG_WARN, "[videoroom] current=%d, target=%d\n", current_layer, target_layer); + for(i = current_layer+1; i<=target_layer; i++) { + JANUS_LOG(LOG_WARN, "[videoroom] -- %d --> %"SCNu32"\n", i, (ps->bitrates->packets[i] ? ps->bitrates->bitrate[i] : 0)); + if(ps->bitrates->packets[i]) + target = ps->bitrates->bitrate[i]; + if(target > 0) + break; + } + gateway->set_bwe_target(subscriber->session->handle, target); } } /* If we got here, update the RTP header and send the packet */ diff --git a/src/plugins/plugin.h b/src/plugins/plugin.h index 7bf54328e2..db00f51ff1 100644 --- a/src/plugins/plugin.h +++ b/src/plugins/plugin.h @@ -410,21 +410,17 @@ struct janus_callbacks { * estimated_bandwidth callback on a regular basis for this session * @param[in] handle The plugin/gateway session to enable BWE for */ void (* const enable_bwe)(janus_plugin_session *handle); - /*! \brief Get rid of the bandwidth estimation context for this session - * @param[in] handle The plugin/gateway session to disnable BWE for */ - void (* const disable_bwe)(janus_plugin_session *handle); - /*! \brief Enable bandwidth probing for this session, for bandwidth estimation purposes + /*! \brief Configure the target bitrate in this session, to generate + * probing for bandwidth estimation purposes (0 disables probing) * \note The request will be ignored if no BWE context is enabled for this session. * Also notice that probing may be paused at any time by the core, whether it * was enabled or not, e.g., in case congestion or losses are detected - * @param[in] handle The plugin/gateway session to enable BWE probing for */ - void (* const enable_probing)(janus_plugin_session *handle); - /*! \brief Disable bandwidth probing for this session - * @param[in] handle The plugin/gateway session to disable BWE probing for */ - void (* const defer_probing)(janus_plugin_session *handle); - /*! \brief Defers bandwidth probing for this session for a few seconds - * @param[in] handle The plugin/gateway session to defer BWE probing for */ - void (* const disable_probing)(janus_plugin_session *handle); + * @param[in] handle The plugin/gateway session to enable BWE probing for + * @param[in] target The bitrate to target (e.g., next simulcast layer) */ + void (* const set_bwe_target)(janus_plugin_session *handle, uint32_t bitrate); + /*! \brief Get rid of the bandwidth estimation context for this session + * @param[in] handle The plugin/gateway session to disnable BWE for */ + void (* const disable_bwe)(janus_plugin_session *handle); /*! \brief Callback to ask the core to close a WebRTC PeerConnection * \note A call to this method will result in the core invoking the hangup_media diff --git a/src/rtp.c b/src/rtp.c index 621852e400..373e522a72 100644 --- a/src/rtp.c +++ b/src/rtp.c @@ -1184,11 +1184,14 @@ gboolean janus_rtp_simulcasting_context_process_rtp(janus_rtp_simulcasting_conte context->changed_substream = TRUE; context->last_relayed = now; } else if(context->substream_target_bwe != -1 && context->substream > target && substream < context->substream) { - /* We need to go down because of BWE, don't wait for a keyframe */ + /* We need to go down because of BWE, don't wait for a keyframe (hopefully one will follow) */ context->substream = substream; /* Notify the caller that the substream changed */ context->changed_substream = TRUE; context->last_relayed = now; + //~ } else if(context->substream_target_bwe != -1 && context->substream > target) { + //~ /* We need to go down because of BWE but don't have a keyframe yet, don't relay anything */ + //~ relay = FALSE; } } /* If we haven't received our desired substream yet, let's drop temporarily */