Skip to content

Commit

Permalink
[spo] roc-streaminggh-183 Forward packet reader errors in depacketizer
Browse files Browse the repository at this point in the history
If underlying packet reader returns error (not StatusOK and
not StatusDrain), Depacketizer forwards this error.

Sponsored-by: waspd
  • Loading branch information
gavv committed Jun 20, 2024
1 parent aa89886 commit 7599194
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 53 deletions.
5 changes: 3 additions & 2 deletions src/internal_modules/roc_audio/depacketizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ status::StatusCode Depacketizer::read(Frame& frame,

while (buff_ptr < buff_end) {
const status::StatusCode code = update_packet_(frame_stats);
// TODO(gh-183): forward status from packet reader
(void)code;
if (code != status::StatusOK && code != status::StatusDrain) {
return code;
}

buff_ptr = read_samples_(buff_ptr, buff_end, frame_stats);
}
Expand Down
94 changes: 43 additions & 51 deletions src/tests/roc_audio/test_depacketizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,27 +98,29 @@ void expect_values(const sample_t* samples, size_t num_samples, sample_t value)
}

void expect_output(Depacketizer& depacketizer,
size_t sz,
size_t samples_per_chan,
sample_t value,
core::nanoseconds_t capt_ts) {
FramePtr frame = frame_factory.allocate_frame_no_buffer();
CHECK(frame);

LONGS_EQUAL(status::StatusOK,
depacketizer.read(*frame, (packet::stream_timestamp_t)sz));
depacketizer.read(*frame, (packet::stream_timestamp_t)samples_per_chan));

CHECK(frame->is_raw());

UNSIGNED_LONGS_EQUAL(sz, frame->duration());
UNSIGNED_LONGS_EQUAL(sz * frame_spec.num_channels(), frame->num_raw_samples());
UNSIGNED_LONGS_EQUAL(samples_per_chan, frame->duration());
UNSIGNED_LONGS_EQUAL(samples_per_chan * frame_spec.num_channels(),
frame->num_raw_samples());

CHECK(core::ns_equal_delta(frame->capture_timestamp(), capt_ts, core::Microsecond));

expect_values(frame->raw_samples(), sz * frame_spec.num_channels(), value);
expect_values(frame->raw_samples(), samples_per_chan * frame_spec.num_channels(),
value);
}

void expect_flags(Depacketizer& depacketizer,
size_t sz,
size_t samples_per_chan,
unsigned int flags,
core::nanoseconds_t capt_ts = -1) {
const core::nanoseconds_t epsilon = 100 * core::Microsecond;
Expand All @@ -127,52 +129,44 @@ void expect_flags(Depacketizer& depacketizer,
CHECK(frame);

LONGS_EQUAL(status::StatusOK,
depacketizer.read(*frame, (packet::stream_timestamp_t)sz));
depacketizer.read(*frame, (packet::stream_timestamp_t)samples_per_chan));

UNSIGNED_LONGS_EQUAL(flags, frame->flags());
if (capt_ts >= 0) {
CHECK(core::ns_equal_delta(frame->capture_timestamp(), capt_ts, epsilon));
}
}

class TestReader : public packet::IReader {
void expect_error(Depacketizer& depacketizer,
size_t samples_per_chan,
status::StatusCode expected_status) {
FramePtr frame = frame_factory.allocate_frame_no_buffer();
CHECK(frame);

LONGS_EQUAL(expected_status,
depacketizer.read(*frame, (packet::stream_timestamp_t)samples_per_chan));
}

class StatusReader : public packet::IReader {
public:
explicit TestReader(packet::IReader& reader)
explicit StatusReader(packet::IReader& reader)
: reader_(reader)
, call_count_(0)
, code_enabled_(false)
, code_(status::NoStatus) {
}

virtual ROC_ATTR_NODISCARD status::StatusCode read(packet::PacketPtr& pp) {
++call_count_;

if (code_enabled_) {
virtual status::StatusCode read(packet::PacketPtr& pp) {
if (code_ != status::NoStatus && code_ != status::StatusOK) {
return code_;
}

return reader_.read(pp);
}

void enable_status_code(status::StatusCode code) {
code_enabled_ = true;
void set_status(status::StatusCode code) {
code_ = code;
}

void disable_status_code() {
code_enabled_ = false;
code_ = status::NoStatus;
}

unsigned call_count() const {
return call_count_;
}

private:
packet::IReader& reader_;

unsigned call_count_;
bool code_enabled_;
status::StatusCode code_;
};

Expand Down Expand Up @@ -731,33 +725,31 @@ TEST(depacketizer, timestamp_small_non_zero_cts) {
expect_output(dp, SamplesPerPacket * PacketsPerFrame, 0.2f, second_frame_capt_ts);
}

TEST(depacketizer, read_after_error) {
const status::StatusCode codes[] = {
status::StatusDrain,
status::StatusAbort,
};
TEST(depacketizer, forward_error) {
PcmEncoder encoder(packet_spec);
PcmDecoder decoder(packet_spec);

for (unsigned n = 0; n < ROC_ARRAY_SIZE(codes); ++n) {
PcmEncoder encoder(packet_spec);
PcmDecoder decoder(packet_spec);
packet::Queue queue;
StatusReader reader(queue);
Depacketizer dp(reader, decoder, frame_factory, frame_spec, false);
LONGS_EQUAL(status::StatusOK, dp.init_status());

packet::Queue queue;
TestReader reader(queue);
Depacketizer dp(reader, decoder, frame_factory, frame_spec, false);
LONGS_EQUAL(status::StatusOK, dp.init_status());
// push one packet
write_packet(queue, new_packet(encoder, 0, 0.11f, 0));

write_packet(queue, new_packet(encoder, 0, 0.11f, Now));
// read first half of packet
expect_output(dp, SamplesPerPacket / 2, 0.11f, 0);

LONGS_EQUAL(0, reader.call_count());
// packet reader will now return error
reader.set_status(status::StatusAbort);

reader.enable_status_code(codes[n]);
expect_output(dp, SamplesPerPacket, 0.00f, 0);
LONGS_EQUAL(1, reader.call_count());
// read second half of packet
// no error because depacketizer still has buffered packet
expect_output(dp, SamplesPerPacket / 2, 0.11f, 0);

reader.disable_status_code();
expect_output(dp, SamplesPerPacket, 0.11f, Now);
LONGS_EQUAL(2, reader.call_count());
}
// try to read more
// get error because depacketizer tries to read packet
expect_error(dp, SamplesPerPacket, status::StatusAbort);
}

} // namespace audio
Expand Down

0 comments on commit 7599194

Please sign in to comment.