Skip to content

Commit

Permalink
fix!: make connection securing abortable (#2662)
Browse files Browse the repository at this point in the history
To allow doing things like having a single `AbortSignal` that can be used as a timeout for incoming connection establishment, allow passing it as an option to the `ConnectionEncrypter` `secureOutbound` and `secureInbound` methods.

Previously we'd wrap the stream to be secured in an `AbortableSource`, however this has some [serious performance implications](ChainSafe/js-libp2p-gossipsub#361) and it's generally better to just use a signal to cancel an ongoing operation instead of racing every chunk that comes out of the source.

BREAKING CHANGE: the final argument to `secureOutbound` and `secureInbound` in the `ConnectionEncrypter` interface is now an options object

---------

Co-authored-by: chad <[email protected]>
  • Loading branch information
achingbrain and maschad authored Aug 30, 2024
1 parent c858160 commit 86a62f5
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 79 deletions.
41 changes: 13 additions & 28 deletions packages/connection-encrypter-plaintext/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import { peerIdFromBytes } from '@libp2p/peer-id'
import { createFromPubKey } from '@libp2p/peer-id-factory'
import { pbStream } from 'it-protobuf-stream'
import { Exchange, KeyType, PublicKey } from './pb/proto.js'
import type { ComponentLogger, Logger, MultiaddrConnection, ConnectionEncrypter, SecuredConnection, PeerId, PublicKey as PubKey } from '@libp2p/interface'
import type { ComponentLogger, Logger, MultiaddrConnection, ConnectionEncrypter, SecuredConnection, PeerId, PublicKey as PubKey, SecureConnectionOptions } from '@libp2p/interface'
import type { Duplex } from 'it-stream-types'
import type { Uint8ArrayList } from 'uint8arraylist'

Expand All @@ -37,24 +37,14 @@ export interface PlaintextComponents {
logger: ComponentLogger
}

export interface PlaintextInit {
/**
* The peer id exchange must complete within this many milliseconds
* (default: 1000)
*/
timeout?: number
}

class Plaintext implements ConnectionEncrypter {
public protocol: string = PROTOCOL
private readonly peerId: PeerId
private readonly log: Logger
private readonly timeout: number

constructor (components: PlaintextComponents, init: PlaintextInit = {}) {
constructor (components: PlaintextComponents) {
this.peerId = components.peerId
this.log = components.logger.forComponent('libp2p:plaintext')
this.timeout = init.timeout ?? 1000
}

readonly [Symbol.toStringTag] = '@libp2p/plaintext'
Expand All @@ -63,19 +53,18 @@ class Plaintext implements ConnectionEncrypter {
'@libp2p/connection-encryption'
]

async secureInbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (conn: Stream, remoteId?: PeerId): Promise<SecuredConnection<Stream>> {
return this._encrypt(this.peerId, conn, remoteId)
async secureInbound<Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection>(conn: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
return this._encrypt(this.peerId, conn, options)
}

async secureOutbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (conn: Stream, remoteId?: PeerId): Promise<SecuredConnection<Stream>> {
return this._encrypt(this.peerId, conn, remoteId)
async secureOutbound<Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection>(conn: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
return this._encrypt(this.peerId, conn, options)
}

/**
* Encrypt connection
*/
async _encrypt <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (localId: PeerId, conn: Stream, remoteId?: PeerId): Promise<SecuredConnection<Stream>> {
const signal = AbortSignal.timeout(this.timeout)
async _encrypt<Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection>(localId: PeerId, conn: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
const pb = pbStream(conn).pb(Exchange)

let type = KeyType.RSA
Expand All @@ -86,7 +75,7 @@ class Plaintext implements ConnectionEncrypter {
type = KeyType.Secp256k1
}

this.log('write pubkey exchange to peer %p', remoteId)
this.log('write pubkey exchange to peer %p', options?.remotePeer)

const [
, response
Expand All @@ -98,13 +87,9 @@ class Plaintext implements ConnectionEncrypter {
Type: type,
Data: localId.publicKey == null ? new Uint8Array(0) : (PublicKey.decode(localId.publicKey).Data ?? new Uint8Array(0))
}
}, {
signal
}),
}, options),
// Get the Exchange message
pb.read({
signal
})
pb.read(options)
])

let peerId
Expand Down Expand Up @@ -143,7 +128,7 @@ class Plaintext implements ConnectionEncrypter {
throw new InvalidCryptoExchangeError('Remote did not provide its public key')
}

if (remoteId != null && !peerId.equals(remoteId)) {
if (options?.remotePeer != null && !peerId.equals(options?.remotePeer)) {
throw new UnexpectedPeerError()
}

Expand All @@ -156,6 +141,6 @@ class Plaintext implements ConnectionEncrypter {
}
}

export function plaintext (init?: PlaintextInit): (components: PlaintextComponents) => ConnectionEncrypter {
return (components) => new Plaintext(components, init)
export function plaintext (): (components: PlaintextComponents) => ConnectionEncrypter {
return (components) => new Plaintext(components)
}
10 changes: 7 additions & 3 deletions packages/connection-encrypter-plaintext/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ describe('plaintext', () => {
})

await Promise.all([
encrypterRemote.secureInbound(inbound),
encrypter.secureOutbound(outbound, wrongPeer)
encrypter.secureInbound(inbound),
encrypterRemote.secureOutbound(outbound, {
remotePeer: wrongPeer
})
]).then(() => expect.fail('should have failed'), (err) => {
expect(err).to.exist()
expect(err).to.have.property('name', 'UnexpectedPeerError')
Expand All @@ -75,7 +77,9 @@ describe('plaintext', () => {

await expect(Promise.all([
encrypter.secureInbound(inbound),
encrypterRemote.secureOutbound(outbound, localPeer)
encrypterRemote.secureOutbound(outbound, {
remotePeer: localPeer
})
]))
.to.eventually.be.rejected.with.property('name', 'InvalidCryptoExchangeError')
})
Expand Down
12 changes: 2 additions & 10 deletions packages/connection-encrypter-tls/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,6 @@ export interface TLSComponents {
logger: ComponentLogger
}

export interface TLSInit {
/**
* The peer id exchange must complete within this many milliseconds
* (default: 1000)
*/
timeout?: number
}

export function tls (init?: TLSInit): (components: TLSComponents) => ConnectionEncrypter {
return (components) => new TLS(components, init)
export function tls (): (components: TLSComponents) => ConnectionEncrypter {
return (components) => new TLS(components)
}
28 changes: 11 additions & 17 deletions packages/connection-encrypter-tls/src/tls.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,19 @@ import { serviceCapabilities } from '@libp2p/interface'
import { HandshakeTimeoutError } from './errors.js'
import { generateCertificate, verifyPeerCertificate, itToStream, streamToIt } from './utils.js'
import { PROTOCOL } from './index.js'
import type { TLSComponents, TLSInit } from './index.js'
import type { MultiaddrConnection, ConnectionEncrypter, SecuredConnection, PeerId, Logger } from '@libp2p/interface'
import type { TLSComponents } from './index.js'
import type { MultiaddrConnection, ConnectionEncrypter, SecuredConnection, PeerId, Logger, SecureConnectionOptions } from '@libp2p/interface'
import type { Duplex } from 'it-stream-types'
import type { Uint8ArrayList } from 'uint8arraylist'

export class TLS implements ConnectionEncrypter {
public protocol: string = PROTOCOL
private readonly log: Logger
private readonly peerId: PeerId
private readonly timeout: number

constructor (components: TLSComponents, init: TLSInit = {}) {
constructor (components: TLSComponents) {
this.log = components.logger.forComponent('libp2p:tls')
this.peerId = components.peerId
this.timeout = init.timeout ?? 1000
}

readonly [Symbol.toStringTag] = '@libp2p/tls'
Expand All @@ -46,18 +44,18 @@ export class TLS implements ConnectionEncrypter {
'@libp2p/connection-encryption'
]

async secureInbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (conn: Stream, remoteId?: PeerId): Promise<SecuredConnection<Stream>> {
return this._encrypt(conn, true, remoteId)
async secureInbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (conn: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
return this._encrypt(conn, true, options)
}

async secureOutbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (conn: Stream, remoteId?: PeerId): Promise<SecuredConnection<Stream>> {
return this._encrypt(conn, false, remoteId)
async secureOutbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (conn: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
return this._encrypt(conn, false, options)
}

/**
* Encrypt connection
*/
async _encrypt <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (conn: Stream, isServer: boolean, remoteId?: PeerId): Promise<SecuredConnection<Stream>> {
async _encrypt <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (conn: Stream, isServer: boolean, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
const opts: TLSSocketOptions = {
...await generateCertificate(this.peerId),
isServer,
Expand All @@ -84,14 +82,14 @@ export class TLS implements ConnectionEncrypter {
}

return new Promise((resolve, reject) => {
const abortTimeout = setTimeout(() => {
options?.signal?.addEventListener('abort', () => {
socket.destroy(new HandshakeTimeoutError())
}, this.timeout)
})

const verifyRemote = (): void => {
const remote = socket.getPeerCertificate()

verifyPeerCertificate(remote.raw, remoteId, this.log)
verifyPeerCertificate(remote.raw, options?.remotePeer, this.log)
.then(remotePeer => {
this.log('remote certificate ok, remote peer %p', remotePeer)

Expand All @@ -106,14 +104,10 @@ export class TLS implements ConnectionEncrypter {
.catch((err: Error) => {
reject(err)
})
.finally(() => {
clearTimeout(abortTimeout)
})
}

socket.on('error', (err: Error) => {
reject(err)
clearTimeout(abortTimeout)
})
socket.once('secure', (evt) => {
this.log('verifying remote certificate')
Expand Down
16 changes: 12 additions & 4 deletions packages/connection-encrypter-tls/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,12 @@ describe('tls', () => {
})

await Promise.all([
encrypter.secureInbound(inbound, remotePeer),
encrypter.secureOutbound(outbound, wrongPeer)
encrypter.secureInbound(inbound, {
remotePeer
}),
encrypter.secureOutbound(outbound, {
remotePeer: wrongPeer
})
]).then(() => expect.fail('should have failed'), (err) => {
expect(err).to.exist()
expect(err).to.have.property('name', 'UnexpectedPeerError')
Expand All @@ -69,8 +73,12 @@ describe('tls', () => {
})

await expect(Promise.all([
encrypter.secureInbound(inbound),
encrypter.secureOutbound(outbound, localPeer)
encrypter.secureInbound(inbound, {
remotePeer
}),
encrypter.secureOutbound(outbound, {
remotePeer: localPeer
})
]))
.to.eventually.be.rejected.with.property('name', 'InvalidParametersError')
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ export default (common: TestSetup<ConnectionEncrypter, ConnectionEncrypterSetupA
outboundResult
] = await Promise.all([
cryptoRemote.secureInbound(localConn),
crypto.secureOutbound(remoteConn, remotePeer)
crypto.secureOutbound(remoteConn, {
remotePeer
})
])

// Echo server
Expand Down Expand Up @@ -84,7 +86,9 @@ export default (common: TestSetup<ConnectionEncrypter, ConnectionEncrypterSetupA
outboundResult
] = await Promise.all([
cryptoRemote.secureInbound(localConn),
crypto.secureOutbound(remoteConn, remotePeer)
crypto.secureOutbound(remoteConn, {
remotePeer
})
])

// Inbound should return the initiator (local) peer
Expand All @@ -97,8 +101,12 @@ export default (common: TestSetup<ConnectionEncrypter, ConnectionEncrypterSetupA
const [localConn, remoteConn] = createMaConnPair()

await Promise.all([
cryptoRemote.secureInbound(localConn, mitmPeer),
crypto.secureOutbound(remoteConn, remotePeer)
cryptoRemote.secureInbound(localConn, {
remotePeer: mitmPeer
}),
crypto.secureOutbound(remoteConn, {
remotePeer
})
]).then(() => expect.fail(), (err) => {
expect(err).to.exist()
expect(err).to.have.property('name', 'UnexpectedPeerError')
Expand Down
14 changes: 12 additions & 2 deletions packages/interface/src/connection-encrypter/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
import type { MultiaddrConnection } from '../connection/index.js'
import type { AbortOptions } from '../index.js'
import type { PeerId } from '../peer-id/index.js'
import type { Duplex } from 'it-stream-types'
import type { Uint8ArrayList } from 'uint8arraylist'

/**
* If the remote PeerId is known and passed as an option, the securing operation
* will throw if the remote peer cannot prove it has the private key that
* corresponds to the public key the remote PeerId is derived from.
*/
export interface SecureConnectionOptions extends AbortOptions {
remotePeer?: PeerId
}

/**
* A libp2p connection encrypter module must be compliant to this interface
* to ensure all exchanged data between two peers is encrypted.
Expand All @@ -15,14 +25,14 @@ export interface ConnectionEncrypter<Extension = unknown> {
* pass it for extra verification, otherwise it will be determined during
* the handshake.
*/
secureOutbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (connection: Stream, remotePeer?: PeerId): Promise<SecuredConnection<Stream, Extension>>
secureOutbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (connection: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream, Extension>>

/**
* Decrypt incoming data. If the remote PeerId is known,
* pass it for extra verification, otherwise it will be determined during
* the handshake
*/
secureInbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (connection: Stream, remotePeer?: PeerId): Promise<SecuredConnection<Stream, Extension>>
secureInbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (connection: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream, Extension>>
}

export interface SecuredConnection<Stream = any, Extension = unknown> {
Expand Down
2 changes: 1 addition & 1 deletion packages/interface/src/transport/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ export enum FaultTolerance {
NO_FATAL
}

export interface UpgraderOptions<ConnectionUpgradeEvents extends ProgressEvent = ProgressEvent> extends ProgressOptions<ConnectionUpgradeEvents> {
export interface UpgraderOptions<ConnectionUpgradeEvents extends ProgressEvent = ProgressEvent> extends ProgressOptions<ConnectionUpgradeEvents>, AbortOptions {
skipEncryption?: boolean
skipProtection?: boolean
muxerFactory?: StreamMuxerFactory
Expand Down
Loading

0 comments on commit 86a62f5

Please sign in to comment.