Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transcode: apply T re-init logic for last clipped segment #910

Merged
merged 1 commit into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions transcode/transcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,15 +423,15 @@ func transcodeSegment(
// the different encoding between the two segments causes the
// transcode operation to incorrectly tag the output segment as
// having two video tracks.
if transcodeRequest.IsClip && int64(segment.Index) == 1 {
if transcodeRequest.IsClip && (int64(segment.Index) == 0 || segment.IsLastSegment) {
transcodeConf.ForceSessionReinit = true
} else {
transcodeConf.ForceSessionReinit = false
}
// This is a temporary workaround that implements the same logic
// as the previous if block -- a new manifestID will force a
// T session re-init between segment at index=0 and index=1.
if transcodeRequest.IsClip && int64(segment.Index) == 0 {
if transcodeRequest.IsClip && (int64(segment.Index) == 0 || segment.IsLastSegment) {
manifestID = manifestID + "_clip"
}

Expand Down Expand Up @@ -538,8 +538,9 @@ func channelFromWaitgroup(wg *sync.WaitGroup) chan bool {
}

type segmentInfo struct {
Input clients.SourceSegment
Index int
Input clients.SourceSegment
Index int
IsLastSegment bool
}

func statsFromProfiles(profiles []video.EncodedProfile) []*video.RenditionStats {
Expand Down
11 changes: 8 additions & 3 deletions transcode/transcode_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,21 @@ type ParallelTranscoding struct {
}

func NewParallelTranscoding(sourceSegmentURLs []clients.SourceSegment, work func(segment segmentInfo) error) *ParallelTranscoding {
totalSegs := len(sourceSegmentURLs)
jobs := &ParallelTranscoding{
queue: make(chan segmentInfo, len(sourceSegmentURLs)),
queue: make(chan segmentInfo, totalSegs),
errors: make(chan error, 100),
work: work,
isRunning: true,
totalSegments: len(sourceSegmentURLs),
totalSegments: totalSegs,
}
// post all jobs on buffered queue for goroutines to process
for segmentIndex, u := range sourceSegmentURLs {
jobs.queue <- segmentInfo{Input: u, Index: segmentIndex}
if segmentIndex == totalSegs-1 {
jobs.queue <- segmentInfo{Input: u, Index: segmentIndex, IsLastSegment: true}
} else {
jobs.queue <- segmentInfo{Input: u, Index: segmentIndex, IsLastSegment: false}
}
emranemran marked this conversation as resolved.
Show resolved Hide resolved
}
close(jobs.queue)
return jobs
Expand Down
27 changes: 27 additions & 0 deletions transcode/transcode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,3 +266,30 @@ func TestParallelJobSaveTime(t *testing.T) {
require.Less(t, elapsed, 160*time.Millisecond) // usually takes less than 101ms on idle machine
time.Sleep(10 * time.Millisecond) // wait for other workers to exit
}

func TestNewParallelTranscoding(t *testing.T) {
sourceSegmentURLs := []clients.SourceSegment{
{URL: segmentURL(t, "1.ts"), DurationMillis: 1000}, {URL: segmentURL(t, "2.ts"), DurationMillis: 1000}, {URL: segmentURL(t, "3.ts"), DurationMillis: 1000},
{URL: segmentURL(t, "4.ts"), DurationMillis: 1000}, {URL: segmentURL(t, "5.ts"), DurationMillis: 1000}, {URL: segmentURL(t, "6.ts"), DurationMillis: 1000},
}

// Define a test work function that doesn't do anything.
testWork := func(segmentInfo) error {
return nil
}

jobs := NewParallelTranscoding(sourceSegmentURLs, testWork)

for i, u := range sourceSegmentURLs {
expectedIsLastSegment := i == len(sourceSegmentURLs)-1
segment := <-jobs.queue
if segment.Input != u || segment.IsLastSegment != expectedIsLastSegment {
t.Errorf("Test case failed for segment #%d, expected IsLastSegment=%v, got IsLastSegment=%v", i, expectedIsLastSegment, segment.IsLastSegment)
}
}

// Check if there are any remaining segments in the queue (should be closed).
if _, more := <-jobs.queue; more {
t.Error("Expected the queue to be closed after processing all segments.")
}
}
Loading