Skip to content

A type-safe queue for background tasks using PocketBase.

License

Notifications You must be signed in to change notification settings

joseferben/pocketbase-queue

Repository files navigation

pocketbase-queue

A type-safe queue for background tasks on top of PocketBase. Works in all JavaScript environments where the PocketBase JS SDK is supported.

This is not a high-throughput queue, but it's a good solution to keep things simple. It works with a vanilla PocketBase installation, no changes or additional hooks needed.

import { createConnection, createQueue } from "pocketbase-queue";

const pb = ... // PocketBase instance
const connection = await createConnection({ pb });

const queue = createQueue<{ message: string }>({
  name: "greeting",
  connection,
});

queue.push({ message: "Hello, world!" });

queue.process({ concurrency: 2 }, async ({ task }) => {
  console.log(task.message);
});

Screenshot

Installation

npm i pocketbase pocketbase-queue

In order to use CJS instead of ESM, import the package like this:

import { createConnection, createQueue } from "pocketbase-queue/cjs";

Import the queue collections to your PocketBase instance. Don't forget to tick Merge with the existing collections:

[
  {
    "id": "v2i9s97ijtkb6as",
    "name": "queue_locks",
    "type": "base",
    "system": false,
    "schema": [
      {
        "system": false,
        "id": "gb9nqipq",
        "name": "worker_id",
        "type": "text",
        "required": true,
        "presentable": false,
        "unique": false,
        "options": {
          "min": null,
          "max": null,
          "pattern": ""
        }
      },
      {
        "system": false,
        "id": "of2tn7ct",
        "name": "task",
        "type": "relation",
        "required": true,
        "presentable": false,
        "unique": false,
        "options": {
          "collectionId": "qgf0f8rtdk0s6pq",
          "cascadeDelete": false,
          "minSelect": null,
          "maxSelect": 1,
          "displayFields": null
        }
      }
    ],
    "indexes": [
      "CREATE UNIQUE INDEX `idx_SKtU4DZ` ON `queue_locks` (`task`)",
      "CREATE INDEX `idx_GpAPmOu` ON `queue_locks` (`created`)"
    ],
    "listRule": null,
    "viewRule": null,
    "createRule": null,
    "updateRule": null,
    "deleteRule": null,
    "options": {}
  },
  {
    "id": "qgf0f8rtdk0s6pq",
    "name": "queue_tasks",
    "type": "base",
    "system": false,
    "schema": [
      {
        "system": false,
        "id": "kdzlqnxa",
        "name": "queue",
        "type": "text",
        "required": true,
        "presentable": false,
        "unique": false,
        "options": {
          "min": null,
          "max": null,
          "pattern": ""
        }
      },
      {
        "system": false,
        "id": "odvdwps6",
        "name": "task",
        "type": "json",
        "required": false,
        "presentable": false,
        "unique": false,
        "options": {
          "maxSize": 2000000
        }
      },
      {
        "system": false,
        "id": "lfsvih6z",
        "name": "failed",
        "type": "date",
        "required": false,
        "presentable": false,
        "unique": false,
        "options": {
          "min": "",
          "max": ""
        }
      },
      {
        "system": false,
        "id": "h530uquw",
        "name": "failed_reason",
        "type": "text",
        "required": false,
        "presentable": false,
        "unique": false,
        "options": {
          "min": null,
          "max": null,
          "pattern": ""
        }
      }
    ],
    "indexes": [
      "CREATE INDEX `idx_jyHoldi` ON `queue_tasks` (\n  `queue`,\n  `failed`,\n  `created`\n)",
      "CREATE INDEX `idx_TJTrLZe` ON `queue_tasks` (\n  `failed`,\n  `updated`\n)"
    ],
    "listRule": null,
    "viewRule": null,
    "createRule": null,
    "updateRule": null,
    "deleteRule": null,
    "options": {}
  },
  {
    "id": "ronp4q37zviiby1",
    "name": "queue_stats",
    "type": "view",
    "system": false,
    "schema": [
      {
        "system": false,
        "id": "an0kocfc",
        "name": "pending_tasks",
        "type": "json",
        "required": false,
        "presentable": false,
        "unique": false,
        "options": {
          "maxSize": 1
        }
      },
      {
        "system": false,
        "id": "nnwglxhx",
        "name": "failed_tasks",
        "type": "json",
        "required": false,
        "presentable": false,
        "unique": false,
        "options": {
          "maxSize": 1
        }
      }
    ],
    "indexes": [],
    "listRule": null,
    "viewRule": null,
    "createRule": null,
    "updateRule": null,
    "deleteRule": null,
    "options": {
      "query": "SELECT \n  queue as id,\n  SUM(CASE WHEN failed IS '' THEN 1 ELSE 0 END) as pending_tasks,\n  SUM(CASE WHEN failed IS NOT '' THEN 1 ELSE 0 END) as failed_tasks\nFROM queue_tasks\nGROUP BY queue;"
    }
  }
]

Usage

import { createConnection, createQueue } from "pocketbase-queue";

// Create a service admin user and use its credentials to create a connection:
const pb = new Pocketbase(process.env.POCKETBASE_URL || "http://127.0.0.1:8090");
await pb.admins.authWithPassword(
  process.env.POCKETBASE_EMAIL, // Email of a PocketBase Admin
  process.env.POCKETBASE_PASSWORD // Password of the PocketBase Admin
);

const connection = await createConnection({ pb, verbose: true });

const queue = createQueue<{ message: string }>({
  name: "greeting",
  connection,
});

queue.push({ message: "Hello, world!" });

queue.process({ concurrency: 2 }, async ({ task }) => {
  console.log(task.message);
});

Advanced usage

queue.on("error", (error) => {
  // Prints pocketbase-queue errors, actual task errors are handled by the task processor
  console.error(error);
});

queue.on("stats", (stats) => {
  // Prints basic queue stats, like tasks per second
  console.log(stats);
});

// On Node, use this to gracefully exit workers. This makes sure there are no unreleased locks.
// Unreleased locks get cleaned up after 5 minutes, but it's better to release them as soon as possible.
process.on("SIGINT", async () => {
  queue.close();
  await new Promise((resolve) => setImmediate(resolve));
  console.log("exiting");
  process.exit(0);
});

Run tests

Install and start PocketBase:

./pocketbase/pocketbase serve --dev

Run the tests:

POCKETBASE_EMAIL=<admin email> POCKETBASE_PASSWORD=<admin password> npm run dev

Considerations

  • I easily reached 50-60 tasks per second with 4 concurrent workers and a local PocketBase instance
  • Make sure to run the workers as close as possible to the PocketBase instance to reduce latency
  • Failed tasks are stored with the error message for 7 days, use failedTaskTtl to configure this setting in milliseconds