From 49b06e93abaf9c4e50ff6d0a157a906b44434ca4 Mon Sep 17 00:00:00 2001 From: Alec Larson <1925840+aleclarson@users.noreply.github.com> Date: Sun, 5 Jan 2025 02:53:51 -0500 Subject: [PATCH] feat: add `withCapacity` function --- src/async/withCapacity.ts | 84 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 src/async/withCapacity.ts diff --git a/src/async/withCapacity.ts b/src/async/withCapacity.ts new file mode 100644 index 00000000..f3466be6 --- /dev/null +++ b/src/async/withCapacity.ts @@ -0,0 +1,84 @@ +import { isFunction } from 'radashi' + +export function withCapacity< + T extends (...args: any[]) => Promise = never, +>(capacity: number, job?: T): WithCapacity { + const pendingJobs: (() => void)[] = [] + + let capacityTaken = 0 + + const enqueue = async Promise>( + job: T, + weight: number, + args?: Parameters, + ) => { + if (capacityTaken + weight > capacity) { + if (weight > capacity) { + throw new Error('Weight is greater than capacity') + } + return new Promise(resolve => { + pendingJobs.push(function run() { + if (capacityTaken + weight > capacity) { + pendingJobs.unshift(run) + } else { + resolve(enqueue(job, weight, args)) + } + }) + }) + } + capacityTaken += weight + try { + return await (args ? job(...args) : job()) + } finally { + capacityTaken -= weight + pendingJobs.shift()?.() + } + } + + const apply: Function = job + ? (...args: Parameters) => enqueue(job, 1, args) + : ( + arg: JobOptions | (() => Promise), + job?: () => Promise, + ): Promise => + isFunction(arg) ? enqueue(arg, 1) : enqueue(job!, arg.weight) + + const queue = apply as WithCapacity + queue.hasCapacity = (weight = 1) => capacityTaken + weight <= capacity + queue.clear = () => { + pendingJobs.length = 0 + } + return queue +} + +export type JobOptions = { + /** + * Jobs have a weight. This is the amount of capacity they consume. + */ + weight: number +} + +interface JobRunner { + (options: JobOptions, job: () => Promise): Promise + (job: () => Promise): Promise +} + +export type WithCapacity Promise> = ([ + T, +] extends [never] + ? JobRunner + : T) & { + /** + * Check if the semaphore has capacity for a given weight. + * + * @param weight - The weight to check for. If not provided, the weight will + * be 1. + */ + hasCapacity: (weight?: number) => boolean + /** + * Clear the queue. Note that this does not cancel any jobs that are already + * running. If you need to cancel running jobs, you should do so manually + * (e.g. by passing an `AbortSignal` to each job). + */ + clear: () => void +}