Skip to content

Commit

Permalink
adding sqlite storage
Browse files Browse the repository at this point in the history
  • Loading branch information
rksm committed Oct 9, 2013
1 parent 5971849 commit 82b3387
Show file tree
Hide file tree
Showing 7 changed files with 202 additions and 5 deletions.
7 changes: 6 additions & 1 deletion VersionedFileSystem.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ var path = require("path");
var fs = require("fs");
var findit = require('findit');
var MemoryStore = require('./memory-storage');
var SQLiteStore = require('./sqlite-storage');
var d = require('./domain');

// -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
Expand Down Expand Up @@ -41,7 +42,8 @@ util._extend(VersionedFileSystem.prototype, d.bindMethods({
// initializing
initialize: function(options) {
if (!options.fs) this.emit('error', 'VersionedFileSystem needs location!');
this.storage = new MemoryStore();
this.storage = new SQLiteStore();
// this.storage = new MemoryStore();
this.rootDirectory = options.fs;
this.excludedDirectories = options.excludedDirectories || [];
},
Expand All @@ -50,6 +52,7 @@ util._extend(VersionedFileSystem.prototype, d.bindMethods({
console.log('LivelyFS initialize at %s', this.getRootDirectory());
var self = this;
async.waterfall([
function(next) { self.storage.reset(next); },
this.walkFiles.bind(this, this.excludedDirectories),
function(findResult, next) {
console.log('LivelyFS initialize synching %s files', findResult.files.length);
Expand Down Expand Up @@ -143,6 +146,8 @@ util._extend(VersionedFileSystem.prototype, d.bindMethods({
if (ignoredDirs.indexOf(base) >= 0) stop();
});
find.on('file', function (file, stat) {
// !FIXME!
if (file.indexOf('.sqlite') >= 0) return;
result.files.push({
path: path.relative(root, file),
stat: stat
Expand Down
2 changes: 1 addition & 1 deletion domain.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
var d = require('domain').create();

d.on('error', function(err) {
console.error('LivelyFS encountered error: ', err.stack);
console.error('LivelyFS encountered error: ', err.stack || err);
process.exit();
});

Expand Down
4 changes: 3 additions & 1 deletion memory-storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ var EventEmitter = require("events").EventEmitter;
var d = require('./domain');

function MemoryStore() {
this.versions = {};
this.versions = null;
EventEmitter.call(this);
}

util._extend(MemoryStore.prototype, EventEmitter.prototype);

util._extend(MemoryStore.prototype, d.bindMethods({

reset: function(thenDo) { this.versions = {}; thenDo && thenDo(null); },

store: function(versionData, thenDo) {
var versions = this.versions[versionData.path]
|| (this.versions[versionData.path] = []);
Expand Down
5 changes: 4 additions & 1 deletion repository.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ util._extend(Repository.prototype, d.bindMethods({
// -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
// intialize-release
initialize: function(options) {
if (global.lively) {
lively.repository = this;
}
this.fs = new VersionedFileSystem(options);
// we keep a queue for changes b/c they should be committed to the
// versioned file system in their incoming order. Before they can be
Expand Down Expand Up @@ -66,7 +69,7 @@ util._extend(Repository.prototype, d.bindMethods({

// -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
// change recording
isSynchronized: function() { return this.pendingChangeQueue.length === 0 },
isSynchronized: function() { return this.pendingChangeQueue.length === 0; },

commitPendingChanges: function() {
var repo = this,
Expand Down
2 changes: 1 addition & 1 deletion request-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ util._extend(LivelyFsHandler.prototype, d.bindMethods({
initialize: function(options) {
options = options || {};
options.fs = options.fs || process.cwd();
options.excludedDirectories = options.excludedDirectories || ['.git', 'node_modules'];
options.excludedDirectories = options.excludedDirectories || ['.svn', '.git', 'node_modules'];
this.repository = new Repository(options);
},

Expand Down
172 changes: 172 additions & 0 deletions sqlite-storage.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
"use strict"

var util = require("util");
var path = require("path");
var EventEmitter = require("events").EventEmitter;
var d = require('./domain');
var async = require('async');
var sqlite3 = require('sqlite3').verbose();

// -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

function log(/*args*/) { console.log.apply(console, arguments); }

function sqlPrep(db, stmt) { return db.prepare(stmt, function(err) { console.log(err) }); }

function run(db, stmt, args, thenDo) {
db.run(stmt, args, function(err) {
if (err) log('err: ', err);
else log('%s -- lastID: %s, changes: %s', stmt, this.lastID, this.changes);
thenDo(err, {lastID: this.lastID, changes: this.changes});
});
}

function query(db, stmt, args, thenDo) {
var rows = [];
db.all(stmt, args, thenDo);
// db.each(stmt, args,
// function(err, row) {
// if (err) log('err: ', err); else rows.push(row);
// }, function(err, noRows) {
// if (err) log('err: ', err); else log('%s: #%s', stmt, noRows);
// thenDo && thenDo(err, rows);
// });
}

function initTable(db, tableName, createStmt) {
return function(next) {
db.serialize(function() {
db.run('DROP TABLE IF EXISTS '+ tableName, function(err) {
log('DROP TABLE', tableName, err);
});
db.run(createStmt, function(err) {
err && log('error: ', err);
next(err); });
});
}
}

function initFSTables(db, thenDo) {
async.parallel([
initTable(db, "versioned_objects",
"CREATE TABLE versioned_objects (\n"
+ " path TEXT,\n"
+ " version TEXT NOT NULL DEFAULT '0',\n"
+ " change TEXT,\n"
+ " author TEXT,\n"
+ " date TEXT,\n"
+ " content TEXT,\n"
+ " PRIMARY KEY(path,version)\n"
+ ");\n"
+ "CREATE INDEX ON versioned_objects(path,version);"),
], function(err) {
log('DONE: CREATE TABLES', err);
thenDo && thenDo(err);
});
}

function storeVersionedObjects(db, dataAccessors, thenDo) {
// this batch-processes worlds inserts
// worldDataAccessors is an array of functions that expect one parameter, a
// callback, that in turn has an error callback and an object
// {uri, version,json} this should be stored in the db
// queued so that we do not start open file handles to all worlds at once
function afterInsert() {}
function worker(accessor, next) {
accessor(function(err, data) {
if (err) {
console.log('Could not access %s: ', data, err);
taskCount--; next(); return;
}
console.log("storing %s...", data.path);
var fields = [data.path, data.change,
data.author, data.date,
data.content, data.path];
stmt.run.apply(stmt, fields.concat([afterInsert]));
// db can run stuff in parallel, no need to wait for stmt to finsish
// next();
// -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
function afterInsert(err) {
if (err) {
console.error('Error inserting %s: %s', data && data.path, err);
} else {
console.log("... done storing %s", data.path);
}
taskCount--;
next();
if (taskCount > 0) return;
stmt.finalize();
console.log("all worlds imported!");
thenDo && thenDo();
}
});
}
var taskCount = dataAccessors.length,
parallelReads = 10,
sqlInsertStmt = 'INSERT INTO versioned_objects '
+ 'SELECT ?, ifnull(x,0), ?, ?, ?, ? '
+ 'FROM (SELECT max(CAST(objs2.version as integer)) + 1 AS x '
+ ' FROM versioned_objects objs2 '
+ ' WHERE objs2.path = ?);',
stmt = db.prepare(sqlInsertStmt, function(err) {
// this callback is needed, when it is not defined the server crashes
// but when it is there the stmt.run callback also seems the catch the error...
err && console.error('error in sql %s: %s', sqlInsertStmt, err); }),
q = async.queue(worker, parallelReads);
q.push(dataAccessors);
}

// -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

function SQLiteStore() {
this.db = null;
EventEmitter.call(this);
// Object.freeze(this);
}

util._extend(SQLiteStore.prototype, EventEmitter.prototype);

util._extend(SQLiteStore.prototype, d.bindMethods({

reset: function(thenDo) {
// this.db = new sqlite3.Database(':memory:');
this.db = new sqlite3.Database(path.join(process.cwd(), "world-db-expt2.sqlite"));
initFSTables(this.db, thenDo);
},

store: function(versionData, thenDo) {
this.storeAll([versionData], thenDo);
},

storeAll: function(versionDataSets, thenDo) {
var accessors = versionDataSets.map(function(dataset) {
return function(callback) { callback(null, dataset); }; });
storeVersionedObjects(this.db, accessors, thenDo);
},

getVersionsFor: function(fn, thenDo) {
var sql = "SELECT * FROM versioned_objects "
+ "WHERE path = ? ";
+ "ORDER BY CAST(version as integer);";
query(this.db, sql, [fn], thenDo);
},

dump: function(thenDo) { // get all versions
var sql = "SELECT * FROM versioned_objects GROUP BY path,version;"
// query(this.db, sql, [], thenDo);
query(this.db, sql, [], function(err, rows) {
// console.log(rows);
// FIXME!
var result = rows.reduce(function(result, row) {
var last = result[result.length-1];
if (last && last[0].path === row.path) { last.push(row); }
else { result.push([row]); }
return result;
}, []);
thenDo(err,result);
});
}

}));

module.exports = SQLiteStore;
15 changes: 15 additions & 0 deletions tests/tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,21 @@ var tests = {
var files = {
"testDir": {"aFile.txt": 'foo bar content'}
};
// var files = {
// "testDir": {
// "aFile.txt": 'foo bar content',
// "dir1": {
// "otherFile.txt": "content content content",
// "boing.jpg": "imagin this would be binary",
// "dir1.1": {"xxx.txt": 'ui'}
// },
// "dir2": {
// "file1.foo": "1",
// "file2.foo": "2"
// },
// "dir3": {}
// }
// };
fsHelper.createDirStructure(baseDirectory, files, next);
},
logProgress('test files created'),
Expand Down

0 comments on commit 82b3387

Please sign in to comment.