Skip to content

Commit

Permalink
Produce exports for the closeable blocking queue
Browse files Browse the repository at this point in the history
  • Loading branch information
rzeigler committed Apr 21, 2019
1 parent ed245da commit b46a7c1
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 57 deletions.
21 changes: 14 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
"chai-as-promised": "^7.1.1",
"esm": "^3.2.5",
"fast-check": "^1.10.1",
"fp-ts": "^1.14.3",
"fp-ts": "^1.17.0",
"husky": "^1.3.1",
"mocha": "^5.2.0",
"nodemon": "^1.18.10",
Expand All @@ -47,12 +47,13 @@
"tslint": "^5.12.1",
"typedoc": "^0.14.2",
"typedoc-plugin-markdown": "^1.1.27",
"typescript": "^3.3.3"
"typescript": "^3.4.4"
},
"dependencies": {
"autobind-decorator": "^2.4.0"
"autobind-decorator": "^2.4.0",
"fp-ts-contrib": "0.0.3"
},
"peerDependencies": {
"fp-ts": "^1.14.3"
"fp-ts": "^1.17.0"
}
}
18 changes: 17 additions & 1 deletion src/__test__/queue.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import { array } from "fp-ts/lib/Array";
import { none, Option, some } from "fp-ts/lib/Option";
import { log } from "../console";
import { blockingQueue, boundedQueue, unboundedCloseableQueue, unboundedQueue } from "../queue";
import { blockingQueue, blockingCloseableQueue, boundedQueue, unboundedCloseableQueue, unboundedQueue } from "../queue";
import { Ref } from "../ref";
import { Value } from "../result";
import { equiv } from "./lib.spec";
Expand Down Expand Up @@ -155,3 +155,19 @@ describe("Blocking Queue", () => {
return equiv(io, new Value([[1], [1, 2, 3]]));
});
});

// describe("CloseableBlockingQueue", () => {
// it("should block while waiting for values", () => {
// const io = blockingCloseableQueue<number>(1).product(Ref.alloc<Array<Option<number>>>([]))
// .chain(([queue, wrote]) => {
// const write = (n: number) => queue.offer(n).applySecond(wrote.update(append(n)));
// const bulkWrite = array.traverse(monad)([1, 2, 3], write).void();
// const bulkRead = queue.take.forever();
// return bulkWrite.fork()
// .chain((fiber) =>
// wrote.get.delay(50).applyFirst(bulkRead.fork()).product(fiber.wait.applySecond(wrote.get))
// );
// });
// return equiv(io, new Value([[some(1)], [some(1), some(2), some(3)]]));
// })
// })
13 changes: 0 additions & 13 deletions src/io.ts
Original file line number Diff line number Diff line change
Expand Up @@ -581,19 +581,6 @@ export class IO<E, A> {
return test.branch<void>(this.void(), IO.void());
}

/**
* Abort the current fiber if this produces true using the provided abort
* @param this
* @param abort
*/
public abort(this: IO<E, boolean>, abort: Abort): IO<E, void> {
return this.chain((yes) => yes ? IO.aborted(abort).widenError<E>() : IO.void());
}

public fail(this: IO<E, boolean>, e: E): IO<E, void> {
return this.chain((yes) => yes ? IO.failed(e) : IO.void().widenError<E>());
}

/**
* Produce an IO that when run will spawn this as a fiber.
*/
Expand Down
62 changes: 31 additions & 31 deletions src/queue/blocking.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,58 +102,58 @@ export class CloseableAsyncQueueImpl<A> {
this.state.modify((current) => {
// The queue is closed, so we need to ensure we are draining
if (current.closed) {
const cleanup = this.semaphore.acquire;
return current.queue.fold<[Ticket<Option<A>>, CloseableQueueState<A>]>(
// we are queued up on waits, so we can do nothing
(waiting) => [new Ticket(IO.of(none), IO.void()), {...current, queue: left(waiting)}],
(waiting) => [new Ticket(IO.of(none), cleanup), {...current, queue: left(waiting)}],
(available) => {
const [next, queue] = available.dequeue();
// there is an available element so we should drain it
if (next) {
return [
new Ticket(IO.of(next), IO.void()),
new Ticket(IO.of(next), cleanup),
{...current, queue: right(queue)}
];
}
// otherwise there is nothing we can do
return [
new Ticket(IO.of(none), IO.void()),
new Ticket(IO.of(none), cleanup),
{...current, queue: left(Dequeue.empty())}
];
}
);
} else {
// The queue is not closed
// We want to construct tickets that wait on the constructed deferred and the closed implementation
// The tickets should also always remove the deferred from the queue
const cleanup = this.unregister(deferred);
return current.queue.fold<[Ticket<Option<A>>, CloseableQueueState<A>]>(
(waiting) => [
}
// The queue is not closed
// We want to construct tickets that wait on the constructed deferred and the closed implementation
// The tickets should also always remove the deferred from the queue
const cleanup = this.unregister(deferred).applySecond(this.semaphore.acquire);
return current.queue.fold<[Ticket<Option<A>>, CloseableQueueState<A>]>(
(waiting) => [
new Ticket(
deferred.wait.race(this.closed.wait)
.ensuring(cleanup),
cleanup
),
{...current, queue: left(waiting.enqueue(deferred))}
],
(available) => {
const [next, queue] = available.dequeue();
if (next) {
return [
new Ticket(IO.of(next), IO.void()),
{...current, queue: right(queue)}
];
}
return [
new Ticket(
deferred.wait.race(this.closed.wait)
.ensuring(cleanup),
cleanup
),
{...current, queue: left(waiting.enqueue(deferred))}
],
(available) => {
const [next, queue] = available.dequeue();
if (next) {
return [
new Ticket(IO.of(next), IO.void()),
{...current, queue: right(queue)}
];
}
return [
new Ticket(
deferred.wait.race(this.closed.wait)
.ensuring(cleanup),
cleanup
),
{...current, queue: left(Dequeue.of(deferred))}
];
}
);
}
{...current, queue: left(Dequeue.of(deferred))}
];
}
);
})
)
);
Expand Down
13 changes: 12 additions & 1 deletion src/queue/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
export * from "./iface";
import { right } from "fp-ts/lib/Either";
import { Option } from "fp-ts/lib/Option";
import { Do } from "fp-ts-contrib/lib/Do";
import { Deferred } from "../deferred";
import { Dequeue } from "../internal/dequeue";
import { IO } from "../io";
import { monad } from "../instances";
import { Ref } from "../ref";
import { Semaphore } from "../semaphore";
import { AsyncQueueImpl as BQImpl } from "./blocking";
import { AsyncQueueImpl as BQImpl, CloseableAsyncQueueImpl as CBQImpl } from "./blocking";
import { CloseableQueueState, droppingStrategy, QueueState, slidingStrategy, unboundedStrategy } from "./common";
import { AsyncQueue, CloseableAsyncQueue } from "./iface";
import {
Expand Down Expand Up @@ -41,3 +43,12 @@ export function blockingQueue<A>(max: number): IO<never, AsyncQueue<A>> {
return Ref.alloc<QueueState<A>>(right(Dequeue.empty())).product(Semaphore.alloc(max))
.map(([ref, sem]) => new BQImpl(ref, sem));
}


export function blockingCloseableQueue<A>(max: number): IO<never, CloseableAsyncQueue<A>> {
return Do(monad)
.bind("ref", Ref.alloc<CloseableQueueState<A>>({closed: false, queue: right(Dequeue.empty())}))
.bind("close", Deferred.alloc<Option<A>>())
.bind("sem", Semaphore.alloc(max))
.return(({ref, close, sem}) => new CBQImpl(ref, close, sem));
}

0 comments on commit b46a7c1

Please sign in to comment.