From b46a7c1dc5d27e99e833e01684930c6224a452a8 Mon Sep 17 00:00:00 2001 From: Ryan Zeigler Date: Sat, 20 Apr 2019 21:30:39 -0500 Subject: [PATCH] Produce exports for the closeable blocking queue --- package-lock.json | 21 ++++++++----- package.json | 9 +++--- src/__test__/queue.spec.ts | 18 ++++++++++- src/io.ts | 13 -------- src/queue/blocking.ts | 62 +++++++++++++++++++------------------- src/queue/index.ts | 13 +++++++- 6 files changed, 79 insertions(+), 57 deletions(-) diff --git a/package-lock.json b/package-lock.json index f280183..96177a2 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1529,10 +1529,17 @@ } }, "fp-ts": { - "version": "1.14.3", - "resolved": "https://s5stratos.jfrog.io/s5stratos/api/npm/npm/fp-ts/-/fp-ts-1.14.3.tgz", - "integrity": "sha1-SLOu7mvdo7JKpE+QCtA5gT0sPwE=", - "dev": true + "version": "1.17.0", + "resolved": "https://registry.npmjs.org/fp-ts/-/fp-ts-1.17.0.tgz", + "integrity": "sha512-nBq25aCAMbCwVLobUUuM/MZihPKyjn0bCVBf6xMAGriHlf8W8Ze9UhyfLnbmfp0ekFTxMuTfLXrCzpJ34px7PQ==" + }, + "fp-ts-contrib": { + "version": "0.0.3", + "resolved": "https://registry.npmjs.org/fp-ts-contrib/-/fp-ts-contrib-0.0.3.tgz", + "integrity": "sha512-qfFKSnbmPtq2eHOzx03teUqMd/ezE8siNvZKApKK8/XR7jDJ891xXURMWHgE9VkjK8Wqc5WpHeHA1LrcUd9VaQ==", + "requires": { + "fp-ts": "^1.14.3" + } }, "fragment-cache": { "version": "0.2.1", @@ -5386,9 +5393,9 @@ } }, "typescript": { - "version": "3.3.3", - "resolved": "https://registry.npmjs.org/typescript/-/typescript-3.3.3.tgz", - "integrity": "sha512-Y21Xqe54TBVp+VDSNbuDYdGw0BpoR/Q6wo/+35M8PAU0vipahnyduJWirxxdxjsAkS7hue53x2zp8gz7F05u0A==", + "version": "3.4.4", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-3.4.4.tgz", + "integrity": "sha512-xt5RsIRCEaf6+j9AyOBgvVuAec0i92rgCaS3S+UVf5Z/vF2Hvtsw08wtUTJqp4djwznoAgjSxeCcU4r+CcDBJA==", "dev": true }, "uglify-js": { diff --git a/package.json b/package.json index e111497..35a687f 100644 --- a/package.json +++ b/package.json @@ -37,7 +37,7 @@ "chai-as-promised": "^7.1.1", "esm": "^3.2.5", "fast-check": "^1.10.1", - "fp-ts": "^1.14.3", + "fp-ts": "^1.17.0", "husky": "^1.3.1", "mocha": "^5.2.0", "nodemon": "^1.18.10", @@ -47,12 +47,13 @@ "tslint": "^5.12.1", "typedoc": "^0.14.2", "typedoc-plugin-markdown": "^1.1.27", - "typescript": "^3.3.3" + "typescript": "^3.4.4" }, "dependencies": { - "autobind-decorator": "^2.4.0" + "autobind-decorator": "^2.4.0", + "fp-ts-contrib": "0.0.3" }, "peerDependencies": { - "fp-ts": "^1.14.3" + "fp-ts": "^1.17.0" } } diff --git a/src/__test__/queue.spec.ts b/src/__test__/queue.spec.ts index 7dab176..7150f01 100644 --- a/src/__test__/queue.spec.ts +++ b/src/__test__/queue.spec.ts @@ -6,7 +6,7 @@ import { array } from "fp-ts/lib/Array"; import { none, Option, some } from "fp-ts/lib/Option"; import { log } from "../console"; -import { blockingQueue, boundedQueue, unboundedCloseableQueue, unboundedQueue } from "../queue"; +import { blockingQueue, blockingCloseableQueue, boundedQueue, unboundedCloseableQueue, unboundedQueue } from "../queue"; import { Ref } from "../ref"; import { Value } from "../result"; import { equiv } from "./lib.spec"; @@ -155,3 +155,19 @@ describe("Blocking Queue", () => { return equiv(io, new Value([[1], [1, 2, 3]])); }); }); + +// describe("CloseableBlockingQueue", () => { +// it("should block while waiting for values", () => { +// const io = blockingCloseableQueue(1).product(Ref.alloc>>([])) +// .chain(([queue, wrote]) => { +// const write = (n: number) => queue.offer(n).applySecond(wrote.update(append(n))); +// const bulkWrite = array.traverse(monad)([1, 2, 3], write).void(); +// const bulkRead = queue.take.forever(); +// return bulkWrite.fork() +// .chain((fiber) => +// wrote.get.delay(50).applyFirst(bulkRead.fork()).product(fiber.wait.applySecond(wrote.get)) +// ); +// }); +// return equiv(io, new Value([[some(1)], [some(1), some(2), some(3)]])); +// }) +// }) diff --git a/src/io.ts b/src/io.ts index 4432ba5..104adba 100644 --- a/src/io.ts +++ b/src/io.ts @@ -581,19 +581,6 @@ export class IO { return test.branch(this.void(), IO.void()); } - /** - * Abort the current fiber if this produces true using the provided abort - * @param this - * @param abort - */ - public abort(this: IO, abort: Abort): IO { - return this.chain((yes) => yes ? IO.aborted(abort).widenError() : IO.void()); - } - - public fail(this: IO, e: E): IO { - return this.chain((yes) => yes ? IO.failed(e) : IO.void().widenError()); - } - /** * Produce an IO that when run will spawn this as a fiber. */ diff --git a/src/queue/blocking.ts b/src/queue/blocking.ts index 008da2b..1f80525 100644 --- a/src/queue/blocking.ts +++ b/src/queue/blocking.ts @@ -102,58 +102,58 @@ export class CloseableAsyncQueueImpl { this.state.modify((current) => { // The queue is closed, so we need to ensure we are draining if (current.closed) { + const cleanup = this.semaphore.acquire; return current.queue.fold<[Ticket>, CloseableQueueState]>( // we are queued up on waits, so we can do nothing - (waiting) => [new Ticket(IO.of(none), IO.void()), {...current, queue: left(waiting)}], + (waiting) => [new Ticket(IO.of(none), cleanup), {...current, queue: left(waiting)}], (available) => { const [next, queue] = available.dequeue(); // there is an available element so we should drain it if (next) { return [ - new Ticket(IO.of(next), IO.void()), + new Ticket(IO.of(next), cleanup), {...current, queue: right(queue)} ]; } // otherwise there is nothing we can do return [ - new Ticket(IO.of(none), IO.void()), + new Ticket(IO.of(none), cleanup), {...current, queue: left(Dequeue.empty())} ]; } ); - } else { - // The queue is not closed - // We want to construct tickets that wait on the constructed deferred and the closed implementation - // The tickets should also always remove the deferred from the queue - const cleanup = this.unregister(deferred); - return current.queue.fold<[Ticket>, CloseableQueueState]>( - (waiting) => [ + } + // The queue is not closed + // We want to construct tickets that wait on the constructed deferred and the closed implementation + // The tickets should also always remove the deferred from the queue + const cleanup = this.unregister(deferred).applySecond(this.semaphore.acquire); + return current.queue.fold<[Ticket>, CloseableQueueState]>( + (waiting) => [ + new Ticket( + deferred.wait.race(this.closed.wait) + .ensuring(cleanup), + cleanup + ), + {...current, queue: left(waiting.enqueue(deferred))} + ], + (available) => { + const [next, queue] = available.dequeue(); + if (next) { + return [ + new Ticket(IO.of(next), IO.void()), + {...current, queue: right(queue)} + ]; + } + return [ new Ticket( deferred.wait.race(this.closed.wait) .ensuring(cleanup), cleanup ), - {...current, queue: left(waiting.enqueue(deferred))} - ], - (available) => { - const [next, queue] = available.dequeue(); - if (next) { - return [ - new Ticket(IO.of(next), IO.void()), - {...current, queue: right(queue)} - ]; - } - return [ - new Ticket( - deferred.wait.race(this.closed.wait) - .ensuring(cleanup), - cleanup - ), - {...current, queue: left(Dequeue.of(deferred))} - ]; - } - ); - } + {...current, queue: left(Dequeue.of(deferred))} + ]; + } + ); }) ) ); diff --git a/src/queue/index.ts b/src/queue/index.ts index 01c7475..0bdbd44 100644 --- a/src/queue/index.ts +++ b/src/queue/index.ts @@ -1,12 +1,14 @@ export * from "./iface"; import { right } from "fp-ts/lib/Either"; import { Option } from "fp-ts/lib/Option"; +import { Do } from "fp-ts-contrib/lib/Do"; import { Deferred } from "../deferred"; import { Dequeue } from "../internal/dequeue"; import { IO } from "../io"; +import { monad } from "../instances"; import { Ref } from "../ref"; import { Semaphore } from "../semaphore"; -import { AsyncQueueImpl as BQImpl } from "./blocking"; +import { AsyncQueueImpl as BQImpl, CloseableAsyncQueueImpl as CBQImpl } from "./blocking"; import { CloseableQueueState, droppingStrategy, QueueState, slidingStrategy, unboundedStrategy } from "./common"; import { AsyncQueue, CloseableAsyncQueue } from "./iface"; import { @@ -41,3 +43,12 @@ export function blockingQueue(max: number): IO> { return Ref.alloc>(right(Dequeue.empty())).product(Semaphore.alloc(max)) .map(([ref, sem]) => new BQImpl(ref, sem)); } + + +export function blockingCloseableQueue(max: number): IO> { + return Do(monad) + .bind("ref", Ref.alloc>({closed: false, queue: right(Dequeue.empty())})) + .bind("close", Deferred.alloc>()) + .bind("sem", Semaphore.alloc(max)) + .return(({ref, close, sem}) => new CBQImpl(ref, close, sem)); +} \ No newline at end of file