diff --git a/.vscode/tasks.json b/.vscode/tasks.json index 233918a..7b53f50 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -26,6 +26,13 @@ "problemMatcher": [ "$eslint-stylish" ] + }, + { + "type": "npm", + "script": "test-build", + "problemMatcher": [ + "$tsc" + ] } ] } diff --git a/examples/06-concurrent-structures.ts b/examples/06-concurrent-structures.ts new file mode 100644 index 0000000..55a0b00 --- /dev/null +++ b/examples/06-concurrent-structures.ts @@ -0,0 +1,92 @@ +// Copyright 2019 Ryan Zeigler +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * waveguide also exposes several concurrency structures, one of the most useful of which is semaphore. + * Lets suppose you want to make at most 3 requests to a remote server at a time. + * Doing all in parallel is fairly easy, doing 1 at a time is also fairly easy, but this is an interesting case where + * 'semantic blocking' can be used to come into play + */ +import { array } from "fp-ts/lib/Array"; +import { pipe } from "fp-ts/lib/pipeable"; +import * as wave from "../src/wave"; +import { Wave } from "../src/wave"; +import * as waver from "../src/waver"; +import { WaveR } from "../src/waver"; +import * as S from "../src/semaphore"; +import { fetch, agent, time, main } from "./common"; +import * as https from "https"; +import * as consoleIO from "../src/console"; +import * as managed from "../src/managed"; + +function query(q: string): WaveR { + const get = pipe( + fetch(`https://www.google.com/search?q=${q}`), + time, + waver.mapWith(x => [q, x[1]] as const) + ) + return waver.applySecond(consoleIO.logR(`starting ${q}`), waver.applyFirst(get, consoleIO.logR(`finishing ${q}`))); +} + +/** + * Here we create an action that will create a semaphore. + * See http://systemfw.org/scala-italy-2018/#/4/3 for an excellent description (in Scala) about why this should be an IO + */ +const sem: WaveR<{}, never, S.SemaphoreR> = S.makeSemaphoreR(3); + +const queries = ["c", "cplusplus", "java", "scala", "haskell", "purescript", "rust", "idris", "ada", "coq", "smalltalk", "swift", "kotlin"] + +const queryTimes: WaveR = + + pipe( + waver.chain(sem as WaveR, + (sem) => + waver.chain(array.traverse(waver.parInstances)(queries, q => sem.withPermit(query(q))), + (results) => consoleIO.logR(results.toString()))), + waver.chainErrorWith((e) => consoleIO.errorR(`eeik! ${e}`)) + ) + +/** + * This results in the following example output in the console + * + * +starting kotlin +starting swift +starting smalltalk +finishing smalltalk +starting coq +finishing kotlin +starting ada +finishing swift +starting idris +finishing coq +starting rust +finishing idris +starting purescript +finishing ada +starting haskell +finishing rust +starting scala +finishing purescript +starting java +finishing haskell +starting cplusplus +finishing scala +starting c +finishing java +finishing cplusplus +finishing c +c,913086810,cplusplus,596634401,java,831743612,scala,793182444,haskell,768789686,purescript,421581001,rust,832051966,idris,792947817,ada,1271403411,coq,602136105,smalltalk,590635041,swift,1087156638,kotlin,739227974 + */ +main(managed.provideTo(agent, queryTimes)); diff --git a/examples/common.ts b/examples/common.ts index 36b9789..297410c 100644 --- a/examples/common.ts +++ b/examples/common.ts @@ -34,6 +34,7 @@ export function main(io: Wave): void { driver.onExit((e) => { // We don't worry about the raise case because the type of main says you must have handled your errors if (e._tag === ExitTag.Abort) { + console.error(e); process.exit(1); } else { process.exit(0); diff --git a/src/console.ts b/src/console.ts index 734ed48..7a9e072 100644 --- a/src/console.ts +++ b/src/console.ts @@ -15,6 +15,8 @@ /* eslint @typescript-eslint/no-explicit-any:off */ import { Wave, sync } from "./wave"; +import { WaveR } from "./waver"; +import * as waver from "./waver"; @@ -50,3 +52,29 @@ export function error(msg?: any, ...more: any[]): Wave { console.error(msg, ...more); }); } + + + +/** + * Suspend console.log in an IO + * @param msg + */ +export function logR(msg?: any, ...more: any[]): WaveR { + return waver.encaseWaveR(log(msg, ...more)); +} + +/** + * Suspend console.warn in an IO + * @param msg + */ +export function warnR(msg?: any, ...more: any[]): WaveR { + return waver.encaseWaveR(warn(msg, ...more)); +} + +/** + * Suspend console.error in an IO + * @param msg + */ +export function errorR(msg?: any, ...more: any[]): WaveR { + return waver.encaseWaveR(error(msg, ...more)); +} diff --git a/src/semaphore.ts b/src/semaphore.ts index c51a180..5efa168 100644 --- a/src/semaphore.ts +++ b/src/semaphore.ts @@ -14,7 +14,7 @@ import * as e from "fp-ts/lib/Either"; import { Either, left, right } from "fp-ts/lib/Either"; -import { constant, identity, not } from "fp-ts/lib/function"; +import { constant, identity, not, flow } from "fp-ts/lib/function"; import * as o from "fp-ts/lib/Option"; import { pipe } from "fp-ts/lib/pipeable"; import { Deferred, makeDeferred } from "./deferred"; @@ -23,6 +23,8 @@ import { Wave } from "./wave"; import { makeRef, Ref } from "./ref"; import { Dequeue, empty } from "./support/dequeue"; import { makeTicket, Ticket, ticketExit, ticketUse } from "./ticket"; +import { WaveR } from "./waver"; +import * as waver from "./waver"; export interface Semaphore { readonly acquire: Wave; @@ -174,3 +176,40 @@ export function makeSemaphore(n: number): Wave { io.map(makeRef(right(n) as State), makeSemaphoreImpl) ); } + +export interface SemaphoreR { + acquireN(n: number): WaveR<{}, never, void>; + readonly acquire: WaveR<{}, never, void> + releaseN(n: number): WaveR<{}, never, void>; + readonly release: WaveR<{}, never, void>; + withPermitsN(n: number, wave: WaveR): WaveR; + withPermit(wave: WaveR): WaveR; + readonly available: WaveR<{}, never, number> +} + +export function liftSemaphore(sem: Semaphore): SemaphoreR { + const acquireN = flow(sem.acquireN, waver.encaseWave); + const acquire = waver.encaseWave(sem.acquire); + const releaseN = flow(sem.releaseN, waver.encaseWave); + const release = waver.encaseWave(sem.release); + function withPermitsN(n: number, wave: WaveR): WaveR { + return (r) => sem.withPermitsN(n, wave(r)); + } + function withPermit(wave: WaveR): WaveR { + return (r) => sem.withPermit(wave(r)); + } + const available = waver.encaseWave(sem.available); + return { + acquireN, + acquire, + releaseN, + release, + withPermitsN, + withPermit, + available + } +} + +export function makeSemaphoreR(n: number): WaveR<{}, never, SemaphoreR> { + return waver.encaseWave(io.map(makeSemaphore(n), liftSemaphore)); +}