Skip to content

Commit

Permalink
Merge pull request #9 from well-known-components/feat/components
Browse files Browse the repository at this point in the history
feat: expose linkedList and add AsyncQueue
  • Loading branch information
Hugo Arregui authored Sep 21, 2022
2 parents bb0e590 + 73d73a0 commit 20168bc
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 8 deletions.
8 changes: 8 additions & 0 deletions etc/pushable-channel.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@
```ts

// @public
export function linkedList<T>(): {
enqueue: (value: T) => void;
dequeue: () => T | undefined;
isEmpty: () => boolean;
size: () => number;
};

// @public
export function pushableChannel<T>(onIteratorClose: () => void): {
iterable: AsyncGenerator<T, any, unknown>;
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@

export { pushableChannel } from './push-channel'
export { linkedList, pushableChannel } from "./push-channel"
101 changes: 97 additions & 4 deletions src/push-channel.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
type Node<T> = { value: T; prev?: Node<T>; next?: Node<T> }
type LastResolver = (err?: any) => void

/**
* Creates a linkedList of <T> elements
* @public
*/
export function linkedList<T>() {
let head: Node<T> | undefined = undefined
let tail: Node<T> | undefined = undefined
Expand Down Expand Up @@ -65,7 +69,7 @@ export function linkedList<T>() {
*/
export function pushableChannel<T>(onIteratorClose: () => void) {
let returnLock: (() => void) | null = null
const queue = linkedList<{ value: T, resolve: LastResolver }>()
const queue = linkedList<{ value: T; resolve: LastResolver }>()
let closed = false
let error: Error | null = null

Expand All @@ -82,7 +86,7 @@ export function pushableChannel<T>(onIteratorClose: () => void) {
function bufferSize() {
return queue.size()
}

function releaseLockIfNeeded() {
// signal that we have a value
if (returnLock) {
Expand Down Expand Up @@ -121,8 +125,7 @@ export function pushableChannel<T>(onIteratorClose: () => void) {
}
if (!queue.isEmpty()) {
const node = queue.dequeue()!
if (node.resolve)
node.resolve(error || undefined)
if (node.resolve) node.resolve(error || undefined)
return {
done: false,
value: node.value,
Expand Down Expand Up @@ -177,3 +180,93 @@ export function pushableChannel<T>(onIteratorClose: () => void) {

return { iterable, bufferSize, push, close, failAndClose, isClosed, [Symbol.asyncIterator]: () => iterable }
}

/**
* Creates a queue of <T> elements
* @public
*/
export class AsyncQueue<T> implements AsyncGenerator<T> {
// enqueues > dequeues
values = linkedList<IteratorResult<T>>()
// dequeues > enqueues
settlers = linkedList<{
resolve(x: IteratorResult<T>): void
reject(error: Error): void
}>()
closed = false
error: Error | undefined = undefined

constructor(private requestingNext: (queue: AsyncQueue<T>, action: "next" | "close") => void) {}

[Symbol.asyncIterator](): AsyncGenerator<T> {
return this
}

enqueue(value: T) {
if (this.closed) {
throw new Error("Channel is closed")
}
if (!this.settlers.isEmpty()) {
if (!this.values.isEmpty()) {
throw new Error("Illegal internal state")
}
const settler = this.settlers.dequeue()!
if (value instanceof Error) {
settler.reject(value)
} else {
settler.resolve({ value })
}
} else {
this.values.enqueue({ value })
}
}
/**
* @returns a Promise for an IteratorResult
*/
async next(): Promise<IteratorResult<T>> {
if (!this.values.isEmpty()) {
const value = this.values.dequeue()!
return value
}
if (this.error) {
throw this.error
}
if (this.closed) {
if (!this.settlers.isEmpty()) {
throw new Error("Illegal internal state")
}
return { done: true, value: undefined }
}
// Wait for new values to be enqueued
return new Promise<IteratorResult<T>>((resolve, reject) => {
this.requestingNext(this, "next")
this.settlers.enqueue({ resolve, reject })
})
}

async return(value: any): Promise<IteratorResult<T>> {
this.close(value)
return { done: true, value }
}

async throw(error: Error): Promise<IteratorResult<T>> {
this.close(error)
return { done: true, value: undefined }
}

close(error?: Error) {
if (error)
while (!this.settlers.isEmpty()) {
this.settlers.dequeue()!.reject(error)
}
else
while (!this.settlers.isEmpty()) {
this.settlers.dequeue()!.resolve({ done: true, value: undefined })
}
if (error) this.error = error
if (!this.closed) {
this.closed = true
this.requestingNext(this, "close")
}
}
}

0 comments on commit 20168bc

Please sign in to comment.