Skip to content

Commit

Permalink
FFmpeg: Restart if time and speed abnormal. v5.13.21
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Feb 1, 2024
1 parent 952a42f commit 6686cac
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 114 deletions.
1 change: 1 addition & 0 deletions DEVELOPER.md
Original file line number Diff line number Diff line change
Expand Up @@ -1137,6 +1137,7 @@ The following are the update records for the SRS Stack server.
* Room: AI-Talk support user different languages. v5.13.18
* Room: AI-Talk allow disable ASR/TTS, enable text. v5.13.19
* Room: AI-Talk support dictation mode. v5.13.20
* FFmpeg: Restart if time and speed abnormal. v5.13.21
* v5.12
* Refine local variable name conf to config. v5.12.1
* Add forced exit on timeout for program termination. v5.12.1
Expand Down
35 changes: 6 additions & 29 deletions platform/camera-live-stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,48 +888,25 @@ func (v *CameraTask) doCameraStreaming(ctx context.Context, input *FFprobeSource
return errors.Wrapf(err, "save task %v", v.String())
}

// Monitor FFmpeg update, restart if not update for a while.
// Pull the latest log frame.
heartbeat := NewFFmpegHeartbeat()
heartbeat.Polling(ctx, stderr)
go func() {
for {
select {
case <-ctx.Done():
return
case <-time.After(3 * time.Second):
case frame := <-heartbeat.FrameLogs:
v.updateFrame(frame)
}

if v.update.Add(10 * time.Second).Before(time.Now()) {
logger.Tf(ctx, "Camera: FFmpeg not update for a while, restart it")
cancel()
return
}
}
}()

// Read stderr to update status and output of FFmpeg.
pollingCtx, pollingCancel := context.WithCancel(ctx)
go func() {
defer pollingCancel()
buf := make([]byte, 4096)
for ctx.Err() == nil {
nn, err := stderr.Read(buf)
if err != nil || nn == 0 {
break
}

line := string(buf[:nn])
for strings.Contains(line, "= ") {
line = strings.ReplaceAll(line, "= ", "=")
}

v.updateFrame(line)
}
}()

// Process terminated, or user cancel the process.
select {
case <-parentCtx.Done():
case <-ctx.Done():
case <-pollingCtx.Done():
case <-heartbeat.PollingCtx.Done():
}

err = cmd.Wait()
Expand Down
34 changes: 6 additions & 28 deletions platform/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,47 +624,25 @@ func (v *ForwardTask) doForward(ctx context.Context, input *SrsStream) error {
return errors.Wrapf(err, "save task %v", v.String())
}

// Monitor FFmpeg update, restart if not update for a while.
// Pull the latest log frame.
heartbeat := NewFFmpegHeartbeat()
heartbeat.Polling(ctx, stderr)
go func() {
for {
select {
case <-ctx.Done():
return
case <-time.After(3 * time.Second):
case frame := <-heartbeat.FrameLogs:
v.updateFrame(frame)
}

if v.update.Add(10 * time.Second).Before(time.Now()) {
logger.Tf(ctx, "forward FFmpeg not update for a while, restart it")
cancel()
return
}
}
}()

// Read stderr to update status and output of FFmpeg.
pollingCtx, pollingCancel := context.WithCancel(ctx)
go func() {
defer pollingCancel()
buf := make([]byte, 4096)
for ctx.Err() == nil {
nn, err := stderr.Read(buf)
if err != nil || nn == 0 {
break
}

line := string(buf[:nn])
for strings.Contains(line, "= ") {
line = strings.ReplaceAll(line, "= ", "=")
}
v.updateFrame(line)
}
}()

// Process terminated, or user cancel the process.
select {
case <-parentCtx.Done():
case <-ctx.Done():
case <-pollingCtx.Done():
case <-heartbeat.PollingCtx.Done():
}

err = cmd.Wait()
Expand Down
34 changes: 6 additions & 28 deletions platform/trancode.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,47 +525,25 @@ func (v *TranscodeTask) doTranscode(ctx context.Context, input *SrsStream) error
return errors.Wrapf(err, "save task %v", v.String())
}

// Monitor FFmpeg update, restart if not update for a while.
// Pull the latest log frame.
heartbeat := NewFFmpegHeartbeat()
heartbeat.Polling(ctx, stderr)
go func() {
for {
select {
case <-ctx.Done():
return
case <-time.After(3 * time.Second):
case frame := <-heartbeat.FrameLogs:
v.updateFrame(frame)
}

if v.update.Add(10 * time.Second).Before(time.Now()) {
logger.Tf(ctx, "transcode FFmpeg not update for a while, restart it")
cancel()
return
}
}
}()

// Read stderr to update status and output of FFmpeg.
pollingCtx, pollingCancel := context.WithCancel(ctx)
go func() {
defer pollingCancel()
buf := make([]byte, 4096)
for ctx.Err() == nil {
nn, err := stderr.Read(buf)
if err != nil || nn == 0 {
break
}

line := string(buf[:nn])
for strings.Contains(line, "= ") {
line = strings.ReplaceAll(line, "= ", "=")
}
v.updateFrame(line)
}
}()

// Process terminated, or user cancel the process.
select {
case <-parentCtx.Done():
case <-ctx.Done():
case <-pollingCtx.Done():
case <-heartbeat.PollingCtx.Done():
}

err = cmd.Wait()
Expand Down
199 changes: 199 additions & 0 deletions platform/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"net/url"
"os"
"path"
"regexp"
"runtime"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -1455,3 +1457,200 @@ func (v *FFprobeSource) String() string {
v.Name, v.Size, v.UUID, v.Target, v.Format, v.Video, v.Audio,
)
}

// The FFmpegHeartbeat is used to manage the heartbeat of FFmpeg, the status of FFmpeg, by detecting the
// log message from FFmpeg output.
type FFmpegHeartbeat struct {
// The last update time.
update time.Time

// Whether exit normally, the log is like:
// Exiting normally, received signal 2.
exitingNormally bool
// Successful parsed log count.
parsedCount uint64
// FFmpeg's standard cycle logs every 1 second. Additional logs, such as FFmpeg error logs, are stored
// separately as extra logs.
extraLogs []string
// Last line of FFmpeg log.
timestamp, speed string
// Total count of failed to parsed logs.
failedParsedCount uint64
// Last line of log which is not parsed successfully.
lastFailedParsed string
// Total count of not changing log.
notChangedCount uint64
// Last line of log which is not changing.
lastNotChanged string
// Total count of log parsing the speed.
failedSpeedCount uint64
// Last failed parsing the speed log.
lastFailedSpeed string
// The most recent continuous high speed, such as 1.5x.
veryFastSpeedCount uint64
// The most recent continuous low speed, such as 0.5x.
verySlowSpeedCount uint64

// FFmpeg frame logs.
FrameLogs chan string
// Whether FFmpeg is EOF polling.
PollingCtx context.Context
}

func NewFFmpegHeartbeat() *FFmpegHeartbeat {
return &FFmpegHeartbeat{
update: time.Now(),
FrameLogs: make(chan string, 1),
}
}

// Polling the FFmpeg stderr and detect the error.
func (v *FFmpegHeartbeat) Polling(ctx context.Context, stderr io.Reader) {
var pollingCancel context.CancelFunc
v.PollingCtx, pollingCancel = context.WithCancel(ctx)

// Print the extra logs when quit.
go func() {
select {
case <-ctx.Done():
case <-v.PollingCtx.Done():
}
logger.Tf(ctx, "FFmpeg: exit-normally=%v, parsed=%v, failed=%v,<%v>, speed=%v,%v,%v,<%v>, not-change=%v,<%v>, extra logs is %v",
v.exitingNormally, v.parsedCount, v.failedParsedCount, v.lastFailedParsed, v.failedSpeedCount,
v.veryFastSpeedCount, v.verySlowSpeedCount, v.lastFailedSpeed, v.notChangedCount, v.lastNotChanged,
strings.Join(v.extraLogs, " "))
}()

// Monitor FFmpeg update, restart if not update for a while.
go func() {
for {
select {
case <-ctx.Done():
return
case <-v.PollingCtx.Done():
return
case <-time.After(3 * time.Second):
}

if v.update.Add(10 * time.Second).Before(time.Now()) {
logger.Wf(ctx, "FFmpeg: not update for %v, restart it", time.Since(v.update))
pollingCancel()
return
}
}
}()

// Read stderr to update status and output of FFmpeg.
go func() {
defer pollingCancel()

buf := make([]byte, 4096)
for ctx.Err() == nil {
nn, err := stderr.Read(buf)
if err != nil {
if !v.exitingNormally {
logger.Wf(ctx, "FFmpeg: read stderr failed: %v", err)
}
return
}
if nn == 0 {
if !v.exitingNormally {
logger.Wf(ctx, "FFmpeg: EOF of stderr")
}
return
}

// Filter the line of log.
line := strings.TrimSpace(string(buf[:nn]))
for strings.Contains(line, "= ") {
line = strings.ReplaceAll(line, "= ", "=")
}
line = strings.ReplaceAll(line, "\n", " ")
line = strings.ReplaceAll(line, "\r", " ")

// Whether exit normally.
if strings.Contains(line, "Exiting normally") {
v.exitingNormally = true
}

// Handle the extra logs.
if !strings.Contains(line, "size=") && !strings.Contains(line, "time=") {
v.extraLogs = append(v.extraLogs, line)
continue
}

// Scanf the log line, get the time and speed.
timestamp, speed, err := ParseFFmpegCycleLog(line)
if err != nil {
v.failedParsedCount, v.lastFailedParsed = v.failedParsedCount+1, line
continue
}

// The FFmpeg is alive, only if time is changing.
if v.timestamp == timestamp {
v.notChangedCount, v.lastNotChanged = v.notChangedCount+1, line
continue
}

// During live streaming, the speed should typically be approximately 1x. At times, particularly
// during initialization, it might exceed 1x, but the mean rate should remain at 1x.
if speedv, err := strconv.ParseFloat(strings.Trim(speed, "x"), 64); err != nil || speedv <= 0 {
v.failedSpeedCount, v.lastFailedSpeed = v.failedSpeedCount+1, line
} else {
// The number of consecutive instances of high speed, like 1.5x.
if speedv > FFmpegAbnormalFastSpeed {
v.veryFastSpeedCount++
} else {
v.veryFastSpeedCount = 0
}

// The number of continuous instances of low speed, like 0.5x.
if speedv < FFmpegAbnormalSlowSpeed {
v.verySlowSpeedCount++
} else {
v.verySlowSpeedCount = 0
}

// If the speed is very fast or slow, restart FFmpeg.
if mv := RestartFFmpegCountAbnormalSpeed; v.veryFastSpeedCount > mv || v.verySlowSpeedCount > mv {
logger.Wf(ctx, "FFmpeg: abnormal speed=%v, fast=%v, slow=%v, mv=%v, restart it",
speed, v.veryFastSpeedCount, v.verySlowSpeedCount, mv)
pollingCancel()
return
}
}

v.update, v.parsedCount = time.Now(), v.parsedCount+1
v.timestamp, v.speed = timestamp, speed

// Handle the routine heartbeat logs.
select {
case <-ctx.Done():
case v.FrameLogs <- line:
}
}
}()
}

// If the speed of FFmpeg exceed this value, it's abnormal.
const FFmpegAbnormalFastSpeed = 1.5

// If the speed of FFmpeg lower than this value, it's abnormal.
const FFmpegAbnormalSlowSpeed = 0.5

// If the speed of FFmpeg exceed this value for a long time, restart FFmpeg.
const RestartFFmpegCountAbnormalSpeed = uint64(30)

// ParseFFmpegCycleLog parse the FFmpeg cycle log, return the timestamp and speed. The log is mostly like:
// size=18859kB time=00:10:09.38 bitrate=253.5kbits/s speed=1x
// frame=184 fps=9.7 q=28.0 size=364kB time=00:00:19.41 bitrate=153.7kbits/s dup=0 drop=235 speed=1.03x
func ParseFFmpegCycleLog(line string) (timestamp, speed string, err error) {
re := regexp.MustCompile(`time=(\S+)( .*)speed=(\S+)`)
matches := re.FindStringSubmatch(line)
if len(matches) != 4 {
err = errors.Errorf("parse %v failed, matches=%v", line, matches)
return
}
timestamp, speed = matches[1], matches[3]
return
}
Loading

0 comments on commit 6686cac

Please sign in to comment.