Skip to content

Commit

Permalink
fix: Rename status started/added in favour of progressing/queued
Browse files Browse the repository at this point in the history
  • Loading branch information
pando85 committed Jan 24, 2024
1 parent b7c4151 commit 401241c
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 39 deletions.
28 changes: 14 additions & 14 deletions model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ const (
PGSNotification NotificationType = "PGS"
FFMPEGSNotification NotificationType = "FFMPEG"

AddedNotificationStatus NotificationStatus = "added"
ReAddedNotificationStatus NotificationStatus = "readded"
StartedNotificationStatus NotificationStatus = "started"
CompletedNotificationStatus NotificationStatus = "completed"
CanceledNotificationStatus NotificationStatus = "canceled"
FailedNotificationStatus NotificationStatus = "failed"
QueuedNotificationStatus NotificationStatus = "queued"
ReQueuedNotificationStatus NotificationStatus = "requeued"
ProgressingNotificationStatus NotificationStatus = "progressing"
CompletedNotificationStatus NotificationStatus = "completed"
CanceledNotificationStatus NotificationStatus = "canceled"
FailedNotificationStatus NotificationStatus = "failed"

EncodeJobType JobType = "encode"
PGSToSrtJobType JobType = "pgstosrt"
Expand Down Expand Up @@ -135,11 +135,11 @@ func (e TaskEvent) IsDownloading() bool {
if e.EventType != NotificationEvent {
return false
}
if e.NotificationType == DownloadNotification && e.Status == StartedNotificationStatus {
if e.NotificationType == DownloadNotification && e.Status == ProgressingNotificationStatus {
return true
}

if e.NotificationType == JobNotification && (e.Status == StartedNotificationStatus) {
if e.NotificationType == JobNotification && (e.Status == ProgressingNotificationStatus) {
return true
}
return false
Expand All @@ -153,16 +153,16 @@ func (e TaskEvent) IsEncoding() bool {
return true
}

if e.NotificationType == MKVExtractNotification && (e.Status == StartedNotificationStatus || e.Status == CompletedNotificationStatus) {
if e.NotificationType == MKVExtractNotification && (e.Status == ProgressingNotificationStatus || e.Status == CompletedNotificationStatus) {
return true
}
if e.NotificationType == FFProbeNotification && (e.Status == StartedNotificationStatus || e.Status == CompletedNotificationStatus) {
if e.NotificationType == FFProbeNotification && (e.Status == ProgressingNotificationStatus || e.Status == CompletedNotificationStatus) {
return true
}
if e.NotificationType == PGSNotification && (e.Status == StartedNotificationStatus || e.Status == CompletedNotificationStatus) {
if e.NotificationType == PGSNotification && (e.Status == ProgressingNotificationStatus || e.Status == CompletedNotificationStatus) {
return true
}
if e.NotificationType == FFMPEGSNotification && e.Status == StartedNotificationStatus {
if e.NotificationType == FFMPEGSNotification && e.Status == ProgressingNotificationStatus {
return true
}

Expand All @@ -177,7 +177,7 @@ func (e TaskEvent) IsUploading() bool {
return true
}

if e.NotificationType == UploadNotification && e.Status == StartedNotificationStatus {
if e.NotificationType == UploadNotification && e.Status == ProgressingNotificationStatus {
return true
}

Expand Down Expand Up @@ -230,7 +230,7 @@ type JobRequest struct {
ForceCompleted bool `json:"forceCompleted"`
ForceFailed bool `json:"forceFailed"`
ForceExecuting bool `json:"forceExecuting"`
ForceAdded bool `json:"forceAdded"`
ForceQueued bool `json:"forceQueued"`
Priority int `json:"priority"`
}

Expand Down
8 changes: 4 additions & 4 deletions radarr/add/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type FailedItem struct {
ForceCompleted bool `json:"forceCompleted"`
ForceFailed bool `json:"forceFailed"`
ForceExecuting bool `json:"forceExecuting"`
ForceAdded bool `json:"forceAdded"`
ForceQueued bool `json:"forceQueued"`
Priority int `json:"priority"`
Error string `json:"error"`
}
Expand All @@ -82,11 +82,11 @@ func PrintTranscoderResponse(jsonStr []byte) error {

switch {
case len(response.Scheduled) > 0:
fmt.Println("Movie successfully added.")
fmt.Println("Movie successfully queued.")
case len(response.Failed) > 0:
fmt.Println("Movie was not added.")
fmt.Println("Movie was not queued.")
default:
return errors.New("Movie was neither added nor failed.")
return errors.New("Movie was neither queued nor failed.")
}

return nil
Expand Down
20 changes: 10 additions & 10 deletions server/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func NewScheduler(config SchedulerConfig, repo repository.Repository, queue queu
func (R *RuntimeScheduler) Run(wg *sync.WaitGroup, ctx context.Context) {
log.Info("starting scheduler")
R.start(ctx)
log.Info("started scheduler")
log.Info("progressing scheduler")
wg.Add(1)
go func() {
<-ctx.Done()
Expand Down Expand Up @@ -122,7 +122,7 @@ func (R *RuntimeScheduler) schedule(ctx context.Context) {
log.Error(err)
}
for _, taskEvent := range taskEvents {
if taskEvent.Status == model.StartedNotificationStatus {
if taskEvent.Status == model.ProgressingNotificationStatus {
log.Infof("rescheduling %s after job timeout", taskEvent.Id.String())
video, err := R.repo.GetJob(ctx, taskEvent.Id.String())
if err != nil {
Expand Down Expand Up @@ -187,7 +187,7 @@ func (R *RuntimeScheduler) createNewJobRequestByJobRequestDirectory(ctx context.
ForceCompleted: parentJobRequest.ForceCompleted,
ForceFailed: parentJobRequest.ForceFailed,
ForceExecuting: parentJobRequest.ForceExecuting,
ForceAdded: parentJobRequest.ForceAdded,
ForceQueued: parentJobRequest.ForceQueued,
Priority: parentJobRequest.Priority,
},
errors: jobRequestErrors,
Expand Down Expand Up @@ -222,23 +222,23 @@ func (R *RuntimeScheduler) scheduleJobRequest(ctx context.Context, jobRequest *m
if err != nil {
return err
}
startEvent := video.AddEvent(model.NotificationEvent, model.JobNotification, model.AddedNotificationStatus)
startEvent := video.AddEvent(model.NotificationEvent, model.JobNotification, model.QueuedNotificationStatus)
eventsToAdd = append(eventsToAdd, startEvent)
} else {
//If video exist we check if we can retry the job
lastEvent := video.Events.GetLatestPerNotificationType(model.JobNotification)
status := video.Events.GetStatus()
if jobRequest.ForceExecuting && status == model.StartedNotificationStatus {
if jobRequest.ForceExecuting && status == model.ProgressingNotificationStatus {
cancelEvent := video.AddEvent(model.NotificationEvent, model.JobNotification, model.CanceledNotificationStatus)
eventsToAdd = append(eventsToAdd, cancelEvent)
}
if (jobRequest.ForceCompleted && status == model.CompletedNotificationStatus) ||
(jobRequest.ForceFailed && (status == model.FailedNotificationStatus || status == model.CanceledNotificationStatus)) ||
(jobRequest.ForceAdded && (status == model.AddedNotificationStatus || status == model.ReAddedNotificationStatus)) ||
(jobRequest.ForceExecuting && status == model.StartedNotificationStatus) {
requeueEvent := video.AddEvent(model.NotificationEvent, model.JobNotification, model.ReAddedNotificationStatus)
(jobRequest.ForceQueued && (status == model.QueuedNotificationStatus || status == model.ReQueuedNotificationStatus)) ||
(jobRequest.ForceExecuting && status == model.ProgressingNotificationStatus) {
requeueEvent := video.AddEvent(model.NotificationEvent, model.JobNotification, model.ReQueuedNotificationStatus)
eventsToAdd = append(eventsToAdd, requeueEvent)
} else if !(jobRequest.ForceExecuting && status == model.StartedNotificationStatus) {
} else if !(jobRequest.ForceExecuting && status == model.ProgressingNotificationStatus) {
return fmt.Errorf("%s (%s) job is in %s state by %s, can not be rescheduled", video.Id.String(), jobRequest.SourcePath, lastEvent.Status, lastEvent.WorkerName)
}
}
Expand Down Expand Up @@ -348,7 +348,7 @@ func (R *RuntimeScheduler) isValidStremeableJob(ctx context.Context, uuid string
return nil, err
}
status := video.Events.GetLatestPerNotificationType(model.JobNotification).Status
if status != model.StartedNotificationStatus {
if status != model.ProgressingNotificationStatus {
return nil, fmt.Errorf("%w: job is in status %s", ErrorStreamNotAllowed, status)
}
return video, nil
Expand Down
6 changes: 3 additions & 3 deletions server/web/ui/src/JobTable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,8 @@ const JobTable: React.FC<JobTableProps> = ({ token, setShowJobTable }) => {
};

const statusFilterOptions = [
'started',
'added',
'progressing',
'queued',
'completed',
'failed',
];
Expand Down Expand Up @@ -346,7 +346,7 @@ const JobTable: React.FC<JobTableProps> = ({ token, setShowJobTable }) => {
};

const renderStatusCellContent = (job: Job) => {
if (job.status === 'started') {
if (job.status === 'progressing') {
return job.status_message ? (
(() => {
try {
Expand Down
16 changes: 8 additions & 8 deletions worker/task/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ func (P *ProgressTrackReader) SumSha() []byte {
}

func (J *EncodeWorker) UploadJob(task *model.WorkTaskEncode, track *TaskTracks) error {
J.updateTaskStatus(task, model.UploadNotification, model.StartedNotificationStatus, "")
J.updateTaskStatus(task, model.UploadNotification, model.ProgressingNotificationStatus, "")
err := retry.Do(func() error {
track.UpdateValue(0)
encodedFile, err := os.Open(task.TargetFilePath)
Expand Down Expand Up @@ -600,7 +600,7 @@ func (J *EncodeWorker) Execute(workData []byte) error {
}
os.MkdirAll(workDir, os.ModePerm)

J.updateTaskStatus(workTaskEncode, model.JobNotification, model.StartedNotificationStatus, "")
J.updateTaskStatus(workTaskEncode, model.JobNotification, model.ProgressingNotificationStatus, "")
J.AddDownloadJob(workTaskEncode)
return nil
}
Expand Down Expand Up @@ -681,7 +681,7 @@ func (J *EncodeWorker) PGSMkvExtractDetectAndConvert(taskEncode *model.WorkTaskE
}
}
if len(PGSTOSrt) > 0 {
J.updateTaskStatus(taskEncode, model.MKVExtractNotification, model.StartedNotificationStatus, "")
J.updateTaskStatus(taskEncode, model.MKVExtractNotification, model.ProgressingNotificationStatus, "")
track.Message(string(model.MKVExtractNotification))
track.SetTotal(0)
err := J.MKVExtract(PGSTOSrt, taskEncode)
Expand All @@ -692,7 +692,7 @@ func (J *EncodeWorker) PGSMkvExtractDetectAndConvert(taskEncode *model.WorkTaskE
J.updateTaskStatus(taskEncode, model.MKVExtractNotification, model.CompletedNotificationStatus, "")

log.Debug("is going to start PGS task?")
J.updateTaskStatus(taskEncode, model.PGSNotification, model.StartedNotificationStatus, "")
J.updateTaskStatus(taskEncode, model.PGSNotification, model.ProgressingNotificationStatus, "")
track.Message(string(model.PGSNotification))
log.Debugf("converting PGS to SRT: %+v", PGSTOSrt)
err = J.convertPGSToSrt(taskEncode, container, PGSTOSrt)
Expand Down Expand Up @@ -808,7 +808,7 @@ func (J *EncodeWorker) downloadQueue() {

taskTrack := J.terminal.AddTask(job.TaskEncode.Id.String(), DownloadJobStepType)

J.updateTaskStatus(job, model.DownloadNotification, model.StartedNotificationStatus, "")
J.updateTaskStatus(job, model.DownloadNotification, model.ProgressingNotificationStatus, "")
err := J.downloadFile(job, taskTrack)
if err != nil {
J.updateTaskStatus(job, model.DownloadNotification, model.FailedNotificationStatus, err.Error())
Expand Down Expand Up @@ -884,7 +884,7 @@ func (J *EncodeWorker) encodeQueue() {
}

func (J *EncodeWorker) encodeVideo(job *model.WorkTaskEncode, track *TaskTracks) error {
J.updateTaskStatus(job, model.FFProbeNotification, model.StartedNotificationStatus, "")
J.updateTaskStatus(job, model.FFProbeNotification, model.ProgressingNotificationStatus, "")
track.Message(string(model.FFProbeNotification))
sourceVideoParams, sourceVideoSize, err := J.getVideoParameters(job.SourceFilePath)
if err != nil {
Expand All @@ -901,7 +901,7 @@ func (J *EncodeWorker) encodeVideo(job *model.WorkTaskEncode, track *TaskTracks)
if err = J.PGSMkvExtractDetectAndConvert(job, track, videoContainer); err != nil {
return err
}
J.updateTaskStatus(job, model.FFMPEGSNotification, model.StartedNotificationStatus, "")
J.updateTaskStatus(job, model.FFMPEGSNotification, model.ProgressingNotificationStatus, "")
track.ResetMessage()
track.SetTotal(int64(videoContainer.Video.Duration.Seconds()) * int64(videoContainer.Video.FrameRate))
FFMPEGProgressChan := make(chan FFMPEGProgress)
Expand All @@ -924,7 +924,7 @@ func (J *EncodeWorker) encodeVideo(job *model.WorkTaskEncode, track *TaskTracks)
track.Increment(encodeFramesIncrement)

if FFMPEGProgress.percent-lastProgressEvent > 10 {
J.updateTaskStatus(job, model.FFMPEGSNotification, model.StartedNotificationStatus, fmt.Sprintf("{\"progress\":\"%.2f\"}", track.PercentDone()))
J.updateTaskStatus(job, model.FFMPEGSNotification, model.ProgressingNotificationStatus, fmt.Sprintf("{\"progress\":\"%.2f\"}", track.PercentDone()))
lastProgressEvent = FFMPEGProgress.percent
}
}
Expand Down

0 comments on commit 401241c

Please sign in to comment.