Skip to content

Commit

Permalink
[sp] roc-streaminggh-183 Forward statuses through refresh()
Browse files Browse the repository at this point in the history
Since IFrameReader::read() now can return status, and this status
can be status::StatusAbort, we need to be able to remember it and
return from refresh().

This commit updates refresh() calls to return statuses.

Sponsored-by: waspd
  • Loading branch information
gavv committed Jun 20, 2024
1 parent c0d1db6 commit 461b44c
Show file tree
Hide file tree
Showing 21 changed files with 347 additions and 244 deletions.
4 changes: 1 addition & 3 deletions src/internal_modules/roc_audio/latency_monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,12 @@ status::StatusCode LatencyMonitor::read(Frame& frame) {
return status::StatusOK;
}

bool LatencyMonitor::reclock(const core::nanoseconds_t playback_timestamp) {
void LatencyMonitor::reclock(const core::nanoseconds_t playback_timestamp) {
roc_panic_if(init_status_ != status::StatusOK);

// this method is called when playback time of last frame was reported
// now we can update e2e latency based on it
compute_e2e_latency_(playback_timestamp);

return true;
}

bool LatencyMonitor::pre_process_(const Frame& frame) {
Expand Down
4 changes: 1 addition & 3 deletions src/internal_modules/roc_audio/latency_monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,7 @@ class LatencyMonitor : public IFrameReader, public core::NonCopyable<> {
//! @remarks
//! Pipeline invokes this method after adding last frame to
//! playback buffer and knowing its playback time.
//! @returns
//! false if the session is ended
bool reclock(core::nanoseconds_t playback_timestamp);
void reclock(core::nanoseconds_t playback_timestamp);

private:
void compute_niq_latency_();
Expand Down
18 changes: 14 additions & 4 deletions src/internal_modules/roc_pipeline/receiver_loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,11 +255,21 @@ uint64_t ReceiverLoop::tid_imp() const {
}

status::StatusCode ReceiverLoop::process_subframe_imp(audio::Frame& frame) {
// TODO(gh-183): forward status (refresh)
// TODO: handle returned deadline and schedule refresh
source_.refresh(core::timestamp(core::ClockUnix));
status::StatusCode code = status::NoStatus;

return source_.read(frame);
// TODO(gh-674): handle returned deadline and schedule refresh
core::nanoseconds_t next_deadline = 0;

if ((code = source_.refresh(core::timestamp(core::ClockUnix), &next_deadline))
!= status::StatusOK) {
return code;
}

if ((code = source_.read(frame)) != status::StatusOK) {
return code;
}

return status::StatusOK;
}

bool ReceiverLoop::process_task_imp(PipelineTask& basic_task) {
Expand Down
24 changes: 8 additions & 16 deletions src/internal_modules/roc_pipeline/receiver_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,33 +262,25 @@ status::StatusCode ReceiverSession::route_packet(const packet::PacketPtr& packet
return packet_router_->write(packet);
}

bool ReceiverSession::refresh(core::nanoseconds_t current_time,
core::nanoseconds_t* next_refresh) {
status::StatusCode ReceiverSession::refresh(core::nanoseconds_t current_time,
core::nanoseconds_t& next_deadline) {
roc_panic_if(init_status_ != status::StatusOK);

(void)current_time;

if (next_refresh) {
*next_refresh = 0;
}

if (watchdog_) {
if (!watchdog_->is_alive()) {
return false;
}
if (watchdog_ && !watchdog_->is_alive()) {
return status::StatusAbort;
}

if (!latency_monitor_->is_alive()) {
return false;
return status::StatusAbort;
}

return true;
return status::StatusOK;
}

bool ReceiverSession::reclock(core::nanoseconds_t playback_time) {
void ReceiverSession::reclock(core::nanoseconds_t playback_time) {
roc_panic_if(init_status_ != status::StatusOK);

return latency_monitor_->reclock(playback_time);
latency_monitor_->reclock(playback_time);
}

size_t ReceiverSession::num_reports() const {
Expand Down
14 changes: 6 additions & 8 deletions src/internal_modules/roc_pipeline/receiver_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,19 +85,17 @@ class ReceiverSession : public core::RefCounted<ReceiverSession, core::ArenaAllo

//! Refresh pipeline according to current time.
//! @remarks
//! writes to @p next_refresh deadline (absolute time) when refresh should
//! be invoked again if there are no frames
//! @returns
//! false if the session is ended
bool refresh(core::nanoseconds_t current_time, core::nanoseconds_t* next_refresh);
//! Should be invoked before reading each frame.
//! If there are no frames for a while, should be invoked no
//! later than the deadline returned via @p next_deadline.
ROC_ATTR_NODISCARD status::StatusCode refresh(core::nanoseconds_t current_time,
core::nanoseconds_t& next_deadline);

//! Adjust session clock to match consumer clock.
//! @remarks
//! @p playback_time specified absolute time when first sample of last frame
//! retrieved from pipeline will be actually played on sink
//! @returns
//! false if the session is ended
bool reclock(core::nanoseconds_t playback_time);
void reclock(core::nanoseconds_t playback_time);

//! Get number of RTCP reports to be generated.
size_t num_reports() const;
Expand Down
53 changes: 27 additions & 26 deletions src/internal_modules/roc_pipeline/receiver_session_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,60 +91,61 @@ status::StatusCode ReceiverSessionGroup::route_packet(const packet::PacketPtr& p
return route_transport_packet_(packet);
}

core::nanoseconds_t
ReceiverSessionGroup::refresh_sessions(core::nanoseconds_t current_time) {
status::StatusCode
ReceiverSessionGroup::refresh_sessions(core::nanoseconds_t current_time,
core::nanoseconds_t& next_deadline) {
roc_panic_if(init_status_ != status::StatusOK);

core::SharedPtr<ReceiverSession> curr, next;

core::nanoseconds_t next_deadline = 0;

if (rtcp_communicator_) {
// This will invoke IParticipant methods implemented by us,
// in particular query_recv_streams().
const status::StatusCode code =
rtcp_communicator_->generate_reports(current_time);
// TODO(gh-183): forward status (refresh)
roc_panic_if(code != status::StatusOK);

if (code != status::StatusOK) {
return code;
}

next_deadline = rtcp_communicator_->generation_deadline(current_time);
}

for (curr = sessions_.front(); curr; curr = next) {
next = sessions_.nextof(*curr);
core::SharedPtr<ReceiverSession> curr_sess, next_sess;

for (curr_sess = sessions_.front(); curr_sess; curr_sess = next_sess) {
next_sess = sessions_.nextof(*curr_sess);

core::nanoseconds_t sess_deadline = 0;
const status::StatusCode code = curr_sess->refresh(current_time, sess_deadline);

if (!curr->refresh(current_time, &sess_deadline)) {
// Session ended.
remove_session_(curr);
// Terminate sessions which asked so.
if (code == status::StatusEnd || code == status::StatusAbort) {
remove_session_(curr_sess);
continue;
}

// Handle other errors.
if (code != status::StatusOK) {
return code;
}

if (sess_deadline != 0) {
if (next_deadline == 0) {
next_deadline = sess_deadline;
} else {
next_deadline = std::min(next_deadline, sess_deadline);
}
next_deadline = (next_deadline == 0 ? sess_deadline
: std::min(next_deadline, sess_deadline));
}
}

return next_deadline;
return status::StatusOK;
}

void ReceiverSessionGroup::reclock_sessions(core::nanoseconds_t playback_time) {
roc_panic_if(init_status_ != status::StatusOK);

core::SharedPtr<ReceiverSession> curr, next;
core::SharedPtr<ReceiverSession> curr_sess, next_sess;

for (curr = sessions_.front(); curr; curr = next) {
next = sessions_.nextof(*curr);
for (curr_sess = sessions_.front(); curr_sess; curr_sess = next_sess) {
next_sess = sessions_.nextof(*curr_sess);

if (!curr->reclock(playback_time)) {
// Session ended.
remove_session_(curr);
}
curr_sess->reclock(playback_time);
}
}

Expand Down
11 changes: 7 additions & 4 deletions src/internal_modules/roc_pipeline/receiver_session_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,13 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IParticip
core::nanoseconds_t current_time);

//! Refresh pipeline according to current time.
//! @returns
//! deadline (absolute time) when refresh should be invoked again
//! if there are no frames
core::nanoseconds_t refresh_sessions(core::nanoseconds_t current_time);
//! @remarks
//! Should be invoked before reading each frame.
//! If there are no frames for a while, should be invoked no
//! later than the deadline returned via @p next_deadline.
ROC_ATTR_NODISCARD status::StatusCode
refresh_sessions(core::nanoseconds_t current_time,
core::nanoseconds_t& next_deadline);

//! Adjust session clock to match consumer clock.
//! @remarks
Expand Down
30 changes: 19 additions & 11 deletions src/internal_modules/roc_pipeline/receiver_slot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,28 +73,36 @@ ReceiverEndpoint* ReceiverSlot::add_endpoint(address::Interface iface,
return NULL;
}

core::nanoseconds_t ReceiverSlot::refresh(core::nanoseconds_t current_time) {
status::StatusCode ReceiverSlot::refresh(core::nanoseconds_t current_time,
core::nanoseconds_t& next_deadline) {
roc_panic_if(init_status_ != status::StatusOK);

status::StatusCode code = status::NoStatus;

if (source_endpoint_) {
const status::StatusCode code = source_endpoint_->pull_packets(current_time);
// TODO(gh-183): forward status (refresh)
roc_panic_if(code != status::StatusOK);
if ((code = source_endpoint_->pull_packets(current_time)) != status::StatusOK) {
return code;
}
}

if (repair_endpoint_) {
const status::StatusCode code = repair_endpoint_->pull_packets(current_time);
// TODO(gh-183): forward status (refresh)
roc_panic_if(code != status::StatusOK);
if ((code = repair_endpoint_->pull_packets(current_time)) != status::StatusOK) {
return code;
}
}

if (control_endpoint_) {
const status::StatusCode code = control_endpoint_->pull_packets(current_time);
// TODO(gh-183): forward status (refresh)
roc_panic_if(code != status::StatusOK);
if ((code = control_endpoint_->pull_packets(current_time)) != status::StatusOK) {
return code;
}
}

if ((code = session_group_.refresh_sessions(current_time, next_deadline))
!= status::StatusOK) {
return code;
}

return session_group_.refresh_sessions(current_time);
return status::StatusOK;
}

void ReceiverSlot::reclock(core::nanoseconds_t playback_time) {
Expand Down
12 changes: 7 additions & 5 deletions src/internal_modules/roc_pipeline/receiver_slot.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,13 @@ class ReceiverSlot : public core::RefCounted<ReceiverSlot, core::ArenaAllocation
const address::SocketAddr& inbound_address,
packet::IWriter* outbound_writer);

//! Pull packets and refresh sessions according to current time.
//! @returns
//! deadline (absolute time) when refresh should be invoked again
//! if there are no frames
core::nanoseconds_t refresh(core::nanoseconds_t current_time);
//! Pull packets and refresh pipeline according to current time.
//! @remarks
//! Should be invoked before reading each frame.
//! If there are no frames for a while, should be invoked no
//! later than the deadline returned via @p next_deadline.
ROC_ATTR_NODISCARD status::StatusCode refresh(core::nanoseconds_t current_time,
core::nanoseconds_t& next_deadline);

//! Adjust sessions clock to match consumer clock.
//! @remarks
Expand Down
26 changes: 14 additions & 12 deletions src/internal_modules/roc_pipeline/receiver_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,30 +110,32 @@ size_t ReceiverSource::num_sessions() const {
return state_tracker_.num_active_sessions();
}

core::nanoseconds_t ReceiverSource::refresh(core::nanoseconds_t current_time) {
status::StatusCode ReceiverSource::refresh(core::nanoseconds_t current_time,
core::nanoseconds_t* next_deadline) {
roc_panic_if(init_status_ != status::StatusOK);

roc_panic_if_msg(current_time <= 0,
"receiver source: invalid timestamp:"
" expected positive value, got %lld",
(long long)current_time);

core::nanoseconds_t next_deadline = 0;

for (core::SharedPtr<ReceiverSlot> slot = slots_.front(); slot;
slot = slots_.nextof(*slot)) {
const core::nanoseconds_t slot_deadline = slot->refresh(current_time);

if (slot_deadline != 0) {
if (next_deadline == 0) {
next_deadline = slot_deadline;
} else {
next_deadline = std::min(next_deadline, slot_deadline);
}
core::nanoseconds_t slot_deadline = 0;

const status::StatusCode code = slot->refresh(current_time, slot_deadline);
if (code != status::StatusOK) {
return code;
}

if (next_deadline && slot_deadline != 0) {
*next_deadline = *next_deadline == 0
? slot_deadline
: std::min(*next_deadline, slot_deadline);
}
}

return next_deadline;
return status::StatusOK;
}

sndio::ISink* ReceiverSource::to_sink() {
Expand Down
10 changes: 4 additions & 6 deletions src/internal_modules/roc_pipeline/receiver_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,10 @@ class ReceiverSource : public sndio::ISource, public core::NonCopyable<> {
//! Pull packets and refresh pipeline according to current time.
//! @remarks
//! Should be invoked before reading each frame.
//! Also should be invoked after provided deadline if no frames were
//! read until that deadline expires.
//! @returns
//! deadline (absolute time) when refresh should be invoked again
//! if there are no frames
core::nanoseconds_t refresh(core::nanoseconds_t current_time);
//! If there are no frames for a while, should be invoked no
//! later than the deadline returned via @p next_deadline.
ROC_ATTR_NODISCARD status::StatusCode refresh(core::nanoseconds_t current_time,
core::nanoseconds_t* next_deadline);

//! Cast IDevice to ISink.
virtual sndio::ISink* to_sink();
Expand Down
18 changes: 13 additions & 5 deletions src/internal_modules/roc_pipeline/sender_loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,13 +248,21 @@ uint64_t SenderLoop::tid_imp() const {
}

status::StatusCode SenderLoop::process_subframe_imp(audio::Frame& frame) {
const status::StatusCode status = sink_.write(frame);
status::StatusCode code = status::NoStatus;

// TODO(gh-183): forward status (refresh)
// TODO: handle returned deadline and schedule refresh
sink_.refresh(core::timestamp(core::ClockUnix));
if ((code = sink_.write(frame)) != status::StatusOK) {
return code;
}

// TODO(gh-674): handle returned deadline and schedule refresh
core::nanoseconds_t next_deadline = 0;

if ((code = sink_.refresh(core::timestamp(core::ClockUnix), &next_deadline))
!= status::StatusOK) {
return code;
}

return status;
return status::StatusOK;
}

bool SenderLoop::process_task_imp(PipelineTask& basic_task) {
Expand Down
Loading

0 comments on commit 461b44c

Please sign in to comment.