diff --git a/src/core/PortalManager.cpp b/src/core/PortalManager.cpp index 9111870..22cae6c 100644 --- a/src/core/PortalManager.cpp +++ b/src/core/PortalManager.cpp @@ -287,62 +287,78 @@ void CPortalManager::startEventLoop() { std::thread pollThr([this, &pollfds]() { while (1) { - int ret = poll(pollfds, 3, 5000 /* 5 seconds, reasonable. It's because we might need to terminate */); - if (ret < 0) { - Debug::log(CRIT, "[core] Polling fds failed with {}", strerror(errno)); - g_pPortalManager->terminate(); - } + bool preparedToRead = wl_display_prepare_read(m_sWaylandConnection.display) == 0; + + int events = 0; + if (preparedToRead) { + events = poll(pollfds, 3, 5000 /* 5 seconds, reasonable. It's because we might need to terminate */); + if (events < 0) { + if (preparedToRead) + wl_display_cancel_read(m_sWaylandConnection.display); - for (size_t i = 0; i < 3; ++i) { - if (pollfds[i].revents & POLLHUP) { - Debug::log(CRIT, "[core] Disconnected from pollfd id {}", i); + if (errno == EINTR) + continue; + + Debug::log(CRIT, "[core] Polling fds failed with {}", strerror(errno)); g_pPortalManager->terminate(); } - } - if (m_bTerminate) - break; + for (size_t i = 0; i < 3; ++i) { + if (pollfds[i].revents & POLLHUP) { + Debug::log(CRIT, "[core] Disconnected from pollfd id {}", i); + g_pPortalManager->terminate(); + } + } + + if (m_bTerminate) + break; - if (ret != 0) { + wl_display_read_events(m_sWaylandConnection.display); + m_sEventLoopInternals.wlDispatched = false; + } + + if (events > 0 || !preparedToRead) { Debug::log(TRACE, "[core] got poll event"); - std::lock_guard lg(m_sEventLoopInternals.loopRequestMutex); + std::unique_lock lk(m_sEventLoopInternals.loopRequestMutex); m_sEventLoopInternals.shouldProcess = true; m_sEventLoopInternals.loopSignal.notify_all(); + + m_sEventLoopInternals.wlDispatchCV.wait_for(lk, std::chrono::milliseconds(100), [this] { return m_sEventLoopInternals.wlDispatched; }); } } }); - m_sTimersThread.thread = std::make_unique([this] { - while (1) { - std::unique_lock lk(m_sTimersThread.loopMutex); + std::thread timersThr([this]() { + while (!m_bTerminate) { + m_sEventLoopInternals.timersMutex.lock(); // find nearest timer ms - m_mEventLock.lock(); float nearest = 60000; /* reasonable timeout */ - for (auto& t : m_sTimersThread.timers) { + for (auto& t : m_timers) { float until = t->duration() - t->passedMs(); if (until < nearest) nearest = until; } - m_mEventLock.unlock(); - m_sTimersThread.loopSignal.wait_for(lk, std::chrono::milliseconds((int)nearest), [this] { return m_sTimersThread.shouldProcess; }); - m_sTimersThread.shouldProcess = false; + m_sEventLoopInternals.timersMutex.unlock(); - if (m_bTerminate) - break; + std::unique_lock lk(m_sEventLoopInternals.timerRequestMutex); + m_sEventLoopInternals.timerRequestCV.wait_for(lk, std::chrono::milliseconds((int)nearest), [this] { return m_sEventLoopInternals.timerEvent; }); + m_sEventLoopInternals.timerEvent = false; + + m_sEventLoopInternals.timersMutex.lock(); // awakened. Check if any timers passed - m_mEventLock.lock(); bool notify = false; - for (auto& t : m_sTimersThread.timers) { + for (auto& t : m_timers) { if (t->passed()) { Debug::log(TRACE, "[core] got timer event"); notify = true; break; } } - m_mEventLock.unlock(); + + m_sEventLoopInternals.timersMutex.unlock(); if (notify) { std::lock_guard lg(m_sEventLoopInternals.loopRequestMutex); @@ -352,7 +368,7 @@ void CPortalManager::startEventLoop() { } }); - while (1) { // dbus events + while (1) { // process events // wait for being awakened std::unique_lock lk(m_sEventLoopInternals.loopMutex); if (m_sEventLoopInternals.shouldProcess == false) // avoid a lock if a thread managed to request something already since we .unlock()ed @@ -365,32 +381,22 @@ void CPortalManager::startEventLoop() { m_sEventLoopInternals.shouldProcess = false; - m_mEventLock.lock(); - if (pollfds[0].revents & POLLIN /* dbus */) { while (m_pConnection->processPendingEvent()) { ; } } - if (pollfds[1].revents & POLLIN /* wl */) { - wl_display_flush(m_sWaylandConnection.display); - if (wl_display_prepare_read(m_sWaylandConnection.display) == 0) { - wl_display_read_events(m_sWaylandConnection.display); - wl_display_dispatch_pending(m_sWaylandConnection.display); - } else { - wl_display_dispatch(m_sWaylandConnection.display); - } - } - if (pollfds[2].revents & POLLIN /* pw */) { while (pw_loop_iterate(m_sPipewire.loop, 0) != 0) { ; } } + m_sEventLoopInternals.timersMutex.lock(); + std::vector toRemove; - for (auto& t : m_sTimersThread.timers) { + for (auto& t : m_timers) { if (t->passed()) { t->m_fnCallback(); toRemove.emplace_back(t.get()); @@ -398,17 +404,17 @@ void CPortalManager::startEventLoop() { } } - int ret = 0; - do { - ret = wl_display_dispatch_pending(m_sWaylandConnection.display); - wl_display_flush(m_sWaylandConnection.display); - } while (ret > 0); - if (!toRemove.empty()) - std::erase_if(m_sTimersThread.timers, + std::erase_if(m_timers, [&](const auto& t) { return std::find_if(toRemove.begin(), toRemove.end(), [&](const auto& other) { return other == t.get(); }) != toRemove.end(); }); - m_mEventLock.unlock(); + m_sEventLoopInternals.timersMutex.unlock(); + + wl_display_dispatch_pending(m_sWaylandConnection.display); + wl_display_flush(m_sWaylandConnection.display); + + m_sEventLoopInternals.wlDispatched = true; + m_sEventLoopInternals.wlDispatchCV.notify_all(); } Debug::log(ERR, "[core] Terminated"); @@ -422,7 +428,7 @@ void CPortalManager::startEventLoop() { pw_loop_destroy(m_sPipewire.loop); wl_display_disconnect(m_sWaylandConnection.display); - m_sTimersThread.thread.release(); + timersThr.join(); pollThr.join(); // wait for poll to exit } @@ -482,9 +488,12 @@ gbm_device* CPortalManager::createGBMDevice(drmDevice* dev) { void CPortalManager::addTimer(const CTimer& timer) { Debug::log(TRACE, "[core] adding timer for {}ms", timer.duration()); - m_sTimersThread.timers.emplace_back(std::make_unique(timer)); - m_sTimersThread.shouldProcess = true; - m_sTimersThread.loopSignal.notify_all(); + m_sEventLoopInternals.timersMutex.lock(); + m_timers.emplace_back(std::make_unique(timer)); + m_sEventLoopInternals.timersMutex.unlock(); + + m_sEventLoopInternals.timerEvent = true; + m_sEventLoopInternals.timerRequestCV.notify_all(); } void CPortalManager::terminate() { @@ -500,6 +509,6 @@ void CPortalManager::terminate() { m_sEventLoopInternals.loopSignal.notify_all(); } - m_sTimersThread.shouldProcess = true; - m_sTimersThread.loopSignal.notify_all(); + m_sEventLoopInternals.timerEvent = true; + m_sEventLoopInternals.timerRequestCV.notify_all(); } diff --git a/src/core/PortalManager.hpp b/src/core/PortalManager.hpp index e0f2f03..5758da9 100644 --- a/src/core/PortalManager.hpp +++ b/src/core/PortalManager.hpp @@ -106,20 +106,20 @@ class CPortalManager { std::mutex loopMutex; std::atomic shouldProcess = false; std::mutex loopRequestMutex; + + std::condition_variable wlDispatchCV; + bool wlDispatched = false; + + std::mutex timersMutex; + std::condition_variable timerRequestCV; + std::mutex timerRequestMutex; + bool timerEvent = false; } m_sEventLoopInternals; - struct { - std::condition_variable loopSignal; - std::mutex loopMutex; - bool shouldProcess = false; - std::vector> timers; - std::unique_ptr thread; - } m_sTimersThread; + std::vector> m_timers; std::unique_ptr m_pConnection; std::vector> m_vOutputs; - - std::mutex m_mEventLock; }; inline std::unique_ptr g_pPortalManager;