Skip to content

Commit

Permalink
fix: need to update bull
Browse files Browse the repository at this point in the history
  • Loading branch information
PandelisZ committed Jul 10, 2023
1 parent 4bad6ee commit 51c0979
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 74 deletions.
210 changes: 169 additions & 41 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
"bull-arena": "^3.29.3",
"bull-master": "^1.0.5",
"bull-prom": "ejhayes/bull-prom#master",
"bullmq": "^1.86.2",
"bullmq": "^3.11.0",
"commander": "^9.3.0",
"envalid": "^7.3.1",
"forever-monitor": "^3.0.3",
Expand Down
34 changes: 2 additions & 32 deletions src/bull/bull-queues.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { ConfigService } from '@app/config/config.service';
import { InjectLogger, LoggerService } from '@app/logger';
import { Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import { Mutex, withTimeout } from 'async-mutex';
import { Queue, QueueScheduler } from 'bullmq';
import { Queue } from 'bullmq';
import { RedisService } from 'nestjs-redis';
import { TypedEmitter } from 'tiny-typed-emitter2';
import {
Expand Down Expand Up @@ -39,7 +39,6 @@ const REDIS_CONFIG_NOTIFY_KEYSPACE_EVENTS_FLAGS = 'A$K';
export class BullQueuesService implements OnModuleInit, OnModuleDestroy {
private _initialized = false;
private readonly _queues: { [queueName: string]: Queue } = {};
private readonly _schedulers: { [queueName: string]: QueueScheduler } = {};
private readonly _redisMutex = withTimeout(new Mutex(), 10000);
private readonly _bullMutex = withTimeout(new Mutex(), 10000);

Expand Down Expand Up @@ -117,30 +116,6 @@ export class BullQueuesService implements OnModuleInit, OnModuleDestroy {
this._queues[queueKey].on('ioredis:close', () => {
this.removeQueue(queuePrefix, queueName);
});
/**
* From: https://docs.bullmq.io/guide/connections
*
* Every class will consume at least one Redis connection, but it
* is also possible to reuse connections in some situations. For example,
* the Queue and Worker classes can accept an existing ioredis instance, and
* by that reusing that connection, however QueueScheduler and QueueEvents
* cannot do that because they require blocking connections to Redis, which
* makes it impossible to reuse them.
*/
this._schedulers[queueKey] = new QueueScheduler(queueName, {
prefix: queuePrefix,
connection: {
host: this.configService.config.REDIS_HOST,
port: this.configService.config.REDIS_PORT,
password: this.configService.config.REDIS_PASSWORD,
family: this.configService.config.REDIS_FAMILY,
},
});
this._schedulers[queueKey].on('error', (err) => {
Error.captureStackTrace(err);
this.logger.error(err.stack);
this.removeQueue(queuePrefix, queueName);
});
this.eventEmitter.emit(
EVENT_TYPES.QUEUE_CREATED,
new QueueCreatedEvent(queuePrefix, this._queues[queueKey]),
Expand All @@ -158,14 +133,12 @@ export class BullQueuesService implements OnModuleInit, OnModuleDestroy {

try {
await this._queues[queueKey].close();
await this._schedulers[queueKey].close();
} catch (err) {
// in the event of an error just ignore it and move on
this.logger.error(err);
}

delete this._queues[queueKey];
delete this._schedulers[queueKey];

this.eventEmitter.emit(
EVENT_TYPES.QUEUE_REMOVED,
Expand Down Expand Up @@ -470,10 +443,7 @@ export class BullQueuesService implements OnModuleInit, OnModuleDestroy {
this.eventEmitter.removeAllListeners();

// close all connections
for (const queue of [
Object.values(this._queues),
Object.values(this._schedulers),
].flat()) {
for (const queue of [Object.values(this._queues)].flat()) {
this.logger.warn(`Closing queue: ${queue.name}`);
await new Promise<void>(async (resolve) => {
(await queue.client).on('close', () => {
Expand Down

0 comments on commit 51c0979

Please sign in to comment.