From 0517802ad85fdccd7b74af9a65eb1394715dbd8c Mon Sep 17 00:00:00 2001 From: Mattias Olla Date: Thu, 18 Jan 2024 16:57:44 +0100 Subject: [PATCH 1/6] Handle pubsub redelivers Set an idempotency key when publishing to pubsub, and then create a "lock" in firestore when starting to process the message. Co-authored-by: Alice Boberg Co-authored-by: Max Olofsson --- lib/job-storage/firestore-job-storage.js | 17 +++- lib/job-storage/memory-job-storage.js | 12 ++- lib/message-handler.js | 13 ++- lib/publish-message.js | 5 +- test/feature/idempotency-feature.test.js | 118 +++++++++++++++++++++++ 5 files changed, 159 insertions(+), 6 deletions(-) create mode 100644 test/feature/idempotency-feature.test.js diff --git a/lib/job-storage/firestore-job-storage.js b/lib/job-storage/firestore-job-storage.js index c6c69f5..382f4cd 100644 --- a/lib/job-storage/firestore-job-storage.js +++ b/lib/job-storage/firestore-job-storage.js @@ -106,4 +106,19 @@ async function removeParent(parentCorrelationId) { } } -export { storeParent, completedChild, parentIsComplete, removeParent }; +async function messageAlreadySeen(idempotencyKey, deliveryAttempt) { + try { + await db + .collection("idempotencyLocks") + .doc(`${idempotencyKey}:${deliveryAttempt}`) + .create({ idempotencyKey, deliveryAttempt }); + return false; + } catch (err) { + if (err.code.toLowerCase() === "already-exists") { + return true; + } + throw err; + } +} + +export { storeParent, completedChild, parentIsComplete, removeParent, messageAlreadySeen }; diff --git a/lib/job-storage/memory-job-storage.js b/lib/job-storage/memory-job-storage.js index bab1fd0..3bbaaeb 100644 --- a/lib/job-storage/memory-job-storage.js +++ b/lib/job-storage/memory-job-storage.js @@ -2,7 +2,7 @@ import buildLogger from "../logger.js"; import { bucketHash, parentPayload, scanForInvalidKeys } from "./utils/job-storage-helper.js"; -let db = { processed: {} }; +let db = { processed: {}, idempotencyLocks: {} }; const maxConcurrentRequests = 5; function storeParent(parentCorrelationId, children, message, nextKey) { @@ -79,6 +79,14 @@ function removeParent(parentCorrelationId) { return; } +function messageAlreadySeen(idempotencyKey, deliveryAttempt) { + if (db.idempotencyLocks[`${idempotencyKey}:${deliveryAttempt}`]) { + return true; + } + db.idempotencyLocks[`${idempotencyKey}:${deliveryAttempt}`] = { idempotencyKey, deliveryAttempt }; + return false; +} + function getDB(section = "processed") { return db[section]; } @@ -87,4 +95,4 @@ function clearDB() { db = { processed: {} }; } -export { storeParent, completedChild, parentIsComplete, removeParent, getDB, clearDB }; +export { storeParent, completedChild, parentIsComplete, removeParent, messageAlreadySeen, getDB, clearDB }; diff --git a/lib/message-handler.js b/lib/message-handler.js index b85d04e..79e9284 100644 --- a/lib/message-handler.js +++ b/lib/message-handler.js @@ -32,7 +32,7 @@ async function runTrigger(fn, message, context) { export default async function messageHandler(recipeMap, req, res) { const messageData = parseBody(req.body); - const { key, resumedCount, parentCorrelationId, siblingCount } = messageData.attributes; + const { key, resumedCount, parentCorrelationId, siblingCount, idempotencyKey } = messageData.attributes; const correlationId = messageData.attributes.correlationId || getCorrelationId(messageData); const { messageId, message, deliveryAttempt } = messageData; const context = buildContext(correlationId, key); @@ -44,6 +44,17 @@ export default async function messageHandler(recipeMap, req, res) { return res.status(400).send(); } + if (idempotencyKey) { + try { + if (await jobStorage.messageAlreadySeen(idempotencyKey, deliveryAttempt)) { + logger.warn(`Message has already been handled ${idempotencyKey}:${deliveryAttempt}`); + return res.status(200).send(); + } + } catch (err) { + logger.error(`Firebase error: ${JSON.stringify(err)}`); + return res.status(502).send({ error: err }); + } + } metrics.messages.inc(); logger.info(`incoming message ${JSON.stringify(messageData)}`); diff --git a/lib/publish-message.js b/lib/publish-message.js index df840f6..98ea654 100644 --- a/lib/publish-message.js +++ b/lib/publish-message.js @@ -1,5 +1,6 @@ import { PubSub } from "@google-cloud/pubsub"; import config from "exp-config"; +import * as uuid from "uuid"; import buildLogger from "./logger.js"; @@ -10,7 +11,7 @@ const cleanupAttributes = (attributes) => JSON.parse(JSON.stringify(attributes)) export default async function publishMessage(message, attributes) { const messageId = await pubSubClient .topic(config.topic) - .publishMessage({ json: message, attributes: cleanupAttributes({ ...attributes, topic: config.topic }) }); + .publishMessage({ json: message, attributes: cleanupAttributes({ ...attributes, topic: config.topic, idempotencyKey: uuid.v4() }) }); buildLogger(message.correlationId).info(`Published message ${messageId}`); } @@ -33,7 +34,7 @@ export async function publishMessagesBulk(messages, attributes) { const correlationId = `${attributes.correlationId}:${idx}`; const messageId = await client.publishMessage({ json: message, - attributes: cleanupAttributes({ ...attributes, correlationId, topic: config.topic }), + attributes: cleanupAttributes({ ...attributes, correlationId, topic: config.topic, idempotencyKey: uuid.v4() }), }); buildLogger(correlationId).info(`Published message ${messageId}`); })() diff --git a/test/feature/idempotency-feature.test.js b/test/feature/idempotency-feature.test.js new file mode 100644 index 0000000..fdd09d3 --- /dev/null +++ b/test/feature/idempotency-feature.test.js @@ -0,0 +1,118 @@ +import { fakePubSub } from "@bonniernews/lu-test"; + +import { start, route } from "../../index.js"; +import jobStorage from "../../lib/job-storage/index.js"; + +const triggerMessage = { + type: "advertisement-order", + id: "some-order-id", +}; + +Feature("Message idempotency", () => { + afterEachScenario(() => { + fakePubSub.reset(); + jobStorage.clearDB(); + }); + + Scenario("Same message gets redelivered", () => { + let broker; + let processedCount = 0; + Given("broker is initiated with a recipe", () => { + broker = start({ + startServer: false, + recipes: [ + { + namespace: "sequence", + name: "advertisement-order", + sequence: [ route(".perform.step-1", () => { + processedCount++; + }) ], + }, + ], + }); + }); + + Given("we can publish messages", () => { + fakePubSub.enablePublish(broker); + }); + + let response; + When("a trigger message is received", async () => { + response = await fakePubSub.triggerMessage(broker, triggerMessage, { + key: "trigger.sequence.advertisement-order", + deliveryAttempt: 1, + idempotencyKey: "some-epic-key", + }); + }); + + Then("the status code should be 200 OK", () => { + response.statusCode.should.eql(200, response.text); + }); + + And("the trigger message is redelivered with the same idempotency key", async () => { + response = await fakePubSub.triggerMessage(broker, triggerMessage, { + key: "trigger.sequence.advertisement-order", + deliveryAttempt: 1, + idempotencyKey: "some-epic-key", + }); + }); + + Then("the status code should be 200 OK", () => { + response.statusCode.should.eql(200, response.text); + }); + + And("there should be two message handler responses", () => { + fakePubSub.recordedMessageHandlerResponses().length.should.eql(2); + }); + + And("we should only have processed one message", () => { + processedCount.should.eql(1); + }); + }); + + Scenario("Message gets retried", () => { + let broker; + let processedCount = 0; + Given("broker is initiated with a recipe", () => { + broker = start({ + startServer: false, + recipes: [ + { + namespace: "sequence", + name: "advertisement-order", + sequence: [ route(".perform.step-1", () => { + processedCount++; + }) ], + }, + ], + }); + }); + + Given("we can publish messages", () => { + fakePubSub.enablePublish(broker); + }); + + let response; + When("a trigger message is received and retried", async () => { + await fakePubSub.triggerMessage(broker, triggerMessage, { + key: "trigger.sequence.advertisement-order", + deliveryAttempt: 1, + idempotencyKey: "some-epic-key", + }); + + await fakePubSub.triggerMessage(broker, triggerMessage, { + key: "trigger.sequence.advertisement-order", + deliveryAttempt: 2, + idempotencyKey: "some-epic-key", + }); + }); + + And("there should be two message handler responses", () => { + fakePubSub.recordedMessageHandlerResponses().length.should.eql(2); + }); + + And("we should have processed both messages", () => { + processedCount.should.eql(2); + }); + }); +}); From faf0674dd6da5254ce36241a505ba5d739632530 Mon Sep 17 00:00:00 2001 From: Mattias Olla Date: Mon, 22 Jan 2024 14:51:33 +0100 Subject: [PATCH 2/6] Fix tests Co-authored-by: Alice Boberg Co-authored-by: Max Olofsson --- lib/job-storage/memory-job-storage.js | 2 +- test/feature/idempotency-feature.test.js | 21 ++++++++++---------- test/feature/sequence-feature.test.js | 2 ++ test/feature/sub-sequence-feature.test.js | 1 + test/feature/trigger-handler-feature.test.js | 4 ++++ 5 files changed, 18 insertions(+), 12 deletions(-) diff --git a/lib/job-storage/memory-job-storage.js b/lib/job-storage/memory-job-storage.js index 3bbaaeb..de1a184 100644 --- a/lib/job-storage/memory-job-storage.js +++ b/lib/job-storage/memory-job-storage.js @@ -92,7 +92,7 @@ function getDB(section = "processed") { } function clearDB() { - db = { processed: {} }; + db = { processed: {}, idempotencyLocks: {} }; } export { storeParent, completedChild, parentIsComplete, removeParent, messageAlreadySeen, getDB, clearDB }; diff --git a/test/feature/idempotency-feature.test.js b/test/feature/idempotency-feature.test.js index fdd09d3..9e2c915 100644 --- a/test/feature/idempotency-feature.test.js +++ b/test/feature/idempotency-feature.test.js @@ -9,6 +9,10 @@ const triggerMessage = { }; Feature("Message idempotency", () => { + beforeEachScenario(() => { + fakePubSub.reset(); + jobStorage.clearDB(); + }); afterEachScenario(() => { fakePubSub.reset(); jobStorage.clearDB(); @@ -40,9 +44,8 @@ Feature("Message idempotency", () => { When("a trigger message is received", async () => { response = await fakePubSub.triggerMessage(broker, triggerMessage, { key: "trigger.sequence.advertisement-order", - deliveryAttempt: 1, idempotencyKey: "some-epic-key", - }); + }, { deliveryAttempt: 1 }); }); Then("the status code should be 200 OK", () => { @@ -52,9 +55,8 @@ Feature("Message idempotency", () => { And("the trigger message is redelivered with the same idempotency key", async () => { response = await fakePubSub.triggerMessage(broker, triggerMessage, { key: "trigger.sequence.advertisement-order", - deliveryAttempt: 1, idempotencyKey: "some-epic-key", - }); + }, { deliveryAttempt: 1 }); }); Then("the status code should be 200 OK", () => { @@ -92,23 +94,20 @@ Feature("Message idempotency", () => { fakePubSub.enablePublish(broker); }); - let response; When("a trigger message is received and retried", async () => { await fakePubSub.triggerMessage(broker, triggerMessage, { key: "trigger.sequence.advertisement-order", - deliveryAttempt: 1, idempotencyKey: "some-epic-key", - }); + }, { deliveryAttempt: 1 }); await fakePubSub.triggerMessage(broker, triggerMessage, { key: "trigger.sequence.advertisement-order", - deliveryAttempt: 2, idempotencyKey: "some-epic-key", - }); + }, { deliveryAttempt: 2 }); }); - And("there should be two message handler responses", () => { - fakePubSub.recordedMessageHandlerResponses().length.should.eql(2); + And("there should be two message handler responses per run", () => { + fakePubSub.recordedMessageHandlerResponses().length.should.eql(4); }); And("we should have processed both messages", () => { diff --git a/test/feature/sequence-feature.test.js b/test/feature/sequence-feature.test.js index 63d5d4a..09ee5d9 100644 --- a/test/feature/sequence-feature.test.js +++ b/test/feature/sequence-feature.test.js @@ -56,6 +56,7 @@ Feature("Broker sequence", () => { last.attributes.should.eql({ correlationId: "some-correlation-id", key: "sequence.advertisement-order.processed", + idempotencyKey: last.attributes.idempotencyKey, topic: "b0rker", }); last.message.should.eql({ @@ -529,6 +530,7 @@ Feature("Broker sequence", () => { last.attributes.should.eql({ correlationId: "some-correlation-id", key: "sequence.bananas.processed", + idempotencyKey: last.attributes.idempotencyKey, topic: "b0rker", }); last.message.should.eql({ diff --git a/test/feature/sub-sequence-feature.test.js b/test/feature/sub-sequence-feature.test.js index 1a57689..6c0f9f9 100644 --- a/test/feature/sub-sequence-feature.test.js +++ b/test/feature/sub-sequence-feature.test.js @@ -14,6 +14,7 @@ Feature("Child processes", () => { fakeGcpAuth.authenticated(); nock.disableNetConnect(); nock.enableNetConnect(/(localhost|127\.0\.0\.1):\d+/); + fakePubSub.reset(); }); afterEachScenario(() => { fakePubSub.reset(); diff --git a/test/feature/trigger-handler-feature.test.js b/test/feature/trigger-handler-feature.test.js index ff7b305..9a7e32e 100644 --- a/test/feature/trigger-handler-feature.test.js +++ b/test/feature/trigger-handler-feature.test.js @@ -106,8 +106,10 @@ Feature("Trigger handler", () => { attributes: { key: "trigger.order", correlationId: fakePubSub.recordedMessages()[0].attributes.correlationId, + idempotencyKey: fakePubSub.recordedMessages()[0].attributes.idempotencyKey, topic: "b0rker", }, + deliveryAttempt: 1, }, { topic: "b0rker", @@ -115,8 +117,10 @@ Feature("Trigger handler", () => { attributes: { key: "trigger.order", correlationId: fakePubSub.recordedMessages()[1].attributes.correlationId, + idempotencyKey: fakePubSub.recordedMessages()[1].attributes.idempotencyKey, topic: "b0rker", }, + deliveryAttempt: 1, }, ]); }); From 6d12417451debede5f5e424ca8dc3b78cfccd5a9 Mon Sep 17 00:00:00 2001 From: Mattias Olla Date: Mon, 22 Jan 2024 15:12:27 +0100 Subject: [PATCH 3/6] Add toggle Co-authored-by: Alice Boberg Co-authored-by: Max Olofsson --- config/test.json | 5 ++++- lib/message-handler.js | 3 ++- lib/utils/toggle.js | 14 ++++++++++++++ 3 files changed, 20 insertions(+), 2 deletions(-) create mode 100644 lib/utils/toggle.js diff --git a/config/test.json b/config/test.json index 5b93291..cd5f372 100644 --- a/config/test.json +++ b/config/test.json @@ -11,5 +11,8 @@ "url": "https://some-base.local", "audience": "some-aud" }, - "setXThrottle": true + "setXThrottle": true, + "toggle": { + "checkIdempotency": true + } } diff --git a/lib/message-handler.js b/lib/message-handler.js index 79e9284..dcd21a1 100644 --- a/lib/message-handler.js +++ b/lib/message-handler.js @@ -6,6 +6,7 @@ import resumeMessage from "./resume-message.js"; import buildContext from "./context.js"; import jobStorage from "./job-storage/index.js"; import metrics from "./metrics.js"; +import toggle from "./utils/toggle.js"; const maxResumeCount = config.maxResumeCount || 10; @@ -44,7 +45,7 @@ export default async function messageHandler(recipeMap, req, res) { return res.status(400).send(); } - if (idempotencyKey) { + if (idempotencyKey && toggle("checkIdempotency")) { try { if (await jobStorage.messageAlreadySeen(idempotencyKey, deliveryAttempt)) { logger.warn(`Message has already been handled ${idempotencyKey}:${deliveryAttempt}`); diff --git a/lib/utils/toggle.js b/lib/utils/toggle.js new file mode 100644 index 0000000..f0a7a1a --- /dev/null +++ b/lib/utils/toggle.js @@ -0,0 +1,14 @@ +import config from "exp-config"; + +const { toggle: configToggles = {} } = config; + +/* c8 ignore start */ +export default function toggle(name) { + if (process.env.NODE_ENV === "test") { + if (process.env["NODE-DISABLE-TOGGLE"]?.split(",")?.includes(name)) return false; + return true; + } + const value = configToggles[name]; + return value === true || value === "true"; +} +/* c8 ignore stop */ From e23e0bedeb62514419c5dbac48bccbf3b5ee11b5 Mon Sep 17 00:00:00 2001 From: Mattias Olla Date: Mon, 22 Jan 2024 15:26:21 +0100 Subject: [PATCH 4/6] Bump lu-test Co-authored-by: Alice Boberg Co-authored-by: Max Olofsson --- package-lock.json | 86 ++++++++++++++--------------------------------- package.json | 2 +- 2 files changed, 26 insertions(+), 62 deletions(-) diff --git a/package-lock.json b/package-lock.json index fdaf240..de16ad7 100644 --- a/package-lock.json +++ b/package-lock.json @@ -24,7 +24,7 @@ }, "devDependencies": { "@bonniernews/eslint-config": "^1.0.1", - "@bonniernews/lu-test": "^7.2.0", + "@bonniernews/lu-test": "^8.0.0", "c8": "^8.0.1", "chai": "^4.3.10", "depcheck": "^1.4.7", @@ -258,9 +258,9 @@ } }, "node_modules/@bonniernews/lu-test": { - "version": "7.2.0", - "resolved": "https://registry.npmjs.org/@bonniernews/lu-test/-/lu-test-7.2.0.tgz", - "integrity": "sha512-xp0pArUgA0UHUjB31bd56NAubTheqfkcqQqexwPVOwvrrmpz8DAl7hH9swSklb/LynW0M3K1kb9PzT3trZsdFQ==", + "version": "8.0.0", + "resolved": "https://registry.npmjs.org/@bonniernews/lu-test/-/lu-test-8.0.0.tgz", + "integrity": "sha512-hZTUKTyBEGa0a4/0PixjYiEjdVB+v3Nq6z+VpSJClary82xYxy0apqx16b16zRZQBBmo482THeg7imuXIPkn+A==", "dev": true, "dependencies": { "@google-cloud/pubsub": "^4.0.7", @@ -906,9 +906,9 @@ "integrity": "sha512-RNiOoTPkptFtSVzQevY/yWtZwf/RxyVnPy/OcA9HBM3MlGDnBEYL5B41H0MTn0Uec8Hi+2qUtTfG2WWZBmMejQ==" }, "node_modules/@sinonjs/commons": { - "version": "3.0.0", - "resolved": "https://registry.npmjs.org/@sinonjs/commons/-/commons-3.0.0.tgz", - "integrity": "sha512-jXBtWAF4vmdNmZgD5FoKsVLv3rPgDnLgPbU84LIJ3otV44vJlDRokVng5v8NFJdCf/da9legHcKaRuZs4L7faA==", + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/@sinonjs/commons/-/commons-3.0.1.tgz", + "integrity": "sha512-K3mCHKQ9sVh8o1C9cxkwxaOmXoAMlDxC1mYyHrjqOWEcBjYr76t96zL2zlj5dUGZ3HSw240X1qgH3Mjf1yJWpQ==", "dev": true, "dependencies": { "type-detect": "4.0.8" @@ -3600,9 +3600,9 @@ "integrity": "sha512-VhXlQgj9ioXCqGstD37E/HBeqEGV/qOD/kmbVG8h5xKBYvM1L3lR1Zn4555cQ8GkYbJa8aJSipLPndE1k6zK2w==" }, "node_modules/fast-xml-parser": { - "version": "4.3.2", - "resolved": "https://registry.npmjs.org/fast-xml-parser/-/fast-xml-parser-4.3.2.tgz", - "integrity": "sha512-rmrXUXwbJedoXkStenj1kkljNF7ugn5ZjR9FJcwmCfcCbtOMDghPajbc+Tck6vE6F5XsDmx+Pr2le9fw8+pXBg==", + "version": "4.3.3", + "resolved": "https://registry.npmjs.org/fast-xml-parser/-/fast-xml-parser-4.3.3.tgz", + "integrity": "sha512-coV/D1MhrShMvU6D0I+VAK3umz6hUaxxhL0yp/9RjfiYUfAv14rDhGQL+PLForhMdr0wq3PiV07WtkkNjJjNHg==", "dev": true, "funding": [ { @@ -5259,9 +5259,9 @@ } }, "node_modules/just-extend": { - "version": "4.2.1", - "resolved": "https://registry.npmjs.org/just-extend/-/just-extend-4.2.1.tgz", - "integrity": "sha512-g3UB796vUFIY90VIv/WX3L2c8CS2MdWUww3CNrYmqza1Fg0DURc2K/O4YrnklBdQarSJ/y8JnJYDGc+1iumQjg==", + "version": "6.2.0", + "resolved": "https://registry.npmjs.org/just-extend/-/just-extend-6.2.0.tgz", + "integrity": "sha512-cYofQu2Xpom82S6qD778jBDpwvvy39s1l/hrYij2u9AMdQcGRpaBu6kY4mVhuno5kJVi1DAz4aiphA2WI1/OAw==", "dev": true }, "node_modules/jwa": { @@ -5930,59 +5930,23 @@ } }, "node_modules/nise": { - "version": "5.1.5", - "resolved": "https://registry.npmjs.org/nise/-/nise-5.1.5.tgz", - "integrity": "sha512-VJuPIfUFaXNRzETTQEEItTOP8Y171ijr+JLq42wHes3DiryR8vT+1TXQW/Rx8JNUhyYYWyIvjXTU6dOhJcs9Nw==", - "dev": true, - "dependencies": { - "@sinonjs/commons": "^2.0.0", - "@sinonjs/fake-timers": "^10.0.2", - "@sinonjs/text-encoding": "^0.7.1", - "just-extend": "^4.0.2", - "path-to-regexp": "^1.7.0" - } - }, - "node_modules/nise/node_modules/@sinonjs/commons": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/@sinonjs/commons/-/commons-2.0.0.tgz", - "integrity": "sha512-uLa0j859mMrg2slwQYdO/AkrOfmH+X6LTVmNTS9CqexuE2IvVORIkSpJLqePAbEnKJ77aMmCwr1NUZ57120Xcg==", - "dev": true, - "dependencies": { - "type-detect": "4.0.8" - } - }, - "node_modules/nise/node_modules/@sinonjs/fake-timers": { - "version": "10.3.0", - "resolved": "https://registry.npmjs.org/@sinonjs/fake-timers/-/fake-timers-10.3.0.tgz", - "integrity": "sha512-V4BG07kuYSUkTCSBHG8G8TNhM+F19jXFWnQtzj+we8DrkpSBCee9Z3Ms8yiGer/dlmhe35/Xdgyo3/0rQKg7YA==", + "version": "5.1.7", + "resolved": "https://registry.npmjs.org/nise/-/nise-5.1.7.tgz", + "integrity": "sha512-wWtNUhkT7k58uvWTB/Gy26eA/EJKtPZFVAhEilN5UYVmmGRYOURbejRUyKm0Uu9XVEW7K5nBOZfR8VMB4QR2RQ==", "dev": true, "dependencies": { - "@sinonjs/commons": "^3.0.0" - } - }, - "node_modules/nise/node_modules/@sinonjs/fake-timers/node_modules/@sinonjs/commons": { - "version": "3.0.0", - "resolved": "https://registry.npmjs.org/@sinonjs/commons/-/commons-3.0.0.tgz", - "integrity": "sha512-jXBtWAF4vmdNmZgD5FoKsVLv3rPgDnLgPbU84LIJ3otV44vJlDRokVng5v8NFJdCf/da9legHcKaRuZs4L7faA==", - "dev": true, - "dependencies": { - "type-detect": "4.0.8" + "@sinonjs/commons": "^3.0.0", + "@sinonjs/fake-timers": "^11.2.2", + "@sinonjs/text-encoding": "^0.7.2", + "just-extend": "^6.2.0", + "path-to-regexp": "^6.2.1" } }, - "node_modules/nise/node_modules/isarray": { - "version": "0.0.1", - "resolved": "https://registry.npmjs.org/isarray/-/isarray-0.0.1.tgz", - "integrity": "sha512-D2S+3GLxWH+uhrNEcoh/fnmYeP8E8/zHl644d/jdA0g2uyXvy3sb0qxotE+ne0LtccHknQzWwZEzhak7oJ0COQ==", - "dev": true - }, "node_modules/nise/node_modules/path-to-regexp": { - "version": "1.8.0", - "resolved": "https://registry.npmjs.org/path-to-regexp/-/path-to-regexp-1.8.0.tgz", - "integrity": "sha512-n43JRhlUKUAlibEJhPeir1ncUID16QnEjNpwzNdO3Lm4ywrBpBZ5oLD0I6br9evr1Y9JTqwRtAh7JLoOzAQdVA==", - "dev": true, - "dependencies": { - "isarray": "0.0.1" - } + "version": "6.2.1", + "resolved": "https://registry.npmjs.org/path-to-regexp/-/path-to-regexp-6.2.1.tgz", + "integrity": "sha512-JLyh7xT1kizaEvcaXOQwOc2/Yhw6KZOvPf1S8401UyLk86CU79LN3vl7ztXGm/pZ+YjoyAJ4rxmHwbkBXJX+yw==", + "dev": true }, "node_modules/nock": { "version": "13.4.0", diff --git a/package.json b/package.json index e41fd05..d025111 100644 --- a/package.json +++ b/package.json @@ -39,7 +39,7 @@ }, "devDependencies": { "@bonniernews/eslint-config": "^1.0.1", - "@bonniernews/lu-test": "^7.2.0", + "@bonniernews/lu-test": "^8.0.0", "c8": "^8.0.1", "chai": "^4.3.10", "depcheck": "^1.4.7", From 8344ccad26ebf3ee78069e0ef6618f36ef76d202 Mon Sep 17 00:00:00 2001 From: Mattias Olla Date: Mon, 22 Jan 2024 15:30:25 +0100 Subject: [PATCH 5/6] Bump version Co-authored-by: Alice Boberg Co-authored-by: Max Olofsson --- package-lock.json | 4 ++-- package.json | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/package-lock.json b/package-lock.json index de16ad7..e76c9b8 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@bonniernews/b0rker", - "version": "7.0.2", + "version": "7.1.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@bonniernews/b0rker", - "version": "7.0.2", + "version": "7.1.0", "license": "MIT", "dependencies": { "@bonniernews/gcp-push-metrics": "^3.2.1", diff --git a/package.json b/package.json index d025111..0843b23 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@bonniernews/b0rker", - "version": "7.0.2", + "version": "7.1.0", "engines": { "node": ">=16" }, From d6bf9a2ce085f298797f45d560b3bba27bdd1456 Mon Sep 17 00:00:00 2001 From: Mattias Olla Date: Mon, 22 Jan 2024 16:05:41 +0100 Subject: [PATCH 6/6] Add `expireAt` to idempotency lock to be able to configure TTL. Co-authored-by: Alice Boberg --- lib/job-storage/firestore-job-storage.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/job-storage/firestore-job-storage.js b/lib/job-storage/firestore-job-storage.js index 382f4cd..462886d 100644 --- a/lib/job-storage/firestore-job-storage.js +++ b/lib/job-storage/firestore-job-storage.js @@ -1,4 +1,4 @@ -import Firestore from "@google-cloud/firestore"; +import { Firestore } from "@google-cloud/firestore"; import { bucketHash, parentPayload, scanForInvalidKeys } from "./utils/job-storage-helper.js"; import buildLogger from "../logger.js"; @@ -111,7 +111,7 @@ async function messageAlreadySeen(idempotencyKey, deliveryAttempt) { await db .collection("idempotencyLocks") .doc(`${idempotencyKey}:${deliveryAttempt}`) - .create({ idempotencyKey, deliveryAttempt }); + .create({ idempotencyKey, deliveryAttempt, expireAt: new Date(Date.now() + 24 * 60 * 60 * 1000) }); return false; } catch (err) { if (err.code.toLowerCase() === "already-exists") {