Skip to content

Commit

Permalink
feat(interval): option to unref sleep interval
Browse files Browse the repository at this point in the history
  • Loading branch information
jeengbe committed Jan 20, 2025
1 parent 7221be2 commit d900406
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 8 deletions.
8 changes: 6 additions & 2 deletions src/asynciterable/_sleep.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { AbortError } from '../aborterror.js';

export function sleep(dueTime: number, signal?: AbortSignal) {
return new Promise<void>((resolve, reject) => {
export function sleep(dueTime: number, signal?: AbortSignal, unref = false): Promise<void> {
return new Promise((resolve, reject) => {
if (signal && signal.aborted) {
reject(new AbortError());
}
Expand All @@ -18,6 +18,10 @@ export function sleep(dueTime: number, signal?: AbortSignal) {
resolve();
}, dueTime);

if (unref && typeof id['unref'] === 'function') {
id['unref']();
}

if (signal) {
signal.addEventListener('abort', onAbort, { once: true });
}
Expand Down
11 changes: 7 additions & 4 deletions src/asynciterable/interval.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@ import { throwIfAborted } from '../aborterror.js';

class IntervalAsyncIterable extends AsyncIterableX<number> {
private _dueTime: number;
private _unref: boolean;

constructor(dueTime: number) {
constructor(dueTime: number, unref: boolean) {
super();
this._dueTime = dueTime;
this._unref = unref;
}

async *[Symbol.asyncIterator](signal?: AbortSignal) {
throwIfAborted(signal);
let i = 0;
while (1) {
await sleep(this._dueTime, signal);
await sleep(this._dueTime, signal, this._unref);
yield i++;
}
}
Expand All @@ -24,8 +26,9 @@ class IntervalAsyncIterable extends AsyncIterableX<number> {
* Produces a new item in an async-iterable at the given interval cycle time.
*
* @param {number} dueTime The due time in milliseconds to spawn a new item.
* @param {boolean} [unref=false] Whether to unref the interval timer.
* @returns {AsyncIterableX<number>} An async-iterable producing values at the specified interval.
*/
export function interval(dueTime: number): AsyncIterableX<number> {
return new IntervalAsyncIterable(dueTime);
export function interval(dueTime: number, unref = false): AsyncIterableX<number> {
return new IntervalAsyncIterable(dueTime, unref);
}
2 changes: 1 addition & 1 deletion src/asynciterable/operators/buffercountortime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class BufferCountOrTime<TSource> extends AsyncIterableX<TSource[]> {

async *[Symbol.asyncIterator](signal?: AbortSignal) {
const buffer: TSource[] = [];
const timer = interval(this.maxWaitTime).pipe(map(() => timerEvent));
const timer = interval(this.maxWaitTime, true).pipe(map(() => timerEvent));
const source = concat(this.source, of(ended));
const merged = merge(source, timer);

Expand Down
2 changes: 1 addition & 1 deletion src/asynciterable/operators/timeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export class TimeoutAsyncIterable<TSource> extends AsyncIterableX<TSource> {
it.next().then((val) => {
return { type: VALUE_TYPE, value: val };
}),
sleep(this._dueTime, signal).then(() => {
sleep(this._dueTime, signal, true).then(() => {
return { type: ERROR_TYPE };
}),
]);
Expand Down

0 comments on commit d900406

Please sign in to comment.