Skip to content

Commit

Permalink
fix: split error/operation metrics (#2728)
Browse files Browse the repository at this point in the history
Splitting metrics is considered best practice where there are a
small number of outcomes.

Refs: https://promlabs.com/blog/2023/09/19/errors-successes-totals-which-metrics-should-i-expose-to-prometheus/
  • Loading branch information
achingbrain authored Sep 25, 2024
1 parent 442a835 commit 0c59578
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 39 deletions.
6 changes: 4 additions & 2 deletions packages/libp2p/src/connection-manager/dial-queue.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* eslint-disable max-depth */
import { TimeoutError, DialError, setMaxListeners } from '@libp2p/interface'
import { TimeoutError, DialError, setMaxListeners, AbortError } from '@libp2p/interface'
import { PeerMap } from '@libp2p/peer-collections'
import { defaultAddressSort } from '@libp2p/utils/address-sort'
import { PriorityQueue, type PriorityQueueJobOptions } from '@libp2p/utils/priority-queue'
Expand Down Expand Up @@ -103,7 +103,9 @@ export class DialQueue {
})
// a started job errored
this.queue.addEventListener('error', (event) => {
this.log.error('error in dial queue', event.detail)
if (event.detail.name !== AbortError.name) {
this.log.error('error in dial queue - %e', event.detail)
}
})
}

Expand Down
23 changes: 22 additions & 1 deletion packages/libp2p/src/connection-manager/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { InvalidParametersError, NotStartedError, start, stop } from '@libp2p/interface'
import { InvalidMultiaddrError, InvalidParametersError, InvalidPeerIdError, NotStartedError, start, stop } from '@libp2p/interface'
import { PeerMap } from '@libp2p/peer-collections'
import { defaultAddressSort } from '@libp2p/utils/address-sort'
import { RateLimiter } from '@libp2p/utils/rate-limiter'
Expand Down Expand Up @@ -191,6 +191,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
private readonly metrics?: Metrics
private readonly events: TypedEventTarget<Libp2pEvents>
private readonly log: Logger
private readonly peerId: PeerId

constructor (components: DefaultConnectionManagerComponents, init: ConnectionManagerInit = {}) {
this.maxConnections = init.maxConnections ?? defaultOptions.maxConnections
Expand All @@ -205,6 +206,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
this.connections = new PeerMap()

this.started = false
this.peerId = components.peerId
this.peerStore = components.peerStore
this.metrics = components.metrics
this.events = components.events
Expand Down Expand Up @@ -484,6 +486,10 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {

const { peerId } = getPeerAddress(peerIdOrMultiaddr)

if (this.peerId.equals(peerId)) {
throw new InvalidPeerIdError('Can not dial self')
}

if (peerId != null && options.force !== true) {
this.log('dial %p', peerId)
const existingConnection = this.getConnections(peerId)
Expand All @@ -501,6 +507,13 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
...options,
priority: options.priority ?? DEFAULT_DIAL_PRIORITY
})

if (connection.remotePeer.equals(this.peerId)) {
const err = new InvalidPeerIdError('Can not dial self')
connection.abort(err)
throw err
}

let peerConnections = this.connections.get(connection.remotePeer)

if (peerConnections == null) {
Expand All @@ -517,6 +530,14 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
if (conn.id === connection.id) {
trackedConnection = true
}

// make sure we don't already have a connection to this multiaddr
if (options.force !== true && conn.id !== connection.id && conn.remoteAddr.equals(connection.remoteAddr)) {
connection.abort(new InvalidMultiaddrError('Duplicate multiaddr connection'))

// return the existing connection
return conn
}
}

if (!trackedConnection) {
Expand Down
86 changes: 56 additions & 30 deletions packages/libp2p/src/upgrader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { createConnection } from './connection/index.js'
import { PROTOCOL_NEGOTIATION_TIMEOUT, UPGRADE_TIMEOUT } from './connection-manager/constants.js'
import { ConnectionDeniedError, ConnectionInterceptedError, EncryptionFailedError, MuxerUnavailableError } from './errors.js'
import { DEFAULT_MAX_INBOUND_STREAMS, DEFAULT_MAX_OUTBOUND_STREAMS } from './registrar.js'
import type { Libp2pEvents, AbortOptions, ComponentLogger, MultiaddrConnection, Connection, Stream, ConnectionProtector, NewStreamOptions, ConnectionEncrypter, SecuredConnection, ConnectionGater, TypedEventTarget, Metrics, PeerId, PeerStore, StreamMuxer, StreamMuxerFactory, Upgrader, UpgraderOptions, ConnectionLimits, SecureConnectionOptions } from '@libp2p/interface'
import type { Libp2pEvents, AbortOptions, ComponentLogger, MultiaddrConnection, Connection, Stream, ConnectionProtector, NewStreamOptions, ConnectionEncrypter, SecuredConnection, ConnectionGater, TypedEventTarget, Metrics, PeerId, PeerStore, StreamMuxer, StreamMuxerFactory, Upgrader, UpgraderOptions, ConnectionLimits, SecureConnectionOptions, CounterGroup } from '@libp2p/interface'
import type { ConnectionManager, Registrar } from '@libp2p/interface-internal'

interface CreateConnectionOptions {
Expand Down Expand Up @@ -130,6 +130,10 @@ export class DefaultUpgrader implements Upgrader {
private readonly inboundStreamProtocolNegotiationTimeout: number
private readonly outboundStreamProtocolNegotiationTimeout: number
private readonly events: TypedEventTarget<Libp2pEvents>
private readonly metrics: {
dials?: CounterGroup<'inbound' | 'outbound'>
errors?: CounterGroup<'inbound' | 'outbound'>
}

constructor (components: DefaultUpgraderComponents, init: UpgraderInit) {
this.components = components
Expand All @@ -150,6 +154,10 @@ export class DefaultUpgrader implements Upgrader {
this.inboundStreamProtocolNegotiationTimeout = init.inboundStreamProtocolNegotiationTimeout ?? PROTOCOL_NEGOTIATION_TIMEOUT
this.outboundStreamProtocolNegotiationTimeout = init.outboundStreamProtocolNegotiationTimeout ?? PROTOCOL_NEGOTIATION_TIMEOUT
this.events = components.events
this.metrics = {
dials: components.metrics?.registerCounterGroup('libp2p_connection_manager_dials_total'),
errors: components.metrics?.registerCounterGroup('libp2p_connection_manager_dial_errors_total')
}
}

readonly [Symbol.toStringTag] = '@libp2p/upgrader'
Expand All @@ -175,6 +183,10 @@ export class DefaultUpgrader implements Upgrader {
*/
async upgradeInbound (maConn: MultiaddrConnection, opts: UpgraderOptions = {}): Promise<Connection> {
try {
this.metrics.dials?.increment({
inbound: true
})

const accept = await this.components.connectionManager.acceptIncomingConnection(maConn)

if (!accept) {
Expand All @@ -183,7 +195,15 @@ export class DefaultUpgrader implements Upgrader {

await this.shouldBlockConnection('denyInboundConnection', maConn)

return await this._performUpgrade(maConn, 'inbound', opts)
const conn = await this._performUpgrade(maConn, 'inbound', opts)

return conn
} catch (err) {
this.metrics.errors?.increment({
inbound: true
})

throw err
} finally {
this.components.connectionManager.afterUpgradeInbound()
}
Expand All @@ -193,15 +213,27 @@ export class DefaultUpgrader implements Upgrader {
* Upgrades an outbound connection
*/
async upgradeOutbound (maConn: MultiaddrConnection, opts: UpgraderOptions = {}): Promise<Connection> {
const idStr = maConn.remoteAddr.getPeerId()
let remotePeerId: PeerId | undefined
try {
this.metrics.dials?.increment({
outbound: true
})

if (idStr != null) {
remotePeerId = peerIdFromString(idStr)
await this.shouldBlockConnection('denyOutboundConnection', remotePeerId, maConn)
}
const idStr = maConn.remoteAddr.getPeerId()
let remotePeerId: PeerId | undefined

return this._performUpgrade(maConn, 'outbound', opts)
if (idStr != null) {
remotePeerId = peerIdFromString(idStr)
await this.shouldBlockConnection('denyOutboundConnection', remotePeerId, maConn)
}

return await this._performUpgrade(maConn, 'outbound', opts)
} catch (err) {
this.metrics.errors?.increment({
outbound: true
})

throw err
}
}

private async _performUpgrade (maConn: MultiaddrConnection, direction: 'inbound' | 'outbound', opts: UpgraderOptions): Promise<Connection> {
Expand All @@ -218,7 +250,7 @@ export class DefaultUpgrader implements Upgrader {

this.components.metrics?.trackMultiaddrConnection(maConn)

maConn.log('starting the %s connection upgrade', direction)
maConn.log.trace('starting the %s connection upgrade', direction)

// Protect
let protectedConn = maConn
Expand Down Expand Up @@ -292,13 +324,13 @@ export class DefaultUpgrader implements Upgrader {
upgradedConn = multiplexed.stream
}
} catch (err: any) {
maConn.log.error('failed to upgrade inbound connection', err)
maConn.log.error('failed to upgrade inbound connection %s %a - %e', direction === 'inbound' ? 'from' : 'to', maConn.remoteAddr, err)
throw err
}

await this.shouldBlockConnection(direction === 'inbound' ? 'denyInboundUpgradedConnection' : 'denyOutboundUpgradedConnection', remotePeer, maConn)

maConn.log('successfully %s inbound connection', direction)
maConn.log('successfully upgraded %s connection', direction)

return this._createConnection({
cryptoProtocol,
Expand Down Expand Up @@ -399,7 +431,7 @@ export class DefaultUpgrader implements Upgrader {
this._onStream({ connection, stream: muxedStream, protocol })
})
.catch(async err => {
connection.log.error('error handling incoming stream id %s', muxedStream.id, err.message, err.code, err.stack)
connection.log.error('error handling incoming stream id %s - %e', muxedStream.id, err)

if (muxedStream.timeline.close == null) {
await muxedStream.close()
Expand All @@ -413,7 +445,7 @@ export class DefaultUpgrader implements Upgrader {
throw new MuxerUnavailableError('Connection is not multiplexed')
}

connection.log('starting new stream for protocols %s', protocols)
connection.log.trace('starting new stream for protocols %s', protocols)
const muxedStream = await muxer.newStream()
connection.log.trace('started new stream %s for protocols %s', muxedStream.id, protocols)

Expand Down Expand Up @@ -441,7 +473,7 @@ export class DefaultUpgrader implements Upgrader {
yieldBytes: true
})

muxedStream.log('selected protocol %s', protocol)
muxedStream.log.trace('selected protocol %s', protocol)

const outgoingLimit = findOutgoingStreamLimit(protocol, this.components.registrar, options)
const streamCount = countStreams(protocol, 'outbound', connection)
Expand Down Expand Up @@ -484,7 +516,7 @@ export class DefaultUpgrader implements Upgrader {

return muxedStream
} catch (err: any) {
connection.log.error('could not create new stream for protocols %s', protocols, err)
connection.log.error('could not create new outbound stream on connection %s %a for protocols %s - %e', direction === 'inbound' ? 'from' : 'to', opts.maConn.remoteAddr, protocols, err)

if (muxedStream.timeline.close == null) {
muxedStream.abort(err)
Expand All @@ -499,7 +531,7 @@ export class DefaultUpgrader implements Upgrader {
muxer.sink(upgradedConn.source),
upgradedConn.sink(muxer.source)
]).catch(err => {
connection.log.error('error piping data through muxer', err)
connection.log.error('error piping data through muxer - %e', err)
})
}

Expand Down Expand Up @@ -594,7 +626,6 @@ export class DefaultUpgrader implements Upgrader {
*/
async _encryptInbound (connection: MultiaddrConnection, options?: AbortOptions): Promise<CryptoResult> {
const protocols = Array.from(this.connectionEncrypters.keys())
connection.log('handling inbound crypto protocol selection', protocols)

try {
const { stream, protocol } = await mss.handle(connection, protocols, {
Expand All @@ -604,17 +635,17 @@ export class DefaultUpgrader implements Upgrader {
const encrypter = this.connectionEncrypters.get(protocol)

if (encrypter == null) {
throw new Error(`no crypto module found for ${protocol}`)
throw new EncryptionFailedError(`no crypto module found for ${protocol}`)
}

connection.log('encrypting inbound connection using', protocol)
connection.log('encrypting inbound connection to %a using %s', connection.remoteAddr, protocol)

return {
...await encrypter.secureInbound(stream, options),
protocol
}
} catch (err: any) {
connection.log.error('encrypting inbound connection failed', err)
connection.log.error('encrypting inbound connection from %a failed', connection.remoteAddr, err)
throw new EncryptionFailedError(err.message)
}
}
Expand All @@ -625,34 +656,29 @@ export class DefaultUpgrader implements Upgrader {
*/
async _encryptOutbound (connection: MultiaddrConnection, options: SecureConnectionOptions): Promise<CryptoResult> {
const protocols = Array.from(this.connectionEncrypters.keys())
connection.log('selecting outbound crypto protocol', protocols)

try {
connection.log.trace('selecting encrypter from %s', protocols)

const {
stream,
protocol
} = await mss.select(connection, protocols, {
const { stream, protocol } = await mss.select(connection, protocols, {
...options,
log: connection.log,
yieldBytes: true
})

const encrypter = this.connectionEncrypters.get(protocol)

if (encrypter == null) {
throw new Error(`no crypto module found for ${protocol}`)
throw new EncryptionFailedError(`no crypto module found for ${protocol}`)
}

connection.log('encrypting outbound connection to %p using %s', options?.remotePeer, encrypter)
connection.log('encrypting outbound connection to %a using %s', connection.remoteAddr, protocol)

return {
...await encrypter.secureOutbound(stream, options),
protocol
}
} catch (err: any) {
connection.log.error('encrypting outbound connection to %p failed', options?.remotePeer, err)
connection.log.error('encrypting outbound connection to %a failed', connection.remoteAddr, err)
throw new EncryptionFailedError(err.message)
}
}
Expand Down
2 changes: 1 addition & 1 deletion packages/libp2p/test/connection-manager/direct.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => {

await expect(libp2p.dial(multiaddr(`/ip4/127.0.0.1/tcp/1234/ws/p2p/${libp2p.peerId.toString()}`)))
.to.eventually.be.rejected()
.and.to.have.property('name', 'DialError')
.and.to.have.property('name', 'InvalidPeerIdError')
})

it('should limit the maximum dial queue size', async () => {
Expand Down
21 changes: 16 additions & 5 deletions packages/libp2p/test/connection-manager/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,9 @@ describe('Connection Manager', () => {
})
await connectionManager.start()

sinon.stub(connectionManager.dialQueue, 'dial').resolves(stubInterface<Connection>())
sinon.stub(connectionManager.dialQueue, 'dial').resolves(stubInterface<Connection>({
remotePeer: peerIdFromPrivateKey(await generateKeyPair('Ed25519'))
}))

// max out the connection limit
await connectionManager.openConnection(peerIdFromPrivateKey(await generateKeyPair('Ed25519')))
Expand Down Expand Up @@ -450,7 +452,9 @@ describe('Connection Manager', () => {
})
await connectionManager.start()

sinon.stub(connectionManager.dialQueue, 'dial').resolves(stubInterface<Connection>())
sinon.stub(connectionManager.dialQueue, 'dial').resolves(stubInterface<Connection>({
remotePeer: peerIdFromPrivateKey(await generateKeyPair('Ed25519'))
}))

// max out the connection limit
await connectionManager.openConnection(peerIdFromPrivateKey(await generateKeyPair('Ed25519')))
Expand All @@ -477,7 +481,9 @@ describe('Connection Manager', () => {
})
await connectionManager.start()

sinon.stub(connectionManager.dialQueue, 'dial').resolves(stubInterface<Connection>())
sinon.stub(connectionManager.dialQueue, 'dial').resolves(stubInterface<Connection>({
remotePeer: peerIdFromPrivateKey(await generateKeyPair('Ed25519'))
}))

// start the upgrade
const maConn1 = mockMultiaddrConnection({
Expand Down Expand Up @@ -523,9 +529,14 @@ describe('Connection Manager', () => {
const existingConnection = stubInterface<Connection>({
limits: {
bytes: 100n
}
},
remotePeer: targetPeer,
remoteAddr: multiaddr(`/ip4/123.123.123.123/tcp/123/p2p-circuit/p2p/${targetPeer}`)
})
const newConnection = stubInterface<Connection>({
remotePeer: targetPeer,
remoteAddr: addr
})
const newConnection = stubInterface<Connection>()

sinon.stub(connectionManager.dialQueue, 'dial')
.withArgs(addr)
Expand Down

0 comments on commit 0c59578

Please sign in to comment.