Skip to content

Commit

Permalink
worker: Prefetch just one job
Browse files Browse the repository at this point in the history
  • Loading branch information
pando85 committed Jan 13, 2024
1 parent 06bdf99 commit 8f2718e
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 55 deletions.
25 changes: 0 additions & 25 deletions .github/workflows/docker.yml

This file was deleted.

79 changes: 79 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
name: CI
on:
push:
branches: master
tags:
- v*.*
pull_request:
jobs:
docker:
if: github.event_name == 'push'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
with:
lfs: true
- run: echo "##[set-output name=branch;]$(echo ${GITHUB_REF#refs/heads/} | tr '/' '-')"
id: branch
- name: Login to Docker Hub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: build server docker
uses: docker/build-push-action@v3
with:
file: server/Dockerfile
tags: segator/transcoderd:${{ steps.branch.outputs.branch }}
push: true
- run: |
sudo swapoff -a
sudo rm -f /swapfile
sudo apt clean
docker system prune --volumes -f
- name: build worker docker
uses: docker/build-push-action@v3
with:
file: worker/Dockerfile.encode
tags: segator/encoder-agent:${{ steps.branch.outputs.branch }}
push: true
- name: build worker docker PGS
uses: docker/build-push-action@v3
with:
file: worker/Dockerfile.pgs
tags: segator/pgs-agent:${{ steps.branch.outputs.branch }}
push: true
binary:
strategy:
fail-fast: false
matrix:
os: ["ubuntu", "windows", "macos"]
runs-on: ${{ matrix.os}}-latest
steps:
- name: Install Go
uses: actions/setup-go@v2
with:
go-version: 1.19
- name: Checkout
uses: actions/checkout@v2
with:
lfs: true
- run: echo "##[set-output name=branch;]$(echo ${GITHUB_REF#refs/heads/} | tr '/' '-')"
shell: bash
id: branch
- name: Build Server
run: |
go run build.go build server -p ${{matrix.os}}-amd64
- name: Build Workers
run: |
go run build.go build worker -p ${{matrix.os}}-amd64 -m console
- name: Upload Release Asset
if: github.event_name == 'push'
id: upload-release-asset
uses: svenstaro/upload-release-action@v1-release
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
tag: wip-${{ steps.branch.outputs.branch }}
file: dist/*
overwrite: true
file_glob: true
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ GOOPTS ?=
GOOS ?= $(shell $(GO) env GOHOSTOS)
GOARCH ?= $(shell $(GO) env GOHOSTARCH)

IMAGE_NAME ?= ghcr.io/pando85/transcoder
IMAGE_NAME ?= pando85/transcoder
IMAGE_VERSION ?= latest

.DEFAULT: help
Expand Down Expand Up @@ -49,7 +49,7 @@ image-% push-image-%:
-t $(IMAGE_NAME):$(IMAGE_VERSION)-$* \
-f $*/Dockerfile \
. ; \
if [ "$*" = "worker" ]; then \
if [[ "$*" == "worker" ]]; then \
docker buildx build \
$${DOCKER_BUILD_ARG} \
-t $(IMAGE_NAME):$(IMAGE_VERSION)-$*-pgs \
Expand Down
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

## Container images

- server: `ghcr.io/pando85/transcoder:latest-server`
- worker: `ghcr.io/pando85/transcoder:latest-worker`
- PGS worker: `ghcr.io/pando85/transcoder:latest-worker-pgs`
- server: `pando85/transcoder:latest-server`
- worker: `pando85/transcoder:latest-worker`
- PGS worker: `pando85/transcoder:latest-worker-pgs`

## Config

Expand All @@ -18,7 +18,7 @@ DIR=/tmp/images/encode
mkdir -p $DIR
docker run -it -d --restart unless-stopped --cpuset-cpus 16-32 \
--name transcoder-worker --hostname $(hostname) \
-v $DIR:/tmp/ pando85/ghcr.io/pando85/transcoder:latest-worker \
-v $DIR:/tmp/ pando85/pando85/transcoder:latest-worker \
--broker.host transcoder.example.com \
--worker.priority 9
```
Expand All @@ -31,7 +31,7 @@ DIR=/tmp/images/pgs
mkdir -p $DIR
docker run -it -d --restart unless-stopped --cpuset-cpus 1-2 \
--name transcoder-worker-pgs --hostname $(hostname) \
-v $DIR:/tmp/ ghcr.io/pando85/transcoder:latest-worker-pgs \
-v $DIR:/tmp/ pando85/transcoder:latest-worker-pgs \
--broker.host transcoder.example.com \
--worker.priority 9
```
Expand Down
6 changes: 3 additions & 3 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: '3'

services:
server:
image: ghcr.io/pando85/transcoder:latest-server
image: pando85/transcoder:latest-server
environment:
LOGLEVEL: DEBUG
BROKER_HOST: rabbitmq
Expand Down Expand Up @@ -40,7 +40,7 @@ services:
- "5672:5672"

worker:
image: ghcr.io/pando85/transcoder:latest-worker
image: pando85/transcoder:latest-worker
command:
- --log-level
- debug
Expand All @@ -61,7 +61,7 @@ services:
- server

worker-pgs:
image: ghcr.io/pando85/transcoder:latest-worker-pgs
image: pando85/transcoder:latest-worker-pgs
command:
- --log-level
- debug
Expand Down
2 changes: 1 addition & 1 deletion worker/task/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,6 @@ type Config struct {
DotnetPath string `mapstructure:"dotnetPath"`
}

func (c Config) HaveSettedPeriodTime() bool {
func (c Config) HaveSetPeriodTime() bool {
return c.StartAfter.Hour != 0 || c.StopAfter.Hour != 0
}
38 changes: 19 additions & 19 deletions worker/task/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
)

const RESET_LINE = "\r\033[K"
const MAX_PREFETCHED_JOBS = 2
const MAX_PREFETCHED_JOBS = 1

var ffmpegSpeedRegex = regexp.MustCompile(`speed=(\d*\.?\d+)x`)
var ErrorJobNotFound = errors.New("job Not found")
Expand Down Expand Up @@ -113,7 +113,6 @@ func (E *EncodeWorker) resumeJobs() {
return nil
}
if taskEncode.LastState.IsEncoding() {
atomic.AddUint32(&E.prefetchJobs, 1)
t := E.terminal.AddTask(fmt.Sprintf("CACHED: %s", taskEncode.Task.TaskEncode.Id.String()), DownloadJobStepType)
t.Done()
E.encodeChan <- taskEncode.Task
Expand Down Expand Up @@ -204,15 +203,15 @@ func (J *EncodeWorker) AcceptJobs() bool {
if J.workerConfig.Paused {
return false
}
if J.workerConfig.HaveSettedPeriodTime() {
if J.workerConfig.HaveSetPeriodTime() {
startAfter := time.Date(now.Year(), now.Month(), now.Day(), J.workerConfig.StartAfter.Hour, J.workerConfig.StartAfter.Minute, 0, 0, now.Location())
stopAfter := time.Date(now.Year(), now.Month(), now.Day(), J.workerConfig.StopAfter.Hour, J.workerConfig.StopAfter.Minute, 0, 0, now.Location())
return now.After(startAfter) && now.Before(stopAfter)
}
return J.PrefetchJobs() < MAX_PREFETCHED_JOBS
}

func (j *EncodeWorker) dowloadFile(job *model.WorkTaskEncode, track *TaskTracks) (err error) {
func (j *EncodeWorker) downloadFile(job *model.WorkTaskEncode, track *TaskTracks) (err error) {
err = retry.Do(func() error {
track.UpdateValue(0)
resp, err := http.Get(job.TaskEncode.DownloadURL)
Expand All @@ -223,7 +222,7 @@ func (j *EncodeWorker) dowloadFile(job *model.WorkTaskEncode, track *TaskTracks)
return ErrorJobNotFound
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf(fmt.Sprintf("not 200 respose in dowload code %d", resp.StatusCode))
return fmt.Errorf(fmt.Sprintf("not 200 response in download code %d", resp.StatusCode))
}
defer resp.Body.Close()
size, err := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64)
Expand Down Expand Up @@ -260,7 +259,7 @@ func (j *EncodeWorker) dowloadFile(job *model.WorkTaskEncode, track *TaskTracks)
}
defer respsha256.Body.Close()
if respsha256.StatusCode != http.StatusOK {
return fmt.Errorf(fmt.Sprintf("not 200 respose in sha265 code %d", respsha256.StatusCode))
return fmt.Errorf(fmt.Sprintf("not 200 response in sha265 code %d", respsha256.StatusCode))
}

bodyBytes, err := ioutil.ReadAll(respsha256.Body)
Expand Down Expand Up @@ -304,7 +303,7 @@ func (J *EncodeWorker) getVideoParameters(inputFile string) (data *ffprobe.Probe

fileReader, err := os.Open(inputFile)
if err != nil {
return nil, -1, fmt.Errorf("error opening file %s because %v", inputFile, err)
return nil, -1, fmt.Errorf("Error opening file %s because %v", inputFile, err)
}
stat, err := fileReader.Stat()
if err != nil {
Expand All @@ -314,7 +313,7 @@ func (J *EncodeWorker) getVideoParameters(inputFile string) (data *ffprobe.Probe
defer fileReader.Close()
data, err = ffprobe.ProbeReader(J.ctx, fileReader)
if err != nil {
return nil, 0, fmt.Errorf("error getting data: %v", err)
return nil, 0, fmt.Errorf("Error getting data: %v", err)
}
return data, stat.Size(), nil
}
Expand Down Expand Up @@ -555,7 +554,7 @@ func (J *EncodeWorker) UploadJob(task *model.WorkTaskEncode, track *TaskTracks)
}
//wg.Wait()
if resp.StatusCode != 201 {
return fmt.Errorf("invalid status Code %d", resp.StatusCode)
return fmt.Errorf("invalid status code %d", resp.StatusCode)
}
track.UpdateValue(fileSize)
return nil
Expand Down Expand Up @@ -748,14 +747,14 @@ func (J *EncodeWorker) convertPGSToSrt(taskEncode *model.WorkTaskEncode, contain
case <-J.ctx.Done():
return J.ctx.Err()
case <-time.After(time.Minute * 90):
return errors.New("timeout Waiting for PGS Job Done")
return errors.New("timeout waiting for PGS job done")
case response, ok := <-out:
if !ok {
return nil
}
log.Debugf("response: %+v", response)
if response.Err != "" {
return fmt.Errorf("error on Process PGS %d: %s", response.PGSID, response.Err)
return fmt.Errorf("error on process PGS %d: %s", response.PGSID, response.Err)
}
subtFilePath := filepath.Join(taskEncode.WorkDir, fmt.Sprintf("%d.srt", response.PGSID))
err := ioutil.WriteFile(subtFilePath, response.Srt, os.ModePerm)
Expand All @@ -778,7 +777,7 @@ func (J *EncodeWorker) MKVExtract(subtitles []*Subtitle, taskEncode *model.WorkT

_, err := mkvExtractCommand.RunWithContext(J.ctx, command.NewAllowedCodesOption(0, 1))
if err != nil {
J.terminal.Cmd("MKVExtract Command:%s", mkvExtractCommand.GetFullCommand())
J.terminal.Cmd("MKVExtract command:%s", mkvExtractCommand.GetFullCommand())
return fmt.Errorf("MKVExtract unexpected error:%v", err.Error())
}

Expand All @@ -789,6 +788,7 @@ func (J *EncodeWorker) PrefetchJobs() uint32 {
}

func (J *EncodeWorker) AddDownloadJob(job *model.WorkTaskEncode) {
log.Debug("add another download job")
atomic.AddUint32(&J.prefetchJobs, 1)
J.downloadChan <- job
}
Expand All @@ -799,7 +799,7 @@ func (J *EncodeWorker) downloadQueue() {
select {
case <-J.ctx.Done():
case <-J.ctxStopQueues.Done():
J.terminal.Warn("Stopping Download Queue")
J.terminal.Warn("stopping download queue")
J.wg.Done()
return
case job, ok := <-J.downloadChan:
Expand All @@ -810,7 +810,7 @@ func (J *EncodeWorker) downloadQueue() {
taskTrack := J.terminal.AddTask(job.TaskEncode.Id.String(), DownloadJobStepType)

J.updateTaskStatus(job, model.DownloadNotification, model.StartedNotificationStatus, "")
err := J.dowloadFile(job, taskTrack)
err := J.downloadFile(job, taskTrack)
if err != nil {
J.updateTaskStatus(job, model.DownloadNotification, model.FailedNotificationStatus, err.Error())
taskTrack.Error()
Expand All @@ -832,7 +832,7 @@ func (J *EncodeWorker) uploadQueue() {
select {
case <-J.ctx.Done():
case <-J.ctxStopQueues.Done():
J.terminal.Warn("Stopping Upload Queue")
J.terminal.Warn("stopping upload queue")
J.wg.Done()
return
case job, ok := <-J.uploadChan:
Expand Down Expand Up @@ -861,7 +861,7 @@ func (J *EncodeWorker) encodeQueue() {
select {
case <-J.ctx.Done():
case <-J.ctxStopQueues.Done():
J.terminal.Warn("Stopping Encode Queue")
J.terminal.Warn("stopping encode queue")
J.wg.Done()
return
case job, ok := <-J.encodeChan:
Expand Down Expand Up @@ -896,7 +896,7 @@ func (J *EncodeWorker) encodeVideo(job *model.WorkTaskEncode, track *TaskTracks)

videoContainer, err := J.clearData(sourceVideoParams)
if err != nil {
J.terminal.Warn("Error in clearData", J.GetID())
J.terminal.Warn("Error in clear data", J.GetID())
return err
}
if err = J.PGSMkvExtractDetectAndConvert(job, track, videoContainer); err != nil {
Expand Down Expand Up @@ -947,12 +947,12 @@ func (J *EncodeWorker) encodeVideo(job *model.WorkTaskEncode, track *TaskTracks)
}
diffDuration := encodedVideoParams.Format.DurationSeconds - sourceVideoParams.Format.DurationSeconds
if diffDuration > 60 || diffDuration < -60 {
err = fmt.Errorf("source File duration %f is diferent than encoded %f", sourceVideoParams.Format.DurationSeconds, encodedVideoParams.Format.DurationSeconds)
err = fmt.Errorf("source file duration %f is diferent than encoded %f", sourceVideoParams.Format.DurationSeconds, encodedVideoParams.Format.DurationSeconds)
J.updateTaskStatus(job, model.FFMPEGSNotification, model.FailedNotificationStatus, err.Error())
return err
}
if encodedVideoSize > sourceVideoSize {
err = fmt.Errorf("source File size %d bytes is less than encoded %d bytes", sourceVideoSize, encodedVideoSize)
err = fmt.Errorf("source file size %d bytes is less than encoded %d bytes", sourceVideoSize, encodedVideoSize)
J.updateTaskStatus(job, model.FFMPEGSNotification, model.FailedNotificationStatus, err.Error())
return err
}
Expand Down
1 change: 1 addition & 0 deletions worker/task/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ func (Q *RabbitMQClient) encodeQueueProcessor(ctx context.Context, taskQueueName
Q.printer.Error("[%s] Error Preparing Job Execution: %v", model.EncodeJobType, err)
continue
}
log.Debug("execute a new encoder job")
delivery.Ack(false)
}
}
Expand Down

0 comments on commit 8f2718e

Please sign in to comment.