Skip to content

Commit

Permalink
refactored email queue processor to handle any type of email
Browse files Browse the repository at this point in the history
  • Loading branch information
abhiraj-ku committed Sep 19, 2024
1 parent b180e75 commit 3a7e634
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 38 deletions.
11 changes: 11 additions & 0 deletions src/controllers/groupController.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,23 @@ module.exports.createGroup = async (req, res) => {
return res.status(200).json({
message: "Group created successfully. Proceed to add members.",
groupId: newgroup._id,
nextStage: 2,
});
}

// Stage 2: Add member and send invites
if (stage == 2) {
const { groupId } = groupData;
const { members } = req.body;
if (!groupId || !members || members.length === 0) {
return res.status(400).json({
message: "Group ID and member's email required",
});
}
const groupById = await groupModel.findById({ groupId });
if (!groupById) {
return res.status(404).json({ message: "Group not found." });
}
}
} catch (error) {}
};
Expand Down
2 changes: 1 addition & 1 deletion src/controllers/userController.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const { verifyEmail, verifyPhone } = require("../utils/isContactsValid");
const cookieToken = require("../utils/cookieToken");
const generateUserProfileImage = require("../utils/ generateUserProfileImage");
const { validateUsersChoice } = require("../helpers/validateUserChoice");
const queueEmailSending = require("../services/emailsenderProducer");
const queueEmailSending = require("../services/emailQueueProducer");

// Register a new user
module.exports.register = async (req, res) => {
Expand Down
67 changes: 33 additions & 34 deletions src/services/emailQueueProcessor.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const { promisify } = require("util");
const redisClient = require("./redisServer");
const nodemailer = require("nodemailer");
const { json } = require("stream/consumers");

// Using promisify to convert the callback based to promise chains
const rpushAsync = promisify(redisClient.rPush).bind(redisClient);
Expand All @@ -14,8 +15,6 @@ const zremAsync = promisify(redisClient.zRem).bind(redisClient);
// Options for retry and queue names
const MAX_RETRIES = 3;
const RETRY_DELAY_MS = 2000;
const RETRY_QUEUE = "retry_queue";
const DLQ_KEY = "email_dlq";

// Function to send email using Nodemailer
async function sendMailWithRetry(mailOptions) {
Expand All @@ -33,60 +32,60 @@ async function sendMailWithRetry(mailOptions) {
await transporter.sendMail(mailOptions);
}

// process email from queue and handle retries
async function processEmailQueue() {
const jobData = await lpopAsync("email_queue");
if (!jobData) return;
// modular approach to handle different queue

const job = JSON.parse(jobData);
const { mailOptions, retries } = job;
async function processQueue(queueName, retryQueueName, dlqName) {
const job = await lpopAsync(queueName);
if (!job) return;

// parse the queue information
const jobData = json.parse(jobData);
const { mailOptions, retries } = jobData;

try {
// Send email from respective queue
await sendMailWithRetry(mailOptions);
} catch (error) {
console.error(`Email Sending failed : ${error.message}`);
console.error(`Error sending email from ${queueName}:`, error.message);

// Retry Logic based on the retries and MAX_RETRIES
if (retries < MAX_RETRIES) {
console.log(`Retrying job.. Attempt ${retries + 1}`);

// increase the retries so that it doesn't remain the main queue
// Retry Logic: increment retries and add to retry queue
console.log(`Retrying job from ${queueName}... Attempt ${retries + 1}`);
job.retries += 1;

// add to retry queue
await zaddAsync(
RETRY_QUEUE,
retryQueueName,
Date.now() + RETRY_DELAY_MS,
JSON.stringify(job)
);
} else {
console.log(`Moving to dlq after ${MAX_RETRIES} attempts`);
await rpushAsync(DLQ_KEY, JSON.stringify(job));
// If max retries are reached, move to DLQ
console.log(
`Moving job from ${queueName} to DLQ after ${MAX_RETRIES} attempts`
);
await rpushAsync(dlqName, JSON.stringify(job));
}
}
}

// Process the RETRY_QUEUE
async function processRetryQueue() {
// Generic function to process retry queues
async function processRetryQueue(retryQueueName, mainQueueName) {
const timeNow = Date.now();
const retryJobs = await zrangebyscoreAsync(RETRY_QUEUE, "-inf", timeNow);

// for (let i = 0; i < retryJobs.length(); i++) {
// await rpushAsync("email_queue", jobData); // Re-add to queue
// await zremAsync(RETRY_QUEUE, jobData);
// }
const retryJobs = await zrangebyscoreAsync(retryQueueName, "-inf", timeNow);

for (const jobData of retryJobs) {
await rpushAsync("email-queue", jobData);
await zremAsync(RETRY_QUEUE, jobData);
// Re-add job to the main queue and remove from retry queue
await rpushAsync(mainQueueName, jobData);
await zremAsync(retryQueueName, jobData);
}
}

// Poll the main email queue and retry queue periodicallly
setInterval(processEmailQueue, 1000); // process jobs(main email queue) periodically
setInterval(processRetryQueue, 1000); // check the retry queue every seconds

// Periodic queue polling function
function pollQueues(mainQueueName, retryQueueName, dlqName) {
setInterval(() => processQueue(mainQueueName, retryQueueName, dlqName), 1000);
setInterval(() => processRetryQueue(retryQueueName, mainQueueName), 1000);
}
module.exports = {
processEmailQueue,
processRetryQueue,
pollEmailQueue: () => pollQueues("email_queue", "retry_queue", "email_dlq"),
pollInviteQueue: () =>
pollQueues("invite_queue", "retry_invite_queue", "invite_dlq"),
};
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ const { promisify } = require("util");
const redisClient = require("./redisServer");
const rpushAsync = promisify(redisClient.rPush).bind(redisClient);

// function to add a job to email queue
// function to add verify-email to email queue
async function queueEmailSending(mailOptions) {
const jobData = JSON.stringify({
mailOptions,
Expand All @@ -18,4 +18,21 @@ async function queueEmailSending(mailOptions) {
}
}

module.exports = queueEmailSending;
async function queueInviteEmailSending(mailOptions) {
const job = json.stringify({
mailOptions,
retries: 0,
});
try {
await rpushAsync("invite_queue", job);
console.log(`invite emails addded to queue`);
} catch (error) {
console.error(`Error adding emails to invite queue`);
throw new Error(`Error adding emails to invite queue`);
}
}

module.exports = {
queueEmailSending,
queueInviteEmailSending,
};
2 changes: 1 addition & 1 deletion src/services/emailServices.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ const crypto = require("crypto");
const fs = require("fs");
const path = require("path");
const redisClient = require("./redisServer");
const queueEmailSending = require("../services/emailsenderProducer");
const queueEmailSending = require("./emailQueueProducer");

// promisify Redis function for avoiding callback hell
const setAsync = promisify(redisClient.set).bind(redisClient);
Expand Down

0 comments on commit 3a7e634

Please sign in to comment.