diff --git a/README.md b/README.md index be99b8c..871c2f8 100644 --- a/README.md +++ b/README.md @@ -46,9 +46,6 @@ __Other features__ * Supports the transducer protocol. You can for instance transduce streams with [Ramda](http://ramdajs.com/) and [transducers.js](https://github.com/jlongster/transducers.js). -* Complies to the [fantasy land](https://github.com/fantasyland/fantasy-land) - applicative specification. -* [Elegant support for promises](#using-promises-for-asynchronous-operations). * [Atomic updates](#atomic-updates). ## Examples @@ -209,19 +206,31 @@ after an actual response has been received (otherwise `responses()` would return streams has received a value (this behaviour can be circumvented with [flyd.immediate](#flydimmediatestream)). -### Using promises for asynchronous operations +### Promises +Flyd has two helpers for dealing with promises: `flyd.fromPromise` and `flyd.flattenPromise`. -Flyd has inbuilt support for promises. Similarly to how a promise can never be -resolved with a promise, a promise can never flow down a stream. Instead the -fulfilled value of the promise will be sent down the stream. +Let's say you're building a filtered list. It is important to you that the latest filter always corresponds +to the latest promise and its resolution. using `flyd.fromPromise` guarantees the ordering, and can skip intermediate results. -```javascript -var urls = flyd.stream('/something.json'); -var responses = flyd.stream(requestPromise(urls())); -flyd.on(function(responses) { - console.log('Received response!'); - console.log(responses()); -}, responses); +```js +const filter = flyd.stream(''); +const results = filter + .pipe(flyd.chain( + filter => flyd.fromPromise(requestPromise(`https://example.com?q=${filter}`)) + )); +``` + +On the other hand let's say you want to sum some numbers from a service you've written. +Every time someone clicks on your site you want to send a request and get back a random number to be tallied. + +`flyd.flattenPromise` gives you the guarantee that every promise resolution will be handled, regardless of order. + +```js +const clicks = flyd.stream(); +const total = clicks + .map(getNumberAsync) + .pipe(flyd.flattenPromise) + .pipe(flyd.scan((acc, v)=> acc + v, 0)); ``` ### Mapping over a stream @@ -426,6 +435,60 @@ __Example__ var numbers = flyd.stream(0); var squaredNumbers = flyd.map(function(n) { return n*n; }, numbers); ``` +### flyd.chain(fn, s) +`fn` must return a stream. + +`fn` is run every time a value is pushed into `s`. +Returns a single stream of merged values from the created streams. + +Ends when every created stream and the main stream ends + +__Signature__ + +`(a -> Stream b) -> Stream a -> Stream b` + +__Example__ +```javascript +var filter = flyd.stream('filter'); +var search_results = flyd.chain(function(filter){ + return flyd.stream(getResults(filter)); +}, filter); +``` + +### flyd.ap(valueStream, functionStream) + +Applies the value in `valueStream` to the function in `functionStream` + +__Signature__ +`Stream a -> Stream (a -> b) -> Stream b` + +__Example__ +```javascript +function add3(x) { return x + 3; } +flyd.ap(flyd.stream(5), flyd.stream(add3)) // stream(8); +``` + +while it can not seem useful immediately consider this example + +```javascript +var get_results = function (filter, sortProperty, sortDirection) { + return flyd.stream(fetch(`${base_url}/search?q=${filter}&sort=${sortProperty} ${sortDirection}`)) +}; + +// this would eventually be linked to an input field +var filter = flyd.stream(''); +var sortProperty = flyd.stream('name'); +var sortDirection = flyd.stream('descending'); + +var results = flyd.stream(flyd.curryN(3, get_results)) + .pipe(flyd.ap(filter)) + .pipe(flyd.ap(sortProperty)) + .pipe(flyd.ap(sortDirection)) + .pipe(flyd.map(function(d){ return d; })); +``` + +In the above example you have a stream of results that triggers a call for get_results +every time `filter`, `sortProperty`, or `sortDirection` is changed. ### flyd.on(fn, s) @@ -555,7 +618,36 @@ names(); // 'Bohr' A stream that emits `true` when the stream ends. If `true` is pushed down the stream the parent stream ends. -### stream.map(f) +### stream.pipe(fn) + +Returns the result of applying function `fn` to the stream. + +__Signature__ +Called bound to `Stream a`: `(Stream a -> Stream b) -> Stream b` + +__Example__ +```javascript +// map a stream +var numbers = flyd.stream(0); +var squaredNumbers = numbers + .pipe(flyd.map(function(n) { return n*n; })); + +// Chain a stream +var filter = flyd.stream('filter'); +var search_results = filter + .pipe(flyd.chain(function(filter){ + return flyd.stream(getResults(filter)); + })); + +// use with a flyd module +var filter = require('flyd/module/filter'); +var numbers = flyd.stream(0); +var isEven = function(x){ return x % 2 === 0; }; +var evenNumbers = numbers + .pipe(filter(isEven)); +``` + +### stream.map(f) __Deprecated__ Returns a new stream identical to the original except every value will be passed through `f`. @@ -574,7 +666,7 @@ var numbers = flyd.stream(0); var squaredNumbers = numbers.map(function(n) { return n*n; }); ``` -### stream1.ap(stream2) +### stream1.ap(stream2) __Deprecated__ `stream1` must be a stream of functions. @@ -624,7 +716,6 @@ Modules listed with names in the format `flyd/module/filter` are builtin to the | --- | --- | | [flyd/module/**filter**](module/filter) | Filter values from stream based on predicate. | | [flyd/module/**lift**](module/lift) | Maps a function taking _n_ parameters over _n_ streams. | -| [flyd/module/**flatmap**](module/flatmap) | Maps a function over a stream of streams and flattens the result to a single stream. | | [flyd/module/**switchlatest**](module/switchlatest) | Flattens a stream of streams. The result stream reflects changes from the last stream only. | | [flyd/module/**keepwhen**](module/keepwhen) | Keep values from one stream only when another stream is true. | | [flyd/module/**obj**](module/obj) | Functions for working with stream in objects. | diff --git a/index.d.ts b/index.d.ts new file mode 100644 index 0000000..e959adf --- /dev/null +++ b/index.d.ts @@ -0,0 +1,241 @@ +type CurriedFunction2 = ((t1: T1, t2: T2) => R) & ((t1: T1, ...rest: Array) => (t2: T2) => R); + +declare namespace flyd { + interface Stream { + (): T; + (value: T): Stream; + (value: Promise | PromiseLike): Stream; + + + map(project: (value: T) => V): Stream; + ap(this: Stream<(value: A) => B>, stream: Stream): Stream; + chain(project: (value: T) => Stream): Stream; + of(...values: V[]): Stream; + + pipe(operator: (input: Stream) => Stream): Stream; + + ['fantasy-land/map'](project: (value: T) => V): Stream; + ['fantasy-land/ap'](fn: (value: Stream) => V): Stream; + ['fantasy-land/chain'](project: (value: T) => Stream): Stream; + ['fantasy-land/of'](...values: V[]): Stream; + + end: Stream; + val: T; + hasVal: boolean; + } + + interface Combine { + (fn: (value: Stream, self: Stream) => R | void, streams: [Stream]): Stream; + ( + fn: (value: Stream, t1: Stream, self: Stream) => R | void, + streams: [Stream, Stream] + ): Stream; + ( + fn: (value: Stream, t1: Stream, t2: Stream, self: Stream) => R | void, + streams: [Stream, Stream, Stream] + ): Stream; + + ( + fn: (value: Stream, t1: Stream, t2: Stream, t3: Stream, self: Stream) => R | void, + streams: [Stream, Stream, Stream, Stream] + ): Stream; + } + + interface CreateStream { + (): Stream; + (value: T): Stream; + (value: Promise | PromiseLike): Stream; + (): Stream; + } + + interface Static { + stream: CreateStream; + + immediate(stream: Stream): Stream; + isStream(stream: any): boolean; + + combine: Combine; + endsOn(end$: Stream, stream: Stream): Stream; + + map(accessor: (value: T) => V): (stream: Stream) => Stream; + map(accessor: (value: T) => V, stream: Stream): Stream; + + ap(value$: Stream, transform$: Stream<(value: A) => B>): Stream; + ap(value$: Stream): (transform$: Stream<(value: A) => B>) => Stream + + chain(accessor: (value: T) => Stream): (stream: Stream) => Stream; + chain(accessor: (value: T) => Stream, stream: Stream): Stream; + + on(onfn: (value: T) => void): (stream: Stream) => Stream; + on(onfn: (value: T) => void, stream: Stream): Stream; + + scan(reducer: (acc: T, value: V) => T, initial: T, stream: Stream): Stream; + scan(reducer: (acc: T, value: V) => T, initial: T): (stream: Stream) => Stream; + scan(reducer: (acc: T, value: V) => T): (initial: T) => (stream: Stream) => Stream; + + merge(stream1: Stream, stream2: Stream): Stream; + merge(stream1: Stream): (stream2: Stream) => Stream; + + transduce(mapfn: Function, stream: Stream): Stream; + transduce(mapfn: Function): (stream: Stream) => Stream; + + fromPromise(promise: PromiseLike): Stream; + flattenPromise(promise$: Stream>): Stream; + + curryN(length: number, fn: (...args: Array) => void): Function; + } +} + +declare module 'flyd' { + const f: flyd.Static; + export = f; +} + +declare module 'flyd/module/aftersilence' { + interface aftersilence { + (delay: number, stream: flyd.Stream): flyd.Stream; + (delay: number): (stream: flyd.Stream) => flyd.Stream; + } + export = aftersilence; +} +declare module 'flyd/module/droprepeats' { + interface dropRepeats { + (s: flyd.Stream): flyd.Stream; + } + interface dropRepeatsWith { + (isEqual: (value: T) => boolean, stream: flyd.Stream): flyd.Stream; + (isEqual: (value: T) => boolean): (stream: flyd.Stream) => flyd.Stream; + } + + export const dropRepeats: dropRepeats; + export const dropRepeatsWith: dropRepeatsWith; +} + +declare module 'flyd/module/every' { + interface every { + (ms: number): flyd.Stream; + } + const _every: every; + export = _every; +} + +declare module 'flyd/module/filter' { + interface Filter { + (project: (val: T) => val is V, stream: flyd.Stream): flyd.Stream; + (project: (val: T) => val is V): (stream: flyd.Stream) => flyd.Stream; + } + const _Filter: Filter; + export = _Filter; +} + +declare module 'flyd/module/flatmap' { + type projection = (val: T) => flyd.Stream; + interface flatMap { + (project: projection, source: flyd.Stream): flyd.Stream; + (project: projection): (source: flyd.Stream) => flyd.Stream; + } + const _flatMap: flatMap; + export = _flatMap; +} + +declare module 'flyd/module/forwardto' { + interface ForwardTo { + (stream: flyd.Stream, project: (value: V) => T): flyd.Stream; + (stream: flyd.Stream): (project: (value: V) => T) => flyd.Stream; + } + const _ForwardTo: ForwardTo; + export = _ForwardTo; +} + +declare module 'flyd/module/inlast' { + interface InLast { + (ms: number, stream: flyd.Stream): flyd.Stream; + (ms: number): (stream: flyd.Stream) => flyd.Stream; + } + + const _InLast: InLast; + export = _InLast; +} + +declare module 'flyd/module/keepwhen' { + interface KeepWhen { + (when: flyd.Stream, stream: flyd.Stream): flyd.Stream; + (when: flyd.Stream): (stream: flyd.Stream) => flyd.Stream; + } + const _KeepWhen: KeepWhen; + export = _KeepWhen; +} + +declare module 'flyd/module/lift' { + interface Lift { + (liftFn: (t1: T1, t2: T2) => R, s1: flyd.Stream, s2: flyd.Stream): flyd.Stream; + (liftFn: (t1: T1, t2: T2, t3: T3) => R, s1: flyd.Stream, s2: flyd.Stream, s3: flyd.Stream): flyd.Stream; + (liftFn: (t1: T1, t2: T2, t3: T3, t4: T4) => R, s1: flyd.Stream, s2: flyd.Stream, s3: flyd.Stream, s4: flyd.Stream): flyd.Stream; + (liftFn: (...rest: any[]) => T, ...streams: flyd.Stream[]): flyd.Stream; + } + + const _Lift: Lift; + export = _Lift; +} + +declare module 'flyd/module/mergeall' { + interface MergeAll { + (streams: [flyd.Stream, flyd.Stream]): flyd.Stream; + (streams: [flyd.Stream, flyd.Stream, flyd.Stream]): flyd.Stream; + (streams: flyd.Stream[]): flyd.Stream; + } + const _MergeAll: MergeAll; + export = _MergeAll; +} + +declare module 'flyd/module/obj' { + interface ObjModule { + streamProps(obj: T): {[P in keyof T]: flyd.Stream }; + extractProps(obj: any): any; + } + + const _ObjModule: ObjModule; + export = _ObjModule; +} + +declare module 'flyd/module/previous' { + type previous = (stream: flyd.Stream) => flyd.Stream; + const _previous: previous;; + export = _previous; +} + +declare module 'flyd/module/sampleon' { + interface SampleOn { + (samplewhen: flyd.Stream, stream: flyd.Stream): flyd.Stream; + (samplewhen: flyd.Stream): (stream: flyd.Stream) => flyd.Stream; + } + const _SampleOn: SampleOn; + export = _SampleOn; +} + +declare module 'flyd/module/scanmerge' { + type ScanFn = (acc: T, value: V) => T; + interface ScanMerge { + (pairs: Array<[flyd.Stream, ScanFn]>, initial: T): flyd.Stream; + (pairs: Array<[flyd.Stream, ScanFn]>): (initial: T) => flyd.Stream; + } + const _ScanMerge: ScanMerge; + export = _ScanMerge; +} + +declare module 'flyd/module/switchlatest' { + interface SwitchLatest { + (stream: flyd.Stream>): flyd.Stream; + } + const _SwitchLatest: SwitchLatest; + export = _SwitchLatest; +} + +declare module 'flyd/module/takeuntil' { + interface takeuntil { + (source: flyd.Stream, end: flyd.Stream): flyd.Stream; + (source: flyd.Stream): (end: flyd.Stream) => flyd.Stream; + } + const _takeuntil: takeuntil; + export = _takeuntil; +} diff --git a/lib/index.js b/lib/index.js index c5fe26d..7a500f8 100644 --- a/lib/index.js +++ b/lib/index.js @@ -39,12 +39,12 @@ flyd.stream = function(initialValue) { s.end = endStream; s.fnArgs = []; endStream.listeners.push(s); - s.toJSON = function() { - return s(); - }; if (arguments.length > 0) s(initialValue); return s; } +// fantasy-land Applicative +flyd.stream['fantasy-land/of'] = flyd.stream.of = flyd.stream; + /** * Create a new dependent stream @@ -184,6 +184,55 @@ flyd.map = curryN(2, function(f, s) { return combine(function(s, self) { self(f(s.val)); }, [s]); }) +/** + * Chain a stream + * + * also known as flatMap + * + * Where `fn` returns a stream this function will flatten the resulting streams. + * Every time `fn` is called the context of the returned stream will "switch" to that stream. + * + * __Signature__: `(a -> Stream b) -> Stream a -> Stream b` + * + * @name flyd.chain + * @param {Function} fn - the function that produces the streams to be flattened + * @param {stream} stream - the stream to map + * @return {stream} a new stream with the mapped values + * + * @example + * var filter = flyd.stream('who'); + * var items = flyd.chain(function(filter){ + * return flyd.stream(findUsers(filter)); + * }, filter); + */ +flyd.chain = curryN(2, chain); + +/** + * Apply a stream + * + * Applies the value in `s2` to the function in `s1`. + * + * __Signature__: `Stream (a -> b) -> Stream a -> Stream b` + * + * @name flyd.ap + * @param {stream} s1 - The value to be applied + * @param {stream} s2 - The function expecting the value + * @return {stream} a new stream with the mapped values + * + * @example + * var add = stream(a => b => a + b) + * var n1 = stream(1) + * var n2 = stream(2) + * + * var added = flyd.ap(n2, flyd.ap(n1, add)) // stream(3) + * // can also be written using pipe + * var added_pipe = add + * .pipe(ap(n1)) + * .pipe(ap(n2)); + * added_pipe() // 3 + */ +flyd.ap = curryN(2, ap); + /** * Listen to stream events * @@ -332,6 +381,71 @@ flyd.curryN = curryN */ function boundMap(f) { return flyd.map(f, this); } +/** + * Returns the result of applying function `fn` to this stream + * + * __Signature__: Called bound to `Stream a`: `(a -> Stream b) -> Stream b` + * + * @name stream.pipe + * @param {Function} fn - the function to apply + * @return {stream} A new stream + * + * @example + * var numbers = flyd.stream(0); + * var squaredNumbers = numbers.pipe(flyd.map(function(n){ return n*n; })); + */ +function operator_pipe(f) { return f(this) } + +function boundChain(f) { + return chain(f, this); +} + +function chain(f, s) { + // Internal state to end flat map stream + var flatEnd = flyd.stream(1); + var internalEnded = flyd.on(function() { + var alive = flatEnd() - 1; + flatEnd(alive); + if (alive <= 0) { + flatEnd.end(true); + } + }); + + internalEnded(s.end); + var last = flyd.stream(); + var flatStream = flyd.combine(function(s, own) { + last.end(true) + // Our fn stream makes streams + var newS = f(s()); + flatEnd(flatEnd() + 1); + internalEnded(newS.end); + + // Update self on call -- newS is never handed out so deps don't matter + last = flyd.map(own, newS); + }, [s]); + + flyd.endsOn(flatEnd.end, flatStream); + + return flatStream; +} + +flyd.fromPromise = function fromPromise(p) { + var s = flyd.stream(); + p.then(function(val) { + s(val); + s.end(true); + }); + return s; +} + +/* istanbul ignore next */ +flyd.flattenPromise = function flattenPromise(s) { + return combine(function(s, self) { + s().then(self); + }, [s]) +} + + /** * Returns a new stream which is the result of applying the * functions from `this` stream to the values in `stream` parameter. @@ -354,11 +468,21 @@ function boundMap(f) { return flyd.map(f, this); } * var addToNumbers1 = flyd.map(add, numbers1); * var added = addToNumbers1.ap(numbers2); */ -function ap(s2) { - var s1 = this; +function ap(s2, s1) { return combine(function(s1, s2, self) { self(s1.val(s2.val)); }, [s1, s2]); } +function boundAp(s2) { + return ap(s2, this); +} + +/** + * @private + */ +function fantasy_land_ap(s1) { + return ap(this, s1); +} + /** * Get a human readable view of a stream * @name stream.toString @@ -410,9 +534,24 @@ function createStream() { s.listeners = []; s.queued = false; s.end = undefined; - s.map = boundMap; - s.ap = ap; - s.of = flyd.stream; + + // fantasy-land compatibility + s.ap = boundAp; + s['fantasy-land/map'] = s.map = boundMap; + s['fantasy-land/ap'] = fantasy_land_ap; + s['fantasy-land/of'] = s.of = flyd.stream; + s['fantasy-land/chain'] = s.chain = boundChain; + + s.pipe = operator_pipe; + + // According to the fantasy-land Applicative specification + // Given a value f, one can access its type representative via the constructor property: + // `f.constructor.of` + s.constructor = flyd.stream; + + s.toJSON = function() { + return s.val; + } s.toString = streamToString; return s; } @@ -537,7 +676,9 @@ function flushUpdate() { * @param {*} value */ function updateStreamValue(s, n) { + /* istanbul ignore if */ if (n !== undefined && n !== null && isFunction(n.then)) { + console.warn('flyd: Promise swallowing has been deprecated, please see https://github.com/paldepind/flyd#promises for more info'); n.then(s); return; } @@ -619,6 +760,9 @@ function endStream(s) { if (s.end !== undefined) detachDeps(s.end); } +/** + * @private + */ /** * @private * transducer stream transformer diff --git a/module/flatmap/index.js b/module/flatmap/index.js index 911cdce..d704749 100644 --- a/module/flatmap/index.js +++ b/module/flatmap/index.js @@ -1,5 +1,7 @@ var flyd = require('../../lib'); +console.warn('flyd/module/flatmap has been deprecated in favour of flyd.chain'); + /** * Given a stream of streams, returns a single stream of merged values * from the created streams. @@ -18,30 +20,4 @@ var flyd = require('../../lib'); * s(0)(1)(2); * // flat = 0, 1, 2 */ -module.exports = flyd.curryN(2, function(f, s) { - // Internal state to end flat map stream - var flatEnd = flyd.stream(1); - var internalEnded = flyd.on(function() { - var alive = flatEnd() - 1; - flatEnd(alive); - if (alive <= 0) { - flatEnd.end(true); - } - }); - - internalEnded(s.end); - - var flatStream = flyd.combine(function(s, own) { - // Our fn stream makes streams - var newS = f(s()); - flatEnd(flatEnd() + 1); - internalEnded(newS.end); - - // Update self on call -- newS is never handed out so deps don't matter - flyd.on(own, newS); - }, [s]); - - flyd.endsOn(flatEnd.end, flatStream); - - return flatStream; -}); +module.exports = flyd.chain; diff --git a/module/flatmap/test/index.js b/module/flatmap/test/index.js deleted file mode 100644 index 03f2a92..0000000 --- a/module/flatmap/test/index.js +++ /dev/null @@ -1,61 +0,0 @@ -var assert = require('assert'); -var flyd = require('../../../flyd'); -var stream = flyd.stream; -var flatMap = require('../index.js'); - -describe('flatMap', function() { - it('applies function to values in stream', function() { - var result = []; - function f(v) { - result.push(v); - return stream(); - } - var s = stream(); - flatMap(f, s); - s(1)(2)(3)(4)(5); - assert.deepEqual(result, [1, 2, 3, 4, 5]); - }); - it('returns stream with result from all streams created by function', function() { - var result = []; - function f(v) { - var s = stream(); - setImmediate(function() { - s(v + 1)(v + 2)(v + 3); - }); - return s; - } - var s = stream(); - flyd.map(function(v) { - result.push(v); - }, flatMap(f, s)); - s(1)(3)(5); - setImmediate(function() { - assert.deepEqual(result, [2, 3, 4, - 4, 5, 6, - 6, 7, 8]); - }); - }); - it('passed bug outlined in https://github.com/paldepind/flyd/issues/31', function(done) { - function delay(val, ms) { - var outStream = flyd.stream(); - - setTimeout(function() { - outStream(val); - outStream.end(true); - }, ms); - - return outStream; - } - - var main = delay(1, 500); - var merged = flatMap(function(v) { - return delay(v, 1000) - }, main); - - flyd.on(function() { - assert(main() === 1); - assert(merged() === 1); - done(); - }, merged.end); - }); -}); diff --git a/package.json b/package.json index 0177168..5595cbe 100644 --- a/package.json +++ b/package.json @@ -8,7 +8,7 @@ "test": "test" }, "dependencies": { - "ramda": "^0.19.1" + "ramda": "^0.25.0" }, "devDependencies": { "benchmark": "^1.0.0", @@ -24,9 +24,10 @@ }, "scripts": { "test-lib": "mocha", - "test": "eslint lib/ test/ module/ && mocha -R dot test/*.js module/**/test/*.js", + "test": "eslint --fix lib/ test/ module/ && mocha -R dot test/*.js module/**/test/*.js", "docs": "documentation -f md lib/index.js > API.md", "perf": "./perf/run-benchmarks", + "coverage": "istanbul cover _mocha -- -R spec", "post-to-coveralls-io": "istanbul cover _mocha --report lcovonly -- -R spec && cat ./coverage/lcov.info | coveralls && rm -rf ./coverage", "build": "browserify -s flyd lib/index.js > flyd.js && uglifyjs flyd.js -o flyd.min.js" }, @@ -63,4 +64,4 @@ "android-browser/4.2..latest" ] } -} +} \ No newline at end of file diff --git a/test/index.js b/test/index.js index 93c6e55..8e79ad5 100644 --- a/test/index.js +++ b/test/index.js @@ -6,6 +6,9 @@ var t = require('transducers.js'); var flyd = require('../lib'); var stream = flyd.stream; var combine = flyd.combine; +var map = flyd.map; +var ap = flyd.ap; +var chain = flyd.chain; // Some combinators function doubleFn(x) { return x() * 2; } @@ -32,7 +35,7 @@ describe('stream', function() { var obj = { num: stream(23), str: stream('string'), - obj: stream({is_object: true}) + obj: stream({ is_object: true }) }; var expected_outcome = { num: 23, @@ -247,30 +250,30 @@ describe('stream', function() { describe('streams created within dependent stream bodies', function() { it('if dependencies are met it is updated eventually', function() { var result; - stream(1).map(function() { + stream(1).pipe(map(function() { var n = flyd.stream(1); - n.map(function(v) { result = v + 100; }); - }); + n.pipe(map(function(v) { result = v + 100; })); + })); assert.equal(result, 101); }); it('if dependencies are not met at creation it is updated after their dependencies are met', function() { var result; - stream(1).map(function() { + stream(1).pipe(map(function() { var n = stream(); - n.map(function(v) { result = v + 100; }); + n.pipe(map(function(v) { result = v + 100; })); n(1); - }); + })); assert.equal(result, 101); }); it('if a streams end stream is called it takes effect immediately', function() { var result = undefined; - stream(1).map(function() { + stream(1).pipe(map(function() { var n = stream(); - n.map(function(v) { result = v + 100; }); + n.pipe(map(function(v) { result = v + 100; })); n.end(true); n(1); n(2); - }); + })); assert.equal(result, undefined); }); }) @@ -366,28 +369,26 @@ describe('stream', function() { }); }); - describe('promise integration', function() { + describe('promise swallowing', function() { it('pushes result of promise down the stream', function(done) { - var s = stream(); + var s = flyd.fromPromise(Promise.resolve(12)); combine(function(s) { assert.equal(s(), 12); done(); }, [s]); - s(Promise.resolve(12)); }); it('recursively unpacks promise', function(done) { - var s = stream(); - combine(function(s) { - assert.equal(s(), 12); - done(); - }, [s]); - s(new Promise(function(res) { + var s = flyd.fromPromise(new Promise(function(res) { setTimeout(function() { res(new Promise(function(res) { setTimeout(res.bind(null, 12)); })); }, 20); })); + combine(function(s) { + assert.equal(s(), 12); + done(); + }, [s]); }); }); @@ -405,7 +406,7 @@ describe('stream', function() { describe('map', function() { it('maps a function', function() { var x = stream(3); - var doubleX = x.map(function(x) { return 2 * x; }); + var doubleX = x.pipe(map(function(x) { return 2 * x; })); assert.equal(doubleX(), 6); x(1); assert.equal(doubleX(), 2); @@ -437,7 +438,7 @@ describe('stream', function() { }); it('returns equivalent stream when mapping identity', function() { var x = stream(3); - var x2 = x.map(function(a) { return a; }); + var x2 = x.pipe(map(function(a) { return a; })); assert.equal(x2(), x()); x('foo'); assert.equal(x2(), x()); @@ -446,14 +447,96 @@ describe('stream', function() { function f(x) { return x * 2; } function g(x) { return x + 4; } var x = stream(3); - var s1 = x.map(g).map(f); - var s2 = x.map(function(x) { return f(g(x)); }); + var s1 = x.pipe(map(g)).pipe(map(f)); + var s2 = x.pipe(map(function(x) { return f(g(x)); })); assert.equal(s1(), s2()); x(12); assert.equal(s1(), s2()); }); }); + describe('chain', function() { + it('applies function to values in stream', function() { + var result = []; + function f(v) { + result.push(v); + return stream(); + } + var s = stream(); + flyd.chain(f, s); + s(1)(2)(3)(4)(5); + assert.deepEqual(result, [1, 2, 3, 4, 5]); + }); + it('returns stream with result from all streams created by function', function() { + var result = []; + function f(v) { + var s = stream(); + setImmediate(function() { + s(v + 1)(v + 2)(v + 3); + }); + return s; + } + var s = stream(); + flyd.map(function(v) { + result.push(v); + }, flyd.chain(f, s)); + s(1)(3)(5); + setImmediate(function() { + assert.deepEqual(result, [2, 3, 4, + 4, 5, 6, + 6, 7, 8]); + }); + }); + it('passed bug outlined in https://github.com/paldepind/flyd/issues/31', function(done) { + function delay(val, ms) { + var outStream = flyd.stream(); + + setTimeout(function() { + outStream(val); + outStream.end(true); + }, ms); + + return outStream; + } + + var main = delay(1, 500); + var merged = flyd.chain(function(v) { + return delay(v, 1000) + }, main); + + flyd.on(function() { + assert(main() === 1); + assert(merged() === 1); + done(); + }, merged.end); + }); + + it('preserves ordering', function() { + function delay(val, ms) { + var outStream = flyd.stream(); + + setTimeout(function() { + outStream(val); + outStream.end(true); + }, ms); + + return outStream; + } + + var s = stream(); + + var s2 = s + .pipe(chain(function(val) { + return delay(val, 100); + })); + s(1)(2)(3)(4); + + flyd.on(function(val) { + assert.equal(val, 4); + }, s2) + }); + }); + describe('scan', function() { it('has initial acc as value when stream is undefined', function() { var numbers = stream(); @@ -558,7 +641,7 @@ describe('stream', function() { it('applies functions in stream', function() { var a = stream(function(x) { return 2 * x; }); var v = stream(3); - var s = a.ap(v); + var s = a.pipe(ap(v)); assert.equal(s(), 6); a(function(x) { return x / 3; }); assert.equal(s(), 1); @@ -569,14 +652,17 @@ describe('stream', function() { var a = stream(function(x) { return x * 2; }); var u = stream(function(x) { return x + 5; }); var v = stream(8); - var s1 = a.map(function(f) { - return function(g) { - return function(x) { - return f(g(x)); + var s1 = a + .pipe(map(function(f) { + return function(g) { + return function(x) { + return f(g(x)); + }; }; - }; - }).ap(u).ap(v); - var s2 = a.ap(u.ap(v)); + })) + .pipe(ap(u)) + .pipe(ap(v)); + var s2 = a.pipe(ap(u.pipe(ap(v)))); assert.equal(s1(), 26); assert.equal(s2(), 26); a(function(x) { return x * 4; }); @@ -597,7 +683,7 @@ describe('stream', function() { var s1 = stream(0); var s2 = stream(0); var s3 = stream(0); - var sum = flyd.map(sumThree, s1).ap(s2).ap(s3); + var sum = flyd.map(sumThree, s1).pipe(ap(s2)).pipe(ap(s3)); flyd.map(function(v) { result.push(v); }, sum); s1(3); s2(2); s3(5); assert.deepEqual(result, [0, 3, 5, 10]); @@ -608,7 +694,7 @@ describe('stream', function() { var numbers1 = stream(); var numbers2 = stream(); var addToNumbers1 = flyd.map(add, numbers1); - var added = addToNumbers1.ap(numbers2); + var added = addToNumbers1.pipe(ap(numbers2)); flyd.map(function(n) { result.push(n); }, added); numbers1(3); numbers2(2); numbers1(4); assert.deepEqual(result, [5, 6]); @@ -616,6 +702,14 @@ describe('stream', function() { }); describe('of', function() { + it('can be accessed through the constructor property', function() { + var s1 = stream(2); + var s2 = s1.constructor.of(3); + var s3 = s2.constructor['fantasy-land/of'](3); + assert.equal(flyd.isStream(s2), true); + assert.equal(s2(), 3); + assert.equal(s3(), 3); + }); it('returns a stream with the passed value', function() { var s1 = stream(2); var s2 = s1.of(3); @@ -625,20 +719,20 @@ describe('stream', function() { var a = stream(); var id = function(a) { return a; }; var v = stream(12); - assert.equal(a.of(id).ap(v)(), v()); + assert.equal(a.of(id).pipe(ap(v))(), v()); }); it('is homomorphic', function() { var a = stream(0); var f = function(x) { return 2 * x; }; var x = 12; - assert.equal(a.of(f).ap(a.of(x))(), a.of(f(x))()); + assert.equal(a.of(f).pipe(ap(a.of(x)))(), a.of(f(x))()); }); it('is interchangeable', function() { var y = 7; var a = stream(); var u = stream()(function(x) { return 3 * x; }); - assert.equal(u.ap(a.of(y))(), - a.of(function(f) { return f(y); }).ap(u)()); + assert.equal(u.pipe(ap(a.of(y)))(), + a.of(function(f) { return f(y); }).pipe(ap(u))()); }); it('can create dependent stream inside stream', function() { var one = flyd.stream(); @@ -662,7 +756,7 @@ describe('stream', function() { }, str); flyd.map(function() { // create a stream, the first dependant on `str` should still be updated - flyd.combine(function() {}, []); + flyd.combine(function() { }, []); }, str); str(1); str(2); @@ -825,7 +919,7 @@ describe('stream', function() { var f = flyd.combine(function(d) { return d() + 5; }, [d]); var g = flyd.combine(function(d) { return d() + 6; }, [d]); - flyd.combine(function(a,b,c,d,e,f,g,self,changed) { + flyd.combine(function(a, b, c, d, e, f, g, self, changed) { var vals = changed.map(function(s) { return s(); }); result.push(vals); return 1; @@ -836,4 +930,43 @@ describe('stream', function() { ]); }); }); + + describe('fantasy-land', function() { + it('map', function() { + var s = stream(1); + var mapped = R.map(R.add(3), s); + assert.equal(mapped(), 4); + assert.equal(s(), 1); + }); + + it('chain', function() { + var s = stream(1); + var chained = R.chain(R.compose(stream, R.add(3)), s); + assert.equal(chained(), 4); + assert.equal(s(), 1); + }); + + it('ap', function() { + var s = stream(R.add(3)); + var val = stream(3); + var applied = R.ap(s, val); + assert.equal(applied(), 6); + }); + + it('old ap', function() { + var s = stream(R.add(3)) + .ap(stream(3)); + assert.equal(s(), 6); + }); + + it('of', function() { + var s = flyd.stream(3); + var s2 = s['fantasy-land/of'](5); + assert(flyd.isStream(s)); + assert.equal(s(), 3); + + assert(flyd.isStream(s2)); + assert.equal(s2(), 5); + }) + }); });