Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[apps] FIX srt-live-transmit stops listening after SRT disconnect #2997... #3108

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 51 additions & 8 deletions apps/srt-live-transmit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ int main(int argc, char** argv)
bool srcConnected = false;
unique_ptr<Target> tar;
bool tarConnected = false;
bool srcMayBlock = false;

int pollid = srt_epoll_create();
if (pollid < 0)
Expand Down Expand Up @@ -514,6 +515,7 @@ int main(int argc, char** argv)
return 1;
}
int events = SRT_EPOLL_IN | SRT_EPOLL_ERR;

switch (src->uri.type())
{
case UriParser::SRT:
Expand All @@ -537,14 +539,19 @@ int main(int argc, char** argv)
}
break;
case UriParser::FILE:
if (srt_epoll_add_ssock(pollid,
src->GetSysSocket(), &events))
{
cerr << "Failed to add FILE source to poll, "
<< src->GetSysSocket() << endl;
return 1;
int con = src->GetSysSocket();
cl-ment marked this conversation as resolved.
Show resolved Hide resolved
// try to make the standard input non blocking
srcMayBlock = fcntl(con, F_SETFL, fcntl(con, F_GETFL) | O_NONBLOCK) < 0;
maxsharabayko marked this conversation as resolved.
Show resolved Hide resolved
if (srt_epoll_add_ssock(pollid, con, &events))
{
cerr << "Failed to add FILE source to poll, "
<< src->GetSysSocket() << endl;
return 1;
}
break;

}
break;
default:
break;
}
Expand Down Expand Up @@ -589,6 +596,7 @@ int main(int argc, char** argv)
SRTSOCKET srtrwfds[4] = {SRT_INVALID_SOCK, SRT_INVALID_SOCK , SRT_INVALID_SOCK , SRT_INVALID_SOCK };
int sysrfdslen = 2;
SYSSOCKET sysrfds[2];

if (srt_epoll_wait(pollid,
&srtrwfds[0], &srtrfdslen, &srtrwfds[2], &srtwfdslen,
100,
Expand Down Expand Up @@ -771,12 +779,46 @@ int main(int argc, char** argv)
break;
}


bool srcReady = false;

if (src.get() && src->IsOpen())
{
if (srtrfdslen > 0)
{
SRTSOCKET sock = src->GetSRTSocket();
if (sock != SRT_INVALID_SOCK)
{
for (int n = 0; n < srtrfdslen; n ++)
if (sock == srtrwfds[n])
{
srcReady = true;
break;
}
cl-ment marked this conversation as resolved.
Show resolved Hide resolved

}
}
if (!srcReady && sysrfdslen > 0)
{
int sock = src->GetSysSocket();
if (sock != -1)
{
for (int n = 0; n < sysrfdslen; n++)
if (sock == sysrfds[n])
{
srcReady = true;
break;
}

cl-ment marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
// read a few chunks at a time in attempt to deplete
// read buffers as much as possible on each read event
// note that this implies live streams and does not
// work for cached/file sources
std::list<std::shared_ptr<MediaPacket>> dataqueue;
if (src.get() && src->IsOpen() && (srtrfdslen || sysrfdslen))
if (srcReady)
{
while (dataqueue.size() < cfg.buffering)
{
Expand All @@ -800,9 +842,10 @@ int main(int argc, char** argv)

dataqueue.push_back(pkt);
receivedBytes += pkt->payload.size();
if (srcMayBlock)
break;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that actually reading should be done this way: either you have the device read-ready, so you read, and then after you read, you don't know, and have to recheck.

Or, you can resolve to reading multiple times, counting on that when particular time reading isn't ready, then the Read call should report an error. Might be, I think, a good idea, to keep the "blocked" state in the fields, which will be written to, in case when particular Read implementation finds out that the call failed due to not being ready. This way it won't need to see if this is SRT and this way we use that function to get the error and maybe check for an SRT-specific readiness failure.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I understand, the current implementation already follows this approach. For file descriptors in blocking mode (case 1), it checks if the device is ready before attempting to read, performs the read operation, and rechecks the state afterward since readiness isn’t guaranteed. For file descriptors in non-blocking mode (case 2), it handles multiple read attempts and relies on the error returned by the Read call to detect if the device isn’t ready.
Could you clarify if there’s something specific you’d like me to adjust or add to the current implementation? Perhaps there’s a particular scenario or behavior you’d like to address that isn’t currently handled?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This here is unclear. First, MayBlock() actually represent the state whether setting the nonblocking mode could be done, which is useless. If the architecture requires nonblocking mode, it should be nonblocking always, and all devices should be operated as such. The case if operating the console device in blocking mode should not be even taken into account.

The only thing I'm referring to is the approach to multiple reading calls, which should follow one of two methods:

  • Check if read-ready always before calling Read(), and break the loop if it isn't
  • Check if read-ready once before the loop, then call Read() in loop until Read() informs you that reading is no longer possible

Note that the second approach isn't possible to be used reliably in case of blocking mode, that's why it should not be taken into account.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the benefits of enforcing non-blocking mode for consistency and simplifying the architecture. However, I think it might be preferable to maintain support for both blocking and non-blocking modes. This flexibility ensures compatibility with a broader range of sources and use cases, particularly for systems where blocking mode is either required or more practical.

The current implementation can differentiate the handling of each mode:
• For blocking mode: Always check read-readiness before calling Read().
• For non-blocking mode: Check read-readiness once before the loop and perform multiple Read() calls until it’s no longer possible.

Would you be open to maintaining support for both modes, or do you see specific challenges in doing so?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The initial specific about srt-live-transmit is that it is intended as a sample application, showing how to work with SRT using epoll and non-blocking mode. I don't think supporting the blocking mode was the case.
Now the problem is that a user can actually set the blocking mode via URI srt://ip:port?blocking=true, and then the application must either work, or report an error that the blocking mode is not supported.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly. This is built in into the architecture of this application to only support the nonblocking mode and it should not even take the blocking mode into account - unlike the srt-test-live application, which supports both, and it is prepared to work in either mode.

}
}

// if there is no target, let the received data be lost
while (!dataqueue.empty())
{
Expand Down
15 changes: 7 additions & 8 deletions apps/transmitmedia.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -720,9 +720,8 @@ class ConsoleSource: public Source
if (pkt.payload.size() < chunk)
pkt.payload.resize(chunk);

bool st = cin.read(pkt.payload.data(), chunk).good();
chunk = cin.gcount();
if (chunk == 0 || !st)
int ret = ::read(GetSysSocket(), pkt.payload.data(), chunk);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
int ret = ::read(GetSysSocket(), pkt.payload.data(), chunk);
const int ret = ::read(GetSysSocket(), pkt.payload.data(), chunk);

if (ret <= 0)
{
pkt.payload.clear();
return 0;
Expand All @@ -731,14 +730,14 @@ class ConsoleSource: public Source
// Save this time to potentially use it for SRT target.
pkt.time = srt_time_now();
if (chunk < pkt.payload.size())
pkt.payload.resize(chunk);
pkt.payload.resize(ret);

return (int) chunk;
return ret;
}

bool IsOpen() override { return cin.good(); }
bool IsOpen() override { return !cin.eof(); }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't feel good about this, because the IsOpen() function (BTW should be const) is supposed to check if the file is open. If the EOF functionality is needed, maybe better to use End() function already defined below.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually the only use case when cin.eof() could be true is when a user presses Ctrl-D and when the pipeline is broken (you also need to ignore SIGPIPE).

bool End() override { return cin.eof(); }
int GetSysSocket() const override { return 0; };
int GetSysSocket() const override { return fileno(stdin); };
};

class ConsoleTarget: public Target
Expand Down Expand Up @@ -767,7 +766,7 @@ class ConsoleTarget: public Target

bool IsOpen() override { return cout.good(); }
bool Broken() override { return cout.eof(); }
int GetSysSocket() const override { return 0; };
int GetSysSocket() const override { return fileno(stdout); };
};

template <class Iface> struct Console;
Expand Down
Loading