diff --git a/apps/srt-file-transmit.cpp b/apps/srt-file-transmit.cpp index 0c910aaa4..df241d887 100644 --- a/apps/srt-file-transmit.cpp +++ b/apps/srt-file-transmit.cpp @@ -180,7 +180,7 @@ int parse_args(FileTransmitConfig &cfg, int argc, char** argv) return 2; } - cfg.chunk_size = stoul(Option(params, "1316", o_chunk)); + cfg.chunk_size = stoul(Option(params, "1456", o_chunk)); cfg.skip_flushing = Option(params, false, o_no_flush); cfg.bw_report = stoi(Option(params, "0", o_bwreport)); cfg.stats_report = stoi(Option(params, "0", o_statsrep)); @@ -584,16 +584,15 @@ bool DoDownload(UriParser& us, string directory, string filename, if (connected) { vector buf(cfg.chunk_size); - int n; - if(!ofile.is_open()) + if (!ofile.is_open()) { const char * fn = id.empty() ? filename.c_str() : id.c_str(); directory.append("/"); directory.append(fn); ofile.open(directory.c_str(), ios::out | ios::trunc | ios::binary); - if(!ofile.is_open()) + if (!ofile.is_open()) { cerr << "Error opening file [" << directory << "]" << endl; goto exit; @@ -601,7 +600,7 @@ bool DoDownload(UriParser& us, string directory, string filename, cerr << "Writing output to [" << directory << "]" << endl; } - n = src->Read(cfg.chunk_size, buf, out_stats); + int n = src->Read(cfg.chunk_size, buf, out_stats); if (n == SRT_ERROR) { cerr << "Download: SRT error: " << srt_getlasterror_str() << endl; diff --git a/apps/srt-live-transmit.cpp b/apps/srt-live-transmit.cpp index 9a7baef12..9d9d6d10c 100644 --- a/apps/srt-live-transmit.cpp +++ b/apps/srt-live-transmit.cpp @@ -712,10 +712,24 @@ int main(int argc, char** argv) { std::shared_ptr pdata( new bytevector(cfg.chunk_size)); - if (!src->Read(cfg.chunk_size, *pdata, out_stats) || (*pdata).empty()) + + const int res = src->Read(cfg.chunk_size, *pdata, out_stats); + + if (res == SRT_ERROR && src->uri.type() == UriParser::SRT) + { + if (srt_getlasterror(NULL) == SRT_EASYNCRCV) + break; + + throw std::runtime_error( + string("error: recvmsg: ") + string(srt_getlasterror_str()) + ); + } + + if (res == 0 || pdata->empty()) { break; } + dataqueue.push_back(pdata); receivedBytes += (*pdata).size(); } diff --git a/apps/srt-multiplex.cpp b/apps/srt-multiplex.cpp index 1da34dd6c..4c988cd1f 100644 --- a/apps/srt-multiplex.cpp +++ b/apps/srt-multiplex.cpp @@ -105,8 +105,8 @@ struct MediumPair if (!initial_portion.empty()) { - tar->Write(initial_portion); - if ( tar->Broken() ) + tar->Write(initial_portion.data(), initial_portion.size()); + if (tar->Broken()) { applog.Note() << "OUTPUT BROKEN for loop: " << name; return; @@ -121,7 +121,9 @@ struct MediumPair ostringstream sout; alarm(1); bytevector data; - src->Read(chunk, data); + const int read_res = src->Read(chunk, data); + + alarm(0); if (alarm_state) { @@ -138,8 +140,8 @@ struct MediumPair applog.Note() << sout.str(); break; } - tar->Write(data); - if ( tar->Broken() ) + tar->Write(data.data(), data.size()); + if (tar->Broken()) { sout << " OUTPUT broken"; applog.Note() << sout.str(); diff --git a/apps/transmitbase.hpp b/apps/transmitbase.hpp index 85e4b04f1..fb9b00b11 100644 --- a/apps/transmitbase.hpp +++ b/apps/transmitbase.hpp @@ -42,7 +42,7 @@ class Location class Source: public Location { public: - virtual bool Read(size_t chunk, bytevector& data, std::ostream &out_stats = std::cout) = 0; + virtual int Read(size_t chunk, bytevector& data, std::ostream &out_stats = std::cout) = 0; virtual bool IsOpen() = 0; virtual bool End() = 0; static std::unique_ptr Create(const std::string& url); @@ -65,8 +65,7 @@ class Source: public Location class Target: public Location { public: - virtual int Write(const char* data, size_t size, std::ostream &out_stats = std::cout) = 0; - virtual bool Write(const bytevector& portion) = 0; + virtual int Write(const char* data, size_t size, std::ostream &out_stats = std::cout) = 0; virtual bool IsOpen() = 0; virtual bool Broken() = 0; virtual void Close() {} diff --git a/apps/transmitmedia.cpp b/apps/transmitmedia.cpp index dda8f3f13..645f124f6 100644 --- a/apps/transmitmedia.cpp +++ b/apps/transmitmedia.cpp @@ -57,7 +57,7 @@ class FileSource: public Source throw std::runtime_error(path + ": Can't open file for reading"); } - bool Read(size_t chunk, bytevector& data, ostream &SRT_ATR_UNUSED = cout) override + int Read(size_t chunk, bytevector& data, ostream &SRT_ATR_UNUSED = cout) override { if (data.size() < chunk) data.resize(chunk); @@ -67,12 +67,12 @@ class FileSource: public Source if ( nread < data.size() ) data.resize(nread); - if ( data.empty() ) + if (data.empty()) { - return false; + return 0; } - return true; + return (int) nread; } bool IsOpen() override { return bool(ifile); } @@ -86,16 +86,10 @@ class FileTarget: public Target FileTarget(const string& path): ofile(path, ios::out | ios::trunc | ios::binary) {} - bool Write(const bytevector& data) override - { - ofile.write(data.data(), data.size()); - return !(ofile.bad()); - } - int Write(const char* data, size_t size, ostream &SRT_ATR_UNUSED = cout) override { ofile.write(data, size); - return !(ofile.bad()) ? size : 0; + return !(ofile.bad()) ? (int) size : 0; } bool IsOpen() override { return !!ofile; } @@ -629,38 +623,22 @@ SrtSource::SrtSource(string host, int port, const map& par) hostport_copy = os.str(); } -bool SrtSource::Read(size_t chunk, bytevector& data, ostream &out_stats) +int SrtSource::Read(size_t chunk, bytevector& data, ostream &out_stats) { static unsigned long counter = 1; if (data.size() < chunk) data.resize(chunk); - bool ready = true; - int stat; - do + const int stat = srt_recvmsg(m_sock, data.data(), (int) chunk); + if (stat <= 0) { - stat = srt_recvmsg(m_sock, data.data(), chunk); - if ( stat == SRT_ERROR ) - { - // EAGAIN for SRT READING - if ( srt_getlasterror(NULL) == SRT_EASYNCRCV ) - { - data.clear(); - return false; - } - Error(UDT::getlasterror(), "recvmsg"); - } - - if ( stat == 0 ) - { - throw ReadEOF(hostport_copy); - } + data.clear(); + return stat; } - while (!ready); chunk = size_t(stat); - if ( chunk < data.size() ) + if (chunk < data.size()) data.resize(chunk); const bool need_bw_report = transmit_bw_report && (counter % transmit_bw_report) == transmit_bw_report - 1; @@ -682,7 +660,7 @@ bool SrtSource::Read(size_t chunk, bytevector& data, ostream &out_stats) ++counter; - return true; + return stat; } int SrtTarget::ConfigurePre(SRTSOCKET sock) @@ -707,7 +685,7 @@ int SrtTarget::Write(const char* data, size_t size, ostream &out_stats) { static unsigned long counter = 1; - int stat = srt_sendmsg2(m_sock, data, size, nullptr); + int stat = srt_sendmsg2(m_sock, data, (int) size, nullptr); if (stat == SRT_ERROR) { return stat; @@ -735,10 +713,6 @@ int SrtTarget::Write(const char* data, size_t size, ostream &out_stats) return stat; } -bool SrtTarget::Write(const bytevector& data) -{ - return -1 != Write(data.data(), data.size()); -} SrtModel::SrtModel(string host, int port, map par) { @@ -840,25 +814,23 @@ class ConsoleSource: public Source #endif } - bool Read(size_t chunk, bytevector& data, ostream &SRT_ATR_UNUSED = cout) override + int Read(size_t chunk, bytevector& data, ostream &SRT_ATR_UNUSED = cout) override { if (data.size() < chunk) data.resize(chunk); bool st = cin.read(data.data(), chunk).good(); chunk = cin.gcount(); - if ( chunk == 0 && !st ) + if (chunk == 0 || !st) { data.clear(); - return false; + return 0; } - if ( chunk < data.size() ) + if (chunk < data.size()) data.resize(chunk); - if ( data.empty() ) - return false; - return true; + return (int) chunk; } bool IsOpen() override { return cin.good(); } @@ -882,12 +854,7 @@ class ConsoleTarget: public Target int Write(const char* data, size_t len, ostream &SRT_ATR_UNUSED = cout) override { cout.write(data, len); - return len; - } - - bool Write(const bytevector& data) override - { - return 0 != Write(data.data(), data.size()); + return (int) len; } bool IsOpen() override { return cout.good(); } @@ -1109,7 +1076,7 @@ class UdpSource: public Source, public UdpCommon eof = false; } - bool Read(size_t chunk, bytevector& data, ostream &SRT_ATR_UNUSED = cout) override + int Read(size_t chunk, bytevector& data, ostream &SRT_ATR_UNUSED = cout) override { if (data.size() < chunk) data.resize(chunk); @@ -1117,19 +1084,19 @@ class UdpSource: public Source, public UdpCommon sockaddr_in sa; socklen_t si = sizeof(sockaddr_in); int stat = recvfrom(m_sock, data.data(), chunk, 0, (sockaddr*)&sa, &si); - if ( stat < 1 ) + if (stat < 1) { if (SysError() != EWOULDBLOCK) eof = true; data.clear(); - return false; + return stat; } chunk = size_t(stat); if ( chunk < data.size() ) data.resize(chunk); - return true; + return stat; } bool IsOpen() override { return m_sock != -1; } @@ -1158,11 +1125,6 @@ class UdpTarget: public Target, public UdpCommon return stat; } - bool Write(const bytevector& data) override - { - return -1 != Write(data.data(), data.size()); - } - bool IsOpen() override { return m_sock != -1; } bool Broken() override { return false; } diff --git a/apps/transmitmedia.hpp b/apps/transmitmedia.hpp index 64f2c25c2..3c4f48d22 100644 --- a/apps/transmitmedia.hpp +++ b/apps/transmitmedia.hpp @@ -94,7 +94,7 @@ class SrtSource: public Source, public SrtCommon // Do nothing - create just to prepare for use } - bool Read(size_t chunk, bytevector& data, ostream& out_stats = cout) override; + int Read(size_t chunk, bytevector& data, ostream& out_stats = cout) override; /* In this form this isn't needed. @@ -135,7 +135,6 @@ class SrtTarget: public Target, public SrtCommon int ConfigurePre(SRTSOCKET sock) override; int Write(const char* data, size_t size, ostream &out_stats = cout) override; - bool Write(const bytevector& data) override; bool IsOpen() override { return IsUsable(); } bool Broken() override { return IsBroken(); } void Close() override { return SrtCommon::Close(); }