Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix redis emitter #5474

Merged
merged 2 commits into from
Oct 7, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 76 additions & 54 deletions packages/@uppy/companion/src/server/emitter/redis-emitter.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,54 +20,69 @@ function replacer(key, value) {
module.exports = (redisClient, redisPubSubScope) => {
const prefix = redisPubSubScope ? `${redisPubSubScope}:` : ''
const getPrefixedEventName = (eventName) => `${prefix}${eventName}`
const publisher = redisClient.duplicate({ lazyConnect: true })
publisher.on('error', err => logger.error('publisher redis error', err.toString()))
/** @type {import('ioredis').Redis} */
let subscriber

const connectedPromise = publisher.connect().then(() => {
subscriber = publisher.duplicate()
subscriber.on('error', err => logger.error('subscriber redis error', err.toString()))
return subscriber.connect()
})

const handlersByEvent = new Map()

const errorEmitter = new EventEmitter()
const handleError = (err) => errorEmitter.emit('error', err)

connectedPromise.catch((err) => handleError(err))
async function makeRedis() {
const publisher = redisClient.duplicate({ lazyConnect: true })
publisher.on('error', err => logger.error('publisher redis error', err.toString()))
const subscriber = publisher.duplicate()
subscriber.on('error', err => logger.error('subscriber redis error', err.toString()))
await publisher.connect()
await subscriber.connect()
return { subscriber, publisher }
}

const redisPromise = makeRedis()
redisPromise.catch((err) => handleError(err))

/**
*
* @param {(a: Awaited<typeof redisPromise>) => void} fn
*/
async function runWhenConnected (fn) {
try {
await connectedPromise
await fn()
await fn(await redisPromise)
} catch (err) {
handleError(err)
}
}

// because each event can have multiple listeners, we need to keep track of them
/** @type {Map<string, Map<() => unknown, () => unknown>>} */
const handlersByEventName = new Map()

/**
* Remove an event listener
*
* @param {string} eventName name of the event
* @param {any} handler the handler of the event to remove
*/
function removeListener (eventName, handler) {
if (eventName === 'error') return errorEmitter.removeListener('error', handler)
async function removeListener (eventName, handler) {
if (eventName === 'error') {
errorEmitter.removeListener('error', handler)
return
}

return runWhenConnected(() => {
const handlersByThisEventName = handlersByEvent.get(eventName)
if (handlersByThisEventName == null) return undefined
const actualHandlerByHandler = handlersByEventName.get(eventName)
if (actualHandlerByHandler == null) return

const actualHandler = handlersByThisEventName.get(handler)
if (actualHandler == null) return undefined
const actualHandler = actualHandlerByHandler.get(handler)
if (actualHandler == null) return

handlersByThisEventName.delete(handler)
if (handlersByThisEventName.size === 0) handlersByEvent.delete(eventName)
actualHandlerByHandler.delete(handler)

const didRemoveLastListener = actualHandlerByHandler.size === 0
if (didRemoveLastListener) {
handlersByEventName.delete(eventName)
}

await runWhenConnected(async ({ subscriber }) => {
subscriber.off('pmessage', actualHandler)
return subscriber.punsubscribe(getPrefixedEventName(eventName))
if (didRemoveLastListener) {
await subscriber.punsubscribe(getPrefixedEventName(eventName))
}
})
}

Expand All @@ -77,7 +92,13 @@ module.exports = (redisClient, redisPubSubScope) => {
* @param {*} handler
* @param {*} _once
*/
function addListener (eventName, handler, _once = false) {
async function addListener (eventName, handler, _once = false) {
if (eventName === 'error') {
if (_once) errorEmitter.once('error', handler)
else errorEmitter.addListener('error', handler)
return
}

function actualHandler (pattern, channel, message) {
if (pattern !== getPrefixedEventName(eventName)) {
return
Expand All @@ -91,19 +112,20 @@ module.exports = (redisClient, redisPubSubScope) => {
handleError(new Error(`Invalid JSON received! Channel: ${eventName} Message: ${message}`))
return
}

handler(...args)
}

let handlersByThisEventName = handlersByEvent.get(eventName)
if (handlersByThisEventName == null) {
handlersByThisEventName = new WeakMap()
handlersByEvent.set(eventName, handlersByThisEventName)
let actualHandlerByHandler = handlersByEventName.get(eventName)
if (actualHandlerByHandler == null) {
actualHandlerByHandler = new Map()
handlersByEventName.set(eventName, actualHandlerByHandler)
}
handlersByThisEventName.set(handler, actualHandler)
actualHandlerByHandler.set(handler, actualHandler)

runWhenConnected(() => {
await runWhenConnected(async ({ subscriber }) => {
subscriber.on('pmessage', actualHandler)
return subscriber.psubscribe(getPrefixedEventName(eventName))
await subscriber.psubscribe(getPrefixedEventName(eventName))
})
}

Expand All @@ -113,10 +135,8 @@ module.exports = (redisClient, redisPubSubScope) => {
* @param {string} eventName name of the event
* @param {any} handler the handler of the event
*/
function on (eventName, handler) {
if (eventName === 'error') return errorEmitter.on('error', handler)

return addListener(eventName, handler)
async function on (eventName, handler) {
await addListener(eventName, handler)
}

/**
Expand All @@ -125,8 +145,8 @@ module.exports = (redisClient, redisPubSubScope) => {
* @param {string} eventName name of the event
* @param {any} handler the handler of the event
*/
function off (eventName, handler) {
return removeListener(eventName, handler)
async function off (eventName, handler) {
await removeListener(eventName, handler)
}

/**
Expand All @@ -135,36 +155,38 @@ module.exports = (redisClient, redisPubSubScope) => {
* @param {string} eventName name of the event
* @param {any} handler the handler of the event
*/
function once (eventName, handler) {
if (eventName === 'error') return errorEmitter.once('error', handler)

return addListener(eventName, handler, true)
async function once (eventName, handler) {
await addListener(eventName, handler, true)
}

/**
* Announce the occurrence of an event
*
* @param {string} eventName name of the event
*/
function emit (eventName, ...args) {
runWhenConnected(
() => publisher.publish(getPrefixedEventName(eventName),
safeStringify(args, replacer)),
)
async function emit (eventName, ...args) {
await runWhenConnected(async ({ publisher }) => (
publisher.publish(getPrefixedEventName(eventName), safeStringify(args, replacer))
))
}

/**
* Remove all listeners of an event
*
* @param {string} eventName name of the event
*/
function removeAllListeners (eventName) {
if (eventName === 'error') return errorEmitter.removeAllListeners(eventName)
async function removeAllListeners (eventName) {
if (eventName === 'error') {
errorEmitter.removeAllListeners(eventName)
return
}

return runWhenConnected(() => {
handlersByEvent.delete(eventName)
return subscriber.punsubscribe(getPrefixedEventName(eventName))
})
const actualHandlerByHandler = handlersByEventName.get(eventName)
if (actualHandlerByHandler != null) {
for (const handler of actualHandlerByHandler.keys()) {
await removeListener(eventName, handler)
}
}
}

return {
Expand Down
Loading