A PL/pgSQL-based work queue (publisher/consumer) with a Python asyncio/asyncpg API.
alpha software
- Implements a two-layer API:
  - A PostgreSQL layer: tasks can be published from PL/pgSQL functions or procedures. It can also be extended using triggers.
  - A Python layer (or any client with a PostgreSQL driver). The default implementation is based on Python asyncio, using the awesome asyncpg driver.
- It's compatible with PostgREST. All procedures and tables are scoped to a dedicated PostgreSQL schema and can be exposed through PostgREST.
- Retry logic, schedule_at and timeout are implemented in the publish method. A task can be published with a max_retries param or a specific timeout.
- Internally it uses two tables: one where pending and running tasks are scheduled, and `jobs.job`, where finished tasks are moved (successes or failures).
- By default, tasks are retried three times, with backoff.
- Jobs that exceed their timeout are expired. By default, tasks have a 60s timeout.
- Tasks can be scheduled in the future; just provide the corresponding scheduling param.
- There are views to monitor queue stats, including one covering all tasks and `jobs.expired`.
- Tasks can also be prioritized: provide a priority number, and tasks with a greater priority take precedence over other tasks.
- `consume_topic` allows consuming tasks selectively with a wildcard pattern (topic.element.%).
- Rudimentary benchmarks on my laptop showed that it can handle 1000 tasks/second, but it depends on your Postgres instance.
- Instead of a worker daemon, tasks can also be consumed from a cron job, a regular Python script, or a Kubernetes job. (It could be used to parallelize k8s jobs.)
- All jobs have to be acknowledged, positively or negatively (ack/nack).
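The list above mentions three retries with backoff but does not say which backoff formula is used. Purely as an illustration, the sketch below assumes an exponential schedule; `backoff_delay`, `next_schedule`, `base` and `cap` are hypothetical names and values, not part of the library.

```python
from datetime import datetime, timedelta, timezone

MAX_RETRIES = 3  # default number of retries stated in the feature list


def backoff_delay(attempt: int, base: float = 2.0, cap: float = 60.0) -> float:
    """Seconds to wait before retry `attempt` (1-based). Assumed formula."""
    return min(cap, base ** attempt)


def next_schedule(attempt: int, now: datetime):
    """Return the next run time, or None once the retries are exhausted."""
    if attempt > MAX_RETRIES:
        return None
    return now + timedelta(seconds=backoff_delay(attempt))


now = datetime(2024, 1, 1, tzinfo=timezone.utc)
delays = [backoff_delay(n) for n in range(1, MAX_RETRIES + 1)]
```

Under these assumptions a failing task would be rescheduled a little later after each negative acknowledgement, and would stop being rescheduled after the third retry.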
The publish function takes the following parameters:
    i_task, -- method or function to be executed
    i_body: jsonb = null, -- arguments passed to it (in Python: {args: [], kwargs: {}})
    i_scheduled_at: timestamp = null, -- when the task should run
    i_timeout: numeric(7,2), -- timeout in seconds for the job
    i_priority: integer = null -- greater number means higher priority
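As a rough sketch of how a client could call this from Python, the helper below builds the SQL text and positional parameters using PostgreSQL named notation. It assumes the SQL function is called `jobs.publish` (by analogy with `jobs.publish_bulk`) and that omitted parameters fall back to server-side defaults; `build_publish_call` itself is a hypothetical helper, not part of the package.

```python
import json
from datetime import datetime
from typing import Any, List, Optional, Tuple


def build_publish_call(
    task: str,
    args: Optional[list] = None,
    kwargs: Optional[dict] = None,
    scheduled_at: Optional[datetime] = None,
    timeout: Optional[float] = None,
    priority: Optional[int] = None,
) -> Tuple[str, List[Any]]:
    """Build the SQL text and parameters for a publish call (assumed name)."""
    # The body follows the {args: [], kwargs: {}} convention described above.
    body = json.dumps({"args": args or [], "kwargs": kwargs or {}})
    named = [("i_task", task), ("i_body", body)]
    optional = [
        ("i_scheduled_at", scheduled_at),
        ("i_timeout", timeout),
        ("i_priority", priority),
    ]
    # Only pass the optional parameters that were actually provided.
    named += [(name, value) for name, value in optional if value is not None]
    placeholders = ", ".join(
        f"{name} => ${i}" for i, (name, _) in enumerate(named, start=1)
    )
    sql = f"SELECT * FROM jobs.publish({placeholders})"
    return sql, [value for _, value in named]


sql, params = build_publish_call("package.file.sum", args=[1, 2], priority=5)
```

With an asyncpg connection `db`, the result could then be executed as `await db.fetch(sql, *params)`.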
On the worker side:
SELECT * FROM jobs.consume(
    num: integer -- number of desired jobs
);
This returns a list of jobs to be processed.
Or selectively consume a topic:
SELECT * from jobs.consume_topic('topic.xxx.%', 10)
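To make the `%` wildcard concrete, here is a small sketch of which task names a pattern would select. It assumes that the pattern is matched against the task name with SQL `LIKE` semantics (`%` for any sequence, `_` for a single character); the source only shows the `%` form, and `matches_topic` is a hypothetical helper.

```python
import re


def like_to_regex(pattern: str):
    """Translate a SQL LIKE pattern into a compiled regular expression."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")  # any sequence of characters
        elif ch == "_":
            parts.append(".")  # exactly one character
        else:
            parts.append(re.escape(ch))  # literal character (dots included)
    return re.compile("".join(parts), re.DOTALL)


def matches_topic(pattern: str, task: str) -> bool:
    """True when the whole task name matches the LIKE pattern."""
    return like_to_regex(pattern).fullmatch(task) is not None


tasks = ["topic.xxx.resize", "topic.xxx.email", "topic.yyy.resize"]
selected = [t for t in tasks if matches_topic("topic.xxx.%", t)]
```

Under these assumptions, a worker consuming `topic.xxx.%` would only pick up the first two tasks.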
Jobs are marked as processing and should be acknowledged with:
SELECT FROM jobs.ack(job_id);
or, to report a failed job:
SELECT FROM jobs.nack(job_id, traceback, i_schedule_at);
You can also enqueue multiple jobs in a single request, using
SELECT * FROM jobs.publish_bulk(jobs.bulk_job[]);
where jobs.bulk_job is
create type jobs.bulk_job as (
    task varchar,
    body jsonb,
    scheduled_at timestamp,
    timeout integer,
    priority integer,
    max_retries integer
);
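From Python, one way to prepare such a batch is to build one tuple per job, in the field order of `jobs.bulk_job`. This is a sketch under assumptions: `bulk_job` and `BULK_SQL` are hypothetical names, and the body is serialized to a JSON string with the `{args, kwargs}` layout used elsewhere in this document.

```python
import json
from datetime import datetime
from typing import Optional, Tuple

# Field order mirrors jobs.bulk_job:
# (task, body, scheduled_at, timeout, priority, max_retries)
BulkJob = Tuple[
    str, str, Optional[datetime], Optional[int], Optional[int], Optional[int]
]


def bulk_job(
    task: str,
    args=None,
    kwargs=None,
    scheduled_at: Optional[datetime] = None,
    timeout: Optional[int] = None,
    priority: Optional[int] = None,
    max_retries: Optional[int] = None,
) -> BulkJob:
    """Build one row for the bulk publish call (hypothetical helper)."""
    body = json.dumps({"args": list(args or []), "kwargs": dict(kwargs or {})})
    return (task, body, scheduled_at, timeout, priority, max_retries)


rows = [bulk_job("package.file.sum", args=[n, n + 1]) for n in range(3)]

# Assumed call shape; the explicit cast tells PostgreSQL the array type.
BULK_SQL = "SELECT * FROM jobs.publish_bulk($1::jobs.bulk_job[])"
```

asyncpg generally accepts tuples for composite-type values, so with a connection `db` this would be sent as `await db.fetch(BULK_SQL, rows)`.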
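On the Python side, implementing a worker should look something like this:
import asyncio
import asyncpg
import jobs  # the module whose functions are called below

async def worker(dsn):
    db = await asyncpg.connect(dsn)
    while True:
        # fetch at most one pending job per iteration
        pending = await jobs.consume(db, 1)
        for job in pending:
            try:
                await jobs.run(db, job["job_id"])
                await jobs.ack(job["job_id"])
            except Exception as e:
                # report the failure so the job can be retried
                await jobs.nack(job["job_id"], str(e))
        await asyncio.sleep(1)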
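The worker loop above handles one job at a time. A possible variation, sketched here with in-memory stand-ins so it runs without a database, processes every job of a batch concurrently and acknowledges each one independently. `process_batch` and the `fake_*` functions are hypothetical; in a real worker the callables would wrap the library's run/ack/nack calls.

```python
import asyncio
from typing import Awaitable, Callable, Iterable


async def process_batch(
    batch: Iterable[dict],
    run: Callable[[dict], Awaitable[None]],
    ack: Callable[[int], Awaitable[None]],
    nack: Callable[[int, str], Awaitable[None]],
) -> None:
    """Run all jobs of a batch concurrently; ack on success, nack on error."""

    async def handle(job: dict) -> None:
        try:
            await run(job)
        except Exception as exc:
            # Negative acknowledgement carries the error message.
            await nack(job["job_id"], str(exc))
        else:
            await ack(job["job_id"])

    await asyncio.gather(*(handle(job) for job in batch))


# In-memory stand-ins for the database-backed calls.
acked, nacked = [], []


async def fake_run(job: dict) -> None:
    if job["task"] == "boom":
        raise ValueError("failed on purpose")


async def fake_ack(job_id: int) -> None:
    acked.append(job_id)


async def fake_nack(job_id: int, error: str) -> None:
    nacked.append((job_id, error))


batch = [{"job_id": 1, "task": "ok"}, {"job_id": 2, "task": "boom"}]
asyncio.run(process_batch(batch, fake_run, fake_ack, fake_nack))
```

Because each job gets its own `try`/`except`, one failing job does not prevent the others in the batch from being acknowledged.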
On the publisher side, jobs can be enqueued inside a PostgreSQL transaction:
db = await asyncpg.connect(dsn)
async with db.transaction():
    # do whatever is needed,
    # queue a task
    await jobs.publish("package.file.sum", args=[1, 2])
pip install pgjobs
jobs-migrate postgresql://user:password@localhost:5432/db
This will create the tables and functions in the `jobs` PostgreSQL schema.
To run the worker:
jobs-worker postgresql://dsn
At the moment there is not much implemented there:
just a single-threaded worker that needs a bit more love :)
If your application resides in a Python package,
tasks like yourpackage.file.method
will be runnable as is.
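How a dotted task name such as `yourpackage.file.method` can be turned into a callable is sketched below with `importlib`. This is only an assumption about how such a lookup might work, not the worker's actual code; `resolve_task` and `run_task` are hypothetical names, and the example handles plain (non-coroutine) functions only.

```python
import importlib
from typing import Any, Callable


def resolve_task(path: str) -> Callable[..., Any]:
    """Import `package.module` and return the attribute named by the last segment."""
    module_name, _, attr = path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected a dotted path, got {path!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attr)


def run_task(path: str, body: dict) -> Any:
    """Call the task with the {args, kwargs} body stored alongside the job."""
    func = resolve_task(path)
    return func(*body.get("args", []), **body.get("kwargs", {}))


# Standard-library function used as a stand-in for an application task.
result = run_task("operator.add", {"args": [1, 2], "kwargs": {}})
```

The lookup only succeeds if the package is importable from the environment in which the worker runs.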
Monitoring can be done with psql, or by exposing the stats views through postgresql_exporter.
connect notifications using pg_notify when tasks are queued, picked, or completed. With this in place, it's easy enough to write a WebSocket service to send notifications to connected customers.
improve the worker to run every job in its own asyncio task
handle exceptions better on the Python side
fix requirements file
add github actions to run CI
write better docs and some examples