Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test PR #3743

Closed
wants to merge 3 commits into from
Closed

Test PR #3743

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 90 additions & 9 deletions trunk/src/app/srs_app_rtc_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,8 @@ SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection* s, const SrsContextId& cid)

cache_ssrc0_ = cache_ssrc1_ = cache_ssrc2_ = 0;
cache_track0_ = cache_track1_ = cache_track2_ = NULL;

timer_rtcp_ = new SrsRtcPlayRtcpTimer(this);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

测试

Copy link
Member Author

@winlinvip winlinvip Jul 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test PR Review Reply

TRANS_BY_GPT3

}

SrsRtcPlayStream::~SrsRtcPlayStream()
Expand All @@ -448,6 +450,9 @@ SrsRtcPlayStream::~SrsRtcPlayStream()

_srs_config->unsubscribe(this);

if (timer_rtcp_) {
srs_freep(timer_rtcp_);
}
srs_freep(nack_epp);
srs_freep(pli_worker_);
srs_freep(trd_);
Expand Down Expand Up @@ -681,6 +686,26 @@ srs_error_t SrsRtcPlayStream::cycle()
}
}

srs_error_t SrsRtcPlayStream::send_rtcp_sr() {
srs_error_t err = srs_success;

for (auto& item : video_tracks_) {
SrsRtcVideoSendTrack* track = item.second;
if ((err = track->send_rtcp_sr()) != srs_success) {
return srs_error_wrap(err, "video send rtcp sr error track=%s", track->get_track_id().c_str());
}
}

for (auto& item : audio_tracks_) {
SrsRtcAudioSendTrack* track = item.second;
if ((err = track->send_rtcp_sr()) != srs_success) {
return srs_error_wrap(err, "video send rtcp sr error track=%s", track->get_track_id().c_str());
}
}

return err;
}

srs_error_t SrsRtcPlayStream::send_packet(SrsRtpPacket*& pkt)
{
srs_error_t err = srs_success;
Expand Down Expand Up @@ -779,8 +804,9 @@ void SrsRtcPlayStream::set_all_tracks_status(bool status)
srs_error_t SrsRtcPlayStream::on_rtcp(SrsRtcpCommon* rtcp)
{
if(SrsRtcpType_rr == rtcp->type()) {
int64_t now_ms = srs_update_system_time()/1000;
SrsRtcpRR* rr = dynamic_cast<SrsRtcpRR*>(rtcp);
return on_rtcp_rr(rr);
return on_rtcp_rr(rr, now_ms);
} else if(SrsRtcpType_rtpfb == rtcp->type()) {
//currently rtpfb of nack will be handle by player. TWCC will be handled by SrsRtcConnection
SrsRtcpNack* nack = dynamic_cast<SrsRtcpNack*>(rtcp);
Expand All @@ -799,12 +825,29 @@ srs_error_t SrsRtcPlayStream::on_rtcp(SrsRtcpCommon* rtcp)
}
}

srs_error_t SrsRtcPlayStream::on_rtcp_rr(SrsRtcpRR* rtcp)
srs_error_t SrsRtcPlayStream::on_rtcp_rr(SrsRtcpRR* rtcp, int64_t now_ms)
{
srs_error_t err = srs_success;

// TODO: FIXME: Implements it.
//srs_trace("receive rtcp rr block count:%lu", rtcp->rr_blocks_.size());
for (SrsRtcpRB& rb : rtcp->rr_blocks_) {
uint32_t ssrc = rb.ssrc;

for(auto item : audio_tracks_) {
if(ssrc == item.second->track_desc_->ssrc_) {
item.second->handle_rtcp_rr(rb, now_ms);
return err;
}
}

for(auto item : video_tracks_) {
if(ssrc == item.second->track_desc_->ssrc_) {
item.second->handle_rtcp_rr(rb, now_ms);
return err;
}
}
srs_warn("rtcp rr find to find track by ssrc:%u", ssrc);
}
return err;
}

Expand Down Expand Up @@ -928,6 +971,32 @@ srs_error_t SrsRtcPlayStream::do_request_keyframe(uint32_t ssrc, SrsContextId ci
return err;
}

SrsRtcPlayRtcpTimer::SrsRtcPlayRtcpTimer(SrsRtcPlayStream* p) : p_(p)
{
_srs_hybrid->timer1s()->subscribe(this);
}

SrsRtcPlayRtcpTimer::~SrsRtcPlayRtcpTimer()
{
_srs_hybrid->timer1s()->unsubscribe(this);
}

srs_error_t SrsRtcPlayRtcpTimer::on_timer(srs_utime_t interval)
{
srs_error_t err = srs_success;

if (!p_->is_started) {
return err;
}

if ((err = p_->send_rtcp_sr()) != srs_success) {
srs_warn("RR err %s", srs_error_desc(err).c_str());
srs_freep(err);
}

return err;
}

SrsRtcPublishRtcpTimer::SrsRtcPublishRtcpTimer(SrsRtcPublishStream* p) : p_(p)
{
_srs_hybrid->timer1s()->subscribe(this);
Expand Down Expand Up @@ -1298,15 +1367,15 @@ srs_error_t SrsRtcPublishStream::send_rtcp_rr()
{
srs_error_t err = srs_success;

for (int i = 0; i < (int)video_tracks_.size(); ++i) {
SrsRtcVideoRecvTrack* track = video_tracks_.at(i);
for (auto& item : video_tracks_) {
SrsRtcVideoRecvTrack* track = item;
if ((err = track->send_rtcp_rr()) != srs_success) {
return srs_error_wrap(err, "track=%s", track->get_track_id().c_str());
}
}

for (int i = 0; i < (int)audio_tracks_.size(); ++i) {
SrsRtcAudioRecvTrack* track = audio_tracks_.at(i);
for (auto& item : audio_tracks_) {
SrsRtcAudioRecvTrack* track = item;
if ((err = track->send_rtcp_rr()) != srs_success) {
return srs_error_wrap(err, "track=%s", track->get_track_id().c_str());
}
Expand Down Expand Up @@ -2079,9 +2148,14 @@ srs_error_t SrsRtcConnection::dispatch_rtcp(SrsRtcpCommon* rtcp)
// Ignore special packet.
if (SrsRtcpType_rr == rtcp->type()) {
SrsRtcpRR* rr = dynamic_cast<SrsRtcpRR*>(rtcp);
if (rr->get_rb_ssrc() == 0) { //for native client
if (rr->rr_blocks_.empty()) { //for native client
return err;
}
for (auto& rb : rr->rr_blocks_) {
if (rb.ssrc == 0) {
return err;
}
}
}

// The feedback packet for specified SSRC.
Expand All @@ -2091,7 +2165,14 @@ srs_error_t SrsRtcConnection::dispatch_rtcp(SrsRtcpCommon* rtcp)
required_publisher_ssrc = rtcp->get_ssrc();
} else if (SrsRtcpType_rr == rtcp->type()) {
SrsRtcpRR* rr = dynamic_cast<SrsRtcpRR*>(rtcp);
required_player_ssrc = rr->get_rb_ssrc();
for(auto& rb : rr->rr_blocks_) {
uint32_t ssrc = rb.ssrc;
auto it = players_ssrc_map_.find(ssrc);
if (it != players_ssrc_map_.end()) {
it->second->on_rtcp(rtcp);
break;
}
}
} else if (SrsRtcpType_rtpfb == rtcp->type()) {
if(1 == rtcp->get_rc()) {
SrsRtcpNack* nack = dynamic_cast<SrsRtcpNack*>(rtcp);
Expand Down
21 changes: 20 additions & 1 deletion trunk/src/app/srs_app_rtc_conn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class SrsRtcNetworks;
class SrsRtcUdpNetwork;
class ISrsRtcNetwork;
class SrsRtcTcpNetwork;
class SrsRtcPlayRtcpTimer;

const uint8_t kSR = 200;
const uint8_t kRR = 201;
Expand Down Expand Up @@ -210,6 +211,7 @@ class SrsRtcAsyncCallOnStop : public ISrsAsyncCallTask
class SrsRtcPlayStream : public ISrsCoroutineHandler, public ISrsReloadHandler
, public ISrsRtcPLIWorkerHandler, public ISrsRtcSourceChangeCallback
{
friend class SrsRtcPlayRtcpTimer;
private:
SrsContextId cid_;
SrsFastCoroutine* trd_;
Expand All @@ -223,6 +225,8 @@ class SrsRtcPlayStream : public ISrsCoroutineHandler, public ISrsReloadHandler
std::map<uint32_t, SrsRtcVideoSendTrack*> video_tracks_;
// The pithy print for special stage.
SrsErrorPithyPrint* nack_epp;
private:
SrsRtcPlayRtcpTimer* timer_rtcp_;
private:
// Fast cache for tracks.
uint32_t cache_ssrc0_;
Expand Down Expand Up @@ -259,6 +263,8 @@ class SrsRtcPlayStream : public ISrsCoroutineHandler, public ISrsReloadHandler
virtual void stop();
public:
virtual srs_error_t cycle();
public:
srs_error_t send_rtcp_sr();
private:
srs_error_t send_packet(SrsRtpPacket*& pkt);
public:
Expand All @@ -270,7 +276,7 @@ class SrsRtcPlayStream : public ISrsCoroutineHandler, public ISrsReloadHandler
srs_error_t on_rtcp_xr(SrsRtcpXr* rtcp);
srs_error_t on_rtcp_nack(SrsRtcpNack* rtcp);
srs_error_t on_rtcp_ps_feedback(SrsRtcpFbCommon* rtcp);
srs_error_t on_rtcp_rr(SrsRtcpRR* rtcp);
srs_error_t on_rtcp_rr(SrsRtcpRR* rtcp, int64_t now_ms);
uint32_t get_video_publish_ssrc(uint32_t play_ssrc);
// Interface ISrsRtcPLIWorkerHandler
public:
Expand All @@ -290,6 +296,19 @@ class SrsRtcPublishRtcpTimer : public ISrsFastTimer
srs_error_t on_timer(srs_utime_t interval);
};

// A fast timer for play stream, for RTCP feedback.
class SrsRtcPlayRtcpTimer : public ISrsFastTimer
{
private:
SrsRtcPlayStream* p_;
public:
SrsRtcPlayRtcpTimer(SrsRtcPlayStream* p);
virtual ~SrsRtcPlayRtcpTimer();
// interface ISrsFastTimer
private:
srs_error_t on_timer(srs_utime_t interval);
};

// A fast timer for publish stream, for TWCC feedback.
class SrsRtcPublishTwccTimer : public ISrsFastTimer
{
Expand Down
77 changes: 77 additions & 0 deletions trunk/src/app/srs_app_rtc_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2638,6 +2638,13 @@ SrsRtcSendTrack::SrsRtcSendTrack(SrsRtcConnection* session, SrsRtcTrackDescripti
}

nack_epp = new SrsErrorPithyPrint();

//for rtcp rr
lost_total_ = 0;
jitter_ = 0;
lost_rate_ = 0.0;
rtt_ = 0.0;
avg_rtt_ = 10.0;
}

SrsRtcSendTrack::~SrsRtcSendTrack()
Expand Down Expand Up @@ -2710,6 +2717,74 @@ void SrsRtcSendTrack::rebuild_packet(SrsRtpPacket* pkt)
srs_info("RTC: Correct %s seq=%u/%u, ts=%u/%u", track_desc_->type_.c_str(), seq, pkt->header.get_sequence(), ts, pkt->header.get_timestamp());
}

srs_error_t SrsRtcSendTrack::send_rtcp_sr() {
srs_error_t err = srs_success;
SrsRtcpSR* sr = new SrsRtcpSR();
uint32_t ssrc = track_desc_->ssrc_;
int64_t now_ms = srs_update_system_time()/1000;

last_sr_ntp_ = SrsNtp::from_time_ms(now_ms);
int64_t current_sr = ((last_sr_ntp_.ntp_second_ & 0xffff) << 16) | (last_sr_ntp_.ntp_fractions_ & 0xffff);
int64_t diff_ms = now_ms - last_rtp_ms_;
int64_t diff_ts = diff_ms * track_desc_->media_->sample_ / 1000;
int64_t video_rtp_ts = last_rtp_pkt_ts_ + diff_ts;

//srs_trace("send rtcp sr ssrc:%u, current_sr:%ld, last_sr:%ld, diff:%ld", ssrc, current_sr, last_sr_, current_sr - last_sr_);
//srs_trace("send rtcp sr ssrc:%u, current ms:%ld, last ms:%ld, diff:%ld", ssrc, now_ms, last_ms_, now_ms - last_ms_);
last_sr_ = current_sr;
last_ms_ = now_ms;
sr->set_ssrc(ssrc);
sr->set_ntp(last_sr_ntp_.ntp_);
sr->set_rtp_ts(video_rtp_ts);
sr->set_rtp_send_packets(send_count_);
sr->set_rtp_send_bytes(send_bytes_);

char data[1500];
SrsBuffer buffer(data, sr->nb_bytes());
sr->encode(&buffer);
delete sr;
sr = nullptr;

session_->send_rtcp(buffer.data(), buffer.size());

return err;
}

void SrsRtcSendTrack::update_rtp_static(int64_t len, uint32_t rtp_ts) {
send_count_++;
send_bytes_ += len;
last_rtp_pkt_ts_ = rtp_ts;
last_rtp_ms_ = srs_get_system_time() / 1000;//ms
}

srs_error_t SrsRtcSendTrack::handle_rtcp_rr(const SrsRtcpRB& rb, int64_t now_ms) {
jitter_ = rb.jitter;
lost_rate_ = rb.fraction_lost / 256.0;
lost_total_ = rb.lost_packets;

int64_t lsr = rb.lsr;
int64_t dlsr = rb.dlsr;

SrsNtp now_ntp = SrsNtp::from_time_ms(now_ms);

uint32_t compact_ntp = (now_ntp.ntp_second_ & 0x0000FFFF) << 16;
compact_ntp |= (now_ntp.ntp_fractions_ & 0xFFFF0000) >> 16;

uint32_t rtt = 0;
if (lsr && dlsr && (compact_ntp > dlsr + lsr)) {
rtt = compact_ntp - dlsr - lsr;
}
//srs_trace("hand rtcp rr ssrc:%u, compact ntp:%lu, dlsr:%u, lsr:%u",
// rb.ssrc, compact_ntp, dlsr, lsr);
rtt_ = static_cast<float>(rtt >> 16) * 1000.0;
rtt_ += (static_cast<float>(rtt & 0x0000FFFF) / 65536.0) * 1000.0;

avg_rtt_ += (rtt_ - avg_rtt_) / 4.0;
//srs_trace("handle rtcp rr ssrc:%u, lost total:%u, lost rate:%.03f, jitter:%u, rtt_:%.02f, avg rtt:%.02f",
// rb.ssrc, lost_total_, lost_rate_, jitter_, rtt_, avg_rtt_);
return srs_success;
}

srs_error_t SrsRtcSendTrack::on_nack(SrsRtpPacket** ppkt)
{
srs_error_t err = srs_success;
Expand Down Expand Up @@ -2789,6 +2864,7 @@ srs_error_t SrsRtcAudioSendTrack::on_rtp(SrsRtpPacket* pkt)

// Rebuild the sequence number and timestamp of packet, see https://github.com/ossrs/srs/issues/3167
rebuild_packet(pkt);
update_rtp_static(pkt->nb_bytes(), pkt->header.get_timestamp());

if ((err = session_->do_send_packet(pkt)) != srs_success) {
return srs_error_wrap(err, "raw send");
Expand Down Expand Up @@ -2839,6 +2915,7 @@ srs_error_t SrsRtcVideoSendTrack::on_rtp(SrsRtpPacket* pkt)

// Rebuild the sequence number and timestamp of packet, see https://github.com/ossrs/srs/issues/3167
rebuild_packet(pkt);
update_rtp_static(pkt->nb_bytes(), pkt->header.get_timestamp());

if ((err = session_->do_send_packet(pkt)) != srs_success) {
return srs_error_wrap(err, "raw send");
Expand Down
21 changes: 21 additions & 0 deletions trunk/src/app/srs_app_rtc_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <srs_app_hourglass.hpp>
#include <srs_protocol_format.hpp>
#include <srs_app_stream_bridge.hpp>
#include <srs_kernel_rtc_rtcp.hpp>

class SrsRequest;
class SrsMetaCache;
Expand Down Expand Up @@ -685,6 +686,13 @@ class SrsRtcSendTrack
bool nack_no_copy_;
// The pithy print for special stage.
SrsErrorPithyPrint* nack_epp;
private:
//for rtcp rr
int64_t jitter_;
float lost_rate_;
int64_t lost_total_;
float rtt_;
float avg_rtt_;
public:
SrsRtcSendTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc, bool is_audio);
virtual ~SrsRtcSendTrack();
Expand All @@ -706,6 +714,19 @@ class SrsRtcSendTrack
virtual srs_error_t on_rtp(SrsRtpPacket* pkt) = 0;
virtual srs_error_t on_rtcp(SrsRtpPacket* pkt) = 0;
virtual srs_error_t on_recv_nack(const std::vector<uint16_t>& lost_seqs);
public:
srs_error_t send_rtcp_sr();
void update_rtp_static(int64_t len, uint32_t rtp_ts);
public:
srs_error_t handle_rtcp_rr(const SrsRtcpRB& rb, int64_t now_ms);
protected:
int64_t send_bytes_ = 0;
int64_t send_count_ = 0;
int64_t last_rtp_pkt_ts_ = 0;
int64_t last_rtp_ms_ = 0;
int64_t last_sr_ = 0;//for debug
int64_t last_ms_ = 0;//for debug
SrsNtp last_sr_ntp_;
};

class SrsRtcAudioSendTrack : public SrsRtcSendTrack
Expand Down
Loading
Loading