Skip to content

Commit

Permalink
Prototype the blocking queue implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
rzeigler committed Apr 15, 2019
1 parent 68169b2 commit c0e77ae
Showing 1 changed file with 99 additions and 6 deletions.
105 changes: 99 additions & 6 deletions src/queue/blocking.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,112 @@ export class AsyncQueueImpl<A> implements AsyncQueue<A> {

export class CloseableAsyncQueueImpl<A> {
public readonly count: IO<never, number>;
public readonly take: IO<never, A>;
public readonly take: IO<never, Option<A>>;
public readonly close: IO<never, void>;
public readonly isClosed: IO<never, boolean>;

constructor(private readonly state: Ref<CloseableQueueState<A>>,
private readonly closed: Deferred<Option<A>>,
private readonly semaphore: Semaphore) {
this.count = IO.aborted(new Abort("boom!"));
this.take = IO.aborted(new Abort("boom!"));
this.close = IO.aborted(new Abort("boom"));
// Everytime we queue up a read, that means we are freeing up max capacity, ergo, semaphore release
// On ticket cleanup (for interruption) we perform the opposite action of acquiring to ensure we
// don't accidentally increase max capacity
// If our read completes early because of close, its safe to just ignore the release because leaking max
// capacity in a closed state doesn't matter
const makeTicket = Deferred.alloc<Option<A>>()
.chain((deferred) =>
this.semaphore.release.applySecond(
this.state.modify((current) => {
// The queue is closed, so we need to ensure we are draining
if (current.closed) {
return current.queue.fold<[Ticket<Option<A>>, CloseableQueueState<A>]>(
(waiting) => [new Ticket(IO.of(none), IO.void()), {...current, queue: left(waiting)}],
(available) => {
const [next, queue] = available.dequeue();
if (next) {
return [
new Ticket(IO.of(next), IO.void()),
{...current, queue: right(queue)}
];
}
return [
new Ticket(IO.of(none), IO.void()),
{...current, queue: left(Dequeue.empty())}
];
}
);
} else {
const cleanup = this.unregister(deferred);
return current.queue.fold<[Ticket<Option<A>>, CloseableQueueState<A>]>(
(waiting) => [
new Ticket(
deferred.wait.race(this.closed.wait)
.ensuring(cleanup),
cleanup
),
{...current, queue: left(waiting.enqueue(deferred))}
],
(available) => {
const [next, queue] = available.dequeue();
if (next) {
return [
new Ticket(IO.of(next), IO.void()),
{...current, queue: right(queue)}
];
}
return [
new Ticket(
deferred.wait.race(this.closed.wait)
.ensuring(cleanup),
cleanup
),
{...current, queue: left(Dequeue.of(deferred))}
];
}
);
}
})
)
);

this.count = this.state.get.map((s) => queueCount(s.queue));
this.take = makeTicket.bracketExit(Ticket.cleanup, Ticket.wait);
this.close = state.modify((current) =>
current.closed ? [IO.void(), current] : [closed.fill(none), {...current, closed: true}]
).flatten().critical();
this.isClosed = state.get.map((c) => c.closed);
}

public offer(a: A): IO<never, void> {
return IO.aborted(new Abort("boom!"));
public offer(a: A): IO<never, boolean> {
const offerIO = this.state.modify((current) => {
// Don't worry about closed check due to acquisition logic
return current.queue.fold<[IO<never, void>, CloseableQueueState<A>]>(
(waiting) => {
const [next, queue] = waiting.dequeue();
if (next) {
return [next.fill(some(a)), {...current, queue: left(queue)}];
}
return [IO.void(), {...current, queue: right(Dequeue.of(some(a)))}];
},
(available) => [IO.void(), {...current, queue: right(available.enqueue(some(a)))}]
);
});

return this.semaphore.acquire.as(true)
.race(this.closed.wait.as(false))
.branch(offerIO.as(true), IO.of(false));
}

private unregister(deferred: Deferred<Option<A>>): IO<never, void> {
return this.state.update((current) => {
return {
...current,
queue: current.queue.fold(
(waiting) => left(Dequeue.ofAll(waiting.array.filter((d) => d !== deferred))),
(available) => right(available)
)
};
}).void();
}
}

Expand Down

0 comments on commit c0e77ae

Please sign in to comment.