Skip to content

Commit

Permalink
[core-amqp] set the max listener limit to 1000 for sender and receiver (
Browse files Browse the repository at this point in the history
#20088)

NodeJS would issue warning if the number of disconnected listeners on an event
emitter exceeds 10. When many sessions on a connection are closed but the
removal of the listener hasn't caught up, we will see this warning because the
default limit of 10 in NodeJS is too low. The disconnected listeners DO get
removed eventually in our code.

We already do this for senders created in Service Bus. This PR increase the
limit to 1000 for senders and receivers created off
`ConnectionContextBase.connection` so that the limites apply to both Service Bus
and Event Hubs.

* EH and SB no longer need to set max listener limit.

For Issue #12161
  • Loading branch information
jeremymeng authored Feb 2, 2022
1 parent 1dbd41d commit 35757be
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 9 deletions.
63 changes: 60 additions & 3 deletions sdk/core/core-amqp/src/ConnectionContextBase.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { Connection, ConnectionOptions, generate_uuid } from "rhea-promise";
import {
AwaitableSender,
Connection,
ConnectionOptions,
CreateAwaitableSenderOptions,
CreateReceiverOptions,
CreateSenderOptions,
Receiver,
Sender,
generate_uuid,
} from "rhea-promise";
import { getFrameworkInfo, getPlatformInfo } from "./util/runtimeInfo";
import { CbsClient } from "./cbs";
import { ConnectionConfig } from "./connectionConfig/connectionConfig";
Expand Down Expand Up @@ -100,6 +110,53 @@ export interface CreateConnectionContextBaseParameters {
operationTimeoutInMs?: number;
}

const maxListenerLimit = 1000;

class CoreAmqpConnection extends Connection {
/**
* Creates an amqp sender link. Max listener limit on the sender is set to 1000 because the
* default value of 10 in NodeJS is too low.
* @param options - Optional parameters to create a sender link.
* @returns Promise<Sender>.
*/
async createSender(options?: CreateSenderOptions): Promise<Sender> {
const sender = await super.createSender(options);
sender.setMaxListeners(maxListenerLimit);
return sender;
}

/**
* Creates an awaitable amqp sender. Max listener limit on the sender is set to 1000 because the
* default value of 10 in NodeJS is too low.
* @param options - Optional parameters to create an awaitable sender link.
* - If `onError` and `onSessionError` handlers are not provided then the `AwaitableSender` will
* clear the timer and reject the Promise for all the entries of inflight send operation in its
* `deliveryDispositionMap`.
* - If the user is handling the reconnection of sender link or the underlying connection in it's
* app, then the `onError` and `onSessionError` handlers must be provided by the user and (s)he
* shall be responsible of clearing the `deliveryDispositionMap` of inflight `send()` operation.
*
* @returns Promise<AwaitableSender>.
*/
async createAwaitableSender(options?: CreateAwaitableSenderOptions): Promise<AwaitableSender> {
const sender = await super.createAwaitableSender(options);
sender.setMaxListeners(maxListenerLimit);
return sender;
}

/**
* Creates an amqp receiver link. Max listener limit on the sender is set to 1000 because the
* default value of 10 in NodeJS is too low.
* @param options - Optional parameters to create a receiver link.
* @returns Promise<Receiver>.
*/
async createReceiver(options?: CreateReceiverOptions): Promise<Receiver> {
const receiver = await super.createReceiver(options);
receiver.setMaxListeners(maxListenerLimit);
return receiver;
}
}

// eslint-disable-next-line @typescript-eslint/no-redeclare -- renaming constant would be a breaking change.
export const ConnectionContextBase = {
/**
Expand Down Expand Up @@ -157,7 +214,7 @@ export const ConnectionContextBase = {
};
}

const connection = new Connection(connectionOptions);
const connection = new CoreAmqpConnection(connectionOptions);
const connectionLock = `${Constants.establishConnection}-${generate_uuid()}`;
const connectionContextBase: ConnectionContextBase = {
wasConnectionCloseCalled: false,
Expand All @@ -168,7 +225,7 @@ export const ConnectionContextBase = {
cbsSession: new CbsClient(connection, connectionLock),
config: parameters.config,
refreshConnection() {
const newConnection = new Connection(connectionOptions);
const newConnection = new CoreAmqpConnection(connectionOptions);
const newConnectionLock = `${Constants.establishConnection}-${generate_uuid()}`;
this.wasConnectionCloseCalled = false;
this.connectionLock = newConnectionLock;
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/event-hubs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@
},
"dependencies": {
"@azure/abort-controller": "^1.0.0",
"@azure/core-amqp": "^3.0.0",
"@azure/core-amqp": "^3.1.0",
"@azure/core-asynciterator-polyfill": "^1.0.0",
"@azure/core-auth": "^1.3.0",
"@azure/core-tracing": "1.0.0-preview.13",
Expand Down
1 change: 0 additions & 1 deletion sdk/eventhub/event-hubs/src/eventHubSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,6 @@ export class EventHubSender extends LinkEntity {
this.name,
options
);
sender.setMaxListeners(1000);

// It is possible for someone to close the sender and then start it again.
// Thus make sure that the sender is present in the client cache.
Expand Down
6 changes: 2 additions & 4 deletions sdk/servicebus/service-bus/src/core/messageSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -282,10 +282,8 @@ export class MessageSender extends LinkEntity<AwaitableSender> {
return retry<void>(config);
}

protected async createRheaLink(options: AwaitableSenderOptions): Promise<AwaitableSender> {
const sender = await this._context.connection.createAwaitableSender(options);
sender.setMaxListeners(1000);
return sender;
protected createRheaLink(options: AwaitableSenderOptions): Promise<AwaitableSender> {
return this._context.connection.createAwaitableSender(options);
}

/**
Expand Down

0 comments on commit 35757be

Please sign in to comment.