diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml deleted file mode 100644 index 5e07b7f..0000000 --- a/.github/workflows/docker.yml +++ /dev/null @@ -1,25 +0,0 @@ -name: Docker Image Push - -on: - push: - branches: - - master - -jobs: - build-and-push: - runs-on: ubuntu-latest - env: - IMAGE_NAME: ghcr.io/${{ github.actor }}/transcoder - steps: - - name: Checkout Repository - uses: actions/checkout@v4 - - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v3 - - - name: Build and Push Docker Image - run: | - echo ${{ secrets.GITHUB_TOKEN }} | docker login ghcr.io -u ${{ github.actor }} --password-stdin - - IMAGE_VERSION=$(git rev-parse --short HEAD) make push-images - IMAGE_VERSION=latest make push-images diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml new file mode 100644 index 0000000..8056460 --- /dev/null +++ b/.github/workflows/main.yml @@ -0,0 +1,79 @@ +name: CI +on: + push: + branches: master + tags: + - v*.* + pull_request: +jobs: + docker: + if: github.event_name == 'push' + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + with: + lfs: true + - run: echo "##[set-output name=branch;]$(echo ${GITHUB_REF#refs/heads/} | tr '/' '-')" + id: branch + - name: Login to Docker Hub + uses: docker/login-action@v2 + with: + username: ${{ secrets.DOCKER_USERNAME }} + password: ${{ secrets.DOCKER_PASSWORD }} + - name: build server docker + uses: docker/build-push-action@v3 + with: + file: server/Dockerfile + tags: segator/transcoderd:${{ steps.branch.outputs.branch }} + push: true + - run: | + sudo swapoff -a + sudo rm -f /swapfile + sudo apt clean + docker system prune --volumes -f + - name: build worker docker + uses: docker/build-push-action@v3 + with: + file: worker/Dockerfile.encode + tags: segator/encoder-agent:${{ steps.branch.outputs.branch }} + push: true + - name: build worker docker PGS + uses: docker/build-push-action@v3 + with: + file: worker/Dockerfile.pgs + tags: segator/pgs-agent:${{ 
steps.branch.outputs.branch }} + push: true + binary: + strategy: + fail-fast: false + matrix: + os: ["ubuntu", "windows", "macos"] + runs-on: ${{ matrix.os}}-latest + steps: + - name: Install Go + uses: actions/setup-go@v2 + with: + go-version: 1.19 + - name: Checkout + uses: actions/checkout@v2 + with: + lfs: true + - run: echo "##[set-output name=branch;]$(echo ${GITHUB_REF#refs/heads/} | tr '/' '-')" + shell: bash + id: branch + - name: Build Server + run: | + go run build.go build server -p ${{matrix.os}}-amd64 + - name: Build Workers + run: | + go run build.go build worker -p ${{matrix.os}}-amd64 -m console + - name: Upload Release Asset + if: github.event_name == 'push' + id: upload-release-asset + uses: svenstaro/upload-release-action@v1-release + with: + repo_token: ${{ secrets.GITHUB_TOKEN }} + tag: wip-${{ steps.branch.outputs.branch }} + file: dist/* + overwrite: true + file_glob: true diff --git a/Makefile b/Makefile index 47421c8..b29f703 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ GOOPTS ?= GOOS ?= $(shell $(GO) env GOHOSTOS) GOARCH ?= $(shell $(GO) env GOHOSTARCH) -IMAGE_NAME ?= ghcr.io/pando85/transcoder +IMAGE_NAME ?= pando85/transcoder IMAGE_VERSION ?= latest .DEFAULT: help @@ -49,7 +49,7 @@ image-% push-image-%: -t $(IMAGE_NAME):$(IMAGE_VERSION)-$* \ -f $*/Dockerfile \ . 
; \ - if [ "$*" = "worker" ]; then \ + if [ "$*" = "worker" ]; then \ docker buildx build \ $${DOCKER_BUILD_ARG} \ -t $(IMAGE_NAME):$(IMAGE_VERSION)-$*-pgs \ diff --git a/README.md b/README.md index 37d1535..6490a03 100644 --- a/README.md +++ b/README.md @@ -2,9 +2,9 @@ ## Container images -- server: `ghcr.io/pando85/transcoder:latest-server` -- worker: `ghcr.io/pando85/transcoder:latest-worker` -- PGS worker: `ghcr.io/pando85/transcoder:latest-worker-pgs` +- server: `pando85/transcoder:latest-server` +- worker: `pando85/transcoder:latest-worker` +- PGS worker: `pando85/transcoder:latest-worker-pgs` ## Config @@ -18,7 +18,7 @@ DIR=/tmp/images/encode mkdir -p $DIR docker run -it -d --restart unless-stopped --cpuset-cpus 16-32 \ --name transcoder-worker --hostname $(hostname) \ - -v $DIR:/tmp/ pando85/ghcr.io/pando85/transcoder:latest-worker \ + -v $DIR:/tmp/ pando85/transcoder:latest-worker \ --broker.host transcoder.example.com \ --worker.priority 9 ``` @@ -31,7 +31,7 @@ DIR=/tmp/images/pgs mkdir -p $DIR docker run -it -d --restart unless-stopped --cpuset-cpus 1-2 \ --name transcoder-worker-pgs --hostname $(hostname) \ - -v $DIR:/tmp/ ghcr.io/pando85/transcoder:latest-worker-pgs \ + -v $DIR:/tmp/ pando85/transcoder:latest-worker-pgs \ --broker.host transcoder.example.com \ --worker.priority 9 ``` diff --git a/docker-compose.yaml b/docker-compose.yaml index 25b50f2..5cf90a8 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -2,7 +2,7 @@ version: '3' services: server: - image: ghcr.io/pando85/transcoder:latest-server + image: pando85/transcoder:latest-server environment: LOGLEVEL: DEBUG BROKER_HOST: rabbitmq @@ -40,7 +40,7 @@ services: - "5672:5672" worker: - image: ghcr.io/pando85/transcoder:latest-worker + image: pando85/transcoder:latest-worker command: - --log-level - debug @@ -61,7 +61,7 @@ services: - server worker-pgs: - image: ghcr.io/pando85/transcoder:latest-worker-pgs + image: pando85/transcoder:latest-worker-pgs command: - 
--log-level - debug diff --git a/worker/task/config.go b/worker/task/config.go index dcd3a5e..f276022 100644 --- a/worker/task/config.go +++ b/worker/task/config.go @@ -71,6 +71,6 @@ type Config struct { DotnetPath string `mapstructure:"dotnetPath"` } -func (c Config) HaveSettedPeriodTime() bool { +func (c Config) HaveSetPeriodTime() bool { return c.StartAfter.Hour != 0 || c.StopAfter.Hour != 0 } diff --git a/worker/task/encode.go b/worker/task/encode.go index 4ff5916..434a9cc 100644 --- a/worker/task/encode.go +++ b/worker/task/encode.go @@ -31,7 +31,7 @@ import ( ) const RESET_LINE = "\r\033[K" -const MAX_PREFETCHED_JOBS = 2 +const MAX_PREFETCHED_JOBS = 1 var ffmpegSpeedRegex = regexp.MustCompile(`speed=(\d*\.?\d+)x`) var ErrorJobNotFound = errors.New("job Not found") @@ -113,7 +113,6 @@ func (E *EncodeWorker) resumeJobs() { return nil } if taskEncode.LastState.IsEncoding() { - atomic.AddUint32(&E.prefetchJobs, 1) t := E.terminal.AddTask(fmt.Sprintf("CACHED: %s", taskEncode.Task.TaskEncode.Id.String()), DownloadJobStepType) t.Done() E.encodeChan <- taskEncode.Task @@ -204,7 +203,7 @@ func (J *EncodeWorker) AcceptJobs() bool { if J.workerConfig.Paused { return false } - if J.workerConfig.HaveSettedPeriodTime() { + if J.workerConfig.HaveSetPeriodTime() { startAfter := time.Date(now.Year(), now.Month(), now.Day(), J.workerConfig.StartAfter.Hour, J.workerConfig.StartAfter.Minute, 0, 0, now.Location()) stopAfter := time.Date(now.Year(), now.Month(), now.Day(), J.workerConfig.StopAfter.Hour, J.workerConfig.StopAfter.Minute, 0, 0, now.Location()) return now.After(startAfter) && now.Before(stopAfter) @@ -212,7 +211,7 @@ func (J *EncodeWorker) AcceptJobs() bool { return J.PrefetchJobs() < MAX_PREFETCHED_JOBS } -func (j *EncodeWorker) dowloadFile(job *model.WorkTaskEncode, track *TaskTracks) (err error) { +func (j *EncodeWorker) downloadFile(job *model.WorkTaskEncode, track *TaskTracks) (err error) { err = retry.Do(func() error { track.UpdateValue(0) resp, err := 
http.Get(job.TaskEncode.DownloadURL) @@ -223,7 +222,7 @@ func (j *EncodeWorker) dowloadFile(job *model.WorkTaskEncode, track *TaskTracks) return ErrorJobNotFound } if resp.StatusCode != http.StatusOK { - return fmt.Errorf(fmt.Sprintf("not 200 respose in dowload code %d", resp.StatusCode)) + return fmt.Errorf(fmt.Sprintf("not 200 response in download code %d", resp.StatusCode)) } defer resp.Body.Close() size, err := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64) @@ -260,7 +259,7 @@ func (j *EncodeWorker) dowloadFile(job *model.WorkTaskEncode, track *TaskTracks) } defer respsha256.Body.Close() if respsha256.StatusCode != http.StatusOK { - return fmt.Errorf(fmt.Sprintf("not 200 respose in sha265 code %d", respsha256.StatusCode)) + return fmt.Errorf(fmt.Sprintf("not 200 response in sha256 code %d", respsha256.StatusCode)) } bodyBytes, err := ioutil.ReadAll(respsha256.Body) @@ -304,7 +303,7 @@ func (J *EncodeWorker) getVideoParameters(inputFile string) (data *ffprobe.Probe fileReader, err := os.Open(inputFile) if err != nil { - return nil, -1, fmt.Errorf("error opening file %s because %v", inputFile, err) + return nil, -1, fmt.Errorf("error opening file %s because %v", inputFile, err) } stat, err := fileReader.Stat() if err != nil { @@ -314,7 +313,7 @@ func (J *EncodeWorker) getVideoParameters(inputFile string) (data *ffprobe.Probe defer fileReader.Close() data, err = ffprobe.ProbeReader(J.ctx, fileReader) if err != nil { - return nil, 0, fmt.Errorf("error getting data: %v", err) + return nil, 0, fmt.Errorf("error getting data: %v", err) } return data, stat.Size(), nil } @@ -555,7 +554,7 @@ func (J *EncodeWorker) UploadJob(task *model.WorkTaskEncode, track *TaskTracks) } //wg.Wait() if resp.StatusCode != 201 { - return fmt.Errorf("invalid status Code %d", resp.StatusCode) + return fmt.Errorf("invalid status code %d", resp.StatusCode) } track.UpdateValue(fileSize) return nil @@ -748,14 +747,14 @@ func (J *EncodeWorker) convertPGSToSrt(taskEncode 
*model.WorkTaskEncode, contain case <-J.ctx.Done(): return J.ctx.Err() case <-time.After(time.Minute * 90): - return errors.New("timeout Waiting for PGS Job Done") + return errors.New("timeout waiting for PGS job done") case response, ok := <-out: if !ok { return nil } log.Debugf("response: %+v", response) if response.Err != "" { - return fmt.Errorf("error on Process PGS %d: %s", response.PGSID, response.Err) + return fmt.Errorf("error on process PGS %d: %s", response.PGSID, response.Err) } subtFilePath := filepath.Join(taskEncode.WorkDir, fmt.Sprintf("%d.srt", response.PGSID)) err := ioutil.WriteFile(subtFilePath, response.Srt, os.ModePerm) @@ -778,7 +777,7 @@ func (J *EncodeWorker) MKVExtract(subtitles []*Subtitle, taskEncode *model.WorkT _, err := mkvExtractCommand.RunWithContext(J.ctx, command.NewAllowedCodesOption(0, 1)) if err != nil { - J.terminal.Cmd("MKVExtract Command:%s", mkvExtractCommand.GetFullCommand()) + J.terminal.Cmd("MKVExtract command:%s", mkvExtractCommand.GetFullCommand()) return fmt.Errorf("MKVExtract unexpected error:%v", err.Error()) } @@ -789,6 +788,7 @@ func (J *EncodeWorker) PrefetchJobs() uint32 { } func (J *EncodeWorker) AddDownloadJob(job *model.WorkTaskEncode) { + log.Debug("add another download job") atomic.AddUint32(&J.prefetchJobs, 1) J.downloadChan <- job } @@ -799,7 +799,7 @@ func (J *EncodeWorker) downloadQueue() { select { case <-J.ctx.Done(): case <-J.ctxStopQueues.Done(): - J.terminal.Warn("Stopping Download Queue") + J.terminal.Warn("stopping download queue") J.wg.Done() return case job, ok := <-J.downloadChan: @@ -810,7 +810,7 @@ func (J *EncodeWorker) downloadQueue() { taskTrack := J.terminal.AddTask(job.TaskEncode.Id.String(), DownloadJobStepType) J.updateTaskStatus(job, model.DownloadNotification, model.StartedNotificationStatus, "") - err := J.dowloadFile(job, taskTrack) + err := J.downloadFile(job, taskTrack) if err != nil { J.updateTaskStatus(job, model.DownloadNotification, model.FailedNotificationStatus, 
err.Error()) taskTrack.Error() @@ -832,7 +832,7 @@ func (J *EncodeWorker) uploadQueue() { select { case <-J.ctx.Done(): case <-J.ctxStopQueues.Done(): - J.terminal.Warn("Stopping Upload Queue") + J.terminal.Warn("stopping upload queue") J.wg.Done() return case job, ok := <-J.uploadChan: @@ -861,7 +861,7 @@ func (J *EncodeWorker) encodeQueue() { select { case <-J.ctx.Done(): case <-J.ctxStopQueues.Done(): - J.terminal.Warn("Stopping Encode Queue") + J.terminal.Warn("stopping encode queue") J.wg.Done() return case job, ok := <-J.encodeChan: @@ -896,7 +896,7 @@ func (J *EncodeWorker) encodeVideo(job *model.WorkTaskEncode, track *TaskTracks) videoContainer, err := J.clearData(sourceVideoParams) if err != nil { - J.terminal.Warn("Error in clearData", J.GetID()) + J.terminal.Warn("Error in clear data", J.GetID()) return err } if err = J.PGSMkvExtractDetectAndConvert(job, track, videoContainer); err != nil { @@ -947,12 +947,12 @@ func (J *EncodeWorker) encodeVideo(job *model.WorkTaskEncode, track *TaskTracks) } diffDuration := encodedVideoParams.Format.DurationSeconds - sourceVideoParams.Format.DurationSeconds if diffDuration > 60 || diffDuration < -60 { - err = fmt.Errorf("source File duration %f is diferent than encoded %f", sourceVideoParams.Format.DurationSeconds, encodedVideoParams.Format.DurationSeconds) + err = fmt.Errorf("source file duration %f is different than encoded %f", sourceVideoParams.Format.DurationSeconds, encodedVideoParams.Format.DurationSeconds) J.updateTaskStatus(job, model.FFMPEGSNotification, model.FailedNotificationStatus, err.Error()) return err } if encodedVideoSize > sourceVideoSize { - err = fmt.Errorf("source File size %d bytes is less than encoded %d bytes", sourceVideoSize, encodedVideoSize) + err = fmt.Errorf("source file size %d bytes is less than encoded %d bytes", sourceVideoSize, encodedVideoSize) J.updateTaskStatus(job, model.FFMPEGSNotification, model.FailedNotificationStatus, err.Error()) return err } diff --git 
a/worker/task/queue.go b/worker/task/queue.go index d3c4ede..2892e55 100644 --- a/worker/task/queue.go +++ b/worker/task/queue.go @@ -332,6 +332,7 @@ func (Q *RabbitMQClient) encodeQueueProcessor(ctx context.Context, taskQueueName Q.printer.Error("[%s] Error Preparing Job Execution: %v", model.EncodeJobType, err) continue } + log.Debug("execute a new encoder job") delivery.Ack(false) } }