diff --git a/.github/next-minor.md b/.github/next-minor.md index 2b835552..542c8bcd 100644 --- a/.github/next-minor.md +++ b/.github/next-minor.md @@ -8,4 +8,10 @@ The `####` headline should be short and descriptive of the new functionality. In ## New Features -#### +#### Add `signal` option to `retry` + +https://github.com/radashi-org/radashi/pull/262 + +#### Add `signal` option to `parallel` + +https://github.com/radashi-org/radashi/pull/262 diff --git a/docs/async/parallel.mdx b/docs/async/parallel.mdx index e710ff62..9899ca18 100644 --- a/docs/async/parallel.mdx +++ b/docs/async/parallel.mdx @@ -1,14 +1,12 @@ --- title: parallel -description: Run many async function in parallel +description: Parallelize async operations while managing load since: 12.1.0 --- ### Usage -Like `_.map` but built specifically to run the async callback functions -in parallel. The first argument is a limit of how many functions should -be allowed to run at once. Returns an array of results. +The `parallel` function processes an array with an async callback. The first argument controls how many array items are processed at one time. Similar to `Promise.all`, an ordered array of results is returned. ```ts import * as _ from 'radashi' @@ -22,11 +20,34 @@ const users = await _.parallel(3, userIds, async userId => { }) ``` -### Errors +### Interrupting -When all work is complete parallel will check for errors. If any -occurred they will all be thrown in a single `AggregateError` that -has an `errors` property that is all the errors that were thrown. +Processing can be manually interrupted. Pass an `AbortController.signal` via the `signal` option. When the signal is aborted, no more calls to your callback will be made. Any in-progress calls will continue to completion, unless you manually connect the signal inside your callback. In other words, `parallel` is only responsible for aborting the array loop, not the async operations themselves. + +When `parallel` is interrupted by the signal, it throws a `DOMException` (even in Node.js) with the message `This operation was aborted` and name `AbortError`. + +```ts +import * as _ from 'radashi' + +const abortController = new AbortController() +const signal = abortController.signal + +// Pass in the signal: +const pizzas = await _.parallel( + { limit: 2, signal }, + ['pepperoni', 'cheese', 'mushroom'], + async topping => { + return await bakePizzaInWoodFiredOven(topping) // each pizza takes 10 minutes! + }, +) + +// Later on, if you need to abort: +abortController.abort() +``` + +### Aggregate Errors + +Once the whole array has been processed, `parallel` will check for errors. If any errors occurred during processing, they are combined into a single `AggregateError`. The `AggregateError` has an `errors` array property which contains all the individual errors that were thrown. ```ts import * as _ from 'radashi' @@ -39,5 +60,5 @@ const [err, users] = await _.tryit(_.parallel)(3, userIds, async userId => { console.log(err) // => AggregateError console.log(err.errors) // => [Error, Error, Error] -console.log(err.errors[1].message) // => No, I don't want to find user 2 +console.log(err.errors[1].message) // => "No, I don't want to find user 2" ``` diff --git a/docs/async/retry.mdx b/docs/async/retry.mdx index 0659e4bf..d115fc49 100644 --- a/docs/async/retry.mdx +++ b/docs/async/retry.mdx @@ -1,16 +1,20 @@ --- title: retry -description: Run an async function retrying if it fails +description: Retry an async function when it fails since: 12.1.0 --- ### Usage -The `_.retry` function allows you to run an async function and automagically retry it if it fails. Given the async func to run, an optional max number of retries (`r`), and an optional milliseconds to delay between retries (`d`), the given async function will be called, retrying `r` many times, and waiting `d` milliseconds between retries. +The `retry` function runs an async function and retries it if it fails. You can specify how many times to retry, how long to wait between retries, and whether to use exponential backoff. -The `times` option defaults to `3`. The `delay` option (defaults to null) can specify milliseconds to sleep between attempts. +**Options** -The `backoff` option is like delay but uses a function to sleep -- makes for easy exponential backoff. +- `times` is the maximum number of times to retry (default: `3`) +- `delay` is milliseconds to sleep between retries +- `backoff` is a function called to calculate the delay between retries + - It receives the attempt number (starting with `1`) and returns the delay in milliseconds. +- `signal` allows you to pass an `AbortController.signal` to interrupt the retry operation ```ts import * as _ from 'radashi' @@ -33,3 +37,27 @@ await _.retry({ times: 2, delay: 1000 }, api.users.list) // try 2 times with 1 s // exponential backoff await _.retry({ backoff: i => 10 ** i }, api.users.list) // try 3 times with 10, 100, 1000 ms delay ``` + +### Interrupting + +If a `signal` is passed, the retry operation can be interrupted. When the signal is aborted, `retry`'s promise will reject with a `DOMException` (even in Node.js) with the message `This operation was aborted` and name `AbortError`. + +```ts +import * as _ from 'radashi' + +const abortController = new AbortController() +const signal = abortController.signal + +const promise = _.retry({ times: 3, delay: 1000, signal }, api.users.list) + +// To stop retrying immediately: +abortController.abort() + +try { + await promise +} catch (err) { + if (err.message === 'This operation was aborted') { + console.log('Retry operation was aborted') + } +} +``` diff --git a/scripts/bench-file/src/main.ts b/scripts/bench-file/src/main.ts index 10a8cb0b..19cdcea1 100644 --- a/scripts/bench-file/src/main.ts +++ b/scripts/bench-file/src/main.ts @@ -13,7 +13,7 @@ async function main() { watch: false, pool: 'vmThreads', includeTaskLocation: true, - config: new URL('../vitest.config.ts', import.meta.url).pathname, + config: new URL('../../../vitest.config.ts', import.meta.url).pathname, benchmark: { reporters: [ reportToBenchmarkHandler(report => { diff --git a/scripts/bench-file/vitest.config.ts b/scripts/bench-file/vitest.config.ts deleted file mode 100644 index ec3da757..00000000 --- a/scripts/bench-file/vitest.config.ts +++ /dev/null @@ -1,12 +0,0 @@ -import { defineConfig } from 'vitest/config' - -const resolve = (specifier: string) => - new URL(import.meta.resolve(specifier)).pathname - -export default defineConfig({ - resolve: { - alias: { - 'radashi/typed/isArray.ts': resolve('../../src/typed/isArray.ts'), - }, - }, -}) diff --git a/scripts/benchmarks/src/runner.ts b/scripts/benchmarks/src/runner.ts index 80ce26ef..084f4ef8 100644 --- a/scripts/benchmarks/src/runner.ts +++ b/scripts/benchmarks/src/runner.ts @@ -1,17 +1,14 @@ import { execa } from 'execa' import type { BenchmarkReport } from './reporter.ts' -const tsx = './scripts/benchmarks/node_modules/.bin/tsx' -const runner = './scripts/benchmarks/vitest-bench.ts' - -export function runVitest(file: string) { +export async function runVitest(file: string) { console.log(`Running benchmarks in ./${file}`) - - return execa(tsx, [runner, file], { reject: false }).then(result => { - if (result.exitCode !== 0) { - console.error(result.stderr) - throw new Error('Benchmark failed. See above for details.') - } - return JSON.parse(result.stdout) as BenchmarkReport[] + const result = await execa('node', ['scripts/run', 'bench-file', file], { + reject: false, }) + if (result.exitCode !== 0) { + console.error(result.stderr) + throw new Error('Benchmark failed. See above for details.') + } + return JSON.parse(result.stdout) as BenchmarkReport[] } diff --git a/scripts/publish-version/src/main.ts b/scripts/publish-version/src/main.ts index 48580521..82c294c1 100644 --- a/scripts/publish-version/src/main.ts +++ b/scripts/publish-version/src/main.ts @@ -33,8 +33,8 @@ async function parseArgs() { const { default: mri } = await import('mri') const argv = mri(process.argv.slice(2), { - boolean: ['no-push'], - string: ['tag', 'latest'], + boolean: ['no-push', 'patch', 'latest'], + string: ['tag'], }) if (argv.latest && argv.tag) { @@ -47,6 +47,7 @@ async function parseArgs() { return { push: !argv['no-push'], tag: argv.tag as ValidTag, + patch: argv.patch, gitCliffToken, npmToken, radashiBotToken, diff --git a/scripts/radashi-db/gen-types.ts b/scripts/radashi-db/gen-types.ts index f845ad73..55240df8 100644 --- a/scripts/radashi-db/gen-types.ts +++ b/scripts/radashi-db/gen-types.ts @@ -27,5 +27,5 @@ await execa( }, ).then(async result => { // Write output to file - await writeFile(`${scriptDir}/src/supabase.types.ts`, result.stdout) + await writeFile(`${scriptDir}/src/supabase-types.ts`, result.stdout) }) diff --git a/scripts/radashi-db/src/supabase.types.ts b/scripts/radashi-db/src/supabase-types.ts similarity index 100% rename from scripts/radashi-db/src/supabase.types.ts rename to scripts/radashi-db/src/supabase-types.ts diff --git a/scripts/radashi-db/src/supabase.ts b/scripts/radashi-db/src/supabase.ts index 842fe6c8..5a5ce145 100644 --- a/scripts/radashi-db/src/supabase.ts +++ b/scripts/radashi-db/src/supabase.ts @@ -1,5 +1,5 @@ import { createClient } from '@supabase/supabase-js' -import type { Database } from './supabase.types.ts' +import type { Database } from './supabase-types.ts' if (!process.env.SUPABASE_KEY) { throw new Error('SUPABASE_KEY is not set') @@ -12,4 +12,4 @@ export const supabase = createClient( process.env.SUPABASE_KEY = '' -export * from './supabase.types' +export * from './supabase-types.ts' diff --git a/src/async/parallel.ts b/src/async/parallel.ts index 81e4eef0..931ba86a 100644 --- a/src/async/parallel.ts +++ b/src/async/parallel.ts @@ -1,33 +1,64 @@ -import { AggregateError, clamp, flat, fork, list, sort, tryit } from 'radashi' +import { + AggregateError, + clamp, + flat, + fork, + isNumber, + list, + sort, + tryit, +} from 'radashi' + +type AbortSignal = { + readonly aborted: boolean + readonly reason: any + addEventListener(type: 'abort', listener: () => void): void + removeEventListener(type: 'abort', listener: () => void): void + throwIfAborted(): void +} type WorkItemResult = { index: number result: K error: any } + +export type ParallelOptions = + | { + limit: number + signal?: AbortSignal + } + | number + /** * Executes many async functions in parallel. Returns the results from * all functions as an array. After all functions have resolved, if * any errors were thrown, they are rethrown in an instance of - * AggregateError. + * AggregateError. The operation can be aborted by passing optional AbortSignal, + * which will throw an Error if aborted. * * @see https://radashi.js.org/reference/async/parallel * @example * ```ts * // Process images concurrently, resizing each image to a standard size. - * const images = await parallel(2, imageFiles, async (file) => { + * const abortController = new AbortController(); + * const images = await parallel( + * { + * limit: 2, + * signal: abortController.signal, + * }, + * imageFiles, + * async file => { * return await resizeImage(file) * }) + * + * // To abort the operation: + * // abortController.abort() * ``` * @version 12.1.0 */ export async function parallel( - /** - * The maximum number of functions to run concurrently.If a negative - * number is passed, only one function will run at a time. if a number - * bigger than the array size is passed, the array size will be used. - */ - limit: number, + options: ParallelOptions, array: readonly T[], func: (item: T) => Promise, ): Promise { @@ -35,13 +66,24 @@ export async function parallel( index, item, })) + + let signal: AbortSignal | undefined + if (isNumber(options)) { + options = { + limit: options, + } + } else { + signal = options.signal + signal?.throwIfAborted() + } + // Process array items - const processor = async (res: (value: WorkItemResult[]) => void) => { + const processor = async (resolve: (value: WorkItemResult[]) => void) => { const results: WorkItemResult[] = [] - while (true) { + while (!signal?.aborted) { const next = work.pop() if (!next) { - return res(results) + break } const [error, result] = await tryit(func)(next.item) results.push({ @@ -50,13 +92,27 @@ export async function parallel( index: next.index, }) } + return resolve(results) } - // Create queues - const queues = list(1, clamp(limit, 1, array.length)).map( - () => new Promise(processor), + + const queues = Promise.all( + list(1, clamp(options.limit, 1, array.length)).map(() => new Promise(processor)), ) + + let signalPromise: Promise | undefined + if (signal) { + signalPromise = new Promise((_, reject) => { + const onAbort = () => reject(signal.reason) + signal.addEventListener('abort', onAbort) + queues.then(() => signal.removeEventListener('abort', onAbort)) + }) + } + // Wait for all queues to complete - const itemResults = (await Promise.all(queues)) as WorkItemResult[][] + const itemResults = await (signalPromise + ? Promise.race([queues, signalPromise]) + : queues) + const [errors, results] = fork( sort(flat(itemResults), r => r.index), x => !!x.error, diff --git a/src/async/retry.ts b/src/async/retry.ts index 53132671..a04f66f5 100644 --- a/src/async/retry.ts +++ b/src/async/retry.ts @@ -1,9 +1,14 @@ import { sleep, tryit } from 'radashi' +type AbortSignal = { + throwIfAborted(): void +} + export type RetryOptions = { times?: number delay?: number | null backoff?: (count: number) => number + signal?: AbortSignal } /** @@ -12,9 +17,12 @@ export type RetryOptions = { * @see https://radashi.js.org/reference/async/retry * @example * ```ts - * const result = await retry({ times: 3, delay: 1000 }, async () => { + * const abortController = new AbortController(); + * const result = await retry({ times: 3, delay: 1000, signal: abortController.signal }, async () => { * return await fetch('https://example.com') * }) + * // To abort the operation: + * // abortController.abort() * ``` * @version 12.1.0 */ @@ -25,11 +33,14 @@ export async function retry( const times = options?.times ?? 3 const delay = options?.delay const backoff = options?.backoff ?? null + const signal = options?.signal + let i = 0 while (true) { const [err, result] = (await tryit(func)((err: any) => { throw { _exited: err } })) as [any, TResponse] + signal?.throwIfAborted() if (!err) { return result } diff --git a/tests/async/parallel.test.ts b/tests/async/parallel.test.ts index a0c52dc2..7ff1627e 100644 --- a/tests/async/parallel.test.ts +++ b/tests/async/parallel.test.ts @@ -24,6 +24,7 @@ describe('parallel', () => { expect(errors).toBeUndefined() expect(results).toEqual(['hi_1', 'hi_2', 'hi_3']) }) + test('throws errors as array of all errors', async () => { const [error, results] = await _.try(async () => { return _.parallel(1, _.list(1, 3), async num => { @@ -39,27 +40,107 @@ describe('parallel', () => { expect(err.errors.length).toBe(1) expect(err.errors[0].message).toBe('number is 2') }) + test('does not run more than the limit at once', async () => { const { tracking, func } = makeProgressTracker() await _.parallel(3, _.list(1, 14), func) expect(Math.max(...tracking)).toBe(3) }) + test('should run only one parallel function when a negative number is passed', async () => { const { tracking, func } = makeProgressTracker() - await _.parallel(-1, _.list(1, 10), func) + await _.parallel({ + limit: -1, + }, _.list(1, 10), func) expect(Math.max(...tracking)).toBe(1) expect(tracking).toEqual([1, 1, 1, 1, 1, 1, 1, 1, 1, 1]) }) + test('should run only one parallel function when 0 is passed', async () => { const { tracking, func } = makeProgressTracker() - await _.parallel(0, _.list(1, 10), func) + await _.parallel({ + limit: 0, + }, _.list(1, 10), func) expect(Math.max(...tracking)).toBe(1) expect(tracking).toEqual([1, 1, 1, 1, 1, 1, 1, 1, 1, 1]) }) + test('should run the same number of parallel functions as the array size when Infinity is passed', async () => { const { tracking, func } = makeProgressTracker() - await _.parallel(Number.POSITIVE_INFINITY, _.list(1, 10), func) + await _.parallel({ + limit: Number.POSITIVE_INFINITY, + }, _.list(1, 10), func) expect(Math.max(...tracking)).toBe(10) expect(tracking).toEqual([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) }) + + test('abort before all iterations are complete', async () => { + vi.useFakeTimers() + + const ctrl = new AbortController() + + // Abort in the middle of the 3rd iteration. + setTimeout(() => ctrl.abort(), 25) + + const callback = vi.fn(() => _.sleep(10)) + const promise = _.parallel( + { + limit: 1, + signal: ctrl.signal, + }, + _.list(1, 10), + callback, + ) + promise.catch(_.noop) + + await vi.advanceTimersByTimeAsync(25) + + await expect(promise).rejects.toThrowError( + new DOMException('This operation was aborted', 'AbortError'), + ) + expect(callback).toHaveBeenCalledTimes(3) + }) + + test('abort before first iteration begins', async () => { + const ctrl = new AbortController() + ctrl.abort() + + const callback = vi.fn() + await expect( + _.parallel( + { + limit: 1, + signal: ctrl.signal, + }, + _.list(1, 5), + callback, + ), + ).rejects.toThrowError( + new DOMException('This operation was aborted', 'AbortError'), + ) + expect(callback).not.toHaveBeenCalled() + }) + + test('remove abort listener after completion', async () => { + vi.useFakeTimers() + + const ctrl = new AbortController() + const removeEventListener = vi.spyOn(ctrl.signal, 'removeEventListener') + + const callback = vi.fn() + await _.parallel( + { + limit: 2, + signal: ctrl.signal, + }, + _.list(1, 5), + callback, + ) + + expect(callback).toHaveBeenCalledTimes(5) + expect(removeEventListener).toHaveBeenCalledWith( + 'abort', + expect.any(Function), + ) + }) }) diff --git a/tests/async/retry.test.ts b/tests/async/retry.test.ts index 553023d1..ddc2c271 100644 --- a/tests/async/retry.test.ts +++ b/tests/async/retry.test.ts @@ -1,7 +1,7 @@ // cSpell:ignore backoffs -import * as _ from 'radashi' import type { RetryOptions } from 'radashi' +import * as _ from 'radashi' const cast = (value: any): T => value @@ -116,4 +116,23 @@ describe('retry', () => { // or 2 milliseconds after. expect(diff).toBeGreaterThanOrEqual(backoffs) }) + test('aborts the retry operation when signal is aborted', async () => { + try { + const abortController = new AbortController() + let attempt = 0 + await _.retry({ signal: abortController.signal }, async () => { + attempt++ + if (attempt === 2) { + abortController.abort() + } + throw 'quit again' + }) + } catch (err) { + expect(err).toBeInstanceOf(Error) + expect((err as Error).message).toBe('This operation was aborted') + return + } + + expect.fail('error should have been thrown') + }) })