Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance enhancements for streaming #150

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions lib/oarequest.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ OARequest.prototype.persist = function () {
this.parser.on('element', function (msg) {
// first msg; successful connection => reset reconnect interval
self.connectInterval = 0
if (msg.delete) { self.emit('delete', msg) }
if (msg.id_str) { self.emit('tweet', msg) }
else if (msg.delete) { self.emit('delete', msg) }
else if (msg.disconnect) { self.handleDisconnect(msg) }
else if (msg.limit) { self.emit('limit', msg) }
else if (msg.scrub_geo) { self.emit('scrub_geo', msg) }
Expand Down Expand Up @@ -97,7 +98,7 @@ OARequest.prototype.persist = function () {
else if (ev === 'list_user_subscribed') { self.emit('list_user_subscribed', msg) }
else if (ev === 'list_user_unsubscribed') { self.emit('list_user_unsubscribed', msg) }
else { self.emit('unknown_user_event', msg) }
} else { self.emit('tweet', msg) }
}
})

this.parser.on('error', function (err) {
Expand Down Expand Up @@ -452,4 +453,3 @@ OARequest.prototype.stopStallAbortTimeout = function () {
}

module.exports = OARequest

114 changes: 79 additions & 35 deletions lib/parser.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,47 +4,91 @@
var util = require('util')
, EventEmitter = require('events').EventEmitter;

var Parser = module.exports = function () {
this.message = ''
var Parser = module.exports = function () {
this.msgBuf = ''
this.nextMsgBytes = 0
this.lastByteChecked = 0

EventEmitter.call(this);
};

util.inherits(Parser, EventEmitter);

Parser.prototype.parse = function (chunk) {
this.message += chunk;
chunk = this.message;

var size = chunk.length
, start = 0
, offset = 0
, curr
, next;

while (offset < size) {
curr = chunk[offset];
next = chunk[offset + 1];

if (curr === '\r' && next === '\n') {
var piece = chunk.slice(start, offset);
start = offset += 2;

if (!piece.length) { continue; } //empty object

try {
var msg = JSON.parse(piece)
} catch (err) {
this.emit('error', new Error('Error parsing twitter reply: `'+piece+'`, error message `'+err+'`'));
} finally {
if (msg)
this.emit('element', msg)

continue
}
Parser.prototype.parse = function (chunk) {
// message var represents anything left in local buffer plus new chunk
var message = this.msgBuf += chunk;

// do we have a known piece incoming?
if (this.nextMsgBytes > 0) {
if (message.length >= this.nextMsgBytes) { // we have what we need to process

// separate active head piece of message from remainder
var splitPos = this.nextMsgBytes;
var piece = message.slice(0, splitPos);
var rest = message.slice(splitPos + 1);

// process that piece
this._processObj(piece);

// reset state, then call ourselves recursively with remainder of message
this.nextMsgBytes = 0; // since we just processed it
this.msgBuf = ''; // reset state
this.lastByteChecked = 0; // reset state
this.parse(rest);

} else { // we are still expecting more data
this.msgBuf = message; // store what we have so far
// after this, function will exit, until it gets called with next chunk..
}
offset++;

} else { // we don't have any known pieces coming, so search for EOLs

// parse until we find a full line
// (be sure to indicate the last searched point to make search faster)
var eolPos = message.indexOf('\r\n', this.lastByteChecked);
if (eolPos !== -1) {
// we have an EOL, take the first line for processing and reserve the rest
var line = message.slice(0, eolPos);
var rest = message.slice(eolPos + 2);

// process that line
this._processLine(line);

// reset state, call ourselves recursively with remainder of message
this.msgBuf = '';
this.lastByteChecked = 0;
this.parse(rest);

} else {
// we haven't found an EOL yet--indicate how far we have read so that we
// don't recheck it on next chunk, and store in local buffer to it can be
// resurrected when we get the next chunk.
this.lastByteChecked = message.length - 1;
this.msgBuf = message;
}

}
}

this.message = chunk.slice(start, size);
};
// a line can contain either a delimeted: length hint, or a JSON object
Parser.prototype._processLine = function (line) {
// if entire line is an integer, assume we are getting delimited: length
var hint = parseInt(line);
if (!isNaN(hint)) {
this.nextMsgBytes = hint;
} else {
this._processObj(line)
}
}


// process what we assume should be a JSON object (but allow for errors..)
Parser.prototype._processObj = function (piece) {
try {
var msg = JSON.parse(piece)
} catch (err) {
this.emit('error', new Error('Error parsing twitter reply: `'+piece+'`, error message `'+err+'`'));
} finally {
if (msg) { this.emit('element', msg); }
}
}