diff --git a/pipeline/coordinator.go b/pipeline/coordinator.go index 12bb9932..ffa0504b 100644 --- a/pipeline/coordinator.go +++ b/pipeline/coordinator.go @@ -40,6 +40,8 @@ const ( // Execute the FFMPEG pipeline first and fallback to the external transcoding // provider on errors. StrategyFallbackExternal Strategy = "fallback_external" + // Execute the external transcoder pipeline in foreground and FFMPEG / Livepeer in background. + StrategyBackgroundFFMpeg Strategy = "background_ffmpeg" // Only mp4s of maxMP4OutDuration will have MP4s generated for each rendition maxMP4OutDuration = 2 * time.Minute maxRecordingMP4Duration = 12 * time.Hour @@ -48,7 +50,7 @@ const ( func (s Strategy) IsValid() bool { switch s { - case StrategyExternalDominance, StrategyCatalystFfmpegDominance, StrategyFallbackExternal: + case StrategyExternalDominance, StrategyCatalystFfmpegDominance, StrategyFallbackExternal, StrategyBackgroundFFMpeg: return true default: return false @@ -417,17 +419,20 @@ func (c *Coordinator) startUploadJob(p *JobInfo) { switch strategy { case StrategyExternalDominance: - c.startOneUploadJob(p, c.pipeExternal, false) + c.startOneUploadJob(p, c.pipeExternal, true, false) case StrategyCatalystFfmpegDominance: - c.startOneUploadJob(p, c.pipeFfmpeg, false) + c.startOneUploadJob(p, c.pipeFfmpeg, true, false) + case StrategyBackgroundFFMpeg: + c.startOneUploadJob(p, c.pipeExternal, true, false) + c.startOneUploadJob(p, c.pipeFfmpeg, false, false) case StrategyFallbackExternal: // nolint:errcheck go recovered(func() (t bool, e error) { - success := <-c.startOneUploadJob(p, c.pipeFfmpeg, true) + success := <-c.startOneUploadJob(p, c.pipeFfmpeg, true, true) if !success { p.inFallbackMode = true log.Log(p.RequestID, "Entering fallback pipeline") - c.startOneUploadJob(p, c.pipeExternal, false) + c.startOneUploadJob(p, c.pipeExternal, true, false) } return }) @@ -445,9 +450,13 @@ func checkLivepeerCompatible(requestID string, strategy Strategy, iv video.Input for _, track := range iv.Tracks { // If the video codec is not compatible then override to external pipeline to avoid sending to Livepeer // We always covert the audio to AAC before sending for transcoding, so don't need to check this - if track.Type == video.TrackTypeVideo && strings.ToLower(track.Codec) != "h264" { - log.Log(requestID, "codec not supported by Livepeer pipeline", "trackType", track.Type, "codec", track.Codec) - return livepeerNotSupported(strategy) + if track.Type == video.TrackTypeVideo { + if strings.ToLower(track.Codec) == "hevc" && strategy != StrategyCatalystFfmpegDominance { + strategy = StrategyBackgroundFFMpeg + } else if strings.ToLower(track.Codec) != "h264" { + log.Log(requestID, "codec not supported by Livepeer pipeline", "trackType", track.Type, "codec", track.Codec) + return livepeerNotSupported(strategy) + } } if track.Type == video.TrackTypeVideo && track.Rotation != 0 { log.Log(requestID, "video rotation not supported by Livepeer pipeline", "rotation", track.Rotation) @@ -507,7 +516,33 @@ func checkDisplayAspectRatio(track video.InputTrack, requestID string) bool { // The `hasFallback` argument means the caller has a special logic to handle // failures (today this means re-running the job in another pipeline). If it's // set to true, error callbacks from this job will not be sent. -func (c *Coordinator) startOneUploadJob(si *JobInfo, handler Handler, hasFallback bool) <-chan bool { +func (c *Coordinator) startOneUploadJob(si *JobInfo, handler Handler, foreground, hasFallback bool) <-chan bool { + if !foreground { + // TODO only do if output location is livepeer, i.e. don't do for transcode api jobs as we'd be writing unexpected files to customer storage + + // replace si with a copy of itself to avoid interfering with the foreground job + requestID := fmt.Sprintf("bg_%s", si.RequestID) + si = &JobInfo{ + UploadJobPayload: si.UploadJobPayload, + StreamName: config.SegmentingStreamName(requestID), + statusClient: c.statusClient, + + numProfiles: len(si.UploadJobPayload.Profiles), + targetSegmentSizeSecs: si.UploadJobPayload.TargetSegmentSizeSecs, + catalystRegion: os.Getenv("MY_REGION"), + + DownloadDone: time.Now(), + } + si.RequestID = requestID + + if si.HlsTargetURL != nil { + si.HlsTargetURL = si.HlsTargetURL.JoinPath("..", handler.Name(), path.Base(si.HlsTargetURL.Path)) + } + // TODO set other target URLs to nil for now + + // this will prevent the callbacks for this job from actually being sent + si.CallbackURL = "" + } log.AddContext(si.RequestID, "handler", handler.Name()) var pipeline = handler.Name() diff --git a/pipeline/coordinator_test.go b/pipeline/coordinator_test.go index afac5d11..7b7d79ca 100644 --- a/pipeline/coordinator_test.go +++ b/pipeline/coordinator_test.go @@ -678,8 +678,8 @@ func Test_checkMistCompatible(t *testing.T) { strategy: StrategyFallbackExternal, iv: inCompatibleVideo, }, - want: StrategyExternalDominance, - wantSupported: false, + want: StrategyBackgroundFFMpeg, + wantSupported: true, }, { name: "compatible with ffmpeg (non AAC audio) - StrategyFallbackExternal", diff --git a/pipeline/ffmpeg.go b/pipeline/ffmpeg.go index 56952b0a..de965a2a 100644 --- a/pipeline/ffmpeg.go +++ b/pipeline/ffmpeg.go @@ -194,6 +194,7 @@ var sourcePlaybackBucketBlocklist = []string{"lp-us-catalyst-vod-pvt-monster", " const maxBitrateSourcePb = 6_000_000 func (f *ffmpeg) sendSourcePlayback(job *JobInfo) { + // TODO no source playback for hevc / background livepeer pipeline for _, track := range job.InputFileInfo.Tracks { if track.Bitrate > maxBitrateSourcePb { log.Log(job.RequestID, "source playback not available, bitrate too high", "bitrate", track.Bitrate)