Skip to content

Commit

Permalink
remove Segment, Timeline dependency on Qt
Browse files Browse the repository at this point in the history
  • Loading branch information
deanlee committed Oct 22, 2024
1 parent b87a52a commit 0e5e65b
Show file tree
Hide file tree
Showing 14 changed files with 241 additions and 178 deletions.
4 changes: 4 additions & 0 deletions tools/cabana/streams/replaystream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ bool ReplayStream::loadRoute(const QString &route, const QString &data_dir, uint
{}, nullptr, replay_flags, data_dir.toStdString(), this));
replay->setSegmentCacheLimit(settings.max_cached_minutes);
replay->installEventFilter(event_filter, this);

// Forward replay callbacks to corresponding Qt signals.
replay->onQLogLoaded = [this](std::shared_ptr<LogReader> qlog) { emit qLogLoaded(qlog); };

QObject::connect(replay.get(), &Replay::seeking, this, &AbstractStream::seeking);
QObject::connect(replay.get(), &Replay::seekedTo, this, &AbstractStream::seekedTo);
QObject::connect(replay.get(), &Replay::segmentsMerged, this, &ReplayStream::mergeSegments);
Expand Down
7 changes: 6 additions & 1 deletion tools/cabana/streams/replaystream.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include "tools/cabana/streams/abstractstream.h"
#include "tools/replay/replay.h"

Q_DECLARE_METATYPE(std::shared_ptr<LogReader>);

class ReplayStream : public AbstractStream {
Q_OBJECT

Expand All @@ -24,14 +26,17 @@ class ReplayStream : public AbstractStream {
inline QString carFingerprint() const override { return replay->carFingerprint().c_str(); }
double minSeconds() const override { return replay->minSeconds(); }
double maxSeconds() const { return replay->maxSeconds(); }
inline QDateTime beginDateTime() const { return replay->routeDateTime(); }
inline QDateTime beginDateTime() const { return QDateTime::fromSecsSinceEpoch(replay->routeDateTime()); }
inline uint64_t beginMonoTime() const override { return replay->routeStartNanos(); }
inline void setSpeed(float speed) override { replay->setSpeed(speed); }
inline float getSpeed() const { return replay->getSpeed(); }
inline Replay *getReplay() const { return replay.get(); }
inline bool isPaused() const override { return replay->isPaused(); }
void pause(bool pause) override;

signals:
void qLogLoaded(std::shared_ptr<LogReader> qlog);

private:
void mergeSegments();
std::unique_ptr<Replay> replay = nullptr;
Expand Down
5 changes: 1 addition & 4 deletions tools/cabana/videowidget.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,7 @@ QWidget *VideoWidget::createCameraWidget() {
QObject::connect(camera_tab, &QTabBar::currentChanged, [this](int index) {
if (index != -1) cam_widget->setStreamType((VisionStreamType)camera_tab->tabData(index).toInt());
});

auto replay = static_cast<ReplayStream*>(can)->getReplay();
QObject::connect(replay, &Replay::qLogLoaded, slider, &Slider::parseQLog, Qt::QueuedConnection);
QObject::connect(replay, &Replay::minMaxTimeChanged, this, &VideoWidget::timeRangeChanged, Qt::QueuedConnection);
QObject::connect(static_cast<ReplayStream*>(can), &ReplayStream::qLogLoaded, slider, &Slider::parseQLog, Qt::QueuedConnection);
return w;
}

Expand Down
3 changes: 2 additions & 1 deletion tools/replay/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ if arch == "Darwin":
else:
base_libs.append('OpenCL')

replay_lib_src = ["replay.cc", "consoleui.cc", "camera.cc", "filereader.cc", "logreader.cc", "framereader.cc", "route.cc", "util.cc"]
replay_lib_src = ["replay.cc", "consoleui.cc", "camera.cc", "filereader.cc", "logreader.cc", "framereader.cc",
"route.cc", "util.cc", "timeline.cc"]
replay_lib = qt_env.Library("qt_replay", replay_lib_src, LIBS=base_libs, FRAMEWORKS=base_frameworks)
Export('replay_lib')
replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'bz2', 'zstd', 'curl', 'yuv', 'ncurses'] + base_libs
Expand Down
6 changes: 4 additions & 2 deletions tools/replay/consoleui.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "tools/replay/consoleui.h"

#include <time.h>
#include <initializer_list>
#include <string>
#include <tuple>
Expand Down Expand Up @@ -152,7 +153,6 @@ void ConsoleUI::updateStatus() {
add_str(win, unit.c_str());
};
static const std::pair<const char *, Color> status_text[] = {
{"loading...", Color::Red},
{"playing", Color::Green},
{"paused...", Color::Yellow},
};
Expand All @@ -161,8 +161,10 @@ void ConsoleUI::updateStatus() {

auto [status_str, status_color] = status_text[status];
write_item(0, 0, "STATUS: ", status_str, " ", false, status_color);
auto cur_ts = replay->routeDateTime() + (int)replay->currentSeconds();
char *time_string = ctime(&cur_ts);
std::string current_segment = " - " + std::to_string((int)(replay->currentSeconds() / 60));
write_item(0, 25, "TIME: ", replay->currentDateTime().toString("ddd MMMM dd hh:mm:ss").toStdString(), current_segment, true);
write_item(0, 25, "TIME: ", time_string, current_segment, true);

auto p = sm["liveParameters"].getLiveParameters();
write_item(1, 0, "STIFFNESS: ", util::string_format("%.2f %%", p.getStiffnessFactor() * 100), " ");
Expand Down
128 changes: 24 additions & 104 deletions tools/replay/replay.cc
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#include "tools/replay/replay.h"

#include <QDebug>
#include <QtConcurrent>
#include <capnp/dynamic.h>
#include <csignal>
#include "cereal/services.h"
Expand All @@ -11,6 +9,14 @@

static void interrupt_sleep_handler(int signal) {}

// Helper function to notify events with safety checks
template <typename Callback, typename... Args>
void notifyEvent(Callback &callback, Args &&...args) {
if (callback) {
callback(std::forward<Args>(args)...);
}
}

Replay::Replay(const std::string &route, std::vector<std::string> allow, std::vector<std::string> block, SubMaster *sm_,
uint32_t flags, const std::string &data_dir, QObject *parent) : sm(sm_), flags_(flags), QObject(parent) {
// Register signal handler for SIGUSR1
Expand Down Expand Up @@ -40,7 +46,7 @@ Replay::Replay(const std::string &route, std::vector<std::string> allow, std::ve
}
}

rInfo("active services: %s", join(active_services, ',').c_str());
rInfo("active services: %s", join(active_services, ", ").c_str());
rInfo("loading route %s", route.c_str());

if (sm == nullptr) {
Expand Down Expand Up @@ -68,7 +74,6 @@ void Replay::stop() {
stream_thread_ = nullptr;
rInfo("shutdown: done");
}
timeline_future.waitForFinished();
camera_server_.reset(nullptr);
segments_.clear();
}
Expand Down Expand Up @@ -151,101 +156,11 @@ void Replay::checkSeekProgress() {
}

void Replay::seekToFlag(FindFlag flag) {
if (auto next = find(flag)) {
if (auto next = timeline_.find(currentSeconds(), flag)) {
seekTo(*next - 2, false); // seek to 2 seconds before next
}
}

void Replay::buildTimeline() {
uint64_t engaged_begin = 0;
bool engaged = false;

auto alert_status = cereal::SelfdriveState::AlertStatus::NORMAL;
auto alert_size = cereal::SelfdriveState::AlertSize::NONE;
uint64_t alert_begin = 0;
std::string alert_type;

const TimelineType timeline_types[] = {
[(int)cereal::SelfdriveState::AlertStatus::NORMAL] = TimelineType::AlertInfo,
[(int)cereal::SelfdriveState::AlertStatus::USER_PROMPT] = TimelineType::AlertWarning,
[(int)cereal::SelfdriveState::AlertStatus::CRITICAL] = TimelineType::AlertCritical,
};

const auto &route_segments = route_->segments();
for (auto it = route_segments.cbegin(); it != route_segments.cend() && !exit_; ++it) {
std::shared_ptr<LogReader> log(new LogReader());
if (!log->load(it->second.qlog, &exit_, !hasFlag(REPLAY_FLAG_NO_FILE_CACHE), 0, 3) || log->events.empty()) continue;

std::vector<std::tuple<double, double, TimelineType>> timeline;
for (const Event &e : log->events) {
if (e.which == cereal::Event::Which::SELFDRIVE_STATE) {
capnp::FlatArrayMessageReader reader(e.data);
auto event = reader.getRoot<cereal::Event>();
auto cs = event.getSelfdriveState();

if (engaged != cs.getEnabled()) {
if (engaged) {
timeline.push_back({toSeconds(engaged_begin), toSeconds(e.mono_time), TimelineType::Engaged});
}
engaged_begin = e.mono_time;
engaged = cs.getEnabled();
}

if (alert_type != cs.getAlertType().cStr() || alert_status != cs.getAlertStatus()) {
if (!alert_type.empty() && alert_size != cereal::SelfdriveState::AlertSize::NONE) {
timeline.push_back({toSeconds(alert_begin), toSeconds(e.mono_time), timeline_types[(int)alert_status]});
}
alert_begin = e.mono_time;
alert_type = cs.getAlertType().cStr();
alert_size = cs.getAlertSize();
alert_status = cs.getAlertStatus();
}
} else if (e.which == cereal::Event::Which::USER_FLAG) {
timeline.push_back({toSeconds(e.mono_time), toSeconds(e.mono_time), TimelineType::UserFlag});
}
}

if (it->first == route_segments.rbegin()->first) {
if (engaged) {
timeline.push_back({toSeconds(engaged_begin), toSeconds(log->events.back().mono_time), TimelineType::Engaged});
}
if (!alert_type.empty() && alert_size != cereal::SelfdriveState::AlertSize::NONE) {
timeline.push_back({toSeconds(alert_begin), toSeconds(log->events.back().mono_time), timeline_types[(int)alert_status]});
}

max_seconds_ = std::ceil(toSeconds(log->events.back().mono_time));
emit minMaxTimeChanged(route_segments.cbegin()->first * 60.0, max_seconds_);
}
{
std::lock_guard lk(timeline_lock);
timeline_.insert(timeline_.end(), timeline.begin(), timeline.end());
std::sort(timeline_.begin(), timeline_.end(), [](auto &l, auto &r) { return std::get<2>(l) < std::get<2>(r); });
}
emit qLogLoaded(log);
}
}

std::optional<uint64_t> Replay::find(FindFlag flag) {
int cur_ts = currentSeconds();
for (auto [start_ts, end_ts, type] : getTimeline()) {
if (type == TimelineType::Engaged) {
if (flag == FindFlag::nextEngagement && start_ts > cur_ts) {
return start_ts;
} else if (flag == FindFlag::nextDisEngagement && end_ts > cur_ts) {
return end_ts;
}
} else if (start_ts > cur_ts) {
if ((flag == FindFlag::nextUserFlag && type == TimelineType::UserFlag) ||
(flag == FindFlag::nextInfo && type == TimelineType::AlertInfo) ||
(flag == FindFlag::nextWarning && type == TimelineType::AlertWarning) ||
(flag == FindFlag::nextCritical && type == TimelineType::AlertCritical)) {
return start_ts;
}
}
}
return std::nullopt;
}

void Replay::pause(bool pause) {
if (user_paused_ != pause) {
pauseStreamThread();
Expand All @@ -266,16 +181,15 @@ void Replay::pauseStreamThread() {
}
}

void Replay::segmentLoadFinished(bool success) {
void Replay::segmentLoadFinished(int seg_num, bool success) {
if (!success) {
Segment *seg = qobject_cast<Segment *>(sender());
rWarning("failed to load segment %d, removing it from current replay list", seg->seg_num);
rWarning("failed to load segment %d, removing it from current replay list", seg_num);
updateEvents([&]() {
segments_.erase(seg->seg_num);
segments_.erase(seg_num);
return !segments_.empty();
});
}
updateSegmentsCache();
QMetaObject::invokeMethod(this, &Replay::updateSegmentsCache, Qt::QueuedConnection);
}

void Replay::updateSegmentsCache() {
Expand Down Expand Up @@ -306,8 +220,10 @@ void Replay::loadSegmentInRange(SegmentMap::iterator begin, SegmentMap::iterator
auto it = std::find_if(first, last, [](const auto &seg_it) { return !seg_it.second || !seg_it.second->isLoaded(); });
if (it != last && !it->second) {
rDebug("loading segment %d...", it->first);
it->second = std::make_unique<Segment>(it->first, route_->at(it->first), flags_, filters_);
QObject::connect(it->second.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished);
it->second = std::make_unique<Segment>(it->first, route_->at(it->first), flags_, filters_,
[this](int seg_num, bool success) {
segmentLoadFinished(seg_num, success);
});
return true;
}
return false;
Expand Down Expand Up @@ -373,7 +289,7 @@ void Replay::startStream(const Segment *cur_segment) {
auto event = reader.getRoot<cereal::Event>();
uint64_t wall_time = event.getInitData().getWallTimeNanos();
if (wall_time > 0) {
route_date_time_ = QDateTime::fromMSecsSinceEpoch(wall_time / 1e6);
route_date_time_ = wall_time / 1e6;
}
}

Expand Down Expand Up @@ -405,12 +321,16 @@ void Replay::startStream(const Segment *cur_segment) {
}

emit segmentsMerged();

timeline_.initialize(*route_, route_start_ts_, !(flags_ & REPLAY_FLAG_NO_FILE_CACHE),
[this](std::shared_ptr<LogReader> log) {
notifyEvent(onQLogLoaded, log);
});
// start stream thread
stream_thread_ = new QThread();
QObject::connect(stream_thread_, &QThread::started, [=]() { streamThread(); });
stream_thread_->start();

timeline_future = QtConcurrent::run(this, &Replay::buildTimeline);
emit streamStarted();
}

Expand Down
Loading

0 comments on commit 0e5e65b

Please sign in to comment.