Skip to content

transcode: add logic to force a session re-init #902

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions clients/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ type createStreamPayload struct {
}

type LivepeerTranscodeConfiguration struct {
Profiles []video.EncodedProfile `json:"profiles"`
TimeoutMultiplier int `json:"timeoutMultiplier"`
Profiles []video.EncodedProfile `json:"profiles"`
TimeoutMultiplier int `json:"timeoutMultiplier"`
ForceSessionReinit bool `json:"forceSessionReinit"`
}

type Credentials struct {
Expand Down
8 changes: 2 additions & 6 deletions clients/broadcaster_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
// Currently only implemented by LocalBroadcasterClient
// TODO: Try to come up with a unified interface across Local and Remote
type BroadcasterClient interface {
TranscodeSegment(segment io.Reader, sequenceNumber int64, profiles []video.EncodedProfile, durationMillis int64, manifestID string) (TranscodeResult, error)
TranscodeSegment(segment io.Reader, sequenceNumber int64, profiles []video.EncodedProfile, durationMillis int64, manifestID string, conf LivepeerTranscodeConfiguration) (TranscodeResult, error)
}

type LocalBroadcasterClient struct {
Expand All @@ -29,15 +29,11 @@ func NewLocalBroadcasterClient(broadcasterURL string) (BroadcasterClient, error)
}, nil
}

func (c *LocalBroadcasterClient) TranscodeSegment(segment io.Reader, sequenceNumber int64, profiles []video.EncodedProfile, durationMillis int64, manifestID string) (TranscodeResult, error) {
conf := LivepeerTranscodeConfiguration{
TimeoutMultiplier: 10,
}
func (c *LocalBroadcasterClient) TranscodeSegment(segment io.Reader, sequenceNumber int64, profiles []video.EncodedProfile, durationMillis int64, manifestID string, conf LivepeerTranscodeConfiguration) (TranscodeResult, error) {
conf.Profiles = append(conf.Profiles, profiles...)
transcodeConfig, err := json.Marshal(&conf)
if err != nil {
return TranscodeResult{}, fmt.Errorf("for local B, profiles json encode failed: %v", err)
}

return transcodeSegment(segment, sequenceNumber, durationMillis, c.broadcasterURL, manifestID, profiles, string(transcodeConfig))
}
1 change: 1 addition & 0 deletions pipeline/ffmpeg.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func (f *ffmpeg) HandleStartUploadJob(job *JobInfo) (*HandlerOutput, error) {
RequestID: job.RequestID,
ReportProgress: job.ReportProgress,
GenerateMP4: job.GenerateMP4,
IsClip: job.ClipStrategy.Enabled,
}

inputInfo := video.InputVideo{
Expand Down
26 changes: 25 additions & 1 deletion transcode/transcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type TranscodeSegmentRequest struct {
RequestID string `json:"-"`
ReportProgress func(clients.TranscodeStatus, float64) `json:"-"`
GenerateMP4 bool
IsClip bool
}

func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName string, inputInfo video.InputVideo, broadcaster clients.BroadcasterClient) ([]video.OutputVideo, int, error) {
Expand Down Expand Up @@ -411,6 +412,29 @@ func transcodeSegment(
) error {
start := time.Now()

transcodeConf := clients.LivepeerTranscodeConfiguration{
TimeoutMultiplier: 10,
}
// If this is a request to transcode a Clip source input, then
// force T to do a re-init of transcoder after segment at idx=0.
// This is required because the segment at idx=0 is a locally
// re-encoded segment and the following segment at idx=1 is a
// source recorded segment. Without a re-init of the transcoder,
// the different encoding between the two segments causes the
// transcode operation to incorrectly tag the output segment as
// having two video tracks.
if transcodeRequest.IsClip && int64(segment.Index) == 1 {
transcodeConf.ForceSessionReinit = true
} else {
transcodeConf.ForceSessionReinit = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I think you can skip this else since false is the default value of bool.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or just transcodeConf.ForceSessionReinit = transcodeRequest.IsClip && int64(segment.Index) == 1

Copy link
Contributor

@mjh1 mjh1 Oct 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we not need a re-init for the final segment too (when that's also clipped)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we not need a re-init for the final segment too (when that's also clipped)?

That's a good point. I guess that yes!

Copy link
Collaborator Author

@emranemran emranemran Oct 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we not need a re-init for the final segment too (when that's also clipped)?

No actually, surprisingly. For some reason, the transcoder session is able to handle going from source recording segment to final segment that's also clipped (i.e re-encoded differently). Need to understand lpms better to understand why that is. Planning to follow up with clipping directly at T layer and hopefully understand this better.

}
// This is a temporary workaround that implements the same logic
// as the previous if block -- a new manifestID will force a
// T session re-init between segment at index=0 and index=1.
if transcodeRequest.IsClip && int64(segment.Index) == 0 {
manifestID = manifestID + "_clip"
}

var tr clients.TranscodeResult
err := backoff.Retry(func() error {
ctx, cancel := context.WithTimeout(context.Background(), clients.MaxCopyFileDuration)
Expand All @@ -434,7 +458,7 @@ func transcodeSegment(
return fmt.Errorf("failed to run TranscodeSegmentWithRemoteBroadcaster: %s", err)
}
} else {
tr, err = broadcaster.TranscodeSegment(rc, int64(segment.Index), transcodeProfiles, segment.Input.DurationMillis, manifestID)
tr, err = broadcaster.TranscodeSegment(rc, int64(segment.Index), transcodeProfiles, segment.Input.DurationMillis, manifestID, transcodeConf)
if err != nil {
return fmt.Errorf("failed to run TranscodeSegment: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion transcode/transcode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type StubBroadcasterClient struct {
tr clients.TranscodeResult
}

func (c StubBroadcasterClient) TranscodeSegment(segment io.Reader, sequenceNumber int64, profiles []video.EncodedProfile, durationMillis int64, manifestID string) (clients.TranscodeResult, error) {
func (c StubBroadcasterClient) TranscodeSegment(segment io.Reader, sequenceNumber int64, profiles []video.EncodedProfile, durationMillis int64, manifestID string, conf clients.LivepeerTranscodeConfiguration) (clients.TranscodeResult, error) {
return c.tr, nil
}

Expand Down