Skip to content

Commit

Permalink
transcode: add logic to force a session re-init
Browse files Browse the repository at this point in the history
When a sequence of segments has a discontinuity (e.g. encoded
differently), the transcoder at T must be re-initialized so that
transcode operations can be completed correctly. The T takes some
liberties and assumes a sequence of segments are encoded the same way
and thus, ends up not re-initializing certain parts of the pipeline
(e.g. demuxer, etc) to save time and improve efficiency. This can result
in corrupted segments with two video tracks instead of one audio and one
video track.

For clipping, the first segment (index=0) is encoded differently from
the rest of the file. As such, a session reinit must be forced at the T
using a header set in the request to B.
  • Loading branch information
emranemran committed Oct 6, 2023
1 parent 2eeb00c commit 56d15cb
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 10 deletions.
5 changes: 3 additions & 2 deletions clients/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ type createStreamPayload struct {
}

type LivepeerTranscodeConfiguration struct {
Profiles []video.EncodedProfile `json:"profiles"`
TimeoutMultiplier int `json:"timeoutMultiplier"`
Profiles []video.EncodedProfile `json:"profiles"`
TimeoutMultiplier int `json:"timeoutMultiplier"`
ForceSessionReinit bool `json:"forceSessionReinit"`
}

type Credentials struct {
Expand Down
8 changes: 2 additions & 6 deletions clients/broadcaster_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
// Currently only implemented by LocalBroadcasterClient
// TODO: Try to come up with a unified interface across Local and Remote
type BroadcasterClient interface {
TranscodeSegment(segment io.Reader, sequenceNumber int64, profiles []video.EncodedProfile, durationMillis int64, manifestID string) (TranscodeResult, error)
TranscodeSegment(segment io.Reader, sequenceNumber int64, profiles []video.EncodedProfile, durationMillis int64, manifestID string, conf LivepeerTranscodeConfiguration) (TranscodeResult, error)
}

type LocalBroadcasterClient struct {
Expand All @@ -29,15 +29,11 @@ func NewLocalBroadcasterClient(broadcasterURL string) (BroadcasterClient, error)
}, nil
}

func (c *LocalBroadcasterClient) TranscodeSegment(segment io.Reader, sequenceNumber int64, profiles []video.EncodedProfile, durationMillis int64, manifestID string) (TranscodeResult, error) {
conf := LivepeerTranscodeConfiguration{
TimeoutMultiplier: 10,
}
func (c *LocalBroadcasterClient) TranscodeSegment(segment io.Reader, sequenceNumber int64, profiles []video.EncodedProfile, durationMillis int64, manifestID string, conf LivepeerTranscodeConfiguration) (TranscodeResult, error) {
conf.Profiles = append(conf.Profiles, profiles...)
transcodeConfig, err := json.Marshal(&conf)
if err != nil {
return TranscodeResult{}, fmt.Errorf("for local B, profiles json encode failed: %v", err)
}

return transcodeSegment(segment, sequenceNumber, durationMillis, c.broadcasterURL, manifestID, profiles, string(transcodeConfig))
}
1 change: 1 addition & 0 deletions pipeline/ffmpeg.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func (f *ffmpeg) HandleStartUploadJob(job *JobInfo) (*HandlerOutput, error) {
RequestID: job.RequestID,
ReportProgress: job.ReportProgress,
GenerateMP4: job.GenerateMP4,
IsClip: job.ClipStrategy.Enabled,
}

inputInfo := video.InputVideo{
Expand Down
20 changes: 19 additions & 1 deletion transcode/transcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type TranscodeSegmentRequest struct {
RequestID string `json:"-"`
ReportProgress func(clients.TranscodeStatus, float64) `json:"-"`
GenerateMP4 bool
IsClip bool
}

func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName string, inputInfo video.InputVideo, broadcaster clients.BroadcasterClient) ([]video.OutputVideo, int, error) {
Expand Down Expand Up @@ -411,6 +412,23 @@ func transcodeSegment(
) error {
start := time.Now()

transcodeConf := clients.LivepeerTranscodeConfiguration{
TimeoutMultiplier: 10,
}
// If this is a request to transcode a Clip source input, then
// force T to do a re-init of transcoder after segment at idx=0.
// This is required because the segment at idx=0 is a locally
// re-encoded segment and the following segment at idx=1 is a
// source recorded segment. Without a re-init of the transcoder,
// the different encoding between the two segments causes the
// transcode operation to incorrectly tag the output segment as
// having two video tracks.
if transcodeRequest.IsClip && int64(segment.Index) == 1 {
transcodeConf.ForceSessionReinit = true
} else {
transcodeConf.ForceSessionReinit = false
}

var tr clients.TranscodeResult
err := backoff.Retry(func() error {
ctx, cancel := context.WithTimeout(context.Background(), clients.MaxCopyFileDuration)
Expand All @@ -434,7 +452,7 @@ func transcodeSegment(
return fmt.Errorf("failed to run TranscodeSegmentWithRemoteBroadcaster: %s", err)
}
} else {
tr, err = broadcaster.TranscodeSegment(rc, int64(segment.Index), transcodeProfiles, segment.Input.DurationMillis, manifestID)
tr, err = broadcaster.TranscodeSegment(rc, int64(segment.Index), transcodeProfiles, segment.Input.DurationMillis, manifestID, transcodeConf)
if err != nil {
return fmt.Errorf("failed to run TranscodeSegment: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion transcode/transcode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type StubBroadcasterClient struct {
tr clients.TranscodeResult
}

func (c StubBroadcasterClient) TranscodeSegment(segment io.Reader, sequenceNumber int64, profiles []video.EncodedProfile, durationMillis int64, manifestID string) (clients.TranscodeResult, error) {
func (c StubBroadcasterClient) TranscodeSegment(segment io.Reader, sequenceNumber int64, profiles []video.EncodedProfile, durationMillis int64, manifestID string, conf LivepeerTranscodeConfiguration) (clients.TranscodeResult, error) {

Check failure on line 42 in transcode/transcode_test.go

View workflow job for this annotation

GitHub Actions / Test the catalyst-api project

undefined: LivepeerTranscodeConfiguration

Check failure on line 42 in transcode/transcode_test.go

View workflow job for this annotation

GitHub Actions / lint

undefined: LivepeerTranscodeConfiguration
return c.tr, nil
}

Expand Down

0 comments on commit 56d15cb

Please sign in to comment.