Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(filter): enhancing protocol peer management with mutex locks #2137

Merged
merged 35 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
8caa220
chore: improvements
danisharora099 Sep 18, 2024
4554eea
chore: add logs for subscription maintenance
danisharora099 Sep 18, 2024
692081e
chore: update logging
danisharora099 Sep 19, 2024
6117593
chore: trimming down BaseProtocolCore
danisharora099 Sep 23, 2024
128041b
chore: track peers in a hashmap instead of array
danisharora099 Sep 23, 2024
ac1a030
chore: peer mgmt responds to conenction/disconnection and improve log…
danisharora099 Sep 24, 2024
3384b87
feat: add mutex locks to tackle race conditions over shared state
danisharora099 Sep 24, 2024
557ee96
fix: build
danisharora099 Sep 24, 2024
558fb6a
chore: some mutex lock-release improvements
danisharora099 Sep 24, 2024
82baa26
feat: peer manager
danisharora099 Sep 24, 2024
8cb3c35
chore: rm tests for remove internal util
danisharora099 Sep 24, 2024
1df0556
chore: update HealthManager updates
danisharora099 Sep 24, 2024
d1d9198
chore: update tests
danisharora099 Sep 24, 2024
2e79540
rm: only
danisharora099 Sep 24, 2024
be7a0b0
fix: hasPeers management
danisharora099 Sep 25, 2024
aec9451
chore: add modularity to getting connected peers
danisharora099 Sep 25, 2024
3647ec9
chore: improve logs & add debug
danisharora099 Sep 25, 2024
4d7b997
chore: renewal doesnt disconnect, only removes
danisharora099 Sep 26, 2024
3eef8be
chore: await for sequential operations
danisharora099 Sep 27, 2024
7b7d691
chore: add TODO
danisharora099 Sep 27, 2024
8e88fad
chore: minor improvements
danisharora099 Oct 3, 2024
b0d53bc
chore: fix rebase
danisharora099 Oct 4, 2024
92b506d
chore: update playright
danisharora099 Oct 4, 2024
017e901
chore: remove additional arg
danisharora099 Oct 8, 2024
d0714d9
chore: update interafce
danisharora099 Oct 8, 2024
874d835
feat(peer-manager): unit tests
danisharora099 Oct 8, 2024
b9bfaa2
chore: improve hasPeers()
danisharora099 Oct 8, 2024
ed138cc
chore: update lockfile
danisharora099 Oct 8, 2024
43fa32b
feat: Filter reacts to peer:disconnect event, add tests
danisharora099 Oct 9, 2024
535a20f
chore: fix lock
danisharora099 Oct 9, 2024
829d183
chore: update playright
danisharora099 Oct 9, 2024
e84eb62
chore: update protocol health for lightpush
danisharora099 Oct 9, 2024
b7bdb60
chore: remove .only
danisharora099 Oct 9, 2024
7e2c1cb
chore: address comments and improvements
danisharora099 Oct 10, 2024
de10ff4
fix: tsconfig
danisharora099 Oct 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/playwright.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
timeout-minutes: 60
runs-on: ubuntu-latest
container:
image: mcr.microsoft.com/playwright:v1.46.0-jammy
image: mcr.microsoft.com/playwright:v1.48.0-jammy
weboko marked this conversation as resolved.
Show resolved Hide resolved
steps:
- uses: actions/checkout@v3
- uses: actions/setup-node@v3
Expand Down
7,556 changes: 4,405 additions & 3,151 deletions package-lock.json

Large diffs are not rendered by default.

42 changes: 14 additions & 28 deletions packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
import type { Libp2p } from "@libp2p/interface";
import type { Peer, PeerStore, Stream } from "@libp2p/interface";
import type { Peer, Stream } from "@libp2p/interface";
import type {
IBaseProtocolCore,
Libp2pComponents,
PubsubTopic
} from "@waku/interfaces";
import { Logger, pubsubTopicsToShardInfo } from "@waku/utils";
import {
getConnectedPeersForProtocolAndShard,
getPeersForProtocol,
sortPeersByLatency
} from "@waku/utils/libp2p";
import { Logger } from "@waku/utils";
import { getPeersForProtocol, sortPeersByLatency } from "@waku/utils/libp2p";
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved

import { filterPeersByDiscovery } from "./filterPeers.js";
import { StreamManager } from "./stream_manager/index.js";
Expand All @@ -26,7 +22,7 @@ export class BaseProtocol implements IBaseProtocolCore {

protected constructor(
public multicodec: string,
private components: Libp2pComponents,
protected components: Libp2pComponents,
private log: Logger,
public readonly pubsubTopics: PubsubTopic[]
) {
Expand All @@ -50,25 +46,22 @@ export class BaseProtocol implements IBaseProtocolCore {
return this.streamManager.getStream(peer);
}

public get peerStore(): PeerStore {
return this.components.peerStore;
}

/**
* Returns known peers from the address book (`libp2p.peerStore`) that support
* the class protocol. Waku may or may not be currently connected to these
* peers.
*/
public async allPeers(): Promise<Peer[]> {
return getPeersForProtocol(this.peerStore, [this.multicodec]);
return getPeersForProtocol(this.components.peerStore, [this.multicodec]);
}

public async connectedPeers(): Promise<Peer[]> {
const peers = await this.allPeers();
return peers.filter((peer) => {
return (
this.components.connectionManager.getConnections(peer.id).length > 0
const connections = this.components.connectionManager.getConnections(
peer.id
);
return connections.length > 0;
});
}

Expand All @@ -77,9 +70,8 @@ export class BaseProtocol implements IBaseProtocolCore {
*
* @param numPeers - The total number of peers to retrieve. If 0, all peers are returned.
* @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve.

* @returns A list of peers that support the protocol sorted by latency.
*/
* @returns A list of peers that support the protocol sorted by latency. By default, returns all peers available, including bootstrap.
*/
public async getPeers(
{
numPeers,
Expand All @@ -88,29 +80,23 @@ export class BaseProtocol implements IBaseProtocolCore {
numPeers: number;
maxBootstrapPeers: number;
} = {
maxBootstrapPeers: 1,
maxBootstrapPeers: 0,
numPeers: 0
}
): Promise<Peer[]> {
// Retrieve all connected peers that support the protocol & shard (if configured)
const connectedPeersForProtocolAndShard =
await getConnectedPeersForProtocolAndShard(
this.components.connectionManager.getConnections(),
this.peerStore,
[this.multicodec],
pubsubTopicsToShardInfo(this.pubsubTopics)
);
const allAvailableConnectedPeers = await this.connectedPeers();

// Filter the peers based on discovery & number of peers requested
const filteredPeers = filterPeersByDiscovery(
connectedPeersForProtocolAndShard,
allAvailableConnectedPeers,
numPeers,
maxBootstrapPeers
);

// Sort the peers by latency
const sortedFilteredPeers = await sortPeersByLatency(
this.peerStore,
this.components.peerStore,
filteredPeers
);

Expand Down
7 changes: 5 additions & 2 deletions packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,11 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
() => {
log.info("Receiving pipe closed.");
},
(e) => {
log.error("Error with receiving pipe", e);
async (e) => {
log.error(
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
`Error with receiving pipe on peer:${connection.remotePeer.toString()} -- stream:${stream.id} -- protocol:${stream.protocol}: `,
e
);
}
);
} catch (e) {
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/lib/metadata/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class Metadata extends BaseProtocol implements IMetadata {
pubsubTopicsToShardInfo(this.pubsubTopics)
);

const peer = await this.peerStore.get(peerId);
const peer = await this.libp2pComponents.peerStore.get(peerId);
weboko marked this conversation as resolved.
Show resolved Hide resolved
if (!peer) {
return {
shardInfo: null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
numPeers: BigInt(numPeers)
});

const peer = await this.peerStore.get(peerId);
const peer = await this.components.peerStore.get(peerId);
if (!peer) {
return {
peerInfos: null,
Expand Down
17 changes: 2 additions & 15 deletions packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { Libp2p } from "@libp2p/interface";
import type { PeerId } from "@libp2p/interface";
import type { Peer, PeerStore } from "@libp2p/interface";
import type { Peer } from "@libp2p/interface";

import type { CreateLibp2pOptions } from "./libp2p.js";
import type { IDecodedMessage } from "./message.js";
Expand All @@ -16,7 +16,6 @@ export enum Protocols {

export type IBaseProtocolCore = {
multicodec: string;
peerStore: PeerStore;
allPeers: () => Promise<Peer[]>;
connectedPeers: () => Promise<Peer[]>;
addLibp2pEventListener: Libp2p["addEventListener"];
Expand All @@ -25,7 +24,7 @@ export type IBaseProtocolCore = {

export type IBaseProtocolSDK = {
readonly connectedPeers: Peer[];
renewPeer: (peerToDisconnect: PeerId) => Promise<Peer>;
renewPeer: (peerToDisconnect: PeerId) => Promise<Peer | undefined>;
readonly numPeersToUse: number;
};

Expand All @@ -36,10 +35,6 @@ export type NetworkConfig = StaticSharding | AutoSharding;
* Options for using LightPush and Filter
*/
export type ProtocolUseOptions = {
/**
weboko marked this conversation as resolved.
Show resolved Hide resolved
* Optional flag to enable auto-retry with exponential backoff
*/
autoRetry?: boolean;
/**
* Optional flag to force using all available peers
*/
Expand All @@ -48,14 +43,6 @@ export type ProtocolUseOptions = {
* Optional maximum number of attempts for exponential backoff
*/
maxAttempts?: number;
/**
* Optional initial delay in milliseconds for exponential backoff
*/
initialDelay?: number;
/**
* Optional maximum delay in milliseconds for exponential backoff
*/
maxDelay?: number;
};

export type ProtocolCreateOptions = {
Expand Down
15 changes: 8 additions & 7 deletions packages/sdk/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
},
"dependencies": {
"@chainsafe/libp2p-noise": "^15.1.0",
"@libp2p/bootstrap": "^10.1.2",
"@libp2p/bootstrap": "^10",
"@libp2p/identify": "^2.1.2",
"@libp2p/mplex": "^10.1.2",
"@libp2p/ping": "^1.1.2",
Expand All @@ -73,23 +73,24 @@
"@waku/proto": "^0.0.8",
"@waku/utils": "0.0.20",
"@waku/message-hash": "0.1.16",
"async-mutex": "^0.5.0",
"libp2p": "^1.8.1"
},
"devDependencies": {
"@types/mocha": "^10.0.6",
"@types/chai": "^4.3.11",
"@rollup/plugin-commonjs": "^25.0.7",
"@rollup/plugin-json": "^6.0.0",
"@rollup/plugin-node-resolve": "^15.2.3",
"@rollup/plugin-replace": "^5.0.5",
"@types/mocha": "^10.0.9",
"@waku/build-utils": "*",
"mocha": "^10.3.0",
"sinon": "^18.0.0",
"chai": "^4.3.10",
"chai": "^5.1.1",
"cspell": "^8.6.1",
"interface-datastore": "^8.2.10",
"mocha": "^10.7.3",
"npm-run-all": "^4.1.5",
"rollup": "^4.12.0"
"rollup": "^4.12.0",
"sinon": "^19.0.2"
},
"peerDependencies": {
"@libp2p/bootstrap": "^10"
Expand All @@ -109,4 +110,4 @@
"LICENSE",
"README.md"
]
}
}
Loading
Loading