Skip to content

Commit

Permalink
Merge branch 'main' into feature/parallel-handle-with-infinity-and-ne…
Browse files Browse the repository at this point in the history
…gative-numbers
  • Loading branch information
MarlonPassos-git authored Nov 23, 2024
2 parents ea7ca47 + 843867a commit 73bae96
Show file tree
Hide file tree
Showing 14 changed files with 272 additions and 64 deletions.
8 changes: 7 additions & 1 deletion .github/next-minor.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,10 @@ The `####` headline should be short and descriptive of the new functionality. In

## New Features

####
#### Add `signal` option to `retry`

https://github.com/radashi-org/radashi/pull/262

#### Add `signal` option to `parallel`

https://github.com/radashi-org/radashi/pull/262
39 changes: 30 additions & 9 deletions docs/async/parallel.mdx
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
---
title: parallel
description: Run many async function in parallel
description: Parallelize async operations while managing load
since: 12.1.0
---

### Usage

Like `_.map` but built specifically to run the async callback functions
in parallel. The first argument is a limit of how many functions should
be allowed to run at once. Returns an array of results.
The `parallel` function processes an array with an async callback. The first argument controls how many array items are processed at one time. Similar to `Promise.all`, an ordered array of results is returned.

```ts
import * as _ from 'radashi'
Expand All @@ -22,11 +20,34 @@ const users = await _.parallel(3, userIds, async userId => {
})
```

### Errors
### Interrupting

When all work is complete parallel will check for errors. If any
occurred they will all be thrown in a single `AggregateError` that
has an `errors` property that is all the errors that were thrown.
Processing can be manually interrupted. Pass an `AbortController.signal` via the `signal` option. When the signal is aborted, no more calls to your callback will be made. Any in-progress calls will continue to completion, unless you manually connect the signal inside your callback. In other words, `parallel` is only responsible for aborting the array loop, not the async operations themselves.

When `parallel` is interrupted by the signal, it throws a `DOMException` (even in Node.js) with the message `This operation was aborted` and name `AbortError`.

```ts
import * as _ from 'radashi'

const abortController = new AbortController()
const signal = abortController.signal

// Pass in the signal:
const pizzas = await _.parallel(
{ limit: 2, signal },
['pepperoni', 'cheese', 'mushroom'],
async topping => {
return await bakePizzaInWoodFiredOven(topping) // each pizza takes 10 minutes!
},
)

// Later on, if you need to abort:
abortController.abort()
```

### Aggregate Errors

Once the whole array has been processed, `parallel` will check for errors. If any errors occurred during processing, they are combined into a single `AggregateError`. The `AggregateError` has an `errors` array property which contains all the individual errors that were thrown.

```ts
import * as _ from 'radashi'
Expand All @@ -39,5 +60,5 @@ const [err, users] = await _.tryit(_.parallel)(3, userIds, async userId => {

console.log(err) // => AggregateError
console.log(err.errors) // => [Error, Error, Error]
console.log(err.errors[1].message) // => No, I don't want to find user 2
console.log(err.errors[1].message) // => "No, I don't want to find user 2"
```
36 changes: 32 additions & 4 deletions docs/async/retry.mdx
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
---
title: retry
description: Run an async function retrying if it fails
description: Retry an async function when it fails
since: 12.1.0
---

### Usage

The `_.retry` function allows you to run an async function and automagically retry it if it fails. Given the async func to run, an optional max number of retries (`r`), and an optional milliseconds to delay between retries (`d`), the given async function will be called, retrying `r` many times, and waiting `d` milliseconds between retries.
The `retry` function runs an async function and retries it if it fails. You can specify how many times to retry, how long to wait between retries, and whether to use exponential backoff.

The `times` option defaults to `3`. The `delay` option (defaults to null) can specify milliseconds to sleep between attempts.
**Options**

The `backoff` option is like delay but uses a function to sleep -- makes for easy exponential backoff.
- `times` is the maximum number of times to retry (default: `3`)
- `delay` is milliseconds to sleep between retries
- `backoff` is a function called to calculate the delay between retries
- It receives the attempt number (starting with `1`) and returns the delay in milliseconds.
- `signal` allows you to pass an `AbortController.signal` to interrupt the retry operation

```ts
import * as _ from 'radashi'
Expand All @@ -33,3 +37,27 @@ await _.retry({ times: 2, delay: 1000 }, api.users.list) // try 2 times with 1 s
// exponential backoff
await _.retry({ backoff: i => 10 ** i }, api.users.list) // try 3 times with 10, 100, 1000 ms delay
```

### Interrupting

If a `signal` is passed, the retry operation can be interrupted. When the signal is aborted, `retry`'s promise will reject with a `DOMException` (even in Node.js) with the message `This operation was aborted` and name `AbortError`.

```ts
import * as _ from 'radashi'

const abortController = new AbortController()
const signal = abortController.signal

const promise = _.retry({ times: 3, delay: 1000, signal }, api.users.list)

// To stop retrying immediately:
abortController.abort()

try {
await promise
} catch (err) {
if (err.message === 'This operation was aborted') {
console.log('Retry operation was aborted')
}
}
```
2 changes: 1 addition & 1 deletion scripts/bench-file/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ async function main() {
watch: false,
pool: 'vmThreads',
includeTaskLocation: true,
config: new URL('../vitest.config.ts', import.meta.url).pathname,
config: new URL('../../../vitest.config.ts', import.meta.url).pathname,
benchmark: {
reporters: [
reportToBenchmarkHandler(report => {
Expand Down
12 changes: 0 additions & 12 deletions scripts/bench-file/vitest.config.ts

This file was deleted.

19 changes: 8 additions & 11 deletions scripts/benchmarks/src/runner.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
import { execa } from 'execa'
import type { BenchmarkReport } from './reporter.ts'

const tsx = './scripts/benchmarks/node_modules/.bin/tsx'
const runner = './scripts/benchmarks/vitest-bench.ts'

export function runVitest(file: string) {
export async function runVitest(file: string) {
console.log(`Running benchmarks in ./${file}`)

return execa(tsx, [runner, file], { reject: false }).then(result => {
if (result.exitCode !== 0) {
console.error(result.stderr)
throw new Error('Benchmark failed. See above for details.')
}
return JSON.parse(result.stdout) as BenchmarkReport[]
const result = await execa('node', ['scripts/run', 'bench-file', file], {
reject: false,
})
if (result.exitCode !== 0) {
console.error(result.stderr)
throw new Error('Benchmark failed. See above for details.')
}
return JSON.parse(result.stdout) as BenchmarkReport[]
}
5 changes: 3 additions & 2 deletions scripts/publish-version/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ async function parseArgs() {
const { default: mri } = await import('mri')

const argv = mri(process.argv.slice(2), {
boolean: ['no-push'],
string: ['tag', 'latest'],
boolean: ['no-push', 'patch', 'latest'],
string: ['tag'],
})

if (argv.latest && argv.tag) {
Expand All @@ -47,6 +47,7 @@ async function parseArgs() {
return {
push: !argv['no-push'],
tag: argv.tag as ValidTag,
patch: argv.patch,
gitCliffToken,
npmToken,
radashiBotToken,
Expand Down
2 changes: 1 addition & 1 deletion scripts/radashi-db/gen-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ await execa(
},
).then(async result => {
// Write output to file
await writeFile(`${scriptDir}/src/supabase.types.ts`, result.stdout)
await writeFile(`${scriptDir}/src/supabase-types.ts`, result.stdout)
})
File renamed without changes.
4 changes: 2 additions & 2 deletions scripts/radashi-db/src/supabase.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { createClient } from '@supabase/supabase-js'
import type { Database } from './supabase.types.ts'
import type { Database } from './supabase-types.ts'

if (!process.env.SUPABASE_KEY) {
throw new Error('SUPABASE_KEY is not set')
Expand All @@ -12,4 +12,4 @@ export const supabase = createClient<Database>(

process.env.SUPABASE_KEY = ''

export * from './supabase.types'
export * from './supabase-types.ts'
88 changes: 72 additions & 16 deletions src/async/parallel.ts
Original file line number Diff line number Diff line change
@@ -1,47 +1,89 @@
import { AggregateError, clamp, flat, fork, list, sort, tryit } from 'radashi'
import {
AggregateError,
clamp,
flat,
fork,
isNumber,
list,
sort,
tryit,
} from 'radashi'

type AbortSignal = {
readonly aborted: boolean
readonly reason: any
addEventListener(type: 'abort', listener: () => void): void
removeEventListener(type: 'abort', listener: () => void): void
throwIfAborted(): void
}

type WorkItemResult<K> = {
index: number
result: K
error: any
}

export type ParallelOptions =
| {
limit: number
signal?: AbortSignal
}
| number

/**
* Executes many async functions in parallel. Returns the results from
* all functions as an array. After all functions have resolved, if
* any errors were thrown, they are rethrown in an instance of
* AggregateError.
* AggregateError. The operation can be aborted by passing optional AbortSignal,
* which will throw an Error if aborted.
*
* @see https://radashi.js.org/reference/async/parallel
* @example
* ```ts
* // Process images concurrently, resizing each image to a standard size.
* const images = await parallel(2, imageFiles, async (file) => {
* const abortController = new AbortController();
* const images = await parallel(
* {
* limit: 2,
* signal: abortController.signal,
* },
* imageFiles,
* async file => {
* return await resizeImage(file)
* })
*
* // To abort the operation:
* // abortController.abort()
* ```
* @version 12.1.0
*/
export async function parallel<T, K>(
/**
* The maximum number of functions to run concurrently.If a negative
* number is passed, only one function will run at a time. if a number
* bigger than the array size is passed, the array size will be used.
*/
limit: number,
options: ParallelOptions,
array: readonly T[],
func: (item: T) => Promise<K>,
): Promise<K[]> {
const work = array.map((item, index) => ({
index,
item,
}))

let signal: AbortSignal | undefined
if (isNumber(options)) {
options = {
limit: options,
}
} else {
signal = options.signal
signal?.throwIfAborted()
}

// Process array items
const processor = async (res: (value: WorkItemResult<K>[]) => void) => {
const processor = async (resolve: (value: WorkItemResult<K>[]) => void) => {
const results: WorkItemResult<K>[] = []
while (true) {
while (!signal?.aborted) {
const next = work.pop()
if (!next) {
return res(results)
break
}
const [error, result] = await tryit(func)(next.item)
results.push({
Expand All @@ -50,13 +92,27 @@ export async function parallel<T, K>(
index: next.index,
})
}
return resolve(results)
}
// Create queues
const queues = list(1, clamp(limit, 1, array.length)).map(
() => new Promise(processor),

const queues = Promise.all(
list(1, clamp(options.limit, 1, array.length)).map(() => new Promise(processor)),
)

let signalPromise: Promise<never> | undefined
if (signal) {
signalPromise = new Promise((_, reject) => {
const onAbort = () => reject(signal.reason)
signal.addEventListener('abort', onAbort)
queues.then(() => signal.removeEventListener('abort', onAbort))
})
}

// Wait for all queues to complete
const itemResults = (await Promise.all(queues)) as WorkItemResult<K>[][]
const itemResults = await (signalPromise
? Promise.race([queues, signalPromise])
: queues)

const [errors, results] = fork(
sort(flat(itemResults), r => r.index),
x => !!x.error,
Expand Down
13 changes: 12 additions & 1 deletion src/async/retry.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import { sleep, tryit } from 'radashi'

type AbortSignal = {
throwIfAborted(): void
}

export type RetryOptions = {
times?: number
delay?: number | null
backoff?: (count: number) => number
signal?: AbortSignal
}

/**
Expand All @@ -12,9 +17,12 @@ export type RetryOptions = {
* @see https://radashi.js.org/reference/async/retry
* @example
* ```ts
* const result = await retry({ times: 3, delay: 1000 }, async () => {
* const abortController = new AbortController();
* const result = await retry({ times: 3, delay: 1000, signal: abortController.signal }, async () => {
* return await fetch('https://example.com')
* })
* // To abort the operation:
* // abortController.abort()
* ```
* @version 12.1.0
*/
Expand All @@ -25,11 +33,14 @@ export async function retry<TResponse>(
const times = options?.times ?? 3
const delay = options?.delay
const backoff = options?.backoff ?? null
const signal = options?.signal

let i = 0
while (true) {
const [err, result] = (await tryit(func)((err: any) => {
throw { _exited: err }
})) as [any, TResponse]
signal?.throwIfAborted()
if (!err) {
return result
}
Expand Down
Loading

0 comments on commit 73bae96

Please sign in to comment.