Skip to content

Commit

Permalink
Add wait system, Improve json-schema validation
Browse files Browse the repository at this point in the history
- Added json-schema-ref-parser as Util.dereference() which returns a
promise and is used in conjunction with Util.validate(). This allows for
more maintainable json-schema definitions, reducing duplication. See:
http://json-schema.org/example2.html
- Added wait system. There is now an optional, user-defined "wait"
counter to allow for additional interator flow control A wait represents
a single outstanding asyncronous process. Waits can be used by the
developer inside the config.tasks file where desired to ensure
longer-running processes such as Util.validate() are able to complete.
- Modified how Util.log() behaves. Previously just a wrapper for
console.log(), it now suppresses output if config.quiet is set to true.
- Changed Util.validat() to return a promise. This clarified the
behavior, such failed validations return a rejection with the specific
reason.
- Bumped AJV, commander versions.
  • Loading branch information
dsquier committed Jun 23, 2017
1 parent 90dd339 commit 3ee330f
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 35 deletions.
58 changes: 33 additions & 25 deletions lib/couchit.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const Db = require('./db');
const Util = require('./util');
const async = require('async');

/**
* class Couchit
Expand All @@ -12,14 +13,13 @@ function Couchit() {}
Couchit.prototype.iterate = function(config, cb) {
const tasks = config.tasks;
const url = 'http://' + config.dbUser + ':' + config.dbPass + '@' + config.dbEndpoint + '/' + config.dbName;
const stat = { __docs: 0, __pages: 0, __save: 0, __remove: 0, __audit: 0, __hash: 0, __validate: 0 };
const stat = { __docs: 0, __pages: 0, __save: 0, __remove: 0, __audit: 0, __hash: 0, __validate: 0, __dereference: 0 };
const start = new Date();
const db = new Db(url);
const util = new Util(stat, [], db.db, config);

function _pageCb(rows) {
const docsCount = (rows.length === config.pageSize + 1) ? config.pageSize : rows.length;

util.increment('__docs', docsCount);
util.increment('__pages', 1);

Expand All @@ -34,46 +34,28 @@ Couchit.prototype.iterate = function(config, cb) {
const queuedDocs = util.getQueue().slice(0); // a copy of the queue

if (queuedDocs.length >= config.batchSize) {
console.log('Updating %d doc%s', queuedDocs.length, queuedDocs.length > 1 ? 's' : '');
util.log('Updating %d doc%s', queuedDocs.length, queuedDocs.length > 1 ? 's' : '');

db.update(queuedDocs, function(err, result) {
if (err) {
console.error(err.message);
} else {
if (!config.quiet) {
console.log('Bulk update %d doc%s done', result.length, result.length > 1 ? 's' : '');
}
util.log('Bulk update %d doc%s done', result.length, result.length > 1 ? 's' : '');
}
});
util.resetQueue();
}
}

function _endCb(err, result) {

function _report(err, result) {
const audit = util.getAudit();
const stat = util.getStat();

// Add elapsed time to stat object
stat.__elapsed = new Date() - start;

if (!config.quiet) {
console.log(util.createReport(stat));
}

// return a callback response object containing the stat object and audit array
cb(err, { stat, audit });
}

const queue = util.getQueue();

// update the remaining queued documents
if (queue.length > 0) {
// update the remaining queued documents
db.update(queue, function(err, result) {
function _wait() {
if (db.done()) {
_report(err, result);
finalize(err, result);
} else {
setImmediate(_wait);
}
Expand All @@ -82,8 +64,34 @@ Couchit.prototype.iterate = function(config, cb) {
});
// if queue is empty, then just report regardless there's an error or not
} else {
_report(err, result);
finalize(err, result);
}

/**
* To allow long-running asyncronous processes to complete before returning the
* callback that reflects the results of all procerssing, ensure there are no
* outstanding waits.
*/
function finalize(err, result) {
async.whilst(
function() {
return util.doWaitsExist();
},
function(callback) {
setTimeout(callback, 100);
},
function(err, result) {
const audit = util.getAudit();
const stat = util.getStat();
stat.__elapsed = new Date() - start;
util.log(util.createReport(stat));

// return a callback response object containing the stat object and audit array
cb(err, { stat, audit });
}
);
}

}

db.paginate(config.interval, config.startKey, config.endKey, config.pageSize, config.numPages, _pageCb, _endCb);
Expand Down
67 changes: 60 additions & 7 deletions lib/util.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const hasher = require('node-object-hash')({ coerce: false }).hash;
const Ajv = require('ajv');
const $RefParser = require('json-schema-ref-parser');

/**
* Util is exposed to couchit.js task functions.
Expand All @@ -11,6 +12,36 @@ function Util(stat, queue, nano, config) {
this.config = config;
this.auditItems = [];
this.hasher = hasher;

/**
* User-defined "wait" counter to allow for additional interator flow control
*
* A wait represents a single outstanding asyncronous process. Waits can be
* used by the developer inside the config.tasks file where desired to ensure
* longer-running processes such as Util.validate() are able to complete.
*/
this.waits = 0;
}

/**
* Returns true if there are waits
*/
Util.prototype.doWaitsExist = function() {
return this.waits > 0;
}

/**
* Increment wait counter (use at the start of a process/function call)
*/
Util.prototype.incrementWaits = function() {
this.waits += 1;
}

/**
* Decrement wait counter (use at the process/function completion)
*/
Util.prototype.decrementWaits = function() {
this.waits -= 1;
}

/**
Expand Down Expand Up @@ -91,12 +122,14 @@ Util.prototype.remove = function(doc) {
};

/**
* Log message.
* Wrapper for console.log() which prevents output if config.quiet === true
*
* @param {String} message: the message to log
*/
Util.prototype.log = function(message) {
console.log(message);
if (!this.config.quiet) {
console.log(message);
}
};

/**
Expand All @@ -123,7 +156,7 @@ Util.prototype.getAudit = function() {
* @return queue
*/
Util.prototype.getQueue = function() {
console.log('| Retrieving page (' + this.config.pageSize + ' docs)');
this.log('| Retrieving page (' + this.config.pageSize + ' docs)');
return this.queue;
};

Expand All @@ -135,13 +168,32 @@ Util.prototype.resetQueue = function() {
};

/**
* json-schema validation
* Validate based on json-schema
*
* Returns a promise.
*/
Util.prototype.validate = function(schema, data) {
this.count('__validate');
const ajv = new Ajv({ allErrors: true }); // options can be passed, e.g. {allErrors: true}
const validate = ajv.compile(schema);
return validate(data);
const valid = ajv.validate(schema, data);

if (valid) {
return Promise.resolve();
}

// If the schema does not validate, return a promise rejection with the reason
return Promise.reject(ajv.errorsText());
}

/**
* De-reference a json-schema reference file.
*
* Typically used with util.validate() to reference other json-schema
* files to allow re-use and avoid duplicating common schemas.
*/
Util.prototype.dereference = function(file) {
this.count('__dereference');
return $RefParser.dereference(file);
}

/**
Expand All @@ -161,14 +213,15 @@ Util.prototype.createReport = function(stat) {
'Pages retrieved : ' + stat.__pages + '\n' +
'Hashes computed : ' + stat.__hash + '\n' +
'Schemas validated : ' + stat.__validate + '\n' +
'Schemas dereferenced : ' + stat.__dereference + '\n' +
'Objects audited : ' + stat.__audit + '\n' +
'docs-per-sec : ' + docsPerSecond + '\n' +
'save-per-sec : ' + savePerSecond + '\n' +
'elapsed-sec : ' + elapsed + '\n';

Object.keys(stat).forEach(prop => {
if (!prop.match(/^__.+/)) {
report += 'count - ' + prop + ': ' + stat[prop] + '\n';
report += '\ncount - ' + prop + ': ' + stat[prop];
}
});

Expand Down
7 changes: 4 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"json-schema",
"schema validator"
],
"version": "0.6.0",
"version": "0.7.0",
"homepage": "http://github.com/dsquier/couchit",
"author": "David Squier ([email protected])",
"main": "./lib/couchit",
Expand All @@ -33,9 +33,10 @@
"lib": "./lib"
},
"dependencies": {
"ajv": "^5.1.5",
"ajv": "^5.2.0",
"async": "^2.4.1",
"commander": "^2.9.0",
"commander": "^2.10.0",
"json-schema-ref-parser": "^3.1.2",
"nano": "^6.3.0",
"node-object-hash": "1.2.0"
},
Expand Down

0 comments on commit 3ee330f

Please sign in to comment.