diff --git a/storage/framework/core/queue/src/job.ts b/storage/framework/core/queue/src/job.ts index 619fe9e28..307ef91f6 100644 --- a/storage/framework/core/queue/src/job.ts +++ b/storage/framework/core/queue/src/job.ts @@ -22,52 +22,47 @@ interface Dispatchable { } export async function runJob(name: string, options: QueueOption = {}): Promise { - try { - const jobModule = await import(appPath(`Jobs/${name}.ts`)) - const job = jobModule.default as JobConfig + const jobModule = await import(appPath(`Jobs/${name}.ts`)) + const job = jobModule.default as JobConfig - if (options.payload) { - // Attach payload to the job instance if it exists - Object.assign(job, { payload: options.payload }) - } + if (options.payload) { + // Attach payload to the job instance if it exists + Object.assign(job, { payload: options.payload }) + } - if (options.context) { - // Attach context to the job instance if it exists - Object.assign(job, { context: options.context }) - } + if (options.context) { + // Attach context to the job instance if it exists + Object.assign(job, { context: options.context }) + } - if (job.action) { - // If action is a string, run it via runAction - if (typeof job.action === 'string') { - await runAction(job.action) - } - // If action is a function, execute it directly - else if (typeof job.action === 'function') { - await job.action() - } + if (job.action) { + // If action is a string, run it via runAction + if (typeof job.action === 'string') { + await runAction(job.action) } - // If handle is defined, execute it - else if (job.handle) { - await job.handle(options.payload) + // If action is a function, execute it directly + else if (typeof job.action === 'function') { + await job.action() } - // If no handle or action, try to execute the module directly - else if (typeof jobModule.default === 'function') { - await jobModule.default(options.payload, options.context) + } + // If handle is defined, execute it + else if (job.handle) { + await job.handle(options.payload) + } + // If no handle or action, try to execute the module directly + else if (typeof jobModule.default === 'function') { + await jobModule.default(options.payload, options.context) + } + else { + // Try to execute the file itself if it exports a function + const possibleFunction = Object.values(jobModule).find(exp => typeof exp === 'function') + if (possibleFunction) { + await possibleFunction(options.payload, options.context) } else { - // Try to execute the file itself if it exports a function - const possibleFunction = Object.values(jobModule).find(exp => typeof exp === 'function') - if (possibleFunction) { - await possibleFunction(options.payload, options.context) - } - else { - throw new Error(`Job ${name} must export a function, or define either a handle function or an action`) - } + throw new Error(`Job ${name} must export a function, or define either a handle function or an action`) } } - catch (error) { - throw error - } } export class Queue implements Dispatchable { diff --git a/storage/framework/core/queue/src/process.ts b/storage/framework/core/queue/src/process.ts index 1e471500c..8a1bab320 100644 --- a/storage/framework/core/queue/src/process.ts +++ b/storage/framework/core/queue/src/process.ts @@ -1,7 +1,8 @@ import type { JitterConfig, JobOptions } from '@stacksjs/types' -import type { JobModel } from '../../../orm/src/models/Job' import { ok, type Ok } from '@stacksjs/error-handling' import { log } from '@stacksjs/logging' +import { FailedJob } from '../../../orm/src/models/FailedJob' +import { Job, type JobModel } from '../../../orm/src/models/Job' import { runJob } from './job' interface QueuePayload { @@ -34,30 +35,24 @@ export async function executeFailedJobs(): Promise { continue const body: QueuePayload = JSON.parse(job.payload) - const jobPayload = JSON.parse(job.payload) as QueuePayload - const classPayload = JSON.parse(jobPayload.classPayload) as JobOptions + const classPayload = JSON.parse(body.classPayload) as JobOptions const maxTries = Number(classPayload.tries || 3) log.info(`Retrying job: ${body.path}`) - try { - await runJob(body.name, { - queue: job.queue, - payload: body.params, - context: '', - maxTries, - timeout: 60, - }) + await runJob(body.name, { + queue: job.queue, + payload: body.params, + context: '', + maxTries, + timeout: 60, + }) - await job.delete() + await job.delete() - log.info(`Successfully ran job: ${body.path}`) - } - catch (error) { - console.log(error) - } + log.info(`Successfully ran job: ${body.path}`) } } @@ -74,22 +69,17 @@ export async function retryFailedJob(id: number): Promise { log.info(`Retrying job: ${body.path}`) - try { - await runJob(body.name, { - queue: failedJob.queue, - payload: body.params, - context: '', - maxTries, - timeout: 60, - }) + await runJob(body.name, { + queue: failedJob.queue, + payload: body.params, + context: '', + maxTries, + timeout: 60, + }) - await failedJob.delete() + await failedJob.delete() - log.info(`Successfully ran job: ${body.path}`) - } - catch (error) { - console.log(error) - } + log.info(`Successfully ran job: ${body.path}`) } } @@ -106,9 +96,8 @@ async function executeJobs(queue: string | undefined): Promise { continue const body: QueuePayload = JSON.parse(job.payload) - const jobPayload = JSON.parse(job.payload) as QueuePayload - const classPayload = JSON.parse(jobPayload.classPayload) as JobOptions + const classPayload = JSON.parse(body.classPayload) as JobOptions const maxTries = Number(classPayload.tries || 3) @@ -132,7 +121,6 @@ async function executeJobs(queue: string | undefined): Promise { log.info(`Successfully ran job: ${body.path}`) } catch (error) { - console.log(error) // Increment the attempt count currentAttempts++ diff --git a/storage/framework/core/types/src/queue.ts b/storage/framework/core/types/src/queue.ts index a0cf10e1f..76a4bce7e 100644 --- a/storage/framework/core/types/src/queue.ts +++ b/storage/framework/core/types/src/queue.ts @@ -38,6 +38,10 @@ export interface QueueOptions { export interface QueueOption extends JobOptions { delay?: number + payload?: any + afterResponse?: any + context?: string + maxTries?: number } export type QueueConfig = DeepPartial