-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix redis emitter #5474
Fix redis emitter #5474
Conversation
it had multiple issues: - memory leak in removeAllListeners - eventName === 'error' wasn't consistently handled
Diff output filesdiff --git a/packages/@uppy/companion/lib/server/emitter/redis-emitter.d.ts b/packages/@uppy/companion/lib/server/emitter/redis-emitter.d.ts
index 0d502d9..e64f5f0 100644
--- a/packages/@uppy/companion/lib/server/emitter/redis-emitter.d.ts
+++ b/packages/@uppy/companion/lib/server/emitter/redis-emitter.d.ts
@@ -1,10 +1,9 @@
declare function _exports(redisClient: import('ioredis').Redis, redisPubSubScope: string): {
- on: (eventName: string, handler: any) => void | EventEmitter<[never]>;
- off: (eventName: string, handler: any) => Promise<void> | EventEmitter<[never]>;
- once: (eventName: string, handler: any) => void | EventEmitter<[never]>;
- emit: (eventName: string, ...args: any[]) => void;
- removeListener: (eventName: string, handler: any) => Promise<void> | EventEmitter<[never]>;
- removeAllListeners: (eventName: string) => Promise<void> | EventEmitter<[never]>;
+ on: (eventName: string, handler: any) => Promise<void>;
+ off: (eventName: string, handler: any) => Promise<void>;
+ once: (eventName: string, handler: any) => Promise<void>;
+ emit: (eventName: string, ...args: any[]) => Promise<void>;
+ removeListener: (eventName: string, handler: any) => Promise<void>;
+ removeAllListeners: (eventName: string) => Promise<void>;
};
export = _exports;
-import { EventEmitter } from "events";
diff --git a/packages/@uppy/companion/lib/server/emitter/redis-emitter.js b/packages/@uppy/companion/lib/server/emitter/redis-emitter.js
index 62796bc..84b1eb7 100644
--- a/packages/@uppy/companion/lib/server/emitter/redis-emitter.js
+++ b/packages/@uppy/companion/lib/server/emitter/redis-emitter.js
@@ -19,52 +19,61 @@ function replacer(key, value) {
module.exports = (redisClient, redisPubSubScope) => {
const prefix = redisPubSubScope ? `${redisPubSubScope}:` : "";
const getPrefixedEventName = (eventName) => `${prefix}${eventName}`;
- const publisher = redisClient.duplicate({ lazyConnect: true });
- publisher.on("error", err => logger.error("publisher redis error", err.toString()));
- /** @type {import('ioredis').Redis} */
- let subscriber;
- const connectedPromise = publisher.connect().then(() => {
- subscriber = publisher.duplicate();
- subscriber.on("error", err => logger.error("subscriber redis error", err.toString()));
- return subscriber.connect();
- });
- const handlersByEvent = new Map();
const errorEmitter = new EventEmitter();
const handleError = (err) => errorEmitter.emit("error", err);
- connectedPromise.catch((err) => handleError(err));
+ async function makeRedis() {
+ const publisher = redisClient.duplicate({ lazyConnect: true });
+ publisher.on("error", err => logger.error("publisher redis error", err.toString()));
+ const subscriber = publisher.duplicate();
+ subscriber.on("error", err => logger.error("subscriber redis error", err.toString()));
+ await publisher.connect();
+ await subscriber.connect();
+ return { subscriber, publisher };
+ }
+ const redisPromise = makeRedis();
+ redisPromise.catch((err) => handleError(err));
+ /**
+ * @param {(a: Awaited<typeof redisPromise>) => void} fn
+ */
async function runWhenConnected(fn) {
try {
- await connectedPromise;
- await fn();
+ await fn(await redisPromise);
} catch (err) {
handleError(err);
}
}
+ // because each event can have multiple listeners, we need to keep track of them
+ /** @type {Map<string, Map<() => unknown, () => unknown>>} */
+ const handlersByEventName = new Map();
/**
* Remove an event listener
*
* @param {string} eventName name of the event
* @param {any} handler the handler of the event to remove
*/
- function removeListener(eventName, handler) {
+ async function removeListener(eventName, handler) {
if (eventName === "error") {
- return errorEmitter.removeListener("error", handler);
+ errorEmitter.removeListener("error", handler);
+ return;
}
- return runWhenConnected(() => {
- const handlersByThisEventName = handlersByEvent.get(eventName);
- if (handlersByThisEventName == null) {
- return undefined;
- }
- const actualHandler = handlersByThisEventName.get(handler);
- if (actualHandler == null) {
- return undefined;
- }
- handlersByThisEventName.delete(handler);
- if (handlersByThisEventName.size === 0) {
- handlersByEvent.delete(eventName);
- }
+ const actualHandlerByHandler = handlersByEventName.get(eventName);
+ if (actualHandlerByHandler == null) {
+ return;
+ }
+ const actualHandler = actualHandlerByHandler.get(handler);
+ if (actualHandler == null) {
+ return;
+ }
+ actualHandlerByHandler.delete(handler);
+ const didRemoveLastListener = actualHandlerByHandler.size === 0;
+ if (didRemoveLastListener) {
+ handlersByEventName.delete(eventName);
+ }
+ await runWhenConnected(async ({ subscriber }) => {
subscriber.off("pmessage", actualHandler);
- return subscriber.punsubscribe(getPrefixedEventName(eventName));
+ if (didRemoveLastListener) {
+ await subscriber.punsubscribe(getPrefixedEventName(eventName));
+ }
});
}
/**
@@ -72,7 +81,15 @@ module.exports = (redisClient, redisPubSubScope) => {
* @param {*} handler
* @param {*} _once
*/
- function addListener(eventName, handler, _once = false) {
+ async function addListener(eventName, handler, _once = false) {
+ if (eventName === "error") {
+ if (_once) {
+ errorEmitter.once("error", handler);
+ } else {
+ errorEmitter.addListener("error", handler);
+ }
+ return;
+ }
function actualHandler(pattern, channel, message) {
if (pattern !== getPrefixedEventName(eventName)) {
return;
@@ -89,15 +106,15 @@ module.exports = (redisClient, redisPubSubScope) => {
}
handler(...args);
}
- let handlersByThisEventName = handlersByEvent.get(eventName);
- if (handlersByThisEventName == null) {
- handlersByThisEventName = new WeakMap();
- handlersByEvent.set(eventName, handlersByThisEventName);
+ let actualHandlerByHandler = handlersByEventName.get(eventName);
+ if (actualHandlerByHandler == null) {
+ actualHandlerByHandler = new Map();
+ handlersByEventName.set(eventName, actualHandlerByHandler);
}
- handlersByThisEventName.set(handler, actualHandler);
- runWhenConnected(() => {
+ actualHandlerByHandler.set(handler, actualHandler);
+ await runWhenConnected(async ({ subscriber }) => {
subscriber.on("pmessage", actualHandler);
- return subscriber.psubscribe(getPrefixedEventName(eventName));
+ await subscriber.psubscribe(getPrefixedEventName(eventName));
});
}
/**
@@ -106,11 +123,8 @@ module.exports = (redisClient, redisPubSubScope) => {
* @param {string} eventName name of the event
* @param {any} handler the handler of the event
*/
- function on(eventName, handler) {
- if (eventName === "error") {
- return errorEmitter.on("error", handler);
- }
- return addListener(eventName, handler);
+ async function on(eventName, handler) {
+ await addListener(eventName, handler);
}
/**
* Remove an event listener
@@ -118,8 +132,8 @@ module.exports = (redisClient, redisPubSubScope) => {
* @param {string} eventName name of the event
* @param {any} handler the handler of the event
*/
- function off(eventName, handler) {
- return removeListener(eventName, handler);
+ async function off(eventName, handler) {
+ await removeListener(eventName, handler);
}
/**
* Add an event listener (will be triggered at most once)
@@ -127,33 +141,35 @@ module.exports = (redisClient, redisPubSubScope) => {
* @param {string} eventName name of the event
* @param {any} handler the handler of the event
*/
- function once(eventName, handler) {
- if (eventName === "error") {
- return errorEmitter.once("error", handler);
- }
- return addListener(eventName, handler, true);
+ async function once(eventName, handler) {
+ await addListener(eventName, handler, true);
}
/**
* Announce the occurrence of an event
*
* @param {string} eventName name of the event
*/
- function emit(eventName, ...args) {
- runWhenConnected(() => publisher.publish(getPrefixedEventName(eventName), safeStringify(args, replacer)));
+ async function emit(eventName, ...args) {
+ await runWhenConnected(async (
+ { publisher },
+ ) => (publisher.publish(getPrefixedEventName(eventName), safeStringify(args, replacer))));
}
/**
* Remove all listeners of an event
*
* @param {string} eventName name of the event
*/
- function removeAllListeners(eventName) {
+ async function removeAllListeners(eventName) {
if (eventName === "error") {
- return errorEmitter.removeAllListeners(eventName);
+ errorEmitter.removeAllListeners(eventName);
+ return;
+ }
+ const actualHandlerByHandler = handlersByEventName.get(eventName);
+ if (actualHandlerByHandler != null) {
+ for (const handler of actualHandlerByHandler.keys()) {
+ await removeListener(eventName, handler);
+ }
}
- return runWhenConnected(() => {
- handlersByEvent.delete(eventName);
- return subscriber.punsubscribe(getPrefixedEventName(eventName));
- });
}
return {
on, |
return runWhenConnected(() => { | ||
const handlersByThisEventName = handlersByEvent.get(eventName) | ||
if (handlersByThisEventName == null) return undefined | ||
const thisEventNameActualHandlerByHandler = handlersByEvent.get(eventName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actual handler by handler? 😅
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, it's a map that maps handler -> actual handler, meaning you can get the actual handler by providing a handler.
or in TS: Map<() => void, () => void>
where the key is the handler and the value is the actualHandler.
This naming was there before and i didn't have any better suggestion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you write some docs above the Map
why it is needed and why it has this structure?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thinking about it more, this feels off to me? Why do we need handler -> handler? Is it because you have multiple handlers per event type? Maybe name it subscribers then and array of handlers per event name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
because the api allows you to removeListener for a specific handler, then we need to keep a reference to that handler. i believe the redis-emitter's api mimics the EventEmitter API, and that api allows removing a specific handler from an event name. That's why it's implemented like this. We could probably change the API but that's a bigger change and potentially breaking, so maybe that could be done in a separate PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added a comment and renamed the variables a bit
* main: Fix redis emitter (#5474)
* main: (65 commits) `.handleInputChange()` - use `.currentTarget`; clear the input using `''` (#5381) build(deps): bump @blakeembrey/template from 1.1.0 to 1.2.0 (#5448) Update packages/@uppy/locales/src/fr_FR.ts (#5472) @uppy/svelte: use SvelteKit as the build tool (#5484) @uppy/xhr-upload: add response to upload-error callback (#5486) tus: Avoid duplicate `upload-error` event (#5485) Fix redis emitter (#5474) build(deps): bump docker/build-push-action from 6.8.0 to 6.9.0 (#5483) Release: [email protected] (#5479) @uppy/transloadit: fix multiple upload batches & run again (#5478) build(deps): bump docker/build-push-action from 6.7.0 to 6.8.0 (#5477) build(deps): bump vite from 5.2.11 to 5.4.8 (#5471) build(deps-dev): bump rollup from 4.18.0 to 4.22.4 (#5470) build(deps): bump vite from 5.2.11 to 5.4.6 (#5466) Release: [email protected] (#5467) @uppy/tus: fix retry check for status code 400 (#5461) meta: fix AwsS3 endpoint option in private/dev build(deps): bump body-parser from 1.20.2 to 1.20.3 (#5462) build(deps-dev): bump vite from 5.3.1 to 5.3.6 (#5459) @uppy/tus: set response from tus-js-client (#5456) ...
| Package | Version | Package | Version | | ---------------- | ------- | ---------------- | ------- | | @uppy/companion | 5.1.2 | @uppy/svelte | 4.1.0 | | @uppy/core | 4.2.2 | @uppy/tus | 4.1.2 | | @uppy/dashboard | 4.1.1 | @uppy/utils | 6.0.3 | | @uppy/drag-drop | 4.0.3 | @uppy/xhr-upload | 4.2.1 | | @uppy/file-input | 4.0.2 | uppy | 4.5.0 | | @uppy/locales | 4.2.0 | | | - @uppy/dashboard: Dashboard - convert some files to typescript (Evgenia Karunus / #5367) - @uppy/dashboard,@uppy/drag-drop,@uppy/file-input: `.handleInputChange()` - use `.currentTarget`; clear the input using `''` (Evgenia Karunus / #5381) - meta: build(deps): bump @blakeembrey/template from 1.1.0 to 1.2.0 (dependabot[bot] / #5448) - @uppy/locales: Update packages/@uppy/locales/src/fr_FR.ts (Zéfyx / #5472) - @uppy/svelte: use SvelteKit as the build tool (Merlijn Vos / #5484) - @uppy/xhr-upload: add response to upload-error callback (Caleb Hardin / #5486) - @uppy/tus: tus: Avoid duplicate `upload-error` event (Marius / #5485) - @uppy/companion: Fix redis emitter (Mikael Finstad / #5474) - meta: build(deps): bump docker/build-push-action from 6.8.0 to 6.9.0 (dependabot[bot] / #5483)
it had multiple issues:
I believe the memory leak might have been introduced in #4623
memory leak from prod: