Skip to content

Commit

Permalink
support edge mode downstream video fps stat
Browse files Browse the repository at this point in the history
  • Loading branch information
b4158813 committed Jun 28, 2023
1 parent 43dfb1b commit fd3ae56
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 0 deletions.
12 changes: 12 additions & 0 deletions trunk/src/app/srs_app_edge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ SrsEdgeIngester::SrsEdgeIngester()
source = NULL;
edge = NULL;
req = NULL;
video_frames = 0;
#ifdef SRS_APM
span_main_ = NULL;
#endif
Expand Down Expand Up @@ -466,6 +467,11 @@ string SrsEdgeIngester::get_curr_origin()
return lb->selected();
}

uint64_t SrsEdgeIngester::nb_video_frames()
{
return video_frames;
}

#ifdef SRS_APM
ISrsApmSpan* SrsEdgeIngester::span()
{
Expand Down Expand Up @@ -643,6 +649,7 @@ srs_error_t SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg, stri

// process video packet
if (msg->header.is_video()) {
video_frames ++;
if ((err = source->on_video(msg)) != srs_success) {
return srs_error_wrap(err, "source consume video");
}
Expand Down Expand Up @@ -1015,6 +1022,11 @@ string SrsPlayEdge::get_curr_origin()
return ingester->get_curr_origin();
}

uint64_t SrsPlayEdge::get_ingester_video_frames()
{
return ingester->nb_video_frames();
}

srs_error_t SrsPlayEdge::on_ingest_play()
{
srs_error_t err = srs_success;
Expand Down
4 changes: 4 additions & 0 deletions trunk/src/app/srs_app_edge.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ class SrsEdgeIngester : public ISrsCoroutineHandler
SrsCoroutine* trd;
SrsLbRoundRobin* lb;
SrsEdgeUpstream* upstream;
uint64_t video_frames;
#ifdef SRS_APM
ISrsApmSpan* span_main_;
#endif
Expand All @@ -154,6 +155,7 @@ class SrsEdgeIngester : public ISrsCoroutineHandler
virtual srs_error_t start();
virtual void stop();
virtual std::string get_curr_origin();
virtual uint64_t nb_video_frames();
#ifdef SRS_APM
// Get the current main span. Note that it might be NULL.
ISrsApmSpan* span();
Expand Down Expand Up @@ -225,6 +227,8 @@ class SrsPlayEdge
public:
// When ingester start to play stream.
virtual srs_error_t on_ingest_play();
// Get video fps from ingester.
virtual uint64_t get_ingester_video_frames();
};

// The publish edge control service.
Expand Down
8 changes: 8 additions & 0 deletions trunk/src/app/srs_app_http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,8 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
entry->pattern.c_str(), enc_desc.c_str(), srsu2msi(mw_sleep), enc->has_cache(), msgs.max, drop_if_not_match,
has_audio, has_video, guess_has_av);

uint64_t nb_frames = 0;

// TODO: free and erase the disabled entry after all related connections is closed.
// TODO: FXIME: Support timeout for player, quit infinite-loop.
while (entry->enabled) {
Expand Down Expand Up @@ -746,6 +748,12 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
}

// TODO: FIXME: Update the stat.
// FIXED: update the downstream fps stat.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_video_frames(req, (int)(source->get_play_edge_video_frames() - nb_frames))) != srs_success) {
return srs_error_wrap(err, "rtmp: stat video frames");
}
nb_frames = source->get_play_edge_video_frames();

// free the messages.
for (int i = 0; i < count; i++) {
Expand Down
8 changes: 8 additions & 0 deletions trunk/src/app/srs_app_rtmp_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,7 @@ srs_error_t SrsRtmpConn::do_playing(SrsLiveSource* source, SrsLiveConsumer* cons
SrsAutoFree(ISrsApmSpan, span);
#endif

uint64_t nb_frames = 0;
while (true) {
// when source is set to expired, disconnect it.
if ((err = trd->pull()) != srs_success) {
Expand Down Expand Up @@ -857,6 +858,13 @@ srs_error_t SrsRtmpConn::do_playing(SrsLiveSource* source, SrsLiveConsumer* cons
return srs_error_wrap(err, "rtmp: consumer dump packets");
}

// Update the stat for downstream video fps stat.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_video_frames(req, (int)(source->get_play_edge_video_frames() - nb_frames))) != srs_success) {
return srs_error_wrap(err, "rtmp: stat video frames");
}
nb_frames = source->get_play_edge_video_frames();

// reportable
if (pprint->can_print()) {
kbps->sample();
Expand Down
4 changes: 4 additions & 0 deletions trunk/src/app/srs_app_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2807,3 +2807,7 @@ string SrsLiveSource::get_curr_origin()
return play_edge->get_curr_origin();
}

uint64_t SrsLiveSource::get_play_edge_video_frames() {
return play_edge->get_ingester_video_frames();
}

3 changes: 3 additions & 0 deletions trunk/src/app/srs_app_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,9 @@ class SrsLiveSource : public ISrsReloadHandler
virtual void on_edge_proxy_unpublish();
public:
virtual std::string get_curr_origin();
public:
// For edge, get downstream video fps.
virtual uint64_t get_play_edge_video_frames();
};

#endif
1 change: 1 addition & 0 deletions trunk/src/app/srs_app_statistic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ srs_error_t SrsStatisticStream::dumps(SrsJsonObject* obj)

video->set("width", SrsJsonAny::integer(width));
video->set("height", SrsJsonAny::integer(height));
video->set("fps", SrsJsonAny::integer(frames->r10s() / (nb_clients > 1 ? nb_clients : 1)));
}

if (!has_audio) {
Expand Down

0 comments on commit fd3ae56

Please sign in to comment.