Skip to content

Commit

Permalink
Add a circle ci build.
Browse files Browse the repository at this point in the history
  • Loading branch information
rzeigler committed Mar 9, 2019
1 parent addfadc commit f99a092
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 49 deletions.
33 changes: 33 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
version: 2
jobs:
build:
docker:
# specify the version you desire here
- image: circleci/node:11.10

# Specify service dependencies here if necessary
# CircleCI maintains a library of pre-built images
# documented at https://circleci.com/docs/2.0/circleci-images/
# - image: circleci/mongo:3.4.4

working_directory: ~/repo

steps:
- checkout

# Download and cache dependencies
- restore_cache:
keys:
- v1-dependencies-{{ checksum "package.json" }}
# fallback to using the latest cache if no exact match is found
- v1-dependencies-

- run: npm ci

- save_cache:
paths:
- node_modules
key: v1-dependencies-{{ checksum "package.json" }}

# run tests!
- run: npm run prepublishOnly
30 changes: 15 additions & 15 deletions src/__test__/queue.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { right } from "fp-ts/lib/Either";
import { none, some } from "fp-ts/lib/Option";
import { Dequeue } from "../internal/dequeue";
import { IO } from "../io";
import { NBS, NonBlockingQueue, slidingStrategy, unboundedStrategy } from "../queue";
import { Queue, QueueState, slidingStrategy, unboundedStrategy } from "../queue";
import { Ref } from "../ref";
import { Value } from "../result";
import { equiv } from "./lib.spec";
Expand All @@ -17,8 +17,8 @@ const append = <A>(a: A) => (as: A[]) => [...as, a];
describe("Unbounded Async Queue", () => {
it("should allow consuming elements in the order they were added", () => {
const ref = Ref.unsafeAlloc<number[]>([]);
const state = Ref.unsafeAlloc<NBS<number>>(right(Dequeue.empty()));
const queue = new NonBlockingQueue(state, unboundedStrategy);
const state = Ref.unsafeAlloc<QueueState<number>>(right(Dequeue.empty()));
const queue = new Queue(state, unboundedStrategy);
const read = queue.take.chain((n) => ref.update(append(n)));
const io = queue.offer(1)
.applySecond(queue.offer(2))
Expand All @@ -31,8 +31,8 @@ describe("Unbounded Async Queue", () => {
});
it("should block consumers until there is a value ready", () => {
const ref = Ref.unsafeAlloc<number[]>([]);
const state = Ref.unsafeAlloc<NBS<number>>(right(Dequeue.empty()));
const queue = new NonBlockingQueue(state, unboundedStrategy);
const state = Ref.unsafeAlloc<QueueState<number>>(right(Dequeue.empty()));
const queue = new Queue(state, unboundedStrategy);
const read = queue.take.chain((n) => ref.update(append(n)));
const io =
read.fork().chain((fib1) =>
Expand All @@ -46,8 +46,8 @@ describe("Unbounded Async Queue", () => {
});
it("should allow consumers to stack up", () => {
const ref = Ref.unsafeAlloc<number[]>([]);
const state = Ref.unsafeAlloc<NBS<number>>(right(Dequeue.empty()));
const queue = new NonBlockingQueue(state, unboundedStrategy);
const state = Ref.unsafeAlloc<QueueState<number>>(right(Dequeue.empty()));
const queue = new Queue(state, unboundedStrategy);
const read = queue.take.chain((n) => ref.update(append(n)));
const io =
queue.offer(1)
Expand All @@ -62,8 +62,8 @@ describe("Unbounded Async Queue", () => {
return equiv(io, new Value([[1], [1, 2]]));
});
it("should ensure available items are removed before subsequent reads", () => {
const state = Ref.unsafeAlloc<NBS<number>>(right(Dequeue.empty()));
const queue = new NonBlockingQueue(state, unboundedStrategy);
const state = Ref.unsafeAlloc<QueueState<number>>(right(Dequeue.empty()));
const queue = new Queue(state, unboundedStrategy);
const read = queue.take;
const io =
queue.offer(1)
Expand All @@ -72,8 +72,8 @@ describe("Unbounded Async Queue", () => {
});
it("should allow reads to be cancelled", () => {
const ref = Ref.unsafeAlloc<Array<[string, number]>>([]);
const state = Ref.unsafeAlloc<NBS<number>>(right(Dequeue.empty()));
const queue = new NonBlockingQueue(state, unboundedStrategy);
const state = Ref.unsafeAlloc<QueueState<number>>(right(Dequeue.empty()));
const queue = new Queue(state, unboundedStrategy);
const read = (name: string) => queue.take.chain((n) => ref.update(append([name, n] as [string, number])));
const io =
read("a").fork().product(read("b").fork())
Expand All @@ -83,8 +83,8 @@ describe("Unbounded Async Queue", () => {
return equiv(io, new Value([["b", 1]]));
});
it("should be unbounded", () => {
const state = Ref.unsafeAlloc<NBS<number>>(right(Dequeue.empty()));
const queue = new NonBlockingQueue(state, unboundedStrategy);
const state = Ref.unsafeAlloc<QueueState<number>>(right(Dequeue.empty()));
const queue = new Queue(state, unboundedStrategy);
const inserts: Array<IO<never, void>> = [];
for (let i = 0; i < 10000; i++) {
inserts.push(queue.offer(i));
Expand All @@ -98,8 +98,8 @@ describe("Unbounded Async Queue", () => {
describe("Bounded Non Blocking Queue", () => {
it("should be bounded", () => {
const ref = Ref.unsafeAlloc<number[]>([]);
const state = Ref.unsafeAlloc<NBS<number>>(right(Dequeue.empty()));
const queue = new NonBlockingQueue(state, slidingStrategy(1));
const state = Ref.unsafeAlloc<QueueState<number>>(right(Dequeue.empty()));
const queue = new Queue(state, slidingStrategy(1));
const read = queue.take.chain((n) => ref.update(append(n)));
const io = queue.offer(1)
.applySecond(queue.offer(2))
Expand Down
10 changes: 5 additions & 5 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
// This software is released under the MIT License.
// https://opensource.org/licenses/MIT

export * from "./io";
export { IO } from "./io";
export { Fiber } from "./fiber";
export { Ref } from "./ref";
export { Deferred } from "./deferred";
export { AsyncQueue, unboundedQueue, boundedQueue, OverflowStrategy } from "./queue";
export * from "./result";
export * from "./fiber";
export * from "./ref";
export * from "./deferred";
export * from "./terminal";
export * from "./instances";
export * from "./queue";
13 changes: 4 additions & 9 deletions src/internal/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,10 @@ export class Runtime<E, A> {
if (!this.result.isSet() && !this.interrupted) {
this.interrupted = true;
if (this.criticalSections === 0) {
if (this.suspended) {
// Maybe we haven't actually started yet so we are suspended but have no async frame
// In that case, just let the runloop start pick it up
if (this.asyncFrame) {
this.asyncFrame.interrupt();
this.interruptFinalize();
}
} else {
throw new Error("Bug: Interrupted a running fiber. Run loops should not be able to stack this way.");
// It is possible we were interrupted before the runloop started
if (this.suspended && this.asyncFrame) {
this.asyncFrame.interrupt();
this.interruptFinalize();
}
}
}
Expand Down
35 changes: 15 additions & 20 deletions src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,41 +11,36 @@ import { Dequeue } from "./internal/dequeue";
import { Ticket } from "./internal/ticket";
import { IO } from "./io";
import { Ref } from "./ref";
import { Abort } from "./result";

export interface AsyncQueue<A> {
readonly count: IO<never, number>;
readonly take: IO<never, A>;
offer(a: A): IO<never, void>;
}

export interface FiniteAsyncQueue<A> extends AsyncQueue<Option<A>> {
export interface CloseableAsyncQueue<A> extends AsyncQueue<Option<A>> {
close: IO<never, void>;
}

export type OverflowStrategy = "slide" | "drop";

export function unboundedNonBlockingQueue<A>(): IO<never, AsyncQueue<A>> {
return Ref.alloc<NBS<A>>(right(Dequeue.empty()))
.map((state) => new NonBlockingQueue(state, unboundedStrategy));
export function unboundedQueue<A>(): IO<never, AsyncQueue<A>> {
return Ref.alloc<QueueState<A>>(right(Dequeue.empty()))
.map((state) => new Queue(state, unboundedStrategy));
}

export function boundedNonBlockingQueue<A>(max: number, strategy: OverflowStrategy): IO<never, AsyncQueue<A>> {
export function boundedQueue<A>(max: number, strategy: OverflowStrategy): IO<never, AsyncQueue<A>> {
return assert(max, isGt(0), "Bug: Max queue size must be > 0")
.applySecond(Ref.alloc<NBS<A>>(right(Dequeue.empty())))
.map((state) => new NonBlockingQueue(state, strategy === "slide" ? slidingStrategy(max) : droppingStrategy(max)));
.applySecond(Ref.alloc<QueueState<A>>(right(Dequeue.empty())))
.map((state) => new Queue(state, strategy === "slide" ? slidingStrategy(max) : droppingStrategy(max)));
}

export type EnqueueStrategy<A> = (a: A, current: Dequeue<A>) => Dequeue<A>;

export type Available<A> = Dequeue<A>;
export type Waiting<A> = Dequeue<Deferred<A>>;
export type NBS<A> = Either<Waiting<A>, Available<A>>;

type FNBQ<A> = Either<Waiting<Option<A>>, Available<A>>;
interface FiniteNonBlockingState<A> {
closed: boolean;
state: FNBQ<A>;
}

export type EnqueueStrategy<A> = (a: A, current: Dequeue<A>) => Dequeue<A>;
export type QueueState<A> = Either<Waiting<A>, Available<A>>;

export const unboundedStrategy =
<A>(a: A, current: Dequeue<A>): Dequeue<A> => current.enqueue(a);
Expand All @@ -61,11 +56,11 @@ export const slidingStrategy = (max: number) => <A>(a: A, current: Dequeue<A>):
return current.enqueue(a);
};

export class NonBlockingQueue<A> implements AsyncQueue<A> {
export class Queue<A> implements AsyncQueue<A> {
public readonly count: IO<never, number>;
public readonly take: IO<never, A>;

constructor(private readonly state: Ref<NBS<A>>, private readonly enqueue: EnqueueStrategy<A>) {
constructor(private readonly state: Ref<QueueState<A>>, private readonly enqueue: EnqueueStrategy<A>) {
function unregister(deferred: Deferred<A>): IO<never, void> {
return state.update((current) =>
current.fold(
Expand All @@ -77,7 +72,7 @@ export class NonBlockingQueue<A> implements AsyncQueue<A> {
const makeTicket = Deferred.alloc<A>()
.chain((deferred) =>
state.modify((current) =>
current.fold<[Ticket<A>, NBS<A>]>(
current.fold<[Ticket<A>, QueueState<A>]>(
(waiting) => [new Ticket(deferred.wait, unregister(deferred)), left(waiting.enqueue(deferred))],
(available) => {
const [next, queue] = available.dequeue();
Expand All @@ -97,7 +92,7 @@ export class NonBlockingQueue<A> implements AsyncQueue<A> {
public offer(a: A): IO<never, void> {
return this.state
.modify((current) =>
current.fold<[IO<never, void>, NBS<A>]>(
current.fold<[IO<never, void>, QueueState<A>]>(
(waiting) => {
const [next, queue] = waiting.dequeue();
if (next) {
Expand Down

0 comments on commit f99a092

Please sign in to comment.