Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: refactor connection opening and closing #2735

Merged
merged 3 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/integration-tests/test/circuit-relay.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -602,8 +602,8 @@ describe('circuit-relay', () => {
await deferred.promise

// should have closed connections to remote and to relay
expect(events[0].detail.remotePeer.toString()).to.equal(remote.peerId.toString())
expect(events[1].detail.remotePeer.toString()).to.equal(relay1.peerId.toString())
expect(events[0].detail.remotePeer.toString()).to.equal(relay1.peerId.toString())
expect(events[1].detail.remotePeer.toString()).to.equal(remote.peerId.toString())
})

it('should remove the relay event listener when the relay stops', async () => {
Expand Down
3 changes: 2 additions & 1 deletion packages/transport-tcp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@
"@multiformats/mafmt": "^12.1.6",
"@multiformats/multiaddr": "^12.2.3",
"@types/sinon": "^17.0.3",
"p-defer": "^4.0.1",
"progress-events": "^1.0.0",
"race-event": "^1.3.0",
"stream-to-it": "^1.0.1"
},
"devDependencies": {
Expand All @@ -74,7 +76,6 @@
"aegir": "^44.0.1",
"it-all": "^3.0.6",
"it-pipe": "^3.0.1",
"p-defer": "^4.0.1",
"sinon": "^18.0.0",
"uint8arrays": "^5.1.0",
"wherearewe": "^2.0.1"
Expand Down
2 changes: 1 addition & 1 deletion packages/transport-tcp/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ export const CODE_UNIX = 400
export const CLOSE_TIMEOUT = 500

// Close the socket if there is no activity after this long in ms
export const SOCKET_TIMEOUT = 5 * 60000 // 5 mins
export const SOCKET_TIMEOUT = 2 * 60000 // 2 mins
3 changes: 2 additions & 1 deletion packages/transport-tcp/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ export interface TCPComponents {
}

export interface TCPMetrics {
dialerEvents: CounterGroup<'error' | 'timeout' | 'connect' | 'abort'>
events: CounterGroup<'error' | 'timeout' | 'connect' | 'abort'>
errors: CounterGroup<'outbound_to_connection' | 'outbound_upgrade'>
}

export function tcp (init: TCPOptions = {}): (components: TCPComponents) => Transport {
Expand Down
17 changes: 9 additions & 8 deletions packages/transport-tcp/src/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,20 +163,19 @@
this.safeDispatchEvent('close')
}
})
.on('drop', () => {
this.metrics?.events.increment({ [`${this.addr} drop`]: true })
})
}

private onSocket (socket: net.Socket): void {
this.metrics?.events.increment({ [`${this.addr} connection`]: true })

if (this.status.code !== TCPListenerStatusCode.ACTIVE) {
socket.destroy()
throw new NotStartedError('Server is not listening yet')
}

// Avoid uncaught errors caused by unstable connections
socket.on('error', err => {
this.log('socket error', err)
this.metrics?.events.increment({ [`${this.addr} error`]: true })
})

let maConn: MultiaddrConnection
try {
maConn = toMultiaddrConnection(socket, {
Expand All @@ -185,11 +184,13 @@
socketCloseTimeout: this.context.socketCloseTimeout,
metrics: this.metrics?.events,
metricPrefix: `${this.addr} `,
logger: this.context.logger
logger: this.context.logger,
direction: 'inbound'
})
} catch (err) {
} catch (err: any) {
this.log.error('inbound connection failed', err)
this.metrics?.errors.increment({ [`${this.addr} inbound_to_connection`]: true })
socket.destroy()

Check warning on line 193 in packages/transport-tcp/src/listener.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-tcp/src/listener.ts#L193

Added line #L193 was not covered by tests
return
}

Expand Down
158 changes: 81 additions & 77 deletions packages/transport-tcp/src/socket-to-conn.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import { AbortError, InvalidParametersError, TimeoutError } from '@libp2p/interface'
import { InvalidParametersError, TimeoutError } from '@libp2p/interface'
import { ipPortToMultiaddr as toMultiaddr } from '@libp2p/utils/ip-port-to-multiaddr'
import pDefer from 'p-defer'
import { raceEvent } from 'race-event'
import { duplex } from 'stream-to-it'
import { CLOSE_TIMEOUT, SOCKET_TIMEOUT } from './constants.js'
import { multiaddrToNetConfig } from './utils.js'
import type { ComponentLogger, MultiaddrConnection, CounterGroup } from '@libp2p/interface'
import type { AbortOptions, Multiaddr } from '@multiformats/multiaddr'
import type { Socket } from 'net'
import type { DeferredPromise } from 'p-defer'

interface ToConnectionOptions {
listeningAddr?: Multiaddr
Expand All @@ -16,19 +19,23 @@
metrics?: CounterGroup
metricPrefix?: string
logger: ComponentLogger
direction: 'inbound' | 'outbound'
}

/**
* Convert a socket into a MultiaddrConnection
* https://github.com/libp2p/interface-transport#multiaddrconnection
*/
export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptions): MultiaddrConnection => {
let closePromise: Promise<void> | null = null
let closePromise: DeferredPromise<void>
const log = options.logger.forComponent('libp2p:tcp:socket')
const direction = options.direction
const metrics = options.metrics
const metricPrefix = options.metricPrefix ?? ''
const inactivityTimeout = options.socketInactivityTimeout ?? SOCKET_TIMEOUT
const closeTimeout = options.socketCloseTimeout ?? CLOSE_TIMEOUT
let timedout = false
let errored = false

// Check if we are connected on a unix path
if (options.listeningAddr?.getPath() != null) {
Expand All @@ -39,6 +46,19 @@
options.localAddr = options.remoteAddr
}

// handle socket errors
socket.on('error', err => {
errored = true

if (!timedout) {
log.error('%s socket error - %e', direction, err)
metrics?.increment({ [`${metricPrefix}error`]: true })
}

socket.destroy()
maConn.timeline.close = Date.now()
})

let remoteAddr: Multiaddr

if (options.remoteAddr != null) {
Expand All @@ -59,37 +79,37 @@

// by default there is no timeout
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#socketsettimeouttimeout-callback
socket.setTimeout(inactivityTimeout, () => {
log('%s socket read timeout', lOptsStr)
metrics?.increment({ [`${metricPrefix}timeout`]: true })
socket.setTimeout(inactivityTimeout)

// only destroy with an error if the remote has not sent the FIN message
let err: Error | undefined
if (socket.readable) {
err = new TimeoutError('Socket read timeout')
}
socket.once('timeout', () => {
timedout = true
log('%s %s socket read timeout', direction, lOptsStr)
metrics?.increment({ [`${metricPrefix}timeout`]: true })

// if the socket times out due to inactivity we must manually close the connection
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#event-timeout
socket.destroy(err)
socket.destroy(new TimeoutError())
maConn.timeline.close = Date.now()
})

socket.once('close', () => {
log('%s socket close', lOptsStr)
metrics?.increment({ [`${metricPrefix}close`]: true })
// record metric for clean exit
if (!timedout && !errored) {
log('%s %s socket close', direction, lOptsStr)
metrics?.increment({ [`${metricPrefix}close`]: true })
}

// In instances where `close` was not explicitly called,
// such as an iterable stream ending, ensure we have set the close
// timeline
if (maConn.timeline.close == null) {
maConn.timeline.close = Date.now()
}
socket.destroy()
maConn.timeline.close = Date.now()
})

socket.once('end', () => {
// the remote sent a FIN packet which means no more data will be sent
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#event-end
log('%s socket end', lOptsStr)
log('%s %s socket end', direction, lOptsStr)
metrics?.increment({ [`${metricPrefix}end`]: true })
})

Expand All @@ -111,7 +131,7 @@
// If the source errored the socket will already have been destroyed by
// duplex(). If the socket errored it will already be
// destroyed. There's nothing to do here except log the error & return.
log.error('%s error in sink', lOptsStr, err)
log.error('%s %s error in sink - %e', direction, lOptsStr, err)
}
}

Expand All @@ -128,100 +148,84 @@

async close (options: AbortOptions = {}) {
if (socket.closed) {
log('The %s socket is already closed', lOptsStr)
log('the %s %s socket is already closed', direction, lOptsStr)
return
}

if (socket.destroyed) {
log('The %s socket is already destroyed', lOptsStr)
log('the %s %s socket is already destroyed', direction, lOptsStr)

Check warning on line 156 in packages/transport-tcp/src/socket-to-conn.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-tcp/src/socket-to-conn.ts#L156

Added line #L156 was not covered by tests
return
}

const abortSignalListener = (): void => {
socket.destroy(new AbortError('Destroying socket after timeout'))
if (closePromise != null) {
return closePromise.promise
}

try {
if (closePromise != null) {
log('The %s socket is already closing', lOptsStr)
await closePromise
return
}
closePromise = pDefer()

if (options.signal == null) {
const signal = AbortSignal.timeout(closeTimeout)
// close writable end of socket
socket.end()

options = {
...options,
signal
}
}
// convert EventEmitter to EventTarget
const eventTarget = socketToEventTarget(socket)

options.signal?.addEventListener('abort', abortSignalListener)
// don't wait forever to close
const signal = options.signal ?? AbortSignal.timeout(closeTimeout)

log('%s closing socket', lOptsStr)
closePromise = new Promise<void>((resolve, reject) => {
socket.once('close', () => {
// socket completely closed
log('%s socket closed', lOptsStr)
resolve()
// wait for any unsent data to be sent
if (socket.writableLength > 0) {
log('%s %s draining socket', direction, lOptsStr)
await raceEvent(eventTarget, 'drain', signal, {
errorEvent: 'error'
})
socket.once('error', (err: Error) => {
log('%s socket error', lOptsStr, err)

if (!socket.destroyed) {
reject(err)
}
// if socket is destroyed, 'closed' event will be emitted later to resolve the promise
})

// shorten inactivity timeout
socket.setTimeout(closeTimeout)

// close writable end of the socket
socket.end()

if (socket.writableLength > 0) {
// there are outgoing bytes waiting to be sent
socket.once('drain', () => {
log('%s socket drained', lOptsStr)
log('%s %s socket drained', direction, lOptsStr)
}

Check warning on line 183 in packages/transport-tcp/src/socket-to-conn.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-tcp/src/socket-to-conn.ts#L182-L183

Added lines #L182 - L183 were not covered by tests

// all bytes have been sent we can destroy the socket (maybe) before the timeout
socket.destroy()
})
} else {
// nothing to send, destroy immediately, no need for the timeout
socket.destroy()
}
})
await Promise.all([
raceEvent(eventTarget, 'close', signal, {
errorEvent: 'error'
}),

await closePromise
// all bytes have been sent we can destroy the socket
socket.destroy()
])
} catch (err: any) {
this.abort(err)
} finally {
options.signal?.removeEventListener('abort', abortSignalListener)
closePromise.resolve()
}
},

abort: (err: Error) => {
log('%s socket abort due to error', lOptsStr, err)
log('%s %s socket abort due to error - %e', direction, lOptsStr, err)

// the abortSignalListener may have already destroyed the socket with an error
if (!socket.destroyed) {
socket.destroy(err)
}
socket.destroy()

// closing a socket is always asynchronous (must wait for "close" event)
// but the tests expect this to be a synchronous operation so we have to
// set the close time here. the tests should be refactored to reflect
// reality.
if (maConn.timeline.close == null) {
maConn.timeline.close = Date.now()
}
maConn.timeline.close = Date.now()
},

log
}

return maConn
}

/**
 * Wrap a Node.js-style event emitter in an object that exposes the
 * `addEventListener`/`removeEventListener` pair of the `EventTarget`
 * interface, so it can be handed to helpers that only know how to listen on
 * an `EventTarget`.
 *
 * Only those two methods are provided - `dispatchEvent` is not implemented,
 * and any arguments beyond the event type and the callback are not forwarded.
 *
 * @param obj - the emitter to adapt; its `addListener` and `removeListener`
 * methods are invoked lazily, so the value is only dereferenced when a
 * listener is added or removed
 * @returns a partial `EventTarget` that forwards to `obj`
 */
function socketToEventTarget (obj?: any): EventTarget {
  const adapter = {
    // subscribe by delegating to the emitter's own registration method
    addEventListener (type: any, cb: any): void {
      obj.addListener(type, cb)
    },
    // unsubscribe using the same callback reference that was registered
    removeEventListener (type: any, cb: any): void {
      obj.removeListener(type, cb)
    }
  }

  // @ts-expect-error partial implementation
  return adapter
}
Loading
Loading