From bb17ba96b1222252305956f387b40f13c1d66c3c Mon Sep 17 00:00:00 2001 From: Victor Gaydov Date: Thu, 30 May 2024 23:53:09 +0400 Subject: [PATCH] gh-183 Forward packet reader errors in depacketizer If underlying packet reader returns error (not StatusOK and not StatusDrain), Depacketizer forwards this error. Sponsored-by: waspd --- .../roc_audio/depacketizer.cpp | 5 +- src/tests/roc_audio/test_depacketizer.cpp | 94 +++++++++---------- 2 files changed, 46 insertions(+), 53 deletions(-) diff --git a/src/internal_modules/roc_audio/depacketizer.cpp b/src/internal_modules/roc_audio/depacketizer.cpp index cfc1baf97..6dc79ec5b 100644 --- a/src/internal_modules/roc_audio/depacketizer.cpp +++ b/src/internal_modules/roc_audio/depacketizer.cpp @@ -103,8 +103,9 @@ status::StatusCode Depacketizer::read(Frame& frame, while (buff_ptr < buff_end) { const status::StatusCode code = update_packet_(frame_stats); - // TODO(gh-183): forward status from packet reader - (void)code; + if (code != status::StatusOK && code != status::StatusDrain) { + return code; + } buff_ptr = read_samples_(buff_ptr, buff_end, frame_stats); } diff --git a/src/tests/roc_audio/test_depacketizer.cpp b/src/tests/roc_audio/test_depacketizer.cpp index 93fe21788..25bfdd659 100644 --- a/src/tests/roc_audio/test_depacketizer.cpp +++ b/src/tests/roc_audio/test_depacketizer.cpp @@ -98,27 +98,29 @@ void expect_values(const sample_t* samples, size_t num_samples, sample_t value) } void expect_output(Depacketizer& depacketizer, - size_t sz, + size_t samples_per_chan, sample_t value, core::nanoseconds_t capt_ts) { FramePtr frame = frame_factory.allocate_frame_no_buffer(); CHECK(frame); LONGS_EQUAL(status::StatusOK, - depacketizer.read(*frame, (packet::stream_timestamp_t)sz)); + depacketizer.read(*frame, (packet::stream_timestamp_t)samples_per_chan)); CHECK(frame->is_raw()); - UNSIGNED_LONGS_EQUAL(sz, frame->duration()); - UNSIGNED_LONGS_EQUAL(sz * frame_spec.num_channels(), frame->num_raw_samples()); + UNSIGNED_LONGS_EQUAL(samples_per_chan, frame->duration()); + UNSIGNED_LONGS_EQUAL(samples_per_chan * frame_spec.num_channels(), + frame->num_raw_samples()); CHECK(core::ns_equal_delta(frame->capture_timestamp(), capt_ts, core::Microsecond)); - expect_values(frame->raw_samples(), sz * frame_spec.num_channels(), value); + expect_values(frame->raw_samples(), samples_per_chan * frame_spec.num_channels(), + value); } void expect_flags(Depacketizer& depacketizer, - size_t sz, + size_t samples_per_chan, unsigned int flags, core::nanoseconds_t capt_ts = -1) { const core::nanoseconds_t epsilon = 100 * core::Microsecond; @@ -127,7 +129,7 @@ void expect_flags(Depacketizer& depacketizer, CHECK(frame); LONGS_EQUAL(status::StatusOK, - depacketizer.read(*frame, (packet::stream_timestamp_t)sz)); + depacketizer.read(*frame, (packet::stream_timestamp_t)samples_per_chan)); UNSIGNED_LONGS_EQUAL(flags, frame->flags()); if (capt_ts >= 0) { @@ -135,44 +137,36 @@ void expect_flags(Depacketizer& depacketizer, } } -class TestReader : public packet::IReader { +void expect_error(Depacketizer& depacketizer, + size_t samples_per_chan, + status::StatusCode expected_status) { + FramePtr frame = frame_factory.allocate_frame_no_buffer(); + CHECK(frame); + + LONGS_EQUAL(expected_status, + depacketizer.read(*frame, (packet::stream_timestamp_t)samples_per_chan)); +} + +class StatusReader : public packet::IReader { public: - explicit TestReader(packet::IReader& reader) + explicit StatusReader(packet::IReader& reader) : reader_(reader) - , call_count_(0) - , code_enabled_(false) , code_(status::NoStatus) { } - virtual ROC_ATTR_NODISCARD status::StatusCode read(packet::PacketPtr& pp) { - ++call_count_; - - if (code_enabled_) { + virtual status::StatusCode read(packet::PacketPtr& pp) { + if (code_ != status::NoStatus && code_ != status::StatusOK) { return code_; } - return reader_.read(pp); } - void enable_status_code(status::StatusCode code) { - code_enabled_ = true; + void set_status(status::StatusCode code) { code_ = code; } - void disable_status_code() { - code_enabled_ = false; - code_ = status::NoStatus; - } - - unsigned call_count() const { - return call_count_; - } - private: packet::IReader& reader_; - - unsigned call_count_; - bool code_enabled_; status::StatusCode code_; }; @@ -731,33 +725,31 @@ TEST(depacketizer, timestamp_small_non_zero_cts) { expect_output(dp, SamplesPerPacket * PacketsPerFrame, 0.2f, second_frame_capt_ts); } -TEST(depacketizer, read_after_error) { - const status::StatusCode codes[] = { - status::StatusDrain, - status::StatusAbort, - }; +TEST(depacketizer, forward_error) { + PcmEncoder encoder(packet_spec); + PcmDecoder decoder(packet_spec); - for (unsigned n = 0; n < ROC_ARRAY_SIZE(codes); ++n) { - PcmEncoder encoder(packet_spec); - PcmDecoder decoder(packet_spec); + packet::Queue queue; + StatusReader reader(queue); + Depacketizer dp(reader, decoder, frame_factory, frame_spec, false); + LONGS_EQUAL(status::StatusOK, dp.init_status()); - packet::Queue queue; - TestReader reader(queue); - Depacketizer dp(reader, decoder, frame_factory, frame_spec, false); - LONGS_EQUAL(status::StatusOK, dp.init_status()); + // push one packet + write_packet(queue, new_packet(encoder, 0, 0.11f, 0)); - write_packet(queue, new_packet(encoder, 0, 0.11f, Now)); + // read first half of packet + expect_output(dp, SamplesPerPacket / 2, 0.11f, 0); - LONGS_EQUAL(0, reader.call_count()); + // packet reader will now return error + reader.set_status(status::StatusAbort); - reader.enable_status_code(codes[n]); - expect_output(dp, SamplesPerPacket, 0.00f, 0); - LONGS_EQUAL(1, reader.call_count()); + // read second half of packet + // no error because depacketizer still has buffered packet + expect_output(dp, SamplesPerPacket / 2, 0.11f, 0); - reader.disable_status_code(); - expect_output(dp, SamplesPerPacket, 0.11f, Now); - LONGS_EQUAL(2, reader.call_count()); - } + // try to read more + // get error because depacketizer tries to read packet + expect_error(dp, SamplesPerPacket, status::StatusAbort); } } // namespace audio