Skip to content

Commit

Permalink
feat(bullmq): allow to inject Queue and Worker using @InjectQueue and…
Browse files Browse the repository at this point in the history
… @InjectWorker decorators
  • Loading branch information
Romakita committed Jan 23, 2024
1 parent 70725bf commit 9f31ea6
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 8 deletions.
63 changes: 59 additions & 4 deletions docs/tutorials/bullmq.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ import "@tsed/bullmq"; // import bullmq ts.ed module

@Configuration({
bullmq: {
// Specify queue's name to create
// Define queue names.
// Note: Since v7.60.0 this options is not required anymore, excepted if queue is not defined in JobController decorator
queues: ["default", "special"],
connection: {
// redisio connection options
Expand Down Expand Up @@ -113,7 +114,7 @@ class ExampleJobWithCustomId implements JobMethods {
public handle(payload: {num: number}) {
console.info("look at my awesome number: ", payload.num);
}

public jobId(payload: {num: number}): string {
return `very realistic job id #${payload.num}`;
}
Expand All @@ -123,8 +124,8 @@ class ExampleJobWithCustomId implements JobMethods {
Keep in mind, though, that when defining a job using the dispatcher and dispatching the job, the ID defined using the dispatcher will take precedence!

```ts
this.dispatcher(ExampleJobWithCustomId, { num: 1 }); // id: 'very realistic job id #1'
this.dispatcher(ExampleJobWithCustomId, { num: 2 }, { jobId: 'I do my own thing!' }) // id: 'I do my own thing!'
this.dispatcher(ExampleJobWithCustomId, {num: 1}); // id: 'very realistic job id #1'
this.dispatcher(ExampleJobWithCustomId, {num: 2}, {jobId: "I do my own thing!"}); // id: 'I do my own thing!'
```

## Defining a repeating job
Expand Down Expand Up @@ -266,6 +267,60 @@ class MyService {
}
```

## Inject a Queue <Badge text="v7.60.0+"/>

While JobDispatcher is the recommended way to dispatch jobs use class token, this can be useful in some cases to manipulate the queue directly.
You can inject a queue using the `@InjectQueue` decorator.

```ts
import {InjectQueue, JobController} from "@tsed/bullmq";
import {Queue} from "bullmq";

@JobController("example")
class ExampleJob implements JobMethods {
@InjectQueue("default")
private readonly queue?: Queue;

$onInit() {
if (this.queue) {
// do something with the queue
this.queue.add("some-job", {msg: "this message is part of the payload for the job"});
}
}

public handle(payload: {msg: string}) {
console.info("New message incoming", payload.msg);
}
}
```

## Inject a Worker <Badge text="v7.60.0+"/>

You can also inject a worker using the `@InjectWorker` decorator.

```ts
import {InjectWorker, JobController} from "@tsed/bullmq";

@JobController("example")
class ExampleJob implements JobMethods {
@InjectWorker("default")
private readonly worker?: Worker;

$onInit() {
if (this.worker) {
// do something with the worker
this.worker.on("completed", (job) => {
console.log("Job completed", job);
});
}
}

public handle(payload: {msg: string}) {
console.info("New message incoming", payload.msg);
}
}
```

## Authors

<GithubContributors :users="['abenerd']"/>
Expand Down
2 changes: 1 addition & 1 deletion packages/third-parties/bullmq/jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module.exports = {
...require("@tsed/jest-config"),
coverageThreshold: {
global: {
branches: 90.74,
branches: 94.66,
functions: 100,
lines: 100,
statements: 100
Expand Down
10 changes: 9 additions & 1 deletion packages/third-parties/bullmq/src/BullMQModule.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {PlatformTest} from "@tsed/common";
import {Inject, PlatformTest} from "@tsed/common";
import {catchAsyncError} from "@tsed/core";
import {Queue, Worker} from "bullmq";
import {anything, instance, mock, verify, when} from "ts-mockito";
Expand All @@ -8,6 +8,8 @@ import {BullMQModule} from "./BullMQModule";
import {type BullMQConfig} from "./config/config";
import {JobMethods} from "./contracts";
import {FallbackJobController, JobController} from "./decorators";
import {InjectQueue} from "./decorators/InjectQueue";
import {InjectWorker} from "./decorators/InjectWorker";
import {JobDispatcher} from "./dispatchers";

const queueConstructorSpy = jest.fn();
Expand Down Expand Up @@ -36,6 +38,12 @@ jest.mock("bullmq", () => {
}
})
class CustomCronJob implements JobMethods {
@InjectQueue("default")
queue: Queue;

@InjectWorker("default")
worker: Worker;

handle() {}
}

Expand Down
4 changes: 2 additions & 2 deletions packages/third-parties/bullmq/src/contracts/JobStore.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {JobsOptions} from "bullmq";

export type JobStore = {
export interface JobStore {
name: string;
queue: string;
opts: JobsOptions;
};
}
6 changes: 6 additions & 0 deletions packages/third-parties/bullmq/src/decorators/InjectQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import {Inject} from "@tsed/common";
import {getQueueToken} from "../utils/getQueueToken";

export function InjectQueue(name: string) {
return Inject(getQueueToken(name));
}
6 changes: 6 additions & 0 deletions packages/third-parties/bullmq/src/decorators/InjectWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import {Inject} from "@tsed/common";
import {getWorkerToken} from "../utils/getWorkerToken";

export function InjectWorker(name: string) {
return Inject(getWorkerToken(name));
}
2 changes: 2 additions & 0 deletions packages/third-parties/bullmq/src/decorators/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@
* @file Automatically generated by barrelsby.
*/

export * from "./InjectQueue";
export * from "./InjectWorker";
export * from "./JobController";
11 changes: 11 additions & 0 deletions packages/third-parties/bullmq/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,19 @@

export * from "./BullMQModule";
export * from "./config/config";
export * from "./constants/BullMQTypes";
export * from "./constants/constants";
export * from "./contracts/JobMethods";
export * from "./contracts/JobStore";
export * from "./decorators/InjectQueue";
export * from "./decorators/InjectWorker";
export * from "./decorators/JobController";
export * from "./dispatchers/JobDispatcher";
export * from "./dispatchers/JobDispatcherOptions";
export * from "./utils/createQueueProvider";
export * from "./utils/createWorkerProvider";
export * from "./utils/getJobToken";
export * from "./utils/getQueueToken";
export * from "./utils/getWorkerToken";
export * from "./utils/mapQueueOptions";
export * from "./utils/mapWorkerOptions";

0 comments on commit 9f31ea6

Please sign in to comment.