Skip to content

Commit

Permalink
[sp] roc-streaminggh-614 Add status codes to IFrameReader
Browse files Browse the repository at this point in the history
Add status code to IFrameReader::read(), and update implementations
of that interface.

Sponsored-by: waspd
  • Loading branch information
gavv committed Jun 20, 2024
1 parent bcb7fb5 commit b5fec7c
Show file tree
Hide file tree
Showing 50 changed files with 253 additions and 231 deletions.
26 changes: 15 additions & 11 deletions src/internal_modules/roc_audio/channel_mapper_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ status::StatusCode ChannelMapperReader::init_status() const {
return init_status_;
}

bool ChannelMapperReader::read(Frame& out_frame) {
status::StatusCode ChannelMapperReader::read(Frame& out_frame) {
roc_panic_if(init_status_ != status::StatusOK);

if (out_frame.num_raw_samples() % out_spec_.num_channels() != 0) {
Expand All @@ -75,8 +75,10 @@ bool ChannelMapperReader::read(Frame& out_frame) {
const size_t n_read = std::min(n_samples, max_batch);

core::nanoseconds_t capt_ts = 0;
if (!read_(out_samples, n_read, flags, capt_ts)) {
return false;

const status::StatusCode code = read_(out_samples, n_read, flags, capt_ts);
if (code != status::StatusOK) {
return code;
}

if (frames_counter == 0) {
Expand All @@ -90,16 +92,18 @@ bool ChannelMapperReader::read(Frame& out_frame) {
out_frame.set_flags(flags);
out_frame.set_duration(out_frame.num_raw_samples() / out_spec_.num_channels());

return true;
return status::StatusOK;
}

bool ChannelMapperReader::read_(sample_t* out_samples,
size_t n_samples,
unsigned& flags,
core::nanoseconds_t& capt_ts) {
status::StatusCode ChannelMapperReader::read_(sample_t* out_samples,
size_t n_samples,
unsigned& flags,
core::nanoseconds_t& capt_ts) {
Frame in_frame(input_buf_.data(), n_samples * in_spec_.num_channels());
if (!input_reader_.read(in_frame)) {
return false;

const status::StatusCode code = input_reader_.read(in_frame);
if (code != status::StatusOK) {
return code;
}

mapper_.map(in_frame.raw_samples(), in_frame.num_raw_samples(), out_samples,
Expand All @@ -108,7 +112,7 @@ bool ChannelMapperReader::read_(sample_t* out_samples,
capt_ts = in_frame.capture_timestamp();
flags |= in_frame.flags();

return true;
return status::StatusOK;
}

} // namespace audio
Expand Down
10 changes: 5 additions & 5 deletions src/internal_modules/roc_audio/channel_mapper_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ class ChannelMapperReader : public IFrameReader, public core::NonCopyable<> {
status::StatusCode init_status() const;

//! Read audio frame.
virtual bool read(Frame& frame);
virtual ROC_ATTR_NODISCARD status::StatusCode read(Frame& frame);

private:
bool read_(sample_t* out_samples,
size_t n_samples,
unsigned& flags,
core::nanoseconds_t& capt_ts);
status::StatusCode read_(sample_t* out_samples,
size_t n_samples,
unsigned& flags,
core::nanoseconds_t& capt_ts);

IFrameReader& input_reader_;
core::Slice<sample_t> input_buf_;
Expand Down
5 changes: 3 additions & 2 deletions src/internal_modules/roc_audio/depacketizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,15 @@ packet::stream_timestamp_t Depacketizer::next_timestamp() const {
return stream_ts_;
}

bool Depacketizer::read(Frame& frame) {
status::StatusCode Depacketizer::read(Frame& frame) {
roc_panic_if(init_status_ != status::StatusOK);

read_frame_(frame);

report_stats_();

return true;
// TODO(gh-183): forward status
return status::StatusOK;
}

void Depacketizer::read_frame_(Frame& frame) {
Expand Down
2 changes: 1 addition & 1 deletion src/internal_modules/roc_audio/depacketizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class Depacketizer : public IFrameReader, public core::NonCopyable<> {
bool is_started() const;

//! Read audio frame.
virtual bool read(Frame& frame);
virtual ROC_ATTR_NODISCARD status::StatusCode read(Frame& frame);

//! Get next timestamp to be rendered.
//! @pre
Expand Down
15 changes: 9 additions & 6 deletions src/internal_modules/roc_audio/iframe_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
#define ROC_AUDIO_IFRAME_READER_H_

#include "roc_audio/frame.h"
#include "roc_core/attributes.h"
#include "roc_core/list_node.h"
#include "roc_status/status_code.h"

namespace roc {
namespace audio {
Expand All @@ -23,13 +25,14 @@ class IFrameReader : public core::ListNode<> {
public:
virtual ~IFrameReader();

//! Read audio frame.
//! @remarks
//! Frame buffer and its size should be set by caller. The reader
//! should fill the entire buffer and should not resize it.
//! Read frame.
//!
//! @returns
//! false if there is nothing to read anymore.
virtual bool read(Frame& frame) = 0;
//! If frame was successfully and completely read, returns status::StatusOK,
//! otherwise, returns an error.
//!
//! @see status::StatusCode.
virtual ROC_ATTR_NODISCARD status::StatusCode read(Frame& frame) = 0;
};

} // namespace audio
Expand Down
11 changes: 6 additions & 5 deletions src/internal_modules/roc_audio/latency_monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ const LatencyMetrics& LatencyMonitor::metrics() const {
return latency_metrics_;
}

bool LatencyMonitor::read(Frame& frame) {
status::StatusCode LatencyMonitor::read(Frame& frame) {
roc_panic_if(init_status_ != status::StatusOK);

if (alive_) {
Expand All @@ -82,16 +82,17 @@ bool LatencyMonitor::read(Frame& frame) {
}

if (!alive_) {
return false;
return status::StatusAbort;
}

if (!frame_reader_.read(frame)) {
return false;
const status::StatusCode code = frame_reader_.read(frame);
if (code != status::StatusOK) {
return code;
}

post_process_(frame);

return true;
return status::StatusOK;
}

bool LatencyMonitor::reclock(const core::nanoseconds_t playback_timestamp) {
Expand Down
2 changes: 1 addition & 1 deletion src/internal_modules/roc_audio/latency_monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> {
//! Read audio frame from a pipeline.
//! @remarks
//! Forwards frame from underlying reader as-is.
virtual bool read(Frame& frame);
virtual ROC_ATTR_NODISCARD status::StatusCode read(Frame& frame);

//! Report playback timestamp of last frame returned by read.
//! @remarks
Expand Down
41 changes: 18 additions & 23 deletions src/internal_modules/roc_audio/mixer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,9 @@ void Mixer::remove_input(IFrameReader& reader) {
readers_.remove(reader);
}

bool Mixer::read(Frame& frame) {
status::StatusCode Mixer::read(Frame& frame) {
roc_panic_if(init_status_ != status::StatusOK);

// Optimization for single reader case.
if (readers_.size() == 1) {
if (!readers_.front()->read(frame)) {
frame.set_duration(frame.num_raw_samples() / sample_spec_.num_channels());
}

if (!enable_timestamps_) {
// When timestamps are disabled, don't forget to zeroise
// them in the optimized path.
frame.set_capture_timestamp(0);
}

return true;
}

const size_t max_read = temp_buf_.size();

sample_t* samples = frame.raw_samples();
Expand All @@ -88,7 +73,10 @@ bool Mixer::read(Frame& frame) {
n_read = max_read;
}

read_(samples, n_read, flags, capture_ts);
const status::StatusCode code = read_(samples, n_read, flags, capture_ts);
if (code != status::StatusOK) {
return code;
}

samples += n_read;
n_samples -= n_read;
Expand All @@ -98,13 +86,13 @@ bool Mixer::read(Frame& frame) {
frame.set_duration(frame.num_raw_samples() / sample_spec_.num_channels());
frame.set_capture_timestamp(capture_ts);

return true;
return status::StatusOK;
}

void Mixer::read_(sample_t* out_data,
size_t out_size,
unsigned& out_flags,
core::nanoseconds_t& out_cts) {
status::StatusCode Mixer::read_(sample_t* out_data,
size_t out_size,
unsigned& out_flags,
core::nanoseconds_t& out_cts) {
roc_panic_if(!out_data);
roc_panic_if(out_size == 0);

Expand All @@ -121,9 +109,14 @@ void Mixer::read_(sample_t* out_data,
sample_t* temp_data = temp_buf_.data();

Frame temp_frame(temp_data, out_size);
if (!rp->read(temp_frame)) {

const status::StatusCode code = rp->read(temp_frame);
if (code == status::StatusDrain) {
continue;
}
if (code != status::StatusOK) {
return code;
}

for (size_t n = 0; n < out_size; n++) {
out_data[n] += temp_data[n];
Expand Down Expand Up @@ -157,6 +150,8 @@ void Mixer::read_(sample_t* out_data,
out_cts = core::nanoseconds_t(cts_base * ((double)cts_count / n_readers)
+ cts_sum / (double)n_readers);
}

return status::StatusOK;
}

} // namespace audio
Expand Down
10 changes: 5 additions & 5 deletions src/internal_modules/roc_audio/mixer.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ class Mixer : public IFrameReader, public core::NonCopyable<> {
//! @remarks
//! Reads samples from every input reader, mixes them, and fills @p frame
//! with the result.
virtual bool read(Frame& frame);
virtual ROC_ATTR_NODISCARD status::StatusCode read(Frame& frame);

private:
void read_(sample_t* out_data,
size_t out_size,
unsigned& out_flags,
core::nanoseconds_t& out_cts);
status::StatusCode read_(sample_t* out_data,
size_t out_size,
unsigned& out_flags,
core::nanoseconds_t& out_cts);

core::List<IFrameReader, core::NoOwnership> readers_;
core::Slice<sample_t> temp_buf_;
Expand Down
10 changes: 6 additions & 4 deletions src/internal_modules/roc_audio/pcm_mapper_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ status::StatusCode PcmMapperReader::init_status() const {
return init_status_;
}

bool PcmMapperReader::read(Frame& out_frame) {
status::StatusCode PcmMapperReader::read(Frame& out_frame) {
roc_panic_if(init_status_ != status::StatusOK);

const size_t max_sample_count = mapper_.input_sample_count(in_buf_.size()) / num_ch_;
Expand All @@ -86,8 +86,10 @@ bool PcmMapperReader::read(Frame& out_frame) {
size_t in_bit_offset = 0;

Frame in_frame(in_buf_.data(), in_byte_count);
if (!in_reader_.read(in_frame)) {
return false;

const status::StatusCode code = in_reader_.read(in_frame);
if (code != status::StatusOK) {
return code;
}

mapper_.map(in_buf_.data(), in_byte_count, in_bit_offset, out_frame.bytes(),
Expand All @@ -110,7 +112,7 @@ bool PcmMapperReader::read(Frame& out_frame) {
out_frame.set_flags(out_flags);
out_frame.set_duration(out_sample_count);

return true;
return status::StatusOK;
}

} // namespace audio
Expand Down
2 changes: 1 addition & 1 deletion src/internal_modules/roc_audio/pcm_mapper_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class PcmMapperReader : public IFrameReader, public core::NonCopyable<> {
status::StatusCode init_status() const;

//! Read audio frame.
virtual bool read(Frame& frame);
virtual ROC_ATTR_NODISCARD status::StatusCode read(Frame& frame);

private:
PcmMapper mapper_;
Expand Down
18 changes: 6 additions & 12 deletions src/internal_modules/roc_audio/profiling_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,16 @@ status::StatusCode ProfilingReader::init_status() const {
return profiler_.init_status();
}

bool ProfilingReader::read(Frame& frame) {
bool ret;
const core::nanoseconds_t elapsed = read_(frame, ret);
status::StatusCode ProfilingReader::read(Frame& frame) {
const core::nanoseconds_t started = core::timestamp(core::ClockMonotonic);
const status::StatusCode code = reader_.read(frame);
const core::nanoseconds_t elapsed = core::timestamp(core::ClockMonotonic) - started;

if (ret) {
if (code == status::StatusOK) {
profiler_.add_frame(frame.duration(), elapsed);
}
return ret;
}

core::nanoseconds_t ProfilingReader::read_(Frame& frame, bool& ret) {
const core::nanoseconds_t start = core::timestamp(core::ClockMonotonic);

ret = reader_.read(frame);

return core::timestamp(core::ClockMonotonic) - start;
return code;
}

} // namespace audio
Expand Down
2 changes: 1 addition & 1 deletion src/internal_modules/roc_audio/profiling_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class ProfilingReader : public IFrameReader, public core::NonCopyable<> {
status::StatusCode init_status() const;

//! Read audio frame.
virtual bool read(Frame& frame);
virtual ROC_ATTR_NODISCARD status::StatusCode read(Frame& frame);

private:
core::nanoseconds_t read_(Frame& frame, bool& ret);
Expand Down
Loading

0 comments on commit b5fec7c

Please sign in to comment.