Skip to content

Commit

Permalink
transcode: apply T re-init logic for last clipped segment
Browse files Browse the repository at this point in the history
  • Loading branch information
emranemran committed Oct 11, 2023
1 parent 8dc484d commit b8a0fed
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 7 deletions.
9 changes: 5 additions & 4 deletions transcode/transcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,15 +423,15 @@ func transcodeSegment(
// the different encoding between the two segments causes the
// transcode operation to incorrectly tag the output segment as
// having two video tracks.
if transcodeRequest.IsClip && int64(segment.Index) == 1 {
if transcodeRequest.IsClip && (int64(segment.Index) == 0 || segment.IsLastSegment) {
transcodeConf.ForceSessionReinit = true
} else {
transcodeConf.ForceSessionReinit = false
}
// This is a temporary workaround that implements the same logic
// as the previous if block -- a new manifestID will force a
// T session re-init between segment at index=0 and index=1.
if transcodeRequest.IsClip && int64(segment.Index) == 0 {
if transcodeRequest.IsClip && (int64(segment.Index) == 0 || segment.IsLastSegment) {
manifestID = manifestID + "_clip"
}

Expand Down Expand Up @@ -538,8 +538,9 @@ func channelFromWaitgroup(wg *sync.WaitGroup) chan bool {
}

type segmentInfo struct {
Input clients.SourceSegment
Index int
Input clients.SourceSegment
Index int
IsLastSegment bool
}

func statsFromProfiles(profiles []video.EncodedProfile) []*video.RenditionStats {
Expand Down
11 changes: 8 additions & 3 deletions transcode/transcode_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,21 @@ type ParallelTranscoding struct {
}

func NewParallelTranscoding(sourceSegmentURLs []clients.SourceSegment, work func(segment segmentInfo) error) *ParallelTranscoding {
totalSegs := len(sourceSegmentURLs)
jobs := &ParallelTranscoding{
queue: make(chan segmentInfo, len(sourceSegmentURLs)),
queue: make(chan segmentInfo, totalSegs),
errors: make(chan error, 100),
work: work,
isRunning: true,
totalSegments: len(sourceSegmentURLs),
totalSegments: totalSegs,
}
// post all jobs on buffered queue for goroutines to process
for segmentIndex, u := range sourceSegmentURLs {
jobs.queue <- segmentInfo{Input: u, Index: segmentIndex}
if segmentIndex == totalSegs-1 {
jobs.queue <- segmentInfo{Input: u, Index: segmentIndex, IsLastSegment: true}
} else {
jobs.queue <- segmentInfo{Input: u, Index: segmentIndex, IsLastSegment: false}
}
}
close(jobs.queue)
return jobs
Expand Down
27 changes: 27 additions & 0 deletions transcode/transcode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,3 +266,30 @@ func TestParallelJobSaveTime(t *testing.T) {
require.Less(t, elapsed, 160*time.Millisecond) // usually takes less than 101ms on idle machine
time.Sleep(10 * time.Millisecond) // wait for other workers to exit
}

func TestNewParallelTranscoding(t *testing.T) {
sourceSegmentURLs := []clients.SourceSegment{
{URL: segmentURL(t, "1.ts"), DurationMillis: 1000}, {URL: segmentURL(t, "2.ts"), DurationMillis: 1000}, {URL: segmentURL(t, "3.ts"), DurationMillis: 1000},
{URL: segmentURL(t, "4.ts"), DurationMillis: 1000}, {URL: segmentURL(t, "5.ts"), DurationMillis: 1000}, {URL: segmentURL(t, "6.ts"), DurationMillis: 1000},
}

// Define a test work function that doesn't do anything.
testWork := func(segmentInfo) error {
return nil
}

jobs := NewParallelTranscoding(sourceSegmentURLs, testWork)

for i, u := range sourceSegmentURLs {
expectedIsLastSegment := i == len(sourceSegmentURLs)-1
segment := <-jobs.queue
if segment.Input != u || segment.IsLastSegment != expectedIsLastSegment {
t.Errorf("Test case failed for segment #%d, expected IsLastSegment=%v, got IsLastSegment=%v", i, expectedIsLastSegment, segment.IsLastSegment)
}
}

// Check if there are any remaining segments in the queue (should be closed).
if _, more := <-jobs.queue; more {
t.Error("Expected the queue to be closed after processing all segments.")
}
}

0 comments on commit b8a0fed

Please sign in to comment.