diff --git a/apps/meteor/app/autotranslate/server/autotranslate.ts b/apps/meteor/app/autotranslate/server/autotranslate.ts index 6cac8028e1f2..7a9eb8780a2d 100644 --- a/apps/meteor/app/autotranslate/server/autotranslate.ts +++ b/apps/meteor/app/autotranslate/server/autotranslate.ts @@ -1,4 +1,3 @@ -import { api } from '@rocket.chat/core-services'; import type { IMessage, IRoom, @@ -16,7 +15,7 @@ import _ from 'underscore'; import { callbacks } from '../../../lib/callbacks'; import { isTruthy } from '../../../lib/isTruthy'; -import { broadcastMessageSentEvent } from '../../../server/modules/watchers/lib/messages'; +import { broadcastMessageFromData } from '../../../server/modules/watchers/lib/messages'; import { Markdown } from '../../markdown/server'; import { settings } from '../../settings/server'; @@ -333,9 +332,8 @@ export abstract class AutoTranslate { } private notifyTranslatedMessage(messageId: string): void { - void broadcastMessageSentEvent({ + void broadcastMessageFromData({ id: messageId, - broadcastCallback: (message) => api.broadcast('message.sent', message), }); } diff --git a/apps/meteor/app/discussion/server/hooks/propagateDiscussionMetadata.ts b/apps/meteor/app/discussion/server/hooks/propagateDiscussionMetadata.ts index e366216ed7f9..0f42f495e962 100644 --- a/apps/meteor/app/discussion/server/hooks/propagateDiscussionMetadata.ts +++ b/apps/meteor/app/discussion/server/hooks/propagateDiscussionMetadata.ts @@ -1,9 +1,8 @@ -import { api } from '@rocket.chat/core-services'; import type { IRoom } from '@rocket.chat/core-typings'; import { Messages, Rooms } from '@rocket.chat/models'; import { callbacks } from '../../../../lib/callbacks'; -import { broadcastMessageSentEvent } from '../../../../server/modules/watchers/lib/messages'; +import { broadcastMessageFromData } from '../../../../server/modules/watchers/lib/messages'; import { deleteRoom } from '../../../lib/server/functions/deleteRoom'; const updateAndNotifyParentRoomWithParentMessage = async (room: IRoom): Promise => { @@ -11,10 +10,9 @@ const updateAndNotifyParentRoomWithParentMessage = async (room: IRoom): Promise< if (!parentMessage) { return; } - void broadcastMessageSentEvent({ + void broadcastMessageFromData({ id: parentMessage._id, data: parentMessage, - broadcastCallback: (message) => api.broadcast('message.sent', message), }); }; diff --git a/apps/meteor/app/federation/server/endpoints/dispatch.js b/apps/meteor/app/federation/server/endpoints/dispatch.js index e54441a7aa9d..4b3e148bbdad 100644 --- a/apps/meteor/app/federation/server/endpoints/dispatch.js +++ b/apps/meteor/app/federation/server/endpoints/dispatch.js @@ -3,7 +3,7 @@ import { eventTypes } from '@rocket.chat/core-typings'; import { FederationServers, FederationRoomEvents, Rooms, Messages, Subscriptions, Users, ReadReceipts } from '@rocket.chat/models'; import EJSON from 'ejson'; -import { broadcastMessageSentEvent } from '../../../../server/modules/watchers/lib/messages'; +import { broadcastMessageFromData } from '../../../../server/modules/watchers/lib/messages'; import { API } from '../../../api/server'; import { FileUpload } from '../../../file-upload/server'; import { deleteRoom } from '../../../lib/server/functions/deleteRoom'; @@ -284,10 +284,9 @@ const eventHandlers = { } } if (messageForNotification) { - void broadcastMessageSentEvent({ + void broadcastMessageFromData({ id: messageForNotification._id, data: messageForNotification, - broadcastCallback: (message) => api.broadcast('message.sent', message), }); } } @@ -316,14 +315,13 @@ const eventHandlers = { } else { // Update the message await Messages.updateOne({ _id: persistedMessage._id }, { $set: { msg: message.msg, federation: message.federation } }); - void broadcastMessageSentEvent({ + void broadcastMessageFromData({ id: persistedMessage._id, data: { ...persistedMessage, msg: message.msg, federation: message.federation, }, - broadcastCallback: (message) => api.broadcast('message.sent', message), }); } } @@ -387,7 +385,7 @@ const eventHandlers = { // Update the property await Messages.updateOne({ _id: messageId }, { $set: { [`reactions.${reaction}`]: reactionObj } }); - void broadcastMessageSentEvent({ + void broadcastMessageFromData({ id: persistedMessage._id, data: { ...persistedMessage, @@ -396,7 +394,6 @@ const eventHandlers = { [reaction]: reactionObj, }, }, - broadcastCallback: (message) => api.broadcast('message.sent', message), }); } @@ -446,7 +443,7 @@ const eventHandlers = { // Otherwise, update the property await Messages.updateOne({ _id: messageId }, { $set: { [`reactions.${reaction}`]: reactionObj } }); } - void broadcastMessageSentEvent({ + void broadcastMessageFromData({ id: persistedMessage._id, data: { ...persistedMessage, @@ -455,7 +452,6 @@ const eventHandlers = { [reaction]: reactionObj, }, }, - broadcastCallback: (message) => api.broadcast('message.sent', message), }); } diff --git a/apps/meteor/app/lib/server/functions/deleteMessage.ts b/apps/meteor/app/lib/server/functions/deleteMessage.ts index 37ae72254418..cd4456b24514 100644 --- a/apps/meteor/app/lib/server/functions/deleteMessage.ts +++ b/apps/meteor/app/lib/server/functions/deleteMessage.ts @@ -5,7 +5,7 @@ import { Meteor } from 'meteor/meteor'; import { Apps } from '../../../../ee/server/apps'; import { callbacks } from '../../../../lib/callbacks'; -import { broadcastMessageSentEvent } from '../../../../server/modules/watchers/lib/messages'; +import { broadcastMessageFromData } from '../../../../server/modules/watchers/lib/messages'; import { canDeleteMessageAsync } from '../../../authorization/server/functions/canDeleteMessage'; import { FileUpload } from '../../../file-upload/server'; import { settings } from '../../../settings/server'; @@ -90,9 +90,8 @@ export async function deleteMessage(message: IMessage, user: IUser): Promise api.broadcast('message.sent', message), }); } diff --git a/apps/meteor/app/lib/server/functions/sendMessage.ts b/apps/meteor/app/lib/server/functions/sendMessage.ts index 4145cc4cf627..486f7c360f99 100644 --- a/apps/meteor/app/lib/server/functions/sendMessage.ts +++ b/apps/meteor/app/lib/server/functions/sendMessage.ts @@ -1,4 +1,4 @@ -import { Message, api } from '@rocket.chat/core-services'; +import { Message } from '@rocket.chat/core-services'; import type { IMessage, IRoom } from '@rocket.chat/core-typings'; import { Messages } from '@rocket.chat/models'; import { Match, check } from 'meteor/check'; @@ -7,7 +7,7 @@ import { Apps } from '../../../../ee/server/apps'; import { callbacks } from '../../../../lib/callbacks'; import { isRelativeURL } from '../../../../lib/utils/isRelativeURL'; import { isURL } from '../../../../lib/utils/isURL'; -import { broadcastMessageSentEvent } from '../../../../server/modules/watchers/lib/messages'; +import { broadcastMessageFromData } from '../../../../server/modules/watchers/lib/messages'; import { hasPermissionAsync } from '../../../authorization/server/functions/hasPermission'; import { FileUpload } from '../../../file-upload/server'; import notifications from '../../../notifications/server/lib/Notifications'; @@ -285,9 +285,8 @@ export const sendMessage = async function (user: any, message: any, room: any, u // Execute all callbacks await callbacks.run('afterSaveMessage', message, room); - void broadcastMessageSentEvent({ + void broadcastMessageFromData({ id: message._id, - broadcastCallback: (message) => api.broadcast('message.sent', message), }); return message; }; diff --git a/apps/meteor/app/lib/server/functions/updateMessage.ts b/apps/meteor/app/lib/server/functions/updateMessage.ts index 8abb8b4b0a02..5cfe29ef41ae 100644 --- a/apps/meteor/app/lib/server/functions/updateMessage.ts +++ b/apps/meteor/app/lib/server/functions/updateMessage.ts @@ -1,11 +1,11 @@ -import { Message, api } from '@rocket.chat/core-services'; +import { Message } from '@rocket.chat/core-services'; import type { IEditedMessage, IMessage, IUser, AtLeast } from '@rocket.chat/core-typings'; import { Messages, Rooms } from '@rocket.chat/models'; import { Meteor } from 'meteor/meteor'; import { Apps } from '../../../../ee/server/apps'; import { callbacks } from '../../../../lib/callbacks'; -import { broadcastMessageSentEvent } from '../../../../server/modules/watchers/lib/messages'; +import { broadcastMessageFromData } from '../../../../server/modules/watchers/lib/messages'; import { settings } from '../../../settings/server'; import { parseUrlsInMessage } from './parseUrlsInMessage'; @@ -85,10 +85,9 @@ export const updateMessage = async function ( const msg = await Messages.findOneById(_id); if (msg) { await callbacks.run('afterSaveMessage', msg, room, user._id); - void broadcastMessageSentEvent({ + void broadcastMessageFromData({ id: msg._id, data: msg, - broadcastCallback: (message) => api.broadcast('message.sent', message), }); } }); diff --git a/apps/meteor/app/message-pin/server/pinMessage.ts b/apps/meteor/app/message-pin/server/pinMessage.ts index ff38c7e8d4bc..1ed0a172028b 100644 --- a/apps/meteor/app/message-pin/server/pinMessage.ts +++ b/apps/meteor/app/message-pin/server/pinMessage.ts @@ -1,4 +1,4 @@ -import { Message, api } from '@rocket.chat/core-services'; +import { Message } from '@rocket.chat/core-services'; import { isQuoteAttachment, isRegisterUser } from '@rocket.chat/core-typings'; import type { IMessage, MessageAttachment, MessageQuoteAttachment } from '@rocket.chat/core-typings'; import { Messages, Rooms, Subscriptions, Users, ReadReceipts } from '@rocket.chat/models'; @@ -8,7 +8,7 @@ import { Meteor } from 'meteor/meteor'; import { Apps, AppEvents } from '../../../ee/server/apps/orchestrator'; import { isTruthy } from '../../../lib/isTruthy'; -import { broadcastMessageSentEvent } from '../../../server/modules/watchers/lib/messages'; +import { broadcastMessageFromData } from '../../../server/modules/watchers/lib/messages'; import { canAccessRoomAsync, roomAccessAttributes } from '../../authorization/server'; import { hasPermissionAsync } from '../../authorization/server/functions/hasPermission'; import { isTheLastMessage } from '../../lib/server/functions/isTheLastMessage'; @@ -222,9 +222,8 @@ Meteor.methods({ if (settings.get('Message_Read_Receipt_Store_Users')) { await ReadReceipts.setPinnedByMessageId(originalMessage._id, originalMessage.pinned); } - void broadcastMessageSentEvent({ + void broadcastMessageFromData({ id: message._id, - broadcastCallback: (message) => api.broadcast('message.sent', message), }); return true; diff --git a/apps/meteor/app/message-star/server/starMessage.ts b/apps/meteor/app/message-star/server/starMessage.ts index aaa5657c5b35..8f025d920057 100644 --- a/apps/meteor/app/message-star/server/starMessage.ts +++ b/apps/meteor/app/message-star/server/starMessage.ts @@ -1,11 +1,10 @@ -import { api } from '@rocket.chat/core-services'; import type { IMessage } from '@rocket.chat/core-typings'; import { Messages, Subscriptions, Rooms } from '@rocket.chat/models'; import type { ServerMethods } from '@rocket.chat/ui-contexts'; import { Meteor } from 'meteor/meteor'; import { Apps, AppEvents } from '../../../ee/server/apps/orchestrator'; -import { broadcastMessageSentEvent } from '../../../server/modules/watchers/lib/messages'; +import { broadcastMessageFromData } from '../../../server/modules/watchers/lib/messages'; import { canAccessRoomAsync, roomAccessAttributes } from '../../authorization/server'; import { isTheLastMessage } from '../../lib/server/functions/isTheLastMessage'; import { settings } from '../../settings/server'; @@ -62,9 +61,8 @@ Meteor.methods({ await Messages.updateUserStarById(message._id, uid, message.starred); - void broadcastMessageSentEvent({ + void broadcastMessageFromData({ id: message._id, - broadcastCallback: (message) => api.broadcast('message.sent', message), }); return true; diff --git a/apps/meteor/app/reactions/server/setReaction.ts b/apps/meteor/app/reactions/server/setReaction.ts index fab1100fc615..27fe4d36a053 100644 --- a/apps/meteor/app/reactions/server/setReaction.ts +++ b/apps/meteor/app/reactions/server/setReaction.ts @@ -8,7 +8,7 @@ import _ from 'underscore'; import { AppEvents, Apps } from '../../../ee/server/apps/orchestrator'; import { callbacks } from '../../../lib/callbacks'; import { i18n } from '../../../server/lib/i18n'; -import { broadcastMessageSentEvent } from '../../../server/modules/watchers/lib/messages'; +import { broadcastMessageFromData } from '../../../server/modules/watchers/lib/messages'; import { canAccessRoomAsync } from '../../authorization/server'; import { hasPermissionAsync } from '../../authorization/server/functions/hasPermission'; import { emoji } from '../../emoji/server'; @@ -108,9 +108,8 @@ async function setReaction(room: IRoom, user: IUser, message: IMessage, reaction await Apps.triggerEvent(AppEvents.IPostMessageReacted, message, user, reaction, isReacted); - void broadcastMessageSentEvent({ + void broadcastMessageFromData({ id: message._id, - broadcastCallback: (message) => api.broadcast('message.sent', message), }); } diff --git a/apps/meteor/app/threads/server/hooks/aftersavemessage.ts b/apps/meteor/app/threads/server/hooks/aftersavemessage.ts index 6fa780e12f8d..4b0f94aa8b52 100644 --- a/apps/meteor/app/threads/server/hooks/aftersavemessage.ts +++ b/apps/meteor/app/threads/server/hooks/aftersavemessage.ts @@ -1,11 +1,10 @@ -import { api } from '@rocket.chat/core-services'; import type { IMessage, IRoom } from '@rocket.chat/core-typings'; import { isEditedMessage } from '@rocket.chat/core-typings'; import { Messages } from '@rocket.chat/models'; import { Meteor } from 'meteor/meteor'; import { callbacks } from '../../../../lib/callbacks'; -import { broadcastMessageSentEvent } from '../../../../server/modules/watchers/lib/messages'; +import { broadcastMessageFromData } from '../../../../server/modules/watchers/lib/messages'; import { updateThreadUsersSubscriptions, getMentions } from '../../../lib/server/lib/notifyUsersOnMessage'; import { sendMessageNotifications } from '../../../lib/server/lib/sendNotificationsOnMessage'; import { settings } from '../../../settings/server'; @@ -63,9 +62,8 @@ export async function processThreads(message: IMessage, room: IRoom) { await notifyUsersOnReply(message, replies, room); await metaData(message, parentMessage, replies); await notification(message, room, replies); - void broadcastMessageSentEvent({ + void broadcastMessageFromData({ id: message.tmid, - broadcastCallback: (message) => api.broadcast('message.sent', message), }); return message; diff --git a/apps/meteor/ee/server/lib/message-read-receipt/ReadReceipt.js b/apps/meteor/ee/server/lib/message-read-receipt/ReadReceipt.js index d0d627b74665..432fa897243f 100644 --- a/apps/meteor/ee/server/lib/message-read-receipt/ReadReceipt.js +++ b/apps/meteor/ee/server/lib/message-read-receipt/ReadReceipt.js @@ -5,7 +5,7 @@ import { Random } from '@rocket.chat/random'; import { settings } from '../../../../app/settings/server'; import { SystemLogger } from '../../../../server/lib/logger/system'; import { roomCoordinator } from '../../../../server/lib/rooms/roomCoordinator'; -import { broadcastMessageSentEvent } from '../../../../server/modules/watchers/lib/messages'; +import { broadcastMessageFromData } from '../../../../server/modules/watchers/lib/messages'; // debounced function by roomId, so multiple calls within 2 seconds to same roomId runs only once const list = {}; @@ -68,9 +68,8 @@ export const ReadReceipt = { if (isUserAlone) { const result = await Messages.setAsReadById(message._id); if (result.modifiedCount > 0) { - void broadcastMessageSentEvent({ + void broadcastMessageFromData({ id: message._id, - broadcastCallback: (message) => api.broadcast('message.sent', message), }); } } diff --git a/apps/meteor/server/features/EmailInbox/EmailInbox_Incoming.ts b/apps/meteor/server/features/EmailInbox/EmailInbox_Incoming.ts index 9b1ba98e0f3d..d974aa9c91be 100644 --- a/apps/meteor/server/features/EmailInbox/EmailInbox_Incoming.ts +++ b/apps/meteor/server/features/EmailInbox/EmailInbox_Incoming.ts @@ -1,4 +1,3 @@ -import { api } from '@rocket.chat/core-services'; import type { ILivechatVisitor, IOmnichannelRoom, @@ -17,7 +16,7 @@ import { Livechat as LivechatTyped } from '../../../app/livechat/server/lib/Live import { QueueManager } from '../../../app/livechat/server/lib/QueueManager'; import { settings } from '../../../app/settings/server'; import { i18n } from '../../lib/i18n'; -import { broadcastMessageSentEvent } from '../../modules/watchers/lib/messages'; +import { broadcastMessageFromData } from '../../modules/watchers/lib/messages'; import { logger } from './logger'; type FileAttachment = VideoAttachmentProps & ImageAttachmentProps & AudioAttachmentProps; @@ -238,9 +237,8 @@ export async function onEmailReceived(email: ParsedMail, inbox: string, departme }, ); room && (await LivechatRooms.updateEmailThreadByRoomId(room._id, thread)); - void broadcastMessageSentEvent({ + void broadcastMessageFromData({ id: msgId, - broadcastCallback: (message) => api.broadcast('message.sent', message), }); }) .catch((err) => { diff --git a/apps/meteor/server/features/EmailInbox/EmailInbox_Outgoing.ts b/apps/meteor/server/features/EmailInbox/EmailInbox_Outgoing.ts index b66610f326c1..708d00422b5d 100644 --- a/apps/meteor/server/features/EmailInbox/EmailInbox_Outgoing.ts +++ b/apps/meteor/server/features/EmailInbox/EmailInbox_Outgoing.ts @@ -1,4 +1,3 @@ -import { api } from '@rocket.chat/core-services'; import { isIMessageInbox } from '@rocket.chat/core-typings'; import type { IEmailInbox, IUser, IMessage, IOmnichannelRoom, SlashCommandCallbackParams } from '@rocket.chat/core-typings'; import { Messages, Uploads, LivechatRooms, Rooms, Users } from '@rocket.chat/models'; @@ -11,7 +10,7 @@ import { settings } from '../../../app/settings/server'; import { slashCommands } from '../../../app/utils/server/slashCommand'; import { callbacks } from '../../../lib/callbacks'; import { i18n } from '../../lib/i18n'; -import { broadcastMessageSentEvent } from '../../modules/watchers/lib/messages'; +import { broadcastMessageFromData } from '../../modules/watchers/lib/messages'; import { inboxes } from './EmailInbox'; import type { Inbox } from './EmailInbox'; import { logger } from './logger'; @@ -172,9 +171,8 @@ slashCommands.add({ }, }, ); - void broadcastMessageSentEvent({ + void broadcastMessageFromData({ id: message._id, - broadcastCallback: (message) => api.broadcast('message.sent', message), }); return sendSuccessReplyMessage({ diff --git a/apps/meteor/server/modules/listeners/listeners.module.ts b/apps/meteor/server/modules/listeners/listeners.module.ts index e3691530843e..49a3017af81c 100644 --- a/apps/meteor/server/modules/listeners/listeners.module.ts +++ b/apps/meteor/server/modules/listeners/listeners.module.ts @@ -1,7 +1,7 @@ import type { AppStatus } from '@rocket.chat/apps-engine/definition/AppStatus'; import type { ISetting as AppsSetting } from '@rocket.chat/apps-engine/definition/settings'; import type { IServiceClass } from '@rocket.chat/core-services'; -import { EnterpriseSettings, listenToMessageSentEvent } from '@rocket.chat/core-services'; +import { EnterpriseSettings } from '@rocket.chat/core-services'; import { isSettingColor, isSettingEnterprise } from '@rocket.chat/core-typings'; import type { IUser, IRoom, VideoConference, ISetting, IOmnichannelRoom } from '@rocket.chat/core-typings'; import { Logger } from '@rocket.chat/logger'; @@ -158,7 +158,7 @@ export class ListenersModule { }); }); - listenToMessageSentEvent(service, async (message) => { + service.onEvent('watch.messages', async ({ message }) => { if (!message.rid) { return; } diff --git a/apps/meteor/server/modules/watchers/lib/messages.ts b/apps/meteor/server/modules/watchers/lib/messages.ts index ded1c2389e17..576f27f83b96 100644 --- a/apps/meteor/server/modules/watchers/lib/messages.ts +++ b/apps/meteor/server/modules/watchers/lib/messages.ts @@ -1,3 +1,4 @@ +import { api, dbWatchersDisabled } from '@rocket.chat/core-services'; import type { IMessage, SettingValue, IUser } from '@rocket.chat/core-typings'; import { Messages, Settings, Users } from '@rocket.chat/models'; import mem from 'mem'; @@ -12,41 +13,48 @@ const getUserNameCached = mem( { maxAge: 10000 }, ); -export const broadcastMessageSentEvent = async ({ - id, - data, - broadcastCallback, -}: { - id: IMessage['_id']; - broadcastCallback: (message: IMessage) => Promise; - data?: IMessage; -}): Promise => { +export async function getMessageToBroadcast({ id, data }: { id: IMessage['_id']; data?: IMessage }): Promise { const message = data ?? (await Messages.findOneById(id)); if (!message) { return; } - if (message._hidden !== true && message.imported == null) { - const UseRealName = (await getSettingCached('UI_Use_Real_Name')) === true; + if (message._hidden || message.imported != null) { + return; + } - if (UseRealName) { - if (message.u?._id) { - const name = await getUserNameCached(message.u._id); - if (name) { - message.u.name = name; - } + const useRealName = (await getSettingCached('UI_Use_Real_Name')) === true; + if (useRealName) { + if (message.u?._id) { + const name = await getUserNameCached(message.u._id); + if (name) { + message.u.name = name; } + } - if (message.mentions?.length) { - for await (const mention of message.mentions) { - const name = await getUserNameCached(mention._id); - if (name) { - mention.name = name; - } + if (message.mentions?.length) { + for await (const mention of message.mentions) { + const name = await getUserNameCached(mention._id); + if (name) { + mention.name = name; } } } + } + + return message; +} - void broadcastCallback(message); +// TODO once the broadcast from file apps/meteor/server/modules/watchers/watchers.module.ts is removed +// this function can be renamed to broadcastMessage +export async function broadcastMessageFromData({ id, data }: { id: IMessage['_id']; data?: IMessage }): Promise { + // if db watchers are active, the event will be triggered automatically so we don't need to broadcast it here. + if (!dbWatchersDisabled) { + return; + } + const message = await getMessageToBroadcast({ id, data }); + if (!message) { + return; } -}; + void api.broadcast('watch.messages', { message }); +} diff --git a/apps/meteor/server/modules/watchers/watchers.module.ts b/apps/meteor/server/modules/watchers/watchers.module.ts index efa0866ab4a0..08d6d3c21a15 100644 --- a/apps/meteor/server/modules/watchers/watchers.module.ts +++ b/apps/meteor/server/modules/watchers/watchers.module.ts @@ -39,7 +39,7 @@ import { import { subscriptionFields, roomFields } from '../../../lib/publishFields'; import type { DatabaseWatcher } from '../../database/DatabaseWatcher'; -import { broadcastMessageSentEvent } from './lib/messages'; +import { getMessageToBroadcast } from './lib/messages'; type BroadcastCallback = (event: T, ...args: Parameters) => Promise; @@ -64,25 +64,21 @@ export function isWatcherRunning(): boolean { return watcherStarted; } -const messageWatcher = (watcher: DatabaseWatcher, broadcast: BroadcastCallback): void => { - watcher.on(Messages.getCollectionName(), async ({ clientAction, id, data }) => { - switch (clientAction) { - case 'inserted': - case 'updated': - void broadcastMessageSentEvent({ - id, - data, - broadcastCallback: (message) => broadcast('watch.messages', { clientAction, message }), - }); - break; - } - }); -}; - export function initWatchers(watcher: DatabaseWatcher, broadcast: BroadcastCallback): void { - const dbWatchersEnabled = !dbWatchersDisabled; - if (dbWatchersEnabled) { - messageWatcher(watcher, broadcast); + // watch for changes on the database and broadcast them to the other instances + if (!dbWatchersDisabled) { + watcher.on(Messages.getCollectionName(), async ({ clientAction, id, data }) => { + switch (clientAction) { + case 'inserted': + case 'updated': + const message = await getMessageToBroadcast({ id, data }); + if (!message) { + return; + } + void broadcast('watch.messages', { message }); + break; + } + }); } watcher.on(Subscriptions.getCollectionName(), async ({ clientAction, id, data, diff }) => { diff --git a/apps/meteor/server/services/messages/service.ts b/apps/meteor/server/services/messages/service.ts index f20c545f6abe..f0c30c359110 100644 --- a/apps/meteor/server/services/messages/service.ts +++ b/apps/meteor/server/services/messages/service.ts @@ -11,7 +11,7 @@ import { executeSetReaction } from '../../../app/reactions/server/setReaction'; import { settings } from '../../../app/settings/server'; import { getUserAvatarURL } from '../../../app/utils/server/getUserAvatarURL'; import { BeforeSaveCannedResponse } from '../../../ee/server/hooks/messages/BeforeSaveCannedResponse'; -import { broadcastMessageSentEvent } from '../../modules/watchers/lib/messages'; +import { broadcastMessageFromData } from '../../modules/watchers/lib/messages'; import { BeforeSaveBadWords } from './hooks/BeforeSaveBadWords'; import { BeforeSaveCheckMAC } from './hooks/BeforeSaveCheckMAC'; import { BeforeSaveJumpToMessage } from './hooks/BeforeSaveJumpToMessage'; @@ -121,9 +121,8 @@ export class MessageService extends ServiceClassInternal implements IMessageServ Rooms.incMsgCountById(rid, 1), ]); - void broadcastMessageSentEvent({ + void broadcastMessageFromData({ id: result.insertedId, - broadcastCallback: async (message) => this.api?.broadcast('message.sent', message), }); return result.insertedId; diff --git a/apps/meteor/server/services/meteor/service.ts b/apps/meteor/server/services/meteor/service.ts index 65d5807a8a66..2ed97981c842 100644 --- a/apps/meteor/server/services/meteor/service.ts +++ b/apps/meteor/server/services/meteor/service.ts @@ -1,4 +1,4 @@ -import { api, ServiceClassInternal, listenToMessageSentEvent } from '@rocket.chat/core-services'; +import { api, ServiceClassInternal } from '@rocket.chat/core-services'; import type { AutoUpdateRecord, IMeteor } from '@rocket.chat/core-services'; import type { ILivechatAgent, LoginServiceConfiguration, UserStatus } from '@rocket.chat/core-typings'; import { LoginServiceConfiguration as LoginServiceConfigurationModel, Users } from '@rocket.chat/models'; @@ -222,7 +222,7 @@ export class MeteorService extends ServiceClassInternal implements IMeteor { }); if (!disableMsgRoundtripTracking) { - listenToMessageSentEvent(this, async (message) => { + this.onEvent('watch.messages', async ({ message }) => { if (message?._updatedAt instanceof Date) { metrics.messageRoundtripTime.observe(Date.now() - message._updatedAt.getTime()); } diff --git a/apps/meteor/server/services/video-conference/service.ts b/apps/meteor/server/services/video-conference/service.ts index fb788c18542f..90a7a3302427 100644 --- a/apps/meteor/server/services/video-conference/service.ts +++ b/apps/meteor/server/services/video-conference/service.ts @@ -49,7 +49,7 @@ import { i18n } from '../../lib/i18n'; import { isRoomCompatibleWithVideoConfRinging } from '../../lib/isRoomCompatibleWithVideoConfRinging'; import { videoConfProviders } from '../../lib/videoConfProviders'; import { videoConfTypes } from '../../lib/videoConfTypes'; -import { broadcastMessageSentEvent } from '../../modules/watchers/lib/messages'; +import { broadcastMessageFromData } from '../../modules/watchers/lib/messages'; const { db } = MongoInternals.defaultRemoteCollectionDriver().mongo; @@ -328,9 +328,8 @@ export class VideoConfService extends ServiceClassInternal implements IVideoConf const text = i18n.t('video_livechat_missed', { username: name }); await Messages.setBlocksById(call.messages.started, [this.buildMessageBlock(text)]); - await broadcastMessageSentEvent({ + await broadcastMessageFromData({ id: call.messages.started, - broadcastCallback: (message) => api.broadcast('message.sent', message), }); } diff --git a/packages/core-services/src/events/Events.ts b/packages/core-services/src/events/Events.ts index dfe9b31ff8ce..5b14d78128bf 100644 --- a/packages/core-services/src/events/Events.ts +++ b/packages/core-services/src/events/Events.ts @@ -152,7 +152,7 @@ export type EventSignatures = { user: Pick; previousStatus: UserStatus | undefined; }): void; - 'watch.messages'(data: { clientAction: ClientAction; message: IMessage }): void; + 'watch.messages'(data: { message: IMessage }): void; 'watch.roles'( data: | { clientAction: Exclude; role: IRole } @@ -300,5 +300,4 @@ export type EventSignatures = { 'command.updated'(command: string): void; 'command.removed'(command: string): void; 'actions.changed'(): void; - 'message.sent'(message: IMessage): void; }; diff --git a/packages/core-services/src/events/listeners.ts b/packages/core-services/src/events/listeners.ts deleted file mode 100644 index ced986cb6f30..000000000000 --- a/packages/core-services/src/events/listeners.ts +++ /dev/null @@ -1,12 +0,0 @@ -import type { IMessage } from '@rocket.chat/core-typings'; - -import type { IServiceClass } from '../types/ServiceClass'; - -export const dbWatchersDisabled = ['yes', 'true'].includes(String(process.env.DISABLE_DB_WATCHERS).toLowerCase()); - -export const listenToMessageSentEvent = (service: IServiceClass, action: (message: IMessage) => Promise): void => { - if (dbWatchersDisabled) { - return service.onEvent('message.sent', (message: IMessage) => action(message)); - } - return service.onEvent('watch.messages', ({ message }) => action(message)); -}; diff --git a/packages/core-services/src/index.ts b/packages/core-services/src/index.ts index e84ecb9da9e1..a82318dc7133 100644 --- a/packages/core-services/src/index.ts +++ b/packages/core-services/src/index.ts @@ -52,7 +52,6 @@ export { asyncLocalStorage } from './lib/asyncLocalStorage'; export { MeteorError, isMeteorError } from './MeteorError'; export { api } from './api'; export { EventSignatures } from './events/Events'; -export { listenToMessageSentEvent, dbWatchersDisabled } from './events/listeners'; export { LocalBroker } from './LocalBroker'; export { IBroker, IBrokerNode, BaseMetricOptions, IServiceMetrics } from './types/IBroker'; @@ -137,6 +136,8 @@ export { IOmnichannelAnalyticsService, }; +export const dbWatchersDisabled = ['yes', 'true'].includes(String(process.env.DISABLE_DB_WATCHERS).toLowerCase()); + // TODO think in a way to not have to pass the service name to proxify here as well export const Authorization = proxifyWithWait('authorization'); export const Apps = proxifyWithWait('apps-engine');