Skip to content

Commit

Permalink
fix: handle more circuit relay refresh failures (#2764)
Browse files Browse the repository at this point in the history
Handles some more instances where we don't remove old reservations
when refreshing them fails.
  • Loading branch information
achingbrain authored Oct 11, 2024
1 parent a2f1748 commit 5d199f9
Show file tree
Hide file tree
Showing 14 changed files with 313 additions and 147 deletions.
34 changes: 1 addition & 33 deletions packages/integration-tests/test/circuit-relay.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -540,41 +540,9 @@ describe('circuit-relay', () => {
const ma = getRelayAddress(relay1).encapsulate(`/p2p-circuit/p2p/${remote.peerId.toString()}`)

await expect(local.dial(ma)).to.eventually.be.rejected
.with.property('name', 'DialError')
.with.property('name', 'NoValidAddressesError')
})
/*
it('should fail to open connection over relayed connection', async () => {
// relay1 dials relay2
await relay1.dial(relay2.getMultiaddrs()[0])
await usingAsRelay(relay1, relay2)
// remote dials relay2
await remote.dial(relay2.getMultiaddrs()[0])
await usingAsRelay(remote, relay2)
// local dials relay1 via relay2
const ma = getRelayAddress(relay1)

// open hop stream and try to connect to remote
const stream = await local.dialProtocol(ma, RELAY_V2_HOP_CODEC, {
runOnLimitedConnection: true
})
const hopStream = pbStream(stream).pb(HopMessage)
await hopStream.write({
type: HopMessage.Type.CONNECT,
peer: {
id: remote.peerId.toMultihash().bytes,
addrs: []
}
})
const response = await hopStream.read()
expect(response).to.have.property('type', HopMessage.Type.STATUS)
expect(response).to.have.property('status', Status.PERMISSION_DENIED)
})
*/
it('should emit connection:close when relay stops', async () => {
// discover relay and make reservation
await remote.dial(relay1.getMultiaddrs()[0])
Expand Down
5 changes: 3 additions & 2 deletions packages/transport-circuit-relay-v2/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,26 +52,27 @@
"doc-check": "aegir doc-check"
},
"dependencies": {
"@libp2p/crypto": "^5.0.5",
"@libp2p/interface": "^2.1.3",
"@libp2p/interface-internal": "^2.0.8",
"@libp2p/peer-collections": "^6.0.8",
"@libp2p/peer-id": "^5.0.5",
"@libp2p/peer-record": "^8.0.8",
"@libp2p/utils": "^6.1.1",
"@multiformats/mafmt": "^12.1.6",
"@multiformats/multiaddr": "^12.2.3",
"@multiformats/multiaddr-matcher": "^1.3.0",
"any-signal": "^4.1.1",
"it-protobuf-stream": "^1.1.3",
"it-stream-types": "^2.0.1",
"multiformats": "^13.1.0",
"progress-events": "^1.0.0",
"protons-runtime": "^5.4.0",
"race-signal": "^1.0.2",
"retimeable-signal": "^0.0.0",
"uint8arraylist": "^2.4.8",
"uint8arrays": "^5.1.0"
},
"devDependencies": {
"@libp2p/crypto": "^5.0.5",
"@libp2p/interface-compliance-tests": "^6.1.6",
"@libp2p/logger": "^5.1.1",
"aegir": "^44.0.1",
Expand Down
5 changes: 0 additions & 5 deletions packages/transport-circuit-relay-v2/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@ export const CIRCUIT_PROTO_CODE = 290
*/
export const DEFAULT_MAX_RESERVATION_STORE_SIZE = 15

/**
* How often to check for reservation expiry
*/
export const DEFAULT_MAX_RESERVATION_CLEAR_INTERVAL = 300 * second

/**
* How often to check for reservation expiry
*/
Expand Down
20 changes: 19 additions & 1 deletion packages/transport-circuit-relay-v2/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,31 @@ import type { Limit } from './pb/index.js'
import type { TypedEventEmitter } from '@libp2p/interface'
import type { PeerMap } from '@libp2p/peer-collections'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { RetimeableAbortSignal } from 'retimeable-signal'

export type { Limit }

export interface RelayReservation {
expire: Date
/**
* When this reservation expires
*/
expiry: Date

/**
* The address of the relay client
*/
addr: Multiaddr

/**
* How much data can be transferred over each relayed connection and for how
* long before the underlying stream is reset
*/
limit?: Limit

/**
* This signal will fire it's "abort" event when the reservation expires
*/
signal: RetimeableAbortSignal
}

export interface CircuitRelayServiceEvents {
Expand Down
21 changes: 20 additions & 1 deletion packages/transport-circuit-relay-v2/src/pb/index.proto
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ message Peer {
message Reservation {
uint64 expire = 1; // Unix expiration time (UTC)
repeated bytes addrs = 2; // relay addrs for reserving peer
optional bytes voucher = 3; // reservation voucher
optional Envelope voucher = 3; // reservation voucher
}

message Limit {
Expand All @@ -65,3 +65,22 @@ message ReservationVoucher {
bytes peer = 2;
uint64 expiration = 3;
}

// https://github.com/libp2p/specs/blob/master/RFC/0002-signed-envelopes.md
message Envelope {
// public_key is the public key of the keypair the enclosed payload was
// signed with.
bytes public_key = 1;

// payload_type encodes the type of payload, so that it can be deserialized
// deterministically.
bytes payload_type = 2;

// payload is the actual payload carried inside this envelope.
ReservationVoucher payload = 3;

// signature is the signature produced by the private key corresponding to
// the enclosed public key, over the payload, prefixing a domain string for
// additional security.
bytes signature = 5;
}
102 changes: 99 additions & 3 deletions packages/transport-circuit-relay-v2/src/pb/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ export namespace Peer {
export interface Reservation {
expire: bigint
addrs: Uint8Array[]
voucher?: Uint8Array
voucher?: Envelope
}

export namespace Reservation {
Expand All @@ -345,7 +345,7 @@ export namespace Reservation {

if (obj.voucher != null) {
w.uint32(26)
w.bytes(obj.voucher)
Envelope.codec().encode(obj.voucher, w)
}

if (opts.lengthDelimited !== false) {
Expand Down Expand Up @@ -376,7 +376,9 @@ export namespace Reservation {
break
}
case 3: {
obj.voucher = reader.bytes()
obj.voucher = Envelope.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.voucher
})
break
}
default: {
Expand Down Expand Up @@ -580,3 +582,97 @@ export namespace ReservationVoucher {
return decodeMessage(buf, ReservationVoucher.codec(), opts)
}
}

export interface Envelope {
publicKey: Uint8Array
payloadType: Uint8Array
payload?: ReservationVoucher
signature: Uint8Array
}

export namespace Envelope {
let _codec: Codec<Envelope>

export const codec = (): Codec<Envelope> => {
if (_codec == null) {
_codec = message<Envelope>((obj, w, opts = {}) => {
if (opts.lengthDelimited !== false) {
w.fork()
}

if ((obj.publicKey != null && obj.publicKey.byteLength > 0)) {
w.uint32(10)
w.bytes(obj.publicKey)
}

if ((obj.payloadType != null && obj.payloadType.byteLength > 0)) {
w.uint32(18)
w.bytes(obj.payloadType)
}

if (obj.payload != null) {
w.uint32(26)
ReservationVoucher.codec().encode(obj.payload, w)
}

if ((obj.signature != null && obj.signature.byteLength > 0)) {
w.uint32(42)
w.bytes(obj.signature)
}

if (opts.lengthDelimited !== false) {
w.ldelim()
}
}, (reader, length, opts = {}) => {
const obj: any = {
publicKey: uint8ArrayAlloc(0),
payloadType: uint8ArrayAlloc(0),
signature: uint8ArrayAlloc(0)
}

const end = length == null ? reader.len : reader.pos + length

while (reader.pos < end) {
const tag = reader.uint32()

switch (tag >>> 3) {
case 1: {
obj.publicKey = reader.bytes()
break
}
case 2: {
obj.payloadType = reader.bytes()
break
}
case 3: {
obj.payload = ReservationVoucher.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.payload
})
break
}
case 5: {
obj.signature = reader.bytes()
break
}
default: {
reader.skipType(tag & 7)
break
}
}
}

return obj
})
}

return _codec
}

export const encode = (obj: Partial<Envelope>): Uint8Array => {
return encodeMessage(obj, Envelope.codec())
}

export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions<Envelope>): Envelope => {
return decodeMessage(buf, Envelope.codec(), opts)
}
}
32 changes: 21 additions & 11 deletions packages/transport-circuit-relay-v2/src/server/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { publicKeyToProtobuf } from '@libp2p/crypto/keys'
import { TypedEventEmitter, setMaxListeners } from '@libp2p/interface'
import { peerIdFromMultihash } from '@libp2p/peer-id'
import { RecordEnvelope } from '@libp2p/peer-record'
Expand Down Expand Up @@ -156,16 +157,14 @@ class CircuitRelayServer extends TypedEventEmitter<RelayServerEvents> implements
runOnLimitedConnection: true
})

this.reservationStore.start()

this.started = true
}

/**
* Stop Relay service
*/
async stop (): Promise<void> {
this.reservationStore.stop()
this.reservationStore.clear()
this.shutdownController.abort()
await this.registrar.unhandle(RELAY_V2_HOP_CODEC)

Expand Down Expand Up @@ -290,16 +289,25 @@ class CircuitRelayServer extends TypedEventEmitter<RelayServerEvents> implements
addrs.push(relayAddr.bytes)
}

const voucher = await RecordEnvelope.seal(new ReservationVoucherRecord({
const envelope = await RecordEnvelope.seal(new ReservationVoucherRecord({
peer: remotePeer,
relay: this.peerId,
expiration: Number(expire)
expiration: expire
}), this.privateKey)

return {
addrs,
expire,
voucher: voucher.marshal()
voucher: {
publicKey: publicKeyToProtobuf(envelope.publicKey),
payloadType: envelope.payloadType,
payload: {
peer: remotePeer.toMultihash().bytes,
relay: this.peerId.toMultihash().bytes,
expiration: expire
},
signature: envelope.signature
}
}
}

Expand Down Expand Up @@ -330,7 +338,9 @@ class CircuitRelayServer extends TypedEventEmitter<RelayServerEvents> implements
return
}

if (!this.reservationStore.hasReservation(dstPeer)) {
const reservation = this.reservationStore.get(dstPeer)

if (reservation == null) {
this.log.error('hop connect denied for destination peer %p not having a reservation for %p with status %s', dstPeer, connection.remotePeer, Status.NO_RESERVATION)
await hopstr.write({ type: HopMessage.Type.STATUS, status: Status.NO_RESERVATION }, options)
return
Expand All @@ -350,7 +360,6 @@ class CircuitRelayServer extends TypedEventEmitter<RelayServerEvents> implements
return
}

const limit = this.reservationStore.get(dstPeer)?.limit
const destinationConnection = connections[0]

const destinationStream = await this.stopHop({
Expand All @@ -361,7 +370,7 @@ class CircuitRelayServer extends TypedEventEmitter<RelayServerEvents> implements
id: connection.remotePeer.toMultihash().bytes,
addrs: []
},
limit
limit: reservation?.limit
}
}, options)

Expand All @@ -374,13 +383,14 @@ class CircuitRelayServer extends TypedEventEmitter<RelayServerEvents> implements
await hopstr.write({
type: HopMessage.Type.STATUS,
status: Status.OK,
limit
limit: reservation?.limit
}, options)
const sourceStream = stream.unwrap()

this.log('connection from %p to %p established - merging streams', connection.remotePeer, dstPeer)

// Short circuit the two streams to create the relayed connection
createLimitedRelay(sourceStream, destinationStream, this.shutdownController.signal, limit, {
createLimitedRelay(sourceStream, destinationStream, this.shutdownController.signal, reservation, {
log: this.log
})
}
Expand Down
Loading

0 comments on commit 5d199f9

Please sign in to comment.