From e59a8bda6b630dec98b8e0437d0906d9e5cbdd98 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Thu, 12 Sep 2019 12:45:44 +0200 Subject: [PATCH] Internal changes in the epoll mechanism --- srtcore/epoll.cpp | 472 ++++++++++++++++++++---------------------- srtcore/epoll.h | 213 ++++++++++++++++++- srtcore/srt.h | 24 ++- srtcore/srt_c_api.cpp | 18 +- testing/testmedia.cpp | 2 +- testing/testmedia.hpp | 2 +- 6 files changed, 461 insertions(+), 270 deletions(-) diff --git a/srtcore/epoll.cpp b/srtcore/epoll.cpp index 1dbfc61c5..a2b8d51d3 100644 --- a/srtcore/epoll.cpp +++ b/srtcore/epoll.cpp @@ -123,25 +123,6 @@ ENOMEM: There was insufficient memory to create the kernel object. return desc.m_iID; } -int CEPoll::add_usock(const int eid, const SRTSOCKET& u, const int* events) -{ - CGuard pg(m_EPollLock); - - map::iterator p = m_mPolls.find(eid); - if (p == m_mPolls.end()) - throw CUDTException(MJ_NOTSUP, MN_EIDINVAL); - - if (!events || (*events & UDT_EPOLL_IN)) - p->second.m_sUDTSocksIn.insert(u); - if (!events || (*events & UDT_EPOLL_OUT)) - p->second.m_sUDTSocksOut.insert(u); - // Connecting timeout not signalled without EPOLL_ERR - if (!events || (*events & UDT_EPOLL_ERR)) - p->second.m_sUDTSocksEx.insert(u); - - return 0; -} - int CEPoll::add_ssock(const int eid, const SYSSOCKET& s, const int* events) { CGuard pg(m_EPollLock); @@ -210,30 +191,6 @@ int CEPoll::add_ssock(const int eid, const SYSSOCKET& s, const int* events) return 0; } -int CEPoll::remove_usock(const int eid, const SRTSOCKET& u) -{ - CGuard pg(m_EPollLock); - - map::iterator p = m_mPolls.find(eid); - if (p == m_mPolls.end()) - throw CUDTException(MJ_NOTSUP, MN_EIDINVAL); - - p->second.m_sUDTSocksIn.erase(u); - p->second.m_sUDTSocksOut.erase(u); - p->second.m_sUDTSocksEx.erase(u); - - /* - * We are no longer interested in signals from this socket - * If some are up, they will unblock EPoll forever. - * Clear them. - */ - p->second.m_sUDTReads.erase(u); - p->second.m_sUDTWrites.erase(u); - p->second.m_sUDTExcepts.erase(u); - - return 0; -} - int CEPoll::remove_ssock(const int eid, const SYSSOCKET& s) { CGuard pg(m_EPollLock); @@ -267,41 +224,41 @@ int CEPoll::remove_ssock(const int eid, const SYSSOCKET& s) // Need this to atomically modify polled events (ex: remove write/keep read) int CEPoll::update_usock(const int eid, const SRTSOCKET& u, const int* events) { - CGuard pg(m_EPollLock); - - map::iterator p = m_mPolls.find(eid); - if (p == m_mPolls.end()) - throw CUDTException(MJ_NOTSUP, MN_EIDINVAL); - - if (!events || (*events & UDT_EPOLL_IN)) - p->second.m_sUDTSocksIn.insert(u); - else - { - p->second.m_sUDTSocksIn.erase(u); - /* - * We are no longer interested in this event from this socket - * If some are up, they will unblock EPoll forever. - * Clear them. - */ - p->second.m_sUDTReads.erase(u); - } - - if (!events || (*events & UDT_EPOLL_OUT)) - p->second.m_sUDTSocksOut.insert(u); - else - { - p->second.m_sUDTSocksOut.erase(u); - p->second.m_sUDTWrites.erase(u); - } - if (!events || (*events & UDT_EPOLL_ERR)) - p->second.m_sUDTSocksEx.insert(u); - else - { - p->second.m_sUDTSocksEx.erase(u); - p->second.m_sUDTExcepts.erase(u); - } - - return 0; + CGuard pg(m_EPollLock); + + map::iterator p = m_mPolls.find(eid); + if (p == m_mPolls.end()) + throw CUDTException(MJ_NOTSUP, MN_EIDINVAL); + + int32_t evts = events ? *events : uint32_t(SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR); + bool edgeTriggered = evts & SRT_EPOLL_ET; + evts &= ~SRT_EPOLL_ET; + if (evts) + { + pair iter_new = p->second.addWatch(u, evts, edgeTriggered); + CEPollDesc::Wait& wait = iter_new.first->second; + int newstate = wait.watch & wait.state; + if (newstate) + { + p->second.addEventNotice(wait, u, newstate); + } + else if (!iter_new.second) // if it was freshly added, no notice object exists + { + // This removes the event notice entry, but leaves the subscription + p->second.removeEvents(wait); + } + } + else if (edgeTriggered) + { + // Specified only SRT_EPOLL_ET flag, but no event flag. Error. + throw CUDTException(MJ_NOTSUP, MN_INVAL); + } + else + { + // Update with no events means to remove subscription + p->second.removeSubscription(u); + } + return 0; } int CEPoll::update_ssock(const int eid, const SYSSOCKET& s, const int* events) @@ -369,151 +326,166 @@ int CEPoll::update_ssock(const int eid, const SYSSOCKET& s, const int* events) return 0; } + int CEPoll::wait(const int eid, set* readfds, set* writefds, int64_t msTimeOut, set* lrfds, set* lwfds) { - // if all fields is NULL and waiting time is infinite, then this would be a deadlock - if (!readfds && !writefds && !lrfds && !lwfds && (msTimeOut < 0)) - throw CUDTException(MJ_NOTSUP, MN_INVAL, 0); - - // Clear these sets in case the app forget to do it. - if (readfds) readfds->clear(); - if (writefds) writefds->clear(); - if (lrfds) lrfds->clear(); - if (lwfds) lwfds->clear(); - - int total = 0; - - int64_t entertime = CTimer::getTime(); - while (true) - { - CGuard::enterCS(m_EPollLock); - - map::iterator p = m_mPolls.find(eid); - if (p == m_mPolls.end()) - { - CGuard::leaveCS(m_EPollLock); - throw CUDTException(MJ_NOTSUP, MN_EIDINVAL); - } - - if (p->second.m_sUDTSocksIn.empty() && p->second.m_sUDTSocksOut.empty() && p->second.m_sLocals.empty() && (msTimeOut < 0)) - { - // no socket is being monitored, this may be a deadlock - CGuard::leaveCS(m_EPollLock); - throw CUDTException(MJ_NOTSUP, MN_INVAL); - } - - // Sockets with exceptions are returned to both read and write sets. - if ((NULL != readfds) && (!p->second.m_sUDTReads.empty() || !p->second.m_sUDTExcepts.empty())) - { - *readfds = p->second.m_sUDTReads; - for (set::const_iterator i = p->second.m_sUDTExcepts.begin(); i != p->second.m_sUDTExcepts.end(); ++ i) - readfds->insert(*i); - total += p->second.m_sUDTReads.size() + p->second.m_sUDTExcepts.size(); - } - if ((NULL != writefds) && (!p->second.m_sUDTWrites.empty() || !p->second.m_sUDTExcepts.empty())) - { - *writefds = p->second.m_sUDTWrites; - for (set::const_iterator i = p->second.m_sUDTExcepts.begin(); i != p->second.m_sUDTExcepts.end(); ++ i) - writefds->insert(*i); - total += p->second.m_sUDTWrites.size() + p->second.m_sUDTExcepts.size(); - } - - if (lrfds || lwfds) - { - #ifdef LINUX - const int max_events = p->second.m_sLocals.size(); - epoll_event ev[max_events]; - int nfds = ::epoll_wait(p->second.m_iLocalID, ev, max_events, 0); - - for (int i = 0; i < nfds; ++ i) - { - if ((NULL != lrfds) && (ev[i].events & EPOLLIN)) + // if all fields is NULL and waiting time is infinite, then this would be a deadlock + if (!readfds && !writefds && !lrfds && !lwfds && (msTimeOut < 0)) + throw CUDTException(MJ_NOTSUP, MN_INVAL, 0); + + // Clear these sets in case the app forget to do it. + if (readfds) readfds->clear(); + if (writefds) writefds->clear(); + if (lrfds) lrfds->clear(); + if (lwfds) lwfds->clear(); + + int total = 0; + + int64_t entertime = CTimer::getTime(); + while (true) + { + { + CGuard epollock(m_EPollLock); + + map::iterator p = m_mPolls.find(eid); + if (p == m_mPolls.end()) { - lrfds->insert(ev[i].data.fd); - ++ total; + throw CUDTException(MJ_NOTSUP, MN_EIDINVAL); } - if ((NULL != lwfds) && (ev[i].events & EPOLLOUT)) + + CEPollDesc& ed = p->second; + + if (!ed.flags(SRT_EPOLL_ENABLE_EMPTY) && ed.watch_empty() && ed.m_sLocals.empty()) { - lwfds->insert(ev[i].data.fd); - ++ total; + // Empty EID is not allowed, report error. + throw CUDTException(MJ_NOTSUP, MN_INVAL); } - } - #elif defined(BSD) || defined(OSX) || (TARGET_OS_IOS == 1) || (TARGET_OS_TV == 1) - struct timespec tmout = {0, 0}; - const int max_events = p->second.m_sLocals.size(); - struct kevent ke[max_events]; - - int nfds = kevent(p->second.m_iLocalID, NULL, 0, ke, max_events, &tmout); - for (int i = 0; i < nfds; ++ i) - { - if ((NULL != lrfds) && (ke[i].filter == EVFILT_READ)) + if (ed.flags(SRT_EPOLL_ENABLE_OUTPUTCHECK)) { - lrfds->insert(ke[i].ident); - ++ total; + // Empty report is not allowed, report error. + if (!ed.m_sLocals.empty() && (!lrfds || !lwfds)) + throw CUDTException(MJ_NOTSUP, MN_INVAL); + + if (!ed.watch_empty() && (!readfds || !writefds)) + throw CUDTException(MJ_NOTSUP, MN_INVAL); } - if ((NULL != lwfds) && (ke[i].filter == EVFILT_WRITE)) + + // Sockets with exceptions are returned to both read and write sets. + for (CEPollDesc::enotice_t::iterator it = ed.enotice_begin(), it_next = it; it != ed.enotice_end(); it = it_next) { - lwfds->insert(ke[i].ident); - ++ total; + ++it_next; + if (readfds && ((it->events & UDT_EPOLL_IN) || (it->events & UDT_EPOLL_ERR))) + { + if (readfds->insert(it->fd).second) + ++total; + } + + if (writefds && ((it->events & UDT_EPOLL_OUT) || (it->events & UDT_EPOLL_ERR))) + { + if (writefds->insert(it->fd).second) + ++total; + } + + ed.checkEdge(it); // NOTE: potentially erases 'it'. } - } - #else - //currently "select" is used for all non-Linux platforms. - //faster approaches can be applied for specific systems in the future. - - //"select" has a limitation on the number of sockets - int max_fd = 0; - - fd_set readfds; - fd_set writefds; - FD_ZERO(&readfds); - FD_ZERO(&writefds); - - for (set::const_iterator i = p->second.m_sLocals.begin(); i != p->second.m_sLocals.end(); ++ i) - { - if (lrfds) - FD_SET(*i, &readfds); - if (lwfds) - FD_SET(*i, &writefds); - if (*i > max_fd) - max_fd = *i; - } - timeval tv; - tv.tv_sec = 0; - tv.tv_usec = 0; - if (::select(max_fd + 1, &readfds, &writefds, NULL, &tv) > 0) - { - for (set::const_iterator i = p->second.m_sLocals.begin(); i != p->second.m_sLocals.end(); ++ i) + if (lrfds || lwfds) { - if (lrfds && FD_ISSET(*i, &readfds)) - { - lrfds->insert(*i); - ++ total; - } - if (lwfds && FD_ISSET(*i, &writefds)) - { - lwfds->insert(*i); - ++ total; - } +#ifdef LINUX + const int max_events = ed.m_sLocals.size(); + epoll_event ev[max_events]; + int nfds = ::epoll_wait(ed.m_iLocalID, ev, max_events, 0); + + for (int i = 0; i < nfds; ++ i) + { + if ((NULL != lrfds) && (ev[i].events & EPOLLIN)) + { + lrfds->insert(ev[i].data.fd); + ++ total; + } + if ((NULL != lwfds) && (ev[i].events & EPOLLOUT)) + { + lwfds->insert(ev[i].data.fd); + ++ total; + } + } +#elif defined(BSD) || defined(OSX) || (TARGET_OS_IOS == 1) || (TARGET_OS_TV == 1) + struct timespec tmout = {0, 0}; + const int max_events = ed.m_sLocals.size(); + struct kevent ke[max_events]; + + int nfds = kevent(ed.m_iLocalID, NULL, 0, ke, max_events, &tmout); + + for (int i = 0; i < nfds; ++ i) + { + if ((NULL != lrfds) && (ke[i].filter == EVFILT_READ)) + { + lrfds->insert(ke[i].ident); + ++ total; + } + if ((NULL != lwfds) && (ke[i].filter == EVFILT_WRITE)) + { + lwfds->insert(ke[i].ident); + ++ total; + } + } +#else + //currently "select" is used for all non-Linux platforms. + //faster approaches can be applied for specific systems in the future. + + //"select" has a limitation on the number of sockets + int max_fd = 0; + + fd_set readfds; + fd_set writefds; + FD_ZERO(&readfds); + FD_ZERO(&writefds); + + for (set::const_iterator i = ed.m_sLocals.begin(); i != ed.m_sLocals.end(); ++ i) + { + if (lrfds) + FD_SET(*i, &readfds); + if (lwfds) + FD_SET(*i, &writefds); + if ((int)*i > max_fd) + max_fd = *i; + } + + timeval tv; + tv.tv_sec = 0; + tv.tv_usec = 0; + if (::select(max_fd + 1, &readfds, &writefds, NULL, &tv) > 0) + { + for (set::const_iterator i = ed.m_sLocals.begin(); i != ed.m_sLocals.end(); ++ i) + { + if (lrfds && FD_ISSET(*i, &readfds)) + { + lrfds->insert(*i); + ++ total; + } + if (lwfds && FD_ISSET(*i, &writefds)) + { + lwfds->insert(*i); + ++ total; + } + } + } +#endif } - } - #endif - } - CGuard::leaveCS(m_EPollLock); + } // END-LOCK: m_EPollLock - if (total > 0) - return total; + if (total > 0) + return total; - if ((msTimeOut >= 0) && (int64_t(CTimer::getTime() - entertime) >= msTimeOut * int64_t(1000))) - throw CUDTException(MJ_AGAIN, MN_XMTIMEOUT, 0); + if ((msTimeOut >= 0) && (int64_t(CTimer::getTime() - entertime) >= msTimeOut * int64_t(1000))) + throw CUDTException(MJ_AGAIN, MN_XMTIMEOUT, 0); - CTimer::waitForEvent(); - } + CTimer::waitForEvent(); + } - return 0; + return 0; } int CEPoll::release(const int eid) @@ -536,50 +508,60 @@ int CEPoll::release(const int eid) return 0; } -namespace -{ - -void update_epoll_sets(const SRTSOCKET& uid, const set& watch, set& result, bool enable) -{ - if (enable && (watch.find(uid) != watch.end())) - { - result.insert(uid); - } - else if (!enable) - { - result.erase(uid); - } -} - -} // namespace -int CEPoll::update_events(const SRTSOCKET& uid, std::set& eids, int events, bool enable) +int CEPoll::update_events(const SRTSOCKET& uid, std::set& eids, const int events, const bool enable) { - CGuard pg(m_EPollLock); + vector lost; + + CGuard pg(m_EPollLock); + for (set::iterator i = eids.begin(); i != eids.end(); ++ i) + { + map::iterator p = m_mPolls.find(*i); + if (p == m_mPolls.end()) + { + // EID invalid, though still present in the socket's subscriber list + // (dangling in the socket). Postpone to fix the subscruption and continue. + lost.push_back(*i); + continue; + } - map::iterator p; + CEPollDesc& ed = p->second; - vector lost; - for (set::iterator i = eids.begin(); i != eids.end(); ++ i) - { - p = m_mPolls.find(*i); - if (p == m_mPolls.end()) - { - lost.push_back(*i); - } - else - { - if ((events & UDT_EPOLL_IN) != 0) - update_epoll_sets(uid, p->second.m_sUDTSocksIn, p->second.m_sUDTReads, enable); - if ((events & UDT_EPOLL_OUT) != 0) - update_epoll_sets(uid, p->second.m_sUDTSocksOut, p->second.m_sUDTWrites, enable); - if ((events & UDT_EPOLL_ERR) != 0) - update_epoll_sets(uid, p->second.m_sUDTSocksEx, p->second.m_sUDTExcepts, enable); - } - } - - for (vector::iterator i = lost.begin(); i != lost.end(); ++ i) - eids.erase(*i); + // Check if this EID is subscribed for this socket. + CEPollDesc::Wait* pwait = ed.watch_find(uid); + if (!pwait) + { + // As this is mapped in the socket's data, it should be impossible. + continue; + } - return 0; + // compute new states + + // New state to be set into the permanent state + const int newstate = enable ? pwait->state | events // SET event bits if enable + : pwait->state & (~events); // CLEAR event bits + + // compute states changes! + int changes = pwait->state ^ newstate; // oldState XOR newState + if (!changes) + continue; // no changes! + // assign new state + pwait->state = newstate; + // filter change relating what is watching + changes &= pwait->watch; + if (!changes) + continue; // no change watching + // set events changes! + + // This function will update the notice object associated with + // the given events, that is: + // - if enable, it will set event flags, possibly in a new notice object + // - if !enable, it will clear event flags, possibly remove notice if resulted in 0 + ed.updateEventNotice(*pwait, uid, events, enable); + } + + for (vector::iterator i = lost.begin(); i != lost.end(); ++ i) + eids.erase(*i); + + return 0; } diff --git a/srtcore/epoll.h b/srtcore/epoll.h index f5b047c70..527fa956a 100644 --- a/srtcore/epoll.h +++ b/srtcore/epoll.h @@ -56,22 +56,217 @@ modified by #include #include +#include #include "udt.h" struct CEPollDesc { int m_iID; // epoll ID - std::set m_sUDTSocksOut; // set of UDT sockets waiting for write events - std::set m_sUDTSocksIn; // set of UDT sockets waiting for read events - std::set m_sUDTSocksEx; // set of UDT sockets waiting for exceptions + + struct Wait; + + struct Notice: public SRT_EPOLL_EVENT + { + Wait* parent; + + Notice(Wait* p, SRTSOCKET sock, int ev): parent(p) + { + fd = sock; + events = ev; + } + }; + + /// The type for `m_USockEventNotice`, the pair contains: + /// * The back-pointer to the subscriber object for which this event notice serves + /// * The events currently being on + typedef std::list enotice_t; + + struct Wait + { + /// Events the subscriber is interested with. Only those will be + /// regarded when updating event flags. + int watch; + + /// Which events should be edge-triggered. When the event isn't + /// mentioned in `watch`, this bit flag is disregarded. Otherwise + /// it means that the event is to be waited for persistent state + /// if this flag is not present here, and for edge trigger, if + /// the flag is present here. + int edge; + + /// The current persistent state. This is usually duplicated in + /// a dedicated state object in `m_USockEventNotice`, however the state + /// here will stay forever as is, regardless of the edge/persistent + /// subscription mode for the event. + int state; + + /// The iterator to `m_USockEventNotice` container that contains the + /// event notice object for this subscription, or the value from + /// `nullNotice()` if there is no such object. + enotice_t::iterator notit; + + Wait(int sub, bool etr, enotice_t::iterator i) + :watch(sub) + ,edge(etr ? sub : 0) + ,state(0) + ,notit(i) + { + } + + int edgeOnly() { return edge & watch; } + }; + + typedef std::map ewatch_t; + +private: + + /// Sockets that are subscribed for events in this eid. + ewatch_t m_USockWatchState; + + /// Objects representing changes in SRT sockets. + /// Objects are removed from here when an event is registerred as edge-triggered. + /// Otherwise it is removed only when all events as per subscription + /// are no longer on. + enotice_t m_USockEventNotice; + + // Special behavior + int32_t m_Flags; + + enotice_t::iterator nullNotice() { return m_USockEventNotice.end(); } + +public: + + CEPollDesc(): + m_Flags(0) + { + } + + static const int32_t EF_NOCHECK_EMPTY = 1 << 0; + static const int32_t EF_CHECK_REP = 1 << 1; + + int32_t flags() { return m_Flags; } + bool flags(int32_t f) { return (m_Flags & f) != 0; } + void set_flags(int32_t flg) { m_Flags |= flg; } + void clr_flags(int32_t flg) { m_Flags &= ~flg; } + + // Container accessors for ewatch_t. + bool watch_empty() { return m_USockWatchState.empty(); } + Wait* watch_find(SRTSOCKET sock) + { + ewatch_t::iterator i = m_USockWatchState.find(sock); + if (i == m_USockWatchState.end()) + return NULL; + return &i->second; + } + + // Container accessors for enotice_t. + enotice_t::iterator enotice_begin() { return m_USockEventNotice.begin(); } + enotice_t::iterator enotice_end() { return m_USockEventNotice.end(); } int m_iLocalID; // local system epoll ID std::set m_sLocals; // set of local (non-UDT) descriptors - std::set m_sUDTWrites; // UDT sockets ready for write - std::set m_sUDTReads; // UDT sockets ready for read - std::set m_sUDTExcepts; // UDT sockets with exceptions (connection broken, etc.) + std::pair addWatch(SRTSOCKET sock, int32_t events, bool edgeTrg) + { + return m_USockWatchState.insert(std::make_pair(sock, Wait(events, edgeTrg, nullNotice()))); + } + + void addEventNotice(Wait& wait, SRTSOCKET sock, int events) + { + // `events` contains bits to be set, so: + // + // 1. If no notice object exists, add it exactly with `events`. + // 2. If it exists, only set the bits from `events`. + // ASSUME: 'events' is not 0, that is, we have some readiness + + if (wait.notit == nullNotice()) // No notice object + { + // Add new event notice and bind to the wait object. + m_USockEventNotice.push_back(Notice(&wait, sock, events)); + wait.notit = --m_USockEventNotice.end(); + + return; + } + + // We have an existing event notice, so update it + wait.notit->events |= events; + } + + // This function only updates the corresponding event notice object + // according to the change in the events. + void updateEventNotice(Wait& wait, SRTSOCKET sock, int events, bool enable) + { + if (enable) + { + addEventNotice(wait, sock, events); + } + else + { + // `events` contains bits to be cleared. + // 1. If there is no notice event, do nothing - clear already. + // 2. If there is a notice event, update by clearing the bits + // 2.1. If this made resulting state to be 0, also remove the notice. + + // If wait.notit is empty, there's no event to clear + if (wait.notit == nullNotice()) + return; + + // Update the state + const int newstate = wait.notit->events & (~events); + + if (newstate == 0) + { + // If the new state is full 0 (no events), + // then remove the corresponding notice object + m_USockEventNotice.erase(wait.notit); + + // and set the "corresponding notice object" to nothing + wait.notit = nullNotice(); + return; + } + + wait.notit->events = newstate; + } + } + + void removeSubscription(SRTSOCKET u) + { + std::map::iterator i = m_USockWatchState.find(u); + if (i == m_USockWatchState.end()) + return; + + if (i->second.notit != nullNotice()) + { + m_USockEventNotice.erase(i->second.notit); + // NOTE: no need to update the Wait::notit field + // because the Wait object is about to be removed anyway. + } + m_USockWatchState.erase(i); + } + + void removeExistingNotices(Wait& wait) + { + m_USockEventNotice.erase(wait.notit); + wait.notit = nullNotice(); + } + + void removeEvents(Wait& wait) + { + if (wait.notit == nullNotice()) + return; + removeExistingNotices(wait); + } + + void checkEdge(enotice_t::iterator i) + { + // This function should check if this event was subscribed + // as edge-triggered, and if so, clear the event from the notice. + // Update events and check edge mode at the subscriber + i->events &= ~i->parent->edgeOnly(); + if(!i->events) + removeExistingNotices(*i->parent); + } }; class CEPoll @@ -96,7 +291,7 @@ friend class CRendezvousQueue; /// @param [in] events events to watch. /// @return 0 if success, otherwise an error number. - int add_usock(const int eid, const SRTSOCKET& u, const int* events = NULL); + int add_usock(const int eid, const SRTSOCKET& u, const int* events = NULL) { return update_usock(eid, u, events); } /// add a system socket to an EPoll. /// @param [in] eid EPoll ID. @@ -111,7 +306,7 @@ friend class CRendezvousQueue; /// @param [in] u UDT socket ID. /// @return 0 if success, otherwise an error number. - int remove_usock(const int eid, const SRTSOCKET& u); + int remove_usock(const int eid, const SRTSOCKET& u) { static const int Null(0); return update_usock(eid, u, &Null);} /// remove a system socket event from an EPoll; socket will be removed if no events to watch. /// @param [in] eid EPoll ID. @@ -125,7 +320,7 @@ friend class CRendezvousQueue; /// @param [in] events events to watch. /// @return 0 if success, otherwise an error number. - int update_usock(const int eid, const SRTSOCKET& u, const int* events = NULL); + int update_usock(const int eid, const SRTSOCKET& u, const int* events); /// update a system socket events from an EPoll. /// @param [in] eid EPoll ID. diff --git a/srtcore/srt.h b/srtcore/srt.h index ebc2b1fb7..91ab48f66 100644 --- a/srtcore/srt.h +++ b/srtcore/srt.h @@ -522,7 +522,24 @@ enum SRT_EPOLL_OPT // so that if system values are used by mistake, they should have the same effect SRT_EPOLL_IN = 0x1, SRT_EPOLL_OUT = 0x4, - SRT_EPOLL_ERR = 0x8 + SRT_EPOLL_ERR = 0x8, + SRT_EPOLL_ET = 1u << 31 +}; +// These are actually flags - use a bit container: +typedef int32_t SRT_EPOLL_T; + +enum SRT_EPOLL_FLAGS +{ + /// This allows the EID container to be empty when calling the waiting + /// function with infinite time. This means an infinite hangup, although + /// a socket can be added to this EID from a separate thread. + SRT_EPOLL_ENABLE_EMPTY = 1, + + /// This makes the waiting function check if there is output container + /// passed to it, and report an error if it isn't. By default it is allowed + /// that the output container is 0 size or NULL and therefore the readiness + /// state is reported only as a number of ready sockets from return value. + SRT_EPOLL_ENABLE_OUTPUTCHECK = 2 }; #ifdef __cplusplus @@ -663,6 +680,11 @@ SRT_API int srt_epoll_update_ssock(int eid, SYSSOCKET s, const int* events); SRT_API int srt_epoll_wait(int eid, SRTSOCKET* readfds, int* rnum, SRTSOCKET* writefds, int* wnum, int64_t msTimeOut, SYSSOCKET* lrfds, int* lrnum, SYSSOCKET* lwfds, int* lwnum); +typedef struct SRT_EPOLL_EVENT_ +{ + SRTSOCKET fd; + int events; // SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR +} SRT_EPOLL_EVENT; SRT_API int srt_epoll_release(int eid); // Logging control diff --git a/srtcore/srt_c_api.cpp b/srtcore/srt_c_api.cpp index 4674f83b6..67244ff08 100644 --- a/srtcore/srt_c_api.cpp +++ b/srtcore/srt_c_api.cpp @@ -221,15 +221,7 @@ int srt_epoll_remove_ssock(int eid, SYSSOCKET s) { return CUDT::epoll_remove_sso int srt_epoll_update_usock(int eid, SRTSOCKET u, const int * events) { - int srt_ev = 0; - - if (events) { - srt_ev = *events; - } else { - srt_ev = SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR; - } - - return CUDT::epoll_update_usock(eid, u, &srt_ev); + return CUDT::epoll_update_usock(eid, u, events); } int srt_epoll_update_ssock(int eid, SYSSOCKET s, const int * events) @@ -247,10 +239,10 @@ int srt_epoll_update_ssock(int eid, SYSSOCKET s, const int * events) } int srt_epoll_wait( - int eid, - SRTSOCKET* readfds, int* rnum, SRTSOCKET* writefds, int* wnum, - int64_t msTimeOut, - SYSSOCKET* lrfds, int* lrnum, SYSSOCKET* lwfds, int* lwnum) + int eid, + SRTSOCKET* readfds, int* rnum, SRTSOCKET* writefds, int* wnum, + int64_t msTimeOut, + SYSSOCKET* lrfds, int* lrnum, SYSSOCKET* lwfds, int* lwnum) { return UDT::epoll_wait2( eid, diff --git a/testing/testmedia.cpp b/testing/testmedia.cpp index 07faa5915..118cca59b 100755 --- a/testing/testmedia.cpp +++ b/testing/testmedia.cpp @@ -42,7 +42,7 @@ bool transmit_printformat_json = false; srt_listen_callback_fn* transmit_accept_hook_fn = nullptr; void* transmit_accept_hook_op = nullptr; -string DirectionName(SRT_EPOLL_OPT direction) +string DirectionName(SRT_EPOLL_T direction) { string dir_name; if (direction) diff --git a/testing/testmedia.hpp b/testing/testmedia.hpp index 26c2b5692..54b529208 100644 --- a/testing/testmedia.hpp +++ b/testing/testmedia.hpp @@ -43,7 +43,7 @@ class SrtCommon protected: int srt_epoll = -1; - SRT_EPOLL_OPT m_direction = SRT_EPOLL_OPT_NONE; //< Defines which of SND or RCV option variant should be used, also to set SRT_SENDER for output + SRT_EPOLL_T m_direction = SRT_EPOLL_OPT_NONE; //< Defines which of SND or RCV option variant should be used, also to set SRT_SENDER for output bool m_blocking_mode = true; //< enforces using SRTO_SNDSYN or SRTO_RCVSYN, depending on @a m_direction int m_timeout = 0; //< enforces using SRTO_SNDTIMEO or SRTO_RCVTIMEO, depending on @a m_direction bool m_tsbpdmode = true;