Skip to content

Commit

Permalink
Fix redis emitter (#5474)
Browse files Browse the repository at this point in the history
* fix redis emitter

it had multiple issues:
- memory leak in removeAllListeners
- eventName === 'error' wasn't consistently handled

* rename and add comment
  • Loading branch information
mifi authored Oct 7, 2024
1 parent 8ff9039 commit 7322413
Showing 1 changed file with 76 additions and 54 deletions.
130 changes: 76 additions & 54 deletions packages/@uppy/companion/src/server/emitter/redis-emitter.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,54 +20,69 @@ function replacer(key, value) {
module.exports = (redisClient, redisPubSubScope) => {
const prefix = redisPubSubScope ? `${redisPubSubScope}:` : ''
const getPrefixedEventName = (eventName) => `${prefix}${eventName}`
const publisher = redisClient.duplicate({ lazyConnect: true })
publisher.on('error', err => logger.error('publisher redis error', err.toString()))
/** @type {import('ioredis').Redis} */
let subscriber

const connectedPromise = publisher.connect().then(() => {
subscriber = publisher.duplicate()
subscriber.on('error', err => logger.error('subscriber redis error', err.toString()))
return subscriber.connect()
})

const handlersByEvent = new Map()

const errorEmitter = new EventEmitter()
const handleError = (err) => errorEmitter.emit('error', err)

connectedPromise.catch((err) => handleError(err))
async function makeRedis() {
const publisher = redisClient.duplicate({ lazyConnect: true })
publisher.on('error', err => logger.error('publisher redis error', err.toString()))
const subscriber = publisher.duplicate()
subscriber.on('error', err => logger.error('subscriber redis error', err.toString()))
await publisher.connect()
await subscriber.connect()
return { subscriber, publisher }
}

const redisPromise = makeRedis()
redisPromise.catch((err) => handleError(err))

/**
*
* @param {(a: Awaited<typeof redisPromise>) => void} fn
*/
async function runWhenConnected (fn) {
try {
await connectedPromise
await fn()
await fn(await redisPromise)
} catch (err) {
handleError(err)
}
}

// because each event can have multiple listeners, we need to keep track of them
/** @type {Map<string, Map<() => unknown, () => unknown>>} */
const handlersByEventName = new Map()

/**
* Remove an event listener
*
* @param {string} eventName name of the event
* @param {any} handler the handler of the event to remove
*/
function removeListener (eventName, handler) {
if (eventName === 'error') return errorEmitter.removeListener('error', handler)
async function removeListener (eventName, handler) {
if (eventName === 'error') {
errorEmitter.removeListener('error', handler)
return
}

return runWhenConnected(() => {
const handlersByThisEventName = handlersByEvent.get(eventName)
if (handlersByThisEventName == null) return undefined
const actualHandlerByHandler = handlersByEventName.get(eventName)
if (actualHandlerByHandler == null) return

const actualHandler = handlersByThisEventName.get(handler)
if (actualHandler == null) return undefined
const actualHandler = actualHandlerByHandler.get(handler)
if (actualHandler == null) return

handlersByThisEventName.delete(handler)
if (handlersByThisEventName.size === 0) handlersByEvent.delete(eventName)
actualHandlerByHandler.delete(handler)

const didRemoveLastListener = actualHandlerByHandler.size === 0
if (didRemoveLastListener) {
handlersByEventName.delete(eventName)
}

await runWhenConnected(async ({ subscriber }) => {
subscriber.off('pmessage', actualHandler)
return subscriber.punsubscribe(getPrefixedEventName(eventName))
if (didRemoveLastListener) {
await subscriber.punsubscribe(getPrefixedEventName(eventName))
}
})
}

Expand All @@ -77,7 +92,13 @@ module.exports = (redisClient, redisPubSubScope) => {
* @param {*} handler
* @param {*} _once
*/
function addListener (eventName, handler, _once = false) {
async function addListener (eventName, handler, _once = false) {
if (eventName === 'error') {
if (_once) errorEmitter.once('error', handler)
else errorEmitter.addListener('error', handler)
return
}

function actualHandler (pattern, channel, message) {
if (pattern !== getPrefixedEventName(eventName)) {
return
Expand All @@ -91,19 +112,20 @@ module.exports = (redisClient, redisPubSubScope) => {
handleError(new Error(`Invalid JSON received! Channel: ${eventName} Message: ${message}`))
return
}

handler(...args)
}

let handlersByThisEventName = handlersByEvent.get(eventName)
if (handlersByThisEventName == null) {
handlersByThisEventName = new WeakMap()
handlersByEvent.set(eventName, handlersByThisEventName)
let actualHandlerByHandler = handlersByEventName.get(eventName)
if (actualHandlerByHandler == null) {
actualHandlerByHandler = new Map()
handlersByEventName.set(eventName, actualHandlerByHandler)
}
handlersByThisEventName.set(handler, actualHandler)
actualHandlerByHandler.set(handler, actualHandler)

runWhenConnected(() => {
await runWhenConnected(async ({ subscriber }) => {
subscriber.on('pmessage', actualHandler)
return subscriber.psubscribe(getPrefixedEventName(eventName))
await subscriber.psubscribe(getPrefixedEventName(eventName))
})
}

Expand All @@ -113,10 +135,8 @@ module.exports = (redisClient, redisPubSubScope) => {
* @param {string} eventName name of the event
* @param {any} handler the handler of the event
*/
function on (eventName, handler) {
if (eventName === 'error') return errorEmitter.on('error', handler)

return addListener(eventName, handler)
async function on (eventName, handler) {
await addListener(eventName, handler)
}

/**
Expand All @@ -125,8 +145,8 @@ module.exports = (redisClient, redisPubSubScope) => {
* @param {string} eventName name of the event
* @param {any} handler the handler of the event
*/
function off (eventName, handler) {
return removeListener(eventName, handler)
async function off (eventName, handler) {
await removeListener(eventName, handler)
}

/**
Expand All @@ -135,36 +155,38 @@ module.exports = (redisClient, redisPubSubScope) => {
* @param {string} eventName name of the event
* @param {any} handler the handler of the event
*/
function once (eventName, handler) {
if (eventName === 'error') return errorEmitter.once('error', handler)

return addListener(eventName, handler, true)
async function once (eventName, handler) {
await addListener(eventName, handler, true)
}

/**
* Announce the occurrence of an event
*
* @param {string} eventName name of the event
*/
function emit (eventName, ...args) {
runWhenConnected(
() => publisher.publish(getPrefixedEventName(eventName),
safeStringify(args, replacer)),
)
async function emit (eventName, ...args) {
await runWhenConnected(async ({ publisher }) => (
publisher.publish(getPrefixedEventName(eventName), safeStringify(args, replacer))
))
}

/**
* Remove all listeners of an event
*
* @param {string} eventName name of the event
*/
function removeAllListeners (eventName) {
if (eventName === 'error') return errorEmitter.removeAllListeners(eventName)
async function removeAllListeners (eventName) {
if (eventName === 'error') {
errorEmitter.removeAllListeners(eventName)
return
}

return runWhenConnected(() => {
handlersByEvent.delete(eventName)
return subscriber.punsubscribe(getPrefixedEventName(eventName))
})
const actualHandlerByHandler = handlersByEventName.get(eventName)
if (actualHandlerByHandler != null) {
for (const handler of actualHandlerByHandler.keys()) {
await removeListener(eventName, handler)
}
}
}

return {
Expand Down

0 comments on commit 7322413

Please sign in to comment.