From 398a75afcb291047ebb09e726636daf60b540b39 Mon Sep 17 00:00:00 2001 From: Tim Cameron Ryan Date: Mon, 9 Feb 2015 17:08:18 -0800 Subject: [PATCH] Adds support for a stream option. --- README.md | 1 + lib/decode.js | 39 +++++++++++++++++++++++++++++++++++++++ lib/index.js | 1 + lib/stream.js | 29 +++++++++++++++++++++++++++++ package.json | 2 +- 5 files changed, 71 insertions(+), 1 deletion(-) create mode 100644 lib/stream.js diff --git a/README.md b/README.md index 75c864e..4f113f8 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ var notepack = require('notepack'); var encoded = notepack.encode({ foo: 'bar'}); // var decoded = notepack.decode(encoded); // { foo: 'bar' } +var stream = notepack.createStream().on('data', ...); // decode continuous streams of buffers ``` ## Performance diff --git a/lib/decode.js b/lib/decode.js index 7d87cce..83e64ac 100644 --- a/lib/decode.js +++ b/lib/decode.js @@ -5,6 +5,13 @@ function Decoder(buffer) { this.buffer = buffer; } +Decoder.prototype.expect = function (length) { + var remain = this.buffer.length - this.offset; + if (remain < length) { + throw new Error('Expected ' + length + ' bytes, found ' + remain + '.'); + } +} + Decoder.prototype.array = function (length) { var value = new Array(length); for (var i = 0; i < length; i++) { @@ -23,12 +30,14 @@ Decoder.prototype.map = function (length) { }; Decoder.prototype.str = function (length) { + this.expect(length); var value = this.buffer.toString('utf8', this.offset, this.offset + length); this.offset += length; return value; }; Decoder.prototype.bin = function (length) { + this.expect(length); var value = this.buffer.slice(this.offset, this.offset + length); this.offset += length; return value; @@ -73,30 +82,36 @@ Decoder.prototype.parse = function () { // bin case 0xc4: + this.expect(1); length = this.buffer.readUInt8(this.offset); this.offset += 1; return this.bin(length); case 0xc5: + this.expect(2); length = this.buffer.readUInt16BE(this.offset); this.offset += 2; return this.bin(length); case 0xc6: + this.expect(4); length = this.buffer.readUInt32BE(this.offset); this.offset += 4; return this.bin(length); // ext case 0xc7: + this.expect(2); length = this.buffer.readUInt8(this.offset); type = this.buffer.readInt8(this.offset + 1); this.offset += 2; return [type, this.bin(length)]; case 0xc8: + this.expect(3); length = this.buffer.readUInt16BE(this.offset); type = this.buffer.readInt8(this.offset + 2); this.offset += 3; return [type, this.bin(length)]; case 0xc9: + this.expect(5); length = this.buffer.readUInt32BE(this.offset); type = this.buffer.readInt8(this.offset + 4); this.offset += 5; @@ -104,28 +119,34 @@ Decoder.prototype.parse = function () { // float case 0xca: + this.expect(4); value = this.buffer.readFloatBE(this.offset); this.offset += 4; return value; case 0xcb: + this.expect(8); value = this.buffer.readDoubleBE(this.offset); this.offset += 8; return value; // uint case 0xcc: + this.expect(1); value = this.buffer.readUInt8(this.offset); this.offset += 1; return value; case 0xcd: + this.expect(2); value = this.buffer.readUInt16BE(this.offset); this.offset += 2; return value; case 0xce: + this.expect(4); value = this.buffer.readUInt32BE(this.offset); this.offset += 4; return value; case 0xcf: + this.expect(8); hi = this.buffer.readUInt32BE(this.offset) * Math.pow(2, 32); lo = this.buffer.readUInt32BE(this.offset + 4); this.offset += 8; @@ -133,18 +154,22 @@ Decoder.prototype.parse = function () { // int case 0xd0: + this.expect(1); value = this.buffer.readInt8(this.offset); this.offset += 1; return value; case 0xd1: + this.expect(2); value = this.buffer.readInt16BE(this.offset); this.offset += 2; return value; case 0xd2: + this.expect(4); value = this.buffer.readInt32BE(this.offset); this.offset += 4; return value; case 0xd3: + this.expect(8); hi = this.buffer.readInt32BE(this.offset) * Math.pow(2, 32); lo = this.buffer.readUInt32BE(this.offset + 4); this.offset += 8; @@ -152,6 +177,7 @@ Decoder.prototype.parse = function () { // fixext case 0xd4: + this.expect(1); type = this.buffer.readInt8(this.offset); this.offset += 1; if (type === 0x00) { @@ -160,17 +186,21 @@ Decoder.prototype.parse = function () { } return [type, this.bin(1)]; case 0xd5: + this.expect(1); type = this.buffer.readInt8(this.offset); this.offset += 1; return [type, this.bin(2)]; case 0xd6: + this.expect(1); type = this.buffer.readInt8(this.offset); this.offset += 1; return [type, this.bin(4)]; case 0xd7: + this.expect(1); type = this.buffer.readInt8(this.offset); this.offset += 1; if (type === 0x00) { + this.expect(8); hi = this.buffer.readInt32BE(this.offset) * Math.pow(2, 32); lo = this.buffer.readUInt32BE(this.offset + 4); this.offset += 8; @@ -178,40 +208,48 @@ Decoder.prototype.parse = function () { } return [type, this.bin(8)]; case 0xd8: + this.expect(1); type = this.buffer.readInt8(this.offset); this.offset += 1; return [type, this.bin(16)]; // str case 0xd9: + this.expect(1); length = this.buffer.readUInt8(this.offset); this.offset += 1; return this.str(length); case 0xda: + this.expect(2); length = this.buffer.readUInt16BE(this.offset); this.offset += 2; return this.str(length); case 0xdb: + this.expect(4); length = this.buffer.readUInt32BE(this.offset); this.offset += 4; return this.str(length); // array case 0xdc: + this.expect(2); length = this.buffer.readUInt16BE(this.offset); this.offset += 2; return this.array(length); case 0xdd: + this.expect(4); length = this.buffer.readUInt32BE(this.offset); this.offset += 4; return this.array(length); // map case 0xde: + this.expect(2); length = this.buffer.readUInt16BE(this.offset); this.offset += 2; return this.map(length); case 0xdf: + this.expect(4); length = this.buffer.readUInt32BE(this.offset); this.offset += 4; return this.map(length); @@ -229,4 +267,5 @@ function decode(buffer) { return value; } +decode.Decoder = Decoder; module.exports = decode; diff --git a/lib/index.js b/lib/index.js index 8dae629..1578a75 100644 --- a/lib/index.js +++ b/lib/index.js @@ -1,2 +1,3 @@ exports.encode = require('./encode'); exports.decode = require('./decode'); +exports.createStream = require('./stream'); diff --git a/lib/stream.js b/lib/stream.js new file mode 100644 index 0000000..c3c3eb5 --- /dev/null +++ b/lib/stream.js @@ -0,0 +1,29 @@ +'use strict'; + +var stream = require('stream'); +var Decoder = require('./decode').Decoder; + +function createStream () { + var cache = new Buffer(''); + + var ret = stream.Writable(); + ret._write = function (chunk, encoding, next) { + cache = Buffer.concat([cache, chunk]); + + try { + while (true) { + var d = new Decoder(cache); + ret.emit('data', d.parse()); + cache = cache.slice(d.offset); + if (!cache.length) { + break; + } + } + } catch (e) { + } + next(); + }; + return ret; +} + +module.exports = createStream; diff --git a/package.json b/package.json index c9aba7f..9a8c059 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "notepack", - "version": "0.0.2", + "version": "0.0.3", "description": "A fast Node.js implementation of the latest MessagePack spec", "main": "lib/index.js", "repository": {