From d08eaed99e8f80349a8129d8a997bde4b8338cbd Mon Sep 17 00:00:00 2001 From: Nikolai Ovtsinnikov Date: Thu, 30 Jan 2025 10:52:03 +0200 Subject: [PATCH] rewrite move message, add moveAsync, use async/await --- lib/message-handler.js | 841 +++++++++++++++++++++-------------------- 1 file changed, 436 insertions(+), 405 deletions(-) diff --git a/lib/message-handler.js b/lib/message-handler.js index 63a1a451..ff4d00e7 100644 --- a/lib/message-handler.js +++ b/lib/message-handler.js @@ -915,451 +915,482 @@ class MessageHandler { } move(options, callback) { - this.getMailbox(options.source, (err, mailboxData) => { - if (err) { - return callback(err); + this.moveAsync(options) + .then(movedMessageRes => callback(null, ...movedMessageRes)) + .catch(err => callback(err)); + } + + async moveAsync(options) { + // concurrent promises + const [mailboxData, targetData] = await Promise.all([this.getMailboxAsync(options.source), this.getMailboxAsync(options.destination)]); + + const item = await this.database.collection('mailboxes').findOneAndUpdate( + { + _id: mailboxData._id + }, + { + $inc: { + // increase the mailbox modification index + // to indicate that something happened + modifyIndex: 1 + } + }, + { + returnDocument: 'after', + projection: { + _id: true, + uidNext: true, + modifyIndex: true + } } + ); + + let newModseq = (item && item.value && item.value.modifyIndex) || 1; - this.getMailbox(options.destination, (err, targetData) => { + let cursor = this.database + .collection('messages') + .find({ + mailbox: mailboxData._id, + uid: options.messageQuery ? options.messageQuery : tools.checkRangeQuery(options.messages) + }) + // ordering is needed for IMAP UIDPLUS results + .sort({ uid: 1 }); + + let sourceUid = []; + let destinationUid = []; + + let removeEntries = []; + let existsEntries = []; + + let done = async err => { + let next = () => { if (err) { - return callback(err); + throw err; } + return [ + true, + { + uidValidity: targetData.uidValidity, + sourceUid, + destinationUid, + mailbox: mailboxData._id, + target: targetData._id, + status: 'moved' + } + ]; + }; + + if (sourceUid.length && options.showExpunged) { + options.session.writeStream.write({ + tag: '*', + command: String(options.session.selected.uidList.length), + attributes: [ + { + type: 'atom', + value: 'EXISTS' + } + ] + }); + } + + if (existsEntries.length) { + // mark messages as deleted from old mailbox + return await new Promise((resolve, reject) => { + this.notifier.addEntries(mailboxData, removeEntries, () => { + // mark messages as added to new mailbox + this.notifier.addEntries(targetData, existsEntries, () => { + this.notifier.fire(mailboxData.user); + + try { + resolve(next()); // resolve with return value of next() + } catch (err) { + reject(err); + } + }); + }); + }); + } + + return next(); + }; + + let processNext = async () => { + let message; + try { + message = await cursor.next(); - this.database.collection('mailboxes').findOneAndUpdate( + if (!message) { + await cursor.close(); // close cursor + return done(); + } + } catch (err) { + return done(err); + } + + let messageId = message._id; + let messageUid = message.uid; + + if (options.returnIds) { + sourceUid.push(message._id); + } else { + sourceUid.push(messageUid); + } + + let item; + + try { + item = await this.database.collection('mailboxes').findOneAndUpdate( { - _id: mailboxData._id + _id: targetData._id }, { $inc: { - // increase the mailbox modification index - // to indicate that something happened - modifyIndex: 1 + uidNext: 1 } }, { - returnDocument: 'after', projection: { - _id: true, uidNext: true, modifyIndex: true - } - }, - (err, item) => { - if (err) { - return callback(err); - } + }, + returnDocument: 'before' + } + ); + + if (!item || !item.value) { + await cursor.close(); + return done(new Error('Mailbox disappeared')); + } + } catch (err) { + await cursor.close(); + return done(err); + } - let newModseq = (item && item.value && item.value.modifyIndex) || 1; + message._id = new ObjectId(); - let cursor = this.database - .collection('messages') - .find({ - mailbox: mailboxData._id, - uid: options.messageQuery ? options.messageQuery : tools.checkRangeQuery(options.messages) - }) - // ordering is needed for IMAP UIDPLUS results - .sort({ uid: 1 }); + let uidNext = item.value.uidNext; + let modifyIndex = item.value.modifyIndex; + + if (options.returnIds) { + destinationUid.push(message._id); + } else { + destinationUid.push(uidNext); + } - let sourceUid = []; - let destinationUid = []; + // set new mailbox + message.mailbox = targetData._id; - let removeEntries = []; - let existsEntries = []; + // new mailbox means new UID + message.uid = uidNext; - let done = err => { - let next = () => { - if (err) { - return callback(err); - } - return callback(null, true, { - uidValidity: targetData.uidValidity, - sourceUid, - destinationUid, - mailbox: mailboxData._id, - target: targetData._id, - status: 'moved' - }); - }; + // retention settings + message.exp = !!targetData.retention; + message.rdate = Date.now() + (targetData.retention || 0); + message.modseq = modifyIndex; // reset message modseq to whatever it is for the mailbox right now - if (sourceUid.length && options.showExpunged) { - options.session.writeStream.write({ - tag: '*', - command: String(options.session.selected.uidList.length), - attributes: [ - { - type: 'atom', - value: 'EXISTS' - } - ] - }); + let unseen = message.unseen; + + if (!message.flags.includes('\\Deleted')) { + message.searchable = true; + } else { + delete message.searchable; + } + + let junk = false; + if (targetData.specialUse === '\\Junk' && !message.junk) { + message.junk = true; + junk = 1; + } else if (targetData.specialUse !== '\\Trash' && message.junk) { + delete message.junk; + junk = -1; + } + + Object.keys(options.updates || {}).forEach(key => { + switch (key) { + case 'seen': + case 'deleted': + { + let fname = '\\' + key.charAt(0).toUpperCase() + key.substr(1); + + if (options.updates[key] && !message.flags.includes(fname)) { + // add missing flag + message.flags.push(fname); + } else if (!options.updates[key] && message.flags.includes(fname)) { + // remove non-needed flag + let flags = new Set(message.flags); + flags.delete(fname); + message.flags = Array.from(flags); } + message['un' + key] = !options.updates[key]; + } + break; - if (existsEntries.length) { - // mark messages as deleted from old mailbox - return this.notifier.addEntries(mailboxData, removeEntries, () => { - // mark messages as added to new mailbox - this.notifier.addEntries(targetData, existsEntries, () => { - this.notifier.fire(mailboxData.user); - next(); - }); - }); + case 'flagged': + case 'draft': + { + let fname = '\\' + key.charAt(0).toUpperCase() + key.substr(1); + if (options.updates[key] && !message.flags.includes(fname)) { + // add missing flag + message.flags.push(fname); + } else if (!options.updates[key] && message.flags.includes(fname)) { + // remove non-needed flag + let flags = new Set(message.flags); + flags.delete(fname); + message.flags = Array.from(flags); } + message[key] = options.updates[key]; + } + break; - next(); - }; + case 'expires': + { + if (options.updates.expires) { + message.exp = true; + message.rdate = options.updates.expires.getTime(); + } else { + message.exp = false; + } + } + break; - let processNext = () => { - cursor.next((err, message) => { - if (err) { - return done(err); - } - if (!message) { - return cursor.close(done); + case 'metaData': + message.meta = message.meta || {}; + message.meta.custom = options.updates.metaData; + break; + + case 'outbound': + message.outbound = [].concat(message.outbound || []).concat(options.updates.outbound || []); + break; + } + }); + + if (options.markAsSeen) { + message.unseen = false; + if (!message.flags.includes('\\Seen')) { + message.flags.push('\\Seen'); + } + } + + const updateMessage = async () => { + let r; + + try { + r = await this.database.collection('messages').insertOne(message, { writeConcern: 'majority' }); + + if (!r || !r.acknowledged) { + let err = new Error('Failed to store message [2]'); + err.responseCode = 500; + err.code = 'StoreError'; + + await cursor.close(); + return done(err); + } + } catch (err) { + await cursor.close(); + return done(err); + } + + let insertId = r.insertedId; + + // delete old message + let deleteMessageRes; + + try { + deleteMessageRes = await this.database.collection('messages').deleteOne( + { + _id: messageId, + mailbox: mailboxData._id, + uid: messageUid + }, + { writeConcern: 'majority' } + ); + } catch (err) { + await cursor.close(); + return done(err); + } + + if (deleteMessageRes && deleteMessageRes.deletedCount) { + if (options.session) { + options.session.writeStream.write(options.session.formatResponse('EXPUNGE', sourceUid)); + } + + removeEntries.push({ + command: 'EXPUNGE', + ignore: options.session && options.session.id, + uid: messageUid, + message: messageId, + unseen, + // modseq is needed to avoid updating mailbox entry + modseq: newModseq + }); + + if (options.showExpunged) { + options.session.writeStream.write(options.session.formatResponse('EXPUNGE', messageUid)); + } + } + + let entry = { + command: 'EXISTS', + uid: uidNext, + message: insertId, + unseen: message.unseen, + idate: message.idate, + thread: message.thread + }; + if (junk) { + entry.junk = junk; + } + existsEntries.push(entry); + + if (existsEntries.length >= consts.BULK_BATCH_SIZE) { + // mark messages as deleted from old mailbox + return new Promise(resolve => { + this.notifier.addEntries(mailboxData, removeEntries, () => { + // mark messages as added to new mailbox + this.notifier.addEntries(targetData, existsEntries, () => { + removeEntries = []; + existsEntries = []; + this.notifier.fire(mailboxData.user); + resolve(processNext()); + }); + }); + }); + } + return processNext(); + }; + + if (targetData.encryptMessages) { + // move target mailbox is encrypted + const parsedHeader = (message.mimeTree && message.mimeTree.parsedHeader) || {}; + const parsedContentType = parsedHeader['content-type']; + + if (parsedContentType && parsedContentType.subtype === 'encrypted') { + // message already encrypted, just continue move + return updateMessage(); + } else { + // not yet encrypted, need to encrypt + let res; + + try { + res = await this.users.collection('users').findOne({ _id: mailboxData.user }); + } catch (err) { + return done(err); + } + + // get user data + if (!res.pubKey) { + // no public key available, cannot encrypt + return updateMessage(); + } + + // get raw from existing mimetree + let outputStream = this.indexer.rebuild(message.mimeTree); // get raw rebuilder response obj (.value is the stream) + + if (!outputStream || outputStream.type !== 'stream' || !outputStream.value) { + return done(new Error('Cannot fetch message')); + } + outputStream = outputStream.value; // set stream to actual stream object (.value) + + let chunks = []; + let chunklen = 0; + + return await new Promise((resolve, reject) => { + outputStream + .on('readable', () => { + let chunk; + while ((chunk = outputStream.read()) !== null) { + chunks.push(chunk); + chunklen += chunk.length; } + }) + .on('end', async () => { + // when done rebuilding + const raw = Buffer.concat(chunks, chunklen); - let messageId = message._id; - let messageUid = message.uid; + let encryptRes; - if (options.returnIds) { - sourceUid.push(message._id); - } else { - sourceUid.push(messageUid); + try { + encryptRes = await this.encryptMessageAsync(res.pubKey, raw); + } catch (err) { + return reject(done(err)); } - this.database.collection('mailboxes').findOneAndUpdate( - { - _id: targetData._id - }, - { - $inc: { - uidNext: 1 - } - }, - { - projection: { - uidNext: true, - modifyIndex: true - }, - returnDocument: 'before' - }, - (err, item) => { - if (err) { - return cursor.close(() => done(err)); - } - - if (!item || !item.value) { - return cursor.close(() => done(new Error('Mailbox disappeared'))); - } - - message._id = new ObjectId(); - - let uidNext = item.value.uidNext; - let modifyIndex = item.value.modifyIndex; - - if (options.returnIds) { - destinationUid.push(message._id); - } else { - destinationUid.push(uidNext); - } - - // set new mailbox - message.mailbox = targetData._id; - - // new mailbox means new UID - message.uid = uidNext; - - // retention settings - message.exp = !!targetData.retention; - message.rdate = Date.now() + (targetData.retention || 0); - message.modseq = modifyIndex; // reset message modseq to whatever it is for the mailbox right now - - let unseen = message.unseen; - - if (!message.flags.includes('\\Deleted')) { - message.searchable = true; - } else { - delete message.searchable; - } - - let junk = false; - if (targetData.specialUse === '\\Junk' && !message.junk) { - message.junk = true; - junk = 1; - } else if (targetData.specialUse !== '\\Trash' && message.junk) { - delete message.junk; - junk = -1; - } - - Object.keys(options.updates || {}).forEach(key => { - switch (key) { - case 'seen': - case 'deleted': - { - let fname = '\\' + key.charAt(0).toUpperCase() + key.substr(1); - - if (options.updates[key] && !message.flags.includes(fname)) { - // add missing flag - message.flags.push(fname); - } else if (!options.updates[key] && message.flags.includes(fname)) { - // remove non-needed flag - let flags = new Set(message.flags); - flags.delete(fname); - message.flags = Array.from(flags); - } - message['un' + key] = !options.updates[key]; - } - break; - - case 'flagged': - case 'draft': - { - let fname = '\\' + key.charAt(0).toUpperCase() + key.substr(1); - if (options.updates[key] && !message.flags.includes(fname)) { - // add missing flag - message.flags.push(fname); - } else if (!options.updates[key] && message.flags.includes(fname)) { - // remove non-needed flag - let flags = new Set(message.flags); - flags.delete(fname); - message.flags = Array.from(flags); - } - message[key] = options.updates[key]; - } - break; - - case 'expires': - { - if (options.updates.expires) { - message.exp = true; - message.rdate = options.updates.expires.getTime(); - } else { - message.exp = false; - } - } - break; - - case 'metaData': - message.meta = message.meta || {}; - message.meta.custom = options.updates.metaData; - break; - - case 'outbound': - message.outbound = [].concat(message.outbound || []).concat(options.updates.outbound || []); - break; - } - }); - - if (options.markAsSeen) { - message.unseen = false; - if (!message.flags.includes('\\Seen')) { - message.flags.push('\\Seen'); - } - } - - const updateMessage = () => { - this.database.collection('messages').insertOne(message, { writeConcern: 'majority' }, (err, r) => { - if (err) { - return cursor.close(() => done(err)); - } - - if (!r || !r.acknowledged) { - let err = new Error('Failed to store message [2]'); - err.responseCode = 500; - err.code = 'StoreError'; - return cursor.close(() => done(err)); - } - - let insertId = r.insertedId; - - // delete old message - this.database.collection('messages').deleteOne( - { - _id: messageId, - mailbox: mailboxData._id, - uid: messageUid - }, - { writeConcern: 'majority' }, - (err, r) => { - if (err) { - return cursor.close(() => done(err)); - } + // encrypt rebuilt raw - if (r && r.deletedCount) { - if (options.session) { - options.session.writeStream.write(options.session.formatResponse('EXPUNGE', sourceUid)); - } - - removeEntries.push({ - command: 'EXPUNGE', - ignore: options.session && options.session.id, - uid: messageUid, - message: messageId, - unseen, - // modseq is needed to avoid updating mailbox entry - modseq: newModseq - }); - - if (options.showExpunged) { - options.session.writeStream.write(options.session.formatResponse('EXPUNGE', messageUid)); - } - } + if (encryptRes) { + // encrypted + let prepared; - let entry = { - command: 'EXISTS', - uid: uidNext, - message: insertId, - unseen: message.unseen, - idate: message.idate, - thread: message.thread - }; - if (junk) { - entry.junk = junk; - } - existsEntries.push(entry); - - if (existsEntries.length >= consts.BULK_BATCH_SIZE) { - // mark messages as deleted from old mailbox - return this.notifier.addEntries(mailboxData, removeEntries, () => { - // mark messages as added to new mailbox - this.notifier.addEntries(targetData, existsEntries, () => { - removeEntries = []; - existsEntries = []; - this.notifier.fire(mailboxData.user); - processNext(); - }); - }); - } - processNext(); - } - ); - }); - }; - - if (targetData.encryptMessages) { - // move target mailbox is encrypted - const parsedHeader = (message.mimeTree && message.mimeTree.parsedHeader) || {}; - const parsedContentType = parsedHeader['content-type']; - - if (parsedContentType && parsedContentType.subtype === 'encrypted') { - // message already encrypted, just continue move - updateMessage(); - } else { - // not yet encrypted - this.users.collection('users').findOne({ _id: mailboxData.user }, (err, res) => { - if (err) { - return done(err); - } - // get user data - if (!res.pubKey) { - return updateMessage(); - } + try { + prepared = await this.prepareMessageAsync({ raw: encryptRes }); + } catch (err) { + return reject(done(err)); + } + + // prepare new message structure from encrypted raw + + prepared.id = message.id; // reuse existing id - // get raw from existing mimetree - let outputStream = this.indexer.rebuild(message.mimeTree); // get raw rebuilder response obj (.value is the stream) + const maildata = this.indexer.getMaildata(prepared.mimeTree); // get new maildata - if (!outputStream || outputStream.type !== 'stream' || !outputStream.value) { - return done(new Error('Cannot fetch message')); + // add attachments of encrypted messages + if (maildata.attachments && maildata.attachments.length) { + message.attachments = maildata.attachments; + message.ha = maildata.attachments.some(a => !a.related); + } else { + message.ha = false; + } + + // remove fields that may leak data in FE or DB + delete message.text; + delete message.html; + message.intro = ''; + + try { + // resolve inner promise in outer promise + return resolve( + await new Promise((resolve, reject) => { + this.indexer.storeNodeBodies(maildata, prepared.mimeTree, async err => { + // store new attachments + if (err) { + let attachmentIds = Object.keys(prepared.mimeTree.attachmentMap || {}).map( + key => prepared.mimeTree.attachmentMap[key] + ); + if (!attachmentIds.length) { + // with err, no attachments + return reject(err); + } + + await this.attachmentStorage.deleteMany(attachmentIds, maildata.magic); + return reject(err); // reject from inner promise } - outputStream = outputStream.value; // set stream to actual stream object (.value) - - let chunks = []; - let chunklen = 0; - outputStream - .on('readable', () => { - let chunk; - while ((chunk = outputStream.read()) !== null) { - chunks.push(chunk); - chunklen += chunk.length; - } - }) - .on('end', () => { - // when done rebuilding - const raw = Buffer.concat(chunks, chunklen); - this.encryptMessage(res.pubKey, raw, (err, res) => { - if (err) { - return done(err); - } - - // encrypt rebuilt raw - - if (res) { - // encrypted - this.prepareMessage({ raw: res }, (err, prepared) => { - if (err) { - return done(err); - } - // prepare new message structure from encrypted raw - - prepared.id = message.id; // reuse existing id - - const maildata = this.indexer.getMaildata(prepared.mimeTree); // get new maildata - - // add attachments of encrypted messages - if (maildata.attachments && maildata.attachments.length) { - message.attachments = maildata.attachments; - message.ha = maildata.attachments.some(a => !a.related); - } else { - message.ha = false; - } - - // remove fields that may leak data in FE or DB - delete message.text; - delete message.html; - message.intro = ''; - - this.indexer.storeNodeBodies(maildata, prepared.mimeTree, err => { - // store new attachments - let cleanup = (...args) => { - if (!args[0]) { - return callback(...args); - } - - let attachmentIds = Object.keys(prepared.mimeTree.attachmentMap || {}).map( - key => prepared.mimeTree.attachmentMap[key] - ); - if (!attachmentIds.length) { - return callback(...args); - } - - this.attachmentStorage.deleteMany(attachmentIds, maildata.magic, () => - callback(...args) - ); - }; - - if (err) { - return cleanup(err); - } - - // overwrite required values of existing message with new values - message.mimeTree = prepared.mimeTree; - message.size = prepared.size; - message.bodystructure = prepared.bodystructure; - message.envelope = prepared.envelope; - message.headers = prepared.headers; - updateMessage(); - }); - }); - } else { - updateMessage(); - } - }); - }); + + // overwrite required values of existing message with new values + message.mimeTree = prepared.mimeTree; + message.size = prepared.size; + message.bodystructure = prepared.bodystructure; + message.envelope = prepared.envelope; + message.headers = prepared.headers; + return resolve(updateMessage()); }); - } - } else { - // move target is not encrypted so proceed - updateMessage(); - } + }) + ); + } catch (err) { + return reject(err); // in case inner promise rejects just in case reject the outer too } - ); + } else { + return resolve(updateMessage()); + } }); - }; + }); + } + } else { + // move target is not encrypted so proceed + return updateMessage(); + } + }; - processNext(); - } - ); - }); - }); + return processNext(); } // NB! does not update user quota