Skip to content

Latest commit

 

History

History
132 lines (102 loc) · 3.71 KB

README.md

File metadata and controls

132 lines (102 loc) · 3.71 KB

QWXYZ

A queue library with batch processing. Queue.getQueue operates within a single transaction to lock tasks for processing by the consumer. The consumer uses atomic transactions for each task to ensure isolated commits and rollbacks. Task execution runs in parallel using Promise.allSettled.

TODO

  • Test script and Scenarios
  • Pruning strategies
  • IPC channel support

Sample DB implementations

To create a custom database, implement the IQueueDb. Refer to the following examples below for guidance.

Installation

npm i @reke592/qwxyz

Usage

A Queue has a topic and db implementation

Queue, MemoryDb, IQueueDb

const { MemoryDb, Queue, QueueEvent } = require("../src");

// create db implementation
const memDb = new MemoryDb();

// create the Queue instance
const Q = new Queue({
  topic: "A",
  db: memDb,
});

To initialize a Queue Consumer we need to call the process method with the following options.

  • autorun - this will trigger the consume command every checkInterval
  • checkInterval - in milliseconds defaults to 3s
  • batchSize - number of queue to run in parallel on handleBulk
  • handler - an Async Function that accepts a Task

Consumer, Task

// initialize the Consumer
Q.process({
  autorun: true,
  batchSize: 5,
  handler: async (task) => {
    // do something with the task...
    // return any result
  },
});

We can also have multiple consumers sharing a single queue producer. This can be helpful when running consumers on different hosts or servers.

// create consumers
const C1 = new Consumer(Q, {
  autorun: true,
  batchSize: 3,
  async handler(task) {},
});

const C2 = new Consumer(Q, {
  autorun: true,
  batchSize: 10,
  async handler(task) {},
});

// start consumers
C1.consume();
C2.consume();

Then we add a queue by calling the add method that requires a Queueable data which then became a Task.

  • params - required Record<string, any> to process by consumer
  • topic - optional, this will insert a new record in database, considered stalled or pending for other consumers.

Queueable, Task

// add task to process
await Q.add({
  params: {
    value,
  },
});

Queue hooks

QueueEvent, QueueEventCallback

// Instance hooks to subscribe on Queue instance
Q.on(QueueEvent.waiting, (error, task, result) => {
  console.log(`topic: ${task.topic}, waiting: ${task.id}`);
});
Q.on(QueueEvent.completed, (error, task, result) => {
  console.log(`topic: ${task.topic}, completed: ${task.id}`);
});
Q.on(QueueEvent.locked, (error, task, result) => {
  console.log(`topic: ${task.topic}, locked: ${task.id}`);
});

// Static or Global hooks (all Queue instance)
Queue.on(QueueEvent.waiting, (error, task, result) => {
  console.log(`topic: ${task.topic}, waiting: ${task.id}`);
});
Queue.on(QueueEvent.completed, (error, task, result) => {
  console.log(`topic: ${task.topic}, completed: ${task.id}`);
});
Queue.on(QueueEvent.locked, (error, task, result) => {
  console.log(`topic: ${task.topic}, locked: ${task.id}`);
});