From 01de7c5570afe925e7dda6c2c15fe3b9cd56df56 Mon Sep 17 00:00:00 2001 From: vigan-abd Date: Mon, 31 Jul 2023 16:52:17 +0200 Subject: [PATCH] added task priority queue --- @types/task.priority.queue.d.ts | 13 +++ index.d.ts | 3 +- index.js | 4 + src/task.priority.queue.js | 64 ++++++++++++ test/task.priority.queue.js | 168 ++++++++++++++++++++++++++++++++ 5 files changed, 250 insertions(+), 2 deletions(-) create mode 100644 @types/task.priority.queue.d.ts create mode 100644 src/task.priority.queue.js create mode 100644 test/task.priority.queue.js diff --git a/@types/task.priority.queue.d.ts b/@types/task.priority.queue.d.ts new file mode 100644 index 0000000..5682d6b --- /dev/null +++ b/@types/task.priority.queue.d.ts @@ -0,0 +1,13 @@ +import { AsyncPriorityQueue } from 'async' + +export class TaskPriorityQueue { + protected queues: Map> + + public hasQueue(key: string): boolean + + public initQueue(key: string, concurrency?: number): boolean + + public getQueue(key: string): AsyncPriorityQueue | undefined + + public pushTask(key: string, task: () => Promise, priority: number): Promise +} diff --git a/index.d.ts b/index.d.ts index 36b8806..5f0a6d8 100644 --- a/index.d.ts +++ b/index.d.ts @@ -1,3 +1,2 @@ -'use strict' - +export { TaskPriorityQueue } from './@types/task.priority.queue' export { TaskQueue } from './@types/task.queue' diff --git a/index.js b/index.js index 830a19b..88265c7 100644 --- a/index.js +++ b/index.js @@ -1,5 +1,9 @@ +'use strict' + +const TaskPriorityQueue = require('./src/task.priority.queue') const TaskQueue = require('./src/task.queue') module.exports = { + TaskPriorityQueue, TaskQueue } diff --git a/src/task.priority.queue.js b/src/task.priority.queue.js new file mode 100644 index 0000000..ecd72ba --- /dev/null +++ b/src/task.priority.queue.js @@ -0,0 +1,64 @@ +'use strict' + +const async = require('async') +const flatPromise = require('flat-promise') + +class TaskPriorityQueue { + constructor () { + /** @type {Map} */ + this.queues = new Map() + } + + /** + * @param {string} key + */ + hasQueue (key) { + return this.queues.has(key) + } + + /** + * @param {string} key + * @param {number} [concurrency] + */ + initQueue (key, concurrency = 1) { + if (this.queues.has(key)) return false + + this.queues.set(key, async.priorityQueue(async (job, cb) => { + try { + const res = await job.task() + job.resolve(res) + } catch (err) { + job.reject(err) + } finally { + cb() // task queue cb + } + }, concurrency)) + + return true + } + + /** + * @param {string} key + */ + getQueue (key) { + return this.queues.get(key) + } + + /** + * @param {string} key - queue key + * @param {() => Promise} task + * @param {number} priority + */ + pushTask (key, task, priority) { + const queue = this.queues.get(key) + if (!queue) return Promise.reject(new Error('ERR_TASK_QUEUE_NOT_FOUND')) + + const { promise, reject, resolve } = flatPromise() + const job = { task, resolve, reject } + queue.push(job, priority) + + return promise + } +} + +module.exports = TaskPriorityQueue diff --git a/test/task.priority.queue.js b/test/task.priority.queue.js new file mode 100644 index 0000000..65411ea --- /dev/null +++ b/test/task.priority.queue.js @@ -0,0 +1,168 @@ +'use strict' + +/* eslint-env mocha */ + +const assert = require('assert') +const { TaskPriorityQueue } = require('../') +const { sleep } = require('./helper') + +describe('TaskPriorityQueue tests', () => { + describe('hasQueue tests', () => { + it('should return false when queue does not exist', () => { + const tq = new TaskPriorityQueue() + + assert.strictEqual(tq.hasQueue('foo'), false) + }) + + it('should return true when queue exists', () => { + const tq = new TaskPriorityQueue() + tq.initQueue('foo') + + assert.strictEqual(tq.hasQueue('foo'), true) + }) + }) + + describe('initQueue tests', () => { + it('should return true when queue does not exist', () => { + const tq = new TaskPriorityQueue() + const res = tq.initQueue('foo') + + assert.strictEqual(res, true) + assert.strictEqual(tq.hasQueue('foo'), true) + }) + + it('should return true when queue already exists', () => { + const tq = new TaskPriorityQueue() + tq.initQueue('foo') + const res = tq.initQueue('foo') + + assert.strictEqual(res, false) + assert.strictEqual(tq.hasQueue('foo'), true) + }) + }) + + describe('getQueue tests', () => { + it('should return undefined when queue does not exist', () => { + const tq = new TaskPriorityQueue() + const res = tq.getQueue('foo') + + assert.strictEqual(res, undefined) + }) + + it('should return instance of async queue when it exists', () => { + const tq = new TaskPriorityQueue() + tq.initQueue('foo') + const res = tq.getQueue('foo') + + assert.strictEqual(typeof res, 'object') + assert.strictEqual(res.concurrency, 1) + }) + }) + + describe('pushTask tests', () => { + const job = async (mts, i, arr) => { + await sleep(mts) + arr.push(i) + return i + } + + it('should reject when queue does not exist', async () => { + const tq = new TaskPriorityQueue() + + await assert.rejects( + tq.pushTask('foo', async () => 123, 2), + (err) => { + assert.ok(err instanceof Error) + assert.strictEqual(err.message, 'ERR_TASK_QUEUE_NOT_FOUND') + return true + } + ) + }) + + it('should process promise and return result', async () => { + const tq = new TaskPriorityQueue() + tq.initQueue('foo') + const res = await tq.pushTask('foo', async () => { + await sleep(500) + return 123 + }, 2) + + assert.strictEqual(res, 123) + }) + + it('should work with non async functions', async () => { + const tq = new TaskPriorityQueue() + tq.initQueue('foo') + const res = await tq.pushTask('foo', () => 123, 2) + + assert.strictEqual(res, 123) + }) + + it('should work process items according to concurrency', async () => { + const tq = new TaskPriorityQueue() + tq.initQueue('foo', 1) + + const promises = [] + const process = [] + promises.push(tq.pushTask('foo', () => job(500, 1, process), 2)) + promises.push(tq.pushTask('foo', () => job(200, 2, process), 2)) + promises.push(tq.pushTask('foo', () => job(700, 3, process), 2)) + promises.push(tq.pushTask('foo', () => job(300, 4, process), 2)) + const res = await Promise.all(promises) + + assert.deepStrictEqual(res, [1, 2, 3, 4]) + assert.deepStrictEqual(process, [1, 2, 3, 4]) + }) + + it('should support parallel processing as well', async () => { + const tq = new TaskPriorityQueue() + tq.initQueue('foo', 3) + + const promises = [] + const process = [] + + promises.push(tq.pushTask('foo', () => job(3000, 1, process), 2)) + promises.push(tq.pushTask('foo', () => job(5000, 2, process), 2)) + promises.push(tq.pushTask('foo', () => job(4000, 3, process), 2)) + promises.push(tq.pushTask('foo', () => job(1000, 4, process), 2)) + const res = await Promise.all(promises) + + assert.deepStrictEqual(res, [1, 2, 3, 4]) + assert.deepStrictEqual(process, [1, 3, 4, 2]) + }).timeout(10000) + + it('should handle tasks based on priority according to concurrency', async () => { + const tq = new TaskPriorityQueue() + tq.initQueue('foo', 1) + + const promises = [] + const process = [] + + promises.push(tq.pushTask('foo', () => job(3000, 1, process), 2)) + promises.push(tq.pushTask('foo', () => job(4000, 2, process), 1)) + promises.push(tq.pushTask('foo', () => job(2000, 3, process), 2)) + promises.push(tq.pushTask('foo', () => job(1000, 4, process), 1)) + const res = await Promise.all(promises) + + assert.deepStrictEqual(res, [1, 2, 3, 4]) + assert.deepStrictEqual(process, [2, 4, 1, 3]) + }).timeout(20000) + + it('should handle tasks based on priority in parallel as well', async () => { + const tq = new TaskPriorityQueue() + tq.initQueue('foo', 2) + + const promises = [] + const process = [] + + promises.push(tq.pushTask('foo', () => job(2000, 1, process), 2)) + promises.push(tq.pushTask('foo', () => job(4000, 2, process), 1)) + promises.push(tq.pushTask('foo', () => job(2000, 3, process), 3)) + promises.push(tq.pushTask('foo', () => job(3000, 4, process), 1)) + const res = await Promise.all(promises) + + assert.deepStrictEqual(res, [1, 2, 3, 4]) + assert.deepStrictEqual(process, [4, 2, 1, 3]) + }).timeout(20000) + }) +}).timeout(7000)