From 73224133941b8f1911d8b74e9d881d9d1bfd303c Mon Sep 17 00:00:00 2001 From: Mikael Finstad Date: Mon, 7 Oct 2024 13:35:18 +0200 Subject: [PATCH] Fix redis emitter (#5474) * fix redis emitter it had multiple issues: - memory leak in removeAllListeners - eventName === 'error' wasn't consistently handled * rename and add comment --- .../src/server/emitter/redis-emitter.js | 130 ++++++++++-------- 1 file changed, 76 insertions(+), 54 deletions(-) diff --git a/packages/@uppy/companion/src/server/emitter/redis-emitter.js b/packages/@uppy/companion/src/server/emitter/redis-emitter.js index aa62f61baf..0aa145432b 100644 --- a/packages/@uppy/companion/src/server/emitter/redis-emitter.js +++ b/packages/@uppy/companion/src/server/emitter/redis-emitter.js @@ -20,54 +20,69 @@ function replacer(key, value) { module.exports = (redisClient, redisPubSubScope) => { const prefix = redisPubSubScope ? `${redisPubSubScope}:` : '' const getPrefixedEventName = (eventName) => `${prefix}${eventName}` - const publisher = redisClient.duplicate({ lazyConnect: true }) - publisher.on('error', err => logger.error('publisher redis error', err.toString())) - /** @type {import('ioredis').Redis} */ - let subscriber - - const connectedPromise = publisher.connect().then(() => { - subscriber = publisher.duplicate() - subscriber.on('error', err => logger.error('subscriber redis error', err.toString())) - return subscriber.connect() - }) - - const handlersByEvent = new Map() const errorEmitter = new EventEmitter() const handleError = (err) => errorEmitter.emit('error', err) - connectedPromise.catch((err) => handleError(err)) + async function makeRedis() { + const publisher = redisClient.duplicate({ lazyConnect: true }) + publisher.on('error', err => logger.error('publisher redis error', err.toString())) + const subscriber = publisher.duplicate() + subscriber.on('error', err => logger.error('subscriber redis error', err.toString())) + await publisher.connect() + await subscriber.connect() + return { subscriber, publisher } + } + + const redisPromise = makeRedis() + redisPromise.catch((err) => handleError(err)) + /** + * + * @param {(a: Awaited) => void} fn + */ async function runWhenConnected (fn) { try { - await connectedPromise - await fn() + await fn(await redisPromise) } catch (err) { handleError(err) } } + // because each event can have multiple listeners, we need to keep track of them + /** @type {Map unknown, () => unknown>>} */ + const handlersByEventName = new Map() + /** * Remove an event listener * * @param {string} eventName name of the event * @param {any} handler the handler of the event to remove */ - function removeListener (eventName, handler) { - if (eventName === 'error') return errorEmitter.removeListener('error', handler) + async function removeListener (eventName, handler) { + if (eventName === 'error') { + errorEmitter.removeListener('error', handler) + return + } - return runWhenConnected(() => { - const handlersByThisEventName = handlersByEvent.get(eventName) - if (handlersByThisEventName == null) return undefined + const actualHandlerByHandler = handlersByEventName.get(eventName) + if (actualHandlerByHandler == null) return - const actualHandler = handlersByThisEventName.get(handler) - if (actualHandler == null) return undefined + const actualHandler = actualHandlerByHandler.get(handler) + if (actualHandler == null) return - handlersByThisEventName.delete(handler) - if (handlersByThisEventName.size === 0) handlersByEvent.delete(eventName) + actualHandlerByHandler.delete(handler) + const didRemoveLastListener = actualHandlerByHandler.size === 0 + if (didRemoveLastListener) { + handlersByEventName.delete(eventName) + } + + await runWhenConnected(async ({ subscriber }) => { subscriber.off('pmessage', actualHandler) - return subscriber.punsubscribe(getPrefixedEventName(eventName)) + if (didRemoveLastListener) { + await subscriber.punsubscribe(getPrefixedEventName(eventName)) + } }) } @@ -77,7 +92,13 @@ module.exports = (redisClient, redisPubSubScope) => { * @param {*} handler * @param {*} _once */ - function addListener (eventName, handler, _once = false) { + async function addListener (eventName, handler, _once = false) { + if (eventName === 'error') { + if (_once) errorEmitter.once('error', handler) + else errorEmitter.addListener('error', handler) + return + } + function actualHandler (pattern, channel, message) { if (pattern !== getPrefixedEventName(eventName)) { return @@ -91,19 +112,20 @@ module.exports = (redisClient, redisPubSubScope) => { handleError(new Error(`Invalid JSON received! Channel: ${eventName} Message: ${message}`)) return } + handler(...args) } - let handlersByThisEventName = handlersByEvent.get(eventName) - if (handlersByThisEventName == null) { - handlersByThisEventName = new WeakMap() - handlersByEvent.set(eventName, handlersByThisEventName) + let actualHandlerByHandler = handlersByEventName.get(eventName) + if (actualHandlerByHandler == null) { + actualHandlerByHandler = new Map() + handlersByEventName.set(eventName, actualHandlerByHandler) } - handlersByThisEventName.set(handler, actualHandler) + actualHandlerByHandler.set(handler, actualHandler) - runWhenConnected(() => { + await runWhenConnected(async ({ subscriber }) => { subscriber.on('pmessage', actualHandler) - return subscriber.psubscribe(getPrefixedEventName(eventName)) + await subscriber.psubscribe(getPrefixedEventName(eventName)) }) } @@ -113,10 +135,8 @@ module.exports = (redisClient, redisPubSubScope) => { * @param {string} eventName name of the event * @param {any} handler the handler of the event */ - function on (eventName, handler) { - if (eventName === 'error') return errorEmitter.on('error', handler) - - return addListener(eventName, handler) + async function on (eventName, handler) { + await addListener(eventName, handler) } /** @@ -125,8 +145,8 @@ module.exports = (redisClient, redisPubSubScope) => { * @param {string} eventName name of the event * @param {any} handler the handler of the event */ - function off (eventName, handler) { - return removeListener(eventName, handler) + async function off (eventName, handler) { + await removeListener(eventName, handler) } /** @@ -135,10 +155,8 @@ module.exports = (redisClient, redisPubSubScope) => { * @param {string} eventName name of the event * @param {any} handler the handler of the event */ - function once (eventName, handler) { - if (eventName === 'error') return errorEmitter.once('error', handler) - - return addListener(eventName, handler, true) + async function once (eventName, handler) { + await addListener(eventName, handler, true) } /** @@ -146,11 +164,10 @@ module.exports = (redisClient, redisPubSubScope) => { * * @param {string} eventName name of the event */ - function emit (eventName, ...args) { - runWhenConnected( - () => publisher.publish(getPrefixedEventName(eventName), - safeStringify(args, replacer)), - ) + async function emit (eventName, ...args) { + await runWhenConnected(async ({ publisher }) => ( + publisher.publish(getPrefixedEventName(eventName), safeStringify(args, replacer)) + )) } /** @@ -158,13 +175,18 @@ module.exports = (redisClient, redisPubSubScope) => { * * @param {string} eventName name of the event */ - function removeAllListeners (eventName) { - if (eventName === 'error') return errorEmitter.removeAllListeners(eventName) + async function removeAllListeners (eventName) { + if (eventName === 'error') { + errorEmitter.removeAllListeners(eventName) + return + } - return runWhenConnected(() => { - handlersByEvent.delete(eventName) - return subscriber.punsubscribe(getPrefixedEventName(eventName)) - }) + const actualHandlerByHandler = handlersByEventName.get(eventName) + if (actualHandlerByHandler != null) { + for (const handler of actualHandlerByHandler.keys()) { + await removeListener(eventName, handler) + } + } } return {