Releases: Reactive-Extensions/RxJS
RxJS Release v2.2.16
Minor fixes from RxJS v2.2.15 to include bug fixes and changes to Enumerable to make it ES6 compatible.
From Iterable
This release has a new function which makes it ES6 compatible with iterables. We now have Rx.Observable.fromIterable
method which takes in an iterable and converts it to observable sequence.
var set = new Set([1,2,3]);
var observable = Rx.Observable.fromIterable(set);
var subscription = observable.subscribe(console.log.bind(console));
// => 1
// => 2
// => 3
Bugs Fixed
RxJS Release v2.2.15
This is a slight update to RxJS v2.2.14 release to fix some bugs associated with backpressure. This is a much more tested solution complete with controlled to get the number of requested items.
BackPressure
This is the first experimental release of backpressure. The idea is to pause and resume for a particular observable if the observer cannot keep up for whatever reason. To do this automatically seems to us naive, and instead, we should not punish the producer if the consumer cannot keep up, so we've set a pretty high bar for getting it right. You can now find them in their own file rx.backpressure.js
or if you're using rx.lite.js
, then you're already in luck because you have them already.
There are many ways around this problem of backpressure including using throttle
if you're ok with losing some data, the buffer
methods by time, count, etc, if you'd like the results in batch for a particular count or timeframe. In addition, if you want only a value in a given time span, then you could use sample
. In this case, we've added three methods, pausable
, pausableBuffered
and controlled
.
With pausable
, you have the ability to pause a hot observable, such as mouse movements and then resume, but you will lose data in between the pause and resume methods. Below is an example of it in action.
var controller = new Rx.Subject();
var events = Rx.Observable.fromEvent(document, 'mousemove');
// Control the events by the controller
var controlled = events.pausable(controller);
var subscription = controlled.subscribe(function (e) {
// Do something with events
// Woops, too fast
// Pause the event stream
controller.onNext(false);
// When you want to start again, call this:
controller.onNext(true);
});
// Start listening
controller.onNext(true);
For a more comprehensive view of it in action, check out the tests for pausable
.
The other is [
pausableBuffered`](https://github.com/Reactive-Extensions/RxJS/blob/master/src/core/backpressure/pausablebuffered.js) where you will not lose data, in fact, it will be kept in a buffer until you are ready to start consuming again.
var controller = new Rx.Subject();
var interval = Rx.Observable.interval(1000).timeInterval();
// Control the events by the controller
var controlled = interval.pausableBuffered(controller);
var subscription = controlled.subscribe(function (x) {
console.log('x', x.value);
});
// Start it
var shouldRun = true;
controller.onNext(shouldRun);
// Make it pause every so often, and then will backfill with results emptying the buffer
setInterval(function () {
controller.onNext(shouldRun = !shouldRun);
}, 5000);
Once again, to see a more comprehensive view of this in action, check the associated tests for pausableBuffererd
Finally, we have the controlled
operator, which turns the observable sequence into a push/pull scenario in which we can request the number of items we want at a time. This gives the observer the chance to tell the observable sequence how many items it wants at any point, unlike any of the buffer
methods.
var source = Rx.Observable.range(0, 1000).controlled();
source.subscribe(function(x) {
console.log('x', x);
});
// Get 10 items
source.request(10);
// Maybe later get another
source.request(5);
You can get a better sense of the method's details, check out the associated tests for controlled
What's Next?
Before we hit the 2.3 mark, we'd like to get more feedback on the backpressure items, including if some of the other implementations such as windowed and stop and wait mechanisms are worthwhile. In addition, we are starting the expression parsing work in order to enable scenarios like remote communication with serialized observable sequences.
RxJS Release v2.2.17
Slight update to RxJS v2.2.16 by fixing the algorithm for detecting whether the object is a promise.
RxJS Release v2.2.14
This will be one of the last updates before we start making our way to version 2.3.
This release includes the following:
- Promises Integration
- BackPressure
- Bug Fixes
Promises Integration
A major highlight of this release is the integration with JavaScript Promises. With the upcoming ES6 standardization, and Promises showing up in browsers such as Chrome, now is the time to strike. In order for Promises integration to work with RxJS, the Promise implementation must be compatible with ES6, in that the constructor must look like the following:
var promise = new Promise(function (resolve, reject) {
resolve(42);
});
In previous releases, we've had the ability to convert a Promise to an Observable by using Rx.Observable.fromPromise
, but now, we have the ability to create Promises from Observable sequences by using the toPromise
method. You can either give the constructor to the toPromise
method or use our configuration with Rx.config.Promise
.
// Using the constructor
var promise = Rx.Observable.return(42).toPromise(RSVP.Promise);
promise.then(console.log.bind(console));
// Using configuration
Rx.config.Promise = RSVP.Promise;
var promise = Rx.Observable.return(42).toPromise();
promise.then(console.log.bind(console));
We have also implemented overloads to the following methods as to accept Promises, or even in some cases Observable of Promises. We have implemented flatMap
/selectMany
, concatAll
/concatObservable
, mergeAll
/mergeObservable
, switch
/switchLatest
, and finally startAsync
.
// Using flatMap/selectMany with a promise
var observable = Rx.Observable.return(42)
.flatMap(RSVP.Promise.resolve(56));
observable.subscribe(console.log.bind(console));
// Using flatMap/selectMany with a promise inside a function
var observable = Rx.Observable.return(42)
.flatMap(function (x, i) { return RSVP.Promise.resolve(x + 1); });
observable.subscribe(console.log.bind(console));
// => 43
// Using flatMap/selectMany with a promise inside a function with a result selector
var observable = Rx.Observable.return(42)
.flatMap(
function (x, i) { return RSVP.Promise.resolve(x + 1); },
function (x, y, i) { return { fst: x + i, snd: y }});
observable.subscribe(console.log.bind(console));
// => { fst: 42, snd: 43 }
// Concat support
var sources = Rx.Observable
.fromArray([
RSVP.Promise.resolve(42),
RSVP.Promise.resolve(56),
RSVP.Promise.resolve(72)]
)
.concatAll();
sources.subscribe(console.log.bind(console));
// => 42
// => 56
// => 72
// Merge support
var sources = Rx.Observable
.fromArray([
RSVP.Promise.resolve(42),
RSVP.Promise.resolve(56),
RSVP.Promise.resolve(72)]
)
.mergeAll();
sources.subscribe(console.log.bind(console));
// => 42
// => 56
// => 72
// Switch support
var sources = Rx.Observable
.fromArray([
RSVP.Promise.resolve(42),
RSVP.Promise.resolve(56),
RSVP.Promise.resolve(72)]
)
.switch();
sources.subscribe(console.log.bind(console));
// => 72
// StartAsync Support
var source = Rx.Observable.startAsync(function () {
return RSVP.Promise.resolve(42);
});
source.subscribe(console.log.bind(console));
// => 42
BackPressure
This is the first experimental release of backpressure. The idea is to pause and resume for a particular observable if the observer cannot keep up for whatever reason. To do this automatically seems to us naive, and instead, we should not punish the producer if the consumer cannot keep up, so we've set a pretty high bar for getting it right. You can now find them in their own file rx.backpressure.js
or if you're using rx.lite.js
, then you're already in luck because you have them already.
There are many ways around this problem of backpressure including using throttle
if you're ok with losing some data, the buffer
methods by time, count, etc, if you'd like the results in batch for a particular count or timeframe. In this case, we've added two methods, pausable
and pausableBuffer
.
With pausable
, you have the ability to pause a hot observable, such as mouse movements and then resume, but you will lose data in between the pause and resume methods. Below is an example of it in action.
var controller = new Rx.Subject();
var events = Rx.Observable.fromEvent(document, 'mousemove');
// Control the events by the controller
var controlled = events.pausable(controller);
var subscription = controlled.subscribe(function (e) {
// Do something with events
// Woops, too fast
// Pause the event stream
controller.onNext(false);
// When you want to start again, call controller.onNext(true);
});
// Start listening
controller.onNext(true);
The other is pausableBuffer
where you will not lose data, in fact, it will be kept in a buffer until you are ready to start consuming again.
var controller = new Rx.Subject();
var interval = Rx.Observable.interval(1000).timeInterval();
// Control the events by the controller
var controlled = interval.pausable(controller);
var subscription = controlled.subscribe(function (x) {
console.log('x', x.value);
});
// Start it
var shouldRun = true;
controller.onNext(shouldRun);
// Make it pause every so often
setIterval(function () {
controller.onNext(shouldRun = !shouldRun);
}, 5000);
In future releases, we will also cover ideas on how you can request a number of items each time, such as the following.
var source = Rx.Observable.range(0, 1000).controlled();
source.subscribe(function(x) {
console.log('x', x);
});
// Get 10 items
source.request(10);
// Maybe later get another
source.request(5);
This is just a first exploration into what's possible, but we're pretty excited about this.
Bug Fixes
Closed the following bugs:
RxJS Release 2.2.12
This is a minor update from 2.2.12 with the following changes.
ChangeList:
- Fixed
TestScheduler.createHotObservable
to handle multiple observers Issue #89 - Using
MutationObserver
orWebKitMutationObserver
as a scheduling mechanism Issue #91
The TestScheduler
will now use the following logic to determine the fastest immediate scheduling if available:
MutationObserver
orWebKitMutationObserver
process.nextTick
in Node.jswindow.setImmediate
window.postMessage
window.MessageChannel
script.readystatechanged
with script injectionwindow.setTimeout
Reactive Extensions for JavaScript (RxJS) version 2.2.10
Minor update from RxJS version 2.2.9, which has the following fixes:
- Make the following operators cold observables instead of hot
Rx.Observable.fromCallback
Rx.Observable.fromNodeCallback
Rx.Observable.fromPromise
- Fix
Rx.Observable.zipArray
to accept both parameters and an array.
Reactive Extensions for JavaScript (RxJS) version 2.2.9
Slight update to 2.2.7 to include shortcut operators for adding a reference counter for all connectable observables, by calling refCount
. Adding the share()
, shareValue()
, and shareReplay()
methods. These methods are optimized for the 80% case, in which the developer simply wants to share side effects among multiple concurrent observer. This allows developers simple code like this:
var interval = Rx.Observable.interval(1000);
var source = interval
.take(2)
.do(function (x) { console.log('Side effect'); })
.share();
// When the number of observers subscribed to published observable goes from
// 0 to 1, we connect to the underlying observable sequence.
source.subscribe(createObserver('SourceA'));
// When the second subscriber is added, no additional subscriptions are added to the
// underlying observable sequence. As a result the operations that result in side
// effects are not repeated per subscriber.
source.subscribe(createObserver('SourceB'));
function createObserver(tag) {
return Rx.Observer.create(
function (x) {
console.log('Next: ' + tag + x);
},
function (err) {
console.log('Error: ' + err);
},
function () {
console.log('Completed');
}
);
}
Other changes:
- Fixed Bower version issue
- Added SauceLabs testing
Reactive Extensions for JavaScript (RxJS) version 2.2.7
Slight update to 2.2.5 which updates the following:
Changes:
rx.js
|rx.compat.js
- Generalized
throttle
to usethrottleWithSelector
thus cutting code duplication
- Generalized
rx.lite.js
|rx.lite.compat.js
:- Removed
generateWithRelativeTime
andremoveWithAbsoluteTime
and replaced withgenerateWithTime
- Removes absolute time scheduling from:
delay
timer
- Removed
Reactive Extensions for JavaScript (RxJS) version 2.2.5
Important update to the Reactive Extensions for JavaScript (RxJS) including the addition of the notion of a lite file, which contains most of what the user needs in one file.
Changes:
Rx.Observable.create
now takes either aDisposable
as a return in the function, a function, or nothing.
/* With a disposable */
var observable = Rx.Observable.create(function (observer) {
observer.onNext(42);
observer.onCompleted();
return Rx.Disposable.create(function () {
console.log('disposed!');
});
});
/* With a function*/
var observable = Rx.Observable.create(function (observer) {
observer.onNext(42);
observer.onCompleted();
return function () {
console.log('disposed!');
};
});
/* With no retutnr*/
var observable = Rx.Observable.create(function (observer) {
observer.onNext(42);
observer.onCompleted();
// Nothing to dispose
});
New Files:
- rx.lite.js - a lighter version of RxJS which contains most operators used on a daily basis including standard operators, time-based, and async event, callback and promise based operators.
- rx.lite.compat.js - much like
rx.lite.js
but has compatibility back to IE6
New NuGet Packages:
- RxJS-Lite - A NuGet package containing both
rx.lite.js
andrx.lite.compat.js
.
Operators included in RxJS-Lite:
- Creation Operators
create
defer
empty
fromArray
generate
never
range
repeat
return
throw
- Multiple Value Operators
catch
combineLatest
concat
concatObservable
|concatAll
merge
mergeObservable
|mergeAll
skipuntil
switch
takeuntil
zip
ziparray
- Single Value Operators
asObservable
dematerialize
distinctUntilChanged
do
finally
ignoreElements
materialize
repeat
retry
scan
skipLast
startWith
takeLast
takeLastBuffer
- Standard Query Operators
select
|map
selectMany
|flatMap
selectSwitch
|flatMapLatest
skip
skipWhile
take
takeWhile
where
|filter
- Async Operators
fromCallback
fromNodeCallback
fromEvent
fromEventPattern
fromPromise
- Binding Operators
multicast
publish
publishLast
publishValue
replay
- Time-based Operators
interval
timer
delay
throttle
timeInterval
timestamp
sample
timeout
generateWithAbsolutetime
generateWithRelativetime
delaySubscription
delayWithSelector
timeoutWithSelector
throttleWithSelector
skipLastWithTime
takeLastWithTime
takeLastBufferWithtime
takewithtime
skipWithTime
skipUntilWithTime
takeUntilWithtime
Reactive Extensions for JavaScript (RxJS) version 2.2.4
Minor update from v2.2.3 which shortcuts behavior for callbacks with only a single value.
The old behavior of Rx.Observable.fromCallback
and Rx.Observable.fromNodeCallback
was to by default return an array, even if only a single value.
var Rx = require('rx'),
fs = require('fs');
var stat = Rx.Observable.fromNodeCallback(
fs.stat,
null, /* default scheduler */
null, /* default context */,
function (arr) { return arr[0]; }); /* only return the first */
var files = ['file1.txt', 'file2.txt', 'file3.txt'];
Rx.Observable
.for(files, function (file) {
return stat(file);
})
.toArray()
.subscribe(
function (results) {
results.forEach(function (result) {
console.log('is file: ' + result.isFile());
});
},
function (err) {
console.log('err ' + err);
}
);
The new behavior shortcuts this if there is only one parameter and returns it as a single value.
var Rx = require('rx'),
fs = require('fs');
var stat = Rx.Observable.fromNodeCallback(fs.stat);
var files = ['file1.txt', 'file2.txt', 'file3.txt'];
Rx.Observable
.for(files, function (file) {
return stat(file);
})
.toArray()
.subscribe(
function (results) {
results.forEach(function (result) {
console.log('is file: ' + result.isFile());
});
},
function (err) {
console.log('err ' + err);
}
);