From 3a7e6349e061458934d451e80b29aa7344d9e67e Mon Sep 17 00:00:00 2001 From: Abhishek Kumar Date: Fri, 20 Sep 2024 01:24:09 +0530 Subject: [PATCH] refactored email queue processor to handle any type of email --- src/controllers/groupController.js | 11 +++ src/controllers/userController.js | 2 +- src/services/emailQueueProcessor.js | 67 +++++++++---------- ...enderProducer.js => emailQueueProducer.js} | 21 +++++- src/services/emailServices.js | 2 +- 5 files changed, 65 insertions(+), 38 deletions(-) rename src/services/{emailsenderProducer.js => emailQueueProducer.js} (52%) diff --git a/src/controllers/groupController.js b/src/controllers/groupController.js index 0b5abb5..195dcee 100644 --- a/src/controllers/groupController.js +++ b/src/controllers/groupController.js @@ -31,12 +31,23 @@ module.exports.createGroup = async (req, res) => { return res.status(200).json({ message: "Group created successfully. Proceed to add members.", groupId: newgroup._id, + nextStage: 2, }); } // Stage 2: Add member and send invites if (stage == 2) { const { groupId } = groupData; + const { members } = req.body; + if (!groupId || !members || members.length === 0) { + return res.status(400).json({ + message: "Group ID and member's email required", + }); + } + const groupById = await groupModel.findById({ groupId }); + if (!groupById) { + return res.status(404).json({ message: "Group not found." }); + } } } catch (error) {} }; diff --git a/src/controllers/userController.js b/src/controllers/userController.js index 9821d94..69d7785 100644 --- a/src/controllers/userController.js +++ b/src/controllers/userController.js @@ -9,7 +9,7 @@ const { verifyEmail, verifyPhone } = require("../utils/isContactsValid"); const cookieToken = require("../utils/cookieToken"); const generateUserProfileImage = require("../utils/ generateUserProfileImage"); const { validateUsersChoice } = require("../helpers/validateUserChoice"); -const queueEmailSending = require("../services/emailsenderProducer"); +const queueEmailSending = require("../services/emailQueueProducer"); // Register a new user module.exports.register = async (req, res) => { diff --git a/src/services/emailQueueProcessor.js b/src/services/emailQueueProcessor.js index 1a6ef44..effe142 100644 --- a/src/services/emailQueueProcessor.js +++ b/src/services/emailQueueProcessor.js @@ -1,6 +1,7 @@ const { promisify } = require("util"); const redisClient = require("./redisServer"); const nodemailer = require("nodemailer"); +const { json } = require("stream/consumers"); // Using promisify to convert the callback based to promise chains const rpushAsync = promisify(redisClient.rPush).bind(redisClient); @@ -14,8 +15,6 @@ const zremAsync = promisify(redisClient.zRem).bind(redisClient); // Options for retry and queue names const MAX_RETRIES = 3; const RETRY_DELAY_MS = 2000; -const RETRY_QUEUE = "retry_queue"; -const DLQ_KEY = "email_dlq"; // Function to send email using Nodemailer async function sendMailWithRetry(mailOptions) { @@ -33,60 +32,60 @@ async function sendMailWithRetry(mailOptions) { await transporter.sendMail(mailOptions); } -// process email from queue and handle retries -async function processEmailQueue() { - const jobData = await lpopAsync("email_queue"); - if (!jobData) return; +// modular approach to handle different queue - const job = JSON.parse(jobData); - const { mailOptions, retries } = job; +async function processQueue(queueName, retryQueueName, dlqName) { + const job = await lpopAsync(queueName); + if (!job) return; + + // parse the queue information + const jobData = json.parse(jobData); + const { mailOptions, retries } = jobData; try { + // Send email from respective queue await sendMailWithRetry(mailOptions); } catch (error) { - console.error(`Email Sending failed : ${error.message}`); + console.error(`Error sending email from ${queueName}:`, error.message); - // Retry Logic based on the retries and MAX_RETRIES if (retries < MAX_RETRIES) { - console.log(`Retrying job.. Attempt ${retries + 1}`); - - // increase the retries so that it doesn't remain the main queue + // Retry Logic: increment retries and add to retry queue + console.log(`Retrying job from ${queueName}... Attempt ${retries + 1}`); job.retries += 1; - - // add to retry queue await zaddAsync( - RETRY_QUEUE, + retryQueueName, Date.now() + RETRY_DELAY_MS, JSON.stringify(job) ); } else { - console.log(`Moving to dlq after ${MAX_RETRIES} attempts`); - await rpushAsync(DLQ_KEY, JSON.stringify(job)); + // If max retries are reached, move to DLQ + console.log( + `Moving job from ${queueName} to DLQ after ${MAX_RETRIES} attempts` + ); + await rpushAsync(dlqName, JSON.stringify(job)); } } } -// Process the RETRY_QUEUE -async function processRetryQueue() { +// Generic function to process retry queues +async function processRetryQueue(retryQueueName, mainQueueName) { const timeNow = Date.now(); - const retryJobs = await zrangebyscoreAsync(RETRY_QUEUE, "-inf", timeNow); - - // for (let i = 0; i < retryJobs.length(); i++) { - // await rpushAsync("email_queue", jobData); // Re-add to queue - // await zremAsync(RETRY_QUEUE, jobData); - // } + const retryJobs = await zrangebyscoreAsync(retryQueueName, "-inf", timeNow); for (const jobData of retryJobs) { - await rpushAsync("email-queue", jobData); - await zremAsync(RETRY_QUEUE, jobData); + // Re-add job to the main queue and remove from retry queue + await rpushAsync(mainQueueName, jobData); + await zremAsync(retryQueueName, jobData); } } -// Poll the main email queue and retry queue periodicallly -setInterval(processEmailQueue, 1000); // process jobs(main email queue) periodically -setInterval(processRetryQueue, 1000); // check the retry queue every seconds - +// Periodic queue polling function +function pollQueues(mainQueueName, retryQueueName, dlqName) { + setInterval(() => processQueue(mainQueueName, retryQueueName, dlqName), 1000); + setInterval(() => processRetryQueue(retryQueueName, mainQueueName), 1000); +} module.exports = { - processEmailQueue, - processRetryQueue, + pollEmailQueue: () => pollQueues("email_queue", "retry_queue", "email_dlq"), + pollInviteQueue: () => + pollQueues("invite_queue", "retry_invite_queue", "invite_dlq"), }; diff --git a/src/services/emailsenderProducer.js b/src/services/emailQueueProducer.js similarity index 52% rename from src/services/emailsenderProducer.js rename to src/services/emailQueueProducer.js index 8739c8c..02ff40f 100644 --- a/src/services/emailsenderProducer.js +++ b/src/services/emailQueueProducer.js @@ -2,7 +2,7 @@ const { promisify } = require("util"); const redisClient = require("./redisServer"); const rpushAsync = promisify(redisClient.rPush).bind(redisClient); -// function to add a job to email queue +// function to add verify-email to email queue async function queueEmailSending(mailOptions) { const jobData = JSON.stringify({ mailOptions, @@ -18,4 +18,21 @@ async function queueEmailSending(mailOptions) { } } -module.exports = queueEmailSending; +async function queueInviteEmailSending(mailOptions) { + const job = json.stringify({ + mailOptions, + retries: 0, + }); + try { + await rpushAsync("invite_queue", job); + console.log(`invite emails addded to queue`); + } catch (error) { + console.error(`Error adding emails to invite queue`); + throw new Error(`Error adding emails to invite queue`); + } +} + +module.exports = { + queueEmailSending, + queueInviteEmailSending, +}; diff --git a/src/services/emailServices.js b/src/services/emailServices.js index 37c4e5f..f27ce1b 100644 --- a/src/services/emailServices.js +++ b/src/services/emailServices.js @@ -3,7 +3,7 @@ const crypto = require("crypto"); const fs = require("fs"); const path = require("path"); const redisClient = require("./redisServer"); -const queueEmailSending = require("../services/emailsenderProducer"); +const queueEmailSending = require("./emailQueueProducer"); // promisify Redis function for avoiding callback hell const setAsync = promisify(redisClient.set).bind(redisClient);