A queue library with batch processing. Queue.getQueue
operates within a single transaction to lock tasks for processing by the consumer. The consumer uses atomic transactions for each task to ensure isolated commits and rollbacks. Task execution runs in parallel using Promise.allSettled
.
- Test script and Scenarios
- Pruning strategies
- IPC channel support
To create a custom database, implement the IQueueDb. Refer to the following examples below for guidance.
- MemoryDb - non-persistent storage. sample
- SequelizeDb - has concurrency when using sqlite dialect. sample
npm i @reke592/qwxyz
A Queue has a topic
and db
implementation
const { MemoryDb, Queue, QueueEvent } = require("../src");
// create db implementation
const memDb = new MemoryDb();
// create the Queue instance
const Q = new Queue({
topic: "A",
db: memDb,
});
To initialize a Queue Consumer we need to call the process
method with the following options.
- autorun - this will trigger the
consume
command everycheckInterval
- checkInterval - in milliseconds defaults to 3s
- batchSize - number of queue to run in parallel on
handleBulk
- handler - an Async Function that accepts a Task
// initialize the Consumer
Q.process({
autorun: true,
batchSize: 5,
handler: async (task) => {
// do something with the task...
// return any result
},
});
We can also have multiple consumers sharing a single queue producer. This can be helpful when running consumers on different hosts or servers.
// create consumers
const C1 = new Consumer(Q, {
autorun: true,
batchSize: 3,
async handler(task) {},
});
const C2 = new Consumer(Q, {
autorun: true,
batchSize: 10,
async handler(task) {},
});
// start consumers
C1.consume();
C2.consume();
Then we add a queue by calling the add
method that requires a Queueable data which then became a Task.
- params - required
Record<string, any>
to process by consumer - topic - optional, this will insert a new record in database, considered stalled or pending for other consumers.
// add task to process
await Q.add({
params: {
value,
},
});
QueueEvent, QueueEventCallback
// Instance hooks to subscribe on Queue instance
Q.on(QueueEvent.waiting, (error, task, result) => {
console.log(`topic: ${task.topic}, waiting: ${task.id}`);
});
Q.on(QueueEvent.completed, (error, task, result) => {
console.log(`topic: ${task.topic}, completed: ${task.id}`);
});
Q.on(QueueEvent.locked, (error, task, result) => {
console.log(`topic: ${task.topic}, locked: ${task.id}`);
});
// Static or Global hooks (all Queue instance)
Queue.on(QueueEvent.waiting, (error, task, result) => {
console.log(`topic: ${task.topic}, waiting: ${task.id}`);
});
Queue.on(QueueEvent.completed, (error, task, result) => {
console.log(`topic: ${task.topic}, completed: ${task.id}`);
});
Queue.on(QueueEvent.locked, (error, task, result) => {
console.log(`topic: ${task.topic}, locked: ${task.id}`);
});