Skip to content

Commit

Permalink
chore: add priority queue back
Browse files Browse the repository at this point in the history
  • Loading branch information
achingbrain committed May 13, 2024
1 parent c85b962 commit 8f50de1
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 8 deletions.
11 changes: 6 additions & 5 deletions packages/libp2p/src/connection-manager/dial-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import { CodeError, AggregateCodeError, ERR_TIMEOUT, setMaxListeners } from '@libp2p/interface'
import { PeerMap } from '@libp2p/peer-collections'
import { defaultAddressSort } from '@libp2p/utils/address-sort'
import { Queue, type QueueAddOptions } from '@libp2p/utils/queue'
import { PriorityQueue, type PriorityQueueJobOptions } from '@libp2p/utils/priority-queue'
import { type Multiaddr, type Resolver, resolvers, multiaddr } from '@multiformats/multiaddr'
import { dnsaddrResolver } from '@multiformats/multiaddr/resolvers'
import { Circuit } from '@multiformats/multiaddr-matcher'
Expand All @@ -18,6 +18,7 @@ import {
MAX_DIAL_QUEUE_LENGTH
} from './constants.js'
import { resolveMultiaddrs } from './utils.js'
import { DEFAULT_DIAL_PRIORITY } from './index.js'
import type { AddressSorter, AbortOptions, ComponentLogger, Logger, Connection, ConnectionGater, Metrics, PeerId, Address, PeerStore, PeerRouting, IsDialableOptions } from '@libp2p/interface'
import type { TransportManager } from '@libp2p/interface-internal'
import type { DNS } from '@multiformats/dns'
Expand All @@ -32,7 +33,7 @@ export interface DialOptions extends AbortOptions {
force?: boolean
}

interface DialQueueJobOptions extends QueueAddOptions {
interface DialQueueJobOptions extends PriorityQueueJobOptions {
peerId?: PeerId
multiaddrs: Set<string>
}
Expand Down Expand Up @@ -70,7 +71,7 @@ interface DialQueueComponents {
}

export class DialQueue {
public queue: Queue<Connection, DialQueueJobOptions>
public queue: PriorityQueue<Connection, DialQueueJobOptions>
private readonly components: DialQueueComponents
private readonly addressSorter: AddressSorter
private readonly maxPeerAddrsToDial: number
Expand All @@ -97,7 +98,7 @@ export class DialQueue {
}

// controls dial concurrency
this.queue = new Queue({
this.queue = new PriorityQueue({
concurrency: init.maxParallelDials ?? defaultOptions.maxParallelDials,
metricName: 'libp2p_dial_queue',
metrics: components.metrics
Expand Down Expand Up @@ -277,7 +278,7 @@ export class DialQueue {
}
}, {
peerId,
priority: options.priority,
priority: options.priority ?? DEFAULT_DIAL_PRIORITY,
multiaddrs: new Set(multiaddrs.map(ma => ma.toString())),
signal: options.signal
})
Expand Down
2 changes: 1 addition & 1 deletion packages/libp2p/src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import type { PendingDial, AddressSorter, Libp2pEvents, AbortOptions, ComponentL
import type { ConnectionManager, OpenConnectionOptions, TransportManager } from '@libp2p/interface-internal'
import type { JobStatus } from '@libp2p/utils/queue'

const DEFAULT_DIAL_PRIORITY = 50
export const DEFAULT_DIAL_PRIORITY = 50

export interface ConnectionManagerInit {
/**
Expand Down
4 changes: 4 additions & 0 deletions packages/utils/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@
"types": "./dist/src/peer-queue.d.ts",
"import": "./dist/src/peer-queue.js"
},
"./priority-queue": {
"types": "./dist/src/priority-queue.d.ts",
"import": "./dist/src/priority-queue.js"
},
"./private-ip": {
"types": "./dist/src/private-ip.d.ts",
"import": "./dist/src/private-ip.js"
Expand Down
2 changes: 0 additions & 2 deletions packages/utils/src/peer-queue.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
/* eslint-disable @typescript-eslint/no-non-null-assertion */

import { Queue } from './queue/index.js'
import type { Job } from './queue/job.js'
import type { AbortOptions, PeerId } from '@libp2p/interface'
Expand Down
26 changes: 26 additions & 0 deletions packages/utils/src/priority-queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { Queue } from './queue/index.js'
import type { QueueInit } from './queue/index.js'
import type { AbortOptions } from '@libp2p/interface'

export interface PriorityQueueJobOptions extends AbortOptions {
priority: number
}

export class PriorityQueue <JobReturnType = void, JobOptions extends PriorityQueueJobOptions = PriorityQueueJobOptions> extends Queue<JobReturnType, JobOptions> {
constructor (init: QueueInit<JobReturnType, JobOptions> = {}) {
super({
...init,
sort: (a, b) => {
if (a.options.priority > b.options.priority) {
return -1
}

if (a.options.priority < b.options.priority) {
return 1
}

return 0
}
})
}
}
19 changes: 19 additions & 0 deletions packages/utils/test/priority-queue.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { expect } from 'aegir/chai'
import { PriorityQueue } from '../src/priority-queue.js'

describe('priority-queue', () => {
it('adds with priority', async () => {
const result: number[] = []
const queue = new PriorityQueue<number>({ concurrency: 1 })
void queue.add(async () => result.push(1), { priority: 1 })
void queue.add(async () => result.push(0), { priority: 0 })
void queue.add(async () => result.push(1), { priority: 1 })
void queue.add(async () => result.push(2), { priority: 1 })
void queue.add(async () => result.push(3), { priority: 2 })
void queue.add(async () => result.push(0), { priority: -1 })

await queue.onEmpty()

expect(result).to.deep.equal([1, 3, 1, 2, 0, 0])
})
})

0 comments on commit 8f50de1

Please sign in to comment.