diff --git a/apps/todo-app-with-auth-backend/queues/hello-world.ts b/apps/todo-app-with-auth-backend/queues/hello-world.ts index 10843060..485c009f 100644 --- a/apps/todo-app-with-auth-backend/queues/hello-world.ts +++ b/apps/todo-app-with-auth-backend/queues/hello-world.ts @@ -1,9 +1,5 @@ -import type { QueueConfig } from 'zemble-plugin-bull/utils/setupQueues' +import { ZembleQueue } from 'zemble-plugin-bull' -const config: QueueConfig = { - worker: (job) => { - console.log(job.data) - }, -} - -export default config +export default new ZembleQueue((job) => { + console.log(job.data) +}) diff --git a/packages/bull/ZembleQueue.ts b/packages/bull/ZembleQueue.ts new file mode 100644 index 00000000..6009f943 --- /dev/null +++ b/packages/bull/ZembleQueue.ts @@ -0,0 +1,79 @@ +import { Queue, Worker } from 'bullmq' + +import type { + Job, JobsOptions, QueueOptions, RedisOptions, RepeatOptions, +} from 'bullmq' + +export interface BullPluginConfig extends Zemble.GlobalConfig { + /** + * The url of the redis instance to use for pubsub + */ + readonly redisUrl?: string + /** + * Redis config to use for pubsub + */ + readonly redisOptions?: RedisOptions +} + +export type ZembleQueueConfig = { + readonly repeat?: RepeatOptions + readonly defaultJobOptions?: JobsOptions +} + +export class ZembleQueue { + readonly #worker: (job: Job) => Promise | void + + readonly #config?: ZembleQueueConfig + + constructor( + readonly worker: (job: Job) => Promise | void, + config?: ZembleQueueConfig, + ) { + this.#worker = worker + this.#config = config + } + + // eslint-disable-next-line functional/prefer-readonly-type + #queueInternal: Queue | undefined + + get #queue() { + if (!this.#queueInternal) throw new Error('Queue not initialized, something is wrong!') + return this.#queueInternal + } + + async _initQueue(queueName: string, connection: QueueOptions['connection']) { + const queue = new Queue(queueName, { + connection, + defaultJobOptions: this.#config?.defaultJobOptions, + }) + + // eslint-disable-next-line functional/immutable-data + this.#queueInternal = queue + + const repeatJobKey = `zemble-plugin-bull-repeat-${queue.name}` + await queue.removeRepeatableByKey(repeatJobKey) + if (this.#config?.repeat) { + await queue.add(repeatJobKey, {} as DataType, { + repeatJobKey, + repeat: this.#config?.repeat, + }) + } + + // @ts-expect-error if this cannot be a promise I'm not sure how stuff will work + const worker = new Worker(queueName, this.#worker, { + connection, + }) + + return { queue, worker } + } + + async add(...args: Parameters['add']>) { + return this.#queue.add(...args) + } + + async addBulk(...args: Parameters['addBulk']>) { + return this.#queue.addBulk(...args) + } +} + +export default ZembleQueue diff --git a/packages/bull/graphql/Mutation/addJob.ts b/packages/bull/graphql/Mutation/addJob.ts index 60907055..ea3a8de8 100644 --- a/packages/bull/graphql/Mutation/addJob.ts +++ b/packages/bull/graphql/Mutation/addJob.ts @@ -2,12 +2,14 @@ import { Queue } from 'bullmq' import type { MutationResolvers } from '../schema.generated' -const addJob: MutationResolvers['addJob'] = async (_, { queue }) => { +const addJob: MutationResolvers['addJob'] = async (_, { queue, data, jobId }) => { const q = new Queue(queue, { }) - const job = await q.add(queue, {}, {}) + const job = await q.add(queue, data, { + jobId: jobId ?? undefined, + }) return job } diff --git a/packages/bull/graphql/Mutation/addRepeatableJob.ts b/packages/bull/graphql/Mutation/addRepeatableJob.ts new file mode 100644 index 00000000..9cbf43c7 --- /dev/null +++ b/packages/bull/graphql/Mutation/addRepeatableJob.ts @@ -0,0 +1,22 @@ +import { Queue } from 'bullmq' + +import type { MutationResolvers } from '../schema.generated' + +const addJob: MutationResolvers['addRepeatableJob'] = async (_, { + queue, data, repeatJobKey, pattern, +}) => { + const q = new Queue(queue, { + + }) + + const job = await q.add(queue, data, { + repeatJobKey: repeatJobKey ?? undefined, + repeat: { + pattern, + }, + }) + + return job +} + +export default addJob diff --git a/packages/bull/graphql/Mutation/cleanQueue.ts b/packages/bull/graphql/Mutation/cleanQueue.ts new file mode 100644 index 00000000..e7e7d441 --- /dev/null +++ b/packages/bull/graphql/Mutation/cleanQueue.ts @@ -0,0 +1,17 @@ +import { Queue } from 'bullmq' + +import type { MutationResolvers } from '../schema.generated' + +const cleanQueue: MutationResolvers['cleanQueue'] = async (_, { + queue, limit, grace, type, +}) => { + const q = new Queue(queue, { + + }) + + const res = await q.clean(grace, limit, type ?? undefined) + + return res +} + +export default cleanQueue diff --git a/packages/bull/graphql/Mutation/drainQueue.ts b/packages/bull/graphql/Mutation/drainQueue.ts new file mode 100644 index 00000000..3671155b --- /dev/null +++ b/packages/bull/graphql/Mutation/drainQueue.ts @@ -0,0 +1,15 @@ +import { Queue } from 'bullmq' + +import type { MutationResolvers } from '../schema.generated' + +const drainQueue: MutationResolvers['drainQueue'] = async (_, { queue }) => { + const q = new Queue(queue, { + + }) + + await q.drain() + + return true +} + +export default drainQueue diff --git a/packages/bull/graphql/Mutation/removeJob.ts b/packages/bull/graphql/Mutation/removeJob.ts new file mode 100644 index 00000000..cc6a9c24 --- /dev/null +++ b/packages/bull/graphql/Mutation/removeJob.ts @@ -0,0 +1,15 @@ +import { Queue } from 'bullmq' + +import type { MutationResolvers } from '../schema.generated' + +const removeJob: MutationResolvers['removeJob'] = async (_, { queue, jobId }) => { + const q = new Queue(queue, { + + }) + + const success = await q.remove(jobId) + + return success === 1 +} + +export default removeJob diff --git a/packages/bull/graphql/Mutation/removeRepeatableJob.ts b/packages/bull/graphql/Mutation/removeRepeatableJob.ts new file mode 100644 index 00000000..b9d87880 --- /dev/null +++ b/packages/bull/graphql/Mutation/removeRepeatableJob.ts @@ -0,0 +1,17 @@ +import { Queue } from 'bullmq' + +import type { MutationResolvers } from '../schema.generated' + +const removeRepeatableJob: MutationResolvers['removeRepeatableJob'] = async (_, { + queue, repeatJobKey, +}) => { + const q = new Queue(queue, { + + }) + + const job = await q.removeRepeatableByKey(repeatJobKey) + + return job +} + +export default removeRepeatableJob diff --git a/packages/bull/graphql/Subscription/jobUpdated.ts b/packages/bull/graphql/Subscription/jobUpdated.ts index 18b43fc9..f03fadc7 100644 --- a/packages/bull/graphql/Subscription/jobUpdated.ts +++ b/packages/bull/graphql/Subscription/jobUpdated.ts @@ -3,14 +3,8 @@ import type { Job } from 'bullmq' const jobUpdated: SubscriptionResolvers['jobUpdated'] = { // subscribe to the jobUpdated event - subscribe: (_, __, { pubsub }) => { - console.log('subscribing to jobUpdated') - return pubsub.subscribe('jobUpdated') - }, - resolve: (payload: Job) => { - console.log('resolving jobUpdated', payload) - return payload - }, + subscribe: (_, __, { pubsub }) => pubsub.subscribe('jobUpdated'), + resolve: (payload: Job) => payload, } export default jobUpdated diff --git a/packages/bull/graphql/Type/Job.ts b/packages/bull/graphql/Type/BullJob.ts similarity index 100% rename from packages/bull/graphql/Type/Job.ts rename to packages/bull/graphql/Type/BullJob.ts diff --git a/packages/bull/graphql/Type/BullQueue.ts b/packages/bull/graphql/Type/BullQueue.ts new file mode 100644 index 00000000..437315e0 --- /dev/null +++ b/packages/bull/graphql/Type/BullQueue.ts @@ -0,0 +1,42 @@ +import type { Resolvers } from '../schema.generated' +import type { Queue } from 'bullmq' + +const BullQueueResolvers: Resolvers['BullQueue'] = { + name: ({ name }) => name, + count: async (queue) => queue.count(), + completedCount: async (queue) => queue.getCompletedCount(), + activeCount: async (queue) => queue.getActiveCount(), + waitingCount: async (queue) => queue.getWaitingCount(), + waitingChildrenCount: async (queue) => queue.getWaitingChildrenCount(), + failedCount: async (queue) => queue.getFailedCount(), + isPaused: async (queue) => queue.isPaused(), + repeatableJobs: async (queue, { + start, + end, + asc, + }) => { + const repeatableJobs = await queue.getRepeatableJobs( + start ?? undefined, + end ?? undefined, + asc ?? undefined, + ) + + return repeatableJobs + }, + jobs: async (queue: Queue, { + start, end, asc, type, + }) => { + const jobs = await queue.getJobs( + // @ts-expect-error readonly stuff + Array.isArray(type) ? [...type] : type, + start, + end, + asc, + ) + + // filter out repeatable jobs (they're undefined, weird but yeah) + return jobs.filter((j) => !!j) ?? [] + }, +} + +export default BullQueueResolvers diff --git a/packages/bull/graphql/Type/Queue.ts b/packages/bull/graphql/Type/Queue.ts deleted file mode 100644 index 8b994c62..00000000 --- a/packages/bull/graphql/Type/Queue.ts +++ /dev/null @@ -1,16 +0,0 @@ -import type { JobType, Queue } from 'bullmq' - -export default { - name: ({ name }: Queue) => name, - count: async (queue: Queue) => queue.count(), - completedCount: async (queue: Queue) => queue.getCompletedCount(), - activeCount: async (queue: Queue) => queue.getActiveCount(), - waitingCount: async (queue: Queue) => queue.getWaitingCount(), - waitingChildrenCount: async (queue: Queue) => queue.getWaitingChildrenCount(), - failedCount: async (queue: Queue) => queue.getFailedCount(), - delayedCount: async (queue: Queue) => queue.getDelayedCount(), - isPaused: async (queue: Queue) => queue.isPaused(), - jobs: async (queue: Queue, { - start, end, asc, types, - }: { readonly start?: number, readonly end?: number, readonly asc?: boolean, readonly types?: JobType }) => queue.getJobs(types, start, end, asc), -} diff --git a/packages/bull/graphql/client.generated/graphql.ts b/packages/bull/graphql/client.generated/graphql.ts index eba242f0..54ca6223 100644 --- a/packages/bull/graphql/client.generated/graphql.ts +++ b/packages/bull/graphql/client.generated/graphql.ts @@ -20,11 +20,12 @@ export type Scalars = { export type BullJob = { __typename?: 'BullJob'; - data: Scalars['JSON']['output']; + data?: Maybe; delay?: Maybe; - id?: Maybe; + id: Scalars['ID']['output']; name: Scalars['String']['output']; progress?: Maybe; + repeatJobKey?: Maybe; state: JobState; timestamp: Scalars['Int']['output']; }; @@ -39,6 +40,7 @@ export type BullQueue = { isPaused: Scalars['Boolean']['output']; jobs: Array; name: Scalars['String']['output']; + repeatableJobs: Array; waitingChildrenCount: Scalars['Int']['output']; waitingCount: Scalars['Int']['output']; }; @@ -48,9 +50,26 @@ export type BullQueueJobsArgs = { asc?: InputMaybe; end?: InputMaybe; start?: InputMaybe; - type?: InputMaybe>; + type?: InputMaybe>; }; + +export type BullQueueRepeatableJobsArgs = { + asc?: InputMaybe; + end?: InputMaybe; + start?: InputMaybe; +}; + +export enum CleanQueueType { + Active = 'active', + Completed = 'completed', + Delayed = 'delayed', + Failed = 'failed', + Paused = 'paused', + Prioritized = 'prioritized', + Wait = 'wait' +} + export enum JobState { Active = 'active', Completed = 'completed', @@ -61,14 +80,13 @@ export enum JobState { WaitingChildren = 'waiting_children' } -export enum JobType { +export enum JobTypeOrState { Active = 'active', Completed = 'completed', Delayed = 'delayed', Failed = 'failed', Paused = 'paused', Prioritized = 'prioritized', - Repeat = 'repeat', Wait = 'wait', Waiting = 'waiting', WaitingChildren = 'waiting_children' @@ -77,18 +95,70 @@ export enum JobType { export type Mutation = { __typename?: 'Mutation'; addJob: BullJob; + addRepeatableJob: BullJob; + cleanQueue: Array; + drainQueue: Scalars['Boolean']['output']; + removeJob: Scalars['Boolean']['output']; + removeRepeatableJob: Scalars['Boolean']['output']; }; export type MutationAddJobArgs = { + data?: InputMaybe; + jobId?: InputMaybe; queue: Scalars['String']['input']; }; + +export type MutationAddRepeatableJobArgs = { + data?: InputMaybe; + pattern: Scalars['String']['input']; + queue: Scalars['String']['input']; + repeatJobKey?: InputMaybe; +}; + + +export type MutationCleanQueueArgs = { + grace: Scalars['Int']['input']; + limit: Scalars['Int']['input']; + queue: Scalars['String']['input']; + type?: InputMaybe; +}; + + +export type MutationDrainQueueArgs = { + delayed?: InputMaybe; + queue: Scalars['String']['input']; +}; + + +export type MutationRemoveJobArgs = { + jobId: Scalars['ID']['input']; + queue: Scalars['String']['input']; +}; + + +export type MutationRemoveRepeatableJobArgs = { + queue: Scalars['String']['input']; + repeatJobKey: Scalars['ID']['input']; +}; + export type Query = { __typename?: 'Query'; queues: Array; }; +export type RepeatableJob = { + __typename?: 'RepeatableJob'; + endDate?: Maybe; + id: Scalars['String']['output']; + key: Scalars['ID']['output']; + name: Scalars['String']['output']; + next: Scalars['DateTime']['output']; + pattern: Scalars['String']['output']; + tz: Scalars['String']['output']; +}; + export type Subscription = { __typename?: 'Subscription'; jobUpdated: BullJob; diff --git a/packages/bull/graphql/schema.generated.ts b/packages/bull/graphql/schema.generated.ts index 67d2c451..f73dd72a 100644 --- a/packages/bull/graphql/schema.generated.ts +++ b/packages/bull/graphql/schema.generated.ts @@ -22,11 +22,12 @@ export type Scalars = { export type BullJob = { readonly __typename?: 'BullJob'; - readonly data: Scalars['JSON']['output']; + readonly data?: Maybe; readonly delay?: Maybe; - readonly id?: Maybe; + readonly id: Scalars['ID']['output']; readonly name: Scalars['String']['output']; readonly progress?: Maybe; + readonly repeatJobKey?: Maybe; readonly state: JobState; readonly timestamp: Scalars['Int']['output']; }; @@ -41,6 +42,7 @@ export type BullQueue = { readonly isPaused: Scalars['Boolean']['output']; readonly jobs: ReadonlyArray; readonly name: Scalars['String']['output']; + readonly repeatableJobs: ReadonlyArray; readonly waitingChildrenCount: Scalars['Int']['output']; readonly waitingCount: Scalars['Int']['output']; }; @@ -50,9 +52,26 @@ export type BullQueueJobsArgs = { asc?: InputMaybe; end?: InputMaybe; start?: InputMaybe; - type?: InputMaybe>; + type?: InputMaybe>; }; + +export type BullQueueRepeatableJobsArgs = { + asc?: InputMaybe; + end?: InputMaybe; + start?: InputMaybe; +}; + +export enum CleanQueueType { + Active = 'active', + Completed = 'completed', + Delayed = 'delayed', + Failed = 'failed', + Paused = 'paused', + Prioritized = 'prioritized', + Wait = 'wait' +} + export enum JobState { Active = 'active', Completed = 'completed', @@ -63,14 +82,13 @@ export enum JobState { WaitingChildren = 'waiting_children' } -export enum JobType { +export enum JobTypeOrState { Active = 'active', Completed = 'completed', Delayed = 'delayed', Failed = 'failed', Paused = 'paused', Prioritized = 'prioritized', - Repeat = 'repeat', Wait = 'wait', Waiting = 'waiting', WaitingChildren = 'waiting_children' @@ -79,11 +97,52 @@ export enum JobType { export type Mutation = { readonly __typename?: 'Mutation'; readonly addJob: BullJob; + readonly addRepeatableJob: BullJob; + readonly cleanQueue: ReadonlyArray; + readonly drainQueue: Scalars['Boolean']['output']; + readonly removeJob: Scalars['Boolean']['output']; + readonly removeRepeatableJob: Scalars['Boolean']['output']; }; export type MutationAddJobArgs = { + data?: InputMaybe; + jobId?: InputMaybe; + queue: Scalars['String']['input']; +}; + + +export type MutationAddRepeatableJobArgs = { + data?: InputMaybe; + pattern: Scalars['String']['input']; queue: Scalars['String']['input']; + repeatJobKey?: InputMaybe; +}; + + +export type MutationCleanQueueArgs = { + grace: Scalars['Int']['input']; + limit: Scalars['Int']['input']; + queue: Scalars['String']['input']; + type?: InputMaybe; +}; + + +export type MutationDrainQueueArgs = { + delayed?: InputMaybe; + queue: Scalars['String']['input']; +}; + + +export type MutationRemoveJobArgs = { + jobId: Scalars['ID']['input']; + queue: Scalars['String']['input']; +}; + + +export type MutationRemoveRepeatableJobArgs = { + queue: Scalars['String']['input']; + repeatJobKey: Scalars['ID']['input']; }; export type Query = { @@ -91,6 +150,17 @@ export type Query = { readonly queues: ReadonlyArray; }; +export type RepeatableJob = { + readonly __typename?: 'RepeatableJob'; + readonly endDate?: Maybe; + readonly id: Scalars['String']['output']; + readonly key: Scalars['ID']['output']; + readonly name: Scalars['String']['output']; + readonly next: Scalars['DateTime']['output']; + readonly pattern: Scalars['String']['output']; + readonly tz: Scalars['String']['output']; +}; + export type Subscription = { readonly __typename?: 'Subscription'; readonly jobUpdated: BullJob; @@ -171,15 +241,17 @@ export type ResolversTypes = ResolversObject<{ Boolean: ResolverTypeWrapper; BullJob: ResolverTypeWrapper; BullQueue: ResolverTypeWrapper; + CleanQueueType: CleanQueueType; DateTime: ResolverTypeWrapper; Float: ResolverTypeWrapper; ID: ResolverTypeWrapper; Int: ResolverTypeWrapper; JSON: ResolverTypeWrapper; JobState: JobState; - JobType: JobType; + JobTypeOrState: JobTypeOrState; Mutation: ResolverTypeWrapper<{}>; Query: ResolverTypeWrapper<{}>; + RepeatableJob: ResolverTypeWrapper; String: ResolverTypeWrapper; Subscription: ResolverTypeWrapper<{}>; }>; @@ -196,16 +268,18 @@ export type ResolversParentTypes = ResolversObject<{ JSON: Scalars['JSON']['output']; Mutation: {}; Query: {}; + RepeatableJob: RepeatableJob; String: Scalars['String']['output']; Subscription: {}; }>; export type BullJobResolvers = ResolversObject<{ - data?: Resolver; + data?: Resolver, ParentType, ContextType>; delay?: Resolver, ParentType, ContextType>; - id?: Resolver, ParentType, ContextType>; + id?: Resolver; name?: Resolver; progress?: Resolver, ParentType, ContextType>; + repeatJobKey?: Resolver, ParentType, ContextType>; state?: Resolver; timestamp?: Resolver; __isTypeOf?: IsTypeOfResolverFn; @@ -220,6 +294,7 @@ export type BullQueueResolvers; jobs?: Resolver, ParentType, ContextType, Partial>; name?: Resolver; + repeatableJobs?: Resolver, ParentType, ContextType, Partial>; waitingChildrenCount?: Resolver; waitingCount?: Resolver; __isTypeOf?: IsTypeOfResolverFn; @@ -235,12 +310,28 @@ export interface JsonScalarConfig extends GraphQLScalarTypeConfig = ResolversObject<{ addJob?: Resolver>; + addRepeatableJob?: Resolver>; + cleanQueue?: Resolver, ParentType, ContextType, RequireFields>; + drainQueue?: Resolver>; + removeJob?: Resolver>; + removeRepeatableJob?: Resolver>; }>; export type QueryResolvers = ResolversObject<{ queues?: Resolver, ParentType, ContextType>; }>; +export type RepeatableJobResolvers = ResolversObject<{ + endDate?: Resolver, ParentType, ContextType>; + id?: Resolver; + key?: Resolver; + name?: Resolver; + next?: Resolver; + pattern?: Resolver; + tz?: Resolver; + __isTypeOf?: IsTypeOfResolverFn; +}>; + export type SubscriptionResolvers = ResolversObject<{ jobUpdated?: SubscriptionResolver; }>; @@ -252,6 +343,7 @@ export type Resolvers = ResolversObject<{ JSON?: GraphQLScalarType; Mutation?: MutationResolvers; Query?: QueryResolvers; + RepeatableJob?: RepeatableJobResolvers; Subscription?: SubscriptionResolvers; }>; diff --git a/packages/bull/graphql/schema.graphql b/packages/bull/graphql/schema.graphql index 2206b9e8..4f48b485 100644 --- a/packages/bull/graphql/schema.graphql +++ b/packages/bull/graphql/schema.graphql @@ -3,12 +3,13 @@ scalar DateTime type BullJob { name: String! - data: JSON! - id: ID + data: JSON + id: ID! progress: Float state: JobState! timestamp: Int! delay: Int + repeatJobKey: ID } enum JobState { @@ -21,6 +22,16 @@ enum JobState { waiting_children } +type RepeatableJob { + key: ID! + name: String! + id: String! + endDate: Float + tz: String! + pattern: String! + next: DateTime! +} + type BullQueue { name: String! count: Int! @@ -31,15 +42,20 @@ type BullQueue { failedCount: Int! delayedCount: Int! isPaused: Boolean! + repeatableJobs( + start: Int + end: Int + asc: Boolean + ): [RepeatableJob!]! jobs( - type: [JobType!] + type: [JobTypeOrState!] start: Int end: Int asc: Boolean ): [BullJob!]! } -enum JobType { +enum JobTypeOrState { completed failed active @@ -48,7 +64,7 @@ enum JobType { waiting waiting_children paused - repeat + # repeat (probably doesn't make sense, since we have repeatableJobs for this) wait } @@ -57,7 +73,22 @@ type Query { } type Mutation { - addJob(queue: String!): BullJob! + addJob(queue: String!, data: JSON, jobId: ID): BullJob! + addRepeatableJob(queue: String!, data: JSON, repeatJobKey: ID, pattern: String!): BullJob! + removeRepeatableJob(queue: String!, repeatJobKey: ID!): Boolean! + drainQueue(queue: String!, delayed: Boolean): Boolean! + cleanQueue(queue: String!, grace: Int!, type: CleanQueueType, limit: Int!): [ID!]! + removeJob(queue: String!, jobId: ID!): Boolean! +} + +enum CleanQueueType { + completed + wait + active + paused + prioritized + delayed + failed } type Subscription { diff --git a/packages/bull/package.json b/packages/bull/package.json index 436c0d66..fb9800f7 100644 --- a/packages/bull/package.json +++ b/packages/bull/package.json @@ -4,7 +4,7 @@ "description": "", "main": "plugin.ts", "scripts": { - "dev": "PLUGIN_DEV=true bun --hot plugin.ts", + "dev": "REDIS_URL=redis://localhost:6379 PLUGIN_DEV=true bun --hot plugin.ts", "lint": "eslint .", "lint-quiet": "eslint . --quiet", "graphql-codegen": "graphql-codegen", diff --git a/packages/bull/plugin.ts b/packages/bull/plugin.ts index f0b319ae..224afd79 100644 --- a/packages/bull/plugin.ts +++ b/packages/bull/plugin.ts @@ -1,8 +1,13 @@ import { PluginWithMiddleware } from '@zemble/core' +import GraphQL from '@zemble/graphql' import setupQueues from './utils/setupQueues' +import { ZembleQueue } from './ZembleQueue' -import type { RedisOptions } from 'bullmq' +import type { ZembleQueueConfig } from './ZembleQueue' +import type { + RedisOptions, +} from 'bullmq' export interface BullPluginConfig extends Zemble.GlobalConfig { /** @@ -19,10 +24,22 @@ const defaults = { redisUrl: process.env.REDIS_URL, } satisfies BullPluginConfig +export type { ZembleQueueConfig } + +export { ZembleQueue } + export default new PluginWithMiddleware(__dirname, (config) => ({ plugins, context: { pubsub } }) => { plugins.forEach(({ pluginPath }) => { setupQueues(pluginPath, pubsub, config) }) }, { defaultConfig: defaults, + dependencies: [ + { + plugin: GraphQL.configure({ + + }), + }, + + ], }) diff --git a/packages/bull/queues/hello-world.ts b/packages/bull/queues/hello-world.ts index b8374248..db520c7b 100644 --- a/packages/bull/queues/hello-world.ts +++ b/packages/bull/queues/hello-world.ts @@ -1,9 +1,11 @@ -import type { QueueConfig } from '../utils/setupQueues' +import { ZembleQueue } from '../ZembleQueue' -const config: QueueConfig = { - worker: (job) => { - console.log(job.data) +export default new ZembleQueue((job) => { + console.log(job.data) +}, { + repeat: { + // every 5 seconds + pattern: '*/5 * * * * *', + jobId: 'hello-world', }, -} - -export default config +}) diff --git a/packages/bull/utils/setupQueues.ts b/packages/bull/utils/setupQueues.ts index 4181bbc0..a2cf7fa8 100644 --- a/packages/bull/utils/setupQueues.ts +++ b/packages/bull/utils/setupQueues.ts @@ -1,18 +1,18 @@ +/* eslint-disable no-underscore-dangle */ import readDir from '@zemble/core/utils/readDir' -import { Queue, Worker } from 'bullmq' import * as fs from 'node:fs' import * as path from 'node:path' import '@zemble/graphql' import createClient from '../clients/redis' +import { ZembleQueue, type BullPluginConfig } from '../plugin' -import type { BullPluginConfig } from '../plugin' -import type { Job } from 'bullmq' - -export type QueueConfig = { - readonly worker: (job: Job) => Promise | void -} +import type { + Queue, + Job, + QueueListener, +} from 'bullmq' // eslint-disable-next-line functional/prefer-readonly-type const queues: Queue[] = [] @@ -26,6 +26,21 @@ const setupQueues = (pluginPath: string, pubSub: Zemble.PubSubType, config: Bull pubSub.publish('jobUpdated', job) } + function queuePubber>(status: T, queue: Queue) { + queue.on(status, (...args) => { + const typedArgs = args as Parameters[T]> + + pubSub.publish( + 'queueUpdated', + { + args: typedArgs, + queue, + status, + }, + ) + }) + } + if (hasQueues) { const redisUrl = config?.redisUrl @@ -34,25 +49,30 @@ const setupQueues = (pluginPath: string, pubSub: Zemble.PubSubType, config: Bull filenames.forEach(async (filename) => { const fileNameWithoutExtension = filename.substring(0, filename.length - 3) - const queueConfig = (await import(path.join(queuePath, filename))).default as QueueConfig - - const queue = new Queue(fileNameWithoutExtension, { - connection: createClient(redisUrl, config.redisOptions), - }) - - // @ts-expect-error if this cannot be a promise I'm not sure how stuff will work - const worker = new Worker(fileNameWithoutExtension, queueConfig.worker, { - connection: createClient(redisUrl, config.redisOptions), - }) - - worker.on('completed', jobUpdated) - worker.on('active', jobUpdated) - - worker.on('progress', jobUpdated) - worker.on('failed', (job) => (job ? jobUpdated(job) : null)) - - // eslint-disable-next-line functional/immutable-data - queues.push(queue) + const queueConfig = (await import(path.join(queuePath, filename))).default + + if (queueConfig instanceof ZembleQueue) { + const { queue, worker } = await queueConfig._initQueue(fileNameWithoutExtension, createClient(redisUrl, config.redisOptions)) + + queuePubber('cleaned', queue) + queuePubber('error', queue) + queuePubber('progress', queue) + queuePubber('removed', queue) + queuePubber('waiting', queue) + queuePubber('paused', queue) + queuePubber('resumed', queue) + + worker.on('completed', jobUpdated) + worker.on('active', jobUpdated) + + worker.on('progress', jobUpdated) + worker.on('failed', (job) => (job ? jobUpdated(job) : null)) + + // eslint-disable-next-line functional/immutable-data + queues.push(queue) + } else { + throw new Error(`Failed to load queue ${filename}, make sure it exports a ZembleQueue`) + } }) } else { console.error('[bull-plugin] Failed to initialize. No redisUrl provided for bull plugin, you can specify it directly or with REDIS_URL')