Skip to content

Commit

Permalink
added more jobs and mocks to the zembleBull
Browse files Browse the repository at this point in the history
  • Loading branch information
RevanToma committed Oct 1, 2024
1 parent 04d9f88 commit 4c1acc1
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 0 deletions.
8 changes: 8 additions & 0 deletions packages/bull/ZembleQueueBull.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,5 +97,13 @@ export class ZembleQueueBull<DataType = unknown, ReturnType = unknown> {
async getDelayed(...args: Parameters<Queue<DataType, ReturnType>['getDelayed']>) {
return this.#queue.getDelayed(...args)
}

async pause() {
return this.#queue.pause()
}

async resume() {
return this.#queue.resume()
}
}
export default ZembleQueueBull
32 changes: 32 additions & 0 deletions packages/bull/ZembleQueueMock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,17 @@ import type {
interface IZembleQueue<DataType = unknown, ReturnType = unknown> {
readonly add: ZembleQueueBull<DataType, ReturnType>['add']
readonly addBulk: ZembleQueueBull<DataType, ReturnType>['addBulk']
readonly remove: ZembleQueueBull<DataType, ReturnType>['remove']
readonly getJob: ZembleQueueBull<DataType, ReturnType>['getJob']
readonly resume: ZembleQueueBull<DataType, ReturnType>['resume']
readonly pause: ZembleQueueBull<DataType, ReturnType>['pause']
}

class ZembleQueueMock<DataType = unknown, ReturnType = unknown> implements IZembleQueue<DataType, ReturnType> {
private jobs: Array<Job<DataType, ReturnType, string>> = []

private isPaused = false

constructor(
readonly worker: ZembleWorker,
_?: ZembleQueueConfig,
Expand Down Expand Up @@ -72,10 +80,34 @@ class ZembleQueueMock<DataType = unknown, ReturnType = unknown> implements IZemb
}

async add(name: string, data: DataType, opts?: JobsOptions | undefined): Promise<Job<DataType, ReturnType, string>> {
if (this.isPaused) {
throw new Error('Queue is paused')
}

const job = this.#createMockJob(name, data, opts)
this.jobs.push(job)
setTimeout(async () => this.#worker(job, { logger: zembleContext.logger }), 0)
return job
}

async remove(jobId: string) {
const initialLength = this.jobs.length
this.jobs = this.jobs.filter((job) => job.id !== jobId)
const removedCount = initialLength - this.jobs.length
return removedCount
}

async getJob(jobId: string): Promise<Job<DataType, ReturnType, string> | undefined> {
return this.jobs.find((job) => job.id === jobId)
}

async pause() {
this.isPaused = true
}

async resume(): Promise<void> {
this.isPaused = false
}
}

export default ZembleQueueMock

0 comments on commit 4c1acc1

Please sign in to comment.