Skip to content

Commit

Permalink
Reproduce 06 example
Browse files Browse the repository at this point in the history
  • Loading branch information
rzeigler committed Aug 25, 2019
1 parent d1da78c commit 3fa71da
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 1 deletion.
7 changes: 7 additions & 0 deletions .vscode/tasks.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@
"problemMatcher": [
"$eslint-stylish"
]
},
{
"type": "npm",
"script": "test-build",
"problemMatcher": [
"$tsc"
]
}
]
}
92 changes: 92 additions & 0 deletions examples/06-concurrent-structures.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2019 Ryan Zeigler
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* waveguide also exposes several concurrency structures, one of the most useful of which is semaphore.
* Lets suppose you want to make at most 3 requests to a remote server at a time.
* Doing all in parallel is fairly easy, doing 1 at a time is also fairly easy, but this is an interesting case where
* 'semantic blocking' can be used to come into play
*/
import { array } from "fp-ts/lib/Array";
import { pipe } from "fp-ts/lib/pipeable";
import * as wave from "../src/wave";
import { Wave } from "../src/wave";
import * as waver from "../src/waver";
import { WaveR } from "../src/waver";
import * as S from "../src/semaphore";
import { fetch, agent, time, main } from "./common";
import * as https from "https";
import * as consoleIO from "../src/console";
import * as managed from "../src/managed";

function query(q: string): WaveR<https.Agent, Error, readonly [string, bigint]> {
const get = pipe(
fetch(`https://www.google.com/search?q=${q}`),
time,
waver.mapWith(x => [q, x[1]] as const)
)
return waver.applySecond(consoleIO.logR(`starting ${q}`), waver.applyFirst(get, consoleIO.logR(`finishing ${q}`)));
}

/**
* Here we create an action that will create a semaphore.
* See http://systemfw.org/scala-italy-2018/#/4/3 for an excellent description (in Scala) about why this should be an IO
*/
const sem: WaveR<{}, never, S.SemaphoreR> = S.makeSemaphoreR(3);

const queries = ["c", "cplusplus", "java", "scala", "haskell", "purescript", "rust", "idris", "ada", "coq", "smalltalk", "swift", "kotlin"]

const queryTimes: WaveR<https.Agent, never, void> =

pipe(
waver.chain(sem as WaveR<https.Agent, Error, S.SemaphoreR>,
(sem) =>
waver.chain(array.traverse(waver.parInstances)(queries, q => sem.withPermit(query(q))),
(results) => consoleIO.logR(results.toString()))),
waver.chainErrorWith((e) => consoleIO.errorR(`eeik! ${e}`))
)

/**
* This results in the following example output in the console
*
*
starting kotlin
starting swift
starting smalltalk
finishing smalltalk
starting coq
finishing kotlin
starting ada
finishing swift
starting idris
finishing coq
starting rust
finishing idris
starting purescript
finishing ada
starting haskell
finishing rust
starting scala
finishing purescript
starting java
finishing haskell
starting cplusplus
finishing scala
starting c
finishing java
finishing cplusplus
finishing c
c,913086810,cplusplus,596634401,java,831743612,scala,793182444,haskell,768789686,purescript,421581001,rust,832051966,idris,792947817,ada,1271403411,coq,602136105,smalltalk,590635041,swift,1087156638,kotlin,739227974
*/
main(managed.provideTo(agent, queryTimes));
1 change: 1 addition & 0 deletions examples/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ export function main(io: Wave<never, void>): void {
driver.onExit((e) => {
// We don't worry about the raise case because the type of main says you must have handled your errors
if (e._tag === ExitTag.Abort) {
console.error(e);
process.exit(1);
} else {
process.exit(0);
Expand Down
28 changes: 28 additions & 0 deletions src/console.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
/* eslint @typescript-eslint/no-explicit-any:off */

import { Wave, sync } from "./wave";
import { WaveR } from "./waver";
import * as waver from "./waver";



Expand Down Expand Up @@ -50,3 +52,29 @@ export function error(msg?: any, ...more: any[]): Wave<never, void> {
console.error(msg, ...more);
});
}



/**
* Suspend console.log in an IO
* @param msg
*/
export function logR<R = {}>(msg?: any, ...more: any[]): WaveR<R, never, void> {
return waver.encaseWaveR(log(msg, ...more));
}

/**
* Suspend console.warn in an IO
* @param msg
*/
export function warnR<R = {}>(msg?: any, ...more: any[]): WaveR<R, never, void> {
return waver.encaseWaveR(warn(msg, ...more));
}

/**
* Suspend console.error in an IO
* @param msg
*/
export function errorR<R = {}>(msg?: any, ...more: any[]): WaveR<R, never, void> {
return waver.encaseWaveR(error(msg, ...more));
}
41 changes: 40 additions & 1 deletion src/semaphore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import * as e from "fp-ts/lib/Either";
import { Either, left, right } from "fp-ts/lib/Either";
import { constant, identity, not } from "fp-ts/lib/function";
import { constant, identity, not, flow } from "fp-ts/lib/function";
import * as o from "fp-ts/lib/Option";
import { pipe } from "fp-ts/lib/pipeable";
import { Deferred, makeDeferred } from "./deferred";
Expand All @@ -23,6 +23,8 @@ import { Wave } from "./wave";
import { makeRef, Ref } from "./ref";
import { Dequeue, empty } from "./support/dequeue";
import { makeTicket, Ticket, ticketExit, ticketUse } from "./ticket";
import { WaveR } from "./waver";
import * as waver from "./waver";

export interface Semaphore {
readonly acquire: Wave<never, void>;
Expand Down Expand Up @@ -174,3 +176,40 @@ export function makeSemaphore(n: number): Wave<never, Semaphore> {
io.map(makeRef(right(n) as State), makeSemaphoreImpl)
);
}

export interface SemaphoreR {
acquireN(n: number): WaveR<{}, never, void>;
readonly acquire: WaveR<{}, never, void>
releaseN(n: number): WaveR<{}, never, void>;
readonly release: WaveR<{}, never, void>;
withPermitsN<R, E, A>(n: number, wave: WaveR<R, E, A>): WaveR<R, E, A>;
withPermit<R, E, A>(wave: WaveR<R, E, A>): WaveR<R, E, A>;
readonly available: WaveR<{}, never, number>
}

export function liftSemaphore(sem: Semaphore): SemaphoreR {
const acquireN = flow(sem.acquireN, waver.encaseWave);
const acquire = waver.encaseWave(sem.acquire);
const releaseN = flow(sem.releaseN, waver.encaseWave);
const release = waver.encaseWave(sem.release);
function withPermitsN<R, E, A>(n: number, wave: WaveR<R, E, A>): WaveR<R, E, A> {
return (r) => sem.withPermitsN(n, wave(r));
}
function withPermit<R, E, A>(wave: WaveR<R, E, A>): WaveR<R, E, A> {
return (r) => sem.withPermit(wave(r));
}
const available = waver.encaseWave(sem.available);
return {
acquireN,
acquire,
releaseN,
release,
withPermitsN,
withPermit,
available
}
}

export function makeSemaphoreR(n: number): WaveR<{}, never, SemaphoreR> {
return waver.encaseWave(io.map(makeSemaphore(n), liftSemaphore));
}

0 comments on commit 3fa71da

Please sign in to comment.