Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transcode: add logic to force a session re-init #902

Merged
merged 3 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions clients/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ type createStreamPayload struct {
}

type LivepeerTranscodeConfiguration struct {
Profiles []video.EncodedProfile `json:"profiles"`
TimeoutMultiplier int `json:"timeoutMultiplier"`
Profiles []video.EncodedProfile `json:"profiles"`
TimeoutMultiplier int `json:"timeoutMultiplier"`
ForceSessionReinit bool `json:"forceSessionReinit"`
}

type Credentials struct {
Expand Down
8 changes: 2 additions & 6 deletions clients/broadcaster_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// Currently only implemented by LocalBroadcasterClient
// TODO: Try to come up with a unified interface across Local and Remote
type BroadcasterClient interface {
TranscodeSegment(segment io.Reader, sequenceNumber int64, profiles []video.EncodedProfile, durationMillis int64, manifestID string) (TranscodeResult, error)
TranscodeSegment(segment io.Reader, sequenceNumber int64, profiles []video.EncodedProfile, durationMillis int64, manifestID string, conf LivepeerTranscodeConfiguration) (TranscodeResult, error)
}

type LocalBroadcasterClient struct {
Expand All @@ -29,15 +29,11 @@
}, nil
}

func (c *LocalBroadcasterClient) TranscodeSegment(segment io.Reader, sequenceNumber int64, profiles []video.EncodedProfile, durationMillis int64, manifestID string) (TranscodeResult, error) {
conf := LivepeerTranscodeConfiguration{
TimeoutMultiplier: 10,
}
func (c *LocalBroadcasterClient) TranscodeSegment(segment io.Reader, sequenceNumber int64, profiles []video.EncodedProfile, durationMillis int64, manifestID string, conf LivepeerTranscodeConfiguration) (TranscodeResult, error) {

Check warning on line 32 in clients/broadcaster_local.go

View check run for this annotation

Codecov / codecov/patch

clients/broadcaster_local.go#L32

Added line #L32 was not covered by tests
conf.Profiles = append(conf.Profiles, profiles...)
transcodeConfig, err := json.Marshal(&conf)
if err != nil {
return TranscodeResult{}, fmt.Errorf("for local B, profiles json encode failed: %v", err)
}

return transcodeSegment(segment, sequenceNumber, durationMillis, c.broadcasterURL, manifestID, profiles, string(transcodeConfig))
}
1 change: 1 addition & 0 deletions pipeline/ffmpeg.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
RequestID: job.RequestID,
ReportProgress: job.ReportProgress,
GenerateMP4: job.GenerateMP4,
IsClip: job.ClipStrategy.Enabled,

Check warning on line 84 in pipeline/ffmpeg.go

View check run for this annotation

Codecov / codecov/patch

pipeline/ffmpeg.go#L84

Added line #L84 was not covered by tests
}

inputInfo := video.InputVideo{
Expand Down
26 changes: 25 additions & 1 deletion transcode/transcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
RequestID string `json:"-"`
ReportProgress func(clients.TranscodeStatus, float64) `json:"-"`
GenerateMP4 bool
IsClip bool
}

func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName string, inputInfo video.InputVideo, broadcaster clients.BroadcasterClient) ([]video.OutputVideo, int, error) {
Expand Down Expand Up @@ -411,6 +412,29 @@
) error {
start := time.Now()

transcodeConf := clients.LivepeerTranscodeConfiguration{
TimeoutMultiplier: 10,
}
// If this is a request to transcode a Clip source input, then
// force T to do a re-init of transcoder after segment at idx=0.
// This is required because the segment at idx=0 is a locally
// re-encoded segment and the following segment at idx=1 is a
// source recorded segment. Without a re-init of the transcoder,
// the different encoding between the two segments causes the
// transcode operation to incorrectly tag the output segment as
// having two video tracks.
if transcodeRequest.IsClip && int64(segment.Index) == 1 {
transcodeConf.ForceSessionReinit = true

Check warning on line 427 in transcode/transcode.go

View check run for this annotation

Codecov / codecov/patch

transcode/transcode.go#L427

Added line #L427 was not covered by tests
} else {
transcodeConf.ForceSessionReinit = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I think you can skip this else since false is the default value of bool.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or just transcodeConf.ForceSessionReinit = transcodeRequest.IsClip && int64(segment.Index) == 1

Copy link
Contributor

@mjh1 mjh1 Oct 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we not need a re-init for the final segment too (when that's also clipped)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we not need a re-init for the final segment too (when that's also clipped)?

That's a good point. I guess that yes!

Copy link
Collaborator Author

@emranemran emranemran Oct 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we not need a re-init for the final segment too (when that's also clipped)?

No actually, surprisingly. For some reason, the transcoder session is able to handle going from source recording segment to final segment that's also clipped (i.e re-encoded differently). Need to understand lpms better to understand why that is. Planning to follow up with clipping directly at T layer and hopefully understand this better.

}
// This is a temporary workaround that implements the same logic
// as the previous if block -- a new manifestID will force a
// T session re-init between segment at index=0 and index=1.
if transcodeRequest.IsClip && int64(segment.Index) == 0 {
manifestID = manifestID + "_clip"
}

Check warning on line 436 in transcode/transcode.go

View check run for this annotation

Codecov / codecov/patch

transcode/transcode.go#L435-L436

Added lines #L435 - L436 were not covered by tests

var tr clients.TranscodeResult
err := backoff.Retry(func() error {
ctx, cancel := context.WithTimeout(context.Background(), clients.MaxCopyFileDuration)
Expand All @@ -434,7 +458,7 @@
return fmt.Errorf("failed to run TranscodeSegmentWithRemoteBroadcaster: %s", err)
}
} else {
tr, err = broadcaster.TranscodeSegment(rc, int64(segment.Index), transcodeProfiles, segment.Input.DurationMillis, manifestID)
tr, err = broadcaster.TranscodeSegment(rc, int64(segment.Index), transcodeProfiles, segment.Input.DurationMillis, manifestID, transcodeConf)
if err != nil {
return fmt.Errorf("failed to run TranscodeSegment: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion transcode/transcode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type StubBroadcasterClient struct {
tr clients.TranscodeResult
}

func (c StubBroadcasterClient) TranscodeSegment(segment io.Reader, sequenceNumber int64, profiles []video.EncodedProfile, durationMillis int64, manifestID string) (clients.TranscodeResult, error) {
func (c StubBroadcasterClient) TranscodeSegment(segment io.Reader, sequenceNumber int64, profiles []video.EncodedProfile, durationMillis int64, manifestID string, conf clients.LivepeerTranscodeConfiguration) (clients.TranscodeResult, error) {
return c.tr, nil
}

Expand Down
Loading