A port of the rx-node project for RxJS v5 and above. If you are still using RxJS 4, use the original project instead: https://github.com/Reactive-Extensions/rx-node
This project provides Reactive Extensions for JavaScript (RxJS) bindings for Node.js and io.js to abstract over the EventEmitter, Streams and more.
There are a number of ways to get started with the RxJS bindings for Node.js.
To download the source of the Node.js Bindings for the Reactive Extensions for JavaScript, type in the following:
git clone https://github.com/paulpdaniels/rx-node-vx.git
cd ./rx-node-vx
Alternatively, install the package from npm:
npm install rx-node-vx
Converts the given observable sequence to an event emitter with the given event name. The errors are handled on the 'error' event and completion on the 'end' event.
(Observable): The observable sequence to convert to an EventEmitter.
eventName (String): The event name to subscribe to.
(EventEmitter): An EventEmitter which emits the given eventName for each onNext call in addition to 'error' and 'end' events.
var Rx = require('rxjs');
var RxNode = require('rx-node-vx');
// RxJS 5 uses Observable.of in place of the v4 Observable.return
var source = Rx.Observable.of(42);
var emitter = RxNode.toEventEmitter(source, 'data');
emitter.on('data', function (data) {
  console.log('Data: ' + data);
});
emitter.on('end', function () {
  console.log('End');
});
// Ensure to call publish to fire events from the observable
emitter.publish();
// => Data: 42
// => End
- index.js
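To make the event mapping described above concrete, here is a minimal sketch of how an observable could be bridged to an EventEmitter using only Node's built-in `events` module. This is not the rx-node-vx implementation: `fakeObservable` and `toEmitterSketch` are hypothetical names, and the observable is a stand-in object with a `subscribe(next, error, complete)` method rather than a real RxJS Observable.

```javascript
var EventEmitter = require('events').EventEmitter;

// Hypothetical stand-in for an observable: emits 42, then completes.
var fakeObservable = {
  subscribe: function (next, error, complete) {
    next(42);
    complete();
  }
};

// Hypothetical helper (an assumption, not the library's code): maps values
// to `eventName`, errors to 'error', and completion to 'end'.
function toEmitterSketch(observable, eventName) {
  var emitter = new EventEmitter();
  // Nothing is emitted until publish() is called, so listeners can be
  // attached first.
  emitter.publish = function () {
    observable.subscribe(
      function (x) { emitter.emit(eventName, x); },
      function (err) { emitter.emit('error', err); },
      function () { emitter.emit('end'); }
    );
  };
  return emitter;
}

var seen = [];
var emitter = toEmitterSketch(fakeObservable, 'data');
emitter.on('data', function (data) { seen.push('Data: ' + data); });
emitter.on('end', function () { seen.push('End'); });
emitter.publish();

console.log(seen.join('\n'));
// => Data: 42
// => End
```

Deferring the subscription to `publish()` is what lets listeners be registered before any value is emitted; without it, a synchronous source would finish before `on('data', ...)` was called.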
Converts a flowing stream to an Observable sequence.
(Stream): A stream to convert to an observable sequence.
[finishEventName] (String): Event that notifies about closed stream. ("end" by default)
[dataEventName] (String): Event that notifies about incoming data. ("data" by default)
(Observable): An observable sequence which fires on each 'data' event as well as handling 'error' and finish events like 'end' or 'finish'.
var RxNode = require('rx-node-vx');
var subscription = RxNode.fromStream(process.stdin, 'end')
  .subscribe(function (x) { console.log(x); });
// => r<Buffer 72>
// => x<Buffer 78>
- index.js
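The role of the optional event-name arguments can be sketched as follows. This is an illustration under stated assumptions, not the library's code: `fromStreamSketch` is a hypothetical helper that returns a plain object with a `subscribe` method, and a bare EventEmitter stands in for a real stream so that the demo runs synchronously.

```javascript
var EventEmitter = require('events').EventEmitter;

// Hypothetical helper (an assumption, not the library's code): listens for
// the data event, 'error', and the configurable finish event.
function fromStreamSketch(stream, finishEventName, dataEventName) {
  finishEventName = finishEventName || 'end';
  dataEventName = dataEventName || 'data';
  return {
    subscribe: function (next, error, complete) {
      stream.on(dataEventName, next);
      stream.on('error', error);
      stream.on(finishEventName, complete);
      // Return a handle that detaches the listeners again.
      return {
        unsubscribe: function () {
          stream.removeListener(dataEventName, next);
          stream.removeListener('error', error);
          stream.removeListener(finishEventName, complete);
        }
      };
    }
  };
}

// A plain EventEmitter stands in for a stream in this demo.
var fakeStream = new EventEmitter();
var log = [];
fromStreamSketch(fakeStream, 'finish').subscribe(
  function (x) { log.push('next: ' + x); },
  function (err) { log.push('error: ' + err.message); },
  function () { log.push('completed'); }
);

fakeStream.emit('data', 'r');
fakeStream.emit('data', 'x');
fakeStream.emit('finish');

console.log(log.join('\n'));
// => next: r
// => next: x
// => completed
```

Passing `'finish'` as the second argument makes the sketch complete on the event that writable streams emit, while the default `'end'` matches readable streams.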
Converts a flowing readable stream to an Observable sequence.
(Stream): A stream to convert to an observable sequence.
[dataEventName] (String): Event that notifies about incoming data. ("data" by default)
(Observable): An observable sequence which fires on each 'data' event as well as handling 'error' and 'end' events.
var RxNode = require('rx-node-vx');
var subscription = RxNode.fromReadableStream(process.stdin)
  .subscribe(function (x) { console.log(x); });
// => r<Buffer 72>
// => x<Buffer 78>
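The `<Buffer 72>` output appears because a readable stream such as `process.stdin` delivers raw `Buffer` chunks unless an encoding has been set on it. As a small illustration using only Node built-ins (the variable names are chosen for this example), a chunk can be decoded into text like this:

```javascript
// A one-byte chunk like the one logged above (0x72 is the letter "r").
var chunk = Buffer.from([0x72]);

// Decode the raw bytes into a string.
var text = chunk.toString('utf8');

console.log(chunk); // => <Buffer 72>
console.log(text);  // => r
```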
Converts a flowing readline stream to an Observable sequence.
(Stream): A stream to convert to an observable sequence.
(Observable): An observable sequence which fires on each 'line' event as well as handling 'error' and 'close' events.
var readline = require('readline');
var fs = require('fs');
var RxNode = require('rx-node-vx');
var rl = readline.createInterface({
  input: fs.createReadStream('sample.txt')
});
var subscription = RxNode.fromReadLineStream(rl)
  .subscribe(function (x) { console.log(x); });
// Prints contents of 'sample.txt' line by line:
// => rx
// => supports 'readline'
- index.js
Converts a flowing writable stream to an Observable sequence.
(Stream): A stream to convert to an observable sequence.
(Observable): An observable sequence which fires on each 'data' event as well as handling 'error' and 'finish' events.
var RxNode = require('rx-node-vx');
var subscription = RxNode.fromWritableStream(process.stdout)
  .subscribe(function (x) { console.log(x); });
// => r<Buffer 72>
// => x<Buffer 78>
- index.js
Converts a flowing transform stream to an Observable sequence.
(Stream): A stream to convert to an observable sequence.
(Observable): An observable sequence which fires on each 'data' event as well as handling 'error' and 'finish' events.
var RxNode = require('rx-node-vx');
var subscription = RxNode.fromTransformStream(getTransformStreamSomehow());
- index.js
Writes an observable sequence to a stream.
(Observable): Observable sequence to write to a stream.
stream (Stream): The stream to write to.
[encoding] (String): The encoding of the item to write.
(Subscription): The subscription handle.
var Rx = require('rxjs');
var RxNode = require('rx-node-vx');
var source = Rx.Observable.range(0, 5);
var subscription = RxNode.writeToStream(source, process.stdout, 'utf8');
// => 01234
- index.js
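The idea of writing an observable sequence to a stream can be sketched without any dependencies. Everything here is a hypothetical stand-in: `fakeRange` imitates `Rx.Observable.range(0, 5)`, `fakeStream` imitates a writable stream by recording what it receives, and `writeToStreamSketch` is an illustrative helper, not the library's implementation. Ending the stream on completion is an assumption of this sketch.

```javascript
// Hypothetical stand-in for Rx.Observable.range(0, 5).
var fakeRange = {
  subscribe: function (next, error, complete) {
    for (var i = 0; i < 5; i++) { next(i); }
    complete();
  }
};

// Hypothetical stand-in for a writable stream: records what is written.
var written = [];
var ended = false;
var fakeStream = {
  write: function (chunk, encoding) { written.push(chunk); return true; },
  end: function () { ended = true; }
};

// Hypothetical helper (an assumption, not the library's code): writes each
// value as a string and ends the stream when the sequence completes.
function writeToStreamSketch(observable, stream, encoding) {
  return observable.subscribe(
    function (x) { stream.write(String(x), encoding); },
    function (err) { console.error(err); },
    function () { stream.end(); }
  );
}

writeToStreamSketch(fakeRange, fakeStream, 'utf8');

console.log(written.join(''));
// => 01234
```

Values are converted with `String(x)` in this sketch because a stream's `write` expects a string or a Buffer rather than a number.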