Skip to content

Commit

Permalink
Refactored probing API to chase a bitrate target
Browse files Browse the repository at this point in the history
  • Loading branch information
lminiero committed Oct 4, 2023
1 parent 8f3eb5c commit 317657b
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 180 deletions.
19 changes: 10 additions & 9 deletions src/bwe.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ janus_bwe_context *janus_bwe_context_create(void) {
g_snprintf(filename, sizeof(filename), "/tmp/bwe-janus-%"SCNi64, janus_get_real_time());
bwe->csv = fopen(filename, "wt");
char line[2048];
g_snprintf(line, sizeof(line), "time,status,estimate,bitrate_out,rtx_out,probing_out,bitrate_in,rtx_in,probing_in,acked,lost,loss_ratio,avg_delay,avg_delay_weighted,avg_delay_fb\n");
g_snprintf(line, sizeof(line), "time,status,estimate,probing_target,bitrate_out,rtx_out,probing_out,bitrate_in,rtx_in,probing_in,acked,lost,loss_ratio,avg_delay,avg_delay_weighted,avg_delay_fb\n");
fwrite(line, sizeof(char), strlen(line), bwe->csv);
#endif
return bwe;
Expand Down Expand Up @@ -190,7 +190,9 @@ void janus_bwe_context_update(janus_bwe_context *bwe) {
bwe->status = janus_bwe_status_lossy;
bwe->status_changed = now;
bwe->estimate = estimate;
} else if(avg_delay_weighted > 1.5 && avg_delay_weighted - bwe->avg_delay > 0.1) {
} else if(avg_delay_weighted > 1.2 && avg_delay_weighted - bwe->avg_delay > 0.1) {
JANUS_LOG(LOG_WARN, "[BWE] Congested (delay=%.2f, increase=%.2f)\n",
avg_delay_weighted, avg_delay_weighted - bwe->avg_delay);
/* FIXME Delay is increasing */
if(bwe->status != janus_bwe_status_lossy && bwe->status != janus_bwe_status_congested)
notify_plugin = TRUE;
Expand All @@ -215,7 +217,8 @@ void janus_bwe_context_update(janus_bwe_context *bwe) {
/* Restore probing but incrementally */
bwe->probing_sent = 0;
bwe->probing_portion = 0.0;
bwe->probing_part = 200; /* FIXME */
bwe->probing_buildup = 0;
bwe->probing_buildup_timer = now;
} else {
/* FIXME Keep converging to the estimate */
if(estimate > bwe->estimate)
Expand All @@ -230,22 +233,20 @@ void janus_bwe_context_update(janus_bwe_context *bwe) {
bwe->estimate = estimate;
//~ else if(now - bwe->status_changed < 10*G_USEC_PER_SEC)
//~ bwe->estimate = ((double)bwe->estimate * 1.02);
if(bwe->probing_part > 0) /* FIXME */
bwe->probing_part--;
}
}
bwe->avg_delay = avg_delay_weighted;
bwe->bitrate_ts = now;
JANUS_LOG(LOG_WARN, "[BWE][%"SCNi64"][%s] sent=%"SCNu32"kbps (probing=%"SCNu32"kbps), acked=%"SCNu32"kbps (probing=%"SCNu32"kbps), loss=%.2f%%, avg_delay=%.2fms, estimate=%"SCNu32"\n",
JANUS_LOG(LOG_DBG, "[BWE][%"SCNi64"][%s] sent=%"SCNu32"kbps (probing=%"SCNu32"kbps), acked=%"SCNu32"kbps (probing=%"SCNu32"kbps), loss=%.2f%%, avg_delay=%.2fms, estimate=%"SCNu32"\n",
now, janus_bwe_status_description(bwe->status),
bitrate_out / 1024, probing_out / 1024, bitrate_in / 1024, probing_in / 1024,
bwe->loss_ratio, bwe->avg_delay, bwe->estimate);
#ifdef BWE_DEBUGGING
/* Save the details to CSV */
char line[2048];
g_snprintf(line, sizeof(line), "%"SCNi64",%d,%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu16",%"SCNu16",%.2f,%.2f,%.2f,%.2f\n",
g_snprintf(line, sizeof(line), "%"SCNi64",%d,%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu32",%"SCNu16",%"SCNu16",%.2f,%.2f,%.2f,%.2f\n",
now - bwe->started, bwe->status,
bwe->estimate, bitrate_out, rtx_out, probing_out, bitrate_in, rtx_in, probing_in,
bwe->estimate, bwe->probing_target, bitrate_out, rtx_out, probing_out, bitrate_in, rtx_in, probing_in,
bwe->received_pkts, bwe->lost_pkts, bwe->loss_ratio, avg_delay, avg_delay_weighted, avg_delay_latest);
fwrite(line, sizeof(char), strlen(line), bwe->csv);
#endif
Expand All @@ -254,7 +255,7 @@ void janus_bwe_context_update(janus_bwe_context *bwe) {
bwe->received_pkts = 0;
bwe->lost_pkts = 0;
/* Check if we should notify the plugin about the estimate */
if(notify_plugin || (now - bwe->last_notified) >= 250000) {
if(bwe->status != janus_bwe_status_start && (notify_plugin || (now - bwe->last_notified) >= 250000)) {
bwe->notify_plugin = TRUE;
bwe->last_notified = now;
}
Expand Down
8 changes: 4 additions & 4 deletions src/bwe.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,16 +145,16 @@ typedef struct janus_bwe_context {
int64_t status_changed;
/*! \brief Index of the m-line we're using for probing */
int probing_mindex;
/*! \brief How much we should aim for with out probing (and how much we sent in a second) */
uint32_t probing_target, probing_sent;
/*! \brief How much we should aim for with out probing (and how much to increase, plus much we sent in a second) */
uint32_t probing_target, probing_buildup, probing_sent;
/*! \brief How many times we went through probing in a second */
uint8_t probing_count;
/*! \brief Portion of probing we didn't manage to send the previous round */
double probing_portion;
/*! \brief In case probing was deferred, when it shoult restart */
int64_t probing_deferred;
/*! \brief Whether probing should grow incrementally (e.g., after recovery) */
uint16_t probing_part;
/*! \brief Timer for building up probing */
int64_t probing_buildup_timer;
/*! \brief Monotonic timestamp of the last sent packet */
int64_t last_sent_ts;
/*! \brief Last twcc seq number of a received packet */
Expand Down
165 changes: 62 additions & 103 deletions src/ice.c
Original file line number Diff line number Diff line change
Expand Up @@ -474,10 +474,8 @@ static janus_ice_queued_packet
janus_ice_dtls_handshake,
janus_ice_media_stopped,
janus_ice_enable_bwe,
janus_ice_set_bwe_target,
janus_ice_disable_bwe,
janus_ice_enable_probing,
janus_ice_defer_probing,
janus_ice_disable_probing,
janus_ice_hangup_peerconnection,
janus_ice_detach_handle,
janus_ice_data_ready;
Expand Down Expand Up @@ -534,7 +532,6 @@ typedef struct janus_ice_outgoing_traffic {
static gboolean janus_ice_outgoing_rtcp_handle(gpointer user_data);
static gboolean janus_ice_outgoing_stats_handle(gpointer user_data);
static gboolean janus_ice_outgoing_bwe_handle(gpointer user_data);
static gboolean janus_ice_outgoing_probing_handle(gpointer user_data);
static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janus_ice_queued_packet *pkt);
static gboolean janus_ice_outgoing_traffic_prepare(GSource *source, gint *timeout) {
janus_ice_outgoing_traffic *t = (janus_ice_outgoing_traffic *)source;
Expand Down Expand Up @@ -657,8 +654,7 @@ static void janus_ice_free_queued_packet(janus_ice_queued_packet *pkt) {
if(pkt == NULL || pkt == &janus_ice_start_gathering ||
pkt == &janus_ice_add_candidates ||
pkt == &janus_ice_dtls_handshake ||
pkt == &janus_ice_enable_bwe || pkt == &janus_ice_disable_bwe ||
pkt == &janus_ice_enable_probing || pkt == &janus_ice_defer_probing || pkt == &janus_ice_disable_probing ||
pkt == &janus_ice_enable_bwe || pkt == &janus_ice_set_bwe_target || pkt == &janus_ice_disable_bwe ||
pkt == &janus_ice_media_stopped ||
pkt == &janus_ice_hangup_peerconnection ||
pkt == &janus_ice_detach_handle ||
Expand Down Expand Up @@ -3892,49 +3888,29 @@ void janus_ice_handle_enable_bwe(janus_ice_handle *handle) {
}
}

void janus_ice_handle_disable_bwe(janus_ice_handle *handle) {
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Disabling bandwidth estimation\n", handle->handle_id);
if(handle->queued_packets != NULL) {
#if GLIB_CHECK_VERSION(2, 46, 0)
g_async_queue_push_front(handle->queued_packets, &janus_ice_disable_bwe);
#else
g_async_queue_push(handle->queued_packets, &janus_ice_disable_bwe);
#endif
g_main_context_wakeup(handle->mainctx);
}
}

void janus_ice_handle_enable_probing(janus_ice_handle *handle) {
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Enabling bandwidth probing\n", handle->handle_id);
if(handle->queued_packets != NULL) {
#if GLIB_CHECK_VERSION(2, 46, 0)
g_async_queue_push_front(handle->queued_packets, &janus_ice_enable_probing);
#else
g_async_queue_push(handle->queued_packets, &janus_ice_enable_probing);
#endif
g_main_context_wakeup(handle->mainctx);
}
}

void janus_ice_handle_defer_probing(janus_ice_handle *handle) {
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Deferring bandwidth probing\n", handle->handle_id);
/* FIXME */
void janus_ice_handle_set_bwe_target(janus_ice_handle *handle, uint32_t bitrate) {
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Configuring bandwidth estimation target: %"SCNu32"\n", handle->handle_id, bitrate);
janus_mutex_lock(&handle->mutex);
handle->bwe_target = bitrate;
janus_mutex_unlock(&handle->mutex);
if(handle->queued_packets != NULL) {
#if GLIB_CHECK_VERSION(2, 46, 0)
g_async_queue_push_front(handle->queued_packets, &janus_ice_defer_probing);
g_async_queue_push_front(handle->queued_packets, &janus_ice_set_bwe_target);
#else
g_async_queue_push(handle->queued_packets, &janus_ice_defer_probing);
g_async_queue_push(handle->queued_packets, &janus_ice_set_bwe_target);
#endif
g_main_context_wakeup(handle->mainctx);
}
}

void janus_ice_handle_disable_probing(janus_ice_handle *handle) {
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Disabling bandwidth probing\n", handle->handle_id);
void janus_ice_handle_disable_bwe(janus_ice_handle *handle) {
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Disabling bandwidth estimation\n", handle->handle_id);
if(handle->queued_packets != NULL) {
#if GLIB_CHECK_VERSION(2, 46, 0)
g_async_queue_push_front(handle->queued_packets, &janus_ice_disable_probing);
g_async_queue_push_front(handle->queued_packets, &janus_ice_disable_bwe);
#else
g_async_queue_push(handle->queued_packets, &janus_ice_disable_probing);
g_async_queue_push(handle->queued_packets, &janus_ice_disable_bwe);
#endif
g_main_context_wakeup(handle->mainctx);
}
Expand Down Expand Up @@ -4535,24 +4511,20 @@ static gboolean janus_ice_outgoing_bwe_handle(gpointer user_data) {
janus_ice_peerconnection *pc = handle->pc;
if(pc == NULL || pc->bwe == NULL)
return G_SOURCE_CONTINUE;
/* First of all, let's check if we have to notify the plugin */
if(pc->bwe->notify_plugin) {
/* Notify the plugin about the current value */
/* Notify the plugin about the current estimate */
pc->bwe->notify_plugin = FALSE;
janus_plugin *plugin = (janus_plugin *)handle->app;
if(plugin && plugin->estimated_bandwidth && janus_plugin_session_is_alive(handle->app_handle) &&
!g_atomic_int_get(&handle->destroyed))
plugin->estimated_bandwidth(handle->app_handle, pc->bwe->estimate);
}
return G_SOURCE_CONTINUE;
}

static gboolean janus_ice_outgoing_probing_handle(gpointer user_data) {
janus_ice_handle *handle = (janus_ice_handle *)user_data;
janus_ice_peerconnection *pc = handle->pc;
janus_ice_peerconnection_medium *medium = NULL;
if(pc == NULL || pc->bwe == NULL || pc->bwe->probing_target == 0)
/* Now, let's check if there's probing we need to perform */
uint32_t bitrate_out = (pc->bwe->sent->packets[janus_bwe_packet_type_regular*3] ? pc->bwe->sent->bitrate[janus_bwe_packet_type_regular*3] : 0) +
(pc->bwe->sent->packets[janus_bwe_packet_type_rtx*3] ? pc->bwe->sent->bitrate[janus_bwe_packet_type_rtx*3] : 0);
if(pc->bwe->probing_target == 0 || pc->bwe->probing_target <= bitrate_out)
return G_SOURCE_CONTINUE;
/* This callback is for regularly sending probing for the bandwidth estimator */
if(pc->bwe->status != janus_bwe_status_start && pc->bwe->status != janus_bwe_status_regular) {
/* The BWE status may be lossy, congested, or recovering: don't probe for now */
return G_SOURCE_CONTINUE;
Expand All @@ -4566,6 +4538,7 @@ static gboolean janus_ice_outgoing_probing_handle(gpointer user_data) {
pc->bwe->probing_deferred = 0;
}
/* Get the medium instance we'll use for probing */
janus_ice_peerconnection_medium *medium = NULL;
if(pc->bwe->probing_mindex != -1)
medium = g_hash_table_lookup(pc->media, GINT_TO_POINTER(pc->bwe->probing_mindex));
if(medium == NULL || !medium->send || medium->rtx_payload_type == -1 || g_queue_is_empty(medium->retransmit_buffer)) {
Expand Down Expand Up @@ -4597,16 +4570,24 @@ static gboolean janus_ice_outgoing_probing_handle(gpointer user_data) {
pc->bwe->probing_portion = 0.0;
}
pc->bwe->probing_count++;
uint32_t required = pc->bwe->probing_target;
if(pc->bwe->probing_part > 0)
required = required / pc->bwe->probing_part;
/* Check if we need to build up probing */
uint32_t required = pc->bwe->probing_target - bitrate_out;
if(pc->bwe->probing_buildup_timer > 0) {
if(pc->bwe->probing_buildup == 0)
pc->bwe->probing_buildup = 25000;
if(now - pc->bwe->probing_buildup_timer >= 500000) {
pc->bwe->probing_buildup += 25000;
pc->bwe->probing_buildup_timer = now;
}
if(pc->bwe->probing_buildup >= required)
pc->bwe->probing_buildup = required;
required = pc->bwe->probing_buildup;
}
if(pc->bwe->probing_sent >= required) {
/* We sent enough for this round */
return G_SOURCE_CONTINUE;
}
uint32_t required_now = required / (8 * 20);
//~ if(pc->bwe->probing_count == 20)
//~ required_now = (pc->bwe->probing_target - pc->bwe->probing_sent) / 8;
double prev_portion = pc->bwe->probing_portion;
double portion = (double)required_now / (double)(p->length+SRTP_MAX_TAG_LEN);
double new_portion = prev_portion + portion;
Expand Down Expand Up @@ -4726,12 +4707,36 @@ static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janu
return G_SOURCE_CONTINUE;
JANUS_LOG(LOG_INFO, "[%"SCNu64"] Enabling bandwidth estimation\n", handle->handle_id);
pc->bwe = janus_bwe_context_create();
/* Let's create a source for BWE, just used to figure out when to notify the plugin */
janus_mutex_lock(&handle->mutex);
handle->bwe_target = 0;
janus_mutex_unlock(&handle->mutex);
/* Let's create a source for BWE, for plugin notifications and probing */
handle->bwe_source = g_timeout_source_new(50); /* FIXME */
g_source_set_priority(handle->bwe_source, G_PRIORITY_DEFAULT);
g_source_set_callback(handle->bwe_source, janus_ice_outgoing_bwe_handle, handle, NULL);
g_source_attach(handle->bwe_source, handle->mainctx);
return G_SOURCE_CONTINUE;
} else if(pkt == &janus_ice_set_bwe_target) {
/* Enable probing for the BWE context (assuming BWE is active) */
if(pc == NULL || pc->bwe == NULL)
return G_SOURCE_CONTINUE;
janus_mutex_lock(&handle->mutex);
uint32_t target = handle->bwe_target;
JANUS_LOG(LOG_INFO, "[%"SCNu64"] Configuring bandwidth estimation target: %"SCNu32"\n", handle->handle_id, target);
if(target == pc->bwe->probing_target) {
/* Nothing to do */
janus_mutex_unlock(&handle->mutex);
return G_SOURCE_CONTINUE;
}
janus_mutex_unlock(&handle->mutex);
/* Let's configure probing accordingly */
pc->bwe->probing_target = target;
pc->bwe->probing_count = 0;
pc->bwe->probing_sent = 0;
pc->bwe->probing_portion = 0.0;
pc->bwe->probing_buildup = 0;
pc->bwe->probing_buildup_timer = janus_get_monotonic_time();
return G_SOURCE_CONTINUE;
} else if(pkt == &janus_ice_disable_bwe) {
/* We need to get rif of the bandwidth estimator */
if(pc == NULL || pc->bwe == NULL)
Expand All @@ -4742,52 +4747,11 @@ static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janu
g_source_unref(handle->bwe_source);
handle->bwe_source = NULL;
}
if(handle->probing_source) {
g_source_destroy(handle->probing_source);
g_source_unref(handle->probing_source);
handle->probing_source = NULL;
}
janus_bwe_context_destroy(pc->bwe);
pc->bwe = NULL;
return G_SOURCE_CONTINUE;
} else if(pkt == &janus_ice_enable_probing) {
/* Enable probing for the BWE context (assuming BWE is active) */
if(pc == NULL || pc->bwe == NULL || handle->probing_source != NULL)
return G_SOURCE_CONTINUE;
JANUS_LOG(LOG_INFO, "[%"SCNu64"] Enabling bandwidth probing\n", handle->handle_id);
/* Let's create a source for probing */
pc->bwe->probing_target = 200000; /* FIXME */
pc->bwe->probing_count = 0;
pc->bwe->probing_sent = 0;
pc->bwe->probing_portion = 0.0;
pc->bwe->probing_part = 100; /* FIXME */
handle->probing_source = g_timeout_source_new(50);
g_source_set_priority(handle->probing_source, G_PRIORITY_DEFAULT);
g_source_set_callback(handle->probing_source, janus_ice_outgoing_probing_handle, handle, NULL);
g_source_attach(handle->probing_source, handle->mainctx);
return G_SOURCE_CONTINUE;
} else if(pkt == &janus_ice_defer_probing) {
/* We need to temporarily pause bandwidth probing */
if(pc == NULL || pc->bwe == NULL || handle->probing_source == NULL)
return G_SOURCE_CONTINUE;
JANUS_LOG(LOG_INFO, "[%"SCNu64"] Deferring bandwidth probing\n", handle->handle_id);
gint64 deferred = janus_get_monotonic_time() + G_USEC_PER_SEC;
if(deferred > pc->bwe->probing_deferred)
pc->bwe->probing_deferred = deferred;
pc->bwe->probing_sent = 0;
pc->bwe->probing_portion = 0.0;
pc->bwe->probing_part = 100; /* FIXME */
return G_SOURCE_CONTINUE;
} else if(pkt == &janus_ice_disable_probing) {
/* We need to get rid of the bandwidth probing */
if(pc == NULL || pc->bwe == NULL)
return G_SOURCE_CONTINUE;
JANUS_LOG(LOG_INFO, "[%"SCNu64"] Disabling bandwidth probing\n", handle->handle_id);
if(handle->probing_source) {
g_source_destroy(handle->probing_source);
g_source_unref(handle->probing_source);
handle->probing_source = NULL;
}
janus_mutex_lock(&handle->mutex);
handle->bwe_target = 0;
janus_mutex_unlock(&handle->mutex);
return G_SOURCE_CONTINUE;
} else if(pkt == &janus_ice_media_stopped) {
/* Some media has been disabled on the way in, so use the callback to notify the peer */
Expand Down Expand Up @@ -4838,11 +4802,6 @@ static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janu
g_source_unref(handle->bwe_source);
handle->bwe_source = NULL;
}
if(handle->probing_source) {
g_source_destroy(handle->probing_source);
g_source_unref(handle->probing_source);
handle->probing_source = NULL;
}
if(handle->stats_source) {
g_source_destroy(handle->stats_source);
g_source_unref(handle->stats_source);
Expand Down
Loading

0 comments on commit 317657b

Please sign in to comment.