diff --git a/packages/libp2p/src/connection-manager/dial-queue.ts b/packages/libp2p/src/connection-manager/dial-queue.ts index d88b04d254..34de960e0c 100644 --- a/packages/libp2p/src/connection-manager/dial-queue.ts +++ b/packages/libp2p/src/connection-manager/dial-queue.ts @@ -1,5 +1,5 @@ /* eslint-disable max-depth */ -import { TimeoutError, DialError, setMaxListeners } from '@libp2p/interface' +import { TimeoutError, DialError, setMaxListeners, AbortError } from '@libp2p/interface' import { PeerMap } from '@libp2p/peer-collections' import { defaultAddressSort } from '@libp2p/utils/address-sort' import { PriorityQueue, type PriorityQueueJobOptions } from '@libp2p/utils/priority-queue' @@ -103,7 +103,9 @@ export class DialQueue { }) // a started job errored this.queue.addEventListener('error', (event) => { - this.log.error('error in dial queue', event.detail) + if (event.detail.name !== AbortError.name) { + this.log.error('error in dial queue - %e', event.detail) + } }) } diff --git a/packages/libp2p/src/connection-manager/index.ts b/packages/libp2p/src/connection-manager/index.ts index 8889113d01..607f0c23f5 100644 --- a/packages/libp2p/src/connection-manager/index.ts +++ b/packages/libp2p/src/connection-manager/index.ts @@ -1,4 +1,4 @@ -import { InvalidParametersError, NotStartedError, start, stop } from '@libp2p/interface' +import { InvalidMultiaddrError, InvalidParametersError, InvalidPeerIdError, NotStartedError, start, stop } from '@libp2p/interface' import { PeerMap } from '@libp2p/peer-collections' import { defaultAddressSort } from '@libp2p/utils/address-sort' import { RateLimiter } from '@libp2p/utils/rate-limiter' @@ -191,6 +191,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { private readonly metrics?: Metrics private readonly events: TypedEventTarget private readonly log: Logger + private readonly peerId: PeerId constructor (components: DefaultConnectionManagerComponents, init: ConnectionManagerInit = {}) { this.maxConnections = init.maxConnections ?? defaultOptions.maxConnections @@ -205,6 +206,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { this.connections = new PeerMap() this.started = false + this.peerId = components.peerId this.peerStore = components.peerStore this.metrics = components.metrics this.events = components.events @@ -484,6 +486,10 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { const { peerId } = getPeerAddress(peerIdOrMultiaddr) + if (this.peerId.equals(peerId)) { + throw new InvalidPeerIdError('Can not dial self') + } + if (peerId != null && options.force !== true) { this.log('dial %p', peerId) const existingConnection = this.getConnections(peerId) @@ -501,6 +507,13 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { ...options, priority: options.priority ?? DEFAULT_DIAL_PRIORITY }) + + if (connection.remotePeer.equals(this.peerId)) { + const err = new InvalidPeerIdError('Can not dial self') + connection.abort(err) + throw err + } + let peerConnections = this.connections.get(connection.remotePeer) if (peerConnections == null) { @@ -517,6 +530,14 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { if (conn.id === connection.id) { trackedConnection = true } + + // make sure we don't already have a connection to this multiaddr + if (options.force !== true && conn.id !== connection.id && conn.remoteAddr.equals(connection.remoteAddr)) { + connection.abort(new InvalidMultiaddrError('Duplicate multiaddr connection')) + + // return the existing connection + return conn + } } if (!trackedConnection) { diff --git a/packages/libp2p/src/upgrader.ts b/packages/libp2p/src/upgrader.ts index 92baa1e773..349140b5ea 100644 --- a/packages/libp2p/src/upgrader.ts +++ b/packages/libp2p/src/upgrader.ts @@ -7,7 +7,7 @@ import { createConnection } from './connection/index.js' import { PROTOCOL_NEGOTIATION_TIMEOUT, UPGRADE_TIMEOUT } from './connection-manager/constants.js' import { ConnectionDeniedError, ConnectionInterceptedError, EncryptionFailedError, MuxerUnavailableError } from './errors.js' import { DEFAULT_MAX_INBOUND_STREAMS, DEFAULT_MAX_OUTBOUND_STREAMS } from './registrar.js' -import type { Libp2pEvents, AbortOptions, ComponentLogger, MultiaddrConnection, Connection, Stream, ConnectionProtector, NewStreamOptions, ConnectionEncrypter, SecuredConnection, ConnectionGater, TypedEventTarget, Metrics, PeerId, PeerStore, StreamMuxer, StreamMuxerFactory, Upgrader, UpgraderOptions, ConnectionLimits, SecureConnectionOptions } from '@libp2p/interface' +import type { Libp2pEvents, AbortOptions, ComponentLogger, MultiaddrConnection, Connection, Stream, ConnectionProtector, NewStreamOptions, ConnectionEncrypter, SecuredConnection, ConnectionGater, TypedEventTarget, Metrics, PeerId, PeerStore, StreamMuxer, StreamMuxerFactory, Upgrader, UpgraderOptions, ConnectionLimits, SecureConnectionOptions, CounterGroup } from '@libp2p/interface' import type { ConnectionManager, Registrar } from '@libp2p/interface-internal' interface CreateConnectionOptions { @@ -130,6 +130,10 @@ export class DefaultUpgrader implements Upgrader { private readonly inboundStreamProtocolNegotiationTimeout: number private readonly outboundStreamProtocolNegotiationTimeout: number private readonly events: TypedEventTarget + private readonly metrics: { + dials?: CounterGroup<'inbound' | 'outbound'> + errors?: CounterGroup<'inbound' | 'outbound'> + } constructor (components: DefaultUpgraderComponents, init: UpgraderInit) { this.components = components @@ -150,6 +154,10 @@ export class DefaultUpgrader implements Upgrader { this.inboundStreamProtocolNegotiationTimeout = init.inboundStreamProtocolNegotiationTimeout ?? PROTOCOL_NEGOTIATION_TIMEOUT this.outboundStreamProtocolNegotiationTimeout = init.outboundStreamProtocolNegotiationTimeout ?? PROTOCOL_NEGOTIATION_TIMEOUT this.events = components.events + this.metrics = { + dials: components.metrics?.registerCounterGroup('libp2p_connection_manager_dials_total'), + errors: components.metrics?.registerCounterGroup('libp2p_connection_manager_dial_errors_total') + } } readonly [Symbol.toStringTag] = '@libp2p/upgrader' @@ -175,6 +183,10 @@ export class DefaultUpgrader implements Upgrader { */ async upgradeInbound (maConn: MultiaddrConnection, opts: UpgraderOptions = {}): Promise { try { + this.metrics.dials?.increment({ + inbound: true + }) + const accept = await this.components.connectionManager.acceptIncomingConnection(maConn) if (!accept) { @@ -183,7 +195,15 @@ export class DefaultUpgrader implements Upgrader { await this.shouldBlockConnection('denyInboundConnection', maConn) - return await this._performUpgrade(maConn, 'inbound', opts) + const conn = await this._performUpgrade(maConn, 'inbound', opts) + + return conn + } catch (err) { + this.metrics.errors?.increment({ + inbound: true + }) + + throw err } finally { this.components.connectionManager.afterUpgradeInbound() } @@ -193,15 +213,27 @@ export class DefaultUpgrader implements Upgrader { * Upgrades an outbound connection */ async upgradeOutbound (maConn: MultiaddrConnection, opts: UpgraderOptions = {}): Promise { - const idStr = maConn.remoteAddr.getPeerId() - let remotePeerId: PeerId | undefined + try { + this.metrics.dials?.increment({ + outbound: true + }) - if (idStr != null) { - remotePeerId = peerIdFromString(idStr) - await this.shouldBlockConnection('denyOutboundConnection', remotePeerId, maConn) - } + const idStr = maConn.remoteAddr.getPeerId() + let remotePeerId: PeerId | undefined - return this._performUpgrade(maConn, 'outbound', opts) + if (idStr != null) { + remotePeerId = peerIdFromString(idStr) + await this.shouldBlockConnection('denyOutboundConnection', remotePeerId, maConn) + } + + return await this._performUpgrade(maConn, 'outbound', opts) + } catch (err) { + this.metrics.errors?.increment({ + outbound: true + }) + + throw err + } } private async _performUpgrade (maConn: MultiaddrConnection, direction: 'inbound' | 'outbound', opts: UpgraderOptions): Promise { @@ -218,7 +250,7 @@ export class DefaultUpgrader implements Upgrader { this.components.metrics?.trackMultiaddrConnection(maConn) - maConn.log('starting the %s connection upgrade', direction) + maConn.log.trace('starting the %s connection upgrade', direction) // Protect let protectedConn = maConn @@ -292,13 +324,13 @@ export class DefaultUpgrader implements Upgrader { upgradedConn = multiplexed.stream } } catch (err: any) { - maConn.log.error('failed to upgrade inbound connection', err) + maConn.log.error('failed to upgrade inbound connection %s %a - %e', direction === 'inbound' ? 'from' : 'to', maConn.remoteAddr, err) throw err } await this.shouldBlockConnection(direction === 'inbound' ? 'denyInboundUpgradedConnection' : 'denyOutboundUpgradedConnection', remotePeer, maConn) - maConn.log('successfully %s inbound connection', direction) + maConn.log('successfully upgraded %s connection', direction) return this._createConnection({ cryptoProtocol, @@ -399,7 +431,7 @@ export class DefaultUpgrader implements Upgrader { this._onStream({ connection, stream: muxedStream, protocol }) }) .catch(async err => { - connection.log.error('error handling incoming stream id %s', muxedStream.id, err.message, err.code, err.stack) + connection.log.error('error handling incoming stream id %s - %e', muxedStream.id, err) if (muxedStream.timeline.close == null) { await muxedStream.close() @@ -413,7 +445,7 @@ export class DefaultUpgrader implements Upgrader { throw new MuxerUnavailableError('Connection is not multiplexed') } - connection.log('starting new stream for protocols %s', protocols) + connection.log.trace('starting new stream for protocols %s', protocols) const muxedStream = await muxer.newStream() connection.log.trace('started new stream %s for protocols %s', muxedStream.id, protocols) @@ -441,7 +473,7 @@ export class DefaultUpgrader implements Upgrader { yieldBytes: true }) - muxedStream.log('selected protocol %s', protocol) + muxedStream.log.trace('selected protocol %s', protocol) const outgoingLimit = findOutgoingStreamLimit(protocol, this.components.registrar, options) const streamCount = countStreams(protocol, 'outbound', connection) @@ -484,7 +516,7 @@ export class DefaultUpgrader implements Upgrader { return muxedStream } catch (err: any) { - connection.log.error('could not create new stream for protocols %s', protocols, err) + connection.log.error('could not create new outbound stream on connection %s %a for protocols %s - %e', direction === 'inbound' ? 'from' : 'to', opts.maConn.remoteAddr, protocols, err) if (muxedStream.timeline.close == null) { muxedStream.abort(err) @@ -499,7 +531,7 @@ export class DefaultUpgrader implements Upgrader { muxer.sink(upgradedConn.source), upgradedConn.sink(muxer.source) ]).catch(err => { - connection.log.error('error piping data through muxer', err) + connection.log.error('error piping data through muxer - %e', err) }) } @@ -594,7 +626,6 @@ export class DefaultUpgrader implements Upgrader { */ async _encryptInbound (connection: MultiaddrConnection, options?: AbortOptions): Promise { const protocols = Array.from(this.connectionEncrypters.keys()) - connection.log('handling inbound crypto protocol selection', protocols) try { const { stream, protocol } = await mss.handle(connection, protocols, { @@ -604,17 +635,17 @@ export class DefaultUpgrader implements Upgrader { const encrypter = this.connectionEncrypters.get(protocol) if (encrypter == null) { - throw new Error(`no crypto module found for ${protocol}`) + throw new EncryptionFailedError(`no crypto module found for ${protocol}`) } - connection.log('encrypting inbound connection using', protocol) + connection.log('encrypting inbound connection to %a using %s', connection.remoteAddr, protocol) return { ...await encrypter.secureInbound(stream, options), protocol } } catch (err: any) { - connection.log.error('encrypting inbound connection failed', err) + connection.log.error('encrypting inbound connection from %a failed', connection.remoteAddr, err) throw new EncryptionFailedError(err.message) } } @@ -625,34 +656,29 @@ export class DefaultUpgrader implements Upgrader { */ async _encryptOutbound (connection: MultiaddrConnection, options: SecureConnectionOptions): Promise { const protocols = Array.from(this.connectionEncrypters.keys()) - connection.log('selecting outbound crypto protocol', protocols) try { connection.log.trace('selecting encrypter from %s', protocols) - const { - stream, - protocol - } = await mss.select(connection, protocols, { + const { stream, protocol } = await mss.select(connection, protocols, { ...options, log: connection.log, yieldBytes: true }) - const encrypter = this.connectionEncrypters.get(protocol) if (encrypter == null) { - throw new Error(`no crypto module found for ${protocol}`) + throw new EncryptionFailedError(`no crypto module found for ${protocol}`) } - connection.log('encrypting outbound connection to %p using %s', options?.remotePeer, encrypter) + connection.log('encrypting outbound connection to %a using %s', connection.remoteAddr, protocol) return { ...await encrypter.secureOutbound(stream, options), protocol } } catch (err: any) { - connection.log.error('encrypting outbound connection to %p failed', options?.remotePeer, err) + connection.log.error('encrypting outbound connection to %a failed', connection.remoteAddr, err) throw new EncryptionFailedError(err.message) } } diff --git a/packages/libp2p/test/connection-manager/direct.spec.ts b/packages/libp2p/test/connection-manager/direct.spec.ts index 7bd3958bd1..508489efb2 100644 --- a/packages/libp2p/test/connection-manager/direct.spec.ts +++ b/packages/libp2p/test/connection-manager/direct.spec.ts @@ -490,7 +490,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => { await expect(libp2p.dial(multiaddr(`/ip4/127.0.0.1/tcp/1234/ws/p2p/${libp2p.peerId.toString()}`))) .to.eventually.be.rejected() - .and.to.have.property('name', 'DialError') + .and.to.have.property('name', 'InvalidPeerIdError') }) it('should limit the maximum dial queue size', async () => { diff --git a/packages/libp2p/test/connection-manager/index.spec.ts b/packages/libp2p/test/connection-manager/index.spec.ts index cac55058e0..6a500e264e 100644 --- a/packages/libp2p/test/connection-manager/index.spec.ts +++ b/packages/libp2p/test/connection-manager/index.spec.ts @@ -391,7 +391,9 @@ describe('Connection Manager', () => { }) await connectionManager.start() - sinon.stub(connectionManager.dialQueue, 'dial').resolves(stubInterface()) + sinon.stub(connectionManager.dialQueue, 'dial').resolves(stubInterface({ + remotePeer: peerIdFromPrivateKey(await generateKeyPair('Ed25519')) + })) // max out the connection limit await connectionManager.openConnection(peerIdFromPrivateKey(await generateKeyPair('Ed25519'))) @@ -450,7 +452,9 @@ describe('Connection Manager', () => { }) await connectionManager.start() - sinon.stub(connectionManager.dialQueue, 'dial').resolves(stubInterface()) + sinon.stub(connectionManager.dialQueue, 'dial').resolves(stubInterface({ + remotePeer: peerIdFromPrivateKey(await generateKeyPair('Ed25519')) + })) // max out the connection limit await connectionManager.openConnection(peerIdFromPrivateKey(await generateKeyPair('Ed25519'))) @@ -477,7 +481,9 @@ describe('Connection Manager', () => { }) await connectionManager.start() - sinon.stub(connectionManager.dialQueue, 'dial').resolves(stubInterface()) + sinon.stub(connectionManager.dialQueue, 'dial').resolves(stubInterface({ + remotePeer: peerIdFromPrivateKey(await generateKeyPair('Ed25519')) + })) // start the upgrade const maConn1 = mockMultiaddrConnection({ @@ -523,9 +529,14 @@ describe('Connection Manager', () => { const existingConnection = stubInterface({ limits: { bytes: 100n - } + }, + remotePeer: targetPeer, + remoteAddr: multiaddr(`/ip4/123.123.123.123/tcp/123/p2p-circuit/p2p/${targetPeer}`) + }) + const newConnection = stubInterface({ + remotePeer: targetPeer, + remoteAddr: addr }) - const newConnection = stubInterface() sinon.stub(connectionManager.dialQueue, 'dial') .withArgs(addr)