Skip to content

Commit

Permalink
[sp] roc-streaminggh-183 roc-streaminggh-615 Prepare Depacketizer to …
Browse files Browse the repository at this point in the history
…handle packet status

This commit does not change behavior, but restructures code of
Depacketizer so that we can update it to handle status codes
from packet reader and return errors or partial reads.

Sponsored-by: waspd
  • Loading branch information
gavv committed Jun 20, 2024
1 parent bb5f83c commit 282d484
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 99 deletions.
197 changes: 107 additions & 90 deletions src/internal_modules/roc_audio/depacketizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ Depacketizer::Depacketizer(packet::IReader& packet_reader,
, stream_ts_(0)
, next_capture_ts_(0)
, valid_capture_ts_(false)
, zero_samples_(0)
, padding_samples_(0)
, missing_samples_(0)
, packet_samples_(0)
, decoded_samples_(0)
, fetched_packets_(0)
, dropped_packets_(0)
, rate_limiter_(LogInterval)
, beep_(beep)
, first_packet_(true)
Expand Down Expand Up @@ -80,42 +82,42 @@ packet::stream_timestamp_t Depacketizer::next_timestamp() const {
return stream_ts_;
}

status::StatusCode Depacketizer::read(Frame& out_frame,
status::StatusCode Depacketizer::read(Frame& frame,
packet::stream_timestamp_t requested_duration) {
roc_panic_if(init_status_ != status::StatusOK);

const packet::stream_timestamp_t capped_duration = sample_spec_.cap_frame_duration(
requested_duration, frame_factory_.byte_buffer_size());

if (!frame_factory_.reallocate_frame(
out_frame, sample_spec_.stream_timestamp_2_bytes(capped_duration))) {
frame, sample_spec_.stream_timestamp_2_bytes(capped_duration))) {
return status::StatusNoMem;
}

out_frame.set_raw(true);
frame.set_raw(true);

sample_t* buff_ptr = out_frame.raw_samples();
sample_t* buff_end = out_frame.raw_samples() + out_frame.num_raw_samples();
sample_t* buff_ptr = frame.raw_samples();
sample_t* buff_end = frame.raw_samples() + frame.num_raw_samples();

FrameInfo frame_info;
FrameStats frame_stats;

while (buff_ptr < buff_end) {
buff_ptr = read_samples_(buff_ptr, buff_end, frame_info);
const status::StatusCode code = update_packet_(frame_stats);
// TODO(gh-183): forward status from packet reader
(void)code;

buff_ptr = read_samples_(buff_ptr, buff_end, frame_stats);
}
roc_panic_if(buff_ptr != buff_end);

set_frame_info_(out_frame, frame_info);

commit_frame_(frame, frame_stats);
report_stats_();

// TODO(gh-183): forward status from packet reader
return capped_duration == requested_duration ? status::StatusOK : status::StatusPart;
}

sample_t*
Depacketizer::read_samples_(sample_t* buff_ptr, sample_t* buff_end, FrameInfo& info) {
update_packet_(info);

Depacketizer::read_samples_(sample_t* buff_ptr, sample_t* buff_end, FrameStats& stats) {
if (packet_) {
packet::stream_timestamp_t next_timestamp = payload_decoder_.position();

Expand All @@ -137,43 +139,43 @@ Depacketizer::read_samples_(sample_t* buff_ptr, sample_t* buff_end, FrameInfo& i
// timestamp_
//
// Frame |----------------|
info.n_filled_samples += n_samples;
if (!info.capture_ts && valid_capture_ts_) {
info.capture_ts = next_capture_ts_
- sample_spec_.samples_overall_2_ns(info.n_filled_samples);
stats.n_filled_samples += n_samples;
if (!stats.capture_ts && valid_capture_ts_) {
stats.capture_ts = next_capture_ts_
- sample_spec_.samples_overall_2_ns(stats.n_filled_samples);
}
}

if (buff_ptr < buff_end) {
sample_t* new_buff_ptr = read_packet_samples_(buff_ptr, buff_end);
const size_t n_samples = size_t(new_buff_ptr - buff_ptr);

info.n_decoded_samples += n_samples;
if (n_samples && !info.capture_ts && valid_capture_ts_) {
info.capture_ts = next_capture_ts_
- sample_spec_.samples_overall_2_ns(info.n_filled_samples);
stats.n_decoded_samples += n_samples;
if (n_samples && !stats.capture_ts && valid_capture_ts_) {
stats.capture_ts = next_capture_ts_
- sample_spec_.samples_overall_2_ns(stats.n_filled_samples);
}
if (valid_capture_ts_) {
next_capture_ts_ += sample_spec_.samples_overall_2_ns(n_samples);
}

info.n_filled_samples += n_samples;
stats.n_filled_samples += n_samples;
buff_ptr = new_buff_ptr;
}

return buff_ptr;
} else {
const size_t n_samples = size_t(buff_end - buff_ptr);

if (!info.capture_ts && valid_capture_ts_) {
info.capture_ts = next_capture_ts_
- sample_spec_.samples_overall_2_ns(info.n_filled_samples);
if (!stats.capture_ts && valid_capture_ts_) {
stats.capture_ts = next_capture_ts_
- sample_spec_.samples_overall_2_ns(stats.n_filled_samples);
}
if (valid_capture_ts_) {
next_capture_ts_ += sample_spec_.samples_overall_2_ns(n_samples);
}

info.n_filled_samples += n_samples;
stats.n_filled_samples += n_samples;
return read_missing_samples_(buff_ptr, buff_end);
}
}
Expand All @@ -185,7 +187,7 @@ sample_t* Depacketizer::read_packet_samples_(sample_t* buff_ptr, sample_t* buff_
const size_t decoded_samples = payload_decoder_.read(buff_ptr, requested_samples);

stream_ts_ += (packet::stream_timestamp_t)decoded_samples;
packet_samples_ += (packet::stream_timestamp_t)decoded_samples;
decoded_samples_ += (packet::stream_timestamp_t)decoded_samples;

if (decoded_samples < requested_samples) {
payload_decoder_.end();
Expand All @@ -208,56 +210,76 @@ sample_t* Depacketizer::read_missing_samples_(sample_t* buff_ptr, sample_t* buff
stream_ts_ += (packet::stream_timestamp_t)num_samples;

if (first_packet_) {
zero_samples_ += (packet::stream_timestamp_t)num_samples;
padding_samples_ += (packet::stream_timestamp_t)num_samples;
} else {
missing_samples_ += (packet::stream_timestamp_t)num_samples;
}

return (buff_ptr + num_samples * sample_spec_.num_channels());
}

void Depacketizer::update_packet_(FrameInfo& info) {
status::StatusCode Depacketizer::update_packet_(FrameStats& frame_stats) {
if (packet_) {
return;
// Already have packet.
return status::StatusOK;
}

packet::stream_timestamp_t pkt_timestamp = 0;
unsigned n_dropped = 0;

while ((packet_ = read_packet_())) {
payload_decoder_.begin(packet_->stream_timestamp(), packet_->payload().data(),
packet_->payload().size());

pkt_timestamp = payload_decoder_.position();
for (;;) {
const status::StatusCode code = fetch_packet_();
if (code != status::StatusOK) {
return code;
}

if (first_packet_) {
if (start_packet_()) {
break;
}

const packet::stream_timestamp_t pkt_end =
pkt_timestamp + payload_decoder_.available();
// Try next packet.
frame_stats.n_dropped_packets++;
}

if (packet::stream_timestamp_lt(stream_ts_, pkt_end)) {
break;
}
roc_panic_if(!packet_);

roc_log(LogDebug, "depacketizer: dropping late packet: ts=%lu pkt_ts=%lu",
(unsigned long)stream_ts_, (unsigned long)pkt_timestamp);
return status::StatusOK;
}

n_dropped++;
status::StatusCode Depacketizer::fetch_packet_() {
packet::PacketPtr pp;
const status::StatusCode code = packet_reader_.read(pp);

payload_decoder_.end();
if (code != status::StatusOK) {
if (code != status::StatusDrain) {
roc_log(LogError, "depacketizer: failed to read packet: status=%s",
status::code_to_str(code));
}
return code;
}

if (n_dropped != 0) {
roc_log(LogDebug, "depacketizer: fetched=%d dropped=%u", (int)!!packet_,
n_dropped);
packet_ = pp;
fetched_packets_++;

info.n_dropped_packets += n_dropped;
}
return status::StatusOK;
}

if (!packet_) {
return;
bool Depacketizer::start_packet_() {
roc_panic_if(!packet_);

payload_decoder_.begin(packet_->stream_timestamp(), packet_->payload().data(),
packet_->payload().size());

const packet::stream_timestamp_t pkt_begin = payload_decoder_.position();
const packet::stream_timestamp_t pkt_end = pkt_begin + payload_decoder_.available();

if (!first_packet_ && !packet::stream_timestamp_lt(stream_ts_, pkt_end)) {
roc_log(LogTrace, "depacketizer: dropping late packet: stream_ts=%lu pkt_ts=%lu",
(unsigned long)stream_ts_, (unsigned long)pkt_begin);

dropped_packets_++;

payload_decoder_.end();
packet_ = NULL;

return false;
}

next_capture_ts_ = packet_->capture_timestamp();
Expand All @@ -266,18 +288,18 @@ void Depacketizer::update_packet_(FrameInfo& info) {
}

if (first_packet_) {
roc_log(LogDebug, "depacketizer: got first packet: zero_samples=%lu",
(unsigned long)zero_samples_);
roc_log(LogDebug, "depacketizer: got first packet: padding_samples=%lu",
(unsigned long)padding_samples_);

stream_ts_ = pkt_timestamp;
stream_ts_ = pkt_begin;
first_packet_ = false;
}

// Packet |-----------------|
// NextFrame |----------------|
if (packet::stream_timestamp_lt(pkt_timestamp, stream_ts_)) {
if (packet::stream_timestamp_lt(pkt_begin, stream_ts_)) {
const size_t diff_samples =
(size_t)packet::stream_timestamp_diff(stream_ts_, pkt_timestamp);
(size_t)packet::stream_timestamp_diff(stream_ts_, pkt_begin);
if (valid_capture_ts_) {
next_capture_ts_ += sample_spec_.samples_per_chan_2_ns(diff_samples);
}
Expand All @@ -286,46 +308,32 @@ void Depacketizer::update_packet_(FrameInfo& info) {
roc_panic("depacketizer: can't shift packet");
}
}
}

packet::PacketPtr Depacketizer::read_packet_() {
packet::PacketPtr pp;
const status::StatusCode code = packet_reader_.read(pp);
if (code != status::StatusOK) {
if (code != status::StatusDrain) {
// TODO(gh-302): forward status
roc_log(LogError, "depacketizer: failed to read packet: status=%s",
status::code_to_str(code));
}

return NULL;
}

return pp;
return true;
}

void Depacketizer::set_frame_info_(Frame& frame, const FrameInfo& info) {
void Depacketizer::commit_frame_(Frame& frame, const FrameStats& frame_stats) {
unsigned flags = 0;

if (info.n_decoded_samples != 0) {
if (frame_stats.n_decoded_samples != 0) {
flags |= Frame::HasSignal;
}

if (info.n_decoded_samples < frame.num_raw_samples()) {
if (frame_stats.n_decoded_samples < frame.num_raw_samples()) {
flags |= Frame::HasHoles;
}

if (info.n_dropped_packets != 0) {
if (frame_stats.n_dropped_packets != 0) {
flags |= Frame::HasPacketDrops;
}

frame.set_flags(flags);
frame.set_duration(frame.num_raw_samples() / sample_spec_.num_channels());

if (info.capture_ts > 0) {
// do not produce negative cts, which may happen when first packet was in
// the middle of the frame and has small timestamp close to unix epoch
frame.set_capture_timestamp(info.capture_ts);
if (frame_stats.capture_ts > 0) {
// Do not produce negative cts, which may happen when first packet was in
// the middle of the frame and has small timestamp close to unix epoch.
frame.set_capture_timestamp(frame_stats.capture_ts);
}
}

Expand All @@ -334,12 +342,21 @@ void Depacketizer::report_stats_() {
return;
}

const size_t total_samples = missing_samples_ + packet_samples_;
const double loss_ratio =
total_samples != 0 ? (double)missing_samples_ / total_samples : 0.;
const double loss_ratio = (missing_samples_ + decoded_samples_) != 0
? (double)missing_samples_ / (missing_samples_ + decoded_samples_)
: 0.;

const double drop_ratio =
fetched_packets_ != 0 ? (double)dropped_packets_ / fetched_packets_ : 0.;

roc_log(LogDebug,
"depacketizer:"
" fetched_pkts=%lu dropped_pkts=%lu loss_ratio=%.5lf late_ratio=%.5lf",
(unsigned long)fetched_packets_, (unsigned long)dropped_packets_, loss_ratio,
drop_ratio);

roc_log(LogDebug, "depacketizer: ts=%lu loss_ratio=%.5lf", (unsigned long)stream_ts_,
loss_ratio);
missing_samples_ = decoded_samples_ = 0;
fetched_packets_ = dropped_packets_ = 0;
}

} // namespace audio
Expand Down
Loading

0 comments on commit 282d484

Please sign in to comment.