Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
Merge pull request #54 from BonnierNews/fix/catch-pubsub-publishing-e…
Browse files Browse the repository at this point in the history
…rrors

Log pubsub publishing errors
  • Loading branch information
MattiasOlla authored Jan 23, 2024
2 parents 1bc440f + 8c68280 commit bee0d64
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 16 deletions.
45 changes: 32 additions & 13 deletions lib/publish-message.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,23 @@ const pubSubClient = config.pubSub ? new PubSub({ apiEndpoint: config.pubSub.api
const cleanupAttributes = (attributes) => JSON.parse(JSON.stringify(attributes));

export default async function publishMessage(message, attributes) {
const messageId = await pubSubClient
.topic(config.topic)
.publishMessage({ json: message, attributes: cleanupAttributes({ ...attributes, topic: config.topic, idempotencyKey: uuid.v4() }) });
buildLogger(message.correlationId).info(`Published message ${messageId}`);
const logger = buildLogger(message.correlationId);
const messageId = await publishPubsubMessage(
pubSubClient.topic(config.topic),
{ json: message, attributes: cleanupAttributes({ ...attributes, topic: config.topic, idempotencyKey: uuid.v4() }) },
logger
);
logger.info(`Published message ${messageId}`);
}

export async function rejectMessage(message, attributes) {
const messageId = await pubSubClient
.topic(config.deadLetterTopic)
.publishMessage({ json: message, attributes: cleanupAttributes({ ...attributes, topic: config.topic }) });
buildLogger(message.correlationId).error(`Rejected message ${messageId}`);
const logger = buildLogger(message.correlationId);
const messageId = await publishPubsubMessage(
pubSubClient.topic(config.deadLetterTopic),
{ json: message, attributes: cleanupAttributes({ ...attributes, topic: config.topic }) },
logger
);
logger.error(`Rejected message ${messageId}`);
}

export async function publishMessagesBulk(messages, attributes) {
Expand All @@ -32,13 +38,26 @@ export async function publishMessagesBulk(messages, attributes) {
const promises = messages.map((message, idx) =>
(async () => {
const correlationId = `${attributes.correlationId}:${idx}`;
const messageId = await client.publishMessage({
json: message,
attributes: cleanupAttributes({ ...attributes, correlationId, topic: config.topic, idempotencyKey: uuid.v4() }),
});
buildLogger(correlationId).info(`Published message ${messageId}`);
const logger = buildLogger(correlationId);
const messageId = await publishPubsubMessage(
client, {
json: message,
attributes: cleanupAttributes({ ...attributes, correlationId, topic: config.topic, idempotencyKey: uuid.v4() }),
},
logger
);
logger.info(`Published message ${messageId}`);
})()
);

await Promise.all(promises);
}

async function publishPubsubMessage(topicClient, message, logger) {
try {
return await topicClient.publishMessage(message);
} catch (err) {
logger.error(`Error publishing PubSub message: "${err}". Full message: ${JSON.stringify(message)}`);
throw err;
}
}
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@bonniernews/b0rker",
"version": "7.1.0",
"version": "7.1.1",
"engines": {
"node": ">=16"
},
Expand Down

0 comments on commit bee0d64

Please sign in to comment.