Skip to content

Commit

Permalink
Going to re-work the queue implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
rzeigler committed Apr 23, 2019
1 parent b46a7c1 commit 961c596
Show file tree
Hide file tree
Showing 12 changed files with 36 additions and 689 deletions.
12 changes: 6 additions & 6 deletions src/__test__/deferred.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ describe("Deferred", () => {
it("should block until fulfilled", () => {
const io = Deferred.alloc<number>()
.chain((defer) =>
defer.fill(100).delay(100).fork()
.applySecond(defer.wait)
defer.complete(100).delay(100).fork()
.applySecond(defer.get)
);
equiv(io, new Value(100));
});

it("should allow cross fiber messaging", () => {
const deferred = Deferred.unsafeAlloc<void>();
const fibIO1 = deferred.fill(undefined).delay(50);
const fibIO2 = deferred.wait.yield_().as(42);
const fibIO1 = deferred.complete(undefined).delay(50);
const fibIO2 = deferred.get.yield_().as(42);
const io = fibIO1.fork()
.applySecond(fibIO2.fork().chain((fib2) => fib2.wait));
return equiv(io, new Value(new Value(42)));
Expand All @@ -29,8 +29,8 @@ describe("Deferred", () => {
it("experimenting with multiple deferreds", () => {
const d1 = Deferred.unsafeAlloc<void>();
const d2 = Deferred.unsafeAlloc<void>();
const fibIO1 = d1.wait.interrupted(d2.fill(undefined));
const fibIO2 = d2.wait.as(42);
const fibIO1 = d1.get.onInterrupt(d2.complete(undefined));
const fibIO2 = d2.get.as(42);
const io =
fibIO1.fork().chain((fib1) =>
fibIO2.fork().chain((fib2) => fib1.interruptAndWait.delay(10).applySecond(fib2.join)));
Expand Down
10 changes: 5 additions & 5 deletions src/__test__/io.resource.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ describe("IO", () => {
describe("resource management", () => {
it("should eval ensuring", () => {
const ref = Ref.unsafeAlloc(42);
const io = ref.update((n) => n + 1).ensuring(ref.update((n) => n - 1).yield_()).applySecond(ref.get);
const io = ref.update((n) => n + 1).onComplete(ref.update((n) => n - 1).yield_()).applySecond(ref.get);
return equiv(io, new Value(42));
});
it("should eval ensuring in the face of interrupts", () => {
const ref = Ref.unsafeAlloc(41);
const fiberIO = IO.never_().ensuring(ref.update((n) => n + 1));
const fiberIO = IO.never_().onComplete(ref.update((n) => n + 1));
const io = fiberIO.fork().chain((fiber) => fiber.interrupt.yield_().applySecond(ref.get));
return equiv(io, new Value(42));
});
Expand Down Expand Up @@ -98,23 +98,23 @@ describe("IO", () => {
it("should not invoke interrupted during normal execution", () => {
const ref = Ref.unsafeAlloc(41);
const add1 = ref.update(((n) => n + 1));
const fiberIO = add1.interrupted(add1);
const fiberIO = add1.onInterrupt(add1);
const io = fiberIO.fork()
.chain((fiber) => fiber.join.attempt().applySecond(ref.get));
return equiv(io, new Value(42));
});
it("should not invoke interrupted during failure execution", () => {
const ref = Ref.unsafeAlloc(42);
const add1 = ref.update(((n) => n + 1));
const fiberIO = IO.failed("boom").interrupted(add1);
const fiberIO = IO.failed("boom").onInterrupt(add1);
const io = fiberIO.fork()
.chain((fiber) => fiber.join.attempt().applySecond(ref.get));
return equiv(io, new Value(42));
});
it("should invoke interrupted during interrupts", () => {
const ref = Ref.unsafeAlloc(41);
const add1 = ref.update((n) => n + 1);
const fiberIO = IO.never_().interrupted(add1);
const fiberIO = IO.never_().onInterrupt(add1);
const io = fiberIO.fork()
.chain((fiber) => fiber.interruptAndWait.delay(10).applySecond(ref.get));
return equiv(io, new Value(42));
Expand Down
173 changes: 0 additions & 173 deletions src/__test__/queue.spec.ts

This file was deleted.

16 changes: 8 additions & 8 deletions src/deferred.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ export class Deferred<A> {
return new Deferred();
}

public wait: IO<never, A>;
public get: IO<never, Option<A>>;
public isFull: IO<never, boolean>;
public get: IO<never, A>;
public tryGet: IO<never, Option<A>>;
public isComplete: IO<never, boolean>;
public isEmpty: IO<never, boolean>;
private oneshot: OneShot<A>;

private constructor() {
this.oneshot = new OneShot<A>();
this.isFull = IO.eval(() => this.oneshot.isSet());
this.isComplete = IO.eval(() => this.oneshot.isSet());
this.isEmpty = IO.eval(() => !this.oneshot.isSet());
this.wait = IO.async<never, A>((contextSwitch) => {
this.get = IO.async<never, A>((contextSwitch) => {
// types are weird between browser and node but we are only using it as an opaque handle
const listener = (a: A) => {
// Don't deliver the notification until the next tick.
Expand All @@ -47,11 +47,11 @@ export class Deferred<A> {
this.oneshot.unlisten(listener);
});
});
this.get = IO.eval(() => this.oneshot.get());
this.tryGet = IO.eval(() => this.oneshot.get());
}

@boundMethod
public fill(a: A): IO<never, void> {
public complete(a: A): IO<never, void> {
return IO.eval(() => {
if (this.oneshot.isSet()) {
throw new Error("Bug: Deferred has already been filled");
Expand All @@ -61,7 +61,7 @@ export class Deferred<A> {
}

@boundMethod
public tryFill(a: A): IO<never, boolean> {
public tryComplete(a: A): IO<never, boolean> {
return IO.eval(() => {
if (this.oneshot.isSet()) {
return false;
Expand Down
2 changes: 1 addition & 1 deletion src/internal/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ export class Runtime<E, A> {
} else if (current.step._tag === "critical") {
// Once enter critical completes we are guaranteed leave critical
return (this.enterCritical as unknown as IO<unknown, unknown>)
.applySecond(current.step.io.ensuring(this.leaveCritical as unknown as IO<never, unknown>));
.applySecond(current.step.io.onComplete(this.leaveCritical as unknown as IO<never, unknown>));
} else if (current.step._tag === "chain") {
this.callFrames.push(new ChainFrame(current.step.chain));
return current.step.left;
Expand Down
24 changes: 12 additions & 12 deletions src/io.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,11 @@ export class IO<E, A> {
Deferred.alloc<B>().widenError<E>().chain((rightInto) =>
this.fork().widenError<E>().bracket((fiba) => fiba.interrupt, (fiba) =>
fb.fork().widenError<E>().bracket((fibb) => fibb.interrupt, (fibb) =>
fiba.join.tap((v) => leftInto.fill(v).widenError<E>())
.applySecond(fibb.join.tap((v) => rightInto.fill(v).widenError<E>()))
fiba.join.tap((v) => leftInto.complete(v).widenError<E>())
.applySecond(fibb.join.tap((v) => rightInto.complete(v).widenError<E>()))
)
)
.applySecond(leftInto.wait.map2(rightInto.wait, f).widenError<E>())
.applySecond(leftInto.get.map2(rightInto.get, f).widenError<E>())
)
);
}
Expand Down Expand Up @@ -340,9 +340,9 @@ export class IO<E, A> {
/**
* Run this and produce either a Value<A> or a Raise<E> depending on the result
*/
public attempt(): IO<never, Attempt<E, A>> {
return this.map<Attempt<E, A>>((v) => new Value(v))
.chainCause((cause) => cause._tag === "raise" ? IO.of(cause) : IO.caused(cause));
public attempt(): IO<never, Either<E, A>> {
return this.map<Either<E, A>>(right)
.chainError((e) => IO.of(left(e)));
}

/**
Expand Down Expand Up @@ -389,7 +389,7 @@ export class IO<E, A> {
* This is an interaction between asynchronous boundaries and critical sections, but you must account for it.
* @param always
*/
public ensuring<B>(always: IO<never, B>): IO<E, A> {
public onComplete<B>(always: IO<never, B>): IO<E, A> {
return new IO(new OnDone(this, always));
}

Expand All @@ -401,7 +401,7 @@ export class IO<E, A> {
* (and thus always be registered as a cleanup action) without 'this' IO every actually starting.
* This is an interaction between asynchronous boundaries and critical sections, but you must account for it.
*/
public interrupted<B>(interrupt: IO<never, B>): IO<E, A> {
public onInterrupt<B>(interrupt: IO<never, B>): IO<E, A> {
return new IO(new OnInterrupted(this, interrupt));
}

Expand All @@ -421,7 +421,7 @@ export class IO<E, A> {
// Resource acquisition and setting of the ref is critical
this.chain((r) => ref.set(release(r)).as(r).widenError<E>()).critical()
.chain(consume)
.ensuring(ref.get.flatten())
.onComplete(ref.get.flatten())
);
}

Expand All @@ -447,7 +447,7 @@ export class IO<E, A> {
cleanup.set(fib.interruptAndWait
.chain((result) => release(resource, result))).widenError<E>())).critical()
.chain((fib) => fib.join)
.ensuring(cleanup.get.flatten())
.onComplete(cleanup.get.flatten())
);
}

Expand Down Expand Up @@ -508,7 +508,7 @@ export class IO<E, A> {
.chain((deferred) =>
raceInto(deferred, this)
.within((fiba) => fiba.interrupt, raceInto(deferred, other)
.within((fibb) => fibb.interrupt, deferred.wait)))
.within((fibb) => fibb.interrupt, deferred.get)))
.slay();
}

Expand Down Expand Up @@ -653,7 +653,7 @@ export class IO<E, A> {

function raceInto<E, A>(defer: Deferred<Result<E, A>>, io: IO<E, A>): IO<never, Fiber<never, void>> {
return io.resurrect()
.chain((result) => defer.tryFill(result).void())
.chain((result) => defer.tryComplete(result).void())
.fork();
}

Expand Down
Loading

0 comments on commit 961c596

Please sign in to comment.