Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Background catalyst pipeline for hevc #1363

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 44 additions & 9 deletions pipeline/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ const (
// Execute the FFMPEG pipeline first and fallback to the external transcoding
// provider on errors.
StrategyFallbackExternal Strategy = "fallback_external"
// Execute the external transcoder pipeline in foreground and FFMPEG / Livepeer in background.
StrategyBackgroundFFMpeg Strategy = "background_ffmpeg"
// Only mp4s of maxMP4OutDuration will have MP4s generated for each rendition
maxMP4OutDuration = 2 * time.Minute
maxRecordingMP4Duration = 12 * time.Hour
Expand All @@ -48,7 +50,7 @@ const (

func (s Strategy) IsValid() bool {
switch s {
case StrategyExternalDominance, StrategyCatalystFfmpegDominance, StrategyFallbackExternal:
case StrategyExternalDominance, StrategyCatalystFfmpegDominance, StrategyFallbackExternal, StrategyBackgroundFFMpeg:
return true
default:
return false
Expand Down Expand Up @@ -417,17 +419,20 @@ func (c *Coordinator) startUploadJob(p *JobInfo) {

switch strategy {
case StrategyExternalDominance:
c.startOneUploadJob(p, c.pipeExternal, false)
c.startOneUploadJob(p, c.pipeExternal, true, false)
case StrategyCatalystFfmpegDominance:
c.startOneUploadJob(p, c.pipeFfmpeg, false)
c.startOneUploadJob(p, c.pipeFfmpeg, true, false)
case StrategyBackgroundFFMpeg:
c.startOneUploadJob(p, c.pipeExternal, true, false)
c.startOneUploadJob(p, c.pipeFfmpeg, false, false)
case StrategyFallbackExternal:
// nolint:errcheck
go recovered(func() (t bool, e error) {
success := <-c.startOneUploadJob(p, c.pipeFfmpeg, true)
success := <-c.startOneUploadJob(p, c.pipeFfmpeg, true, true)
if !success {
p.inFallbackMode = true
log.Log(p.RequestID, "Entering fallback pipeline")
c.startOneUploadJob(p, c.pipeExternal, false)
c.startOneUploadJob(p, c.pipeExternal, true, false)
}
return
})
Expand All @@ -445,9 +450,13 @@ func checkLivepeerCompatible(requestID string, strategy Strategy, iv video.Input
for _, track := range iv.Tracks {
// If the video codec is not compatible then override to external pipeline to avoid sending to Livepeer
// We always covert the audio to AAC before sending for transcoding, so don't need to check this
if track.Type == video.TrackTypeVideo && strings.ToLower(track.Codec) != "h264" {
log.Log(requestID, "codec not supported by Livepeer pipeline", "trackType", track.Type, "codec", track.Codec)
return livepeerNotSupported(strategy)
if track.Type == video.TrackTypeVideo {
if strings.ToLower(track.Codec) == "hevc" && strategy != StrategyCatalystFfmpegDominance {
strategy = StrategyBackgroundFFMpeg
} else if strings.ToLower(track.Codec) != "h264" {
log.Log(requestID, "codec not supported by Livepeer pipeline", "trackType", track.Type, "codec", track.Codec)
return livepeerNotSupported(strategy)
}
}
if track.Type == video.TrackTypeVideo && track.Rotation != 0 {
log.Log(requestID, "video rotation not supported by Livepeer pipeline", "rotation", track.Rotation)
Expand Down Expand Up @@ -507,7 +516,33 @@ func checkDisplayAspectRatio(track video.InputTrack, requestID string) bool {
// The `hasFallback` argument means the caller has a special logic to handle
// failures (today this means re-running the job in another pipeline). If it's
// set to true, error callbacks from this job will not be sent.
func (c *Coordinator) startOneUploadJob(si *JobInfo, handler Handler, hasFallback bool) <-chan bool {
func (c *Coordinator) startOneUploadJob(si *JobInfo, handler Handler, foreground, hasFallback bool) <-chan bool {
if !foreground {
// TODO only do if output location is livepeer, i.e. don't do for transcode api jobs as we'd be writing unexpected files to customer storage

// replace si with a copy of itself to avoid interfering with the foreground job
requestID := fmt.Sprintf("bg_%s", si.RequestID)
si = &JobInfo{
UploadJobPayload: si.UploadJobPayload,
StreamName: config.SegmentingStreamName(requestID),
statusClient: c.statusClient,

numProfiles: len(si.UploadJobPayload.Profiles),
targetSegmentSizeSecs: si.UploadJobPayload.TargetSegmentSizeSecs,
catalystRegion: os.Getenv("MY_REGION"),

DownloadDone: time.Now(),
}
si.RequestID = requestID

if si.HlsTargetURL != nil {
si.HlsTargetURL = si.HlsTargetURL.JoinPath("..", handler.Name(), path.Base(si.HlsTargetURL.Path))
}
// TODO set other target URLs to nil for now

// this will prevent the callbacks for this job from actually being sent
si.CallbackURL = ""
}
log.AddContext(si.RequestID, "handler", handler.Name())

var pipeline = handler.Name()
Expand Down
4 changes: 2 additions & 2 deletions pipeline/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,8 +678,8 @@ func Test_checkMistCompatible(t *testing.T) {
strategy: StrategyFallbackExternal,
iv: inCompatibleVideo,
},
want: StrategyExternalDominance,
wantSupported: false,
want: StrategyBackgroundFFMpeg,
wantSupported: true,
},
{
name: "compatible with ffmpeg (non AAC audio) - StrategyFallbackExternal",
Expand Down
1 change: 1 addition & 0 deletions pipeline/ffmpeg.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ var sourcePlaybackBucketBlocklist = []string{"lp-us-catalyst-vod-pvt-monster", "
const maxBitrateSourcePb = 6_000_000

func (f *ffmpeg) sendSourcePlayback(job *JobInfo) {
// TODO no source playback for hevc / background livepeer pipeline
for _, track := range job.InputFileInfo.Tracks {
if track.Bitrate > maxBitrateSourcePb {
log.Log(job.RequestID, "source playback not available, bitrate too high", "bitrate", track.Bitrate)
Expand Down
Loading