From 096601b4580516e6147cb6bdbde3fa7f237ef5f4 Mon Sep 17 00:00:00 2001 From: Max Holland Date: Fri, 9 Aug 2024 14:38:05 +0100 Subject: [PATCH 1/2] Background ffmpeg pipeline --- pipeline/coordinator.go | 35 ++++++++++++++++++++++++++--------- pipeline/ffmpeg.go | 1 + 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/pipeline/coordinator.go b/pipeline/coordinator.go index 3f625732..b057f658 100644 --- a/pipeline/coordinator.go +++ b/pipeline/coordinator.go @@ -40,6 +40,8 @@ const ( // Execute the FFMPEG pipeline first and fallback to the external transcoding // provider on errors. StrategyFallbackExternal Strategy = "fallback_external" + // Execute the external transcoder pipeline in foreground and FFMPEG / Livepeer in background. + StrategyBackgroundFFMpeg Strategy = "background_ffmpeg" // Only mp4s of maxMP4OutDuration will have MP4s generated for each rendition maxMP4OutDuration = 2 * time.Minute maxRecordingMP4Duration = 12 * time.Hour @@ -48,7 +50,7 @@ const ( func (s Strategy) IsValid() bool { switch s { - case StrategyExternalDominance, StrategyCatalystFfmpegDominance, StrategyFallbackExternal: + case StrategyExternalDominance, StrategyCatalystFfmpegDominance, StrategyFallbackExternal, StrategyBackgroundFFMpeg: return true default: return false @@ -417,17 +419,20 @@ func (c *Coordinator) startUploadJob(p *JobInfo) { switch strategy { case StrategyExternalDominance: - c.startOneUploadJob(p, c.pipeExternal, false) + c.startOneUploadJob(p, c.pipeExternal, true, false) case StrategyCatalystFfmpegDominance: - c.startOneUploadJob(p, c.pipeFfmpeg, false) + c.startOneUploadJob(p, c.pipeFfmpeg, true, false) + case StrategyBackgroundFFMpeg: + c.startOneUploadJob(p, c.pipeExternal, true, false) + c.startOneUploadJob(p, c.pipeFfmpeg, false, false) case StrategyFallbackExternal: // nolint:errcheck go recovered(func() (t bool, e error) { - success := <-c.startOneUploadJob(p, c.pipeFfmpeg, true) + success := <-c.startOneUploadJob(p, c.pipeFfmpeg, true, true) if !success { p.inFallbackMode = true log.Log(p.RequestID, "Entering fallback pipeline") - c.startOneUploadJob(p, c.pipeExternal, false) + c.startOneUploadJob(p, c.pipeExternal, true, false) } return }) @@ -445,9 +450,13 @@ func checkLivepeerCompatible(requestID string, strategy Strategy, iv video.Input for _, track := range iv.Tracks { // If the video codec is not compatible then override to external pipeline to avoid sending to Livepeer // We always covert the audio to AAC before sending for transcoding, so don't need to check this - if track.Type == video.TrackTypeVideo && strings.ToLower(track.Codec) != "h264" { - log.Log(requestID, "codec not supported by Livepeer pipeline", "trackType", track.Type, "codec", track.Codec) - return livepeerNotSupported(strategy) + if track.Type == video.TrackTypeVideo { + if strings.ToLower(track.Codec) == "hevc" { + strategy = StrategyBackgroundFFMpeg + } else if strings.ToLower(track.Codec) != "h264" { + log.Log(requestID, "codec not supported by Livepeer pipeline", "trackType", track.Type, "codec", track.Codec) + return livepeerNotSupported(strategy) + } } if track.Type == video.TrackTypeVideo && track.Rotation != 0 { log.Log(requestID, "video rotation not supported by Livepeer pipeline", "rotation", track.Rotation) @@ -507,7 +516,15 @@ func checkDisplayAspectRatio(track video.InputTrack, requestID string) bool { // The `hasFallback` argument means the caller has a special logic to handle // failures (today this means re-running the job in another pipeline). If it's // set to true, error callbacks from this job will not be sent. -func (c *Coordinator) startOneUploadJob(si *JobInfo, handler Handler, hasFallback bool) <-chan bool { +func (c *Coordinator) startOneUploadJob(si *JobInfo, handler Handler, foreground, hasFallback bool) <-chan bool { + if !foreground { + si.RequestID = fmt.Sprintf("bg_%s", si.RequestID) + if si.HlsTargetURL != nil { + si.HlsTargetURL = si.HlsTargetURL.JoinPath("..", handler.Name(), path.Base(si.HlsTargetURL.Path)) + } + // this will prevent the callbacks for this job from actually being sent + si.CallbackURL = "" + } log.AddContext(si.RequestID, "handler", handler.Name()) var pipeline = handler.Name() diff --git a/pipeline/ffmpeg.go b/pipeline/ffmpeg.go index a0b8bf64..b7130e61 100644 --- a/pipeline/ffmpeg.go +++ b/pipeline/ffmpeg.go @@ -194,6 +194,7 @@ var sourcePlaybackBucketBlocklist = []string{"lp-us-catalyst-vod-pvt-monster", " const maxBitrateSourcePb = 6_000_000 func (f *ffmpeg) sendSourcePlayback(job *JobInfo) { + // TODO no source playback for hevc / background livepeer pipeline for _, track := range job.InputFileInfo.Tracks { if track.Bitrate > maxBitrateSourcePb { log.Log(job.RequestID, "source playback not available, bitrate too high", "bitrate", track.Bitrate) From 0e0efd498bd8ff18b4351521b6e97624a9eae284 Mon Sep 17 00:00:00 2001 From: Max Holland Date: Wed, 21 Aug 2024 14:43:14 +0100 Subject: [PATCH 2/2] Make copy of JobInfo struct to avoid interfering with foreground job --- pipeline/coordinator.go | 22 ++++++++++++++++++++-- pipeline/coordinator_test.go | 4 ++-- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/pipeline/coordinator.go b/pipeline/coordinator.go index 3c8c3ab8..ffa0504b 100644 --- a/pipeline/coordinator.go +++ b/pipeline/coordinator.go @@ -451,7 +451,7 @@ func checkLivepeerCompatible(requestID string, strategy Strategy, iv video.Input // If the video codec is not compatible then override to external pipeline to avoid sending to Livepeer // We always covert the audio to AAC before sending for transcoding, so don't need to check this if track.Type == video.TrackTypeVideo { - if strings.ToLower(track.Codec) == "hevc" { + if strings.ToLower(track.Codec) == "hevc" && strategy != StrategyCatalystFfmpegDominance { strategy = StrategyBackgroundFFMpeg } else if strings.ToLower(track.Codec) != "h264" { log.Log(requestID, "codec not supported by Livepeer pipeline", "trackType", track.Type, "codec", track.Codec) @@ -518,10 +518,28 @@ func checkDisplayAspectRatio(track video.InputTrack, requestID string) bool { // set to true, error callbacks from this job will not be sent. func (c *Coordinator) startOneUploadJob(si *JobInfo, handler Handler, foreground, hasFallback bool) <-chan bool { if !foreground { - si.RequestID = fmt.Sprintf("bg_%s", si.RequestID) + // TODO only do if output location is livepeer, i.e. don't do for transcode api jobs as we'd be writing unexpected files to customer storage + + // replace si with a copy of itself to avoid interfering with the foreground job + requestID := fmt.Sprintf("bg_%s", si.RequestID) + si = &JobInfo{ + UploadJobPayload: si.UploadJobPayload, + StreamName: config.SegmentingStreamName(requestID), + statusClient: c.statusClient, + + numProfiles: len(si.UploadJobPayload.Profiles), + targetSegmentSizeSecs: si.UploadJobPayload.TargetSegmentSizeSecs, + catalystRegion: os.Getenv("MY_REGION"), + + DownloadDone: time.Now(), + } + si.RequestID = requestID + if si.HlsTargetURL != nil { si.HlsTargetURL = si.HlsTargetURL.JoinPath("..", handler.Name(), path.Base(si.HlsTargetURL.Path)) } + // TODO set other target URLs to nil for now + // this will prevent the callbacks for this job from actually being sent si.CallbackURL = "" } diff --git a/pipeline/coordinator_test.go b/pipeline/coordinator_test.go index afac5d11..7b7d79ca 100644 --- a/pipeline/coordinator_test.go +++ b/pipeline/coordinator_test.go @@ -678,8 +678,8 @@ func Test_checkMistCompatible(t *testing.T) { strategy: StrategyFallbackExternal, iv: inCompatibleVideo, }, - want: StrategyExternalDominance, - wantSupported: false, + want: StrategyBackgroundFFMpeg, + wantSupported: true, }, { name: "compatible with ffmpeg (non AAC audio) - StrategyFallbackExternal",