Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix redis emitter #5474

Merged
merged 2 commits into from
Oct 7, 2024
Merged

Fix redis emitter #5474

merged 2 commits into from
Oct 7, 2024

Conversation

mifi
Copy link
Contributor

@mifi mifi commented Sep 28, 2024

it had multiple issues:

  • memory leak in removeAllListeners
  • eventName === 'error' wasn't consistently handled

I believe the memory leak might have been introduced in #4623

memory leak from prod:
Screenshot 2024-09-28 at 10 56 26

it had multiple issues:
- memory leak in removeAllListeners
- eventName === 'error' wasn't consistently handled
Copy link
Contributor

github-actions bot commented Sep 28, 2024

Diff output files
diff --git a/packages/@uppy/companion/lib/server/emitter/redis-emitter.d.ts b/packages/@uppy/companion/lib/server/emitter/redis-emitter.d.ts
index 0d502d9..e64f5f0 100644
--- a/packages/@uppy/companion/lib/server/emitter/redis-emitter.d.ts
+++ b/packages/@uppy/companion/lib/server/emitter/redis-emitter.d.ts
@@ -1,10 +1,9 @@
 declare function _exports(redisClient: import('ioredis').Redis, redisPubSubScope: string): {
-    on: (eventName: string, handler: any) => void | EventEmitter<[never]>;
-    off: (eventName: string, handler: any) => Promise<void> | EventEmitter<[never]>;
-    once: (eventName: string, handler: any) => void | EventEmitter<[never]>;
-    emit: (eventName: string, ...args: any[]) => void;
-    removeListener: (eventName: string, handler: any) => Promise<void> | EventEmitter<[never]>;
-    removeAllListeners: (eventName: string) => Promise<void> | EventEmitter<[never]>;
+    on: (eventName: string, handler: any) => Promise<void>;
+    off: (eventName: string, handler: any) => Promise<void>;
+    once: (eventName: string, handler: any) => Promise<void>;
+    emit: (eventName: string, ...args: any[]) => Promise<void>;
+    removeListener: (eventName: string, handler: any) => Promise<void>;
+    removeAllListeners: (eventName: string) => Promise<void>;
 };
 export = _exports;
-import { EventEmitter } from "events";
diff --git a/packages/@uppy/companion/lib/server/emitter/redis-emitter.js b/packages/@uppy/companion/lib/server/emitter/redis-emitter.js
index 62796bc..84b1eb7 100644
--- a/packages/@uppy/companion/lib/server/emitter/redis-emitter.js
+++ b/packages/@uppy/companion/lib/server/emitter/redis-emitter.js
@@ -19,52 +19,61 @@ function replacer(key, value) {
 module.exports = (redisClient, redisPubSubScope) => {
   const prefix = redisPubSubScope ? `${redisPubSubScope}:` : "";
   const getPrefixedEventName = (eventName) => `${prefix}${eventName}`;
-  const publisher = redisClient.duplicate({ lazyConnect: true });
-  publisher.on("error", err => logger.error("publisher redis error", err.toString()));
-  /** @type {import('ioredis').Redis} */
-  let subscriber;
-  const connectedPromise = publisher.connect().then(() => {
-    subscriber = publisher.duplicate();
-    subscriber.on("error", err => logger.error("subscriber redis error", err.toString()));
-    return subscriber.connect();
-  });
-  const handlersByEvent = new Map();
   const errorEmitter = new EventEmitter();
   const handleError = (err) => errorEmitter.emit("error", err);
-  connectedPromise.catch((err) => handleError(err));
+  async function makeRedis() {
+    const publisher = redisClient.duplicate({ lazyConnect: true });
+    publisher.on("error", err => logger.error("publisher redis error", err.toString()));
+    const subscriber = publisher.duplicate();
+    subscriber.on("error", err => logger.error("subscriber redis error", err.toString()));
+    await publisher.connect();
+    await subscriber.connect();
+    return { subscriber, publisher };
+  }
+  const redisPromise = makeRedis();
+  redisPromise.catch((err) => handleError(err));
+  /**
+   * @param {(a: Awaited<typeof redisPromise>) => void} fn
+   */
   async function runWhenConnected(fn) {
     try {
-      await connectedPromise;
-      await fn();
+      await fn(await redisPromise);
     } catch (err) {
       handleError(err);
     }
   }
+  // because each event can have multiple listeners, we need to keep track of them
+  /** @type {Map<string, Map<() => unknown, () => unknown>>} */
+  const handlersByEventName = new Map();
   /**
    * Remove an event listener
    *
    * @param {string} eventName name of the event
    * @param {any} handler the handler of the event to remove
    */
-  function removeListener(eventName, handler) {
+  async function removeListener(eventName, handler) {
     if (eventName === "error") {
-      return errorEmitter.removeListener("error", handler);
+      errorEmitter.removeListener("error", handler);
+      return;
     }
-    return runWhenConnected(() => {
-      const handlersByThisEventName = handlersByEvent.get(eventName);
-      if (handlersByThisEventName == null) {
-        return undefined;
-      }
-      const actualHandler = handlersByThisEventName.get(handler);
-      if (actualHandler == null) {
-        return undefined;
-      }
-      handlersByThisEventName.delete(handler);
-      if (handlersByThisEventName.size === 0) {
-        handlersByEvent.delete(eventName);
-      }
+    const actualHandlerByHandler = handlersByEventName.get(eventName);
+    if (actualHandlerByHandler == null) {
+      return;
+    }
+    const actualHandler = actualHandlerByHandler.get(handler);
+    if (actualHandler == null) {
+      return;
+    }
+    actualHandlerByHandler.delete(handler);
+    const didRemoveLastListener = actualHandlerByHandler.size === 0;
+    if (didRemoveLastListener) {
+      handlersByEventName.delete(eventName);
+    }
+    await runWhenConnected(async ({ subscriber }) => {
       subscriber.off("pmessage", actualHandler);
-      return subscriber.punsubscribe(getPrefixedEventName(eventName));
+      if (didRemoveLastListener) {
+        await subscriber.punsubscribe(getPrefixedEventName(eventName));
+      }
     });
   }
   /**
@@ -72,7 +81,15 @@ module.exports = (redisClient, redisPubSubScope) => {
    * @param {*} handler
    * @param {*} _once
    */
-  function addListener(eventName, handler, _once = false) {
+  async function addListener(eventName, handler, _once = false) {
+    if (eventName === "error") {
+      if (_once) {
+        errorEmitter.once("error", handler);
+      } else {
+        errorEmitter.addListener("error", handler);
+      }
+      return;
+    }
     function actualHandler(pattern, channel, message) {
       if (pattern !== getPrefixedEventName(eventName)) {
         return;
@@ -89,15 +106,15 @@ module.exports = (redisClient, redisPubSubScope) => {
       }
       handler(...args);
     }
-    let handlersByThisEventName = handlersByEvent.get(eventName);
-    if (handlersByThisEventName == null) {
-      handlersByThisEventName = new WeakMap();
-      handlersByEvent.set(eventName, handlersByThisEventName);
+    let actualHandlerByHandler = handlersByEventName.get(eventName);
+    if (actualHandlerByHandler == null) {
+      actualHandlerByHandler = new Map();
+      handlersByEventName.set(eventName, actualHandlerByHandler);
     }
-    handlersByThisEventName.set(handler, actualHandler);
-    runWhenConnected(() => {
+    actualHandlerByHandler.set(handler, actualHandler);
+    await runWhenConnected(async ({ subscriber }) => {
       subscriber.on("pmessage", actualHandler);
-      return subscriber.psubscribe(getPrefixedEventName(eventName));
+      await subscriber.psubscribe(getPrefixedEventName(eventName));
     });
   }
   /**
@@ -106,11 +123,8 @@ module.exports = (redisClient, redisPubSubScope) => {
    * @param {string} eventName name of the event
    * @param {any} handler the handler of the event
    */
-  function on(eventName, handler) {
-    if (eventName === "error") {
-      return errorEmitter.on("error", handler);
-    }
-    return addListener(eventName, handler);
+  async function on(eventName, handler) {
+    await addListener(eventName, handler);
   }
   /**
    * Remove an event listener
@@ -118,8 +132,8 @@ module.exports = (redisClient, redisPubSubScope) => {
    * @param {string} eventName name of the event
    * @param {any} handler the handler of the event
    */
-  function off(eventName, handler) {
-    return removeListener(eventName, handler);
+  async function off(eventName, handler) {
+    await removeListener(eventName, handler);
   }
   /**
    * Add an event listener (will be triggered at most once)
@@ -127,33 +141,35 @@ module.exports = (redisClient, redisPubSubScope) => {
    * @param {string} eventName name of the event
    * @param {any} handler the handler of the event
    */
-  function once(eventName, handler) {
-    if (eventName === "error") {
-      return errorEmitter.once("error", handler);
-    }
-    return addListener(eventName, handler, true);
+  async function once(eventName, handler) {
+    await addListener(eventName, handler, true);
   }
   /**
    * Announce the occurrence of an event
    *
    * @param {string} eventName name of the event
    */
-  function emit(eventName, ...args) {
-    runWhenConnected(() => publisher.publish(getPrefixedEventName(eventName), safeStringify(args, replacer)));
+  async function emit(eventName, ...args) {
+    await runWhenConnected(async (
+      { publisher },
+    ) => (publisher.publish(getPrefixedEventName(eventName), safeStringify(args, replacer))));
   }
   /**
    * Remove all listeners of an event
    *
    * @param {string} eventName name of the event
    */
-  function removeAllListeners(eventName) {
+  async function removeAllListeners(eventName) {
     if (eventName === "error") {
-      return errorEmitter.removeAllListeners(eventName);
+      errorEmitter.removeAllListeners(eventName);
+      return;
+    }
+    const actualHandlerByHandler = handlersByEventName.get(eventName);
+    if (actualHandlerByHandler != null) {
+      for (const handler of actualHandlerByHandler.keys()) {
+        await removeListener(eventName, handler);
+      }
     }
-    return runWhenConnected(() => {
-      handlersByEvent.delete(eventName);
-      return subscriber.punsubscribe(getPrefixedEventName(eventName));
-    });
   }
   return {
     on,

return runWhenConnected(() => {
const handlersByThisEventName = handlersByEvent.get(eventName)
if (handlersByThisEventName == null) return undefined
const thisEventNameActualHandlerByHandler = handlersByEvent.get(eventName)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actual handler by handler? 😅

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it's a map that maps handler -> actual handler, meaning you can get the actual handler by providing a handler.

or in TS: Map<() => void, () => void>

where the key is the handler and the value is the actualHandler.

This naming was there before and i didn't have any better suggestion

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you write some docs above the Map why it is needed and why it has this structure?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thinking about it more, this feels off to me? Why do we need handler -> handler? Is it because you have multiple handlers per event type? Maybe name it subscribers then and array of handlers per event name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because the api allows you to removeListener for a specific handler, then we need to keep a reference to that handler. i believe the redis-emitter's api mimics the EventEmitter API, and that api allows removing a specific handler from an event name. That's why it's implemented like this. We could probably change the API but that's a bigger change and potentially breaking, so maybe that could be done in a separate PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a comment and renamed the variables a bit

@mifi mifi merged commit 7322413 into main Oct 7, 2024
20 checks passed
@mifi mifi deleted the redis-emitter-memory-leak-fix branch October 7, 2024 11:35
Murderlon added a commit that referenced this pull request Oct 7, 2024
Murderlon added a commit that referenced this pull request Oct 15, 2024
* main: (65 commits)
  `.handleInputChange()` - use `.currentTarget`; clear the input using `''` (#5381)
  build(deps): bump @blakeembrey/template from 1.1.0 to 1.2.0 (#5448)
  Update packages/@uppy/locales/src/fr_FR.ts (#5472)
  @uppy/svelte: use SvelteKit as the build tool (#5484)
  @uppy/xhr-upload: add response to upload-error callback (#5486)
  tus: Avoid duplicate `upload-error` event (#5485)
  Fix redis emitter (#5474)
  build(deps): bump docker/build-push-action from 6.8.0 to 6.9.0 (#5483)
  Release: [email protected] (#5479)
  @uppy/transloadit: fix multiple upload batches & run again (#5478)
  build(deps): bump docker/build-push-action from 6.7.0 to 6.8.0 (#5477)
  build(deps): bump vite from 5.2.11 to 5.4.8 (#5471)
  build(deps-dev): bump rollup from 4.18.0 to 4.22.4 (#5470)
  build(deps): bump vite from 5.2.11 to 5.4.6 (#5466)
  Release: [email protected] (#5467)
  @uppy/tus: fix retry check for status code 400 (#5461)
  meta: fix AwsS3 endpoint option in private/dev
  build(deps): bump body-parser from 1.20.2 to 1.20.3 (#5462)
  build(deps-dev): bump vite from 5.3.1 to 5.3.6 (#5459)
  @uppy/tus: set response from tus-js-client (#5456)
  ...
github-actions bot added a commit that referenced this pull request Oct 15, 2024
| Package          | Version | Package          | Version |
| ---------------- | ------- | ---------------- | ------- |
| @uppy/companion  |   5.1.2 | @uppy/svelte     |   4.1.0 |
| @uppy/core       |   4.2.2 | @uppy/tus        |   4.1.2 |
| @uppy/dashboard  |   4.1.1 | @uppy/utils      |   6.0.3 |
| @uppy/drag-drop  |   4.0.3 | @uppy/xhr-upload |   4.2.1 |
| @uppy/file-input |   4.0.2 | uppy             |   4.5.0 |
| @uppy/locales    |   4.2.0 |                  |         |

- @uppy/dashboard: Dashboard - convert some files to typescript  (Evgenia Karunus / #5367)
- @uppy/dashboard,@uppy/drag-drop,@uppy/file-input:  `.handleInputChange()` - use `.currentTarget`; clear the input using `''` (Evgenia Karunus / #5381)
- meta: build(deps): bump @blakeembrey/template from 1.1.0 to 1.2.0 (dependabot[bot] / #5448)
- @uppy/locales: Update packages/@uppy/locales/src/fr_FR.ts (Zéfyx / #5472)
- @uppy/svelte: use SvelteKit as the build tool (Merlijn Vos / #5484)
- @uppy/xhr-upload: add response to upload-error callback (Caleb Hardin / #5486)
- @uppy/tus: tus: Avoid duplicate `upload-error` event (Marius / #5485)
- @uppy/companion: Fix redis emitter (Mikael Finstad / #5474)
- meta: build(deps): bump docker/build-push-action from 6.8.0 to 6.9.0 (dependabot[bot] / #5483)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants