From a76bd689d8f237acbd555ee3f44e948aed1e6fb2 Mon Sep 17 00:00:00 2001 From: emranemran Date: Thu, 5 Oct 2023 23:05:00 -0400 Subject: [PATCH 1/3] transcode: add logic to force a session re-init When a sequence of segments has a discontinuity (e.g. encoded differently), the transcoder at T must be re-initialized so that transcode operations can be completed correctly. The T takes some liberties and assumes a sequence of segments are encoded the same way and thus, ends up not re-initializing certain parts of the pipeline (e.g. demuxer, etc) to save time and improve efficiency. This can result in corrupted segments with two video tracks instead of one audio and one video track. For clipping, the first segment (index=0) is encoded differently from the rest of the file. As such, a session reinit must be forced at the T using a header set in the request to B. --- clients/broadcaster.go | 5 +++-- clients/broadcaster_local.go | 8 ++------ pipeline/ffmpeg.go | 1 + transcode/transcode.go | 20 +++++++++++++++++++- transcode/transcode_test.go | 2 +- 5 files changed, 26 insertions(+), 10 deletions(-) diff --git a/clients/broadcaster.go b/clients/broadcaster.go index 09972d5c1..68e001741 100644 --- a/clients/broadcaster.go +++ b/clients/broadcaster.go @@ -34,8 +34,9 @@ type createStreamPayload struct { } type LivepeerTranscodeConfiguration struct { - Profiles []video.EncodedProfile `json:"profiles"` - TimeoutMultiplier int `json:"timeoutMultiplier"` + Profiles []video.EncodedProfile `json:"profiles"` + TimeoutMultiplier int `json:"timeoutMultiplier"` + ForceSessionReinit bool `json:"forceSessionReinit"` } type Credentials struct { diff --git a/clients/broadcaster_local.go b/clients/broadcaster_local.go index e2a0c749b..660252468 100644 --- a/clients/broadcaster_local.go +++ b/clients/broadcaster_local.go @@ -12,7 +12,7 @@ import ( // Currently only implemented by LocalBroadcasterClient // TODO: Try to come up with a unified interface across Local and Remote type BroadcasterClient interface { - TranscodeSegment(segment io.Reader, sequenceNumber int64, profiles []video.EncodedProfile, durationMillis int64, manifestID string) (TranscodeResult, error) + TranscodeSegment(segment io.Reader, sequenceNumber int64, profiles []video.EncodedProfile, durationMillis int64, manifestID string, conf LivepeerTranscodeConfiguration) (TranscodeResult, error) } type LocalBroadcasterClient struct { @@ -29,15 +29,11 @@ func NewLocalBroadcasterClient(broadcasterURL string) (BroadcasterClient, error) }, nil } -func (c *LocalBroadcasterClient) TranscodeSegment(segment io.Reader, sequenceNumber int64, profiles []video.EncodedProfile, durationMillis int64, manifestID string) (TranscodeResult, error) { - conf := LivepeerTranscodeConfiguration{ - TimeoutMultiplier: 10, - } +func (c *LocalBroadcasterClient) TranscodeSegment(segment io.Reader, sequenceNumber int64, profiles []video.EncodedProfile, durationMillis int64, manifestID string, conf LivepeerTranscodeConfiguration) (TranscodeResult, error) { conf.Profiles = append(conf.Profiles, profiles...) transcodeConfig, err := json.Marshal(&conf) if err != nil { return TranscodeResult{}, fmt.Errorf("for local B, profiles json encode failed: %v", err) } - return transcodeSegment(segment, sequenceNumber, durationMillis, c.broadcasterURL, manifestID, profiles, string(transcodeConfig)) } diff --git a/pipeline/ffmpeg.go b/pipeline/ffmpeg.go index 3afa14f36..2293721ac 100644 --- a/pipeline/ffmpeg.go +++ b/pipeline/ffmpeg.go @@ -81,6 +81,7 @@ func (f *ffmpeg) HandleStartUploadJob(job *JobInfo) (*HandlerOutput, error) { RequestID: job.RequestID, ReportProgress: job.ReportProgress, GenerateMP4: job.GenerateMP4, + IsClip: job.ClipStrategy.Enabled, } inputInfo := video.InputVideo{ diff --git a/transcode/transcode.go b/transcode/transcode.go index 7c581edf5..3ad2a01c5 100644 --- a/transcode/transcode.go +++ b/transcode/transcode.go @@ -47,6 +47,7 @@ type TranscodeSegmentRequest struct { RequestID string `json:"-"` ReportProgress func(clients.TranscodeStatus, float64) `json:"-"` GenerateMP4 bool + IsClip bool } func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName string, inputInfo video.InputVideo, broadcaster clients.BroadcasterClient) ([]video.OutputVideo, int, error) { @@ -411,6 +412,23 @@ func transcodeSegment( ) error { start := time.Now() + transcodeConf := clients.LivepeerTranscodeConfiguration{ + TimeoutMultiplier: 10, + } + // If this is a request to transcode a Clip source input, then + // force T to do a re-init of transcoder after segment at idx=0. + // This is required because the segment at idx=0 is a locally + // re-encoded segment and the following segment at idx=1 is a + // source recorded segment. Without a re-init of the transcoder, + // the different encoding between the two segments causes the + // transcode operation to incorrectly tag the output segment as + // having two video tracks. + if transcodeRequest.IsClip && int64(segment.Index) == 1 { + transcodeConf.ForceSessionReinit = true + } else { + transcodeConf.ForceSessionReinit = false + } + var tr clients.TranscodeResult err := backoff.Retry(func() error { ctx, cancel := context.WithTimeout(context.Background(), clients.MaxCopyFileDuration) @@ -434,7 +452,7 @@ func transcodeSegment( return fmt.Errorf("failed to run TranscodeSegmentWithRemoteBroadcaster: %s", err) } } else { - tr, err = broadcaster.TranscodeSegment(rc, int64(segment.Index), transcodeProfiles, segment.Input.DurationMillis, manifestID) + tr, err = broadcaster.TranscodeSegment(rc, int64(segment.Index), transcodeProfiles, segment.Input.DurationMillis, manifestID, transcodeConf) if err != nil { return fmt.Errorf("failed to run TranscodeSegment: %s", err) } diff --git a/transcode/transcode_test.go b/transcode/transcode_test.go index f41903f2f..5874639b0 100644 --- a/transcode/transcode_test.go +++ b/transcode/transcode_test.go @@ -39,7 +39,7 @@ type StubBroadcasterClient struct { tr clients.TranscodeResult } -func (c StubBroadcasterClient) TranscodeSegment(segment io.Reader, sequenceNumber int64, profiles []video.EncodedProfile, durationMillis int64, manifestID string) (clients.TranscodeResult, error) { +func (c StubBroadcasterClient) TranscodeSegment(segment io.Reader, sequenceNumber int64, profiles []video.EncodedProfile, durationMillis int64, manifestID string, conf clients.LivepeerTranscodeConfiguration) (clients.TranscodeResult, error) { return c.tr, nil } From 0f6ece2e01b2bbe1f9993c50d5dbf66ebc3fc66b Mon Sep 17 00:00:00 2001 From: emranemran Date: Fri, 6 Oct 2023 07:45:34 -0400 Subject: [PATCH 2/3] transcode: add workaround to force T session re-init --- transcode/transcode.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/transcode/transcode.go b/transcode/transcode.go index 3ad2a01c5..942fbdbc0 100644 --- a/transcode/transcode.go +++ b/transcode/transcode.go @@ -428,6 +428,12 @@ func transcodeSegment( } else { transcodeConf.ForceSessionReinit = false } + // This is a temporary workaround that implements the same logic + // as the previous if block -- a new manifestID will force a + // T session re-init between segment at index=0 and index=1. + if int64(segment.Index) == 0 { + manifestID = manifestID + "_clip" + } var tr clients.TranscodeResult err := backoff.Retry(func() error { From 6fa1d4202dc961539fb4c65885924a11a84dbcae Mon Sep 17 00:00:00 2001 From: emranemran Date: Fri, 6 Oct 2023 08:45:40 -0400 Subject: [PATCH 3/3] transcode: only apply re-init logic for a clip request --- transcode/transcode.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transcode/transcode.go b/transcode/transcode.go index 942fbdbc0..31274df49 100644 --- a/transcode/transcode.go +++ b/transcode/transcode.go @@ -431,7 +431,7 @@ func transcodeSegment( // This is a temporary workaround that implements the same logic // as the previous if block -- a new manifestID will force a // T session re-init between segment at index=0 and index=1. - if int64(segment.Index) == 0 { + if transcodeRequest.IsClip && int64(segment.Index) == 0 { manifestID = manifestID + "_clip" }