Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
Merge pull request #52 from BonnierNews/feature/handle-pubsub-redelivers
Browse files Browse the repository at this point in the history
Handle pubsub redelivers
  • Loading branch information
MattiasOlla authored Jan 22, 2024
2 parents 1d9c194 + d6bf9a2 commit 1bc440f
Show file tree
Hide file tree
Showing 12 changed files with 215 additions and 74 deletions.
5 changes: 4 additions & 1 deletion config/test.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,8 @@
"url": "https://some-base.local",
"audience": "some-aud"
},
"setXThrottle": true
"setXThrottle": true,
"toggle": {
"checkIdempotency": true
}
}
19 changes: 17 additions & 2 deletions lib/job-storage/firestore-job-storage.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import Firestore from "@google-cloud/firestore";
import { Firestore } from "@google-cloud/firestore";

import { bucketHash, parentPayload, scanForInvalidKeys } from "./utils/job-storage-helper.js";
import buildLogger from "../logger.js";
Expand Down Expand Up @@ -106,4 +106,19 @@ async function removeParent(parentCorrelationId) {
}
}

export { storeParent, completedChild, parentIsComplete, removeParent };
async function messageAlreadySeen(idempotencyKey, deliveryAttempt) {
try {
await db
.collection("idempotencyLocks")
.doc(`${idempotencyKey}:${deliveryAttempt}`)
.create({ idempotencyKey, deliveryAttempt, expireAt: new Date(Date.now() + 24 * 60 * 60 * 1000) });
return false;
} catch (err) {
if (err.code.toLowerCase() === "already-exists") {
return true;
}
throw err;
}
}

export { storeParent, completedChild, parentIsComplete, removeParent, messageAlreadySeen };
14 changes: 11 additions & 3 deletions lib/job-storage/memory-job-storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import buildLogger from "../logger.js";
import { bucketHash, parentPayload, scanForInvalidKeys } from "./utils/job-storage-helper.js";

let db = { processed: {} };
let db = { processed: {}, idempotencyLocks: {} };
const maxConcurrentRequests = 5;

function storeParent(parentCorrelationId, children, message, nextKey) {
Expand Down Expand Up @@ -79,12 +79,20 @@ function removeParent(parentCorrelationId) {
return;
}

function messageAlreadySeen(idempotencyKey, deliveryAttempt) {
if (db.idempotencyLocks[`${idempotencyKey}:${deliveryAttempt}`]) {
return true;
}
db.idempotencyLocks[`${idempotencyKey}:${deliveryAttempt}`] = { idempotencyKey, deliveryAttempt };
return false;
}

function getDB(section = "processed") {
return db[section];
}

function clearDB() {
db = { processed: {} };
db = { processed: {}, idempotencyLocks: {} };
}

export { storeParent, completedChild, parentIsComplete, removeParent, getDB, clearDB };
export { storeParent, completedChild, parentIsComplete, removeParent, messageAlreadySeen, getDB, clearDB };
14 changes: 13 additions & 1 deletion lib/message-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import resumeMessage from "./resume-message.js";
import buildContext from "./context.js";
import jobStorage from "./job-storage/index.js";
import metrics from "./metrics.js";
import toggle from "./utils/toggle.js";

const maxResumeCount = config.maxResumeCount || 10;

Expand All @@ -32,7 +33,7 @@ async function runTrigger(fn, message, context) {

export default async function messageHandler(recipeMap, req, res) {
const messageData = parseBody(req.body);
const { key, resumedCount, parentCorrelationId, siblingCount } = messageData.attributes;
const { key, resumedCount, parentCorrelationId, siblingCount, idempotencyKey } = messageData.attributes;
const correlationId = messageData.attributes.correlationId || getCorrelationId(messageData);
const { messageId, message, deliveryAttempt } = messageData;
const context = buildContext(correlationId, key);
Expand All @@ -44,6 +45,17 @@ export default async function messageHandler(recipeMap, req, res) {
return res.status(400).send();
}

if (idempotencyKey && toggle("checkIdempotency")) {
try {
if (await jobStorage.messageAlreadySeen(idempotencyKey, deliveryAttempt)) {
logger.warn(`Message has already been handled ${idempotencyKey}:${deliveryAttempt}`);
return res.status(200).send();
}
} catch (err) {
logger.error(`Firebase error: ${JSON.stringify(err)}`);
return res.status(502).send({ error: err });
}
}
metrics.messages.inc();

logger.info(`incoming message ${JSON.stringify(messageData)}`);
Expand Down
5 changes: 3 additions & 2 deletions lib/publish-message.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { PubSub } from "@google-cloud/pubsub";
import config from "exp-config";
import * as uuid from "uuid";

import buildLogger from "./logger.js";

Expand All @@ -10,7 +11,7 @@ const cleanupAttributes = (attributes) => JSON.parse(JSON.stringify(attributes))
export default async function publishMessage(message, attributes) {
const messageId = await pubSubClient
.topic(config.topic)
.publishMessage({ json: message, attributes: cleanupAttributes({ ...attributes, topic: config.topic }) });
.publishMessage({ json: message, attributes: cleanupAttributes({ ...attributes, topic: config.topic, idempotencyKey: uuid.v4() }) });
buildLogger(message.correlationId).info(`Published message ${messageId}`);
}

Expand All @@ -33,7 +34,7 @@ export async function publishMessagesBulk(messages, attributes) {
const correlationId = `${attributes.correlationId}:${idx}`;
const messageId = await client.publishMessage({
json: message,
attributes: cleanupAttributes({ ...attributes, correlationId, topic: config.topic }),
attributes: cleanupAttributes({ ...attributes, correlationId, topic: config.topic, idempotencyKey: uuid.v4() }),
});
buildLogger(correlationId).info(`Published message ${messageId}`);
})()
Expand Down
14 changes: 14 additions & 0 deletions lib/utils/toggle.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import config from "exp-config";

const { toggle: configToggles = {} } = config;

/* c8 ignore start */
export default function toggle(name) {
if (process.env.NODE_ENV === "test") {
if (process.env["NODE-DISABLE-TOGGLE"]?.split(",")?.includes(name)) return false;
return true;
}
const value = configToggles[name];
return value === true || value === "true";
}
/* c8 ignore stop */
90 changes: 27 additions & 63 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@bonniernews/b0rker",
"version": "7.0.2",
"version": "7.1.0",
"engines": {
"node": ">=16"
},
Expand Down Expand Up @@ -39,7 +39,7 @@
},
"devDependencies": {
"@bonniernews/eslint-config": "^1.0.1",
"@bonniernews/lu-test": "^7.2.0",
"@bonniernews/lu-test": "^8.0.0",
"c8": "^8.0.1",
"chai": "^4.3.10",
"depcheck": "^1.4.7",
Expand Down
Loading

0 comments on commit 1bc440f

Please sign in to comment.