fix memory leak #1243

Merged — 2 commits merged on Oct 1, 2023
Changes from all commits
58 changes: 32 additions & 26 deletions src/transit.js
@@ -530,7 +530,7 @@
}

if (!pass) {
isNew = true;

[GitHub Actions / common — warning on line 533 in src/transit.js: 'isNew' is assigned a value but never used]
this.logger.debug(
`<= New stream is received from '${payload.sender}'. Seq: ${payload.seq}`
);
@@ -895,22 +895,23 @@
} else {
chunks.push(chunk);
}
for (const ch of chunks) {
const copy = Object.assign({}, payload);
copy.seq = ++payload.seq;
copy.stream = true;
copy.params = ch;

this.logger.debug(
`=> Send stream chunk ${requestID}to ${nodeName} node. Seq: ${copy.seq}`
);

this.publish(new Packet(P.PACKET_REQUEST, ctx.nodeID, copy)).catch(
publishCatch
);
}
stream.resume();
return;

return this.Promise.all(
chunks.map(ch => {
const copy = Object.assign({}, payload);
copy.seq = ++payload.seq;
copy.stream = true;
copy.params = ch;

this.logger.debug(
`=> Send stream chunk ${requestID}to ${nodeName} node. Seq: ${copy.seq}`
);

return this.publish(new Packet(P.PACKET_REQUEST, ctx.nodeID, copy));
})
)
.then(() => stream.resume())
.catch(publishCatch);
});
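The hunk above replaces fire-and-forget publishes (each `.catch(publishCatch)` individually, with `stream.resume()` running immediately) with a single awaited `Promise.all`, so the paused stream resumes only after every chunk has actually been flushed. A minimal standalone sketch of that backpressure pattern — `publish`, `sendChunks`, and the stream stub are illustrative stand-ins, not the real Transit API:

```javascript
// Stand-in for Transit#publish: an async transport send.
const sent = [];
function publish(packet) {
	return new Promise(resolve =>
		setImmediate(() => {
			sent.push(packet);
			resolve();
		})
	);
}

// Core of the fix: resume the stream ONLY after all chunk
// publishes settle, so unsent packets cannot pile up in memory.
function sendChunks(stream, chunks, payload) {
	return Promise.all(
		chunks.map(ch => {
			const copy = Object.assign({}, payload, {
				seq: ++payload.seq,
				stream: true,
				params: ch
			});
			return publish(copy);
		})
	).then(() => stream.resume());
}

// Minimal stream stub so the sketch runs without a real transport.
const stream = {
	paused: true,
	pause() { this.paused = true; },
	resume() { this.paused = false; }
};

const payload = { seq: 0 };
const done = sendChunks(stream, ["a", "b", "c"], payload);
```

In the old code, a slow transporter let chunks queue up faster than they were sent; tying `resume()` to the settled `Promise.all` is what closes the leak.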

stream.on("end", () => {
@@ -1126,18 +1127,23 @@
} else {
chunks.push(chunk);
}
for (const ch of chunks) {
const copy = Object.assign({}, payload);
copy.seq = ++payload.seq;
copy.stream = true;
copy.data = ch;

this.logger.debug(`=> Send stream chunk to ${nodeID} node. Seq: ${copy.seq}`);
return this.Promise.all(
[Review comment — Member] @icebob Leaving the size of the chunk set uncontrolled is not a good solution, as it can lead to excessive CPU usage in the Promise.all call.

[Review comment — Member] What size can cause a problem?

[Review comment — Member] Based on my analysis, it seems most reasonable to derive the limit from os.availableParallelism().

We are facing two issues:

  1. Unknown logic in transitPublish may cause synchronous blocking of operations inside Promise.all.
  2. We don't know how many system resources are available when the Promise.all operations are planned, and even with that information we could not control the peak load.

A small chunk batch is a poor solution when resources are idle, but a good one when resources are busy.

chunks.map(ch => {
const copy = Object.assign({}, payload);
copy.seq = ++payload.seq;
copy.stream = true;
copy.data = ch;

this.logger.debug(
`=> Send stream chunk to ${nodeID} node. Seq: ${copy.seq}`
);

this.publish(new Packet(P.PACKET_RESPONSE, nodeID, copy)).catch(publishCatch);
}
stream.resume();
return;
return this.publish(new Packet(P.PACKET_RESPONSE, nodeID, copy));
})
)
.then(() => stream.resume())
.catch(publishCatch);
});

stream.on("end", () => {
28 changes: 24 additions & 4 deletions test/unit/transit.spec.js
@@ -1931,6 +1931,10 @@ describe("Test Transit._sendRequest", () => {
const resolve = jest.fn();
const reject = jest.fn();

beforeEach(() => {
transit.publish = jest.fn(() => Promise.resolve().delay(40));
});

it("should send stream chunks", () => {
transit.publish.mockClear();

@@ -2070,8 +2074,11 @@
transit.publish.mockClear();
stream.push(randomData);
})
.delay(100)
.delay(20)
.then(() => expect(stream.isPaused()).toBeTruthy())
.delay(80)
.then(() => {
expect(stream.isPaused()).toBeFalsy();
expect(transit.publish).toHaveBeenCalledTimes(
Math.ceil(randomData.length / transit.opts.maxChunkSize)
);
@@ -2163,8 +2170,11 @@
});
transit.publish.mockClear();
})
.delay(100)
.delay(20)
.then(() => expect(stream.isPaused()).toBeTruthy())
.delay(80)
.then(() => {
expect(stream.isPaused()).toBeFalsy();
expect(transit.publish).toHaveBeenCalledTimes(
Math.ceil(randomData.length / transit.opts.maxChunkSize) + 1
);
@@ -2715,6 +2725,10 @@ describe("Test Transit.sendResponse", () => {
});

describe("with Stream", () => {
beforeEach(() => {
transit.publish = jest.fn(() => Promise.resolve().delay(40));
});

it("should send stream chunks", () => {
transit.publish.mockClear();

@@ -2819,8 +2833,11 @@
transit.publish.mockClear();
stream.push("first chunk");
})
.delay(100)
.delay(20)
.then(() => expect(stream.isPaused()).toBeTruthy())
.delay(80)
.then(() => {
expect(stream.isPaused()).toBeFalsy();
expect(transit.publish).toHaveBeenCalledTimes(1);
expect(transit.publish).toHaveBeenCalledWith({
payload: {
@@ -2889,8 +2906,11 @@
transit.publish.mockClear();
stream.push(randomData);
})
.delay(100)
.delay(20)
.then(() => expect(stream.isPaused()).toBeTruthy())
.delay(80)
.then(() => {
expect(stream.isPaused()).toBeFalsy();
expect(transit.publish).toHaveBeenCalledTimes(
Math.ceil(randomData.length / transit.opts.maxChunkSize)
);
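The test changes all follow one timing pattern: `transit.publish` is mocked to resolve after ~40 ms (via Bluebird's `.delay(40)`), so the stream must still be paused at ~20 ms and resumed only once every mocked publish settles. A sketch of that pattern with plain Promises standing in for the jest mock and Bluebird:

```javascript
// Plain-Promise stand-in for Bluebird's .delay().
const delay = ms => new Promise(resolve => setTimeout(resolve, ms));

// Minimal stream stub with pause/resume bookkeeping.
const stream = {
	paused: false,
	pause() { this.paused = true; },
	resume() { this.paused = false; }
};

// Mocked transport: each publish takes ~40 ms, like
// jest.fn(() => Promise.resolve().delay(40)) in the spec.
const publish = () => delay(40);

// The behavior under test: pause while chunks are in flight,
// resume only after all publishes settle.
async function sendAllChunks(chunks) {
	stream.pause();
	await Promise.all(chunks.map(publish));
	stream.resume();
}

async function run() {
	const sending = sendAllChunks(["a", "b"]);
	await delay(20);
	const pausedMidFlight = stream.paused; // expected: still paused
	await sending;
	const pausedAfter = stream.paused; // expected: resumed
	return { pausedMidFlight, pausedAfter };
}
```

This is why the specs split the old single `.delay(100)` into `.delay(20)` (assert paused) plus `.delay(80)` (assert resumed): the intermediate checkpoint proves backpressure is actually applied while publishes are in flight.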