diff --git a/lib/couchit.js b/lib/couchit.js index 27468dd..4ba0560 100644 --- a/lib/couchit.js +++ b/lib/couchit.js @@ -1,5 +1,6 @@ const Db = require('./db'); const Util = require('./util'); +const async = require('async'); /** * class Couchit @@ -12,14 +13,13 @@ function Couchit() {} Couchit.prototype.iterate = function(config, cb) { const tasks = config.tasks; const url = 'http://' + config.dbUser + ':' + config.dbPass + '@' + config.dbEndpoint + '/' + config.dbName; - const stat = { __docs: 0, __pages: 0, __save: 0, __remove: 0, __audit: 0, __hash: 0, __validate: 0 }; + const stat = { __docs: 0, __pages: 0, __save: 0, __remove: 0, __audit: 0, __hash: 0, __validate: 0, __dereference: 0 }; const start = new Date(); const db = new Db(url); const util = new Util(stat, [], db.db, config); function _pageCb(rows) { const docsCount = (rows.length === config.pageSize + 1) ? config.pageSize : rows.length; - util.increment('__docs', docsCount); util.increment('__pages', 1); @@ -34,15 +34,13 @@ Couchit.prototype.iterate = function(config, cb) { const queuedDocs = util.getQueue().slice(0); // a copy of the queue if (queuedDocs.length >= config.batchSize) { - console.log('Updating %d doc%s', queuedDocs.length, queuedDocs.length > 1 ? 's' : ''); + util.log('Updating %d doc%s', queuedDocs.length, queuedDocs.length > 1 ? 's' : ''); db.update(queuedDocs, function(err, result) { if (err) { console.error(err.message); } else { - if (!config.quiet) { - console.log('Bulk update %d doc%s done', result.length, result.length > 1 ? 's' : ''); - } + util.log('Bulk update %d doc%s done', result.length, result.length > 1 ? 's' : ''); } }); util.resetQueue(); @@ -50,30 +48,14 @@ Couchit.prototype.iterate = function(config, cb) { } function _endCb(err, result) { - - function _report(err, result) { - const audit = util.getAudit(); - const stat = util.getStat(); - - // Add elapsed time to stat object - stat.__elapsed = new Date() - start; - - if (!config.quiet) { - console.log(util.createReport(stat)); - } - - // return a callback response object containing the stat object and audit array - cb(err, { stat, audit }); - } - const queue = util.getQueue(); + // update the remaining queued documents if (queue.length > 0) { - // update the remaining queued documents db.update(queue, function(err, result) { function _wait() { if (db.done()) { - _report(err, result); + finalize(err, result); } else { setImmediate(_wait); } @@ -82,8 +64,34 @@ Couchit.prototype.iterate = function(config, cb) { }); // if queue is empty, then just report regardless there's an error or not } else { - _report(err, result); + finalize(err, result); + } + + /** + * To allow long-running asyncronous processes to complete before returning the + * callback that reflects the results of all procerssing, ensure there are no + * outstanding waits. + */ + function finalize(err, result) { + async.whilst( + function() { + return util.doWaitsExist(); + }, + function(callback) { + setTimeout(callback, 100); + }, + function(err, result) { + const audit = util.getAudit(); + const stat = util.getStat(); + stat.__elapsed = new Date() - start; + util.log(util.createReport(stat)); + + // return a callback response object containing the stat object and audit array + cb(err, { stat, audit }); + } + ); } + } db.paginate(config.interval, config.startKey, config.endKey, config.pageSize, config.numPages, _pageCb, _endCb); diff --git a/lib/util.js b/lib/util.js index 92858e8..3817ef5 100644 --- a/lib/util.js +++ b/lib/util.js @@ -1,5 +1,6 @@ const hasher = require('node-object-hash')({ coerce: false }).hash; const Ajv = require('ajv'); +const $RefParser = require('json-schema-ref-parser'); /** * Util is exposed to couchit.js task functions. @@ -11,6 +12,36 @@ function Util(stat, queue, nano, config) { this.config = config; this.auditItems = []; this.hasher = hasher; + + /** + * User-defined "wait" counter to allow for additional interator flow control + * + * A wait represents a single outstanding asyncronous process. Waits can be + * used by the developer inside the config.tasks file where desired to ensure + * longer-running processes such as Util.validate() are able to complete. + */ + this.waits = 0; +} + +/** + * Returns true if there are waits + */ +Util.prototype.doWaitsExist = function() { + return this.waits > 0; +} + +/** + * Increment wait counter (use at the start of a process/function call) + */ +Util.prototype.incrementWaits = function() { + this.waits += 1; +} + +/** + * Decrement wait counter (use at the process/function completion) + */ +Util.prototype.decrementWaits = function() { + this.waits -= 1; } /** @@ -91,12 +122,14 @@ Util.prototype.remove = function(doc) { }; /** - * Log message. + * Wrapper for console.log() which prevents output if config.quiet === true * * @param {String} message: the message to log */ Util.prototype.log = function(message) { - console.log(message); + if (!this.config.quiet) { + console.log(message); + } }; /** @@ -123,7 +156,7 @@ Util.prototype.getAudit = function() { * @return queue */ Util.prototype.getQueue = function() { - console.log('| Retrieving page (' + this.config.pageSize + ' docs)'); + this.log('| Retrieving page (' + this.config.pageSize + ' docs)'); return this.queue; }; @@ -135,13 +168,32 @@ Util.prototype.resetQueue = function() { }; /** - * json-schema validation + * Validate based on json-schema + * + * Returns a promise. */ Util.prototype.validate = function(schema, data) { this.count('__validate'); const ajv = new Ajv({ allErrors: true }); // options can be passed, e.g. {allErrors: true} - const validate = ajv.compile(schema); - return validate(data); + const valid = ajv.validate(schema, data); + + if (valid) { + return Promise.resolve(); + } + + // If the schema does not validate, return a promise rejection with the reason + return Promise.reject(ajv.errorsText()); +} + +/** + * De-reference a json-schema reference file. + * + * Typically used with util.validate() to reference other json-schema + * files to allow re-use and avoid duplicating common schemas. + */ +Util.prototype.dereference = function(file) { + this.count('__dereference'); + return $RefParser.dereference(file); } /** @@ -161,6 +213,7 @@ Util.prototype.createReport = function(stat) { 'Pages retrieved : ' + stat.__pages + '\n' + 'Hashes computed : ' + stat.__hash + '\n' + 'Schemas validated : ' + stat.__validate + '\n' + + 'Schemas dereferenced : ' + stat.__dereference + '\n' + 'Objects audited : ' + stat.__audit + '\n' + 'docs-per-sec : ' + docsPerSecond + '\n' + 'save-per-sec : ' + savePerSecond + '\n' + @@ -168,7 +221,7 @@ Util.prototype.createReport = function(stat) { Object.keys(stat).forEach(prop => { if (!prop.match(/^__.+/)) { - report += 'count - ' + prop + ': ' + stat[prop] + '\n'; + report += '\ncount - ' + prop + ': ' + stat[prop]; } }); diff --git a/package.json b/package.json index c621f5f..63ef97b 100644 --- a/package.json +++ b/package.json @@ -13,7 +13,7 @@ "json-schema", "schema validator" ], - "version": "0.6.0", + "version": "0.7.0", "homepage": "http://github.com/dsquier/couchit", "author": "David Squier (dsquier@gmail.com)", "main": "./lib/couchit", @@ -33,9 +33,10 @@ "lib": "./lib" }, "dependencies": { - "ajv": "^5.1.5", + "ajv": "^5.2.0", "async": "^2.4.1", - "commander": "^2.9.0", + "commander": "^2.10.0", + "json-schema-ref-parser": "^3.1.2", "nano": "^6.3.0", "node-object-hash": "1.2.0" },