Skip to content

Commit

Permalink
fix: pg mq first draft
Browse files Browse the repository at this point in the history
  • Loading branch information
rotorsoft committed Sep 29, 2024
1 parent be12e85 commit 0ae4fff
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 28 deletions.
108 changes: 88 additions & 20 deletions libs/eventually-pg/src/PostgresMessageQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ import {
log,
logAdapterCreated,
logAdapterDisposed,
Message,
MessageQueue,
Messages
} from "@rotorsoft/eventually";
import { Pool } from "pg";
import { config } from "./config";
import { message_queue } from "./seed";

export const PostgresMessageQueue = <M extends Messages>(
table: string
Expand All @@ -19,34 +21,100 @@ export const PostgresMessageQueue = <M extends Messages>(
await pool.end();
},

// TODO: implement
enqueue: async (messages) => {
const sql = `TODO INSERT`.concat(
messages.map((message) => message.name).join(", ")
);
log().green().data("sql:", sql);
await pool.query(sql);
},

// TODO: implement
dequeue: async (callback, stream) => {
const sql = `SELECT * FROM "${table}" WHERE stream = '${stream}' ORDER BY created ASC LIMIT 1`;
log().green().data("sql:", sql);
// const result = await pool.query(sql);
// TODO: handle results with callback
return Promise.resolve();
},

// TODO: implement
seed: async () => {
const seed = "TODO SEED";
const seed = message_queue(table);
log().green().info(`>>> Seeding message queue table: ${table}`);
log().gray().info(seed);
await pool.query(seed);
},

drop: async (): Promise<void> => {
await pool.query(`DROP TABLE IF EXISTS "${table}"`);
},

/**
* Enqueues messages into the table.
* Each message should be inserted with its stream and a timestamp.
*/
enqueue: async (messages) => {
const N = 3;
const sql = `
INSERT INTO "${table}" (name, stream, data)
VALUES ${messages
.map(
(_, i) =>
`(${[...Array(N)].map((_, j) => `$${i * N + j + 1}`).join(", ")})`
)
.join(", ")}
`;
const values = messages.flatMap((m) => [
m.name,
m.stream || "", // empty stream for non-streamed messages
JSON.stringify(m.data)
]);

log().green().data("sql:", sql, values);
await pool.query(sql, values);
},

/**
* Dequeues the oldest available message in the specified stream and passes it to the callback.
* It uses a lock mechanism to ensure the message is only processed by one consumer at a time.
*/
dequeue: async (callback, { stream, leaseMillis = 30_000 }) => {
const sql = `SELECT * FROM "${table}" WHERE stream = '${stream}' ORDER BY created ASC LIMIT 1`;
log().green().data("sql:", sql);

const client = await pool.connect();
try {
await client.query("BEGIN");

// select and lock the next message in the queue
const { rows: next } = await client.query<
Message<M> & { id: number; created: Date }
>(
`
SELECT * FROM "${table}"
WHERE stream = $1 AND (locked_until IS NULL OR locked_until < NOW())
ORDER BY created ASC LIMIT 1
FOR UPDATE SKIP LOCKED
`,
[stream]
);
if (next.length === 0) {
log().yellow().trace(`No messages available for stream: ${stream}`);
await client.query("ROLLBACK");
return;
}

// update lock to prevent other consumers from accessing it
const message = next[0];
await client.query(
`
UPDATE "${table}" SET locked_until = NOW() + INTERVAL '1 millisecond' * $1
WHERE id = $2
`,
[leaseMillis, message.id]
);

// process the message using the provided callback
try {
await callback(message);
// delete message from the queue on success
await client.query(`DELETE FROM "${table}" WHERE id = $1`, [
message.id
]);
} catch (err) {
// release the lock on failure (no need for additional actions as lock will expire)
log().red().error(err);
}
await client.query("COMMIT");
} catch (err) {
await client.query("ROLLBACK");
log().red().error(err);
} finally {
client.release();
}
}
};

Expand Down
71 changes: 71 additions & 0 deletions libs/eventually-pg/src/__tests__/message-queue.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { dispose, Message, sleep } from "@rotorsoft/eventually";
import { PostgresMessageQueue } from "..";

const table = "message_queue_test";
const mq = PostgresMessageQueue(table);

describe("message queue", () => {
beforeAll(async () => {
await mq.drop();
await mq.seed();
});

afterAll(async () => {
await dispose()();
});

it("should enqueue and dequeue", async () => {
await mq.enqueue([{ name: "a", stream: "test", data: { value: "1" } }]);
await mq.enqueue([
{ name: "a", stream: "test", data: { value: "2" } },
{ name: "a", stream: "test", data: { value: "3" } },
{ name: "a", stream: "test", data: { value: "4" } }
]);
await mq.enqueue([{ name: "a", stream: "test", data: { value: "5" } }]);

// should dequeue in order
const messages: Message[] = [];
await mq.dequeue(
(message) => {
messages.push(message);
return Promise.resolve();
},
{ stream: "test" }
);
expect(messages.length).toBe(1);

// should not dequeue if stream is locked
await Promise.all([
// should lock the stream for 1 second
mq.dequeue(
async (message) => {
await sleep(1000);
messages.push(message);
},
{ stream: "test" }
),
// the stream should be locked for 1 second, thus should not dequeue
async () => {
await sleep(300);
await mq.dequeue(
async (message) => {
messages.push(message);
return Promise.resolve();
},
{ stream: "test" }
);
expect(messages.length).toBe(1);
}
]);

// should be able to dequeue again
await mq.dequeue(
(message) => {
messages.push(message);
return Promise.resolve();
},
{ stream: "test" }
);
expect(messages.length).toBe(3);
});
});
23 changes: 18 additions & 5 deletions libs/eventually-pg/src/seed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ CREATE TABLE IF NOT EXISTS public."${table}"
(
id serial PRIMARY KEY,
name varchar(100) COLLATE pg_catalog."default" NOT NULL,
data json,
data jsonb,
stream varchar(100) COLLATE pg_catalog."default" NOT NULL,
version int NOT NULL,
created timestamptz NOT NULL DEFAULT now(),
actor varchar(100) COLLATE pg_catalog."default",
metadata json
metadata jsonb
) TABLESPACE pg_default;
DO $$
Expand All @@ -40,7 +40,7 @@ ALTER TABLE public."${table}"
ADD COLUMN IF NOT EXISTS actor varchar(100) COLLATE pg_catalog."default";
ALTER TABLE public."${table}"
ADD COLUMN IF NOT EXISTS metadata json;
ADD COLUMN IF NOT EXISTS metadata jsonb;
CREATE UNIQUE INDEX IF NOT EXISTS "${table}_stream_ix"
ON public."${table}" USING btree (stream COLLATE pg_catalog."default" ASC, version ASC)
Expand Down Expand Up @@ -83,8 +83,8 @@ const ZOD2PG: { [K in z.ZodFirstPartyTypeKind]?: string } = {
[z.ZodFirstPartyTypeKind.ZodBoolean]: "BOOLEAN",
[z.ZodFirstPartyTypeKind.ZodDate]: "TIMESTAMPTZ",
[z.ZodFirstPartyTypeKind.ZodBigInt]: "BIGINT",
[z.ZodFirstPartyTypeKind.ZodObject]: "JSON",
[z.ZodFirstPartyTypeKind.ZodRecord]: "JSON",
[z.ZodFirstPartyTypeKind.ZodObject]: "JSONB",
[z.ZodFirstPartyTypeKind.ZodRecord]: "JSONB",
[z.ZodFirstPartyTypeKind.ZodNativeEnum]: "TEXT",
[z.ZodFirstPartyTypeKind.ZodEnum]: "TEXT"
};
Expand Down Expand Up @@ -162,3 +162,16 @@ export const projector = <S extends State>(
)
.join("\n")}`;
};

export const message_queue = (table: string): string => `
CREATE TABLE IF NOT EXISTS public."${table}"
(
id serial PRIMARY KEY,
name varchar(100) COLLATE pg_catalog."default" NOT NULL,
stream varchar(100) COLLATE pg_catalog."default" NOT NULL,
data jsonb NOT NULL,
created timestamptz NOT NULL DEFAULT now(),
locked_until timestamptz,
CONSTRAINT message_queue_unique_stream_id UNIQUE (stream, id)
) TABLESPACE pg_default;
`;
9 changes: 6 additions & 3 deletions libs/eventually/src/interfaces/stores.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,15 @@ export interface MessageQueue<M extends Messages> extends Disposable {

/**
* Dequeues message on top of the queue after being processed by consumer callback
* @param callback consumer callback
* @param callback consumer callback that receives the message with storage attributes {id, created} and returns a promise or error
* @param stream optional stream name to support independent concurrent consumers
* @param leaseMillis optional lease duration in milliseconds, adapters should default to a meaningful value
*/
dequeue: (
callback: (message: Message<M> & { created: Date }) => Promise<void>,
stream?: string
callback: (
message: Message<M> & { id: number | string; created: Date }
) => Promise<void>,
opts: { stream?: string; leaseMillis?: number }
) => Promise<void>;

/**
Expand Down

0 comments on commit 0ae4fff

Please sign in to comment.