From 15fbe45a9a3aa93fc2b09d402379f698fb45c9bd Mon Sep 17 00:00:00 2001 From: Winlin Date: Sun, 1 Sep 2024 13:02:07 +0800 Subject: [PATCH] FLV: Refine source and http handler. v6.0.155 v7.0.14 (#4165) 1. Do not create a source when mounting FLV because it may not unmount FLV when freeing the source. If you access the FLV stream without any publisher, then wait for source cleanup and review the FLV stream again, there is an annoying warning message. ```bash # View HTTP FLV stream by curl, wait for stream to be ready. # curl http://localhost:8080/live/livestream.flv -v >/dev/null HTTP #0 127.0.0.1:58026 GET http://localhost:8080/live/livestream.flv, content-length=-1 new live source, stream_url=/live/livestream http: mount flv stream for sid=/live/livestream, mount=/live/livestream.flv # Cancel the curl and trigger source cleanup without http unmount. client disconnect peer. ret=1007 Live: cleanup die source, id=[], total=1 # View the stream again, it fails. # curl http://localhost:8080/live/livestream.flv -v >/dev/null HTTP #0 127.0.0.1:58040 GET http://localhost:8080/live/livestream.flv, content-length=-1 serve error code=1097(NoSource)(No source found) : process request=0 : cors serve : serve http : no source for /live/livestream serve_http() [srs_app_http_stream.cpp:641] ``` > Note: There is an inconsistency. The first time, you can access the FLV stream and wait for the publisher, but the next time, you cannot. 2. Create a source when starting to serve the FLV client. We do not need to create the source when creating the HTTP handler. Instead, we should try to create the source in the cache or stream. Because the source cleanup does not unmount the HTTP handler, the handler remains after the source is destroyed. The next time you access the FLV stream, the source is not found. ```cpp srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph) { SrsSharedPtr live_source; if ((err = _srs_sources->fetch_or_create(r.get(), server, live_source)) != srs_success) { } if ((err = http_mount(r.get())) != srs_success) { } srs_error_t SrsBufferCache::cycle() { SrsSharedPtr live_source = _srs_sources->fetch(req); if (!live_source.get()) { return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str()); } srs_error_t SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) { SrsSharedPtr live_source = _srs_sources->fetch(req); if (!live_source.get()) { return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str()); } ``` > Note: We should not create the source in hijack, instead, we create it in cache or stream: ```cpp srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph) { if ((err = http_mount(r.get())) != srs_success) { } srs_error_t SrsBufferCache::cycle() { SrsSharedPtr live_source; if ((err = _srs_sources->fetch_or_create(req, server_, live_source)) != srs_success) { } srs_error_t SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) { SrsSharedPtr live_source; if ((err = _srs_sources->fetch_or_create(req, server_, live_source)) != srs_success) { } ``` > Note: This fixes the failure and annoying warning message, and maintains consistency by always waiting for the stream to be ready if there is no publisher. 3. Fail the http request if the HTTP handler is disposing, and also keep the handler entry when disposing the stream, because we should dispose the handler entry and stream at the same time. ```cpp srs_error_t SrsHttpStreamServer::http_mount(SrsRequest* r) { entry = streamHandlers[sid]; if (entry->disposing) { return srs_error_new(ERROR_STREAM_DISPOSING, "stream is disposing"); } void SrsHttpStreamServer::http_unmount(SrsRequest* r) { std::map::iterator it = streamHandlers.find(sid); SrsUniquePtr entry(it->second); entry->disposing = true; ``` > Note: If the disposal process takes a long time, this will prevent unexpected behavior or access to the resource that is being disposed of. 4. In edge mode, the edge ingester will unpublish the source when the last consumer quits, which is actually triggered by the HTTP stream. While it also waits for the stream to quit when the HTTP unmounts, there is a self-destruction risk: the HTTP live stream object destroys itself. ```cpp srs_error_t SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) { SrsUniquePtr consumer(consumer_raw); // Trigger destroy. void SrsHttpStreamServer::http_unmount(SrsRequest* r) { for (;;) { if (!cache->alive() && !stream->alive()) { break; } // A circle reference. mux.unhandle(entry->mount, stream.get()); // Free the SrsLiveStream itself. ``` > Note: It also introduces a circular reference in the object relationships, the stream reference to itself when unmount: ```text SrsLiveStream::serve_http -> SrsLiveConsumer::~SrsLiveConsumer -> SrsEdgeIngester::stop -> SrsLiveSource::on_unpublish -> SrsHttpStreamServer::http_unmount -> SrsLiveStream::alive ``` > Note: We should use an asynchronous worker to perform the cleanup to avoid the stream destroying itself and to prevent self-referencing. ```cpp void SrsHttpStreamServer::http_unmount(SrsRequest* r) { entry->disposing = true; if ((err = async_->execute(new SrsHttpStreamDestroy(&mux, &streamHandlers, sid))) != srs_success) { } ``` > Note: This also ensures there are no circular references and no self-destruction. --------- Co-authored-by: Jacob Su --- trunk/doc/CHANGELOG.md | 2 + trunk/src/app/srs_app_http_stream.cpp | 183 +++++++++++++++++--------- trunk/src/app/srs_app_http_stream.hpp | 26 +++- trunk/src/core/srs_core_version6.hpp | 2 +- trunk/src/core/srs_core_version7.hpp | 2 +- trunk/src/kernel/srs_kernel_error.hpp | 3 +- 6 files changed, 154 insertions(+), 64 deletions(-) diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md index 2e5ad9b92f..c6eae0b8fc 100644 --- a/trunk/doc/CHANGELOG.md +++ b/trunk/doc/CHANGELOG.md @@ -7,6 +7,7 @@ The changelog for SRS. ## SRS 7.0 Changelog +* v7.0, 2024-09-01, Merge [#4165](https://github.com/ossrs/srs/pull/4165): FLV: Refine source and http handler. v7.0.14 (#4165) * v7.0, 2024-09-01, Merge [#4166](https://github.com/ossrs/srs/pull/4166): Edge: Fix flv edge crash when http unmount. v7.0.13 (#4166) * v7.0, 2024-08-31, Merge [#4162](https://github.com/ossrs/srs/pull/4162): Fix #3767: RTMP: Do not response empty data packet. v7.0.12 (#4162) * v7.0, 2024-08-31, Merge [#4164](https://github.com/ossrs/srs/pull/4164): HTTP-FLV: Notify connection to expire when unpublishing. v7.0.11 (#4164) @@ -25,6 +26,7 @@ The changelog for SRS. ## SRS 6.0 Changelog +* v6.0, 2024-09-01, Merge [#4165](https://github.com/ossrs/srs/pull/4165): FLV: Refine source and http handler. v6.0.155 (#4165) * v6.0, 2024-09-01, Merge [#4166](https://github.com/ossrs/srs/pull/4166): Edge: Fix flv edge crash when http unmount. v6.0.154 (#4166) * v6.0, 2024-08-31, Merge [#4162](https://github.com/ossrs/srs/pull/4162): Fix #3767: RTMP: Do not response empty data packet. v6.0.153 (#4162) * v6.0, 2024-08-31, Merge [#4164](https://github.com/ossrs/srs/pull/4164): HTTP-FLV: Notify connection to expire when unpublishing. v6.0.152 (#4164) diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index df2c4a523e..36131c4515 100755 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -39,8 +39,9 @@ using namespace std; #include #include #include +#include -SrsBufferCache::SrsBufferCache(SrsRequest* r) +SrsBufferCache::SrsBufferCache(SrsServer* s, SrsRequest* r) { req = r->copy()->as_http(); queue = new SrsMessageQueue(true); @@ -48,6 +49,7 @@ SrsBufferCache::SrsBufferCache(SrsRequest* r) // TODO: FIXME: support reload. fast_cache = _srs_config->get_vhost_http_remux_fast_cache(req->vhost); + server_ = s; } SrsBufferCache::~SrsBufferCache() @@ -69,6 +71,11 @@ srs_error_t SrsBufferCache::update_auth(SrsRequest* r) srs_error_t SrsBufferCache::start() { srs_error_t err = srs_success; + + // Not enabled. + if (fast_cache <= 0) { + return err; + } if ((err = trd->start()) != srs_success) { return srs_error_wrap(err, "corotine"); @@ -79,11 +86,21 @@ srs_error_t SrsBufferCache::start() void SrsBufferCache::stop() { + // Not enabled. + if (fast_cache <= 0) { + return; + } + trd->stop(); } bool SrsBufferCache::alive() { + // Not enabled. + if (fast_cache <= 0) { + return false; + } + srs_error_t err = trd->pull(); if (err == srs_success) { return true; @@ -115,17 +132,12 @@ srs_error_t SrsBufferCache::dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterA srs_error_t SrsBufferCache::cycle() { srs_error_t err = srs_success; - - // TODO: FIXME: support reload. - if (fast_cache <= 0) { - srs_usleep(SRS_STREAM_CACHE_CYCLE); - return err; - } - SrsSharedPtr live_source = _srs_sources->fetch(req); - if (!live_source.get()) { - return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str()); + SrsSharedPtr live_source; + if ((err = _srs_sources->fetch_or_create(req, server_, live_source)) != srs_success) { + return srs_error_wrap(err, "source create"); } + srs_assert(live_source.get() != NULL); // the stream cache will create consumer to cache stream, // which will trigger to fetch stream from origin for edge. @@ -578,11 +590,12 @@ srs_error_t SrsBufferWriter::writev(const iovec* iov, int iovcnt, ssize_t* pnwri return writer->writev(iov, iovcnt, pnwrite); } -SrsLiveStream::SrsLiveStream(SrsRequest* r, SrsBufferCache* c) +SrsLiveStream::SrsLiveStream(SrsServer* s, SrsRequest* r, SrsBufferCache* c) { cache = c; req = r->copy()->as_http(); security_ = new SrsSecurity(); + server_ = s; } SrsLiveStream::~SrsLiveStream() @@ -636,10 +649,17 @@ srs_error_t SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage return srs_error_wrap(err, "http hook"); } - SrsSharedPtr live_source = _srs_sources->fetch(req); - if (!live_source.get()) { - return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str()); + // Always try to create the source, because http handler won't create it. + SrsSharedPtr live_source; + if ((err = _srs_sources->fetch_or_create(req, server_, live_source)) != srs_success) { + return srs_error_wrap(err, "source create"); } + srs_assert(live_source.get() != NULL); + + bool enabled_cache = _srs_config->get_gop_cache(req->vhost); + int gcmf = _srs_config->get_gop_cache_max_frames(req->vhost); + live_source->set_cache(enabled_cache); + live_source->set_gop_cache_max_frames(gcmf); // Create consumer of source, ignore gop cache, use the audio gop cache. SrsLiveConsumer* consumer_raw = NULL; @@ -926,6 +946,7 @@ srs_error_t SrsLiveStream::streaming_send_messages(ISrsBufferEncoder* enc, SrsSh SrsLiveEntry::SrsLiveEntry(std::string m) { mount = m; + disposing = false; stream = NULL; cache = NULL; @@ -967,6 +988,7 @@ bool SrsLiveEntry::is_mp3() SrsHttpStreamServer::SrsHttpStreamServer(SrsServer* svr) { server = svr; + async_ = new SrsAsyncCallWorker(); mux.hijack(this); _srs_config->subscribe(this); @@ -976,6 +998,9 @@ SrsHttpStreamServer::~SrsHttpStreamServer() { mux.unhijack(this); _srs_config->unsubscribe(this); + + async_->stop(); + srs_freep(async_); if (true) { std::map::iterator it; @@ -1003,6 +1028,10 @@ srs_error_t SrsHttpStreamServer::initialize() if ((err = initialize_flv_streaming()) != srs_success) { return srs_error_wrap(err, "http flv stream"); } + + if ((err = async_->start()) != srs_success) { + return srs_error_wrap(err, "async start"); + } return err; } @@ -1037,8 +1066,8 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsRequest* r) entry = new SrsLiveEntry(mount); entry->req = r->copy()->as_http(); - entry->cache = new SrsBufferCache(r); - entry->stream = new SrsLiveStream(r, entry->cache); + entry->cache = new SrsBufferCache(server, r); + entry->stream = new SrsLiveStream(server, r, entry->cache); // TODO: FIXME: maybe refine the logic of http remux service. // if user push streams followed: @@ -1067,6 +1096,12 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsRequest* r) } else { // The entry exists, we reuse it and update the request of stream and cache. entry = streamHandlers[sid]; + + // Fail if system is disposing the entry. + if (entry->disposing) { + return srs_error_new(ERROR_STREAM_DISPOSING, "stream is disposing"); + } + entry->stream->update_auth(r); entry->cache->update_auth(r); } @@ -1088,36 +1123,19 @@ void SrsHttpStreamServer::http_unmount(SrsRequest* r) return; } - // Free all HTTP resources. - SrsUniquePtr entry(it->second); - streamHandlers.erase(it); - - SrsUniquePtr stream(entry->stream); - SrsUniquePtr cache(entry->cache); - - // Notify cache and stream to stop. - if (stream->entry) stream->entry->enabled = false; - stream->expire(); - cache->stop(); - - // Wait for cache and stream to stop. - int i = 0; - for (; i < 1024; i++) { - if (!cache->alive() && !stream->alive()) { - break; - } - srs_usleep(100 * SRS_UTIME_MILLISECONDS); + // Set the entry to disposing, which will prevent the stream to be reused. + SrsLiveEntry* entry = it->second; + if (entry->disposing) { + return; } + entry->disposing = true; - if (cache->alive() || stream->alive()) { - srs_warn("http: try to free a alive stream, cache=%d, stream=%d", cache->alive(), stream->alive()); + // Use async worker to execute the task, which will destroy the stream. + srs_error_t err = srs_success; + if ((err = async_->execute(new SrsHttpStreamDestroy(&mux, &streamHandlers, sid))) != srs_success) { + srs_warn("http: ignore unmount stream failed, sid=%s, err=%s", sid.c_str(), srs_error_desc(err).c_str()); + srs_freep(err); } - - // Unmount the HTTP handler, which will free the entry. Note that we must free it after cache and - // stream stopped for it uses it. - mux.unhandle(entry->mount, stream.get()); - - srs_trace("http: unmount flv stream for sid=%s, i=%d", sid.c_str(), i); } srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph) @@ -1214,17 +1232,6 @@ srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandle } } - SrsSharedPtr live_source; - if ((err = _srs_sources->fetch_or_create(r.get(), server, live_source)) != srs_success) { - return srs_error_wrap(err, "source create"); - } - srs_assert(live_source.get() != NULL); - - bool enabled_cache = _srs_config->get_gop_cache(r->vhost); - int gcmf = _srs_config->get_gop_cache_max_frames(r->vhost); - live_source->set_cache(enabled_cache); - live_source->set_gop_cache_max_frames(gcmf); - // create http streaming handler. if ((err = http_mount(r.get())) != srs_success) { return srs_error_wrap(err, "http mount"); @@ -1235,11 +1242,8 @@ srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandle entry = streamHandlers[sid]; *ph = entry->stream; } - - // trigger edge to fetch from origin. - bool vhost_is_edge = _srs_config->get_vhost_is_edge(r->vhost); - srs_trace("flv: source url=%s, is_edge=%d, source_id=%s/%s", - r->get_stream_url().c_str(), vhost_is_edge, live_source->source_id().c_str(), live_source->pre_source_id().c_str()); + + srs_trace("flv: hijack %s ok", upath.c_str()); return err; } @@ -1281,3 +1285,64 @@ srs_error_t SrsHttpStreamServer::initialize_flv_entry(std::string vhost) return err; } +SrsHttpStreamDestroy::SrsHttpStreamDestroy(SrsHttpServeMux* mux, map* handlers, string sid) +{ + mux_ = mux; + sid_ = sid; + streamHandlers_ = handlers; +} + +SrsHttpStreamDestroy::~SrsHttpStreamDestroy() +{ +} + +srs_error_t SrsHttpStreamDestroy::call() +{ + srs_error_t err = srs_success; + + std::map::iterator it = streamHandlers_->find(sid_); + if (it == streamHandlers_->end()) { + return err; + } + + // Free all HTTP resources. + SrsUniquePtr entry(it->second); + srs_assert(entry->disposing); + + SrsUniquePtr stream(entry->stream); + SrsUniquePtr cache(entry->cache); + + // Notify cache and stream to stop. + if (stream->entry) stream->entry->enabled = false; + stream->expire(); + cache->stop(); + + // Wait for cache and stream to stop. + int i = 0; + for (; i < 1024; i++) { + if (!cache->alive() && !stream->alive()) { + break; + } + srs_usleep(100 * SRS_UTIME_MILLISECONDS); + } + + if (cache->alive() || stream->alive()) { + srs_warn("http: try to free a alive stream, cache=%d, stream=%d", cache->alive(), stream->alive()); + } + + // Remove the entry from handlers. + streamHandlers_->erase(it); + + // Unmount the HTTP handler, which will free the entry. Note that we must free it after cache and + // stream stopped for it uses it. + mux_->unhandle(entry->mount, stream.get()); + + srs_trace("http: unmount flv stream for sid=%s, i=%d", sid_.c_str(), i); + return err; +} + +string SrsHttpStreamDestroy::to_string() +{ + return "destroy"; +} + diff --git a/trunk/src/app/srs_app_http_stream.hpp b/trunk/src/app/srs_app_http_stream.hpp index cf37379820..352c4f99f8 100755 --- a/trunk/src/app/srs_app_http_stream.hpp +++ b/trunk/src/app/srs_app_http_stream.hpp @@ -10,6 +10,7 @@ #include #include #include +#include #include @@ -17,18 +18,20 @@ class SrsAacTransmuxer; class SrsMp3Transmuxer; class SrsFlvTransmuxer; class SrsTsTransmuxer; +class SrsAsyncCallWorker; // A cache for HTTP Live Streaming encoder, to make android(weixin) happy. class SrsBufferCache : public ISrsCoroutineHandler { private: srs_utime_t fast_cache; + SrsServer* server_; private: SrsMessageQueue* queue; SrsRequest* req; SrsCoroutine* trd; public: - SrsBufferCache(SrsRequest* r); + SrsBufferCache(SrsServer* s, SrsRequest* r); virtual ~SrsBufferCache(); virtual srs_error_t update_auth(SrsRequest* r); public: @@ -184,12 +187,13 @@ class SrsLiveStream : public ISrsHttpHandler, public ISrsExpire SrsRequest* req; SrsBufferCache* cache; SrsSecurity* security_; + SrsServer* server_; // For multiple viewers, which means there will more than one alive viewers for a live stream, so we must // use an int value to represent if there is any viewer is alive. We should never do cleanup unless all // viewers closed the connection. std::vector viewers_; public: - SrsLiveStream(SrsRequest* r, SrsBufferCache* c); + SrsLiveStream(SrsServer* s, SrsRequest* r, SrsBufferCache* c); virtual ~SrsLiveStream(); virtual srs_error_t update_auth(SrsRequest* r); public: @@ -223,6 +227,9 @@ struct SrsLiveEntry SrsLiveStream* stream; SrsBufferCache* cache; + + // Whether is disposing the entry. + bool disposing; SrsLiveEntry(std::string m); virtual ~SrsLiveEntry(); @@ -240,6 +247,7 @@ class SrsHttpStreamServer : public ISrsReloadHandler { private: SrsServer* server; + SrsAsyncCallWorker* async_; public: SrsHttpServeMux mux; // The http live streaming template, to create streams. @@ -263,5 +271,19 @@ class SrsHttpStreamServer : public ISrsReloadHandler virtual srs_error_t initialize_flv_entry(std::string vhost); }; +class SrsHttpStreamDestroy : public ISrsAsyncCallTask +{ +private: + std::string sid_; + std::map* streamHandlers_; + SrsHttpServeMux* mux_; +public: + SrsHttpStreamDestroy(SrsHttpServeMux* mux, std::map* handlers, std::string sid); + virtual ~SrsHttpStreamDestroy(); +public: + virtual srs_error_t call(); + virtual std::string to_string(); +}; + #endif diff --git a/trunk/src/core/srs_core_version6.hpp b/trunk/src/core/srs_core_version6.hpp index ed3ffb50d3..32328d99a8 100644 --- a/trunk/src/core/srs_core_version6.hpp +++ b/trunk/src/core/srs_core_version6.hpp @@ -9,6 +9,6 @@ #define VERSION_MAJOR 6 #define VERSION_MINOR 0 -#define VERSION_REVISION 154 +#define VERSION_REVISION 155 #endif diff --git a/trunk/src/core/srs_core_version7.hpp b/trunk/src/core/srs_core_version7.hpp index 0f7e659efd..ee85bc8206 100644 --- a/trunk/src/core/srs_core_version7.hpp +++ b/trunk/src/core/srs_core_version7.hpp @@ -9,6 +9,6 @@ #define VERSION_MAJOR 7 #define VERSION_MINOR 0 -#define VERSION_REVISION 13 +#define VERSION_REVISION 14 #endif \ No newline at end of file diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index af9acf12de..dcd8184838 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -107,7 +107,8 @@ XX(ERROR_BACKTRACE_ADDR2LINE , 1094, "BacktraceAddr2Line", "Backtrace addr2line failed") \ XX(ERROR_SYSTEM_FILE_NOT_OPEN , 1095, "FileNotOpen", "File is not opened") \ XX(ERROR_SYSTEM_FILE_SETVBUF , 1096, "FileSetVBuf", "Failed to set file vbuf") \ - XX(ERROR_NO_SOURCE , 1097, "NoSource", "No source found") + XX(ERROR_NO_SOURCE , 1097, "NoSource", "No source found") \ + XX(ERROR_STREAM_DISPOSING , 1098, "StreamDisposing", "Stream is disposing") /**************************************************/ /* RTMP protocol error. */