Skip to content

Commit

Permalink
fix: filter missing messages (#2119)
Browse files Browse the repository at this point in the history
* bug: fix filter missing messages

* fix keep alive

* fix const
  • Loading branch information
weboko committed Sep 2, 2024
1 parent 5cfe932 commit 5d3cc5f
Showing 1 changed file with 27 additions and 28 deletions.
55 changes: 27 additions & 28 deletions packages/sdk/src/protocols/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,27 +51,25 @@ type ReceivedMessageHashes = {

const log = new Logger("sdk:filter");

const DEFAULT_MAX_PINGS = 3;
const DEFAULT_MAX_PINGS = 2;
const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3;
const DEFAULT_KEEP_ALIVE = 30 * 1000;
const DEFAULT_KEEP_ALIVE = 60 * 1000;

const DEFAULT_SUBSCRIBE_OPTIONS = {
keepAlive: DEFAULT_KEEP_ALIVE
};
export class SubscriptionManager implements ISubscriptionSDK {
private subscriptionCallbacks: Map<
ContentTopic,
SubscriptionCallback<IDecodedMessage>
> = new Map();
private readonly receivedMessagesHashStr: string[] = [];
private keepAliveTimer: number | null = null;
private readonly receivedMessagesHashes: ReceivedMessageHashes;
private peerFailures: Map<string, number> = new Map();
private readonly receivedMessagesHashes: ReceivedMessageHashes;
private missedMessagesByPeer: Map<string, number> = new Map();

private keepAliveInterval: number = DEFAULT_KEEP_ALIVE;
private maxPingFailures: number = DEFAULT_MAX_PINGS;
private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD;
private subscribeOptions: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS;

private subscriptionCallbacks: Map<
ContentTopic,
SubscriptionCallback<IDecodedMessage>
> = new Map();
private keepAliveTimer: number | null = null;

public constructor(
private readonly pubsubTopic: PubsubTopic,
Expand All @@ -95,17 +93,23 @@ export class SubscriptionManager implements ISubscriptionSDK {
private addHash(hash: string, peerIdStr?: string): void {
this.receivedMessagesHashes.all.add(hash);

if (peerIdStr) {
this.receivedMessagesHashes.nodes[peerIdStr].add(hash);
if (!peerIdStr) {
return;
}

if (!this.receivedMessagesHashes.nodes[peerIdStr]) {
this.receivedMessagesHashes.nodes[peerIdStr] = new Set();
}

this.receivedMessagesHashes.nodes[peerIdStr].add(hash);
}

public async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
options: SubscribeOptions = {}
): Promise<SDKProtocolResult> {
this.keepAliveTimer = options.keepAlive || DEFAULT_KEEP_ALIVE;
this.keepAliveInterval = options.keepAlive || DEFAULT_KEEP_ALIVE;
this.maxPingFailures = options.pingsBeforePeerRenewed || DEFAULT_MAX_PINGS;
this.maxMissedMessagesThreshold =
options.maxMissedMessagesThreshold ||
Expand Down Expand Up @@ -154,8 +158,7 @@ export class SubscriptionManager implements ISubscriptionSDK {
this.subscriptionCallbacks.set(contentTopic, subscriptionCallback);
});

this.subscribeOptions = options;
this.startSubscriptionsMaintenance(options);
this.startSubscriptionsMaintenance(this.keepAliveInterval);

return finalResult;
}
Expand Down Expand Up @@ -372,10 +375,8 @@ export class SubscriptionManager implements ISubscriptionSDK {
}
}

private startSubscriptionsMaintenance(options: SubscribeOptions): void {
if (options?.keepAlive) {
this.startKeepAlivePings(options.keepAlive);
}
private startSubscriptionsMaintenance(interval: number): void {
this.startKeepAlivePings(interval);
this.startConnectionListener();
}

Expand Down Expand Up @@ -445,9 +446,7 @@ export class SubscriptionManager implements ISubscriptionSDK {
log.error(`networkStateListener failed to recover: ${err}`);
}

this.startKeepAlivePings(
this.subscribeOptions?.keepAlive || DEFAULT_SUBSCRIBE_OPTIONS.keepAlive
);
this.startKeepAlivePings(this.keepAliveInterval);
}

private incrementMissedMessageCount(peerIdStr: string): void {
Expand Down Expand Up @@ -503,7 +502,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
* @param {IDecoder<T> | IDecoder<T>[]} decoders - A single decoder or an array of decoders to use for decoding messages.
* @param {Callback<T>} callback - The callback function to be invoked with decoded messages.
* @param {ProtocolUseOptions} [protocolUseOptions] - Optional settings for using the protocol.
* @param {SubscribeOptions} [subscribeOptions=DEFAULT_SUBSCRIBE_OPTIONS] - Options for the subscription.
* @param {SubscribeOptions} [subscribeOptions] - Options for the subscription.
*
* @returns {Promise<SubscribeResult>} A promise that resolves to an object containing:
* - subscription: The created subscription object if successful, or null if failed.
Expand Down Expand Up @@ -539,7 +538,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
protocolUseOptions?: ProtocolUseOptions,
subscribeOptions: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
subscribeOptions?: SubscribeOptions
): Promise<SubscribeResult> {
const uniquePubsubTopics = this.getUniquePubsubTopics(decoders);

Expand Down Expand Up @@ -653,7 +652,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
public async subscribeWithUnsubscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
options?: SubscribeOptions
): Promise<Unsubscribe> {
const uniquePubsubTopics = this.getUniquePubsubTopics<T>(decoders);

Expand Down

0 comments on commit 5d3cc5f

Please sign in to comment.