From b8a0fed3cc4992cfe007b7f4cee7b759bf553c05 Mon Sep 17 00:00:00 2001 From: emranemran Date: Mon, 9 Oct 2023 23:11:29 -0700 Subject: [PATCH] transcode: apply T re-init logic for last clipped segment --- transcode/transcode.go | 9 +++++---- transcode/transcode_jobs.go | 11 ++++++++--- transcode/transcode_test.go | 27 +++++++++++++++++++++++++++ 3 files changed, 40 insertions(+), 7 deletions(-) diff --git a/transcode/transcode.go b/transcode/transcode.go index 2cce63afb..80d7f12e5 100644 --- a/transcode/transcode.go +++ b/transcode/transcode.go @@ -423,7 +423,7 @@ func transcodeSegment( // the different encoding between the two segments causes the // transcode operation to incorrectly tag the output segment as // having two video tracks. - if transcodeRequest.IsClip && int64(segment.Index) == 1 { + if transcodeRequest.IsClip && (int64(segment.Index) == 0 || segment.IsLastSegment) { transcodeConf.ForceSessionReinit = true } else { transcodeConf.ForceSessionReinit = false @@ -431,7 +431,7 @@ func transcodeSegment( // This is a temporary workaround that implements the same logic // as the previous if block -- a new manifestID will force a // T session re-init between segment at index=0 and index=1. - if transcodeRequest.IsClip && int64(segment.Index) == 0 { + if transcodeRequest.IsClip && (int64(segment.Index) == 0 || segment.IsLastSegment) { manifestID = manifestID + "_clip" } @@ -538,8 +538,9 @@ func channelFromWaitgroup(wg *sync.WaitGroup) chan bool { } type segmentInfo struct { - Input clients.SourceSegment - Index int + Input clients.SourceSegment + Index int + IsLastSegment bool } func statsFromProfiles(profiles []video.EncodedProfile) []*video.RenditionStats { diff --git a/transcode/transcode_jobs.go b/transcode/transcode_jobs.go index 0f67976de..e63da20b3 100644 --- a/transcode/transcode_jobs.go +++ b/transcode/transcode_jobs.go @@ -21,16 +21,21 @@ type ParallelTranscoding struct { } func NewParallelTranscoding(sourceSegmentURLs []clients.SourceSegment, work func(segment segmentInfo) error) *ParallelTranscoding { + totalSegs := len(sourceSegmentURLs) jobs := &ParallelTranscoding{ - queue: make(chan segmentInfo, len(sourceSegmentURLs)), + queue: make(chan segmentInfo, totalSegs), errors: make(chan error, 100), work: work, isRunning: true, - totalSegments: len(sourceSegmentURLs), + totalSegments: totalSegs, } // post all jobs on buffered queue for goroutines to process for segmentIndex, u := range sourceSegmentURLs { - jobs.queue <- segmentInfo{Input: u, Index: segmentIndex} + if segmentIndex == totalSegs-1 { + jobs.queue <- segmentInfo{Input: u, Index: segmentIndex, IsLastSegment: true} + } else { + jobs.queue <- segmentInfo{Input: u, Index: segmentIndex, IsLastSegment: false} + } } close(jobs.queue) return jobs diff --git a/transcode/transcode_test.go b/transcode/transcode_test.go index 5874639b0..520760cb9 100644 --- a/transcode/transcode_test.go +++ b/transcode/transcode_test.go @@ -266,3 +266,30 @@ func TestParallelJobSaveTime(t *testing.T) { require.Less(t, elapsed, 160*time.Millisecond) // usually takes less than 101ms on idle machine time.Sleep(10 * time.Millisecond) // wait for other workers to exit } + +func TestNewParallelTranscoding(t *testing.T) { + sourceSegmentURLs := []clients.SourceSegment{ + {URL: segmentURL(t, "1.ts"), DurationMillis: 1000}, {URL: segmentURL(t, "2.ts"), DurationMillis: 1000}, {URL: segmentURL(t, "3.ts"), DurationMillis: 1000}, + {URL: segmentURL(t, "4.ts"), DurationMillis: 1000}, {URL: segmentURL(t, "5.ts"), DurationMillis: 1000}, {URL: segmentURL(t, "6.ts"), DurationMillis: 1000}, + } + + // Define a test work function that doesn't do anything. + testWork := func(segmentInfo) error { + return nil + } + + jobs := NewParallelTranscoding(sourceSegmentURLs, testWork) + + for i, u := range sourceSegmentURLs { + expectedIsLastSegment := i == len(sourceSegmentURLs)-1 + segment := <-jobs.queue + if segment.Input != u || segment.IsLastSegment != expectedIsLastSegment { + t.Errorf("Test case failed for segment #%d, expected IsLastSegment=%v, got IsLastSegment=%v", i, expectedIsLastSegment, segment.IsLastSegment) + } + } + + // Check if there are any remaining segments in the queue (should be closed). + if _, more := <-jobs.queue; more { + t.Error("Expected the queue to be closed after processing all segments.") + } +}