diff --git a/.github/next-minor.md b/.github/next-minor.md index b57e4763..45ff5938 100644 --- a/.github/next-minor.md +++ b/.github/next-minor.md @@ -37,3 +37,7 @@ https://github.com/radashi-org/radashi/pull/262 #### Add `signal` option to `parallel` https://github.com/radashi-org/radashi/pull/262 + +#### Tolerate out-of-range `parallel` limit + +https://github.com/radashi-org/radashi/pull/238 diff --git a/docs/async/parallel.mdx b/docs/async/parallel.mdx index d2f63bb9..d0fbca04 100644 --- a/docs/async/parallel.mdx +++ b/docs/async/parallel.mdx @@ -20,6 +20,8 @@ const users = await _.parallel(3, userIds, async userId => { }) ``` +Since v12.3.0, if the limit is greater than the array length, it will be clamped to the array length. Similarly, if the limit is less than 1, it will be clamped to 1. + ### Interrupting Since v12.3.0, processing can be manually interrupted. Pass an `AbortController.signal` via the `signal` option. When the signal is aborted, no more calls to your callback will be made. Any in-progress calls will continue to completion, unless you manually connect the signal inside your callback. In other words, `parallel` is only responsible for aborting the array loop, not the async operations themselves. diff --git a/src/async/parallel.ts b/src/async/parallel.ts index 0024ba3c..42b42e7b 100644 --- a/src/async/parallel.ts +++ b/src/async/parallel.ts @@ -1,5 +1,6 @@ import { AggregateError, + clamp, flat, fork, isNumber, @@ -22,12 +23,16 @@ type WorkItemResult = { error: any } -export type ParallelOptions = - | { - limit: number - signal?: AbortSignal - } - | number +export type ParallelOptions = { + /** + * The maximum number of functions to run concurrently. If a + * negative number is passed, only one function will run at a time. + * If a number bigger than the array `length` is passed, the array + * length will be used. + */ + limit: number + signal?: AbortSignal +} /** * Executes many async functions in parallel. Returns the results from @@ -57,7 +62,7 @@ export type ParallelOptions = * @version 12.1.0 */ export async function parallel( - options: ParallelOptions, + options: ParallelOptions | number, array: readonly T[], func: (item: T) => Promise, ): Promise { @@ -95,7 +100,9 @@ export async function parallel( } const queues = Promise.all( - list(1, options.limit).map(() => new Promise(processor)), + list(1, clamp(options.limit, 1, array.length)).map( + () => new Promise(processor), + ), ) let signalPromise: Promise | undefined diff --git a/tests/async/parallel.test.ts b/tests/async/parallel.test.ts index 3cc97feb..a84ad4bb 100644 --- a/tests/async/parallel.test.ts +++ b/tests/async/parallel.test.ts @@ -1,6 +1,29 @@ import * as _ from 'radashi' describe('parallel', () => { + async function testConcurrency( + limit: number, + array: readonly unknown[], + expected: number[], + ) { + vi.useFakeTimers() + + let numInProgress = 0 + const tracking: number[] = [] + + const promise = _.parallel(limit, array, async () => { + numInProgress++ + tracking.push(numInProgress) + await _.sleep(10) + numInProgress-- + }) + + await vi.advanceTimersByTimeAsync(50) + await promise + + expect(tracking).toEqual(expected) + } + test('returns all results from all functions', async () => { const [errors, results] = await _.try(async () => { return _.parallel(1, _.list(1, 3), async num => { @@ -29,15 +52,7 @@ describe('parallel', () => { }) test('does not run more than the limit at once', async () => { - let numInProgress = 0 - const tracking: number[] = [] - await _.parallel(3, _.list(1, 14), async () => { - numInProgress++ - tracking.push(numInProgress) - await _.sleep(0) - numInProgress-- - }) - expect(Math.max(...tracking)).toBe(3) + await testConcurrency(3, _.list(1, 6), [1, 2, 3, 3, 3, 3]) }) test('abort before all iterations are complete', async () => { @@ -88,8 +103,6 @@ describe('parallel', () => { }) test('remove abort listener after completion', async () => { - vi.useFakeTimers() - const ctrl = new AbortController() const removeEventListener = vi.spyOn(ctrl.signal, 'removeEventListener') @@ -109,4 +122,16 @@ describe('parallel', () => { expect.any(Function), ) }) + + test('limit defaults to 1 if negative', async () => { + await testConcurrency(-1, _.list(1, 3), [1, 1, 1]) + }) + + test('limit defaults to 1 if zero is passed', async () => { + await testConcurrency(0, _.list(1, 3), [1, 1, 1]) + }) + + test('limit defaults to array length if Infinity is passed', async () => { + await testConcurrency(Number.POSITIVE_INFINITY, _.list(1, 3), [1, 2, 3]) + }) })