diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts index 0ef7756bbd..b377f4413f 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter.ts @@ -51,27 +51,25 @@ type ReceivedMessageHashes = { const log = new Logger("sdk:filter"); -const DEFAULT_MAX_PINGS = 3; +const DEFAULT_MAX_PINGS = 2; const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3; -const DEFAULT_KEEP_ALIVE = 30 * 1000; +const DEFAULT_KEEP_ALIVE = 60 * 1000; -const DEFAULT_SUBSCRIBE_OPTIONS = { - keepAlive: DEFAULT_KEEP_ALIVE -}; export class SubscriptionManager implements ISubscriptionSDK { + private subscriptionCallbacks: Map< + ContentTopic, + SubscriptionCallback + > = new Map(); private readonly receivedMessagesHashStr: string[] = []; - private keepAliveTimer: number | null = null; - private readonly receivedMessagesHashes: ReceivedMessageHashes; private peerFailures: Map = new Map(); + private readonly receivedMessagesHashes: ReceivedMessageHashes; private missedMessagesByPeer: Map = new Map(); + + private keepAliveInterval: number = DEFAULT_KEEP_ALIVE; private maxPingFailures: number = DEFAULT_MAX_PINGS; private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD; - private subscribeOptions: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS; - private subscriptionCallbacks: Map< - ContentTopic, - SubscriptionCallback - > = new Map(); + private keepAliveTimer: number | null = null; public constructor( private readonly pubsubTopic: PubsubTopic, @@ -95,17 +93,23 @@ export class SubscriptionManager implements ISubscriptionSDK { private addHash(hash: string, peerIdStr?: string): void { this.receivedMessagesHashes.all.add(hash); - if (peerIdStr) { - this.receivedMessagesHashes.nodes[peerIdStr].add(hash); + if (!peerIdStr) { + return; + } + + if (!this.receivedMessagesHashes.nodes[peerIdStr]) { + this.receivedMessagesHashes.nodes[peerIdStr] = new Set(); } + + this.receivedMessagesHashes.nodes[peerIdStr].add(hash); } public async subscribe( decoders: IDecoder | IDecoder[], callback: Callback, - options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS + options: SubscribeOptions = {} ): Promise { - this.keepAliveTimer = options.keepAlive || DEFAULT_KEEP_ALIVE; + this.keepAliveInterval = options.keepAlive || DEFAULT_KEEP_ALIVE; this.maxPingFailures = options.pingsBeforePeerRenewed || DEFAULT_MAX_PINGS; this.maxMissedMessagesThreshold = options.maxMissedMessagesThreshold || @@ -154,8 +158,7 @@ export class SubscriptionManager implements ISubscriptionSDK { this.subscriptionCallbacks.set(contentTopic, subscriptionCallback); }); - this.subscribeOptions = options; - this.startSubscriptionsMaintenance(options); + this.startSubscriptionsMaintenance(this.keepAliveInterval); return finalResult; } @@ -372,10 +375,8 @@ export class SubscriptionManager implements ISubscriptionSDK { } } - private startSubscriptionsMaintenance(options: SubscribeOptions): void { - if (options?.keepAlive) { - this.startKeepAlivePings(options.keepAlive); - } + private startSubscriptionsMaintenance(interval: number): void { + this.startKeepAlivePings(interval); this.startConnectionListener(); } @@ -445,9 +446,7 @@ export class SubscriptionManager implements ISubscriptionSDK { log.error(`networkStateListener failed to recover: ${err}`); } - this.startKeepAlivePings( - this.subscribeOptions?.keepAlive || DEFAULT_SUBSCRIBE_OPTIONS.keepAlive - ); + this.startKeepAlivePings(this.keepAliveInterval); } private incrementMissedMessageCount(peerIdStr: string): void { @@ -503,7 +502,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { * @param {IDecoder | IDecoder[]} decoders - A single decoder or an array of decoders to use for decoding messages. * @param {Callback} callback - The callback function to be invoked with decoded messages. * @param {ProtocolUseOptions} [protocolUseOptions] - Optional settings for using the protocol. - * @param {SubscribeOptions} [subscribeOptions=DEFAULT_SUBSCRIBE_OPTIONS] - Options for the subscription. + * @param {SubscribeOptions} [subscribeOptions] - Options for the subscription. * * @returns {Promise} A promise that resolves to an object containing: * - subscription: The created subscription object if successful, or null if failed. @@ -539,7 +538,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { decoders: IDecoder | IDecoder[], callback: Callback, protocolUseOptions?: ProtocolUseOptions, - subscribeOptions: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS + subscribeOptions?: SubscribeOptions ): Promise { const uniquePubsubTopics = this.getUniquePubsubTopics(decoders); @@ -653,7 +652,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { public async subscribeWithUnsubscribe( decoders: IDecoder | IDecoder[], callback: Callback, - options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS + options?: SubscribeOptions ): Promise { const uniquePubsubTopics = this.getUniquePubsubTopics(decoders);