From 7fa9fe894a0d59bf24105d7c1a01ad7ac8a9158e Mon Sep 17 00:00:00 2001 From: hugofqt <12221094+hugo082@users.noreply.github.com> Date: Thu, 26 Dec 2024 11:09:05 +0100 Subject: [PATCH 1/9] chore: init with-semaphore --- docs/async/withSemaphore.mdx | 14 ++++++++++++++ src/async/withSemaphore.ts | 10 ++++++++++ src/mod.ts | 1 + tests/async/withSemaphore.test.ts | 7 +++++++ 4 files changed, 32 insertions(+) create mode 100644 docs/async/withSemaphore.mdx create mode 100644 src/async/withSemaphore.ts create mode 100644 tests/async/withSemaphore.test.ts diff --git a/docs/async/withSemaphore.mdx b/docs/async/withSemaphore.mdx new file mode 100644 index 00000000..481bfba4 --- /dev/null +++ b/docs/async/withSemaphore.mdx @@ -0,0 +1,14 @@ +--- +title: withSemaphore +description: desc +--- + +### Usage + +Does a thing. Returns a value. + +```ts +import * as _ from 'radashi' + +_.withSemaphore() +``` diff --git a/src/async/withSemaphore.ts b/src/async/withSemaphore.ts new file mode 100644 index 00000000..7270ffdb --- /dev/null +++ b/src/async/withSemaphore.ts @@ -0,0 +1,10 @@ +/** + * Does a thing. + * + * @see https://radashi.js.org/reference/async/withSemaphore + * @example + * ```ts + * withSemaphore() + * ``` + */ +export function withSemaphore(): void { } diff --git a/src/mod.ts b/src/mod.ts index 0a91c38d..07fa1d27 100644 --- a/src/mod.ts +++ b/src/mod.ts @@ -43,6 +43,7 @@ export * from './async/timeout.ts' export * from './async/TimeoutError.ts' export * from './async/tryit.ts' export * from './async/withResolvers.ts' +export * from './async/withSemaphore.ts' export * from './curry/callable.ts' export * from './curry/chain.ts' diff --git a/tests/async/withSemaphore.test.ts b/tests/async/withSemaphore.test.ts new file mode 100644 index 00000000..4893e518 --- /dev/null +++ b/tests/async/withSemaphore.test.ts @@ -0,0 +1,7 @@ +import * as _ from 'radashi' + +describe('withSemaphore', () => { + test('does a thing', () => { + expect(_.withSemaphore()).toBe(undefined) + }) +}) From 1cf733502bf0f8122d7d3e26a9e7051a42892074 Mon Sep 17 00:00:00 2001 From: hugofqt <12221094+hugo082@users.noreply.github.com> Date: Thu, 26 Dec 2024 13:10:11 +0100 Subject: [PATCH 2/9] feat: withSemaphore implementation --- src/async/withSemaphore.ts | 169 +++++++++++++++++++++++++++++- tests/async/withSemaphore.test.ts | 124 +++++++++++++++++++++- 2 files changed, 289 insertions(+), 4 deletions(-) diff --git a/src/async/withSemaphore.ts b/src/async/withSemaphore.ts index 7270ffdb..33f54d0e 100644 --- a/src/async/withSemaphore.ts +++ b/src/async/withSemaphore.ts @@ -1,10 +1,175 @@ +import { once } from "radashi"; + +type AnyFn = (permit: Permit) => Promise + +export interface Permit { + weight: number; + /** Currently locked weight, including this permit */ + running: number; + isAcquired: boolean; + release: () => void; +} + +export interface WithSemaphoreOptions { + capacity: number; +} + +export interface Semaphore { + getRunning(): number; + acquire(weight?: number): Promise; + release(weight?: number): void; +} + /** * Does a thing. * * @see https://radashi.js.org/reference/async/withSemaphore * @example * ```ts - * withSemaphore() + * const limitedFn = withSemaphore(1) + * limitedFn(() => ...) * ``` */ -export function withSemaphore(): void { } +export function withSemaphore(capacity: number): ExclusiveFn; + +/** + * Does a thing. + * + * @see https://radashi.js.org/reference/async/withSemaphore + * @example + * ```ts + * const limitedFn = withSemaphore({ capacity: 1 }) + * limitedFn(() => ...) + * ``` + */ +export function withSemaphore(options: WithSemaphoreOptions): ExclusiveFn; + +/** + * Does a thing. + * + * @see https://radashi.js.org/reference/async/withSemaphore + * @example + * ```ts + * const limitedFn = withSemaphore(1, () => ...) + * limitedFn() + * limitedFn(() => ...) + * ``` + */ +export function withSemaphore(capacity: number, fn: AnyFn): PrebuiltExclusiveFn; + +/** + * Does a thing. + * + * @see https://radashi.js.org/reference/async/withSemaphore + * @example + * ```ts + * const limitedFn = withSemaphore({ capacity: 1 }, () => ...) + * limitedFn() + * limitedFn(() => ...) + * ``` + */ +export function withSemaphore(options: WithSemaphoreOptions, fn: AnyFn): PrebuiltExclusiveFn; +export function withSemaphore(optionsOrCapacity: number | WithSemaphoreOptions, baseFn?: AnyFn): PrebuiltExclusiveFn { + const options = useOptions(optionsOrCapacity); + if (options.capacity < 1) throw new Error(`invalid capacity ${options.capacity}: must be positive`); + + const queue: QueuedTask[] = []; + let running = 0; + + function _dispatch() { + while (queue.length > 0 && running < options.capacity) { + const task = queue.shift()!; + running += task.weight; + task.resolve(_createPermit(task.weight)); + } + } + + function _createPermit(weight: number): Permit { + let isAcquired = true; + + return { + get weight() { return weight }, + get isAcquired() { return isAcquired }, + get running() { return running }, + release: once(() => { + release(weight) + isAcquired = false; + }) + } + } + + function release(weight = 1) { + if (weight < 1) throw new Error(`invalid weight ${weight}: must be positive`); + + running -= weight; + _dispatch(); + } + + function acquire(weight = 1) { + if (weight < 1) throw new Error(`invalid weight ${weight}: must be positive`); + if (weight > options.capacity) throw new Error(`invalid weight ${weight}: must be lower than or equal capacity ${options.capacity}`); + + return new Promise((resolve, reject) => { + queue.push({ resolve, weight }); + _dispatch(); + }); + } + + async function runExclusive(optionsOrWeightOrFn?: number | ExclusiveOptions | AnyFn, innerFn?: AnyFn): Promise { + const options = typeof optionsOrWeightOrFn === "function" ? {} : useExclusiveFnOptions(optionsOrWeightOrFn); + const fn = (typeof optionsOrWeightOrFn === "function" ? optionsOrWeightOrFn : innerFn) ?? baseFn; + if (!fn) throw new Error(`invalid execution: function is required`); + + const permit = await acquire(options.weight) + + try { + return await fn(permit); + } finally { + permit.release(); + } + } + + runExclusive.acquire = acquire; + runExclusive.release = release; + runExclusive.getRunning = () => running; + + return runExclusive; +} + +function useOptions(optionsOrCapacity: number | WithSemaphoreOptions): WithSemaphoreOptions { + if (typeof optionsOrCapacity === "number") { + return { capacity: optionsOrCapacity }; + } + + return optionsOrCapacity; +} + +function useExclusiveFnOptions(optionsOrWeight?: number | ExclusiveOptions): ExclusiveOptions { + if (typeof optionsOrWeight === "number") { + return { weight: optionsOrWeight }; + } + + return optionsOrWeight ?? {}; +} + + +type QueuedTask = { + weight: number; + resolve: (permit: Permit) => void; +}; + + +interface ExclusiveOptions { + weight?: number; +} + +interface ExclusiveFn extends Semaphore { + (fn: AnyFn): Promise; + (options: ExclusiveOptions, fn: AnyFn): Promise; + (weight: number, fn: AnyFn): Promise; +} + +interface PrebuiltExclusiveFn extends ExclusiveFn { + (options: ExclusiveOptions): Promise; + (weight?: number): Promise; +} diff --git a/tests/async/withSemaphore.test.ts b/tests/async/withSemaphore.test.ts index 4893e518..dd1ad124 100644 --- a/tests/async/withSemaphore.test.ts +++ b/tests/async/withSemaphore.test.ts @@ -1,7 +1,127 @@ import * as _ from 'radashi' describe('withSemaphore', () => { - test('does a thing', () => { - expect(_.withSemaphore()).toBe(undefined) + beforeEach(() => { + vi.useFakeTimers() + }) + + afterEach(() => { + vi.useRealTimers() + }) + + test('does not blocked while the semaphore has not reached zero', async () => { + const values: number[] = []; + const exclusive = _.withSemaphore(2) + + exclusive(async () => { values.push(1) }) + exclusive(async () => { values.push(2) }) + + await vi.advanceTimersByTimeAsync(0) + + expect(values).toEqual([1, 2]) + }) + + test('does blocked while the semaphore has reached zero', async () => { + const values: number[][] = []; + const exclusive = _.withSemaphore(2) + exclusive(async (permit) => { + await _.sleep(50) + values.push([1, permit.running]) + }) + exclusive(async (permit) => { + await _.sleep(100) + values.push([2, permit.running]) + }) + exclusive(async (permit) => { + await _.sleep(100) + values.push([3, permit.running]) + }) + + await vi.advanceTimersByTimeAsync(51) + expect(values).toEqual([[1, 2]]) + + await vi.advanceTimersByTimeAsync(51) + expect(values).toEqual([[1, 2], [2, 2]]) + + await vi.advanceTimersByTimeAsync(51) + expect(values).toEqual([[1, 2], [2, 2], [3, 1]]) + }) + + test('does weight the rexecution', async () => { + const values: number[][] = []; + const exclusive = _.withSemaphore(2) + exclusive(2, async (permit) => { + values.push([1, permit.running]) + }) + exclusive(2, async (permit) => { + values.push([2, permit.running]) + }) + exclusive(2, async (permit) => { + values.push([3, permit.running]) + }) + + await vi.advanceTimersByTimeAsync(0) + expect(values).toEqual([[1, 2], [2, 2], [3, 2]]) + }) + + test('handler failures does not affect the semaphore release', async () => { + const exclusive = _.withSemaphore(2, async () => { + throw new Error('boom') + }) + + await expect(exclusive()).rejects.toThrow('boom') + + expect(exclusive.getRunning()).toEqual(0) + }) + + test('does expose manual lock management', async () => { + const values: number[][] = []; + const semaphore = _.withSemaphore(2) + const permit = await semaphore.acquire(2) + expect(permit.weight).toBe(2) + + semaphore(async (permit) => { + values.push([1, permit.running]) + }) + + expect(values).toEqual([]) + + permit.release() + + await vi.advanceTimersByTimeAsync(0) + expect(values).toEqual([[1, 1]]) + }) + + test('permit is released only once', async () => { + const semaphore = _.withSemaphore(2) + const permit = await semaphore.acquire(2) + + expect(permit.isAcquired).toBe(true) + expect(semaphore.getRunning()).toBe(2) + + permit.release() + expect(permit.isAcquired).toBe(false) + expect(semaphore.getRunning()).toBe(0) + + permit.release() + expect(permit.isAcquired).toBe(false) + expect(semaphore.getRunning()).toBe(0) + }) + + test('signatures', () => { + expect(_.withSemaphore(1)).toBeDefined() + expect(_.withSemaphore(1, async () => { })).toBeDefined() + expect(_.withSemaphore({ capacity: 1 })).toBeDefined() + expect(_.withSemaphore({ capacity: 1 }, async () => { })).toBeDefined() + }) + + test('invalid options', async () => { + const semaphore = _.withSemaphore(2) + expect(() => _.withSemaphore(0)).toThrow(/invalid capacity 0: must be positive/) + expect(() => semaphore.acquire(0)).toThrow(/invalid weight 0: must be positive/) + expect(() => semaphore.acquire(5)).toThrow(/invalid weight 5: must be lower than or equal capacity 2/) + expect(() => semaphore.release(0)).toThrow(/invalid weight 0: must be positive/) + // @ts-expect-error should pass function + await expect(semaphore(1)).rejects.toThrow(/invalid execution: function is required/) }) }) From d781c8c1284c85f0e76b6d00616451703d313ace Mon Sep 17 00:00:00 2001 From: hugofqt <12221094+hugo082@users.noreply.github.com> Date: Thu, 26 Dec 2024 13:27:40 +0100 Subject: [PATCH 3/9] doc: withSemaphore usage --- docs/async/withSemaphore.mdx | 57 ++++++++++++++++++++++++++++++++++-- src/async/withSemaphore.ts | 16 +++++----- 2 files changed, 62 insertions(+), 11 deletions(-) diff --git a/docs/async/withSemaphore.mdx b/docs/async/withSemaphore.mdx index 481bfba4..8b19e7f7 100644 --- a/docs/async/withSemaphore.mdx +++ b/docs/async/withSemaphore.mdx @@ -1,14 +1,65 @@ --- title: withSemaphore -description: desc +description: A synchronization primitive for limiting concurrent usage --- ### Usage -Does a thing. Returns a value. +Creates a [semaphore-protected](https://en.wikipedia.org/wiki/Semaphore_(programming)) async function that limits concurrent execution to a specified number of active uses. +Additional calls will wait for previous executions to complete. ```ts import * as _ from 'radashi' -_.withSemaphore() +const exclusiveFn =_.withSemaphore(2, async () => { + // Do stuff +}) + +exclusiveFn() // run immediatly +exclusiveFn() // run immediatly +exclusiveFn() // wait until semaphore is released +``` + +#### Using with task based functions + +Execution function can be passed as a task parameter and ignored at the semaphore creation. + +```ts +import * as _ from 'radashi' + +const exclusiveFn =_.withSemaphore({ capacity: 2 }) + +exclusiveFn(async (permit) => { + // Do stuff +}) +``` + +#### Weighted tasks + +Each task can require a specific weight from a semaphore. In this example two tasks each weighted with 2 from +a semaphore with a capacity of 2. As a result they are mutually exclusive. + +```ts +import * as _ from 'radashi' + +const exclusiveFn =_.withSemaphore({ capacity: 2 }) + +exclusiveFn({ weight: 2 }, async (permit) => { }) // run immediatly +exclusiveFn({ weight: 2 }, async (permit) => { }) // wait until semaphore is released +``` + +### Manual lock management + +The semaphore can be manually acquired and released for fine-grained control over locking behavior. + +```ts +import * as _ from 'radashi' + +const semaphore =_.withSemaphore({ capacity: 2 }) + +const permit = await semaphore.acquire() + +// Do stuff + +permit.release() ``` diff --git a/src/async/withSemaphore.ts b/src/async/withSemaphore.ts index 33f54d0e..978bfdde 100644 --- a/src/async/withSemaphore.ts +++ b/src/async/withSemaphore.ts @@ -21,36 +21,36 @@ export interface Semaphore { } /** - * Does a thing. + * Creates a semaphore-protected async function that limits concurrent execution to a specified number of active uses. * * @see https://radashi.js.org/reference/async/withSemaphore * @example * ```ts - * const limitedFn = withSemaphore(1) + * const limitedFn = withSemaphore(2) * limitedFn(() => ...) * ``` */ export function withSemaphore(capacity: number): ExclusiveFn; /** - * Does a thing. + * Creates a semaphore-protected async function that limits concurrent execution to a specified number of active uses. * * @see https://radashi.js.org/reference/async/withSemaphore * @example * ```ts - * const limitedFn = withSemaphore({ capacity: 1 }) + * const limitedFn = withSemaphore({ capacity: 2 }) * limitedFn(() => ...) * ``` */ export function withSemaphore(options: WithSemaphoreOptions): ExclusiveFn; /** - * Does a thing. + * Creates a semaphore-protected async function that limits concurrent execution to a specified number of active uses. * * @see https://radashi.js.org/reference/async/withSemaphore * @example * ```ts - * const limitedFn = withSemaphore(1, () => ...) + * const limitedFn = withSemaphore(2, () => ...) * limitedFn() * limitedFn(() => ...) * ``` @@ -58,12 +58,12 @@ export function withSemaphore(options: WithSemaphoreOptions): ExclusiveFn; export function withSemaphore(capacity: number, fn: AnyFn): PrebuiltExclusiveFn; /** - * Does a thing. + * Creates a semaphore-protected async function that limits concurrent execution to a specified number of active uses. * * @see https://radashi.js.org/reference/async/withSemaphore * @example * ```ts - * const limitedFn = withSemaphore({ capacity: 1 }, () => ...) + * const limitedFn = withSemaphore({ capacity: 2 }, () => ...) * limitedFn() * limitedFn(() => ...) * ``` From c09a32007486f9643f1caa6a362da401e04728e7 Mon Sep 17 00:00:00 2001 From: hugofqt <12221094+hugo082@users.noreply.github.com> Date: Thu, 26 Dec 2024 13:51:15 +0100 Subject: [PATCH 4/9] chore: prefix Permit type to avoid conflicts with other functions --- src/async/withSemaphore.ts | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/async/withSemaphore.ts b/src/async/withSemaphore.ts index 978bfdde..bd2e24c2 100644 --- a/src/async/withSemaphore.ts +++ b/src/async/withSemaphore.ts @@ -1,8 +1,8 @@ import { once } from "radashi"; -type AnyFn = (permit: Permit) => Promise +type AnyFn = (permit: SemaphorePermit) => Promise -export interface Permit { +export interface SemaphorePermit { weight: number; /** Currently locked weight, including this permit */ running: number; @@ -16,7 +16,7 @@ export interface WithSemaphoreOptions { export interface Semaphore { getRunning(): number; - acquire(weight?: number): Promise; + acquire(weight?: number): Promise; release(weight?: number): void; } @@ -84,7 +84,7 @@ export function withSemaphore(optionsOrCapacity: number | WithSemaphoreOptions, } } - function _createPermit(weight: number): Permit { + function _createPermit(weight: number): SemaphorePermit { let isAcquired = true; return { @@ -109,7 +109,7 @@ export function withSemaphore(optionsOrCapacity: number | WithSemaphoreOptions, if (weight < 1) throw new Error(`invalid weight ${weight}: must be positive`); if (weight > options.capacity) throw new Error(`invalid weight ${weight}: must be lower than or equal capacity ${options.capacity}`); - return new Promise((resolve, reject) => { + return new Promise((resolve) => { queue.push({ resolve, weight }); _dispatch(); }); @@ -155,7 +155,7 @@ function useExclusiveFnOptions(optionsOrWeight?: number | ExclusiveOptions): Exc type QueuedTask = { weight: number; - resolve: (permit: Permit) => void; + resolve: (permit: SemaphorePermit) => void; }; From f47b346ace7b65fcf9746810c29f06c128e0f036 Mon Sep 17 00:00:00 2001 From: hugofqt <12221094+hugo082@users.noreply.github.com> Date: Thu, 26 Dec 2024 14:02:16 +0100 Subject: [PATCH 5/9] feat: withMutex implementation --- src/async/withMutex.ts | 58 ++++++++++++++++++++++ src/mod.ts | 1 + tests/async/withMutex.test.ts | 91 +++++++++++++++++++++++++++++++++++ 3 files changed, 150 insertions(+) create mode 100644 src/async/withMutex.ts create mode 100644 tests/async/withMutex.test.ts diff --git a/src/async/withMutex.ts b/src/async/withMutex.ts new file mode 100644 index 00000000..2dc62ecb --- /dev/null +++ b/src/async/withMutex.ts @@ -0,0 +1,58 @@ +import { withSemaphore, type SemaphorePermit } from "radashi"; + +type AnyFn = (permit: SemaphorePermit) => Promise + +export interface Mutex { + isLocked(): boolean; + acquire(): Promise; + release(): void; +} + +/** + * Creates a mutex-protected instance with supplied function that limits concurrent execution to a single active use. + * + * @see https://radashi.js.org/reference/async/withSemaphore + * @example + * ```ts + * const limitedFn = withMutex() + * limitedFn(() => ...) + * ``` + */ +export function withMutex(): ExclusiveFn; + +/** + * Creates a mutex-protected instance with supplied function that limits concurrent execution to a single active use. + * Supports direct invocation and dynamic function passing. + * + * @see https://radashi.js.org/reference/async/withMutex + * @example + * ```ts + * const limitedFn = withMutex(() => ...) + * limitedFn() + * limitedFn(() => ...) + * ``` + */ +export function withMutex(fn: AnyFn): PrebuiltExclusiveFn; +export function withMutex(baseFn?: AnyFn): PrebuiltExclusiveFn { + // @ts-expect-error because baseFn is not optional + const semaphore = withSemaphore({ capacity: 1 }, baseFn); + + async function runExclusive(innerFn?: AnyFn): Promise { + // @ts-expect-error because innerFn is not optional + return semaphore({ weight: 1 }, innerFn); + } + + runExclusive.isLocked = () => semaphore.getRunning() > 0; + runExclusive.acquire = () => semaphore.acquire(1); + runExclusive.release = () => semaphore.release(1); + + return runExclusive +} + +interface ExclusiveFn extends Mutex { + (fn: AnyFn): Promise; +} + +interface PrebuiltExclusiveFn extends ExclusiveFn { + (): Promise; +} diff --git a/src/mod.ts b/src/mod.ts index 07fa1d27..6545bc8b 100644 --- a/src/mod.ts +++ b/src/mod.ts @@ -44,6 +44,7 @@ export * from './async/TimeoutError.ts' export * from './async/tryit.ts' export * from './async/withResolvers.ts' export * from './async/withSemaphore.ts' +export * from './async/withMutex.ts' export * from './curry/callable.ts' export * from './curry/chain.ts' diff --git a/tests/async/withMutex.test.ts b/tests/async/withMutex.test.ts new file mode 100644 index 00000000..ac2cf6ff --- /dev/null +++ b/tests/async/withMutex.test.ts @@ -0,0 +1,91 @@ +import * as _ from 'radashi' + +describe('withMutex', () => { + beforeEach(() => { + vi.useFakeTimers() + }) + + afterEach(() => { + vi.useRealTimers() + }) + + test('does blocked while the mutex is locked', async () => { + const values: number[][] = []; + const exclusive = _.withMutex() + exclusive(async (permit) => { + await _.sleep(50) + values.push([1, permit.running]) + }) + exclusive(async (permit) => { + await _.sleep(100) + values.push([2, permit.running]) + }) + exclusive(async (permit) => { + await _.sleep(100) + values.push([3, permit.running]) + }) + + await vi.advanceTimersByTimeAsync(51) + expect(values).toEqual([[1, 1]]) + + await vi.advanceTimersByTimeAsync(101) + expect(values).toEqual([[1, 1], [2, 1]]) + + await vi.advanceTimersByTimeAsync(101) + expect(values).toEqual([[1, 1], [2, 1], [3, 1]]) + }) + + test('handler failures does not affect the mutex release', async () => { + const exclusive = _.withMutex(async () => { + throw new Error('boom') + }) + + await expect(exclusive()).rejects.toThrow('boom') + + expect(exclusive.isLocked()).toBe(false) + }) + + test('does expose manual lock management', async () => { + const values: number[][] = []; + const mutex = _.withMutex() + const permit = await mutex.acquire() + expect(permit.isAcquired).toBe(true) + + mutex(async (permit) => { + values.push([1, permit.running]) + }) + + expect(values).toEqual([]) + + permit.release() + + await vi.advanceTimersByTimeAsync(0) + expect(values).toEqual([[1, 1]]) + }) + + test('permit is released only once', async () => { + const mutex = _.withMutex() + const permit_a = await mutex.acquire() + + expect(permit_a.isAcquired).toBe(true) + expect(mutex.isLocked()).toBe(true) + + permit_a.release() + expect(permit_a.isAcquired).toBe(false) + expect(mutex.isLocked()).toBe(false) + + const permit_b = await mutex.acquire() + expect(permit_b.isAcquired).toBe(true) + expect(mutex.isLocked()).toBe(true) + + permit_a.release() // permit_a release has no impact on permit_b + expect(permit_a.isAcquired).toBe(false) + expect(permit_b.isAcquired).toBe(true) + expect(mutex.isLocked()).toBe(true) + }) + + test('signatures', () => { + expect(_.withMutex()).toBeDefined() + expect(_.withMutex(async () => { })).toBeDefined() + }) +}) From acfa4c939c2f6d402f1a7758a9c1e583d390f6df Mon Sep 17 00:00:00 2001 From: hugofqt <12221094+hugo082@users.noreply.github.com> Date: Thu, 26 Dec 2024 14:02:30 +0100 Subject: [PATCH 6/9] doc: withMutex usage --- docs/async/withMutex.mdx | 51 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 docs/async/withMutex.mdx diff --git a/docs/async/withMutex.mdx b/docs/async/withMutex.mdx new file mode 100644 index 00000000..9eff1e87 --- /dev/null +++ b/docs/async/withMutex.mdx @@ -0,0 +1,51 @@ +--- +title: withSemaphore +description: A synchronization primitive for limiting concurrent usage to one +--- + +### Usage + +Creates a mutex-protected async function that limits concurrent execution to a single use at a time. Additional calls will wait for the mutex to be released before executing. + +```ts +import * as _ from 'radashi' + +const exclusiveFn =_.withMutex(async () => { + // Do stuff +}) + +exclusiveFn() // run immediatly +exclusiveFn() // wait until mutex is released +``` + +#### Using with task based functions + +Execution function can be passed as a task parameter and ignored at the mutex creation. + +```ts +import * as _ from 'radashi' + +const exclusiveFn =_.withMutex() + +exclusiveFn(async (permit) => { + // Do stuff +}) +``` + +### Manual lock management + +The mutex can be manually acquired and released for fine-grained control over locking behavior. + +```ts +import * as _ from 'radashi' + +const mutex =_.withMutex() + +const permit = await mutex.acquire() +mutex.isLocked() // true + +// Do stuff + +permit.release() +mutex.isLocked() // false +``` From f662c284ca24526b10ddbb9a9b7c090d7fe41f92 Mon Sep 17 00:00:00 2001 From: hugofqt <12221094+hugo082@users.noreply.github.com> Date: Thu, 26 Dec 2024 14:09:35 +0100 Subject: [PATCH 7/9] chore: fix coverage --- tests/async/withMutex.test.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/async/withMutex.test.ts b/tests/async/withMutex.test.ts index ac2cf6ff..47f865ab 100644 --- a/tests/async/withMutex.test.ts +++ b/tests/async/withMutex.test.ts @@ -82,6 +82,9 @@ describe('withMutex', () => { expect(permit_a.isAcquired).toBe(false) expect(permit_b.isAcquired).toBe(true) expect(mutex.isLocked()).toBe(true) + + mutex.release() + expect(mutex.isLocked()).toBe(false) }) test('signatures', () => { From 66bd2e2304d46ae902f546cfb50f055b2015f0fe Mon Sep 17 00:00:00 2001 From: hugofqt <12221094+hugo082@users.noreply.github.com> Date: Fri, 27 Dec 2024 11:05:50 +0100 Subject: [PATCH 8/9] chore: format --- docs/async/withMutex.mdx | 8 +- docs/async/withSemaphore.mdx | 16 +-- src/async/withMutex.ts | 26 ++--- src/async/withSemaphore.ts | 168 ++++++++++++++++++------------ tests/async/withMutex.test.ts | 25 +++-- tests/async/withSemaphore.test.ts | 71 +++++++++---- 6 files changed, 192 insertions(+), 122 deletions(-) diff --git a/docs/async/withMutex.mdx b/docs/async/withMutex.mdx index 9eff1e87..c92beb3a 100644 --- a/docs/async/withMutex.mdx +++ b/docs/async/withMutex.mdx @@ -10,7 +10,7 @@ Creates a mutex-protected async function that limits concurrent execution to a s ```ts import * as _ from 'radashi' -const exclusiveFn =_.withMutex(async () => { +const exclusiveFn = _.withMutex(async () => { // Do stuff }) @@ -25,9 +25,9 @@ Execution function can be passed as a task parameter and ignored at the mutex cr ```ts import * as _ from 'radashi' -const exclusiveFn =_.withMutex() +const exclusiveFn = _.withMutex() -exclusiveFn(async (permit) => { +exclusiveFn(async permit => { // Do stuff }) ``` @@ -39,7 +39,7 @@ The mutex can be manually acquired and released for fine-grained control over lo ```ts import * as _ from 'radashi' -const mutex =_.withMutex() +const mutex = _.withMutex() const permit = await mutex.acquire() mutex.isLocked() // true diff --git a/docs/async/withSemaphore.mdx b/docs/async/withSemaphore.mdx index 8b19e7f7..ce17a4ca 100644 --- a/docs/async/withSemaphore.mdx +++ b/docs/async/withSemaphore.mdx @@ -5,13 +5,13 @@ description: A synchronization primitive for limiting concurrent usage ### Usage -Creates a [semaphore-protected](https://en.wikipedia.org/wiki/Semaphore_(programming)) async function that limits concurrent execution to a specified number of active uses. +Creates a [semaphore-protected]() async function that limits concurrent execution to a specified number of active uses. Additional calls will wait for previous executions to complete. ```ts import * as _ from 'radashi' -const exclusiveFn =_.withSemaphore(2, async () => { +const exclusiveFn = _.withSemaphore(2, async () => { // Do stuff }) @@ -27,9 +27,9 @@ Execution function can be passed as a task parameter and ignored at the semaphor ```ts import * as _ from 'radashi' -const exclusiveFn =_.withSemaphore({ capacity: 2 }) +const exclusiveFn = _.withSemaphore({ capacity: 2 }) -exclusiveFn(async (permit) => { +exclusiveFn(async permit => { // Do stuff }) ``` @@ -42,10 +42,10 @@ a semaphore with a capacity of 2. As a result they are mutually exclusive. ```ts import * as _ from 'radashi' -const exclusiveFn =_.withSemaphore({ capacity: 2 }) +const exclusiveFn = _.withSemaphore({ capacity: 2 }) -exclusiveFn({ weight: 2 }, async (permit) => { }) // run immediatly -exclusiveFn({ weight: 2 }, async (permit) => { }) // wait until semaphore is released +exclusiveFn({ weight: 2 }, async permit => {}) // run immediatly +exclusiveFn({ weight: 2 }, async permit => {}) // wait until semaphore is released ``` ### Manual lock management @@ -55,7 +55,7 @@ The semaphore can be manually acquired and released for fine-grained control ove ```ts import * as _ from 'radashi' -const semaphore =_.withSemaphore({ capacity: 2 }) +const semaphore = _.withSemaphore({ capacity: 2 }) const permit = await semaphore.acquire() diff --git a/src/async/withMutex.ts b/src/async/withMutex.ts index 2dc62ecb..653d2090 100644 --- a/src/async/withMutex.ts +++ b/src/async/withMutex.ts @@ -1,11 +1,11 @@ -import { withSemaphore, type SemaphorePermit } from "radashi"; +import { withSemaphore, type SemaphorePermit } from 'radashi' type AnyFn = (permit: SemaphorePermit) => Promise export interface Mutex { - isLocked(): boolean; - acquire(): Promise; - release(): void; + isLocked(): boolean + acquire(): Promise + release(): void } /** @@ -18,7 +18,7 @@ export interface Mutex { * limitedFn(() => ...) * ``` */ -export function withMutex(): ExclusiveFn; +export function withMutex(): ExclusiveFn /** * Creates a mutex-protected instance with supplied function that limits concurrent execution to a single active use. @@ -32,27 +32,27 @@ export function withMutex(): ExclusiveFn; * limitedFn(() => ...) * ``` */ -export function withMutex(fn: AnyFn): PrebuiltExclusiveFn; +export function withMutex(fn: AnyFn): PrebuiltExclusiveFn export function withMutex(baseFn?: AnyFn): PrebuiltExclusiveFn { // @ts-expect-error because baseFn is not optional - const semaphore = withSemaphore({ capacity: 1 }, baseFn); + const semaphore = withSemaphore({ capacity: 1 }, baseFn) async function runExclusive(innerFn?: AnyFn): Promise { // @ts-expect-error because innerFn is not optional - return semaphore({ weight: 1 }, innerFn); + return semaphore({ weight: 1 }, innerFn) } - runExclusive.isLocked = () => semaphore.getRunning() > 0; - runExclusive.acquire = () => semaphore.acquire(1); - runExclusive.release = () => semaphore.release(1); + runExclusive.isLocked = () => semaphore.getRunning() > 0 + runExclusive.acquire = () => semaphore.acquire(1) + runExclusive.release = () => semaphore.release(1) return runExclusive } interface ExclusiveFn extends Mutex { - (fn: AnyFn): Promise; + (fn: AnyFn): Promise } interface PrebuiltExclusiveFn extends ExclusiveFn { - (): Promise; + (): Promise } diff --git a/src/async/withSemaphore.ts b/src/async/withSemaphore.ts index bd2e24c2..e24a1f82 100644 --- a/src/async/withSemaphore.ts +++ b/src/async/withSemaphore.ts @@ -1,23 +1,23 @@ -import { once } from "radashi"; +import { once } from 'radashi' type AnyFn = (permit: SemaphorePermit) => Promise export interface SemaphorePermit { - weight: number; + weight: number /** Currently locked weight, including this permit */ - running: number; - isAcquired: boolean; - release: () => void; + running: number + isAcquired: boolean + release: () => void } export interface WithSemaphoreOptions { - capacity: number; + capacity: number } export interface Semaphore { - getRunning(): number; - acquire(weight?: number): Promise; - release(weight?: number): void; + getRunning(): number + acquire(weight?: number): Promise + release(weight?: number): void } /** @@ -30,7 +30,7 @@ export interface Semaphore { * limitedFn(() => ...) * ``` */ -export function withSemaphore(capacity: number): ExclusiveFn; +export function withSemaphore(capacity: number): ExclusiveFn /** * Creates a semaphore-protected async function that limits concurrent execution to a specified number of active uses. @@ -42,7 +42,7 @@ export function withSemaphore(capacity: number): ExclusiveFn; * limitedFn(() => ...) * ``` */ -export function withSemaphore(options: WithSemaphoreOptions): ExclusiveFn; +export function withSemaphore(options: WithSemaphoreOptions): ExclusiveFn /** * Creates a semaphore-protected async function that limits concurrent execution to a specified number of active uses. @@ -55,7 +55,10 @@ export function withSemaphore(options: WithSemaphoreOptions): ExclusiveFn; * limitedFn(() => ...) * ``` */ -export function withSemaphore(capacity: number, fn: AnyFn): PrebuiltExclusiveFn; +export function withSemaphore( + capacity: number, + fn: AnyFn, +): PrebuiltExclusiveFn /** * Creates a semaphore-protected async function that limits concurrent execution to a specified number of active uses. @@ -68,108 +71,143 @@ export function withSemaphore(capacity: number, fn: AnyFn): PrebuiltExclus * limitedFn(() => ...) * ``` */ -export function withSemaphore(options: WithSemaphoreOptions, fn: AnyFn): PrebuiltExclusiveFn; -export function withSemaphore(optionsOrCapacity: number | WithSemaphoreOptions, baseFn?: AnyFn): PrebuiltExclusiveFn { - const options = useOptions(optionsOrCapacity); - if (options.capacity < 1) throw new Error(`invalid capacity ${options.capacity}: must be positive`); +export function withSemaphore( + options: WithSemaphoreOptions, + fn: AnyFn, +): PrebuiltExclusiveFn +export function withSemaphore( + optionsOrCapacity: number | WithSemaphoreOptions, + baseFn?: AnyFn, +): PrebuiltExclusiveFn { + const options = useOptions(optionsOrCapacity) + if (options.capacity < 1) { + throw new Error(`invalid capacity ${options.capacity}: must be positive`) + } - const queue: QueuedTask[] = []; - let running = 0; + const queue: QueuedTask[] = [] + let running = 0 function _dispatch() { while (queue.length > 0 && running < options.capacity) { - const task = queue.shift()!; - running += task.weight; - task.resolve(_createPermit(task.weight)); + const task = queue.shift()! + running += task.weight + task.resolve(_createPermit(task.weight)) } } function _createPermit(weight: number): SemaphorePermit { - let isAcquired = true; + let isAcquired = true return { - get weight() { return weight }, - get isAcquired() { return isAcquired }, - get running() { return running }, + get weight() { + return weight + }, + get isAcquired() { + return isAcquired + }, + get running() { + return running + }, release: once(() => { release(weight) - isAcquired = false; - }) + isAcquired = false + }), } } function release(weight = 1) { - if (weight < 1) throw new Error(`invalid weight ${weight}: must be positive`); + if (weight < 1) { + throw new Error(`invalid weight ${weight}: must be positive`) + } - running -= weight; - _dispatch(); + running -= weight + _dispatch() } function acquire(weight = 1) { - if (weight < 1) throw new Error(`invalid weight ${weight}: must be positive`); - if (weight > options.capacity) throw new Error(`invalid weight ${weight}: must be lower than or equal capacity ${options.capacity}`); + if (weight < 1) { + throw new Error(`invalid weight ${weight}: must be positive`) + } + if (weight > options.capacity) { + throw new Error( + `invalid weight ${weight}: must be lower than or equal capacity ${options.capacity}`, + ) + } - return new Promise((resolve) => { - queue.push({ resolve, weight }); - _dispatch(); - }); + return new Promise(resolve => { + queue.push({ resolve, weight }) + _dispatch() + }) } - async function runExclusive(optionsOrWeightOrFn?: number | ExclusiveOptions | AnyFn, innerFn?: AnyFn): Promise { - const options = typeof optionsOrWeightOrFn === "function" ? {} : useExclusiveFnOptions(optionsOrWeightOrFn); - const fn = (typeof optionsOrWeightOrFn === "function" ? optionsOrWeightOrFn : innerFn) ?? baseFn; - if (!fn) throw new Error(`invalid execution: function is required`); + async function runExclusive( + optionsOrWeightOrFn?: number | ExclusiveOptions | AnyFn, + innerFn?: AnyFn, + ): Promise { + const options = + typeof optionsOrWeightOrFn === 'function' + ? {} + : useExclusiveFnOptions(optionsOrWeightOrFn) + const fn = + (typeof optionsOrWeightOrFn === 'function' + ? optionsOrWeightOrFn + : innerFn) ?? baseFn + if (!fn) { + throw new Error('invalid execution: function is required') + } const permit = await acquire(options.weight) try { - return await fn(permit); + return await fn(permit) } finally { - permit.release(); + permit.release() } } - runExclusive.acquire = acquire; - runExclusive.release = release; - runExclusive.getRunning = () => running; + runExclusive.acquire = acquire + runExclusive.release = release + runExclusive.getRunning = () => running - return runExclusive; + return runExclusive } -function useOptions(optionsOrCapacity: number | WithSemaphoreOptions): WithSemaphoreOptions { - if (typeof optionsOrCapacity === "number") { - return { capacity: optionsOrCapacity }; +function useOptions( + optionsOrCapacity: number | WithSemaphoreOptions, +): WithSemaphoreOptions { + if (typeof optionsOrCapacity === 'number') { + return { capacity: optionsOrCapacity } } - return optionsOrCapacity; + return optionsOrCapacity } -function useExclusiveFnOptions(optionsOrWeight?: number | ExclusiveOptions): ExclusiveOptions { - if (typeof optionsOrWeight === "number") { - return { weight: optionsOrWeight }; +function useExclusiveFnOptions( + optionsOrWeight?: number | ExclusiveOptions, +): ExclusiveOptions { + if (typeof optionsOrWeight === 'number') { + return { weight: optionsOrWeight } } - return optionsOrWeight ?? {}; + return optionsOrWeight ?? {} } - type QueuedTask = { - weight: number; - resolve: (permit: SemaphorePermit) => void; -}; - + weight: number + resolve: (permit: SemaphorePermit) => void +} interface ExclusiveOptions { - weight?: number; + weight?: number } interface ExclusiveFn extends Semaphore { - (fn: AnyFn): Promise; - (options: ExclusiveOptions, fn: AnyFn): Promise; - (weight: number, fn: AnyFn): Promise; + (fn: AnyFn): Promise + (options: ExclusiveOptions, fn: AnyFn): Promise + (weight: number, fn: AnyFn): Promise } interface PrebuiltExclusiveFn extends ExclusiveFn { - (options: ExclusiveOptions): Promise; - (weight?: number): Promise; + (options: ExclusiveOptions): Promise + (weight?: number): Promise } diff --git a/tests/async/withMutex.test.ts b/tests/async/withMutex.test.ts index 47f865ab..582082ba 100644 --- a/tests/async/withMutex.test.ts +++ b/tests/async/withMutex.test.ts @@ -10,17 +10,17 @@ describe('withMutex', () => { }) test('does blocked while the mutex is locked', async () => { - const values: number[][] = []; + const values: number[][] = [] const exclusive = _.withMutex() - exclusive(async (permit) => { + exclusive(async permit => { await _.sleep(50) values.push([1, permit.running]) }) - exclusive(async (permit) => { + exclusive(async permit => { await _.sleep(100) values.push([2, permit.running]) }) - exclusive(async (permit) => { + exclusive(async permit => { await _.sleep(100) values.push([3, permit.running]) }) @@ -29,10 +29,17 @@ describe('withMutex', () => { expect(values).toEqual([[1, 1]]) await vi.advanceTimersByTimeAsync(101) - expect(values).toEqual([[1, 1], [2, 1]]) + expect(values).toEqual([ + [1, 1], + [2, 1], + ]) await vi.advanceTimersByTimeAsync(101) - expect(values).toEqual([[1, 1], [2, 1], [3, 1]]) + expect(values).toEqual([ + [1, 1], + [2, 1], + [3, 1], + ]) }) test('handler failures does not affect the mutex release', async () => { @@ -46,12 +53,12 @@ describe('withMutex', () => { }) test('does expose manual lock management', async () => { - const values: number[][] = []; + const values: number[][] = [] const mutex = _.withMutex() const permit = await mutex.acquire() expect(permit.isAcquired).toBe(true) - mutex(async (permit) => { + mutex(async permit => { values.push([1, permit.running]) }) @@ -89,6 +96,6 @@ describe('withMutex', () => { test('signatures', () => { expect(_.withMutex()).toBeDefined() - expect(_.withMutex(async () => { })).toBeDefined() + expect(_.withMutex(async () => {})).toBeDefined() }) }) diff --git a/tests/async/withSemaphore.test.ts b/tests/async/withSemaphore.test.ts index dd1ad124..82957074 100644 --- a/tests/async/withSemaphore.test.ts +++ b/tests/async/withSemaphore.test.ts @@ -10,11 +10,15 @@ describe('withSemaphore', () => { }) test('does not blocked while the semaphore has not reached zero', async () => { - const values: number[] = []; + const values: number[] = [] const exclusive = _.withSemaphore(2) - exclusive(async () => { values.push(1) }) - exclusive(async () => { values.push(2) }) + exclusive(async () => { + values.push(1) + }) + exclusive(async () => { + values.push(2) + }) await vi.advanceTimersByTimeAsync(0) @@ -22,17 +26,17 @@ describe('withSemaphore', () => { }) test('does blocked while the semaphore has reached zero', async () => { - const values: number[][] = []; + const values: number[][] = [] const exclusive = _.withSemaphore(2) - exclusive(async (permit) => { + exclusive(async permit => { await _.sleep(50) values.push([1, permit.running]) }) - exclusive(async (permit) => { + exclusive(async permit => { await _.sleep(100) values.push([2, permit.running]) }) - exclusive(async (permit) => { + exclusive(async permit => { await _.sleep(100) values.push([3, permit.running]) }) @@ -41,27 +45,38 @@ describe('withSemaphore', () => { expect(values).toEqual([[1, 2]]) await vi.advanceTimersByTimeAsync(51) - expect(values).toEqual([[1, 2], [2, 2]]) + expect(values).toEqual([ + [1, 2], + [2, 2], + ]) await vi.advanceTimersByTimeAsync(51) - expect(values).toEqual([[1, 2], [2, 2], [3, 1]]) + expect(values).toEqual([ + [1, 2], + [2, 2], + [3, 1], + ]) }) test('does weight the rexecution', async () => { - const values: number[][] = []; + const values: number[][] = [] const exclusive = _.withSemaphore(2) - exclusive(2, async (permit) => { + exclusive(2, async permit => { values.push([1, permit.running]) }) - exclusive(2, async (permit) => { + exclusive(2, async permit => { values.push([2, permit.running]) }) - exclusive(2, async (permit) => { + exclusive(2, async permit => { values.push([3, permit.running]) }) await vi.advanceTimersByTimeAsync(0) - expect(values).toEqual([[1, 2], [2, 2], [3, 2]]) + expect(values).toEqual([ + [1, 2], + [2, 2], + [3, 2], + ]) }) test('handler failures does not affect the semaphore release', async () => { @@ -75,12 +90,12 @@ describe('withSemaphore', () => { }) test('does expose manual lock management', async () => { - const values: number[][] = []; + const values: number[][] = [] const semaphore = _.withSemaphore(2) const permit = await semaphore.acquire(2) expect(permit.weight).toBe(2) - semaphore(async (permit) => { + semaphore(async permit => { values.push([1, permit.running]) }) @@ -110,18 +125,28 @@ describe('withSemaphore', () => { test('signatures', () => { expect(_.withSemaphore(1)).toBeDefined() - expect(_.withSemaphore(1, async () => { })).toBeDefined() + expect(_.withSemaphore(1, async () => {})).toBeDefined() expect(_.withSemaphore({ capacity: 1 })).toBeDefined() - expect(_.withSemaphore({ capacity: 1 }, async () => { })).toBeDefined() + expect(_.withSemaphore({ capacity: 1 }, async () => {})).toBeDefined() }) test('invalid options', async () => { const semaphore = _.withSemaphore(2) - expect(() => _.withSemaphore(0)).toThrow(/invalid capacity 0: must be positive/) - expect(() => semaphore.acquire(0)).toThrow(/invalid weight 0: must be positive/) - expect(() => semaphore.acquire(5)).toThrow(/invalid weight 5: must be lower than or equal capacity 2/) - expect(() => semaphore.release(0)).toThrow(/invalid weight 0: must be positive/) + expect(() => _.withSemaphore(0)).toThrow( + /invalid capacity 0: must be positive/, + ) + expect(() => semaphore.acquire(0)).toThrow( + /invalid weight 0: must be positive/, + ) + expect(() => semaphore.acquire(5)).toThrow( + /invalid weight 5: must be lower than or equal capacity 2/, + ) + expect(() => semaphore.release(0)).toThrow( + /invalid weight 0: must be positive/, + ) // @ts-expect-error should pass function - await expect(semaphore(1)).rejects.toThrow(/invalid execution: function is required/) + await expect(semaphore(1)).rejects.toThrow( + /invalid execution: function is required/, + ) }) }) From 157e5e7aaaadb2341e71c8dd049a537c742e104b Mon Sep 17 00:00:00 2001 From: hugofqt <12221094+hugo082@users.noreply.github.com> Date: Fri, 27 Dec 2024 11:07:18 +0100 Subject: [PATCH 9/9] chore: doc --- .github/next-minor.md | 8 +++++--- docs/async/withMutex.mdx | 1 + docs/async/withSemaphore.mdx | 1 + 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/.github/next-minor.md b/.github/next-minor.md index 2b835552..e49d2340 100644 --- a/.github/next-minor.md +++ b/.github/next-minor.md @@ -4,8 +4,10 @@ The `####` headline should be short and descriptive of the new functionality. In ## New Functions -#### +#### Add `withMutex` -## New Features +https://github.com/radashi-org/radashi/pull/331 -#### +#### Add `withSemaphore` + +https://github.com/radashi-org/radashi/pull/331 diff --git a/docs/async/withMutex.mdx b/docs/async/withMutex.mdx index c92beb3a..6eba802a 100644 --- a/docs/async/withMutex.mdx +++ b/docs/async/withMutex.mdx @@ -1,6 +1,7 @@ --- title: withSemaphore description: A synchronization primitive for limiting concurrent usage to one +since: 12.3.0 --- ### Usage diff --git a/docs/async/withSemaphore.mdx b/docs/async/withSemaphore.mdx index ce17a4ca..69d06c9a 100644 --- a/docs/async/withSemaphore.mdx +++ b/docs/async/withSemaphore.mdx @@ -1,6 +1,7 @@ --- title: withSemaphore description: A synchronization primitive for limiting concurrent usage +since: 12.3.0 --- ### Usage