Skip to content

Commit

Permalink
Improve bull plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
robertherber committed Oct 15, 2023
1 parent a06bd51 commit 4d88550
Show file tree
Hide file tree
Showing 19 changed files with 503 additions and 88 deletions.
12 changes: 4 additions & 8 deletions apps/todo-app-with-auth-backend/queues/hello-world.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
import type { QueueConfig } from 'zemble-plugin-bull/utils/setupQueues'
import { ZembleQueue } from 'zemble-plugin-bull'

const config: QueueConfig = {
worker: (job) => {
console.log(job.data)
},
}

export default config
export default new ZembleQueue((job) => {
console.log(job.data)
})
79 changes: 79 additions & 0 deletions packages/bull/ZembleQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { Queue, Worker } from 'bullmq'

import type {
Job, JobsOptions, QueueOptions, RedisOptions, RepeatOptions,
} from 'bullmq'

export interface BullPluginConfig extends Zemble.GlobalConfig {
/**
* The url of the redis instance to use for pubsub
*/
readonly redisUrl?: string
/**
* Redis config to use for pubsub
*/
readonly redisOptions?: RedisOptions
}

export type ZembleQueueConfig = {
readonly repeat?: RepeatOptions
readonly defaultJobOptions?: JobsOptions
}

export class ZembleQueue<DataType = unknown, ReturnType = unknown> {
readonly #worker: (job: Job<DataType, ReturnType>) => Promise<void> | void

readonly #config?: ZembleQueueConfig

constructor(
readonly worker: (job: Job<DataType, ReturnType>) => Promise<void> | void,
config?: ZembleQueueConfig,
) {
this.#worker = worker
this.#config = config
}

// eslint-disable-next-line functional/prefer-readonly-type
#queueInternal: Queue<DataType, ReturnType> | undefined

get #queue() {
if (!this.#queueInternal) throw new Error('Queue not initialized, something is wrong!')
return this.#queueInternal
}

async _initQueue(queueName: string, connection: QueueOptions['connection']) {
const queue = new Queue(queueName, {
connection,
defaultJobOptions: this.#config?.defaultJobOptions,
})

// eslint-disable-next-line functional/immutable-data
this.#queueInternal = queue

const repeatJobKey = `zemble-plugin-bull-repeat-${queue.name}`
await queue.removeRepeatableByKey(repeatJobKey)
if (this.#config?.repeat) {
await queue.add(repeatJobKey, {} as DataType, {
repeatJobKey,
repeat: this.#config?.repeat,
})
}

// @ts-expect-error if this cannot be a promise I'm not sure how stuff will work
const worker = new Worker(queueName, this.#worker, {
connection,
})

return { queue, worker }
}

async add(...args: Parameters<Queue<DataType, ReturnType>['add']>) {
return this.#queue.add(...args)
}

async addBulk(...args: Parameters<Queue<DataType, ReturnType>['addBulk']>) {
return this.#queue.addBulk(...args)
}
}

export default ZembleQueue
6 changes: 4 additions & 2 deletions packages/bull/graphql/Mutation/addJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ import { Queue } from 'bullmq'

import type { MutationResolvers } from '../schema.generated'

const addJob: MutationResolvers['addJob'] = async (_, { queue }) => {
const addJob: MutationResolvers['addJob'] = async (_, { queue, data, jobId }) => {
const q = new Queue(queue, {

})

const job = await q.add(queue, {}, {})
const job = await q.add(queue, data, {
jobId: jobId ?? undefined,
})

return job
}
Expand Down
22 changes: 22 additions & 0 deletions packages/bull/graphql/Mutation/addRepeatableJob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { Queue } from 'bullmq'

import type { MutationResolvers } from '../schema.generated'

const addJob: MutationResolvers['addRepeatableJob'] = async (_, {
queue, data, repeatJobKey, pattern,
}) => {
const q = new Queue(queue, {

})

const job = await q.add(queue, data, {
repeatJobKey: repeatJobKey ?? undefined,
repeat: {
pattern,
},
})

return job
}

export default addJob
17 changes: 17 additions & 0 deletions packages/bull/graphql/Mutation/cleanQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { Queue } from 'bullmq'

import type { MutationResolvers } from '../schema.generated'

const cleanQueue: MutationResolvers['cleanQueue'] = async (_, {
queue, limit, grace, type,
}) => {
const q = new Queue(queue, {

})

const res = await q.clean(grace, limit, type ?? undefined)

return res
}

export default cleanQueue
15 changes: 15 additions & 0 deletions packages/bull/graphql/Mutation/drainQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { Queue } from 'bullmq'

import type { MutationResolvers } from '../schema.generated'

const drainQueue: MutationResolvers['drainQueue'] = async (_, { queue }) => {
const q = new Queue(queue, {

})

await q.drain()

return true
}

export default drainQueue
15 changes: 15 additions & 0 deletions packages/bull/graphql/Mutation/removeJob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { Queue } from 'bullmq'

import type { MutationResolvers } from '../schema.generated'

const removeJob: MutationResolvers['removeJob'] = async (_, { queue, jobId }) => {
const q = new Queue(queue, {

})

const success = await q.remove(jobId)

return success === 1
}

export default removeJob
17 changes: 17 additions & 0 deletions packages/bull/graphql/Mutation/removeRepeatableJob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { Queue } from 'bullmq'

import type { MutationResolvers } from '../schema.generated'

const removeRepeatableJob: MutationResolvers['removeRepeatableJob'] = async (_, {
queue, repeatJobKey,
}) => {
const q = new Queue(queue, {

})

const job = await q.removeRepeatableByKey(repeatJobKey)

return job
}

export default removeRepeatableJob
10 changes: 2 additions & 8 deletions packages/bull/graphql/Subscription/jobUpdated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,8 @@ import type { Job } from 'bullmq'

const jobUpdated: SubscriptionResolvers['jobUpdated'] = {
// subscribe to the jobUpdated event
subscribe: (_, __, { pubsub }) => {
console.log('subscribing to jobUpdated')
return pubsub.subscribe('jobUpdated')
},
resolve: (payload: Job) => {
console.log('resolving jobUpdated', payload)
return payload
},
subscribe: (_, __, { pubsub }) => pubsub.subscribe('jobUpdated'),
resolve: (payload: Job) => payload,
}

export default jobUpdated
File renamed without changes.
42 changes: 42 additions & 0 deletions packages/bull/graphql/Type/BullQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import type { Resolvers } from '../schema.generated'
import type { Queue } from 'bullmq'

const BullQueueResolvers: Resolvers['BullQueue'] = {
name: ({ name }) => name,
count: async (queue) => queue.count(),
completedCount: async (queue) => queue.getCompletedCount(),
activeCount: async (queue) => queue.getActiveCount(),
waitingCount: async (queue) => queue.getWaitingCount(),
waitingChildrenCount: async (queue) => queue.getWaitingChildrenCount(),
failedCount: async (queue) => queue.getFailedCount(),
isPaused: async (queue) => queue.isPaused(),
repeatableJobs: async (queue, {
start,
end,
asc,
}) => {
const repeatableJobs = await queue.getRepeatableJobs(
start ?? undefined,
end ?? undefined,
asc ?? undefined,
)

return repeatableJobs
},
jobs: async (queue: Queue, {
start, end, asc, type,
}) => {
const jobs = await queue.getJobs(
// @ts-expect-error readonly stuff
Array.isArray(type) ? [...type] : type,
start,
end,
asc,
)

// filter out repeatable jobs (they're undefined, weird but yeah)
return jobs.filter((j) => !!j) ?? []
},
}

export default BullQueueResolvers
16 changes: 0 additions & 16 deletions packages/bull/graphql/Type/Queue.ts

This file was deleted.

Loading

0 comments on commit 4d88550

Please sign in to comment.