Skip to content

Commit

Permalink
chore: wip
Browse files Browse the repository at this point in the history
  • Loading branch information
glennmichael123 committed Feb 5, 2025
1 parent 45dd616 commit 450ecef
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 71 deletions.
69 changes: 32 additions & 37 deletions storage/framework/core/queue/src/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,52 +22,47 @@ interface Dispatchable {
}

export async function runJob(name: string, options: QueueOption = {}): Promise<void> {
try {
const jobModule = await import(appPath(`Jobs/${name}.ts`))
const job = jobModule.default as JobConfig
const jobModule = await import(appPath(`Jobs/${name}.ts`))
const job = jobModule.default as JobConfig

if (options.payload) {
// Attach payload to the job instance if it exists
Object.assign(job, { payload: options.payload })
}
if (options.payload) {
// Attach payload to the job instance if it exists
Object.assign(job, { payload: options.payload })
}

if (options.context) {
// Attach context to the job instance if it exists
Object.assign(job, { context: options.context })
}
if (options.context) {
// Attach context to the job instance if it exists
Object.assign(job, { context: options.context })
}

if (job.action) {
// If action is a string, run it via runAction
if (typeof job.action === 'string') {
await runAction(job.action)
}
// If action is a function, execute it directly
else if (typeof job.action === 'function') {
await job.action()
}
if (job.action) {
// If action is a string, run it via runAction
if (typeof job.action === 'string') {
await runAction(job.action)
}
// If handle is defined, execute it
else if (job.handle) {
await job.handle(options.payload)
// If action is a function, execute it directly
else if (typeof job.action === 'function') {
await job.action()
}
// If no handle or action, try to execute the module directly
else if (typeof jobModule.default === 'function') {
await jobModule.default(options.payload, options.context)
}
// If handle is defined, execute it
else if (job.handle) {
await job.handle(options.payload)
}
// If no handle or action, try to execute the module directly
else if (typeof jobModule.default === 'function') {
await jobModule.default(options.payload, options.context)
}
else {
// Try to execute the file itself if it exports a function
const possibleFunction = Object.values(jobModule).find(exp => typeof exp === 'function')
if (possibleFunction) {
await possibleFunction(options.payload, options.context)
}
else {
// Try to execute the file itself if it exports a function
const possibleFunction = Object.values(jobModule).find(exp => typeof exp === 'function')
if (possibleFunction) {
await possibleFunction(options.payload, options.context)
}
else {
throw new Error(`Job ${name} must export a function, or define either a handle function or an action`)
}
throw new Error(`Job ${name} must export a function, or define either a handle function or an action`)
}
}
catch (error) {
throw error
}
}

export class Queue implements Dispatchable {
Expand Down
56 changes: 22 additions & 34 deletions storage/framework/core/queue/src/process.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import type { JitterConfig, JobOptions } from '@stacksjs/types'
import type { JobModel } from '../../../orm/src/models/Job'
import { ok, type Ok } from '@stacksjs/error-handling'
import { log } from '@stacksjs/logging'
import { FailedJob } from '../../../orm/src/models/FailedJob'
import { Job, type JobModel } from '../../../orm/src/models/Job'
import { runJob } from './job'

interface QueuePayload {
Expand Down Expand Up @@ -34,30 +35,24 @@ export async function executeFailedJobs(): Promise<void> {
continue

const body: QueuePayload = JSON.parse(job.payload)
const jobPayload = JSON.parse(job.payload) as QueuePayload

const classPayload = JSON.parse(jobPayload.classPayload) as JobOptions
const classPayload = JSON.parse(body.classPayload) as JobOptions

const maxTries = Number(classPayload.tries || 3)

log.info(`Retrying job: ${body.path}`)

try {
await runJob(body.name, {
queue: job.queue,
payload: body.params,
context: '',
maxTries,
timeout: 60,
})
await runJob(body.name, {
queue: job.queue,
payload: body.params,
context: '',
maxTries,
timeout: 60,
})

await job.delete()
await job.delete()

log.info(`Successfully ran job: ${body.path}`)
}
catch (error) {
console.log(error)
}
log.info(`Successfully ran job: ${body.path}`)
}
}

Expand All @@ -74,22 +69,17 @@ export async function retryFailedJob(id: number): Promise<void> {

log.info(`Retrying job: ${body.path}`)

try {
await runJob(body.name, {
queue: failedJob.queue,
payload: body.params,
context: '',
maxTries,
timeout: 60,
})
await runJob(body.name, {
queue: failedJob.queue,
payload: body.params,
context: '',
maxTries,
timeout: 60,
})

await failedJob.delete()
await failedJob.delete()

log.info(`Successfully ran job: ${body.path}`)
}
catch (error) {
console.log(error)
}
log.info(`Successfully ran job: ${body.path}`)
}
}

Expand All @@ -106,9 +96,8 @@ async function executeJobs(queue: string | undefined): Promise<void> {
continue

const body: QueuePayload = JSON.parse(job.payload)
const jobPayload = JSON.parse(job.payload) as QueuePayload

const classPayload = JSON.parse(jobPayload.classPayload) as JobOptions
const classPayload = JSON.parse(body.classPayload) as JobOptions

const maxTries = Number(classPayload.tries || 3)

Expand All @@ -132,7 +121,6 @@ async function executeJobs(queue: string | undefined): Promise<void> {
log.info(`Successfully ran job: ${body.path}`)
}
catch (error) {
console.log(error)
// Increment the attempt count
currentAttempts++

Expand Down
4 changes: 4 additions & 0 deletions storage/framework/core/types/src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ export interface QueueOptions {

export interface QueueOption extends JobOptions {
delay?: number
payload?: any
afterResponse?: any
context?: string
maxTries?: number
}

export type QueueConfig = DeepPartial<QueueOptions>

0 comments on commit 450ecef

Please sign in to comment.