Skip to content

Commit

Permalink
added task priority queue
Browse files Browse the repository at this point in the history
  • Loading branch information
vigan-abd committed Jul 31, 2023
1 parent 23e4bc7 commit 01de7c5
Show file tree
Hide file tree
Showing 5 changed files with 250 additions and 2 deletions.
13 changes: 13 additions & 0 deletions @types/task.priority.queue.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { AsyncPriorityQueue } from 'async'

export class TaskPriorityQueue {
protected queues: Map<string, AsyncPriorityQueue<any>>

public hasQueue(key: string): boolean

public initQueue(key: string, concurrency?: number): boolean

public getQueue(key: string): AsyncPriorityQueue<any> | undefined

public pushTask(key: string, task: () => Promise<any>, priority: number): Promise<any>
}
3 changes: 1 addition & 2 deletions index.d.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
'use strict'

export { TaskPriorityQueue } from './@types/task.priority.queue'
export { TaskQueue } from './@types/task.queue'
4 changes: 4 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
'use strict'

const TaskPriorityQueue = require('./src/task.priority.queue')
const TaskQueue = require('./src/task.queue')

module.exports = {
TaskPriorityQueue,
TaskQueue
}
64 changes: 64 additions & 0 deletions src/task.priority.queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
'use strict'

const async = require('async')
const flatPromise = require('flat-promise')

class TaskPriorityQueue {
constructor () {
/** @type {Map<string, async.AsyncPriorityQueue>} */
this.queues = new Map()
}

/**
* @param {string} key
*/
hasQueue (key) {
return this.queues.has(key)
}

/**
* @param {string} key
* @param {number} [concurrency]
*/
initQueue (key, concurrency = 1) {
if (this.queues.has(key)) return false

this.queues.set(key, async.priorityQueue(async (job, cb) => {
try {
const res = await job.task()
job.resolve(res)
} catch (err) {
job.reject(err)
} finally {
cb() // task queue cb
}
}, concurrency))

return true
}

/**
* @param {string} key
*/
getQueue (key) {
return this.queues.get(key)
}

/**
* @param {string} key - queue key
* @param {() => Promise<any>} task
* @param {number} priority
*/
pushTask (key, task, priority) {
const queue = this.queues.get(key)
if (!queue) return Promise.reject(new Error('ERR_TASK_QUEUE_NOT_FOUND'))

const { promise, reject, resolve } = flatPromise()
const job = { task, resolve, reject }
queue.push(job, priority)

return promise
}
}

module.exports = TaskPriorityQueue
168 changes: 168 additions & 0 deletions test/task.priority.queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
'use strict'

/* eslint-env mocha */

const assert = require('assert')
const { TaskPriorityQueue } = require('../')
const { sleep } = require('./helper')

describe('TaskPriorityQueue tests', () => {
describe('hasQueue tests', () => {
it('should return false when queue does not exist', () => {
const tq = new TaskPriorityQueue()

assert.strictEqual(tq.hasQueue('foo'), false)
})

it('should return true when queue exists', () => {
const tq = new TaskPriorityQueue()
tq.initQueue('foo')

assert.strictEqual(tq.hasQueue('foo'), true)
})
})

describe('initQueue tests', () => {
it('should return true when queue does not exist', () => {
const tq = new TaskPriorityQueue()
const res = tq.initQueue('foo')

assert.strictEqual(res, true)
assert.strictEqual(tq.hasQueue('foo'), true)
})

it('should return true when queue already exists', () => {
const tq = new TaskPriorityQueue()
tq.initQueue('foo')
const res = tq.initQueue('foo')

assert.strictEqual(res, false)
assert.strictEqual(tq.hasQueue('foo'), true)
})
})

describe('getQueue tests', () => {
it('should return undefined when queue does not exist', () => {
const tq = new TaskPriorityQueue()
const res = tq.getQueue('foo')

assert.strictEqual(res, undefined)
})

it('should return instance of async queue when it exists', () => {
const tq = new TaskPriorityQueue()
tq.initQueue('foo')
const res = tq.getQueue('foo')

assert.strictEqual(typeof res, 'object')
assert.strictEqual(res.concurrency, 1)
})
})

describe('pushTask tests', () => {
const job = async (mts, i, arr) => {
await sleep(mts)
arr.push(i)
return i
}

it('should reject when queue does not exist', async () => {
const tq = new TaskPriorityQueue()

await assert.rejects(
tq.pushTask('foo', async () => 123, 2),
(err) => {
assert.ok(err instanceof Error)
assert.strictEqual(err.message, 'ERR_TASK_QUEUE_NOT_FOUND')
return true
}
)
})

it('should process promise and return result', async () => {
const tq = new TaskPriorityQueue()
tq.initQueue('foo')
const res = await tq.pushTask('foo', async () => {
await sleep(500)
return 123
}, 2)

assert.strictEqual(res, 123)
})

it('should work with non async functions', async () => {
const tq = new TaskPriorityQueue()
tq.initQueue('foo')
const res = await tq.pushTask('foo', () => 123, 2)

assert.strictEqual(res, 123)
})

it('should work process items according to concurrency', async () => {
const tq = new TaskPriorityQueue()
tq.initQueue('foo', 1)

const promises = []
const process = []
promises.push(tq.pushTask('foo', () => job(500, 1, process), 2))
promises.push(tq.pushTask('foo', () => job(200, 2, process), 2))
promises.push(tq.pushTask('foo', () => job(700, 3, process), 2))
promises.push(tq.pushTask('foo', () => job(300, 4, process), 2))
const res = await Promise.all(promises)

assert.deepStrictEqual(res, [1, 2, 3, 4])
assert.deepStrictEqual(process, [1, 2, 3, 4])
})

it('should support parallel processing as well', async () => {
const tq = new TaskPriorityQueue()
tq.initQueue('foo', 3)

const promises = []
const process = []

promises.push(tq.pushTask('foo', () => job(3000, 1, process), 2))
promises.push(tq.pushTask('foo', () => job(5000, 2, process), 2))
promises.push(tq.pushTask('foo', () => job(4000, 3, process), 2))
promises.push(tq.pushTask('foo', () => job(1000, 4, process), 2))
const res = await Promise.all(promises)

assert.deepStrictEqual(res, [1, 2, 3, 4])
assert.deepStrictEqual(process, [1, 3, 4, 2])
}).timeout(10000)

it('should handle tasks based on priority according to concurrency', async () => {
const tq = new TaskPriorityQueue()
tq.initQueue('foo', 1)

const promises = []
const process = []

promises.push(tq.pushTask('foo', () => job(3000, 1, process), 2))
promises.push(tq.pushTask('foo', () => job(4000, 2, process), 1))
promises.push(tq.pushTask('foo', () => job(2000, 3, process), 2))
promises.push(tq.pushTask('foo', () => job(1000, 4, process), 1))
const res = await Promise.all(promises)

assert.deepStrictEqual(res, [1, 2, 3, 4])
assert.deepStrictEqual(process, [2, 4, 1, 3])
}).timeout(20000)

it('should handle tasks based on priority in parallel as well', async () => {
const tq = new TaskPriorityQueue()
tq.initQueue('foo', 2)

const promises = []
const process = []

promises.push(tq.pushTask('foo', () => job(2000, 1, process), 2))
promises.push(tq.pushTask('foo', () => job(4000, 2, process), 1))
promises.push(tq.pushTask('foo', () => job(2000, 3, process), 3))
promises.push(tq.pushTask('foo', () => job(3000, 4, process), 1))
const res = await Promise.all(promises)

assert.deepStrictEqual(res, [1, 2, 3, 4])
assert.deepStrictEqual(process, [4, 2, 1, 3])
}).timeout(20000)
})
}).timeout(7000)

0 comments on commit 01de7c5

Please sign in to comment.