diff --git a/tools/cabana/streams/abstractstream.cc b/tools/cabana/streams/abstractstream.cc index ad911b7baedcf7..2278104c269171 100644 --- a/tools/cabana/streams/abstractstream.cc +++ b/tools/cabana/streams/abstractstream.cc @@ -143,7 +143,6 @@ bool AbstractStream::isMessageActive(const MessageId &id) const { } void AbstractStream::updateLastMsgsTo(double sec) { - std::lock_guard lk(mutex_); current_sec_ = sec; uint64_t last_ts = toMonoTime(sec); std::unordered_map msgs; @@ -175,10 +174,17 @@ void AbstractStream::updateLastMsgsTo(double sec) { std::any_of(messages_.cbegin(), messages_.cend(), [this](const auto &m) { return !last_msgs.count(m.first); }); last_msgs = messages_; - mutex_.unlock(); - emit msgsReceived(nullptr, id_changed); - resumeStream(); + + std::lock_guard lk(mutex_); + seek_finished_ = true; + seek_finished_cv_.notify_one(); +} + +void AbstractStream::waitForSeekFinshed() { + std::unique_lock lock(mutex_); + seek_finished_cv_.wait(lock, [this]() { return seek_finished_; }); + seek_finished_ = false; } const CanEvent *AbstractStream::newEvent(uint64_t mono_time, const cereal::CanData::Reader &c) { diff --git a/tools/cabana/streams/abstractstream.h b/tools/cabana/streams/abstractstream.h index 8180c6b6f8b72b..f35b19d34f7045 100644 --- a/tools/cabana/streams/abstractstream.h +++ b/tools/cabana/streams/abstractstream.h @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -111,7 +112,7 @@ class AbstractStream : public QObject { void mergeEvents(const std::vector &events); const CanEvent *newEvent(uint64_t mono_time, const cereal::CanData::Reader &c); void updateEvent(const MessageId &id, double sec, const uint8_t *data, uint8_t size); - virtual void resumeStream() {} + void waitForSeekFinshed(); std::vector all_events_; double current_sec_ = 0; std::optional> time_range_; @@ -127,6 +128,8 @@ class AbstractStream : public QObject { // Members accessed in multiple threads. (mutex protected) std::mutex mutex_; + std::condition_variable seek_finished_cv_; + bool seek_finished_ = false; std::set new_msgs_; std::unordered_map messages_; std::unordered_map> masks_; diff --git a/tools/cabana/streams/replaystream.cc b/tools/cabana/streams/replaystream.cc index bfe5ca74da1580..ebe478ded77d1d 100644 --- a/tools/cabana/streams/replaystream.cc +++ b/tools/cabana/streams/replaystream.cc @@ -53,9 +53,12 @@ bool ReplayStream::loadRoute(const QString &route, const QString &data_dir, uint // Forward replay callbacks to corresponding Qt signals. replay->onSeeking = [this](double sec) { emit seeking(sec); }; - replay->onSeekedTo = [this](double sec) { emit seekedTo(sec); }; + replay->onSeekedTo = [this](double sec) { + emit seekedTo(sec); + waitForSeekFinshed(); + }; replay->onQLogLoaded = [this](std::shared_ptr qlog) { emit qLogLoaded(qlog); }; - replay->onSegmentsMerged = [this]() { QMetaObject::invokeMethod(this, &ReplayStream::mergeSegments, Qt::QueuedConnection); }; + replay->onSegmentsMerged = [this]() { QMetaObject::invokeMethod(this, &ReplayStream::mergeSegments, Qt::BlockingQueuedConnection); }; bool success = replay->load(); if (!success) { diff --git a/tools/cabana/streams/replaystream.h b/tools/cabana/streams/replaystream.h index 2d7f3351935d9c..df1f2526a5e328 100644 --- a/tools/cabana/streams/replaystream.h +++ b/tools/cabana/streams/replaystream.h @@ -32,7 +32,6 @@ class ReplayStream : public AbstractStream { inline float getSpeed() const { return replay->getSpeed(); } inline Replay *getReplay() const { return replay.get(); } inline bool isPaused() const override { return replay->isPaused(); } - void resumeStream() override { return replay->resumeStream(); } void pause(bool pause) override; signals: diff --git a/tools/replay/replay.cc b/tools/replay/replay.cc index 2bd3614530abae..a940b6d04e81a8 100644 --- a/tools/replay/replay.cc +++ b/tools/replay/replay.cc @@ -106,31 +106,27 @@ void Replay::seekTo(double seconds, bool relative) { rInfo("Seeking to %d s, segment %d", (int)target_time, target_segment); notifyEvent(onSeeking, target_time); - double seeked_to_sec = -1; interruptStream([&]() { - current_segment_ = target_segment; + current_segment_.store(target_segment); cur_mono_time_ = route_start_ts_ + target_time * 1e9; - seeking_to_ = target_time; - - if (event_data_->isSegmentLoaded(target_segment)) { - seeked_to_sec = *seeking_to_; - seeking_to_.reset(); - } + seeking_to_.store(target_time, std::memory_order_relaxed); return false; }); - checkSeekProgress(seeked_to_sec); seg_mgr_->setCurrentSegment(target_segment); + checkSeekProgress(); } -void Replay::checkSeekProgress(double seeked_to_sec) { - if (seeked_to_sec >= 0) { - if (onSeekedTo) { - onSeekedTo(seeked_to_sec); - } else { - interruptStream([]() { return true; }); - } +void Replay::checkSeekProgress() { + if (!seg_mgr_->getEventData()->isSegmentLoaded(current_segment_.load())) return; + + double seek_to = seeking_to_.exchange(-1.0, std::memory_order_acquire); + if (seek_to >= 0 && onSeekedTo) { + onSeekedTo(seek_to); } + + // Resume the interrupted stream + interruptStream([]() { return true; }); } void Replay::seekToFlag(FindFlag flag) { @@ -152,29 +148,19 @@ void Replay::pause(bool pause) { void Replay::handleSegmentMerge() { if (exit_) return; - double seeked_to_sec = -1; - interruptStream([&]() { - event_data_ = seg_mgr_->getEventData(); - notifyEvent(onSegmentsMerged); - - bool segment_loaded = event_data_->isSegmentLoaded(current_segment_); - if (seeking_to_ && segment_loaded) { - seeked_to_sec = *seeking_to_; - seeking_to_.reset(); - return false; - } - return segment_loaded; - }); - - checkSeekProgress(seeked_to_sec); - if (!stream_thread_.joinable() && !event_data_->events.empty()) { - startStream(); + auto event_data = seg_mgr_->getEventData(); + if (!stream_thread_.joinable() && !event_data->segments.empty()) { + startStream(event_data->segments.begin()->second); } + notifyEvent(onSegmentsMerged); + + // Interrupt the stream to handle segment merge + interruptStream([]() { return false; }); + checkSeekProgress(); } -void Replay::startStream() { - const auto &cur_segment = event_data_->segments.begin()->second; - const auto &events = cur_segment->log->events; +void Replay::startStream(const std::shared_ptr segment) { + const auto &events = segment->log->events; route_start_ts_ = events.front().mono_time; cur_mono_time_ += route_start_ts_ - 1; @@ -212,7 +198,7 @@ void Replay::startStream() { if (!hasFlag(REPLAY_FLAG_NO_VIPC)) { std::pair camera_size[MAX_CAMERAS] = {}; for (auto type : ALL_CAMERAS) { - if (auto &fr = cur_segment->frames[type]) { + if (auto &fr = segment->frames[type]) { camera_size[type] = {fr->width, fr->height}; } } @@ -271,6 +257,7 @@ void Replay::streamThread() { stream_cv_.wait(lk, [this]() { return exit_ || (events_ready_ && !interrupt_requested_); }); if (exit_) break; + event_data_ = seg_mgr_->getEventData(); const auto &events = event_data_->events; auto first = std::upper_bound(events.cbegin(), events.cend(), Event(cur_which, cur_mono_time_, {})); if (first == events.cend()) { @@ -308,11 +295,11 @@ std::vector::const_iterator Replay::publishEvents(std::vector::con for (; !interrupt_requested_ && first != last; ++first) { const Event &evt = *first; - int segment = toSeconds(evt.mono_time) / 60; - if (current_segment_ != segment) { - current_segment_ = segment; - seg_mgr_->setCurrentSegment(current_segment_); + int segment = toSeconds(evt.mono_time) / 60; + if (current_segment_.load(std::memory_order_relaxed) != segment) { + current_segment_.store(segment, std::memory_order_relaxed); + seg_mgr_->setCurrentSegment(segment); } // Skip events if socket is not present diff --git a/tools/replay/replay.h b/tools/replay/replay.h index d549eaefc4dd76..8525a532a1766c 100644 --- a/tools/replay/replay.h +++ b/tools/replay/replay.h @@ -55,9 +55,8 @@ class Replay { inline const std::string &carFingerprint() const { return car_fingerprint_; } inline const std::shared_ptr> getTimeline() const { return timeline_.getEntries(); } inline const std::optional findAlertAtTime(double sec) const { return timeline_.findAlertAtTime(sec); } - const std::shared_ptr getEventData() const { return event_data_; } + const std::shared_ptr getEventData() const { return seg_mgr_->getEventData(); } void installEventFilter(std::function filter) { event_filter_ = filter; } - void resumeStream() { interruptStream([]() { return true; }); } // Event callback functions std::function onSegmentsMerged = nullptr; @@ -68,7 +67,7 @@ class Replay { private: void setupServices(const std::vector &allow, const std::vector &block); void setupSegmentManager(bool has_filters); - void startStream(); + void startStream(const std::shared_ptr segment); void streamThread(); void handleSegmentMerge(); void interruptStream(const std::function& update_fn); @@ -76,7 +75,7 @@ class Replay { std::vector::const_iterator last); void publishMessage(const Event *e); void publishFrame(const Event *e); - void checkSeekProgress(double seeked_to_sec); + void checkSeekProgress(); std::unique_ptr seg_mgr_; Timeline timeline_; @@ -86,8 +85,8 @@ class Replay { std::mutex stream_lock_; bool user_paused_ = false; std::condition_variable stream_cv_; - int current_segment_ = 0; - std::optional seeking_to_; + std::atomic current_segment_ = 0; + std::atomic seeking_to_ = -1.0; std::atomic exit_ = false; std::atomic interrupt_requested_ = false; bool events_ready_ = false; diff --git a/tools/replay/seg_mgr.cc b/tools/replay/seg_mgr.cc index 954b25e8744f88..8a00d426b1a804 100644 --- a/tools/replay/seg_mgr.cc +++ b/tools/replay/seg_mgr.cc @@ -105,7 +105,7 @@ bool SegmentManager::mergeSegments(const SegmentMap::iterator &begin, const Segm merged_event_data->segments[n] = segments_.at(n); } - event_data_ = merged_event_data; + std::atomic_store(&event_data_, std::move(merged_event_data)); merged_segments_ = segments_to_merge; return true; diff --git a/tools/replay/seg_mgr.h b/tools/replay/seg_mgr.h index efb3d7f0eadacf..40bdcd51f06d1d 100644 --- a/tools/replay/seg_mgr.h +++ b/tools/replay/seg_mgr.h @@ -21,14 +21,14 @@ class SegmentManager { }; SegmentManager(const std::string &route_name, uint32_t flags, const std::string &data_dir = "") - : flags_(flags), route_(route_name, data_dir) {}; + : flags_(flags), route_(route_name, data_dir), event_data_(std::make_shared()) {} ~SegmentManager(); bool load(); void setCurrentSegment(int seg_num); void setCallback(const std::function &callback) { onSegmentMergedCallback_ = callback; } void setFilters(const std::vector &filters) { filters_ = filters; } - const std::shared_ptr getEventData() const { return event_data_; } + const std::shared_ptr getEventData() const { return std::atomic_load(&event_data_); } bool hasSegment(int n) const { return segments_.find(n) != segments_.end(); } Route route_; diff --git a/tools/replay/timeline.cc b/tools/replay/timeline.cc index d836de972b60f0..a4c2ffb700c1a4 100644 --- a/tools/replay/timeline.cc +++ b/tools/replay/timeline.cc @@ -1,7 +1,7 @@ #include "tools/replay/timeline.h" -#include #include +#include #include "cereal/gen/cpp/log.capnp.h" @@ -74,7 +74,7 @@ void Timeline::buildTimeline(const Route &route, uint64_t route_start_ts, bool l // Sort and finalize the timeline entries auto entries = std::make_shared>(staging_entries_); std::sort(entries->begin(), entries->end(), [](auto &a, auto &b) { return a.start_time < b.start_time; }); - timeline_entries_ = entries; + std::atomic_store(&timeline_entries_, std::move(entries)); callback(log); // Notify the callback once the log is processed } diff --git a/tools/replay/timeline.h b/tools/replay/timeline.h index 689a80635f9b0d..74add9e5cc4623 100644 --- a/tools/replay/timeline.h +++ b/tools/replay/timeline.h @@ -27,20 +27,20 @@ class Timeline { std::function)> callback); std::optional find(double cur_ts, FindFlag flag) const; std::optional findAlertAtTime(double target_time) const; - const std::shared_ptr> getEntries() const { return timeline_entries_; } + const std::shared_ptr> getEntries() const { return std::atomic_load(&timeline_entries_); } private: - void buildTimeline(const Route &route, uint64_t route_start_ts, bool local_cache, - std::function)> callback); - void updateEngagementStatus(const cereal::SelfdriveState::Reader &cs, std::optional &idx, double seconds); - void updateAlertStatus(const cereal::SelfdriveState::Reader &cs, std::optional &idx, double seconds); + void buildTimeline(const Route &route, uint64_t route_start_ts, bool local_cache, + std::function)> callback); + void updateEngagementStatus(const cereal::SelfdriveState::Reader &cs, std::optional &idx, double seconds); + void updateAlertStatus(const cereal::SelfdriveState::Reader &cs, std::optional &idx, double seconds); - std::thread thread_; - std::atomic should_exit_ = false; + std::thread thread_; + std::atomic should_exit_ = false; - // Temporarily holds entries before they are sorted and finalized - std::vector staging_entries_; + // Temporarily holds entries before they are sorted and finalized + std::vector staging_entries_; - // Final sorted timeline entries - std::shared_ptr> timeline_entries_; + // Final sorted timeline entries + std::shared_ptr> timeline_entries_; };