Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(lightpush): introduce ReliabilityMonitor and allow send retries #2133

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/sdk/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ export {
defaultLibp2p,
createLibp2pAndUpdateOptions
} from "./create/index.js";
export { wakuLightPush } from "./protocols/light_push.js";
export { wakuLightPush } from "./protocols/lightpush/index.js";
export { wakuFilter } from "./protocols/filter/index.js";
export { wakuStore } from "./protocols/store.js";
export { wakuStore } from "./protocols/store/index.js";

export * as waku from "@waku/core";
export * as utils from "@waku/utils";
Expand Down
7 changes: 3 additions & 4 deletions packages/sdk/src/protocols/filter/subscription_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ import {
import { WakuMessage } from "@waku/proto";
import { groupByContentTopic, Logger } from "@waku/utils";

import { ReliabilityMonitorManager } from "../../reliability_monitor/index.js";
import { ReceiverReliabilityMonitor } from "../../reliability_monitor/receiver.js";

import { DEFAULT_KEEP_ALIVE, DEFAULT_SUBSCRIBE_OPTIONS } from "./constants.js";
import {
ReceiverReliabilityMonitor,
ReliabilityMonitorManager
} from "./reliability_monitor.js";

const log = new Logger("sdk:filter:subscription_manager");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@ import {
} from "@waku/interfaces";
import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils";

import { BaseProtocolSDK } from "./base_protocol.js";
import { ReliabilityMonitorManager } from "../../reliability_monitor/index.js";
import { SenderReliabilityMonitor } from "../../reliability_monitor/sender.js";
import { BaseProtocolSDK } from "../base_protocol.js";

const log = new Logger("sdk:light-push");

class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
public readonly protocol: LightPushCore;

private readonly reliabilityMonitor: SenderReliabilityMonitor;

public constructor(
connectionManager: ConnectionManager,
libp2p: Libp2p,
Expand All @@ -33,6 +37,10 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
}
);

this.reliabilityMonitor = ReliabilityMonitorManager.createSenderMonitor(
this.renewPeer.bind(this)
);

this.protocol = this.core as LightPushCore;
}

Expand Down Expand Up @@ -89,16 +97,23 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
successes.push(success);
}
if (failure) {
failures.push(failure);
if (failure.peerId) {
try {
await this.renewPeer(failure.peerId);
log.info("Renewed peer", failure.peerId.toString());
} catch (error) {
log.error("Failed to renew peer", error);
const peer = this.connectedPeers.find((connectedPeer) =>
connectedPeer.id.equals(failure.peerId)
);
if (peer) {
log.info(`
Failed to send message to peer ${failure.peerId}.
Retrying the message with the same peer in the background.
If this fails, the peer will be renewed.
`);
void this.reliabilityMonitor.attemptRetriesOrRenew(
failure.peerId,
() => this.protocol.send(encoder, message, peer)
);
}
}

failures.push(failure);
}
} else {
log.error("Failed unexpectedly while sending:", result.reason);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
import { messageHash } from "@waku/message-hash";
import { ensurePubsubTopicIsConfigured, isDefined, Logger } from "@waku/utils";

import { BaseProtocolSDK } from "./base_protocol.js";
import { BaseProtocolSDK } from "../base_protocol.js";

const DEFAULT_NUM_PEERS = 1;

Expand Down
70 changes: 70 additions & 0 deletions packages/sdk/src/reliability_monitor/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import type { Peer, PeerId } from "@libp2p/interface";
import {
ContentTopic,
CoreProtocolResult,
PubsubTopic
} from "@waku/interfaces";

import { ReceiverReliabilityMonitor } from "./receiver.js";
import { SenderReliabilityMonitor } from "./sender.js";

export class ReliabilityMonitorManager {
private static receiverMonitors: Map<
PubsubTopic,
ReceiverReliabilityMonitor
> = new Map();
private static senderMonitor: SenderReliabilityMonitor | undefined;

public static createReceiverMonitor(
pubsubTopic: PubsubTopic,
getPeers: () => Peer[],
renewPeer: (peerId: PeerId) => Promise<Peer>,
getContentTopics: () => ContentTopic[],
protocolSubscribe: (
pubsubTopic: PubsubTopic,
peer: Peer,
contentTopics: ContentTopic[]
) => Promise<CoreProtocolResult>
): ReceiverReliabilityMonitor {
if (ReliabilityMonitorManager.receiverMonitors.has(pubsubTopic)) {
return ReliabilityMonitorManager.receiverMonitors.get(pubsubTopic)!;
}

const monitor = new ReceiverReliabilityMonitor(
pubsubTopic,
getPeers,
renewPeer,
getContentTopics,
protocolSubscribe
);
ReliabilityMonitorManager.receiverMonitors.set(pubsubTopic, monitor);
return monitor;
}

public static createSenderMonitor(
renewPeer: (peerId: PeerId) => Promise<Peer>
): SenderReliabilityMonitor {
if (!ReliabilityMonitorManager.senderMonitor) {
ReliabilityMonitorManager.senderMonitor = new SenderReliabilityMonitor(
renewPeer
);
}
return ReliabilityMonitorManager.senderMonitor;
}

private constructor() {}

public static stop(pubsubTopic: PubsubTopic): void {
this.receiverMonitors.delete(pubsubTopic);
this.senderMonitor = undefined;
}

public static stopAll(): void {
for (const [pubsubTopic, monitor] of this.receiverMonitors) {
monitor.setMaxMissedMessagesThreshold(undefined);
monitor.setMaxPingFailures(undefined);
this.receiverMonitors.delete(pubsubTopic);
this.senderMonitor = undefined;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,53 +21,6 @@ const log = new Logger("sdk:receiver:reliability_monitor");

const DEFAULT_MAX_PINGS = 3;

export class ReliabilityMonitorManager {
private static receiverMonitors: Map<
PubsubTopic,
ReceiverReliabilityMonitor
> = new Map();

public static createReceiverMonitor(
pubsubTopic: PubsubTopic,
getPeers: () => Peer[],
renewPeer: (peerId: PeerId) => Promise<Peer>,
getContentTopics: () => ContentTopic[],
protocolSubscribe: (
pubsubTopic: PubsubTopic,
peer: Peer,
contentTopics: ContentTopic[]
) => Promise<CoreProtocolResult>
): ReceiverReliabilityMonitor {
if (ReliabilityMonitorManager.receiverMonitors.has(pubsubTopic)) {
return ReliabilityMonitorManager.receiverMonitors.get(pubsubTopic)!;
}

const monitor = new ReceiverReliabilityMonitor(
pubsubTopic,
getPeers,
renewPeer,
getContentTopics,
protocolSubscribe
);
ReliabilityMonitorManager.receiverMonitors.set(pubsubTopic, monitor);
return monitor;
}

private constructor() {}

public static destroy(pubsubTopic: PubsubTopic): void {
this.receiverMonitors.delete(pubsubTopic);
}

public static destroyAll(): void {
for (const [pubsubTopic, monitor] of this.receiverMonitors) {
monitor.setMaxMissedMessagesThreshold(undefined);
monitor.setMaxPingFailures(undefined);
this.receiverMonitors.delete(pubsubTopic);
}
}
}

export class ReceiverReliabilityMonitor {
private receivedMessagesHashes: ReceivedMessageHashes;
private missedMessagesByPeer: Map<string, number> = new Map();
Expand Down
57 changes: 57 additions & 0 deletions packages/sdk/src/reliability_monitor/sender.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import type { Peer, PeerId } from "@libp2p/interface";
import { CoreProtocolResult, PeerIdStr } from "@waku/interfaces";
import { Logger } from "@waku/utils";

const log = new Logger("sdk:sender:reliability_monitor");

const DEFAULT_MAX_ATTEMPTS_BEFORE_RENEWAL = 3;

export class SenderReliabilityMonitor {
private attempts: Map<PeerIdStr, number> = new Map();
private readonly maxAttemptsBeforeRenewal =
DEFAULT_MAX_ATTEMPTS_BEFORE_RENEWAL;

public constructor(private renewPeer: (peerId: PeerId) => Promise<Peer>) {}

public async attemptRetriesOrRenew(
peerId: PeerId,
protocolSend: () => Promise<CoreProtocolResult>
): Promise<void> {
const peerIdStr = peerId.toString();
const currentAttempts = this.attempts.get(peerIdStr) || 0;
this.attempts.set(peerIdStr, currentAttempts + 1);

if (currentAttempts + 1 < this.maxAttemptsBeforeRenewal) {
try {
const result = await protocolSend();
if (result.success) {
log.info(`Successfully sent message after retry to ${peerIdStr}`);
this.attempts.delete(peerIdStr);
} else {
log.error(
`Failed to send message after retry to ${peerIdStr}: ${result.failure}`
);
await this.attemptRetriesOrRenew(peerId, protocolSend);
}
} catch (error) {
log.error(
`Failed to send message after retry to ${peerIdStr}: ${error}`
);
await this.attemptRetriesOrRenew(peerId, protocolSend);
}
} else {
try {
const newPeer = await this.renewPeer(peerId);
log.info(
`Renewed peer ${peerId.toString()} to ${newPeer.id.toString()}`
);

this.attempts.delete(peerIdStr);
this.attempts.set(newPeer.id.toString(), 0);
await protocolSend();
} catch (error) {
log.error(`Failed to renew peer ${peerId.toString()}: ${error}`);
}
}
}
}
8 changes: 4 additions & 4 deletions packages/sdk/src/waku.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import { Protocols } from "@waku/interfaces";
import { Logger } from "@waku/utils";

import { wakuFilter } from "./protocols/filter/index.js";
import { ReliabilityMonitorManager } from "./protocols/filter/reliability_monitor.js";
import { wakuLightPush } from "./protocols/light_push.js";
import { wakuStore } from "./protocols/store.js";
import { wakuLightPush } from "./protocols/lightpush/index.js";
import { wakuStore } from "./protocols/store/index.js";
import { ReliabilityMonitorManager } from "./reliability_monitor/index.js";

export const DefaultPingKeepAliveValueSecs = 5 * 60;
export const DefaultRelayKeepAliveValueSecs = 5 * 60;
Expand Down Expand Up @@ -196,7 +196,7 @@ export class WakuNode implements Waku {
}

public async stop(): Promise<void> {
ReliabilityMonitorManager.destroyAll();
ReliabilityMonitorManager.stopAll();
this.connectionManager.stop();
await this.libp2p.stop();
}
Expand Down
14 changes: 9 additions & 5 deletions packages/tests/tests/light-push/peer_management.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { LightNode } from "@waku/interfaces";
import { createEncoder, utf8ToBytes } from "@waku/sdk";
import { delay } from "@waku/utils";
import { expect } from "chai";
import { describe } from "mocha";

Expand Down Expand Up @@ -78,18 +79,21 @@ describe("Waku Light Push: Peer Management: E2E", function () {
expect(response2.failures).to.have.length(1);
expect(response2.failures?.[0].peerId).to.equal(peerToDisconnect);

// send another lightpush request -- renewal should have triggerred and new peer should be used instead of the disconnected one
// send another lightpush request
// reattempts to send should be triggerred
// then renewal should happen
// so one failure should exist
const response3 = await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello_World")
});

// wait for reattempts to finish as they are async and not awaited
await delay(500);

expect(response3.successes.length).to.be.equal(
waku.lightPush.numPeersToUse
waku.lightPush.numPeersToUse - 1
);

expect(response3.successes).to.not.include(peerToDisconnect);
if (response3.failures) {
expect(response3.failures.length).to.equal(0);
}
});
});
Loading