From a0cec5fa78d97e3d9d87a660124ed4169251e7c4 Mon Sep 17 00:00:00 2001 From: ghostiee Date: Sun, 10 Sep 2023 20:39:32 +0800 Subject: [PATCH 1/2] fix memory leak Resume stream after all chunks are published to prevent memory leak --- src/transit.js | 24 ++++++++++++------------ test/unit/transit.spec.js | 28 ++++++++++++++++++++++++---- 2 files changed, 36 insertions(+), 16 deletions(-) diff --git a/src/transit.js b/src/transit.js index c7dd87a07..a0723db33 100644 --- a/src/transit.js +++ b/src/transit.js @@ -895,7 +895,7 @@ class Transit { } else { chunks.push(chunk); } - for (const ch of chunks) { + const publishMap = chunks.map(ch => { const copy = Object.assign({}, payload); copy.seq = ++payload.seq; copy.stream = true; @@ -905,12 +905,13 @@ class Transit { `=> Send stream chunk ${requestID}to ${nodeName} node. Seq: ${copy.seq}` ); - this.publish(new Packet(P.PACKET_REQUEST, ctx.nodeID, copy)).catch( - publishCatch - ); - } - stream.resume(); - return; + return this.publish(new Packet(P.PACKET_REQUEST, ctx.nodeID, copy)); + }); + + return this.Promise.all(publishMap).then( + () => stream.resume(), + publishCatch + ); }); stream.on("end", () => { @@ -1126,7 +1127,7 @@ class Transit { } else { chunks.push(chunk); } - for (const ch of chunks) { + const publishMap = chunks.map(ch => { const copy = Object.assign({}, payload); copy.seq = ++payload.seq; copy.stream = true; @@ -1134,10 +1135,9 @@ class Transit { this.logger.debug(`=> Send stream chunk to ${nodeID} node. Seq: ${copy.seq}`); - this.publish(new Packet(P.PACKET_RESPONSE, nodeID, copy)).catch(publishCatch); - } - stream.resume(); - return; + return this.publish(new Packet(P.PACKET_RESPONSE, nodeID, copy)); + }); + return this.Promise.all(publishMap).then(() => stream.resume(), publishCatch); }); stream.on("end", () => { diff --git a/test/unit/transit.spec.js b/test/unit/transit.spec.js index d8bfc3d8c..00eadd923 100644 --- a/test/unit/transit.spec.js +++ b/test/unit/transit.spec.js @@ -1931,6 +1931,10 @@ describe("Test Transit._sendRequest", () => { const resolve = jest.fn(); const reject = jest.fn(); + beforeEach(() => { + transit.publish = jest.fn(() => Promise.resolve().delay(40)); + }); + it("should send stream chunks", () => { transit.publish.mockClear(); @@ -2070,8 +2074,11 @@ describe("Test Transit._sendRequest", () => { transit.publish.mockClear(); stream.push(randomData); }) - .delay(100) + .delay(20) + .then(() => expect(stream.isPaused()).toBeTruthy()) + .delay(80) .then(() => { + expect(stream.isPaused()).toBeFalsy(); expect(transit.publish).toHaveBeenCalledTimes( Math.ceil(randomData.length / transit.opts.maxChunkSize) ); @@ -2163,8 +2170,11 @@ describe("Test Transit._sendRequest", () => { }); transit.publish.mockClear(); }) - .delay(100) + .delay(20) + .then(() => expect(stream.isPaused()).toBeTruthy()) + .delay(80) .then(() => { + expect(stream.isPaused()).toBeFalsy(); expect(transit.publish).toHaveBeenCalledTimes( Math.ceil(randomData.length / transit.opts.maxChunkSize) + 1 ); @@ -2715,6 +2725,10 @@ describe("Test Transit.sendResponse", () => { }); describe("with Stream", () => { + beforeEach(() => { + transit.publish = jest.fn(() => Promise.resolve().delay(40)); + }); + it("should send stream chunks", () => { transit.publish.mockClear(); @@ -2819,8 +2833,11 @@ describe("Test Transit.sendResponse", () => { transit.publish.mockClear(); stream.push("first chunk"); }) - .delay(100) + .delay(20) + .then(() => expect(stream.isPaused()).toBeTruthy()) + .delay(80) .then(() => { + expect(stream.isPaused()).toBeFalsy(); expect(transit.publish).toHaveBeenCalledTimes(1); expect(transit.publish).toHaveBeenCalledWith({ payload: { @@ -2889,8 +2906,11 @@ describe("Test Transit.sendResponse", () => { transit.publish.mockClear(); stream.push(randomData); }) - .delay(100) + .delay(20) + .then(() => expect(stream.isPaused()).toBeTruthy()) + .delay(80) .then(() => { + expect(stream.isPaused()).toBeFalsy(); expect(transit.publish).toHaveBeenCalledTimes( Math.ceil(randomData.length / transit.opts.maxChunkSize) ); From cad067d5939a68220bb49218b049a1570f146083 Mon Sep 17 00:00:00 2001 From: Icebob Date: Sun, 1 Oct 2023 11:23:04 +0200 Subject: [PATCH 2/2] change code style --- src/transit.js | 56 ++++++++++++++++++++++++++++---------------------- 1 file changed, 31 insertions(+), 25 deletions(-) diff --git a/src/transit.js b/src/transit.js index a0723db33..2f8e4f593 100644 --- a/src/transit.js +++ b/src/transit.js @@ -895,23 +895,23 @@ class Transit { } else { chunks.push(chunk); } - const publishMap = chunks.map(ch => { - const copy = Object.assign({}, payload); - copy.seq = ++payload.seq; - copy.stream = true; - copy.params = ch; - this.logger.debug( - `=> Send stream chunk ${requestID}to ${nodeName} node. Seq: ${copy.seq}` - ); - - return this.publish(new Packet(P.PACKET_REQUEST, ctx.nodeID, copy)); - }); - - return this.Promise.all(publishMap).then( - () => stream.resume(), - publishCatch - ); + return this.Promise.all( + chunks.map(ch => { + const copy = Object.assign({}, payload); + copy.seq = ++payload.seq; + copy.stream = true; + copy.params = ch; + + this.logger.debug( + `=> Send stream chunk ${requestID}to ${nodeName} node. Seq: ${copy.seq}` + ); + + return this.publish(new Packet(P.PACKET_REQUEST, ctx.nodeID, copy)); + }) + ) + .then(() => stream.resume()) + .catch(publishCatch); }); stream.on("end", () => { @@ -1127,17 +1127,23 @@ class Transit { } else { chunks.push(chunk); } - const publishMap = chunks.map(ch => { - const copy = Object.assign({}, payload); - copy.seq = ++payload.seq; - copy.stream = true; - copy.data = ch; - this.logger.debug(`=> Send stream chunk to ${nodeID} node. Seq: ${copy.seq}`); + return this.Promise.all( + chunks.map(ch => { + const copy = Object.assign({}, payload); + copy.seq = ++payload.seq; + copy.stream = true; + copy.data = ch; - return this.publish(new Packet(P.PACKET_RESPONSE, nodeID, copy)); - }); - return this.Promise.all(publishMap).then(() => stream.resume(), publishCatch); + this.logger.debug( + `=> Send stream chunk to ${nodeID} node. Seq: ${copy.seq}` + ); + + return this.publish(new Packet(P.PACKET_RESPONSE, nodeID, copy)); + }) + ) + .then(() => stream.resume()) + .catch(publishCatch); }); stream.on("end", () => {