Skip to content

Commit

Permalink
replay: fix various synchronization and event handling issues (#34254)
Browse files Browse the repository at this point in the history
fix various synchronization and event handling issues
  • Loading branch information
deanlee authored Dec 21, 2024
1 parent 822f613 commit d621469
Show file tree
Hide file tree
Showing 10 changed files with 68 additions and 71 deletions.
14 changes: 10 additions & 4 deletions tools/cabana/streams/abstractstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ bool AbstractStream::isMessageActive(const MessageId &id) const {
}

void AbstractStream::updateLastMsgsTo(double sec) {
std::lock_guard lk(mutex_);
current_sec_ = sec;
uint64_t last_ts = toMonoTime(sec);
std::unordered_map<MessageId, CanData> msgs;
Expand Down Expand Up @@ -175,10 +174,17 @@ void AbstractStream::updateLastMsgsTo(double sec) {
std::any_of(messages_.cbegin(), messages_.cend(),
[this](const auto &m) { return !last_msgs.count(m.first); });
last_msgs = messages_;
mutex_.unlock();

emit msgsReceived(nullptr, id_changed);
resumeStream();

std::lock_guard lk(mutex_);
seek_finished_ = true;
seek_finished_cv_.notify_one();
}

void AbstractStream::waitForSeekFinshed() {
std::unique_lock lock(mutex_);
seek_finished_cv_.wait(lock, [this]() { return seek_finished_; });
seek_finished_ = false;
}

const CanEvent *AbstractStream::newEvent(uint64_t mono_time, const cereal::CanData::Reader &c) {
Expand Down
5 changes: 4 additions & 1 deletion tools/cabana/streams/abstractstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <algorithm>
#include <array>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <optional>
Expand Down Expand Up @@ -111,7 +112,7 @@ class AbstractStream : public QObject {
void mergeEvents(const std::vector<const CanEvent *> &events);
const CanEvent *newEvent(uint64_t mono_time, const cereal::CanData::Reader &c);
void updateEvent(const MessageId &id, double sec, const uint8_t *data, uint8_t size);
virtual void resumeStream() {}
void waitForSeekFinshed();
std::vector<const CanEvent *> all_events_;
double current_sec_ = 0;
std::optional<std::pair<double, double>> time_range_;
Expand All @@ -127,6 +128,8 @@ class AbstractStream : public QObject {

// Members accessed in multiple threads. (mutex protected)
std::mutex mutex_;
std::condition_variable seek_finished_cv_;
bool seek_finished_ = false;
std::set<MessageId> new_msgs_;
std::unordered_map<MessageId, CanData> messages_;
std::unordered_map<MessageId, std::vector<uint8_t>> masks_;
Expand Down
7 changes: 5 additions & 2 deletions tools/cabana/streams/replaystream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,12 @@ bool ReplayStream::loadRoute(const QString &route, const QString &data_dir, uint

// Forward replay callbacks to corresponding Qt signals.
replay->onSeeking = [this](double sec) { emit seeking(sec); };
replay->onSeekedTo = [this](double sec) { emit seekedTo(sec); };
replay->onSeekedTo = [this](double sec) {
emit seekedTo(sec);
waitForSeekFinshed();
};
replay->onQLogLoaded = [this](std::shared_ptr<LogReader> qlog) { emit qLogLoaded(qlog); };
replay->onSegmentsMerged = [this]() { QMetaObject::invokeMethod(this, &ReplayStream::mergeSegments, Qt::QueuedConnection); };
replay->onSegmentsMerged = [this]() { QMetaObject::invokeMethod(this, &ReplayStream::mergeSegments, Qt::BlockingQueuedConnection); };

bool success = replay->load();
if (!success) {
Expand Down
1 change: 0 additions & 1 deletion tools/cabana/streams/replaystream.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ class ReplayStream : public AbstractStream {
inline float getSpeed() const { return replay->getSpeed(); }
inline Replay *getReplay() const { return replay.get(); }
inline bool isPaused() const override { return replay->isPaused(); }
void resumeStream() override { return replay->resumeStream(); }
void pause(bool pause) override;

signals:
Expand Down
69 changes: 28 additions & 41 deletions tools/replay/replay.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,31 +106,27 @@ void Replay::seekTo(double seconds, bool relative) {
rInfo("Seeking to %d s, segment %d", (int)target_time, target_segment);
notifyEvent(onSeeking, target_time);

double seeked_to_sec = -1;
interruptStream([&]() {
current_segment_ = target_segment;
current_segment_.store(target_segment);
cur_mono_time_ = route_start_ts_ + target_time * 1e9;
seeking_to_ = target_time;

if (event_data_->isSegmentLoaded(target_segment)) {
seeked_to_sec = *seeking_to_;
seeking_to_.reset();
}
seeking_to_.store(target_time, std::memory_order_relaxed);
return false;
});

checkSeekProgress(seeked_to_sec);
seg_mgr_->setCurrentSegment(target_segment);
checkSeekProgress();
}

void Replay::checkSeekProgress(double seeked_to_sec) {
if (seeked_to_sec >= 0) {
if (onSeekedTo) {
onSeekedTo(seeked_to_sec);
} else {
interruptStream([]() { return true; });
}
void Replay::checkSeekProgress() {
if (!seg_mgr_->getEventData()->isSegmentLoaded(current_segment_.load())) return;

double seek_to = seeking_to_.exchange(-1.0, std::memory_order_acquire);
if (seek_to >= 0 && onSeekedTo) {
onSeekedTo(seek_to);
}

// Resume the interrupted stream
interruptStream([]() { return true; });
}

void Replay::seekToFlag(FindFlag flag) {
Expand All @@ -152,29 +148,19 @@ void Replay::pause(bool pause) {
void Replay::handleSegmentMerge() {
if (exit_) return;

double seeked_to_sec = -1;
interruptStream([&]() {
event_data_ = seg_mgr_->getEventData();
notifyEvent(onSegmentsMerged);

bool segment_loaded = event_data_->isSegmentLoaded(current_segment_);
if (seeking_to_ && segment_loaded) {
seeked_to_sec = *seeking_to_;
seeking_to_.reset();
return false;
}
return segment_loaded;
});

checkSeekProgress(seeked_to_sec);
if (!stream_thread_.joinable() && !event_data_->events.empty()) {
startStream();
auto event_data = seg_mgr_->getEventData();
if (!stream_thread_.joinable() && !event_data->segments.empty()) {
startStream(event_data->segments.begin()->second);
}
notifyEvent(onSegmentsMerged);

// Interrupt the stream to handle segment merge
interruptStream([]() { return false; });
checkSeekProgress();
}

void Replay::startStream() {
const auto &cur_segment = event_data_->segments.begin()->second;
const auto &events = cur_segment->log->events;
void Replay::startStream(const std::shared_ptr<Segment> segment) {
const auto &events = segment->log->events;
route_start_ts_ = events.front().mono_time;
cur_mono_time_ += route_start_ts_ - 1;

Expand Down Expand Up @@ -212,7 +198,7 @@ void Replay::startStream() {
if (!hasFlag(REPLAY_FLAG_NO_VIPC)) {
std::pair<int, int> camera_size[MAX_CAMERAS] = {};
for (auto type : ALL_CAMERAS) {
if (auto &fr = cur_segment->frames[type]) {
if (auto &fr = segment->frames[type]) {
camera_size[type] = {fr->width, fr->height};
}
}
Expand Down Expand Up @@ -271,6 +257,7 @@ void Replay::streamThread() {
stream_cv_.wait(lk, [this]() { return exit_ || (events_ready_ && !interrupt_requested_); });
if (exit_) break;

event_data_ = seg_mgr_->getEventData();
const auto &events = event_data_->events;
auto first = std::upper_bound(events.cbegin(), events.cend(), Event(cur_which, cur_mono_time_, {}));
if (first == events.cend()) {
Expand Down Expand Up @@ -308,11 +295,11 @@ std::vector<Event>::const_iterator Replay::publishEvents(std::vector<Event>::con

for (; !interrupt_requested_ && first != last; ++first) {
const Event &evt = *first;
int segment = toSeconds(evt.mono_time) / 60;

if (current_segment_ != segment) {
current_segment_ = segment;
seg_mgr_->setCurrentSegment(current_segment_);
int segment = toSeconds(evt.mono_time) / 60;
if (current_segment_.load(std::memory_order_relaxed) != segment) {
current_segment_.store(segment, std::memory_order_relaxed);
seg_mgr_->setCurrentSegment(segment);
}

// Skip events if socket is not present
Expand Down
11 changes: 5 additions & 6 deletions tools/replay/replay.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ class Replay {
inline const std::string &carFingerprint() const { return car_fingerprint_; }
inline const std::shared_ptr<std::vector<Timeline::Entry>> getTimeline() const { return timeline_.getEntries(); }
inline const std::optional<Timeline::Entry> findAlertAtTime(double sec) const { return timeline_.findAlertAtTime(sec); }
const std::shared_ptr<SegmentManager::EventData> getEventData() const { return event_data_; }
const std::shared_ptr<SegmentManager::EventData> getEventData() const { return seg_mgr_->getEventData(); }
void installEventFilter(std::function<bool(const Event *)> filter) { event_filter_ = filter; }
void resumeStream() { interruptStream([]() { return true; }); }

// Event callback functions
std::function<void()> onSegmentsMerged = nullptr;
Expand All @@ -68,15 +67,15 @@ class Replay {
private:
void setupServices(const std::vector<std::string> &allow, const std::vector<std::string> &block);
void setupSegmentManager(bool has_filters);
void startStream();
void startStream(const std::shared_ptr<Segment> segment);
void streamThread();
void handleSegmentMerge();
void interruptStream(const std::function<bool()>& update_fn);
std::vector<Event>::const_iterator publishEvents(std::vector<Event>::const_iterator first,
std::vector<Event>::const_iterator last);
void publishMessage(const Event *e);
void publishFrame(const Event *e);
void checkSeekProgress(double seeked_to_sec);
void checkSeekProgress();

std::unique_ptr<SegmentManager> seg_mgr_;
Timeline timeline_;
Expand All @@ -86,8 +85,8 @@ class Replay {
std::mutex stream_lock_;
bool user_paused_ = false;
std::condition_variable stream_cv_;
int current_segment_ = 0;
std::optional<double> seeking_to_;
std::atomic<int> current_segment_ = 0;
std::atomic<double> seeking_to_ = -1.0;
std::atomic<bool> exit_ = false;
std::atomic<bool> interrupt_requested_ = false;
bool events_ready_ = false;
Expand Down
2 changes: 1 addition & 1 deletion tools/replay/seg_mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ bool SegmentManager::mergeSegments(const SegmentMap::iterator &begin, const Segm
merged_event_data->segments[n] = segments_.at(n);
}

event_data_ = merged_event_data;
std::atomic_store(&event_data_, std::move(merged_event_data));
merged_segments_ = segments_to_merge;

return true;
Expand Down
4 changes: 2 additions & 2 deletions tools/replay/seg_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ class SegmentManager {
};

SegmentManager(const std::string &route_name, uint32_t flags, const std::string &data_dir = "")
: flags_(flags), route_(route_name, data_dir) {};
: flags_(flags), route_(route_name, data_dir), event_data_(std::make_shared<EventData>()) {}
~SegmentManager();

bool load();
void setCurrentSegment(int seg_num);
void setCallback(const std::function<void()> &callback) { onSegmentMergedCallback_ = callback; }
void setFilters(const std::vector<bool> &filters) { filters_ = filters; }
const std::shared_ptr<EventData> getEventData() const { return event_data_; }
const std::shared_ptr<EventData> getEventData() const { return std::atomic_load(&event_data_); }
bool hasSegment(int n) const { return segments_.find(n) != segments_.end(); }

Route route_;
Expand Down
4 changes: 2 additions & 2 deletions tools/replay/timeline.cc
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#include "tools/replay/timeline.h"

#include <array>
#include <algorithm>
#include <array>

#include "cereal/gen/cpp/log.capnp.h"

Expand Down Expand Up @@ -74,7 +74,7 @@ void Timeline::buildTimeline(const Route &route, uint64_t route_start_ts, bool l
// Sort and finalize the timeline entries
auto entries = std::make_shared<std::vector<Entry>>(staging_entries_);
std::sort(entries->begin(), entries->end(), [](auto &a, auto &b) { return a.start_time < b.start_time; });
timeline_entries_ = entries;
std::atomic_store(&timeline_entries_, std::move(entries));

callback(log); // Notify the callback once the log is processed
}
Expand Down
22 changes: 11 additions & 11 deletions tools/replay/timeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@ class Timeline {
std::function<void(std::shared_ptr<LogReader>)> callback);
std::optional<uint64_t> find(double cur_ts, FindFlag flag) const;
std::optional<Entry> findAlertAtTime(double target_time) const;
const std::shared_ptr<std::vector<Entry>> getEntries() const { return timeline_entries_; }
const std::shared_ptr<std::vector<Entry>> getEntries() const { return std::atomic_load(&timeline_entries_); }

private:
void buildTimeline(const Route &route, uint64_t route_start_ts, bool local_cache,
std::function<void(std::shared_ptr<LogReader>)> callback);
void updateEngagementStatus(const cereal::SelfdriveState::Reader &cs, std::optional<size_t> &idx, double seconds);
void updateAlertStatus(const cereal::SelfdriveState::Reader &cs, std::optional<size_t> &idx, double seconds);
void buildTimeline(const Route &route, uint64_t route_start_ts, bool local_cache,
std::function<void(std::shared_ptr<LogReader>)> callback);
void updateEngagementStatus(const cereal::SelfdriveState::Reader &cs, std::optional<size_t> &idx, double seconds);
void updateAlertStatus(const cereal::SelfdriveState::Reader &cs, std::optional<size_t> &idx, double seconds);

std::thread thread_;
std::atomic<bool> should_exit_ = false;
std::thread thread_;
std::atomic<bool> should_exit_ = false;

// Temporarily holds entries before they are sorted and finalized
std::vector<Entry> staging_entries_;
// Temporarily holds entries before they are sorted and finalized
std::vector<Entry> staging_entries_;

// Final sorted timeline entries
std::shared_ptr<std::vector<Entry>> timeline_entries_;
// Final sorted timeline entries
std::shared_ptr<std::vector<Entry>> timeline_entries_;
};

0 comments on commit d621469

Please sign in to comment.