From 56d15cb43e6a6c5ed49a1bde8b51fa9fc90e0fba Mon Sep 17 00:00:00 2001 From: emranemran Date: Thu, 5 Oct 2023 23:05:00 -0400 Subject: [PATCH] transcode: add logic to force a session re-init When a sequence of segments has a discontinuity (e.g. encoded differently), the transcoder at T must be re-initialized so that transcode operations can be completed correctly. The T takes some liberties and assumes a sequence of segments are encoded the same way and thus, ends up not re-initializing certain parts of the pipeline (e.g. demuxer, etc) to save time and improve efficiency. This can result in corrupted segments with two video tracks instead of one audio and one video track. For clipping, the first segment (index=0) is encoded differently from the rest of the file. As such, a session reinit must be forced at the T using a header set in the request to B. --- clients/broadcaster.go | 5 +++-- clients/broadcaster_local.go | 8 ++------ pipeline/ffmpeg.go | 1 + transcode/transcode.go | 20 +++++++++++++++++++- transcode/transcode_test.go | 2 +- 5 files changed, 26 insertions(+), 10 deletions(-) diff --git a/clients/broadcaster.go b/clients/broadcaster.go index 09972d5c..68e00174 100644 --- a/clients/broadcaster.go +++ b/clients/broadcaster.go @@ -34,8 +34,9 @@ type createStreamPayload struct { } type LivepeerTranscodeConfiguration struct { - Profiles []video.EncodedProfile `json:"profiles"` - TimeoutMultiplier int `json:"timeoutMultiplier"` + Profiles []video.EncodedProfile `json:"profiles"` + TimeoutMultiplier int `json:"timeoutMultiplier"` + ForceSessionReinit bool `json:"forceSessionReinit"` } type Credentials struct { diff --git a/clients/broadcaster_local.go b/clients/broadcaster_local.go index e2a0c749..66025246 100644 --- a/clients/broadcaster_local.go +++ b/clients/broadcaster_local.go @@ -12,7 +12,7 @@ import ( // Currently only implemented by LocalBroadcasterClient // TODO: Try to come up with a unified interface across Local and Remote type BroadcasterClient interface { - TranscodeSegment(segment io.Reader, sequenceNumber int64, profiles []video.EncodedProfile, durationMillis int64, manifestID string) (TranscodeResult, error) + TranscodeSegment(segment io.Reader, sequenceNumber int64, profiles []video.EncodedProfile, durationMillis int64, manifestID string, conf LivepeerTranscodeConfiguration) (TranscodeResult, error) } type LocalBroadcasterClient struct { @@ -29,15 +29,11 @@ func NewLocalBroadcasterClient(broadcasterURL string) (BroadcasterClient, error) }, nil } -func (c *LocalBroadcasterClient) TranscodeSegment(segment io.Reader, sequenceNumber int64, profiles []video.EncodedProfile, durationMillis int64, manifestID string) (TranscodeResult, error) { - conf := LivepeerTranscodeConfiguration{ - TimeoutMultiplier: 10, - } +func (c *LocalBroadcasterClient) TranscodeSegment(segment io.Reader, sequenceNumber int64, profiles []video.EncodedProfile, durationMillis int64, manifestID string, conf LivepeerTranscodeConfiguration) (TranscodeResult, error) { conf.Profiles = append(conf.Profiles, profiles...) transcodeConfig, err := json.Marshal(&conf) if err != nil { return TranscodeResult{}, fmt.Errorf("for local B, profiles json encode failed: %v", err) } - return transcodeSegment(segment, sequenceNumber, durationMillis, c.broadcasterURL, manifestID, profiles, string(transcodeConfig)) } diff --git a/pipeline/ffmpeg.go b/pipeline/ffmpeg.go index 3afa14f3..2293721a 100644 --- a/pipeline/ffmpeg.go +++ b/pipeline/ffmpeg.go @@ -81,6 +81,7 @@ func (f *ffmpeg) HandleStartUploadJob(job *JobInfo) (*HandlerOutput, error) { RequestID: job.RequestID, ReportProgress: job.ReportProgress, GenerateMP4: job.GenerateMP4, + IsClip: job.ClipStrategy.Enabled, } inputInfo := video.InputVideo{ diff --git a/transcode/transcode.go b/transcode/transcode.go index 7c581edf..3ad2a01c 100644 --- a/transcode/transcode.go +++ b/transcode/transcode.go @@ -47,6 +47,7 @@ type TranscodeSegmentRequest struct { RequestID string `json:"-"` ReportProgress func(clients.TranscodeStatus, float64) `json:"-"` GenerateMP4 bool + IsClip bool } func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName string, inputInfo video.InputVideo, broadcaster clients.BroadcasterClient) ([]video.OutputVideo, int, error) { @@ -411,6 +412,23 @@ func transcodeSegment( ) error { start := time.Now() + transcodeConf := clients.LivepeerTranscodeConfiguration{ + TimeoutMultiplier: 10, + } + // If this is a request to transcode a Clip source input, then + // force T to do a re-init of transcoder after segment at idx=0. + // This is required because the segment at idx=0 is a locally + // re-encoded segment and the following segment at idx=1 is a + // source recorded segment. Without a re-init of the transcoder, + // the different encoding between the two segments causes the + // transcode operation to incorrectly tag the output segment as + // having two video tracks. + if transcodeRequest.IsClip && int64(segment.Index) == 1 { + transcodeConf.ForceSessionReinit = true + } else { + transcodeConf.ForceSessionReinit = false + } + var tr clients.TranscodeResult err := backoff.Retry(func() error { ctx, cancel := context.WithTimeout(context.Background(), clients.MaxCopyFileDuration) @@ -434,7 +452,7 @@ func transcodeSegment( return fmt.Errorf("failed to run TranscodeSegmentWithRemoteBroadcaster: %s", err) } } else { - tr, err = broadcaster.TranscodeSegment(rc, int64(segment.Index), transcodeProfiles, segment.Input.DurationMillis, manifestID) + tr, err = broadcaster.TranscodeSegment(rc, int64(segment.Index), transcodeProfiles, segment.Input.DurationMillis, manifestID, transcodeConf) if err != nil { return fmt.Errorf("failed to run TranscodeSegment: %s", err) } diff --git a/transcode/transcode_test.go b/transcode/transcode_test.go index f41903f2..96b76666 100644 --- a/transcode/transcode_test.go +++ b/transcode/transcode_test.go @@ -39,7 +39,7 @@ type StubBroadcasterClient struct { tr clients.TranscodeResult } -func (c StubBroadcasterClient) TranscodeSegment(segment io.Reader, sequenceNumber int64, profiles []video.EncodedProfile, durationMillis int64, manifestID string) (clients.TranscodeResult, error) { +func (c StubBroadcasterClient) TranscodeSegment(segment io.Reader, sequenceNumber int64, profiles []video.EncodedProfile, durationMillis int64, manifestID string, conf LivepeerTranscodeConfiguration) (clients.TranscodeResult, error) { return c.tr, nil }