Skip to content

Commit

Permalink
added queue for email sending
Browse files Browse the repository at this point in the history
  • Loading branch information
abhiraj-ku committed Sep 7, 2024
1 parent 6c157a2 commit 8f807cb
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 20 deletions.
32 changes: 25 additions & 7 deletions src/controllers/userController.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
const User = require("../models/userModel");
const bcrypt = require("bcryptjs");
const { storeuser, verifyCode } = require("../services/emailServices");
const {
storeuser,
verifyCode,
sendVerificationCode,
} = require("../services/emailServices");
const { verifyEmail, verifyPhone } = require("../utils/isContactsValid");
const cookieToken = require("../utils/cookieToken");
const { validateUsersChoice } = require("../helpers/validateUserChoice");
const queueEmailSending = require("../services/emailsenderProducer");

// Register a new user
module.exports.register = async (req, res) => {
Expand Down Expand Up @@ -51,9 +56,15 @@ module.exports.register = async (req, res) => {
}

// Generate verification code and save this to redis with TTL of 3 minutes
await storeuser(name, email);
const verificationCode = await storeuser(name, email);

// send verification code via email
// Queue email for sending verification code
const emailContent = await sendVerificationCode(email);
await queueEmailSending({
email,
subject: "Email Verification",
html: emailContent,
});

await sendVerificationCode(email);

Expand All @@ -65,7 +76,7 @@ module.exports.register = async (req, res) => {
phone,
});
await user.save();

// Don't send password in the response body
password = undefined;

const token = await user.createJwtToken();
Expand Down Expand Up @@ -140,6 +151,7 @@ module.exports.verifyEmail = async (req, res) => {
});
}

// find user in database
const user = await User.findOne({ email });
if (!user) {
return res.status(404).json({ message: "User not found." });
Expand All @@ -158,6 +170,7 @@ module.exports.verifyEmail = async (req, res) => {
await user.save();

// Generate the JWT token now
const token = await user.createJwtToken();
await cookieToken(user, res);

return res.status(200).json({
Expand Down Expand Up @@ -207,10 +220,15 @@ exports.resendVerificationEmail = async (req, res) => {
}

// Store user in Redis and get a new verification code
await storeuser(user.name, email);
const verificationCode = await storeuser(user.name, email);

// Send the verification code via email
await sendVerificationCode(email);
// Generate email content and queue the email for sending
const emailContent = await sendVerificationCode(email);
await queueEmailSending({
email,
subject: "Resend Email Verification",
html: emailContent,
});

return res
.status(200)
Expand Down
92 changes: 92 additions & 0 deletions src/services/emailQueueProcessor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
const { promisify } = require("util");
const redisClient = require("./redisServer");
const nodemailer = require("nodemailer");

// Using promisify to convert the callback based to promise chains
const rpushAsync = promisify(redisClient.rPush).bind(redisClient);
const lpopAsync = promisify(redisClient.lPop).bind(redisClient);
const zaddAsync = promisify(redisClient.zAdd).bind(redisClient);
const zrangebyscoreAsync = promisify(redisClient.zRangeByScore).bind(
redisClient
);
const zremAsync = promisify(redisClient.zRem).bind(redisClient);

// Options for retry and queue names
const MAX_RETRIES = 3;
const RETRY_DELAY_MS = 2000;
const RETRY_QUEUE = "retry_queue";
const DLQ_KEY = "email_dlq";

// Function to send email using Nodemailer
async function sendMailWithRetry(mailOptions) {
const transporter = nodemailer.createTransport({
host: "mail.privateemail.com",
port: 587,
secure: false,
auth: {
user: process.env.EMAIL_USER,
pass: process.env.EMAIL_PASS,
},
});

// Try to send the email
await transporter.sendMail(mailOptions);
}

// process email from queue and handle retries
async function processEmailQueue() {
const jobData = await lpopAsync("email_queue");
if (!jobData) return;

const job = JSON.parse(jobData);
const { mailOptions, retries } = job;

try {
await sendMailWithRetry(mailOptions);
} catch (error) {
console.error(`Email Sending failed : ${error.message}`);

// Retry Logic based on the retries and MAX_RETRIES
if (retries < MAX_RETRIES) {
console.log(`Retrying job.. Attempt ${retries + 1}`);

// increase the retries so that it doesn't remain the main queue
job.retries += 1;

// add to retry queue
await zaddAsync(
RETRY_QUEUE,
Date.now() + RETRY_DELAY_MS,
JSON.stringify(job)
);
} else {
console.log(`Moving to dlq after ${MAX_RETRIES} attempts`);
await rpushAsync(DLQ_KEY, JSON.stringify(job));
}
}
}

// Process the RETRY_QUEUE
async function processRetryQueue() {
const timeNow = Date.now();
const retryJobs = await zrangebyscoreAsync(RETRY_QUEUE, "-inf", timeNow);

// for (let i = 0; i < retryJobs.length(); i++) {
// await rpushAsync("email_queue", jobData); // Re-add to queue
// await zremAsync(RETRY_QUEUE, jobData);
// }

for (const jobData of retryJobs) {
await rpushAsync("email-queue", jobData);
await zremAsync(RETRY_QUEUE, jobData);
}
}

// Poll the main email queue and retry queue periodicallly
setInterval(processEmailQueue, 1000); // process jobs(main email queue) periodically
setInterval(processRetryQueue, 1000); // check the retry queue every seconds

module.exports = {
processEmailQueue,
processRetryQueue,
};
15 changes: 2 additions & 13 deletions src/services/emailServices.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ const crypto = require("crypto");
const fs = require("fs");
const path = require("path");
const redisClient = require("./redisServer");
const nodemailer = require("nodemailer");
const queueEmailSending = require("../services/emailsenderProducer");

// promisify Redis function for avoiding callback hell
const setAsync = promisify(redisClient.set).bind(redisClient);
Expand Down Expand Up @@ -82,17 +82,6 @@ async function sendVerificationCode(email) {
// Generate the emailTemplate before the transporter
const html = await emailTemplate(user.verificationCode);

// create nodemailer transporter
const transporter = nodemailer.createTransport({
host: "mail.privateemail.com",
port: 587,
secure: false,
auth: {
user: process.env.EMAIL_USER,
pass: process.env.EMAIL_PASS,
},
});

const mailOptions = {
from: "[email protected]",
to: email,
Expand All @@ -101,7 +90,7 @@ async function sendVerificationCode(email) {
};

try {
await transporter.sendMail(mailOptions);
await queueEmailSending(mailOptions);
console.log(`Verification code sent sucessfully`);
} catch (error) {
console.error(`Error sending email to ${email}:`, error);
Expand Down
21 changes: 21 additions & 0 deletions src/services/emailsenderProducer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
const { promisify } = require("utile");
const redisClient = require("./redisServer");
const rpushAsync = promisify(redisClient.rpush).bind(redisClient);

// function to add a job to email queue
async function queueEmailSending(mailOptions) {
const jobData = JSON.stringify({
mailOptions,
retries: 0, // Keep track of retries
});

try {
await rpushAsync("email_queue", jobData);
console.log("Job added to email queue");
} catch (error) {
console.error(`Error adding to email Queue`);
throw new Error(`Error adding to email Queue`);
}
}

module.exports = queueEmailSending;

0 comments on commit 8f807cb

Please sign in to comment.