diff --git a/packages/libp2p/src/connection-manager/dial-queue.ts b/packages/libp2p/src/connection-manager/dial-queue.ts index 2c3c8b5a34..f789bd4ee4 100644 --- a/packages/libp2p/src/connection-manager/dial-queue.ts +++ b/packages/libp2p/src/connection-manager/dial-queue.ts @@ -135,11 +135,12 @@ export class DialQueue { */ async dial (peerIdOrMultiaddr: PeerId | Multiaddr | Multiaddr[], options: OpenConnectionOptions = {}): Promise { const { peerId, multiaddrs } = getPeerAddress(peerIdOrMultiaddr) + const { force } = options // make sure we don't have an existing connection to any of the addresses we // are about to dial const existingConnection = Array.from(this.connections.values()).flat().find(conn => { - if (options.force === true) { + if (force === true) { return false } @@ -260,6 +261,30 @@ export class DialQueue { this.log.error('could not update last dial failure key for %p', peerId, err) } + const { remotePeer } = conn + + // make sure we don't have an existing connection to the address we dialed + const existingConnection = Array.from(this.connections.values()).flat().find(_conn => { + if (force === true) { + return false + } + + if (_conn.remotePeer.equals(remotePeer) && _conn !== conn) { + return true + } + + return false + }) + + // return existing, open connection to peer if equal or better limits + if (existingConnection?.status === 'open' && (existingConnection?.limits == null || conn?.limits != null)) { + this.log('already connected to %a', existingConnection.remoteAddr) + options?.onProgress?.(new CustomProgressEvent('dial-queue:already-connected')) + this.log('closing duplicate connection to %p', remotePeer) + await conn.close() + return existingConnection + } + return conn } catch (err: any) { this.log.error('dial failed to %a', address.multiaddr, err) diff --git a/packages/libp2p/test/connection-manager/dial-queue.spec.ts b/packages/libp2p/test/connection-manager/dial-queue.spec.ts index dc1d07df62..cddf7e4cb8 100644 --- a/packages/libp2p/test/connection-manager/dial-queue.spec.ts +++ b/packages/libp2p/test/connection-manager/dial-queue.spec.ts @@ -5,6 +5,7 @@ import { NotFoundError } from '@libp2p/interface' import { matchMultiaddr } from '@libp2p/interface-compliance-tests/matchers' import { mockConnection, mockDuplex, mockMultiaddrConnection } from '@libp2p/interface-compliance-tests/mocks' import { peerLogger } from '@libp2p/logger' +import { PeerMap } from '@libp2p/peer-collections' import { peerIdFromPrivateKey } from '@libp2p/peer-id' import { multiaddr, resolvers } from '@multiformats/multiaddr' import { WebRTC } from '@multiformats/multiaddr-matcher' @@ -325,4 +326,74 @@ describe('dial queue', () => { dialer = new DialQueue(components) await expect(dialer.dial(remotePeer)).to.eventually.equal(connection) }) + + it('should return existing connection when dialing a multiaddr without a peer id', async () => { + const remotePeer = peerIdFromPrivateKey(await generateKeyPair('Ed25519')) + const ip = multiaddr('/ip4/123.123.123.123') + const addr1 = ip.encapsulate('/tcp/123') + const addr2 = ip.encapsulate('/tcp/321') + + const existingConnection = stubInterface({ + limits: { + bytes: 100n + }, + remotePeer, + remoteAddr: addr1.encapsulate(`/p2p/${remotePeer}`), + status: 'open' + }) + + const newConnection = stubInterface({ + limits: { + bytes: 100n + }, + remotePeer, + remoteAddr: addr2.encapsulate(`/p2p/${remotePeer}`), + status: 'open' + }) + + const connections = new PeerMap() + connections.set(remotePeer, [existingConnection]) + + components.transportManager.dialTransportForMultiaddr.callsFake(ma => { + return stubInterface() + }) + components.transportManager.dial.callsFake(async (ma, opts = {}) => newConnection) + dialer = new DialQueue(components, { connections }) + + await expect(dialer.dial(addr2)).to.eventually.equal(existingConnection) + }) + + it('should return new connection when existing connection to same peer is worse', async () => { + const remotePeer = peerIdFromPrivateKey(await generateKeyPair('Ed25519')) + const ip = multiaddr('/ip4/123.123.123.123') + const addr1 = ip.encapsulate('/tcp/123') + const addr2 = ip.encapsulate('/tcp/321') + + const existingConnection = stubInterface({ + limits: { + bytes: 100n + }, + remotePeer, + remoteAddr: addr1.encapsulate(`/p2p/${remotePeer}`), + status: 'open' + }) + + const newConnection = stubInterface({ + limits: undefined, + remotePeer, + remoteAddr: addr2.encapsulate(`/p2p/${remotePeer}`), + status: 'open' + }) + + const connections = new PeerMap() + connections.set(remotePeer, [existingConnection]) + + components.transportManager.dialTransportForMultiaddr.callsFake(ma => { + return stubInterface() + }) + components.transportManager.dial.callsFake(async (ma, opts = {}) => newConnection) + dialer = new DialQueue(components, { connections }) + + await expect(dialer.dial(addr2)).to.eventually.equal(newConnection) + }) })