diff --git a/src/internal_modules/roc_fec/block_reader.cpp b/src/internal_modules/roc_fec/block_reader.cpp index c48e76f68..d17bea6ae 100644 --- a/src/internal_modules/roc_fec/block_reader.cpp +++ b/src/internal_modules/roc_fec/block_reader.cpp @@ -32,7 +32,6 @@ BlockReader::BlockReader(const BlockReaderConfig& config, , repair_queue_(0) , source_block_(arena) , repair_block_(arena) - , alive_(true) , started_(false) , can_repair_(false) , head_index_(0) @@ -61,10 +60,6 @@ bool BlockReader::is_started() const { return started_; } -bool BlockReader::is_alive() const { - return alive_; -} - packet::stream_timestamp_t BlockReader::max_block_duration() const { roc_panic_if(init_status_ != status::StatusOK); @@ -74,70 +69,73 @@ packet::stream_timestamp_t BlockReader::max_block_duration() const { status::StatusCode BlockReader::read(packet::PacketPtr& pp, packet::PacketReadMode mode) { roc_panic_if(init_status_ != status::StatusOK); - if (!alive_) { - return status::StatusAbort; - } - - const status::StatusCode code = read_(pp, mode); - - if (code != status::StatusOK && code != status::StatusDrain) { - pp = NULL; - return code; - } - - if (!alive_) { - pp = NULL; - return status::StatusAbort; - } + status::StatusCode code = status::NoStatus; - if (code == status::StatusOK && mode == packet::ModeFetch) { - n_packets_++; - } - return code; -} -status::StatusCode BlockReader::read_(packet::PacketPtr& pp, - packet::PacketReadMode mode) { - const status::StatusCode code = fetch_all_packets_(); - if (code != status::StatusOK) { + // Greedily fetch packets from underlying readers to queues. + if ((code = fetch_all_packets_()) != status::StatusOK) { + roc_panic_if(code == status::StatusDrain); return code; } + // Try starting until we get first eligible packet. if (!started_) { - started_ = try_start_(); + if ((code = try_start_()) != status::StatusOK) { + roc_panic_if(code == status::StatusDrain); + return code; + } } if (!started_) { // Until started, just forward all source packets. - return source_queue_.read(pp, mode); + code = source_queue_.read(pp, mode); + } else { + // Normal read. + code = get_next_packet_(pp, mode); } - return get_next_packet_(pp, mode); + if (code == status::StatusOK && mode == packet::ModeFetch) { + n_packets_++; + } + return code; } -bool BlockReader::try_start_() { +status::StatusCode BlockReader::try_start_() { packet::PacketPtr pp = source_queue_.head(); if (!pp) { - return false; + return status::StatusOK; } const packet::FEC& fec = *pp->fec(); - if (!process_source_packet_(pp)) { + const status::StatusCode code = process_source_packet_(pp); + if (code == status::StatusBadPacket) { + // Wait until we receive a valid packet. roc_log(LogTrace, - "fec block reader: dropping leading source packet:" + "fec block reader: skipping leading source packet:" " esi=%lu sblen=%lu blen=%lu payload_size=%lu", (unsigned long)fec.encoding_symbol_id, (unsigned long)fec.source_block_length, (unsigned long)fec.block_length, (unsigned long)fec.payload.size()); - return false; + return status::StatusOK; + } + if (code != status::StatusOK) { + // Unexpected failure, aborting. + return code; } cur_sbn_ = fec.source_block_number; drop_repair_packets_from_prev_blocks_(); - if (pp->fec()->encoding_symbol_id > 0) { - // Wait until we receive first packet in block (ESI=0), see also gh-186. - return false; + if (fec.encoding_symbol_id > 0) { + // Wait until we receive first packet in block (ESI=0). + // See also gh-186. + roc_log(LogTrace, + "fec block reader: skipping leading source packet:" + " esi=%lu sblen=%lu blen=%lu payload_size=%lu", + (unsigned long)fec.encoding_symbol_id, + (unsigned long)fec.source_block_length, (unsigned long)fec.block_length, + (unsigned long)fec.payload.size()); + return status::StatusOK; } roc_log(LogDebug, @@ -147,26 +145,30 @@ bool BlockReader::try_start_() { started_ = true; - return true; + return status::StatusOK; } status::StatusCode BlockReader::get_next_packet_(packet::PacketPtr& result_pkt, packet::PacketReadMode mode) { - fill_block_(); + const status::StatusCode code = fill_block_(); + if (code != status::StatusOK) { + roc_panic_if(code == status::StatusDrain); + return code; + } packet::PacketPtr pkt = source_block_[head_index_]; - while (alive_) { + for (;;) { size_t next_index = 0; if (pkt) { next_index = head_index_ + 1; } else { - // try repairing as much as possible and store in block - const status::StatusCode status_code = try_repair_(); - if (status_code != status::StatusOK) { - roc_panic_if(status_code == status::StatusDrain); - return status_code; + // Try repairing as much as possible and store in block. + const status::StatusCode code = try_repair_(); + if (code != status::StatusOK) { + roc_panic_if(code == status::StatusDrain); + return code; } // Find first present packet in block, starting from head. @@ -195,7 +197,11 @@ status::StatusCode BlockReader::get_next_packet_(packet::PacketPtr& result_pkt, // Switch to next packet and maybe next block. head_index_ = next_index; if (head_index_ == source_block_.size()) { - next_block_(); + const status::StatusCode code = next_block_(); + if (code != status::StatusOK) { + roc_panic_if(code == status::StatusDrain); + return code; + } } if (pkt) { // Found packet. @@ -207,7 +213,7 @@ status::StatusCode BlockReader::get_next_packet_(packet::PacketPtr& result_pkt, return pkt ? status::StatusOK : status::StatusDrain; } -void BlockReader::next_block_() { +status::StatusCode BlockReader::next_block_() { roc_log(LogTrace, "fec block reader: next block: sbn=%lu", (unsigned long)cur_sbn_); if (source_block_[0]) { @@ -233,29 +239,28 @@ void BlockReader::next_block_() { can_repair_ = false; - fill_block_(); -} - -bool BlockReader::is_block_resized_() const { - return source_block_resized_ && repair_block_resized_ && payload_resized_; + return fill_block_(); } status::StatusCode BlockReader::try_repair_() { - if (!can_repair_ || !is_block_resized_()) { + const bool is_block_resized = + source_block_resized_ && repair_block_resized_ && payload_resized_; + + if (!can_repair_ || !is_block_resized) { + // Can't do anything right now. return status::StatusOK; } - const status::StatusCode status_code = block_decoder_.begin_block( + const status::StatusCode code = block_decoder_.begin_block( source_block_.size(), repair_block_.size(), payload_size_); - if (status_code != status::StatusOK) { - roc_log(LogDebug, - "fec block reader: can't begin decoder block, shutting down:" + if (code != status::StatusOK) { + roc_log(LogError, + "fec block reader: can't begin decoder block:" " sbl=%lu rbl=%lu payload_size=%lu", (unsigned long)source_block_.size(), (unsigned long)repair_block_.size(), (unsigned long)payload_size_); - alive_ = false; - return status_code; + return code; } for (size_t n = 0; n < source_block_.size(); n++) { @@ -283,36 +288,44 @@ status::StatusCode BlockReader::try_repair_() { continue; } - packet::PacketPtr pp = parse_repaired_packet_(buffer); - if (!pp) { + packet::PacketPtr packet; + const status::StatusCode code = parse_repaired_packet_(buffer, packet); + if (code == status::StatusBadPacket) { continue; } + if (code != status::StatusOK) { + return code; + } - source_block_[n] = pp; + source_block_[n] = packet; } block_decoder_.end_block(); can_repair_ = false; + return status::StatusOK; } -packet::PacketPtr -BlockReader::parse_repaired_packet_(const core::Slice& buffer) { +status::StatusCode BlockReader::parse_repaired_packet_(const core::Slice& buffer, + packet::PacketPtr& result_packet) { packet::PacketPtr pp = packet_factory_.new_packet(); if (!pp) { roc_log(LogError, "fec block reader: can't allocate packet"); - return NULL; + return status::StatusNoMem; } if (!parser_.parse(*pp, buffer)) { roc_log(LogDebug, "fec block reader: can't parse repaired packet"); - return NULL; + // Upper code expects StatusBadPacket in this case. + return status::StatusBadPacket; } pp->set_buffer(buffer); pp->add_flags(packet::Packet::FlagRestored); - return pp; + result_packet = pp; + + return status::StatusOK; } status::StatusCode BlockReader::fetch_all_packets_() { @@ -335,18 +348,20 @@ status::StatusCode BlockReader::fetch_packets_(packet::IReader& reader, packet::PacketPtr pp; status::StatusCode code = reader.read(pp, packet::ModeFetch); + if (code == status::StatusDrain) { + // Fine, no more packets right now. + break; + } if (code != status::StatusOK) { - if (code == status::StatusDrain) { - break; - } return code; } if (!validate_fec_packet_(pp)) { - break; + return status::StatusAbort; } - if ((code = writer.write(pp)) != status::StatusOK) { + code = writer.write(pp); + if (code != status::StatusOK) { return code; } } @@ -354,12 +369,21 @@ status::StatusCode BlockReader::fetch_packets_(packet::IReader& reader, return status::StatusOK; } -void BlockReader::fill_block_() { - fill_source_block_(); - fill_repair_block_(); +status::StatusCode BlockReader::fill_block_() { + status::StatusCode code = status::NoStatus; + + if ((code = fill_source_block_()) != status::StatusOK) { + return code; + } + + if ((code = fill_repair_block_()) != status::StatusOK) { + return code; + } + + return status::StatusOK; } -void BlockReader::fill_source_block_() { +status::StatusCode BlockReader::fill_source_block_() { unsigned n_fetched = 0, n_added = 0, n_dropped = 0; for (;;) { @@ -369,7 +393,7 @@ void BlockReader::fill_source_block_() { } if (!validate_sbn_sequence_(pp)) { - break; + return status::StatusAbort; } const packet::FEC& fec = *pp->fec(); @@ -379,7 +403,7 @@ void BlockReader::fill_source_block_() { } packet::PacketPtr p; - const status::StatusCode code = source_queue_.read(p, packet::ModeFetch); + status::StatusCode code = source_queue_.read(p, packet::ModeFetch); roc_panic_if_msg(code != status::StatusOK, "failed to read source packet: status=%s", status::code_to_str(code)); @@ -398,7 +422,8 @@ void BlockReader::fill_source_block_() { // Should not happen: we have handled preceding and following blocks above. roc_panic_if_not(fec.source_block_number == cur_sbn_); - if (!process_source_packet_(pp)) { + code = process_source_packet_(pp); + if (code == status::StatusBadPacket) { roc_log(LogTrace, "fec block reader: dropping source packet from current block:" " esi=%lu sblen=%lu blen=%lu payload_size=%lu", @@ -408,6 +433,10 @@ void BlockReader::fill_source_block_() { n_dropped++; continue; } + if (code != status::StatusOK) { + // Unexpected failure, aborting. + return code; + } // Should not happen: we have handled validation and block size above. roc_panic_if_not(fec.source_block_length == source_block_.size()); @@ -427,9 +456,11 @@ void BlockReader::fill_source_block_() { "fec block reader: source queue: fetched=%u added=%u dropped=%u", n_fetched, n_added, n_dropped); } + + return status::StatusOK; } -void BlockReader::fill_repair_block_() { +status::StatusCode BlockReader::fill_repair_block_() { unsigned n_fetched = 0, n_added = 0, n_dropped = 0; for (;;) { @@ -439,7 +470,7 @@ void BlockReader::fill_repair_block_() { } if (!validate_sbn_sequence_(pp)) { - break; + return status::StatusAbort; } const packet::FEC& fec = *pp->fec(); @@ -449,7 +480,7 @@ void BlockReader::fill_repair_block_() { } packet::PacketPtr p; - const status::StatusCode code = repair_queue_.read(p, packet::ModeFetch); + status::StatusCode code = repair_queue_.read(p, packet::ModeFetch); roc_panic_if_msg(code != status::StatusOK, "failed to read repair packet: status=%s", status::code_to_str(code)); @@ -467,7 +498,8 @@ void BlockReader::fill_repair_block_() { // Should not happen: we have handled preceding and following blocks above. roc_panic_if(fec.source_block_number != cur_sbn_); - if (!process_repair_packet_(pp)) { + code = process_repair_packet_(pp); + if (code == status::StatusBadPacket) { roc_log(LogTrace, "fec block reader: dropping repair packet from current block:" " esi=%lu sblen=%lu blen=%lu payload_size=%lu", @@ -477,6 +509,10 @@ void BlockReader::fill_repair_block_() { n_dropped++; continue; } + if (code != status::StatusOK) { + // Unexpected failure, aborting. + return code; + } // Should not happen: we have handled validation and block size above. roc_panic_if_not(fec.source_block_length == source_block_.size()); @@ -498,82 +534,86 @@ void BlockReader::fill_repair_block_() { "fec block reader: repair queue: fetched=%u added=%u dropped=%u", n_fetched, n_added, n_dropped); } + + return status::StatusOK; } -bool BlockReader::process_source_packet_(const packet::PacketPtr& pp) { +status::StatusCode BlockReader::process_source_packet_(const packet::PacketPtr& pp) { + status::StatusCode code = status::NoStatus; + const packet::FEC& fec = *pp->fec(); if (!validate_incoming_source_packet_(pp)) { - return false; + return status::StatusBadPacket; } if (!can_update_payload_size_(fec.payload.size())) { - return false; + return status::StatusBadPacket; } if (!can_update_source_block_size_(fec.source_block_length)) { - return false; + return status::StatusBadPacket; } - if (!update_payload_size_(fec.payload.size())) { - return false; + if ((code = update_payload_size_(fec.payload.size())) != status::StatusOK) { + return code; } - if (!update_source_block_size_(fec.source_block_length)) { - return false; + if ((code = update_source_block_size_(fec.source_block_length)) != status::StatusOK) { + return code; } - return true; + return status::StatusOK; } -bool BlockReader::process_repair_packet_(const packet::PacketPtr& pp) { +status::StatusCode BlockReader::process_repair_packet_(const packet::PacketPtr& pp) { + status::StatusCode code = status::NoStatus; + const packet::FEC& fec = *pp->fec(); if (!validate_incoming_repair_packet_(pp)) { - return false; + return status::StatusBadPacket; } if (!can_update_payload_size_(fec.payload.size())) { - return false; + return status::StatusBadPacket; } if (!can_update_source_block_size_(fec.source_block_length)) { - return false; + return status::StatusBadPacket; } if (!can_update_repair_block_size_(fec.block_length)) { - return false; + return status::StatusBadPacket; } - if (!update_payload_size_(fec.payload.size())) { - return false; + if ((code = update_payload_size_(fec.payload.size())) != status::StatusOK) { + return code; } - if (!update_source_block_size_(fec.source_block_length)) { - return false; + if ((code = update_source_block_size_(fec.source_block_length)) != status::StatusOK) { + return code; } - if (!update_repair_block_size_(fec.block_length)) { - return false; + if ((code = update_repair_block_size_(fec.block_length)) != status::StatusOK) { + return code; } - return true; + return status::StatusOK; } bool BlockReader::validate_fec_packet_(const packet::PacketPtr& pp) { - const packet::FEC* fec = pp->fec(); - - if (!fec) { - roc_panic("fec block reader: unexpected non-fec source packet"); + if (!pp->has_flags(packet::Packet::FlagFEC)) { + roc_panic("fec block reader: unexpected non-fec packet"); } - if (fec->fec_scheme != fec_scheme_) { + if (pp->fec()->fec_scheme != fec_scheme_) { roc_log(LogDebug, - "fec block reader: unexpected packet fec scheme, shutting down:" + "fec block reader: unexpected packet fec scheme:" " packet_scheme=%s session_scheme=%s", - packet::fec_scheme_to_str(fec->fec_scheme), + packet::fec_scheme_to_str(pp->fec()->fec_scheme), packet::fec_scheme_to_str(fec_scheme_)); - return (alive_ = false); + return false; } return true; @@ -591,11 +631,11 @@ bool BlockReader::validate_sbn_sequence_(const packet::PacketPtr& pp) { if ((size_t)blk_dist > max_sbn_jump_) { roc_log(LogDebug, - "fec block reader: too long source block number jump, shutting down:" + "fec block reader: too long source block number jump:" " cur_sbn=%lu pkt_sbn=%lu dist=%lu max=%lu", (unsigned long)cur_sbn_, (unsigned long)fec.source_block_number, (unsigned long)blk_dist, (unsigned long)max_sbn_jump_); - return (alive_ = false); + return false; } return true; @@ -670,10 +710,10 @@ bool BlockReader::can_update_payload_size_(size_t new_payload_size) { return true; } -bool BlockReader::update_payload_size_(size_t new_payload_size) { +status::StatusCode BlockReader::update_payload_size_(size_t new_payload_size) { if (payload_size_ == new_payload_size) { payload_resized_ = true; - return true; + return status::StatusOK; } roc_log( @@ -685,7 +725,7 @@ bool BlockReader::update_payload_size_(size_t new_payload_size) { payload_size_ = new_payload_size; payload_resized_ = true; - return true; + return status::StatusOK; } bool BlockReader::can_update_source_block_size_(size_t new_sblen) { @@ -705,49 +745,48 @@ bool BlockReader::can_update_source_block_size_(size_t new_sblen) { return false; } - if (new_sblen > block_decoder_.max_block_length()) { - roc_log(LogDebug, - "fec block reader: can't change source block size above maximum, " - "shutting down:" - " cur_sblen=%lu new_sblen=%lu max_blen=%lu", - (unsigned long)cur_sblen, (unsigned long)new_sblen, - (unsigned long)block_decoder_.max_block_length()); - return (alive_ = false); - } - return true; } -bool BlockReader::update_source_block_size_(size_t new_sblen) { +status::StatusCode BlockReader::update_source_block_size_(size_t new_sblen) { const size_t cur_sblen = source_block_.size(); if (cur_sblen == new_sblen) { source_block_resized_ = true; - return true; + return status::StatusOK; } - // max_block_duration() reports maximum duration since last resize, - // so when resize happens, we reset maximum. - prev_block_timestamp_valid_ = false; - block_max_duration_ = 0; + if (new_sblen > block_decoder_.max_block_length()) { + roc_log(LogDebug, + "fec block reader: can't change source block size above maximum:" + " cur_sblen=%lu new_sblen=%lu max_blen=%lu", + (unsigned long)cur_sblen, (unsigned long)new_sblen, + (unsigned long)block_decoder_.max_block_length()); + return status::StatusAbort; + } if (!source_block_.resize(new_sblen)) { - roc_log(LogDebug, - "fec block reader: can't allocate source block memory, shutting down:" + roc_log(LogError, + "fec block reader: can't allocate source block memory:" " cur_sblen=%lu new_sblen=%lu", (unsigned long)cur_sblen, (unsigned long)new_sblen); - return (alive_ = false); + return status::StatusNoMem; } roc_log(LogDebug, - "fec block reader: update source block size:" + "fec block reader: updated source block size:" " cur_sblen=%lu cur_rblen=%lu new_sblen=%lu", (unsigned long)cur_sblen, (unsigned long)repair_block_.size(), (unsigned long)new_sblen); + // max_block_duration() reports maximum duration since last resize, + // so when resize happens, we reset maximum. + prev_block_timestamp_valid_ = false; + block_max_duration_ = 0; + source_block_resized_ = true; - return true; + return status::StatusOK; } bool BlockReader::can_update_repair_block_size_(size_t new_blen) { @@ -766,20 +805,10 @@ bool BlockReader::can_update_repair_block_size_(size_t new_blen) { return false; } - if (new_blen > block_decoder_.max_block_length()) { - roc_log(LogDebug, - "fec block reader: can't change repair block size above maximum, " - "shutting down:" - " cur_blen=%lu new_blen=%lu max_blen=%lu", - (unsigned long)cur_blen, (unsigned long)new_blen, - (unsigned long)block_decoder_.max_block_length()); - return (alive_ = false); - } - return true; } -bool BlockReader::update_repair_block_size_(size_t new_blen) { +status::StatusCode BlockReader::update_repair_block_size_(size_t new_blen) { const size_t cur_sblen = source_block_.size(); const size_t cur_rblen = repair_block_.size(); @@ -787,35 +816,44 @@ bool BlockReader::update_repair_block_size_(size_t new_blen) { if (new_blen == cur_blen) { repair_block_resized_ = true; - return true; + return status::StatusOK; } - // max_block_duration() reports maximum duration since last resize, - // so when resize happens, we reset maximum. - prev_block_timestamp_valid_ = false; - block_max_duration_ = 0; + if (new_blen > block_decoder_.max_block_length()) { + roc_log(LogDebug, + "fec block reader: can't change repair block size above maximum:" + " cur_blen=%lu new_blen=%lu max_blen=%lu", + (unsigned long)cur_blen, (unsigned long)new_blen, + (unsigned long)block_decoder_.max_block_length()); + return status::StatusAbort; + } - // Should not happen: sblen should be validated in upper code + // Should not happen: sblen should be validated in upper code. roc_panic_if_not(new_blen > cur_sblen); const size_t new_rblen = new_blen - cur_sblen; if (!repair_block_.resize(new_rblen)) { - roc_log(LogDebug, - "fec block reader: can't allocate repair block memory, shutting down:" + roc_log(LogError, + "fec block reader: can't allocate repair block memory:" " cur_rblen=%lu new_rblen=%lu", (unsigned long)cur_rblen, (unsigned long)new_rblen); - return (alive_ = false); + return status::StatusNoMem; } roc_log(LogDebug, - "fec block reader: update repair block size:" + "fec block reader: updated repair block size:" " cur_sblen=%lu cur_rblen=%lu new_rblen=%lu", (unsigned long)cur_sblen, (unsigned long)cur_rblen, (unsigned long)new_rblen); + // max_block_duration() reports maximum duration since last resize, + // so when resize happens, we reset maximum. + prev_block_timestamp_valid_ = false; + block_max_duration_ = 0; + repair_block_resized_ = true; - return true; + return status::StatusOK; } void BlockReader::drop_repair_packets_from_prev_blocks_() { @@ -857,6 +895,7 @@ void BlockReader::update_block_duration_(const packet::PacketPtr& curr_block_pkt block_dur = packet::stream_timestamp_diff(curr_block_pkt->stream_timestamp(), prev_block_timestamp_); } + if (block_dur < 0) { roc_log(LogTrace, "fec block reader: negative block duration: prev_ts=%lu curr_ts=%lu", diff --git a/src/internal_modules/roc_fec/block_reader.h b/src/internal_modules/roc_fec/block_reader.h index fec7d1e79..fa8b22541 100644 --- a/src/internal_modules/roc_fec/block_reader.h +++ b/src/internal_modules/roc_fec/block_reader.h @@ -51,16 +51,6 @@ struct BlockReaderConfig { class BlockReader : public packet::IReader, public core::NonCopyable<> { public: //! Initialize. - //! - //! @b Parameters - //! - @p config contains FEC scheme parameters - //! - @p fec_scheme is FEC codec ID - //! - @p decoder is FEC codec implementation - //! - @p source_reader specifies input queue with data packets - //! - @p repair_reader specifies input queue with FEC packets - //! - @p parser specifies packet parser for restored packets - //! - @p packet_factory is used to allocate restored packets - //! - @p arena is used to initialize a packet array BlockReader(const BlockReaderConfig& config, packet::FecScheme fec_scheme, IBlockDecoder& block_decoder, @@ -76,9 +66,6 @@ class BlockReader : public packet::IReader, public core::NonCopyable<> { //! Did decoder catch block beginning? bool is_started() const; - //! Is decoder alive? - bool is_alive() const; - //! Get maximal FEC block duration seen since last block resize. packet::stream_timestamp_t max_block_duration() const; @@ -89,26 +76,24 @@ class BlockReader : public packet::IReader, public core::NonCopyable<> { packet::PacketReadMode mode); private: - status::StatusCode read_(packet::PacketPtr& packet, packet::PacketReadMode mode); - - bool try_start_(); + status::StatusCode try_start_(); status::StatusCode get_next_packet_(packet::PacketPtr& packet, packet::PacketReadMode mode); + status::StatusCode next_block_(); - void next_block_(); status::StatusCode try_repair_(); - - packet::PacketPtr parse_repaired_packet_(const core::Slice& buffer); + status::StatusCode parse_repaired_packet_(const core::Slice& buffer, + packet::PacketPtr& packet); status::StatusCode fetch_all_packets_(); status::StatusCode fetch_packets_(packet::IReader&, packet::IWriter&); - void fill_block_(); - void fill_source_block_(); - void fill_repair_block_(); + status::StatusCode fill_block_(); + status::StatusCode fill_source_block_(); + status::StatusCode fill_repair_block_(); - bool process_source_packet_(const packet::PacketPtr&); - bool process_repair_packet_(const packet::PacketPtr&); + status::StatusCode process_source_packet_(const packet::PacketPtr&); + status::StatusCode process_repair_packet_(const packet::PacketPtr&); bool validate_fec_packet_(const packet::PacketPtr&); bool validate_sbn_sequence_(const packet::PacketPtr&); @@ -120,16 +105,13 @@ class BlockReader : public packet::IReader, public core::NonCopyable<> { bool can_update_source_block_size_(size_t); bool can_update_repair_block_size_(size_t); - bool update_payload_size_(size_t); - bool update_source_block_size_(size_t); - bool update_repair_block_size_(size_t); + status::StatusCode update_payload_size_(size_t); + status::StatusCode update_source_block_size_(size_t); + status::StatusCode update_repair_block_size_(size_t); void drop_repair_packets_from_prev_blocks_(); - void update_block_duration_(const packet::PacketPtr& curr_block_pkt); - bool is_block_resized_() const; - IBlockDecoder& block_decoder_; packet::IReader& source_reader_; @@ -143,7 +125,6 @@ class BlockReader : public packet::IReader, public core::NonCopyable<> { core::Array source_block_; core::Array repair_block_; - bool alive_; bool started_; bool can_repair_; diff --git a/src/internal_modules/roc_fec/block_writer.cpp b/src/internal_modules/roc_fec/block_writer.cpp index f62658e34..33a55b7d0 100644 --- a/src/internal_modules/roc_fec/block_writer.cpp +++ b/src/internal_modules/roc_fec/block_writer.cpp @@ -30,13 +30,12 @@ BlockWriter::BlockWriter(const BlockWriterConfig& config, , next_rblen_(0) , cur_payload_size_(0) , block_encoder_(block_encoder) - , writer_(writer) + , pkt_writer_(writer) , source_composer_(source_composer) , repair_composer_(repair_composer) , packet_factory_(packet_factory) , repair_block_(arena) , first_packet_(true) - , alive_(true) , cur_packet_(0) , fec_scheme_(fec_scheme) , prev_block_timestamp_valid_(false) @@ -51,8 +50,8 @@ BlockWriter::BlockWriter(const BlockWriterConfig& config, cur_block_repair_sn_ = (packet::seqnum_t)core::fast_random_range(0, packet::seqnum_t(-1)); - if (!resize(config.n_source_packets, config.n_repair_packets)) { - init_status_ = status::StatusNoMem; + if ((init_status_ = resize(config.n_source_packets, config.n_repair_packets)) + != status::StatusOK) { return; } @@ -63,28 +62,23 @@ status::StatusCode BlockWriter::init_status() const { return init_status_; } -bool BlockWriter::is_alive() const { - roc_panic_if(init_status_ != status::StatusOK); - - return alive_; -} - packet::stream_timestamp_t BlockWriter::max_block_duration() const { roc_panic_if(init_status_ != status::StatusOK); return (packet::stream_timestamp_t)block_max_duration_; } -bool BlockWriter::resize(size_t sblen, size_t rblen) { +status::StatusCode BlockWriter::resize(size_t sblen, size_t rblen) { roc_panic_if(init_status_ != status::StatusOK); if (next_sblen_ == sblen && next_rblen_ == rblen) { - return true; + // nothing to do + return status::StatusOK; } if (sblen == 0) { roc_log(LogError, "fec block writer: resize: sblen can't be zero"); - return false; + return status::StatusBadConfig; } const size_t new_blen = sblen + rblen; @@ -96,7 +90,7 @@ bool BlockWriter::resize(size_t sblen, size_t rblen) { (unsigned long)cur_sblen_, (unsigned long)cur_rblen_, (unsigned long)sblen, (unsigned long)rblen, (unsigned long)block_encoder_.max_block_length()); - return false; + return status::StatusBadConfig; } roc_log(LogDebug, @@ -112,43 +106,39 @@ bool BlockWriter::resize(size_t sblen, size_t rblen) { // so when resize happens, we reset maximum. prev_block_timestamp_valid_ = false; - return true; + return status::StatusOK; } status::StatusCode BlockWriter::write(const packet::PacketPtr& pp) { roc_panic_if(init_status_ != status::StatusOK); roc_panic_if(!pp); - if (!alive_) { - return status::StatusAbort; - } - - validate_fec_packet_(pp); + validate_packet_(pp); if (first_packet_) { first_packet_ = false; } if (cur_packet_ == 0) { - const status::StatusCode status_code = begin_block_(pp); - if (status_code != status::StatusOK) { - return status_code; + const status::StatusCode code = begin_block_(pp); + if (code != status::StatusOK) { + return code; } } - if (!validate_source_packet_(pp)) { - // TODO(gh-183): return status - return status::StatusOK; - } - const status::StatusCode code = write_source_packet_(pp); - // TODO(gh-183): forward status - roc_panic_if(code != status::StatusOK); + if (code != status::StatusOK) { + return code; + } cur_packet_++; if (cur_packet_ == cur_sblen_) { - end_block_(); + const status::StatusCode code = end_block_(); + if (code != status::StatusOK) { + return code; + } + next_block_(); } @@ -159,8 +149,6 @@ status::StatusCode BlockWriter::begin_block_(const packet::PacketPtr& pp) { update_block_duration_(pp); if (!apply_sizes_(next_sblen_, next_rblen_, pp->fec()->payload.size())) { - roc_log(LogError, - "fec block writer: apply_sizes in begin_block_ failed with StatusNoMem"); return status::StatusNoMem; } @@ -169,27 +157,41 @@ status::StatusCode BlockWriter::begin_block_(const packet::PacketPtr& pp) { (unsigned long)cur_sbn_, (unsigned long)cur_sblen_, (unsigned long)cur_rblen_, (unsigned long)cur_payload_size_); - const status::StatusCode status_code = + const status::StatusCode code = block_encoder_.begin_block(cur_sblen_, cur_rblen_, cur_payload_size_); - if (status_code != status::StatusOK) { + if (code != status::StatusOK) { roc_log(LogError, - "fec block writer: can't begin encoder block, shutting down:" + "fec block writer: can't begin encoder block:" " sblen=%lu rblen=%lu", (unsigned long)cur_sblen_, (unsigned long)cur_rblen_); - alive_ = false; } - return status_code; + return code; } -void BlockWriter::end_block_() { - make_repair_packets_(); - encode_repair_packets_(); - compose_repair_packets_(); - write_repair_packets_(); +status::StatusCode BlockWriter::end_block_() { + status::StatusCode code = status::NoStatus; + + if ((code = make_repair_packets_()) != status::StatusOK) { + return code; + } + + if ((code = encode_repair_packets_()) != status::StatusOK) { + return code; + } + + if ((code = compose_repair_packets_()) != status::StatusOK) { + return code; + } + + if ((code = write_repair_packets_()) != status::StatusOK) { + return code; + } block_encoder_.end_block(); + + return status::StatusOK; } void BlockWriter::next_block_() { @@ -199,18 +201,13 @@ void BlockWriter::next_block_() { } bool BlockWriter::apply_sizes_(size_t sblen, size_t rblen, size_t payload_size) { - if (payload_size == 0) { - roc_log(LogError, "fec block writer: payload size can't be zero"); - return (alive_ = false); - } - if (repair_block_.size() != rblen) { if (!repair_block_.resize(rblen)) { roc_log(LogError, - "fec block writer: can't allocate repair block memory, shutting down:" + "fec block writer: can't allocate repair block memory:" " cur_rbl=%lu new_rbl=%lu", (unsigned long)repair_block_.size(), (unsigned long)rblen); - return (alive_ = false); + return false; } } @@ -227,70 +224,80 @@ status::StatusCode BlockWriter::write_source_packet_(const packet::PacketPtr& pp fill_packet_fec_fields_(pp, (packet::seqnum_t)cur_packet_); if (!source_composer_.compose(*pp)) { - // TODO(gh-183): return status from composer - roc_panic("fec block writer: can't compose source packet"); + // TODO(gh-183): forward status from composer + return status::StatusBadBuffer; } pp->add_flags(packet::Packet::FlagComposed); - return writer_.write(pp); + return pkt_writer_.write(pp); } -void BlockWriter::make_repair_packets_() { +status::StatusCode BlockWriter::make_repair_packets_() { for (size_t i = 0; i < cur_rblen_; i++) { - packet::PacketPtr rp = make_repair_packet_((packet::seqnum_t)i); - if (!rp) { - continue; + packet::PacketPtr rp; + const status::StatusCode code = make_repair_packet_((packet::seqnum_t)i, rp); + if (code != status::StatusOK) { + return code; } + + roc_panic_if(!rp); repair_block_[i] = rp; } + + return status::StatusOK; } -packet::PacketPtr BlockWriter::make_repair_packet_(packet::seqnum_t pack_n) { +status::StatusCode BlockWriter::make_repair_packet_(packet::seqnum_t pack_n, + packet::PacketPtr& result_packet) { packet::PacketPtr packet = packet_factory_.new_packet(); if (!packet) { roc_log(LogError, "fec block writer: can't allocate packet"); - return NULL; + return status::StatusNoMem; } core::Slice buffer = packet_factory_.new_packet_buffer(); if (!buffer) { roc_log(LogError, "fec block writer: can't allocate buffer"); - // TODO(gh-183): return StatusNoMem - return NULL; + return status::StatusNoMem; } if (!repair_composer_.align(buffer, 0, block_encoder_.buffer_alignment())) { roc_log(LogError, "fec block writer: can't align packet buffer"); - // TODO(gh-183): return status from composer - return NULL; + // TODO(gh-183): forward status from composer + return status::StatusBadBuffer; } if (!repair_composer_.prepare(*packet, buffer, cur_payload_size_)) { roc_log(LogError, "fec block writer: can't prepare packet"); - // TODO(gh-183): return status from composer - return NULL; + // TODO(gh-183): forward status from composer + return status::StatusBadBuffer; } packet->add_flags(packet::Packet::FlagPrepared); packet->set_buffer(buffer); - validate_fec_packet_(packet); + validate_packet_(packet); fill_packet_fec_fields_(packet, (packet::seqnum_t)cur_sblen_ + pack_n); - return packet; + result_packet = packet; + return status::StatusOK; } -void BlockWriter::encode_repair_packets_() { +status::StatusCode BlockWriter::encode_repair_packets_() { for (size_t i = 0; i < cur_rblen_; i++) { packet::PacketPtr rp = repair_block_[i]; - if (rp) { - block_encoder_.set_buffer(cur_sblen_ + i, rp->fec()->payload); + if (!rp) { + continue; } + block_encoder_.set_buffer(cur_sblen_ + i, rp->fec()->payload); } + block_encoder_.fill_buffers(); + + return status::StatusOK; } -void BlockWriter::compose_repair_packets_() { +status::StatusCode BlockWriter::compose_repair_packets_() { for (size_t i = 0; i < cur_rblen_; i++) { packet::PacketPtr rp = repair_block_[i]; if (!rp) { @@ -298,11 +305,13 @@ void BlockWriter::compose_repair_packets_() { } if (!repair_composer_.compose(*rp)) { - // TODO(gh-183): return status from composer - roc_panic("fec block writer: can't compose repair packet"); + // TODO(gh-183): forward status from composer + return status::StatusBadBuffer; } rp->add_flags(packet::Packet::FlagComposed); } + + return status::StatusOK; } status::StatusCode BlockWriter::write_repair_packets_() { @@ -312,9 +321,10 @@ status::StatusCode BlockWriter::write_repair_packets_() { continue; } - const status::StatusCode code = writer_.write(repair_block_[i]); - // TODO(gh-183): forward status - roc_panic_if(code != status::StatusOK); + const status::StatusCode code = pkt_writer_.write(repair_block_[i]); + if (code != status::StatusOK) { + return code; + } repair_block_[i] = NULL; } @@ -332,42 +342,39 @@ void BlockWriter::fill_packet_fec_fields_(const packet::PacketPtr& packet, fec.block_length = cur_sblen_ + cur_rblen_; } -void BlockWriter::validate_fec_packet_(const packet::PacketPtr& pp) { +void BlockWriter::validate_packet_(const packet::PacketPtr& pp) { if (!pp->has_flags(packet::Packet::FlagPrepared)) { - roc_panic("fec block writer: unexpected packet: should be prepared"); + roc_panic("fec block writer: unexpected packet: must be prepared"); } if (pp->has_flags(packet::Packet::FlagComposed)) { - roc_panic("fec block writer: unexpected packet: should not be composed"); + roc_panic("fec block writer: unexpected packet: must not be composed"); } - const packet::FEC* fec = pp->fec(); - if (!fec) { + if (!pp->has_flags(packet::Packet::FlagFEC)) { roc_panic("fec block writer: unexpected non-fec packet"); } - if (fec->fec_scheme != fec_scheme_) { + if (pp->fec()->fec_scheme != fec_scheme_) { roc_panic("fec block writer: unexpected packet fec scheme:" " packet_scheme=%s session_scheme=%s", - packet::fec_scheme_to_str(fec->fec_scheme), + packet::fec_scheme_to_str(pp->fec()->fec_scheme), packet::fec_scheme_to_str(fec_scheme_)); } -} -bool BlockWriter::validate_source_packet_(const packet::PacketPtr& pp) { const size_t payload_size = pp->fec()->payload.size(); - if (payload_size != cur_payload_size_) { - roc_log(LogError, - "fec block writer: can't change payload size in the middle of a block:" - " sbn=%lu esi=%lu old_size=%lu new_size=%lu", - (unsigned long)cur_sbn_, (unsigned long)cur_packet_, - (unsigned long)cur_payload_size_, (unsigned long)payload_size); - // TODO(gh-183): return status - return (alive_ = false); + if (payload_size == 0) { + roc_panic("fec block writer: unexpected packet with zero payload size"); } - return true; + if (cur_packet_ != 0 && payload_size != cur_payload_size_) { + roc_panic( + "fec block writer: unexpected payload size change in the middle of a block:" + " sbn=%lu esi=%lu old_size=%lu new_size=%lu", + (unsigned long)cur_sbn_, (unsigned long)cur_packet_, + (unsigned long)cur_payload_size_, (unsigned long)payload_size); + } } void BlockWriter::update_block_duration_(const packet::PacketPtr& curr_block_pkt) { @@ -376,6 +383,7 @@ void BlockWriter::update_block_duration_(const packet::PacketPtr& curr_block_pkt block_dur = packet::stream_timestamp_diff(curr_block_pkt->stream_timestamp(), prev_block_timestamp_); } + if (block_dur < 0) { roc_log(LogTrace, "fec reader: negative block duration: prev_ts=%lu curr_ts=%lu", (unsigned long)prev_block_timestamp_, diff --git a/src/internal_modules/roc_fec/block_writer.h b/src/internal_modules/roc_fec/block_writer.h index c54980a2a..365e27921 100644 --- a/src/internal_modules/roc_fec/block_writer.h +++ b/src/internal_modules/roc_fec/block_writer.h @@ -50,17 +50,6 @@ struct BlockWriterConfig { class BlockWriter : public packet::IWriter, public core::NonCopyable<> { public: //! Initialize. - //! - //! @b Parameters - //! - @p config contains FEC scheme parameters - //! - @p fec_scheme is FEC codec ID - //! - @p block_encoder is FEC codec implementation - //! - @p writer is used to write coded source and repair packets - //! - @p source_composer is used to format source packets - //! - @p repair_composer is used to format repair packets - //! - @p packet_factory is used to allocate repair packets - //! - @p buffer_factory is used to allocate buffers for repair packets - //! - @p arena is used to initialize a packet array BlockWriter(const BlockWriterConfig& config, packet::FecScheme fec_scheme, IBlockEncoder& block_encoder, @@ -73,14 +62,13 @@ class BlockWriter : public packet::IWriter, public core::NonCopyable<> { //! Check if the object was successfully constructed. status::StatusCode init_status() const; - //! Check if writer is still working. - bool is_alive() const; - //! Get maximal FEC block duration seen since last block resize. packet::stream_timestamp_t max_block_duration() const; //! Set number of source packets per block. - bool resize(size_t sblen, size_t rblen); + //! @note + //! Actual reallocation may happen later. + ROC_ATTR_NODISCARD status::StatusCode resize(size_t sblen, size_t rblen); //! Write packet. //! @remarks @@ -90,21 +78,20 @@ class BlockWriter : public packet::IWriter, public core::NonCopyable<> { private: status::StatusCode begin_block_(const packet::PacketPtr& pp); - void end_block_(); + status::StatusCode end_block_(); void next_block_(); bool apply_sizes_(size_t sblen, size_t rblen, size_t payload_size); status::StatusCode write_source_packet_(const packet::PacketPtr&); - void make_repair_packets_(); - packet::PacketPtr make_repair_packet_(packet::seqnum_t n); - void encode_repair_packets_(); - void compose_repair_packets_(); + status::StatusCode make_repair_packets_(); + status::StatusCode make_repair_packet_(packet::seqnum_t n, packet::PacketPtr& pp); + status::StatusCode encode_repair_packets_(); + status::StatusCode compose_repair_packets_(); status::StatusCode write_repair_packets_(); void fill_packet_fec_fields_(const packet::PacketPtr& packet, packet::seqnum_t n); - void validate_fec_packet_(const packet::PacketPtr&); - bool validate_source_packet_(const packet::PacketPtr&); + void validate_packet_(const packet::PacketPtr&); void update_block_duration_(const packet::PacketPtr& curr_block_pkt); @@ -117,7 +104,7 @@ class BlockWriter : public packet::IWriter, public core::NonCopyable<> { size_t cur_payload_size_; IBlockEncoder& block_encoder_; - packet::IWriter& writer_; + packet::IWriter& pkt_writer_; packet::IComposer& source_composer_; packet::IComposer& repair_composer_; @@ -127,7 +114,6 @@ class BlockWriter : public packet::IWriter, public core::NonCopyable<> { core::Array repair_block_; bool first_packet_; - bool alive_; packet::blknum_t cur_sbn_; packet::seqnum_t cur_block_repair_sn_; diff --git a/src/tests/roc_fec/test_block_encoder_decoder.cpp b/src/tests/roc_fec/test_block_encoder_decoder.cpp index cd1298173..c0121cbdf 100644 --- a/src/tests/roc_fec/test_block_encoder_decoder.cpp +++ b/src/tests/roc_fec/test_block_encoder_decoder.cpp @@ -8,12 +8,13 @@ #include +#include "test_helpers/mock_arena.h" + #include "roc_core/array.h" #include "roc_core/fast_random.h" #include "roc_core/log.h" #include "roc_core/scoped_ptr.h" #include "roc_fec/codec_map.h" -#include "test_helpers/mock_arena.h" namespace roc { namespace fec { @@ -132,32 +133,6 @@ TEST(block_encoder_decoder, without_loss) { } } -TEST(block_encoder_decoder, no_memory) { - enum { NumSourcePackets = 20, NumRepairPackets = 10, PayloadSize = 251 }; - - for (size_t n_scheme = 0; n_scheme < CodecMap::instance().num_schemes(); n_scheme++) { - CodecConfig config; - config.scheme = CodecMap::instance().nth_scheme(n_scheme); - - { // test encoder - Codec code(config); - code.set_fail(true); - LONGS_EQUAL(status::StatusNoMem, - code.encoder().begin_block(NumSourcePackets, NumRepairPackets, - PayloadSize)); - } - - { // test decoder - Codec code(config); - code.encode(NumSourcePackets, NumRepairPackets, PayloadSize); - code.set_fail(true); - LONGS_EQUAL(status::StatusNoMem, - code.decoder().begin_block(NumSourcePackets, NumRepairPackets, - PayloadSize)); - } - } -} - TEST(block_encoder_decoder, lost_1) { enum { NumSourcePackets = 20, NumRepairPackets = 10, PayloadSize = 251 }; @@ -266,6 +241,32 @@ TEST(block_encoder_decoder, full_repair_payload_sizes) { } } +TEST(block_encoder_decoder, no_memory) { + enum { NumSourcePackets = 20, NumRepairPackets = 10, PayloadSize = 251 }; + + for (size_t n_scheme = 0; n_scheme < CodecMap::instance().num_schemes(); n_scheme++) { + CodecConfig config; + config.scheme = CodecMap::instance().nth_scheme(n_scheme); + + { // test encoder + Codec code(config); + code.set_fail(true); + LONGS_EQUAL(status::StatusNoMem, + code.encoder().begin_block(NumSourcePackets, NumRepairPackets, + PayloadSize)); + } + + { // test decoder + Codec code(config); + code.encode(NumSourcePackets, NumRepairPackets, PayloadSize); + code.set_fail(true); + LONGS_EQUAL(status::StatusNoMem, + code.decoder().begin_block(NumSourcePackets, NumRepairPackets, + PayloadSize)); + } + } +} + TEST(block_encoder_decoder, max_source_block) { for (size_t n_scheme = 0; n_scheme < CodecMap::instance().num_schemes(); ++n_scheme) { CodecConfig config; diff --git a/src/tests/roc_fec/test_block_writer_reader.cpp b/src/tests/roc_fec/test_block_writer_reader.cpp index d0b835e7f..f80b5e2bc 100644 --- a/src/tests/roc_fec/test_block_writer_reader.cpp +++ b/src/tests/roc_fec/test_block_writer_reader.cpp @@ -1302,7 +1302,6 @@ TEST(block_writer_reader, invalid_esi) { check_restored(p, i == 5); } - CHECK(reader.is_alive()); UNSIGNED_LONGS_EQUAL(0, dispatcher.source_size()); UNSIGNED_LONGS_EQUAL(0, dispatcher.repair_size()); } @@ -1381,7 +1380,6 @@ TEST(block_writer_reader, invalid_sbl) { check_restored(p, i == 5); } - CHECK(reader.is_alive()); UNSIGNED_LONGS_EQUAL(0, dispatcher.source_size()); UNSIGNED_LONGS_EQUAL(0, dispatcher.repair_size()); } @@ -1459,7 +1457,6 @@ TEST(block_writer_reader, invalid_nes) { check_restored(p, false); } - CHECK(reader.is_alive()); UNSIGNED_LONGS_EQUAL(0, dispatcher.source_size()); UNSIGNED_LONGS_EQUAL(0, dispatcher.repair_size()); } @@ -1547,7 +1544,6 @@ TEST(block_writer_reader, invalid_payload_size) { check_restored(p, i == 5 || (n_block == 3 && i == 0)); } - CHECK(reader.is_alive()); UNSIGNED_LONGS_EQUAL(0, source_queue.size()); UNSIGNED_LONGS_EQUAL(0, repair_queue.size()); } @@ -1635,7 +1631,6 @@ TEST(block_writer_reader, zero_source_packets) { } } - CHECK(reader.is_alive()); UNSIGNED_LONGS_EQUAL(0, dispatcher.source_size()); UNSIGNED_LONGS_EQUAL(0, dispatcher.repair_size()); } @@ -1721,7 +1716,6 @@ TEST(block_writer_reader, zero_repair_packets) { } } - CHECK(reader.is_alive()); UNSIGNED_LONGS_EQUAL(0, dispatcher.source_size()); UNSIGNED_LONGS_EQUAL(0, dispatcher.repair_size()); } @@ -1809,7 +1803,6 @@ TEST(block_writer_reader, zero_payload_size) { } } - CHECK(reader.is_alive()); UNSIGNED_LONGS_EQUAL(0, source_queue.size()); UNSIGNED_LONGS_EQUAL(0, repair_queue.size()); } @@ -1880,8 +1873,6 @@ TEST(block_writer_reader, sbn_jump) { check_restored(p, false); } - CHECK(reader.is_alive()); - // write second block to the dispatcher // shift it ahead but in the allowed range for (size_t i = 0; i < NumSourcePackets + NumRepairPackets; ++i) { @@ -1907,8 +1898,6 @@ TEST(block_writer_reader, sbn_jump) { check_restored(p, false); } - CHECK(reader.is_alive()); - // write third block to the dispatcher // shift it ahead too far for (size_t i = 0; i < NumSourcePackets + NumRepairPackets; ++i) { @@ -1929,12 +1918,9 @@ TEST(block_writer_reader, sbn_jump) { packet::PacketPtr pp; LONGS_EQUAL(status::StatusAbort, reader.read(pp, packet::ModeFetch)); CHECK(!pp); - CHECK(!reader.is_alive()); UNSIGNED_LONGS_EQUAL(0, dispatcher.source_size()); UNSIGNED_LONGS_EQUAL(0, dispatcher.repair_size()); - - LONGS_EQUAL(status::StatusAbort, reader.read(pp, packet::ModeFetch)); } } @@ -2070,7 +2056,8 @@ TEST(block_writer_reader, writer_resize_blocks) { packet::seqnum_t rd_sn = 0; for (size_t n = 0; n < ROC_ARRAY_SIZE(source_sizes); ++n) { - CHECK(writer.resize(source_sizes[n], repair_sizes[n])); + LONGS_EQUAL(status::StatusOK, + writer.resize(source_sizes[n], repair_sizes[n])); for (size_t i = 0; i < source_sizes[n]; ++i) { packet::PacketPtr p = generate_packet(wr_sn, payload_sizes[n]); @@ -2143,7 +2130,8 @@ TEST(block_writer_reader, resize_block_begin) { packet::seqnum_t rd_sn = 0; for (size_t n = 0; n < ROC_ARRAY_SIZE(source_sizes); ++n) { - CHECK(writer.resize(source_sizes[n], repair_sizes[n])); + LONGS_EQUAL(status::StatusOK, + writer.resize(source_sizes[n], repair_sizes[n])); for (size_t i = 0; i < source_sizes[n]; ++i) { packet::PacketPtr p = generate_packet(wr_sn, payload_sizes[n]); @@ -2238,7 +2226,8 @@ TEST(block_writer_reader, resize_block_middle) { } // Update source block size. - CHECK(writer.resize(source_sizes[n], repair_sizes[n])); + LONGS_EQUAL(status::StatusOK, + writer.resize(source_sizes[n], repair_sizes[n])); // Write the remaining packets. for (size_t i = prev_sblen / 2; i < prev_sblen; ++i) { @@ -2316,7 +2305,8 @@ TEST(block_writer_reader, resize_block_losses) { packet::seqnum_t rd_sn = 0; for (size_t n = 0; n < ROC_ARRAY_SIZE(source_sizes); ++n) { - CHECK(writer.resize(source_sizes[n], repair_sizes[n])); + LONGS_EQUAL(status::StatusOK, + writer.resize(source_sizes[n], repair_sizes[n])); dispatcher.resize(source_sizes[n], repair_sizes[n]); dispatcher.reset(); @@ -2402,7 +2392,8 @@ TEST(block_writer_reader, resize_block_repair_first) { } // Resize. - CHECK(writer.resize(NumSourcePackets * 2, NumRepairPackets * 2)); + LONGS_EQUAL(status::StatusOK, + writer.resize(NumSourcePackets * 2, NumRepairPackets * 2)); // Lose one packet. dispatcher.resize(NumSourcePackets * 2, NumRepairPackets * 2); @@ -2477,7 +2468,8 @@ TEST(block_writer_reader, writer_oversized_block) { LONGS_EQUAL(status::StatusOK, reader.init_status()); // try to resize writer with an invalid value - CHECK(!writer.resize(encoder->max_block_length() + 1, NumRepairPackets)); + LONGS_EQUAL(status::StatusBadConfig, + writer.resize(encoder->max_block_length() + 1, NumRepairPackets)); // ensure that the block size was not updated for (size_t n = 0; n < 10; ++n) { @@ -2506,7 +2498,6 @@ TEST(block_writer_reader, writer_oversized_block) { UNSIGNED_LONGS_EQUAL(NumSourcePackets, p->fec()->source_block_length); } - CHECK(reader.is_alive()); UNSIGNED_LONGS_EQUAL(0, dispatcher.source_size()); UNSIGNED_LONGS_EQUAL(0, dispatcher.repair_size()); } @@ -2583,7 +2574,6 @@ TEST(block_writer_reader, reader_oversized_source_block) { packet::PacketPtr pp; LONGS_EQUAL(status::StatusAbort, reader.read(pp, packet::ModeFetch)); CHECK(!pp); - CHECK(!reader.is_alive()); } } @@ -2657,58 +2647,6 @@ TEST(block_writer_reader, reader_oversized_repair_block) { packet::PacketPtr pp; LONGS_EQUAL(status::StatusAbort, reader.read(pp, packet::ModeFetch)); CHECK(!pp); - CHECK(!reader.is_alive()); - } -} - -TEST(block_writer_reader, writer_invalid_payload_size_change) { - for (size_t n_scheme = 0; n_scheme < CodecMap::instance().num_schemes(); ++n_scheme) { - codec_config.scheme = CodecMap::instance().nth_scheme(n_scheme); - - core::ScopedPtr encoder( - CodecMap::instance().new_block_encoder(codec_config, packet_factory, arena), - arena); - CHECK(encoder); - - test::PacketDispatcher dispatcher(source_parser(), repair_parser(), - packet_factory, NumSourcePackets, - NumRepairPackets); - - BlockWriter writer(writer_config, codec_config.scheme, *encoder, dispatcher, - source_composer(), repair_composer(), packet_factory, arena); - LONGS_EQUAL(status::StatusOK, writer.init_status()); - - size_t sn = 0; - - // write the first block with the same payload size - for (size_t i = 0; i < NumSourcePackets; ++i) { - LONGS_EQUAL(status::StatusOK, - writer.write(generate_packet(sn++, FECPayloadSize))); - } - - CHECK(writer.is_alive()); - UNSIGNED_LONGS_EQUAL(NumSourcePackets, dispatcher.source_size()); - UNSIGNED_LONGS_EQUAL(NumRepairPackets, dispatcher.repair_size()); - - // write a half of the second block with another payload size - for (size_t i = 0; i < NumSourcePackets / 2; ++i) { - LONGS_EQUAL(status::StatusOK, - writer.write(generate_packet(sn++, FECPayloadSize - 1))); - } - - CHECK(writer.is_alive()); - UNSIGNED_LONGS_EQUAL(NumSourcePackets + NumSourcePackets / 2, - dispatcher.source_size()); - UNSIGNED_LONGS_EQUAL(NumRepairPackets, dispatcher.repair_size()); - - // write a packet with different payload size - LONGS_EQUAL(status::StatusOK, writer.write(generate_packet(sn, FECPayloadSize))); - - // writer should be terminated - CHECK(!writer.is_alive()); - UNSIGNED_LONGS_EQUAL(NumSourcePackets + NumSourcePackets / 2, - dispatcher.source_size()); - UNSIGNED_LONGS_EQUAL(NumRepairPackets, dispatcher.repair_size()); } } @@ -2760,7 +2698,6 @@ TEST(block_writer_reader, reader_invalid_fec_scheme_source_packet) { for (size_t i = 0; i < NumSourcePackets / 2; ++i) { packet::PacketPtr p; LONGS_EQUAL(status::StatusOK, reader.read(p, packet::ModeFetch)); - CHECK(reader.is_alive()); } UNSIGNED_LONGS_EQUAL(0, source_queue.size()); @@ -2780,7 +2717,6 @@ TEST(block_writer_reader, reader_invalid_fec_scheme_source_packet) { packet::PacketPtr pp; LONGS_EQUAL(status::StatusAbort, reader.read(pp, packet::ModeFetch)); CHECK(!pp); - CHECK(!reader.is_alive()); UNSIGNED_LONGS_EQUAL(0, source_queue.size()); } } @@ -2844,7 +2780,6 @@ TEST(block_writer_reader, reader_invalid_fec_scheme_repair_packet) { for (size_t i = 0; i < NumSourcePackets / 2; ++i) { packet::PacketPtr p; LONGS_EQUAL(status::StatusOK, reader.read(p, packet::ModeFetch)); - CHECK(reader.is_alive()); } UNSIGNED_LONGS_EQUAL(0, source_queue.size()); UNSIGNED_LONGS_EQUAL(0, repair_queue.size()); @@ -2883,7 +2818,6 @@ TEST(block_writer_reader, reader_invalid_fec_scheme_repair_packet) { packet::PacketPtr pp; LONGS_EQUAL(status::StatusAbort, reader.read(pp, packet::ModeFetch)); CHECK(!pp); - CHECK(!reader.is_alive()); UNSIGNED_LONGS_EQUAL(0, source_queue.size()); UNSIGNED_LONGS_EQUAL(0, repair_queue.size()); } diff --git a/src/tests/roc_fec/test_block_writer_reader_duration.cpp b/src/tests/roc_fec/test_block_writer_reader_duration.cpp index e7ea341c4..1f093152b 100644 --- a/src/tests/roc_fec/test_block_writer_reader_duration.cpp +++ b/src/tests/roc_fec/test_block_writer_reader_duration.cpp @@ -389,9 +389,11 @@ TEST(block_writer_reader_duration, resize_block_middle) { dispatcher.clear_losses(); if (i_block == 3) { - writer.resize(sb_len[i_block], dispatcher.repair_size()); + LONGS_EQUAL(status::StatusOK, + writer.resize(sb_len[i_block], dispatcher.repair_size())); } else if (i_block == 6) { - writer.resize(sb_len[i_block], dispatcher.repair_size()); + LONGS_EQUAL(status::StatusOK, + writer.resize(sb_len[i_block], dispatcher.repair_size())); } if (!packets.resize(sb_len[i_block])) { FAIL("resize failed"); diff --git a/src/tests/roc_fec/test_block_writer_reader_errors.cpp b/src/tests/roc_fec/test_block_writer_reader_errors.cpp index 2b49ae0df..be610dc29 100644 --- a/src/tests/roc_fec/test_block_writer_reader_errors.cpp +++ b/src/tests/roc_fec/test_block_writer_reader_errors.cpp @@ -157,13 +157,12 @@ TEST(block_writer_reader_errors, writer_cant_resize_block) { size_t sn = 0; - CHECK(writer.resize(NumSourcePackets, BlockSize1)); + LONGS_EQUAL(status::StatusOK, writer.resize(NumSourcePackets, BlockSize1)); for (size_t i = 0; i < NumSourcePackets; ++i) { LONGS_EQUAL(status::StatusOK, writer.write(generate_packet(sn++))); } - CHECK(writer.is_alive()); UNSIGNED_LONGS_EQUAL(NumSourcePackets, dispatcher.source_size()); UNSIGNED_LONGS_EQUAL(BlockSize1, dispatcher.repair_size()); @@ -172,15 +171,8 @@ TEST(block_writer_reader_errors, writer_cant_resize_block) { mock_arena.set_fail(true); - CHECK(writer.resize(NumSourcePackets, BlockSize2)); - - for (size_t i = 0; i < NumSourcePackets; ++i) { - const status::StatusCode expected_status = - (i == 0) ? status::StatusNoMem : status::StatusAbort; - LONGS_EQUAL(expected_status, writer.write(generate_packet(sn++))); - - CHECK(!writer.is_alive()); - } + LONGS_EQUAL(status::StatusOK, writer.resize(NumSourcePackets, BlockSize2)); + LONGS_EQUAL(status::StatusNoMem, writer.write(generate_packet(sn++))); UNSIGNED_LONGS_EQUAL(0, dispatcher.source_size()); UNSIGNED_LONGS_EQUAL(0, dispatcher.repair_size()); @@ -210,26 +202,18 @@ TEST(block_writer_reader_errors, writer_cant_encode_packet) { size_t sn = 0; - CHECK(writer.resize(BlockSize1, NumRepairPackets)); + LONGS_EQUAL(status::StatusOK, writer.resize(BlockSize1, NumRepairPackets)); for (size_t i = 0; i < BlockSize1; ++i) { LONGS_EQUAL(status::StatusOK, writer.write(generate_packet(sn++))); } - CHECK(writer.is_alive()); UNSIGNED_LONGS_EQUAL(BlockSize1, dispatcher.source_size()); UNSIGNED_LONGS_EQUAL(NumRepairPackets, dispatcher.repair_size()); mock_arena.set_fail(true); - CHECK(writer.resize(BlockSize2, NumRepairPackets)); - - for (size_t i = 0; i < BlockSize2; ++i) { - const status::StatusCode expected_status = - (i == 0) ? status::StatusNoMem : status::StatusAbort; - LONGS_EQUAL(expected_status, writer.write(generate_packet(sn++))); - - CHECK(!writer.is_alive()); - } + LONGS_EQUAL(status::StatusOK, writer.resize(BlockSize2, NumRepairPackets)); + LONGS_EQUAL(status::StatusNoMem, writer.write(generate_packet(sn++))); UNSIGNED_LONGS_EQUAL(BlockSize1, dispatcher.source_size()); UNSIGNED_LONGS_EQUAL(NumRepairPackets, dispatcher.repair_size()); @@ -271,7 +255,7 @@ TEST(block_writer_reader_errors, reader_cant_resize_block) { size_t sn = 0; // write first block - CHECK(writer.resize(BlockSize1, NumRepairPackets)); + LONGS_EQUAL(status::StatusOK, writer.resize(BlockSize1, NumRepairPackets)); for (size_t i = 0; i < BlockSize1; ++i) { LONGS_EQUAL(status::StatusOK, writer.write(generate_packet(sn++))); } @@ -289,7 +273,7 @@ TEST(block_writer_reader_errors, reader_cant_resize_block) { } // write second block - CHECK(writer.resize(BlockSize2, NumRepairPackets)); + LONGS_EQUAL(status::StatusOK, writer.resize(BlockSize2, NumRepairPackets)); for (size_t i = 0; i < BlockSize2; ++i) { LONGS_EQUAL(status::StatusOK, writer.write(generate_packet(sn++))); } @@ -303,9 +287,8 @@ TEST(block_writer_reader_errors, reader_cant_resize_block) { // reader should get an error from arena when trying // to resize the block and shut down packet::PacketPtr pp; - LONGS_EQUAL(status::StatusAbort, reader.read(pp, packet::ModeFetch)); + LONGS_EQUAL(status::StatusNoMem, reader.read(pp, packet::ModeFetch)); CHECK(!pp); - CHECK(!reader.is_alive()); } TEST(block_writer_reader_errors, reader_cant_decode_packet) { @@ -343,7 +326,7 @@ TEST(block_writer_reader_errors, reader_cant_decode_packet) { size_t sn = 0; // write first block - CHECK(writer.resize(BlockSize1, NumRepairPackets)); + LONGS_EQUAL(status::StatusOK, writer.resize(BlockSize1, NumRepairPackets)); for (size_t i = 0; i < BlockSize1; ++i) { LONGS_EQUAL(status::StatusOK, writer.write(generate_packet(sn++))); } @@ -365,7 +348,7 @@ TEST(block_writer_reader_errors, reader_cant_decode_packet) { dispatcher.lose(10); // write second block - CHECK(writer.resize(BlockSize2, NumRepairPackets)); + LONGS_EQUAL(status::StatusOK, writer.resize(BlockSize2, NumRepairPackets)); for (size_t i = 0; i < BlockSize2; ++i) { LONGS_EQUAL(status::StatusOK, writer.write(generate_packet(sn++))); } @@ -390,10 +373,6 @@ TEST(block_writer_reader_errors, reader_cant_decode_packet) { packet::PacketPtr pp; LONGS_EQUAL(status::StatusNoMem, reader.read(pp, packet::ModeFetch)); CHECK(!pp); - CHECK(!reader.is_alive()); - - // reader should get an abort error if it is not alive - LONGS_EQUAL(status::StatusAbort, reader.read(pp, packet::ModeFetch)); } TEST(block_writer_reader_errors, reader_cant_read_source_packet) {