diff --git a/src/queue/blocking.ts b/src/queue/blocking.ts index 1ddd1df..2bd6aea 100644 --- a/src/queue/blocking.ts +++ b/src/queue/blocking.ts @@ -84,19 +84,112 @@ export class AsyncQueueImpl implements AsyncQueue { export class CloseableAsyncQueueImpl { public readonly count: IO; - public readonly take: IO; + public readonly take: IO>; public readonly close: IO; + public readonly isClosed: IO; constructor(private readonly state: Ref>, private readonly closed: Deferred>, private readonly semaphore: Semaphore) { - this.count = IO.aborted(new Abort("boom!")); - this.take = IO.aborted(new Abort("boom!")); - this.close = IO.aborted(new Abort("boom")); + // Everytime we queue up a read, that means we are freeing up max capacity, ergo, semaphore release + // On ticket cleanup (for interruption) we perform the opposite action of acquiring to ensure we + // don't accidentally increase max capacity + // If our read completes early because of close, its safe to just ignore the release because leaking max + // capacity in a closed state doesn't matter + const makeTicket = Deferred.alloc>() + .chain((deferred) => + this.semaphore.release.applySecond( + this.state.modify((current) => { + // The queue is closed, so we need to ensure we are draining + if (current.closed) { + return current.queue.fold<[Ticket>, CloseableQueueState]>( + (waiting) => [new Ticket(IO.of(none), IO.void()), {...current, queue: left(waiting)}], + (available) => { + const [next, queue] = available.dequeue(); + if (next) { + return [ + new Ticket(IO.of(next), IO.void()), + {...current, queue: right(queue)} + ]; + } + return [ + new Ticket(IO.of(none), IO.void()), + {...current, queue: left(Dequeue.empty())} + ]; + } + ); + } else { + const cleanup = this.unregister(deferred); + return current.queue.fold<[Ticket>, CloseableQueueState]>( + (waiting) => [ + new Ticket( + deferred.wait.race(this.closed.wait) + .ensuring(cleanup), + cleanup + ), + {...current, queue: left(waiting.enqueue(deferred))} + ], + (available) => { + const [next, queue] = available.dequeue(); + if (next) { + return [ + new Ticket(IO.of(next), IO.void()), + {...current, queue: right(queue)} + ]; + } + return [ + new Ticket( + deferred.wait.race(this.closed.wait) + .ensuring(cleanup), + cleanup + ), + {...current, queue: left(Dequeue.of(deferred))} + ]; + } + ); + } + }) + ) + ); + + this.count = this.state.get.map((s) => queueCount(s.queue)); + this.take = makeTicket.bracketExit(Ticket.cleanup, Ticket.wait); + this.close = state.modify((current) => + current.closed ? [IO.void(), current] : [closed.fill(none), {...current, closed: true}] + ).flatten().critical(); + this.isClosed = state.get.map((c) => c.closed); } - public offer(a: A): IO { - return IO.aborted(new Abort("boom!")); + public offer(a: A): IO { + const offerIO = this.state.modify((current) => { + // Don't worry about closed check due to acquisition logic + return current.queue.fold<[IO, CloseableQueueState]>( + (waiting) => { + const [next, queue] = waiting.dequeue(); + if (next) { + return [next.fill(some(a)), {...current, queue: left(queue)}]; + } + return [IO.void(), {...current, queue: right(Dequeue.of(some(a)))}]; + }, + (available) => [IO.void(), {...current, queue: right(available.enqueue(some(a)))}] + ); + }); + + return this.semaphore.acquire.as(true) + .race(this.closed.wait.as(false)) + .branch(offerIO.as(true), IO.of(false)); + } + + private unregister(deferred: Deferred>): IO { + return this.state.update((current) => { + return { + ...current, + queue: current.queue.fold( + (waiting) => left(Dequeue.ofAll(waiting.array.filter((d) => d !== deferred))), + (available) => right(available) + ) + }; + }).void(); } }