Skip to content

Commit

Permalink
fix: check for connection status before storing
Browse files Browse the repository at this point in the history
It's possible that the remote can close the connection very shortly
after it is opened, so check the connection status before adding it
to the list of connections.

Fixes a memory leak whereby the connection is already closed and
then is never removed from the connections list.
  • Loading branch information
achingbrain committed Sep 27, 2024
1 parent 82bd42b commit 7946719
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 84 deletions.
24 changes: 16 additions & 8 deletions packages/libp2p/src/connection-manager/connection-pruner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,29 @@ export class ConnectionPruner {
this.peerStore = components.peerStore
this.events = components.events
this.log = components.logger.forComponent('libp2p:connection-manager:connection-pruner')
this.maybePruneConnections = this.maybePruneConnections.bind(this)
}

// check the max connection limit whenever a peer connects
components.events.addEventListener('connection:open', () => {
this.maybePruneConnections()
.catch(err => {
this.log.error(err)
})
})
start (): void {
this.events.addEventListener('connection:open', this.maybePruneConnections)
}

stop (): void {
this.events.removeEventListener('connection:open', this.maybePruneConnections)
}

maybePruneConnections (): void {
this._maybePruneConnections()
.catch(err => {
this.log.error('error while pruning connections %e', err)

Check warning on line 57 in packages/libp2p/src/connection-manager/connection-pruner.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/src/connection-manager/connection-pruner.ts#L57

Added line #L57 was not covered by tests
})
}

/**
* If we have more connections than our maximum, select some excess connections
* to prune based on peer value
*/
async maybePruneConnections (): Promise<void> {
private async _maybePruneConnections (): Promise<void> {
const connections = this.connectionManager.getConnections()
const numConnections = connections.length

Expand Down
2 changes: 1 addition & 1 deletion packages/libp2p/src/connection-manager/dial-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ export class DialQueue {
})
})

if (existingConnection != null) {
if (existingConnection?.status === 'open') {
this.log('already connected to %a', existingConnection.remoteAddr)
options.onProgress?.(new CustomProgressEvent('dial-queue:already-connected'))
return existingConnection
Expand Down
75 changes: 37 additions & 38 deletions packages/libp2p/src/connection-manager/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { InvalidMultiaddrError, InvalidParametersError, InvalidPeerIdError, NotStartedError, start, stop } from '@libp2p/interface'
import { ConnectionClosedError, InvalidMultiaddrError, InvalidParametersError, InvalidPeerIdError, NotStartedError, start, stop } from '@libp2p/interface'
import { PeerMap } from '@libp2p/peer-collections'
import { defaultAddressSort } from '@libp2p/utils/address-sort'
import { RateLimiter } from '@libp2p/utils/rate-limiter'
Expand Down Expand Up @@ -214,8 +214,6 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {

this.onConnect = this.onConnect.bind(this)
this.onDisconnect = this.onDisconnect.bind(this)
this.events.addEventListener('connection:open', this.onConnect)
this.events.addEventListener('connection:close', this.onDisconnect)

// allow/deny lists
this.allow = (init.allow ?? []).map(ma => multiaddr(ma))
Expand Down Expand Up @@ -268,10 +266,6 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {

readonly [Symbol.toStringTag] = '@libp2p/connection-manager'

isStarted (): boolean {
return this.started
}

/**
* Starts the Connection Manager. If Metrics are not enabled on libp2p
* only event loop and connection limits will be monitored.
Expand All @@ -288,11 +282,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {

for (const conns of this.connections.values()) {
for (const conn of conns) {
if (conn.direction === 'inbound') {
metric.inbound++
} else {
metric.outbound++
}
metric[conn.direction]++

Check warning on line 285 in packages/libp2p/src/connection-manager/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/src/connection-manager/index.ts#L285

Added line #L285 was not covered by tests
}
}

Expand Down Expand Up @@ -356,9 +346,13 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
}
})

this.events.addEventListener('connection:open', this.onConnect)
this.events.addEventListener('connection:close', this.onDisconnect)

await start(
this.dialQueue,
this.reconnectQueue
this.reconnectQueue,
this.connectionPruner
)

this.started = true
Expand All @@ -369,9 +363,13 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
* Stops the Connection Manager
*/
async stop (): Promise<void> {
this.events.removeEventListener('connection:open', this.onConnect)
this.events.removeEventListener('connection:close', this.onDisconnect)

await stop(
this.reconnectQueue,
this.dialQueue
this.dialQueue,
this.connectionPruner
)

// Close all connections we're tracking
Expand Down Expand Up @@ -413,17 +411,19 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
return
}

const peerId = connection.remotePeer
const storedConns = this.connections.get(peerId)
let isNewPeer = false

if (storedConns != null) {
storedConns.push(connection)
} else {
isNewPeer = true
this.connections.set(peerId, [connection])
if (connection.status !== 'open') {
// this can happen when the remote closes the connection immediately after
// opening
return

Check warning on line 417 in packages/libp2p/src/connection-manager/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/src/connection-manager/index.ts#L415-L417

Added lines #L415 - L417 were not covered by tests
}

const peerId = connection.remotePeer
const isNewPeer = !this.connections.has(peerId)
const storedConns = this.connections.get(peerId) ?? []
storedConns.push(connection)

this.connections.set(peerId, storedConns)

// only need to store RSA public keys, all other types are embedded in the peer id
if (peerId.publicKey != null && peerId.type === 'RSA') {
await this.peerStore.patch(peerId, {
Expand All @@ -441,20 +441,21 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
*/
onDisconnect (evt: CustomEvent<Connection>): void {
const { detail: connection } = evt
const peerId = connection.remotePeer
const peerConns = this.connections.get(peerId) ?? []

if (!this.started) {
// This can happen when we are in the process of shutting down the node
return
}
// remove closed connection
const filteredPeerConns = peerConns.filter(conn => conn.id !== connection.id)

const peerId = connection.remotePeer
let storedConn = this.connections.get(peerId)
// update peer connections
this.connections.set(peerId, filteredPeerConns)

if (storedConn != null && storedConn.length > 1) {
storedConn = storedConn.filter((conn) => conn.id !== connection.id)
this.connections.set(peerId, storedConn)
} else if (storedConn != null) {
if (filteredPeerConns.length === 0) {
// trigger disconnect event if no connections remain
this.log('onDisconnect remove all connections for peer %p', peerId)
this.connections.delete(peerId)

// broadcast disconnect event
this.events.safeDispatchEvent('peer:disconnect', { detail: connection.remotePeer })
}
}
Expand All @@ -478,7 +479,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
}

async openConnection (peerIdOrMultiaddr: PeerId | Multiaddr | Multiaddr[], options: OpenConnectionOptions = {}): Promise<Connection> {
if (!this.isStarted()) {
if (!this.started) {
throw new NotStartedError('Not started')
}

Expand Down Expand Up @@ -508,10 +509,8 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
priority: options.priority ?? DEFAULT_DIAL_PRIORITY
})

if (connection.remotePeer.equals(this.peerId)) {
const err = new InvalidPeerIdError('Can not dial self')
connection.abort(err)
throw err
if (connection.status !== 'open') {
throw new ConnectionClosedError('Remote closed connection during opening')

Check warning on line 513 in packages/libp2p/src/connection-manager/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/src/connection-manager/index.ts#L513

Added line #L513 was not covered by tests
}

let peerConnections = this.connections.get(connection.remotePeer)
Expand Down
16 changes: 5 additions & 11 deletions packages/libp2p/src/connection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,6 @@ export class ConnectionImpl implements Connection {
}

try {
this.log.trace('closing all streams')

// close all streams gracefully - this can throw if we're not multiplexed
await Promise.all(
this.streams.map(async s => s.close(options))
)

this.log.trace('closing underlying transport')

// close raw connection
Expand All @@ -184,18 +177,19 @@ export class ConnectionImpl implements Connection {
}

abort (err: Error): void {
if (this.status === 'closed') {
return
}

Check warning on line 183 in packages/libp2p/src/connection/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/src/connection/index.ts#L180-L183

Added lines #L180 - L183 were not covered by tests
this.log.error('aborting connection to %a due to error', this.remoteAddr, err)

this.status = 'closing'
this.streams.forEach(s => { s.abort(err) })

this.log.error('all streams aborted', this.streams.length)

// Abort raw connection
this._abort(err)

this.timeline.close = Date.now()
this.status = 'closed'
this.timeline.close = Date.now()

Check warning on line 192 in packages/libp2p/src/connection/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/src/connection/index.ts#L192

Added line #L192 was not covered by tests
}
}

Expand Down
41 changes: 25 additions & 16 deletions packages/libp2p/src/upgrader.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { InvalidMultiaddrError, TooManyInboundProtocolStreamsError, TooManyOutboundProtocolStreamsError, LimitedConnectionError, setMaxListeners } from '@libp2p/interface'
import { InvalidMultiaddrError, TooManyInboundProtocolStreamsError, TooManyOutboundProtocolStreamsError, LimitedConnectionError, setMaxListeners, InvalidPeerIdError } from '@libp2p/interface'
import * as mss from '@libp2p/multistream-select'
import { peerIdFromString } from '@libp2p/peer-id'
import { anySignal } from 'any-signal'
Expand Down Expand Up @@ -304,6 +304,14 @@ export class DefaultUpgrader implements Upgrader {
remotePeer = remotePeerId
}

// this can happen if we dial a multiaddr without a peer id, we only find
// out the identity of the remote after the connection is encrypted
if (remotePeer.equals(this.components.peerId)) {
const err = new InvalidPeerIdError('Can not dial self')
maConn.abort(err)
throw err

Check warning on line 312 in packages/libp2p/src/upgrader.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/src/upgrader.ts#L310-L312

Added lines #L310 - L312 were not covered by tests
}

upgradedConn = encryptedConn
if (opts?.muxerFactory != null) {
muxerFactory = opts.muxerFactory
Expand All @@ -326,6 +334,8 @@ export class DefaultUpgrader implements Upgrader {
} catch (err: any) {
maConn.log.error('failed to upgrade inbound connection %s %a - %e', direction === 'inbound' ? 'from' : 'to', maConn.remoteAddr, err)
throw err
} finally {
signal.clear()
}

await this.shouldBlockConnection(direction === 'inbound' ? 'denyInboundUpgradedConnection' : 'denyOutboundUpgradedConnection', remotePeer, maConn)
Expand Down Expand Up @@ -538,22 +548,22 @@ export class DefaultUpgrader implements Upgrader {
const _timeline = maConn.timeline
maConn.timeline = new Proxy(_timeline, {
set: (...args) => {
if (connection != null && args[1] === 'close' && args[2] != null && _timeline.close == null) {
if (args[1] === 'close' && args[2] != null && _timeline.close == null) {
// Wait for close to finish before notifying of the closure
(async () => {
try {
if (connection.status === 'open') {
await connection.close()
}
} catch (err: any) {
connection.log.error('error closing connection after timeline close', err)
connection.log.error('error closing connection after timeline close %e', err)

Check warning on line 559 in packages/libp2p/src/upgrader.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/src/upgrader.ts#L559

Added line #L559 was not covered by tests
} finally {
this.events.safeDispatchEvent('connection:close', {
detail: connection
})
}
})().catch(err => {
connection.log.error('error thrown while dispatching connection:close event', err)
connection.log.error('error thrown while dispatching connection:close event %e', err)

Check warning on line 566 in packages/libp2p/src/upgrader.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/src/upgrader.ts#L566

Added line #L566 was not covered by tests
})
}

Expand All @@ -578,32 +588,31 @@ export class DefaultUpgrader implements Upgrader {
limits,
logger: this.components.logger,
newStream: newStream ?? errConnectionNotMultiplexed,
getStreams: () => { if (muxer != null) { return muxer.streams } else { return [] } },
getStreams: () => {
return muxer?.streams ?? []
},
close: async (options?: AbortOptions) => {
// Ensure remaining streams are closed gracefully
if (muxer != null) {
connection.log.trace('close muxer')
await muxer.close(options)
}
// ensure remaining streams are closed gracefully
await muxer?.close(options)

connection.log.trace('close maconn')
// close the underlying transport
await maConn.close(options)
connection.log.trace('closed maconn')
},
abort: (err) => {
maConn.abort(err)
// Ensure remaining streams are aborted
if (muxer != null) {
muxer.abort(err)
}

// ensure remaining streams are aborted
muxer?.abort(err)

Check warning on line 605 in packages/libp2p/src/upgrader.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/src/upgrader.ts#L603-L605

Added lines #L603 - L605 were not covered by tests
}
})

this.events.safeDispatchEvent('connection:open', {
detail: connection
})

// @ts-expect-error nah
connection.__maConnTimeline = _timeline

return connection
}

Expand Down
Loading

0 comments on commit 7946719

Please sign in to comment.