Skip to content

Commit

Permalink
Edge: Fix flv edge crash when http unmount. v6.0.154 (#4166)
Browse files Browse the repository at this point in the history
Edge FLV is not working because it is stuck in an infinite loop waiting.
Previously, there was no need to wait for exit since resources were not
being cleaned up. Now, since resources need to be cleaned up, it must
wait for all active connections to exit, which causes this issue.

To reproduce the issue, start SRS edge, run the bellow command and press
`CTRL+C` to stop the request:

```bash
curl http://localhost:8080/live/livestream.flv -v >/dev/null
```

It will cause edge to fetch stream from origin, and free the consumer
when client quit. When `SrsLiveStream::do_serve_http` return, it will
free the consumer:

```cpp
srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) {
    SrsUniquePtr<SrsLiveConsumer> consumer(consumer_raw);
```

Keep in mind that in this moment, the stream is alive, because only set
to not alive after this function return:

```cpp
    alive_viewers_++;
    err = do_serve_http(w, r); // Free 'this' alive stream.
    alive_viewers_--; // Crash here, because 'this' is freed.
```

When freeing the consumer, it will cause the source to unpublish and
attempt to free the HTTP handler, which ultimately waits for the stream
not to be alive:

```cpp
SrsLiveConsumer::~SrsLiveConsumer() {
    source_->on_consumer_destroy(this);

void SrsLiveSource::on_consumer_destroy(SrsLiveConsumer* consumer) {
    if (consumers.empty()) {
        play_edge->on_all_client_stop();

void SrsLiveSource::on_unpublish() {
    handler->on_unpublish(req);

void SrsHttpStreamServer::http_unmount(SrsRequest* r) {
    if (stream->entry) stream->entry->enabled = false;

    for (; i < 1024; i++) {
        if (!cache->alive() && !stream->alive()) {
            break;
        }
        srs_usleep(100 * SRS_UTIME_MILLISECONDS);
    }
```

After 120 seconds, it will free the stream and cause SRS to crash
because the stream is still active. In order to track this potential
issue, also add an important warning log:

```cpp
srs_warn("http: try to free a alive stream, cache=%d, stream=%d", cache->alive(), stream->alive());
```

SRS may crash if got this log.

---------

Co-authored-by: Jacob Su <[email protected]>
  • Loading branch information
winlinvip and suzp1984 committed Aug 31, 2024
1 parent 0bac21d commit 920f87c
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 21 deletions.
1 change: 1 addition & 0 deletions trunk/doc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The changelog for SRS.
<a name="v6-changes"></a>

## SRS 6.0 Changelog
* v6.0, 2024-09-01, Merge [#4166](https://github.com/ossrs/srs/pull/4166): Edge: Fix flv edge crash when http unmount. v6.0.154 (#4166)
* v6.0, 2024-08-31, Merge [#4162](https://github.com/ossrs/srs/pull/4162): Fix #3767: RTMP: Do not response empty data packet. v6.0.153 (#4162)
* v6.0, 2024-08-31, Merge [#4164](https://github.com/ossrs/srs/pull/4164): HTTP-FLV: Notify connection to expire when unpublishing. v6.0.152 (#4164)
* v6.0, 2024-08-24, Merge [#4157](https://github.com/ossrs/srs/pull/4157): Fix crash when quiting. v6.0.151 (#4157)
Expand Down
41 changes: 24 additions & 17 deletions trunk/src/app/srs_app_http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -636,17 +636,32 @@ srs_error_t SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage
return srs_error_wrap(err, "http hook");
}

SrsSharedPtr<SrsLiveSource> live_source = _srs_sources->fetch(req);
if (!live_source.get()) {
return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str());
}

// Create consumer of source, ignore gop cache, use the audio gop cache.
SrsLiveConsumer* consumer_raw = NULL;
if ((err = live_source->create_consumer(consumer_raw)) != srs_success) {
return srs_error_wrap(err, "create consumer");
}
// When freeing the consumer, it may trigger the source unpublishing for edge. This will trigger the http
// unmount, which waiting for all http live stream to dispose, so we should free the consumer when this
// object is not alive.
SrsUniquePtr<SrsLiveConsumer> consumer(consumer_raw);

// Add the viewer to the viewers list.
viewers_.push_back(hc);

// Serve the viewer connection.
err = do_serve_http(w, r);
err = do_serve_http(live_source.get(), consumer.get(), w, r);

// Remove viewer from the viewers list.
vector<ISrsExpire*>::iterator it = std::find(viewers_.begin(), viewers_.end(), hc);
srs_assert (it != viewers_.end());
viewers_.erase(it);

// Do hook after serving.
http_hooks_on_stop(r);

Expand All @@ -667,7 +682,7 @@ void SrsLiveStream::expire()
}
}

srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
srs_error_t SrsLiveStream::do_serve_http(SrsLiveSource* source, SrsLiveConsumer* consumer, ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
{
srs_error_t err = srs_success;

Expand Down Expand Up @@ -711,19 +726,7 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
// Enter chunked mode, because we didn't set the content-length.
w->write_header(SRS_CONSTS_HTTP_OK);

SrsSharedPtr<SrsLiveSource> live_source = _srs_sources->fetch(req);
if (!live_source.get()) {
return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str());
}

// create consumer of souce, ignore gop cache, use the audio gop cache.
SrsLiveConsumer* consumer_raw = NULL;
if ((err = live_source->create_consumer(consumer_raw)) != srs_success) {
return srs_error_wrap(err, "create consumer");
}
SrsUniquePtr<SrsLiveConsumer> consumer(consumer_raw);

if ((err = live_source->consumer_dumps(consumer.get(), true, true, !enc->has_cache())) != srs_success) {
if ((err = source->consumer_dumps(consumer, true, true, !enc->has_cache())) != srs_success) {
return srs_error_wrap(err, "dumps consumer");
}

Expand All @@ -744,7 +747,7 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess

// if gop cache enabled for encoder, dump to consumer.
if (enc->has_cache()) {
if ((err = enc->dump_cache(consumer.get(), live_source->jitter())) != srs_success) {
if ((err = enc->dump_cache(consumer, source->jitter())) != srs_success) {
return srs_error_wrap(err, "encoder dump cache");
}
}
Expand Down Expand Up @@ -1106,6 +1109,10 @@ void SrsHttpStreamServer::http_unmount(SrsRequest* r)
srs_usleep(100 * SRS_UTIME_MILLISECONDS);
}

if (cache->alive() || stream->alive()) {
srs_warn("http: try to free a alive stream, cache=%d, stream=%d", cache->alive(), stream->alive());
}

// Unmount the HTTP handler, which will free the entry. Note that we must free it after cache and
// stream stopped for it uses it.
mux.unhandle(entry->mount, stream.get());
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_http_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ class SrsLiveStream : public ISrsHttpHandler, public ISrsExpire
public:
virtual void expire();
private:
virtual srs_error_t do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
virtual srs_error_t do_serve_http(SrsLiveSource* source, SrsLiveConsumer* consumer, ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
virtual srs_error_t http_hooks_on_play(ISrsHttpMessage* r);
virtual void http_hooks_on_stop(ISrsHttpMessage* r);
virtual srs_error_t streaming_send_messages(ISrsBufferEncoder* enc, SrsSharedPtrMessage** msgs, int nb_msgs);
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_rtc_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ class SrsRtcSource : public ISrsFastTimer
void set_publish_stream(ISrsRtcPublishStream* v);
// Consume the shared RTP packet, user must free it.
srs_error_t on_rtp(SrsRtpPacket* pkt);
// Set and get stream description for souce
// Set and get stream description for source
bool has_stream_desc();
void set_stream_desc(SrsRtcSourceDescription* stream_desc);
std::vector<SrsRtcTrackDescription*> get_track_desc(std::string type, std::string media_type);
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_utility.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ extern std::string srs_path_build_timestamp(std::string template_path);
// @return an int error code.
extern srs_error_t srs_kill_forced(int& pid);

// Current process resouce usage.
// Current process resource usage.
// @see: man getrusage
class SrsRusage
{
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/core/srs_core_version6.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@

#define VERSION_MAJOR 6
#define VERSION_MINOR 0
#define VERSION_REVISION 153
#define VERSION_REVISION 154

#endif

0 comments on commit 920f87c

Please sign in to comment.