Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix MaxListenersExceededWarning - "disconnected listeners" with disconnectEventAudienceMap #78

Draft
wants to merge 13 commits into
base: master
Choose a base branch
from
Draft
23 changes: 22 additions & 1 deletion lib/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ export interface ReqResLink {
*/
receiver: Receiver;
/**
* @property {Session} session - The underlying session on whicn the sender and receiver links
* @property {Session} session - The underlying session on which the sender and receiver links
* exist.
*/
session: Session;
Expand All @@ -180,6 +180,13 @@ export declare interface Connection {
on(event: ConnectionEvents, listener: OnAmqpEvent): this;
}

function onDisconnectOccurrence(
context: RheaEventContext,
disconnectEventAudienceMap: Map<string, (context: RheaEventContext) => void>
): void {
disconnectEventAudienceMap.forEach((callback) => callback(context));
}

/**
* Describes the AMQP Connection.
* @class Connection
Expand All @@ -190,6 +197,15 @@ export class Connection extends Entity {
* connection.
*/
options: ConnectionOptions;
/**
* Maintains a map of the audience(sessions/senders/receivers) interested in "disconnected" event.
* This helps us with not needing to create too many listeners on the "disconnected" event,
* which is particularly useful when dealing with 1000s of sessions at the same time.
*/
_disconnectEventAudienceMap: Map<string, (context: RheaEventContext) => void> = new Map<
string,
(context: RheaEventContext) => void
>();
/**
* @property {Container} container The underlying Container instance on which the connection
* exists.
Expand Down Expand Up @@ -232,6 +248,11 @@ export class Connection extends Entity {
this.options = this._connection.options;
this.options.operationTimeoutInSeconds = options.operationTimeoutInSeconds;

// Disconnected event listener for the disconnectEventAudienceMap
this._connection.on(ConnectionEvents.disconnected, (context) => {
onDisconnectOccurrence(context, this._disconnectEventAudienceMap);
});

this._initializeEventListeners();
}

Expand Down
9 changes: 6 additions & 3 deletions lib/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ export class Session extends Entity {
this.actionInitiated--;
this._session.removeListener(SessionEvents.sessionError, onError);
this._session.removeListener(SessionEvents.sessionClose, onClose);
this._session.connection.removeListener(ConnectionEvents.disconnected, onDisconnected);
this._connection._disconnectEventAudienceMap.delete(this.id);
if (abortSignal) { abortSignal.removeEventListener("abort", onAbort); }
};

Expand Down Expand Up @@ -215,7 +215,10 @@ export class Session extends Entity {
// listeners that we add for completing the operation are added directly to rhea's objects.
this._session.once(SessionEvents.sessionClose, onClose);
this._session.once(SessionEvents.sessionError, onError);
this._session.connection.once(ConnectionEvents.disconnected, onDisconnected);
this._connection._disconnectEventAudienceMap.set(
this.id,
onDisconnected
);
log.session("[%s] Calling session.close() for amqp session '%s'.", this.connection.id, this.id);
waitTimer = setTimeout(actionAfterTimeout, this.connection.options!.operationTimeoutInSeconds! * 1000);
this._session.close();
Expand Down Expand Up @@ -404,7 +407,7 @@ export class Session extends Entity {
* `deliveryDispositionMap`.
* - If the user is handling the reconnection of sender link or the underlying connection in it's
* app, then the `onError` and `onSessionError` handlers must be provided by the user and (s)he
* shall be responsible of clearing the `deliveryDispotionMap` of inflight `send()` operation.
* shall be responsible of clearing the `deliveryDispositionMap` of inflight `send()` operation.
*
* @return Promise<AwaitableSender>
* - **Resolves** the promise with the Sender object when rhea emits the "sender_open" event.
Expand Down
91 changes: 82 additions & 9 deletions test/session.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,67 @@ describe("Session", () => {
assert.isFalse(session.isOpen(), "Session should not be open.");
});

it("Single disconnected listener shared among all the sessions when close() is called in parallel", async () => {
const _connection = (connection as any)._connection;
const getDisconnectListenerCount = () => {
return _connection.listenerCount(rhea.ConnectionEvents.disconnected);
};
const sessionCount = 1000;
const sessions: Session[] = [];
const callbackCalledForSessionId: { [key: string]: boolean } = {};
for (let i = 0; i < sessionCount; i++) {
const session = await connection.createSession();
sessions.push(session);
callbackCalledForSessionId[session.id] = false;
}
const disconnectListenerCountBefore = getDisconnectListenerCount();
await Promise.all(
sessions
.map((session) => {
session.close();
})
.concat([
(() => {
assert.equal(
getDisconnectListenerCount(),
disconnectListenerCountBefore,
`Unexpected number of "disconnected" listeners - originated from the close() calls`
);
assert.equal(
connection._disconnectEventAudienceMap.size,
sessionCount,
`Unexpected number of items in _disconnectEventAudienceMap`
);
for (let [
key,
callback,
] of connection._disconnectEventAudienceMap) {
connection._disconnectEventAudienceMap.set(
key,
(context: rhea.EventContext) => {
callbackCalledForSessionId[key] = true;
callback(context);
}
);
}
_connection.emit(rhea.ConnectionEvents.disconnected, {});
})(),
])
);
for (const session of sessions) {
assert.equal(
callbackCalledForSessionId[session.id as string],
true,
`callback not called for ${session.id} - this is unexpected`
);
}
assert.equal(
getDisconnectListenerCount(),
disconnectListenerCountBefore,
`Unexpected number of "disconnected" listeners after sessions were closed`
);
});

it(".remove() removes event listeners", async () => {
const session = new Session(
connection,
Expand Down Expand Up @@ -73,7 +134,6 @@ describe("Session", () => {
assert.strictEqual(session.listenerCount(SessionEvents.sessionOpen), 0);
});


describe("supports events", () => {
it("sessionOpen", (done: Function) => {
const session = new Session(
Expand Down Expand Up @@ -137,7 +197,10 @@ describe("Session", () => {

session.on(SessionEvents.sessionError, async (event) => {
assert.exists(event, "Expected an AMQP event.");
assert.exists(event.session, "Expected session to be defined on AMQP event.");
assert.exists(
event.session,
"Expected session to be defined on AMQP event."
);
if (event.session) {
const error = event.session.error as rhea.ConnectionError;
assert.exists(error, "Expected an AMQP error.");
Expand Down Expand Up @@ -173,7 +236,7 @@ describe("Session", () => {
session.on(SessionEvents.sessionOpen, async () => {
try {
await session.close();
throw new Error("boo")
throw new Error("boo");
} catch (error) {
assert.exists(error, "Expected an AMQP error.");
assert.strictEqual(error.condition, errorCondition);
Expand Down Expand Up @@ -212,9 +275,12 @@ describe("Session", () => {
abortErrorThrown = error.name === abortErrorName;
}

assert.isTrue(abortErrorThrown, "AbortError should have been thrown.")
assert.isTrue(abortErrorThrown, "AbortError should have been thrown.");
assert.isFalse(session.isOpen(), "Session should not be open.");
assert.isTrue(session["_session"].is_remote_open(), "Session remote endpoint should not have gotten a chance to close.");
assert.isTrue(
session["_session"].is_remote_open(),
"Session remote endpoint should not have gotten a chance to close."
);

await connection.close();
});
Expand Down Expand Up @@ -243,9 +309,12 @@ describe("Session", () => {
abortErrorThrown = error.name === abortErrorName;
}

assert.isTrue(abortErrorThrown, "AbortError should have been thrown.")
assert.isTrue(abortErrorThrown, "AbortError should have been thrown.");
assert.isFalse(session.isOpen(), "Session should not be open.");
assert.isTrue(session["_session"].is_remote_open(), "Session remote endpoint should not have gotten a chance to close.");
assert.isTrue(
session["_session"].is_remote_open(),
"Session remote endpoint should not have gotten a chance to close."
);

await connection.close();
});
Expand Down Expand Up @@ -315,7 +384,9 @@ describe("Session", () => {

// Pass an already aborted signal to createAwaitableSender()
abortController.abort();
const createAwaitableSenderPromise = session.createAwaitableSender({ abortSignal });
const createAwaitableSenderPromise = session.createAwaitableSender({
abortSignal,
});

let abortErrorThrown = false;
try {
Expand All @@ -340,7 +411,9 @@ describe("Session", () => {
const abortSignal = abortController.signal;

// Abort the signal after passing it to createAwaitableSender()
const createAwaitableSenderPromise = session.createAwaitableSender({ abortSignal });
const createAwaitableSenderPromise = session.createAwaitableSender({
abortSignal,
});
abortController.abort();

let abortErrorThrown = false;
Expand Down