diff --git a/lib/message-handler.js b/lib/message-handler.js index e9544217..63a1a451 100644 --- a/lib/message-handler.js +++ b/lib/message-handler.js @@ -129,6 +129,12 @@ class MessageHandler { .catch(err => callback(err)); } + add(options, callback) { + this.addAsync(options) + .then(messageAddedData => callback(null, ...messageAddedData)) + .catch(err => callback(err)); + } + /** * Adds or updates messages in the address register that is needed for typeahead address search * @param {ObjectId} user @@ -184,532 +190,536 @@ class MessageHandler { // Monster method for inserting new messages to a mailbox // TODO: Refactor into smaller pieces - add(options, callback) { + async addAsync(options) { if (!options.prepared && options.raw && options.raw.length > consts.MAX_ALLOWED_MESSAGE_SIZE) { - return setImmediate(() => callback(new Error('Message size ' + options.raw.length + ' bytes is too large'))); + throw new Error('Message size ' + options.raw.length + ' bytes is too large'); } - this.getMailbox(options, (err, mailboxData) => { - if (err) { - return callback(err); - } + // get target mailbox data + let mailboxData = await this.getMailboxAsync(options); - // get target mailbox data + options.targetMailboxEncrypted = !!mailboxData.encryptMessages; - options.targetMailboxEncrypted = !!mailboxData.encryptMessages; + // get target user data + const userData = await this.users.collection('users').findOne({ _id: options.user }); - this.users.collection('users').findOne({ _id: options.user }, (err, userData) => { - if (err) { - return callback(err); + let prepared = options.prepared; // might be undefined + + // check if already encrypted + let alreadyEncrypted = false; + + // message already prepared, check if encrypted + if (prepared) { + // got prepared + const parsedHeader = (prepared.mimeTree && prepared.mimeTree?.parsedHeader) || {}; + const parsedContentType = parsedHeader['content-type']; + + if (parsedContentType && parsedContentType.subtype === 'encrypted') { + alreadyEncrypted = true; + } + } else { + // no prepared, use raw + if (options.rawchunks && !options.raw) { + // got rawchunks instead of raw + if (options.chunklen) { + options.raw = Buffer.concat(options.rawchunks, options.chunklen); + } else { + options.raw = Buffer.concat(options.rawchunks); } + } - // get target user data - let prepared = options.prepared; // might be undefined + const rawString = options.raw.toString('binary'); // get string from the raw bytes of message + const regex = /Content-Type:\s*multipart\/encrypted/gim; - // check if already encrypted - let alreadyEncrypted = false; + if (regex.test(rawString)) { + // if there is encrypted content-type then message already encrypted, no need to re-encrypt it + alreadyEncrypted = true; + } + } - // message already prepared, check if encrypted - if (prepared) { - // got prepared - const parsedHeader = (prepared.mimeTree && prepared.mimeTree?.parsedHeader) || {}; - const parsedContentType = parsedHeader['content-type']; + let flags = Array.isArray(options.flags) ? options.flags : [].concat(options.flags || []); - if (parsedContentType && parsedContentType.subtype === 'encrypted') { - alreadyEncrypted = true; - } - } else { - // no prepared, use raw - if (options.rawchunks && !options.raw) { - // got rawchunks instead of raw - if (options.chunklen) { - options.raw = Buffer.concat(options.rawchunks, options.chunklen); - } else { - options.raw = Buffer.concat(options.rawchunks); - } + if (!alreadyEncrypted) { + // not already encrypted, check if user has encryption on or target mailbox is encrypted + if ((userData.encryptMessages || !!mailboxData.encryptMessages) && userData.pubKey && !flags.includes('\\Draft')) { + if (options.rawchunks && !options.raw) { + // got rawchunks instead of raw + if (options.chunklen) { + options.raw = Buffer.concat(options.rawchunks, options.chunklen); + } else { + options.raw = Buffer.concat(options.rawchunks); } + } + // user has encryption on or target mailbox encrypted, encrypt message and prepare again + // do not encrypt drafts + // may have a situation where we got prepared and no options.raw but options.rawchunks instead, concat them - const rawString = options.raw.toString('binary'); // get string from the raw bytes of message - const regex = /Content-Type:\s*multipart\/encrypted/gim; + const encrypted = await this.encryptMessageAsync(userData.pubKey, options.raw); - if (regex.test(rawString)) { - // if there is encrypted content-type then message already encrypted, no need to re-encrypt it - alreadyEncrypted = true; - } + if (encrypted) { + // new encrypted raw available + options.raw = encrypted; } - let flags = Array.isArray(options.flags) ? options.flags : [].concat(options.flags || []); - - let addMessage = () => { - let id = prepared.id; - let mimeTree = prepared.mimeTree; - let size = prepared.size; - let bodystructure = prepared.bodystructure; - let envelope = prepared.envelope; - let idate = prepared.idate; - let hdate = prepared.hdate; - let msgid = prepared.msgid; - let subject = prepared.subject; - let headers = prepared.headers; - - let maildata = options.maildata || this.indexer.getMaildata(mimeTree); - - let cleanup = (...args) => { - if (!args[0]) { - return callback(...args); - } + delete options.prepared; // delete any existing prepared as new will be generated + const newPrepared = await this.prepareMessageAsync(options); - let attachmentIds = Object.keys(mimeTree.attachmentMap || {}).map(key => mimeTree.attachmentMap[key]); - if (!attachmentIds.length) { - return callback(...args); - } + newPrepared.id = prepared.id; // retain original - this.attachmentStorage.deleteMany(attachmentIds, maildata.magic, () => callback(...args)); - }; + options.prepared = newPrepared; // new prepared in options just in case + prepared = newPrepared; // overwrite top-level original prepared + options.maildata = this.indexer.getMaildata(newPrepared.mimeTree); // get new maildata of encrypted message + } else { + // not already encrypted and no need to / cannot ecnrypt + const newPrepared = await this.prepareMessageAsync(options); + prepared = newPrepared; + } + } else { + // message already encrypted + const newPrepared = await this.prepareMessageAsync(options); + prepared = newPrepared; + } - this.indexer.storeNodeBodies(maildata, mimeTree, err => { - if (err) { - return cleanup(err); - } + let id = prepared.id; + let mimeTree = prepared.mimeTree; + let size = prepared.size; + let bodystructure = prepared.bodystructure; + let envelope = prepared.envelope; + let idate = prepared.idate; + let hdate = prepared.hdate; + let msgid = prepared.msgid; + let subject = prepared.subject; + let headers = prepared.headers; + + let maildata = options.maildata || this.indexer.getMaildata(mimeTree); + + let cleanup = async (err, status, data) => { + if (!err) { + // no error + return [status, data]; + } - // prepare message object - let messageData = { - _id: id, + let attachmentIds = Object.keys(mimeTree.attachmentMap || {}).map(key => mimeTree.attachmentMap[key]); + if (!attachmentIds.length) { + // with err, no attachments + throw err; + } - // should be kept when COPY'ing or MOVE'ing - root: id, + // with err, with attachments + try { + await this.attachmentStorage.deleteMany(attachmentIds, maildata.magic); + throw err; + } catch (ignore) { + throw err; + } + }; - v: consts.SCHEMA_VERSION, + try { + await new Promise((resolve, reject) => { + this.indexer.storeNodeBodies(maildata, mimeTree, err => { + if (err) { + return reject(err); + } + return resolve(); + }); + }); + } catch (err) { + return cleanup(err); + } - // if true then expires after rdate + retention - exp: !!mailboxData.retention, - rdate: Date.now() + (mailboxData.retention || 0), + // prepare message object + let messageData = { + _id: id, - // make sure the field exists. it is set to true when user is deleted - userDeleted: false, + // should be kept when COPY'ing or MOVE'ing + root: id, - idate, - hdate, - flags, - size, + v: consts.SCHEMA_VERSION, - // some custom metadata about the delivery - meta: options.meta || {}, + // if true then expires after rdate + retention + exp: !!mailboxData.retention, + rdate: Date.now() + (mailboxData.retention || 0), - // list filter IDs that matched this message - filters: Array.isArray(options.filters) ? options.filters : [].concat(options.filters || []), + // make sure the field exists. it is set to true when user is deleted + userDeleted: false, - headers, - mimeTree, - envelope, - bodystructure, - msgid, + idate, + hdate, + flags, + size, - // use boolean for more commonly used (and searched for) flags - unseen: !flags.includes('\\Seen'), - flagged: flags.includes('\\Flagged'), - undeleted: !flags.includes('\\Deleted'), - draft: flags.includes('\\Draft'), + // some custom metadata about the delivery + meta: options.meta || {}, - magic: maildata.magic, + // list filter IDs that matched this message + filters: Array.isArray(options.filters) ? options.filters : [].concat(options.filters || []), - subject, + headers, + mimeTree, + envelope, + bodystructure, + msgid, - // do not archive deleted messages that have been copied - copied: false - }; + // use boolean for more commonly used (and searched for) flags + unseen: !flags.includes('\\Seen'), + flagged: flags.includes('\\Flagged'), + undeleted: !flags.includes('\\Deleted'), + draft: flags.includes('\\Draft'), - if (options.verificationResults) { - messageData.verificationResults = options.verificationResults; - } + magic: maildata.magic, - if (options.outbound) { - messageData.outbound = [].concat(options.outbound || []); - } + subject, - if (options.forwardTargets) { - messageData.forwardTargets = [].concat(options.forwardTargets || []); - } + // do not archive deleted messages that have been copied + copied: false + }; - if (maildata.attachments && maildata.attachments.length) { - messageData.attachments = maildata.attachments; - messageData.ha = maildata.attachments.some(a => !a.related); - } else { - messageData.ha = false; - } + if (options.verificationResults) { + messageData.verificationResults = options.verificationResults; + } - if (maildata.text) { - messageData.text = maildata.text.replace(/\r\n/g, '\n').trim(); + if (options.outbound) { + messageData.outbound = [].concat(options.outbound || []); + } - // text is indexed with a fulltext index, so only store the beginning of it - if (messageData.text.length > consts.MAX_PLAINTEXT_INDEXED) { - messageData.textFooter = messageData.text.substr(consts.MAX_PLAINTEXT_INDEXED); - messageData.text = messageData.text.substr(0, consts.MAX_PLAINTEXT_INDEXED); + if (options.forwardTargets) { + messageData.forwardTargets = [].concat(options.forwardTargets || []); + } - // truncate remaining text if total length exceeds maximum allowed - if ( - consts.MAX_PLAINTEXT_CONTENT > consts.MAX_PLAINTEXT_INDEXED && - messageData.textFooter.length > consts.MAX_PLAINTEXT_CONTENT - consts.MAX_PLAINTEXT_INDEXED - ) { - messageData.textFooter = messageData.textFooter.substr(0, consts.MAX_PLAINTEXT_CONTENT - consts.MAX_PLAINTEXT_INDEXED); - } - } - messageData.text = - messageData.text.length <= consts.MAX_PLAINTEXT_CONTENT - ? messageData.text - : messageData.text.substr(0, consts.MAX_PLAINTEXT_CONTENT); + if (maildata.attachments && maildata.attachments.length) { + messageData.attachments = maildata.attachments; + messageData.ha = maildata.attachments.some(a => !a.related); + } else { + messageData.ha = false; + } - messageData.intro = this.createIntro(messageData.text); - } + if (maildata.text) { + messageData.text = maildata.text.replace(/\r\n/g, '\n').trim(); - if (maildata.html && maildata.html.length) { - let htmlSize = 0; - messageData.html = maildata.html - .map(html => { - if (htmlSize >= consts.MAX_HTML_CONTENT || !html) { - return ''; - } + // text is indexed with a fulltext index, so only store the beginning of it + if (messageData.text.length > consts.MAX_PLAINTEXT_INDEXED) { + messageData.textFooter = messageData.text.substr(consts.MAX_PLAINTEXT_INDEXED); + messageData.text = messageData.text.substr(0, consts.MAX_PLAINTEXT_INDEXED); - if (htmlSize + Buffer.byteLength(html) <= consts.MAX_HTML_CONTENT) { - htmlSize += Buffer.byteLength(html); - return html; - } + // truncate remaining text if total length exceeds maximum allowed + if ( + consts.MAX_PLAINTEXT_CONTENT > consts.MAX_PLAINTEXT_INDEXED && + messageData.textFooter.length > consts.MAX_PLAINTEXT_CONTENT - consts.MAX_PLAINTEXT_INDEXED + ) { + messageData.textFooter = messageData.textFooter.substr(0, consts.MAX_PLAINTEXT_CONTENT - consts.MAX_PLAINTEXT_INDEXED); + } + } + messageData.text = + messageData.text.length <= consts.MAX_PLAINTEXT_CONTENT ? messageData.text : messageData.text.substr(0, consts.MAX_PLAINTEXT_CONTENT); - html = html.substr(0, consts.MAX_HTML_CONTENT); - htmlSize += Buffer.byteLength(html); - return html; - }) - .filter(html => html); + messageData.intro = this.createIntro(messageData.text); + } - // if message has HTML content use it instead of text/plain content for intro - messageData.intro = this.createIntro(htmlToText(messageData.html.join(''))); - } + if (maildata.html && maildata.html.length) { + let htmlSize = 0; + messageData.html = maildata.html + .map(html => { + if (htmlSize >= consts.MAX_HTML_CONTENT || !html) { + return ''; + } - this.users.collection('users').findOneAndUpdate( - { - _id: mailboxData.user - }, - { - $inc: { - storageUsed: size - } - }, - { - returnDocument: 'after', - projection: { - storageUsed: true - } - }, - (err, r) => { - if (err) { - return cleanup(err); - } + if (htmlSize + Buffer.byteLength(html) <= consts.MAX_HTML_CONTENT) { + htmlSize += Buffer.byteLength(html); + return html; + } - if (r && r.value) { - this.loggelf({ - short_message: '[QUOTA] +', - _mail_action: 'quota', - _user: mailboxData.user, - _inc: size, - _storage_used: r.value.storageUsed, - _sess: options.session && options.session.id, - _mailbox: mailboxData._id - }); - } + html = html.substr(0, consts.MAX_HTML_CONTENT); + htmlSize += Buffer.byteLength(html); + return html; + }) + .filter(html => html); - let rollback = err => { - this.users.collection('users').findOneAndUpdate( - { - _id: mailboxData.user - }, - { - $inc: { - storageUsed: -size - } - }, - { - returnDocument: 'after', - projection: { - storageUsed: true - } - }, - (...args) => { - let r = args && args[1]; - - if (r && r.value) { - this.loggelf({ - short_message: '[QUOTA] -', - _mail_action: 'quota', - _user: mailboxData.user, - _inc: -size, - _storage_used: r.value.storageUsed, - _sess: options.session && options.session.id, - _mailbox: mailboxData._id, - _rollback: 'yes', - _error: err.message, - _code: err.code - }); - } + // if message has HTML content use it instead of text/plain content for intro + messageData.intro = this.createIntro(htmlToText(messageData.html.join(''))); + } - cleanup(err); - } - ); - }; + let r; - // acquire new UID+MODSEQ - this.database.collection('mailboxes').findOneAndUpdate( - { - _id: mailboxData._id - }, - { - $inc: { - // allocate bot UID and MODSEQ values so when journal is later sorted by - // modseq then UIDs are always in ascending order - uidNext: 1, - modifyIndex: 1 - } - }, - { - // use original value to get correct UIDNext - returnDocument: 'before' - }, - (err, item) => { - if (err) { - return rollback(err); - } + try { + r = await this.users.collection('users').findOneAndUpdate( + { + _id: mailboxData.user + }, + { + $inc: { + storageUsed: size + } + }, + { + returnDocument: 'after', + projection: { + storageUsed: true + } + } + ); + } catch (err) { + return cleanup(err); + } - if (!item || !item.value) { - // was not able to acquire a lock - let err = new Error('Mailbox is missing'); - err.imapResponse = 'TRYCREATE'; - return rollback(err); - } + if (r && r.value) { + this.loggelf({ + short_message: '[QUOTA] +', + _mail_action: 'quota', + _user: mailboxData.user, + _inc: size, + _storage_used: r.value.storageUsed, + _sess: options.session && options.session.id, + _mailbox: mailboxData._id + }); + } + + let rollback = async rollbackError => { + let args; + try { + args = await this.users.collection('users').findOneAndUpdate( + { + _id: mailboxData.user + }, + { + $inc: { + storageUsed: -size + } + }, + { + returnDocument: 'after', + projection: { + storageUsed: true + } + } + ); + } catch (ignore) { + // ignore + } - let mailboxData = item.value; + let r = args && args[1]; - // updated message object by setting mailbox specific values - messageData.mailbox = mailboxData._id; - messageData.user = mailboxData.user; - messageData.uid = mailboxData.uidNext; - messageData.modseq = mailboxData.modifyIndex + 1; + if (r && r.value) { + this.loggelf({ + short_message: '[QUOTA] -', + _mail_action: 'quota', + _user: mailboxData.user, + _inc: -size, + _storage_used: r.value.storageUsed, + _sess: options.session && options.session.id, + _mailbox: mailboxData._id, + _rollback: 'yes', + _error: rollbackError.message, + _code: rollbackError.code + }); + } - if (!flags.includes('\\Deleted')) { - messageData.searchable = true; - } + return cleanup(rollbackError); + }; - if (mailboxData.specialUse === '\\Junk') { - messageData.junk = true; - } + // acquire new UID+MODSEQ - this.getThreadId(mailboxData.user, subject, mimeTree, (err, thread) => { - if (err) { - return rollback(err); - } + let item; - messageData.thread = thread; + try { + item = await this.database.collection('mailboxes').findOneAndUpdate( + { + _id: mailboxData._id + }, + { + $inc: { + // allocate bot UID and MODSEQ values so when journal is later sorted by + // modseq then UIDs are always in ascending order + uidNext: 1, + modifyIndex: 1 + } + }, + { + // use original value to get correct UIDNext + returnDocument: 'before' + } + ); + } catch (err) { + return rollback(err); + } - this.database.collection('messages').insertOne(messageData, { writeConcern: 'majority' }, (err, r) => { - if (err) { - return rollback(err); - } + if (!item || !item.value) { + // was not able to acquire a lock + let err = new Error('Mailbox is missing'); + err.imapResponse = 'TRYCREATE'; + return rollback(err); + } - if (!r || !r.acknowledged) { - let err = new Error('Failed to store message [1]'); - err.responseCode = 500; - err.code = 'StoreError'; - return rollback(err); - } + mailboxData = item.value; - let logTime = messageData.meta.time || new Date(); - if (typeof logTime === 'number') { - logTime = new Date(logTime); - } + // updated message object by setting mailbox specific values + messageData.mailbox = mailboxData._id; + messageData.user = mailboxData.user; + messageData.uid = mailboxData.uidNext; + messageData.modseq = mailboxData.modifyIndex + 1; - let uidValidity = mailboxData.uidValidity; - let uid = messageData.uid; + if (!flags.includes('\\Deleted')) { + messageData.searchable = true; + } - if ( - options.session && - options.session.selected && - options.session.selected.mailbox && - options.session.selected.mailbox.toString() === mailboxData._id.toString() - ) { - options.session.writeStream.write(options.session.formatResponse('EXISTS', messageData.uid)); - } + if (mailboxData.specialUse === '\\Junk') { + messageData.junk = true; + } - let updateAddressRegister = next => { - let addresses = []; + let thread; - if (messageData.junk || flags.includes('\\Draft')) { - // skip junk and draft messages - return next(); - } + try { + thread = await this.getThreadIdAsync(mailboxData.user, subject, mimeTree); + } catch (err) { + return rollback(err); + } - let parsed = messageData.mimeTree && messageData.mimeTree.parsedHeader; + messageData.thread = thread; - if (parsed) { - let keyList = mailboxData.specialUse === '\\Sent' ? ['to', 'cc', 'bcc'] : ['from']; + let insertRes; - for (const disallowedHeader of DISALLOWED_HEADERS_FOR_ADDRESS_REGISTER) { - // if email contains headers that we do not want, - // don't add any emails to address register - if (parsed[disallowedHeader]) { - return next(); - } - } + try { + insertRes = await this.database.collection('messages').insertOne(messageData, { writeConcern: 'majority' }); + } catch (err) { + return rollback(err); + } - for (let key of keyList) { - if (parsed[key] && parsed[key].length) { - for (let addr of parsed[key]) { - if (/no-?reply/i.test(addr.address)) { - continue; - } - if (!addresses.some(a => a.address === addr.address)) { - addresses.push(addr); - } - } - } - } - } + if (!insertRes || !insertRes.acknowledged) { + let err = new Error('Failed to store message [1]'); + err.responseCode = 500; + err.code = 'StoreError'; + return rollback(err); + } - if (!addresses.length) { - return next(); - } + let logTime = messageData.meta.time || new Date(); + if (typeof logTime === 'number') { + logTime = new Date(logTime); + } - this.updateAddressRegister(mailboxData.user, addresses) - .then(() => next()) - .catch(err => next(err)); - }; + let uidValidity = mailboxData.uidValidity; + let uid = messageData.uid; - updateAddressRegister(() => { - this.notifier.addEntries( - mailboxData, - { - command: 'EXISTS', - uid: messageData.uid, - ignore: options.session && options.session.id, - message: messageData._id, - modseq: messageData.modseq, - unseen: messageData.unseen, - idate: messageData.idate, - thread: messageData.thread - }, - () => { - this.notifier.fire(mailboxData.user); - - let raw = options.rawchunks || options.raw; - let processAudits = async () => { - let audits = await this.database - .collection('audits') - .find({ user: mailboxData.user, expires: { $gt: new Date() } }) - .toArray(); - - let now = new Date(); - for (let auditData of audits) { - if ((auditData.start && auditData.start > now) || (auditData.end && auditData.end < now)) { - // audit not active - continue; - } - await this.auditHandler.store(auditData._id, raw, { - date: messageData.idate, - msgid: messageData.msgid, - header: messageData.mimeTree && messageData.mimeTree.parsedHeader, - ha: messageData.ha, - mailbox: mailboxData._id, - mailboxPath: mailboxData.path, - info: Object.assign({ queueId: messageData.outbound }, messageData.meta) - }); - } - }; - - let next = () => { - cleanup(null, true, { - uidValidity, - uid, - id: messageData._id.toString(), - mailbox: mailboxData._id.toString(), - mailboxPath: mailboxData.path, - size, - status: 'new' - }); - }; + if ( + options.session && + options.session.selected && + options.session.selected.mailbox && + options.session.selected.mailbox.toString() === mailboxData._id.toString() + ) { + options.session.writeStream.write(options.session.formatResponse('EXISTS', messageData.uid)); + } - // do not use more suitable .finally() as it is not supported in Node v8 - return processAudits().then(next).catch(next); - } - ); - }); - }); - }); - } - ); - } - ); - }); - }; + let updateAddressRegister = async finishFunc => { + let addresses = []; + + if (messageData.junk || flags.includes('\\Draft')) { + // skip junk and draft messages + return finishFunc(); + } - if (!alreadyEncrypted) { - // not already encrypted, check if user has encryption on or target mailbox is encrypted - if ((userData.encryptMessages || !!mailboxData.encryptMessages) && userData.pubKey && !flags.includes('\\Draft')) { - if (options.rawchunks && !options.raw) { - // got rawchunks instead of raw - if (options.chunklen) { - options.raw = Buffer.concat(options.rawchunks, options.chunklen); - } else { - options.raw = Buffer.concat(options.rawchunks); + let parsed = messageData.mimeTree && messageData.mimeTree.parsedHeader; + + if (parsed) { + let keyList = mailboxData.specialUse === '\\Sent' ? ['to', 'cc', 'bcc'] : ['from']; + + for (const disallowedHeader of DISALLOWED_HEADERS_FOR_ADDRESS_REGISTER) { + // if email contains headers that we do not want, + // don't add any emails to address register + if (parsed[disallowedHeader]) { + return finishFunc(); + } + } + + for (let key of keyList) { + if (parsed[key] && parsed[key].length) { + for (let addr of parsed[key]) { + if (/no-?reply/i.test(addr.address)) { + continue; } - } - // user has encryption on or target mailbox encrypted, encrypt message and prepare again - // do not encrypt drafts - // may have a situation where we got prepared and no options.raw but options.rawchunks instead, concat them - this.encryptMessage(userData.pubKey, options.raw, (err, res) => { - if (err) { - return callback(err); + if (!addresses.some(a => a.address === addr.address)) { + addresses.push(addr); } + } + } + } + } - if (res) { - // new encrypted raw available - options.raw = res; - } + if (!addresses.length) { + return finishFunc(); + } - delete options.prepared; // delete any existing prepared as new will be generated - this.prepareMessage(options, (err, newPrepared) => { - if (err) { - return callback(err); + try { + await this.updateAddressRegister(mailboxData.user, addresses); + return finishFunc(); + } catch (err) { + return finishFunc(err); + } + }; + + return updateAddressRegister( + async () => + new Promise(resolve => { + this.notifier.addEntries( + mailboxData, + { + command: 'EXISTS', + uid: messageData.uid, + ignore: options.session && options.session.id, + message: messageData._id, + modseq: messageData.modseq, + unseen: messageData.unseen, + idate: messageData.idate, + thread: messageData.thread + }, + async () => { + // added Entries + this.notifier.fire(mailboxData.user); + + let raw = options.rawchunks || options.raw; + let processAudits = async () => { + let audits = await this.database + .collection('audits') + .find({ user: mailboxData.user, expires: { $gt: new Date() } }) + .toArray(); + + let now = new Date(); + for (let auditData of audits) { + if ((auditData.start && auditData.start > now) || (auditData.end && auditData.end < now)) { + // audit not active + continue; + } + await this.auditHandler.store(auditData._id, raw, { + date: messageData.idate, + msgid: messageData.msgid, + header: messageData.mimeTree && messageData.mimeTree.parsedHeader, + ha: messageData.ha, + mailbox: mailboxData._id, + mailboxPath: mailboxData.path, + info: Object.assign({ queueId: messageData.outbound }, messageData.meta) + }); } + }; - newPrepared.id = prepared.id; // retain original + let next = () => + cleanup(null, true, { + uidValidity, + uid, + id: messageData._id.toString(), + mailbox: mailboxData._id.toString(), + mailboxPath: mailboxData.path, + size, + status: 'new' + }); - options.prepared = newPrepared; // new prepared in options just in case - prepared = newPrepared; // overwrite top-level original prepared - options.maildata = this.indexer.getMaildata(newPrepared.mimeTree); // get new maildata of encrypted message - addMessage(); - }); - }); - } else { - // not already encrypted and no need to - this.prepareMessage(options, (err, newPrepared) => { - if (err) { - return callback(err); + // do not use more suitable .finally() as it is not supported in Node v8 + try { + await processAudits(); + resolve(next()); + } catch (ignore) { + resolve(next()); } - - prepared = newPrepared; - addMessage(); - }); - } - } else { - // message already encrypted - this.prepareMessage(options, (err, newPrepared) => { - if (err) { - return callback(err); } - - prepared = newPrepared; - addMessage(); - }); - } - }); - }); + ); + }) + ); } async updateQuotaAsync(user, inc, options) { @@ -1536,9 +1546,9 @@ class MessageHandler { .filter(line => line); } - prepareMessage(options, callback) { + async prepareMessageAsync(options) { if (options.prepared) { - return setImmediate(() => callback(null, options.prepared)); + return options.prepared; } let id = new ObjectId(); @@ -1587,11 +1597,16 @@ class MessageHandler { subject }; - return setImmediate(() => callback(null, prepared)); + return prepared; } - // resolves or generates new thread id for a message - getThreadId(userId, subject, mimeTree, callback) { + prepareMessage(options, callback) { + this.prepareMessageAsync(options) + .then(prepared => callback(null, prepared)) + .catch(err => callback(err)); + } + + async getThreadIdAsync(userId, subject, mimeTree) { let referenceIds = new Set( [ [].concat(mimeTree.parsedHeader['message-id'] || []).pop() || '', @@ -1612,7 +1627,7 @@ class MessageHandler { referenceIds = Array.from(referenceIds).slice(0, 10); // most messages are not threaded, so an upsert call should be ok to make - this.database.collection('threads').findOneAndUpdate( + const existingThread = await this.database.collection('threads').findOneAndUpdate( { user: userId, ids: { $in: referenceIds }, @@ -1628,31 +1643,29 @@ class MessageHandler { }, { returnDocument: 'after' - }, - (err, r) => { - if (err) { - return callback(err); - } - if (r.value) { - return callback(null, r.value._id); - } - // thread not found, create a new one - this.database.collection('threads').insertOne( - { - user: userId, - subject, - ids: referenceIds, - updated: new Date() - }, - (err, r) => { - if (err) { - return callback(err); - } - return callback(null, r.insertedId); - } - ); } ); + + if (existingThread.value) { + return existingThread.value._id; + } + + // otherwise if no existing thread + const newThread = await this.database.collection('threads').insertOne({ + user: userId, + subject, + ids: referenceIds, + updated: new Date() + }); + + return newThread.insertedId; + } + + // resolves or generates new thread id for a message + getThreadId(userId, subject, mimeTree, callback) { + this.getThreadIdAsync(userId, subject, mimeTree) + .then(id => callback(null, id)) + .catch(err => callback(err)); } normalizeSubject(subject, options) {