Skip to content

Commit

Permalink
Merge branch 'master' of github.com:LaurentZuijdwijk/streaming-cache
Browse files Browse the repository at this point in the history
Conflicts:
	package.json
  • Loading branch information
LaurentZuijdwijk committed Mar 31, 2017
2 parents 00b608c + 250195c commit 90121ff
Show file tree
Hide file tree
Showing 10 changed files with 871 additions and 68 deletions.
56 changes: 20 additions & 36 deletions examples/server.js
Original file line number Diff line number Diff line change
@@ -1,50 +1,34 @@

/**
In this example we create a simple server that serves the first file from disk and subsequent
requests from cache
**/
'use strict';
var http = require('http');
const PORT = 8080;
var PORT = 8080;

var fs = require('fs');
var Cache = require('../index.js');
var cache = new Cache();

function xhandleRequest(request, response) {
var _cache = cache.get('a');
var meta;
if (_cache) {
// meta = cache.getMetadata('a');
// if (meta && meta.length) {
// response.setHeader('Content-Length', meta.length)
// response.setHeader('Content-Type', 'image/jpeg');
// }
_cache.pipe(response);
}
else {
var readstream = fs.createReadStream('./stream.jpg');
readstream.pipe(cache.set('a')).pipe(response)
// fs.readFile('./stream.jpg', function (err, data) {
// // cache.setData('a', data);
// response.end(data)
}
}

//Create a server
var server = http.createServer(handleRequest);
var FILENAME = './stream.jpg';

//Lets start our server
server.listen(PORT, function () {
//Callback triggered when server is successfully listening. Hurray!
console.log('Server listening on: http://localhost:%s', PORT);
console.log('Server listening on: http://localhost:%s', PORT);
});

function handleRequest(request, response) {
cache.getData('a', function (err, data) {
if (data) {
response.end(data);
}
else {
fs.readFile('./stream.jpg', function (err, data) {
cache.setData('a', data);
response.end(data)
});
}
});
if (cache.exists(FILENAME)) {
response.setHeader('From-Cache', 'true');
cache.get(FILENAME).pipe(response);
}
else {
response.setHeader('From-Cache', 'false');
fs.createReadStream(FILENAME)
.pipe(cache.set(FILENAME))
.pipe(response);
}
}
4 changes: 4 additions & 0 deletions examples/yarn.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# THIS IS AN AUTOGENERATED FILE. DO NOT EDIT THIS FILE DIRECTLY.
# yarn lockfile v1


38 changes: 20 additions & 18 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ StreamingCache.prototype.getMetadata = function (key) {
StreamingCache.prototype.exists = function (key) {
utils.ensureDefined(key, 'Key');

var hit = this.cache.get(key);
return !!(hit && hit.status);
var hit = this.cache.has(key);
return hit;
};

StreamingCache.prototype.del = function (key) {
Expand Down Expand Up @@ -141,29 +141,31 @@ StreamingCache.prototype.set = function (key) {
self.emitters[key]._buffer = [];

var chunks = new LinkedList();
var stream = new Streams.Duplex()

stream._read = function () {
var chunk = chunks.shift();
if (chunk) {
this.push(chunk);
this.needRead = false;
} else {
this.needRead = true;
}
var stream = new Streams.Duplex();
stream.unfullfilledReadCount = 0;

stream._read = function () {
if(chunks.length){
var chunk = chunks.shift();
this.push(chunk);
this.unfullfilledReadCount = (this.unfullfilledReadCount > 0) ? this.unfullfilledReadCount - 1 : this.unfullfilledReadCount;
}
else{
this.unfullfilledReadCount = this.unfullfilledReadCount + 1;
}
};

stream._write = function (chunk, encoding, next) {
self.emitters[key]._buffer.push(chunk);
self.emitters[key].emit('data', chunk);
if (this.needRead) {
if (this.unfullfilledReadCount > 0) {
this.push(chunk);
this.needRead = false;
this.unfullfilledReadCount = this.unfullfilledReadCount - 1;
}
else {
chunks.push(chunk);
}
next(null, chunk);
next();
}

stream.on('error', function (err) {
Expand All @@ -177,11 +179,11 @@ StreamingCache.prototype.set = function (key) {
});

stream.on('finish', function () {
if (this.needRead) {
if (this.unfullfilledReadCount > 0) {
this.push(null);
}
}
else {
chunks.push(null);
chunks.push(null);
}

var hit = self.cache.get(key);
Expand Down
8 changes: 5 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"description": "Cache and replay NodeJS streams",
"main": "index.js",
"scripts": {
"test": "istanbul cover ./node_modules/jasmine-node/bin/jasmine-node ./spec --captureExceptions --verbose"
"test": "istanbul cover ./node_modules/jasmine-node/bin/jasmine-node ./spec --captureExceptions --verbose && mocha test/integration/*.spec.js"
},
"keywords": [
"cache",
Expand All @@ -17,10 +17,12 @@
"dependencies": {
"linkedlist": "^1.0.1",
"lodash.assign": "^4.0.1",
"lru-cache": "^2.7.0"
"lru-cache": "^2.7.0",
"mocha": "^3.2.0"
},
"devDependencies": {
"istanbul": "^0.3.22",
"jasmine-node": "^1.14.5"
"jasmine-node": "^1.14.5",
"supertest": "^3.0.0"
}
}
19 changes: 9 additions & 10 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Speed up your services.

Cache, queue and distribute streams immediately. Streams can be replayed immediately, even if the source is not finished.

Uses a fixed size LRU-cache in the background.
Uses a fixed size LRU-cache in the background.

Usefull for caching (slow) streaming connections, such as S3 requests or complex database queries.

Expand All @@ -32,14 +32,14 @@ var Cache = require('../index.js');
var cache = new Cache();
var fs = require('fs');

var readstream = fs.createReadStream('readme.md');
var writestream = fs.createWriteStream('test2.txt');
var inputStream = fs.createReadStream('readme.md');
var outputStream = fs.createWriteStream('test2.txt');

readstream.pipe(cache.set('a'));
inputStream.pipe(cache.set('myKey'));

setTimeout(function(){
writestream2.write('written from cache\n\n');
cache.get('a').pipe(writestream);
outputStream.write('written from cache\n\n');
cache.get('myKey').pipe(outputStream);
}, 200);

```
Expand All @@ -60,7 +60,7 @@ fileStream.pipe(cache.set('key')).pipe(res);
```


##### get(key) => ReadableStream
##### get(key) => ReadableStream

```javascript
var cached = cache.get('key');
Expand All @@ -74,11 +74,11 @@ if(cached){
A set data synchronously to stream at a later moment

#### getData
Get data with a callback.
Get data with a callback.

```javascript
cache.getData('key', function(err, data){
if(err){
if(err){
//handle error
}
// do something with data
Expand All @@ -92,4 +92,3 @@ Get metadata

#### exists(key)
returns true or false if a key exists.

36 changes: 35 additions & 1 deletion spec/streamingCacheSpec.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@

var StreamingCache = require('../index');
var Duplex = require('stream').Duplex;
var Writable = require('stream').Writable;


var cache = new StreamingCache();
describe('streaming cache', function () {
var s;

beforeEach(function () {
cache = new StreamingCache()
s = cache.set('a');
Expand All @@ -15,9 +18,11 @@ describe('streaming cache', function () {
it('cache.set needs to be called with a key', function () {
expect(function () {cache.set();}).toThrow('Key expected');
});

it('cache.set should return a stream', function () {
expect(s).toEqual(jasmine.any(Duplex));
});

it('Writing to stream should set data', function (done) {
s.write('a');
s.write('b');
Expand All @@ -28,6 +33,30 @@ describe('streaming cache', function () {
s.end(null);
});

it('reading from a duplex stream should return all data', function(done){
var res = '';
var stream = new Writable({
write(chunk, encoding, callback) {
res += chunk;
callback();
}
});
stream.on('finish', function(){
expect(res).toEqual('abc');
done();
})

var cacheStream = s;
cacheStream.pipe(stream);
cacheStream._read();
cacheStream._read();
cacheStream.write('a');

cacheStream.write('b');
cacheStream.write('c');
cacheStream.end();
});

it('getting stream should return readstream', function () {
var r = cache.get('a');
expect(r).toEqual(jasmine.any(require('../lib/readStream')));
Expand All @@ -44,6 +73,7 @@ describe('streaming cache', function () {

expect(r).toEqual(jasmine.any(require('../lib/readStream')));
});

it('should be written to and readable when set is pending', function (done) {
var dataSpy = jasmine.createSpy();
var endSpy = jasmine.createSpy();
Expand All @@ -65,6 +95,7 @@ describe('streaming cache', function () {
done();
}, 200)
});

it('should handle sync setData', function () {
expect(cache.setData).toThrow();
cache.setData('b', new Buffer(100));
Expand Down Expand Up @@ -104,6 +135,7 @@ describe('streaming cache', function () {
done();
}, 10)
});

it('should handle getmetadata', function () {
cache.cache.set('abc', {data: 1, metaData: 'bbb'});
expect(cache.getMetadata).toThrow();
Expand Down Expand Up @@ -140,18 +172,20 @@ describe('streaming cache', function () {

describe('streaming cache short timeout', function () {
var s;

beforeEach(function () {
cache = new StreamingCache({maxAge: 100})
s = cache.set('b');
});

it('Writing to stream should set data', function (done) {
s.write('a');
s.write('b');
s.end(null);
setTimeout(function () {
expect(s.read().toString() + s.read().toString()).toEqual('ab')
cache.getData('b', function (err, data) {
expect(data.toString()).toEqual(null)
expect(data).toEqual(null)
done();
});
}, 130);
Expand Down
1 change: 1 addition & 0 deletions test/integration/fixtures/text-file-large.txt

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions test/integration/fixtures/text-file-small.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
0123456789
Loading

0 comments on commit 90121ff

Please sign in to comment.