diff --git a/bin/cli.js b/bin/cli.js index 46e73c1..f0ba6df 100755 --- a/bin/cli.js +++ b/bin/cli.js @@ -16,7 +16,15 @@ var config = require('rc')('couch2elastic4sync', { load: { swallowErrors: false }, - checkpointSize: 20 + checkpointSize: 20, + retry : { + times: 10, + interval: 200 + }, + shutdown: { + times: 5, + interval: 2000 + } }) if (!config.elasticsearch) { console.log('No elasticsearch search.') diff --git a/lib/index.js b/lib/index.js index f526726..b0ad756 100644 --- a/lib/index.js +++ b/lib/index.js @@ -1,10 +1,13 @@ +var async = require('async') var follow = require('follow') var request = require('request') var jsonist = require('jsonist') var remove_meta = require('./remove-meta') var template = require('lodash.template') +var once = require('lodash.once') module.exports = function (config, log, since) { + var _retry = async.retry.bind(null, config.retry) var follow_config = { db: config.database, include_docs: true, @@ -14,9 +17,30 @@ module.exports = function (config, log, since) { var updating_checkpoint_counter = false var last_seen_seq var pending = {} + var _shutdown_attempts = 0 + var _shutdown_last_pending var shutdown = function () { - if (Object.keys(pending).length) return + log.info('shutdown called') + var _remain = Object.keys(pending).length + if (_remain > 0) { + if (!_shutdown_last_pending) _shutdown_last_pending = _remain + if (_remain < _shutdown_last_pending) { + _shutdown_last_pending = _remain + log.info({pending: _remain}, 'still processing') + return setTimeout(shutdown, config.shutdown.interval); + } + + + if (_shutdown_attempts > config.shutdown.times) { + log.error('shutdown wait time exceeded. shutting down anyway.') + + } else { + log.info({pending: _remain, attempts: _shutdown_attempts++}, 'there are pending keys, sleeping') + return setTimeout(shutdown, config.shutdown.interval); + } + } + if (!last_seen_seq) return process.exit(0) jsonist.put(config.seq_url, {_meta: {seq: last_seen_seq }}, function (err, resp) { if (err) log.error('Could not record sequence in elasticsearch', last_seen_seq, err) @@ -27,7 +51,8 @@ module.exports = function (config, log, since) { } var onDone = function (log, _id, _rev, type, seq, err, resp) { - last_seen_seq = seq + if (seq > last_seen_seq) last_seen_seq = seq + delete pending[seq] if (err) return log.error('error occured', type, _id, _rev, err) log.info({change: seq}, 'success. ', type, _id, _rev, err) checkpoint_counter++ @@ -41,7 +66,7 @@ module.exports = function (config, log, since) { log.info({change: seq}, 'stored in elasticsearch. ') }) } - delete pending[seq] + } var feed = follow(follow_config, function (err, change) { @@ -52,11 +77,16 @@ module.exports = function (config, log, since) { var doc = change.doc var es_doc_url = config.elasticsearch + '/' + change.id + var compiled + if (config.urlTemplate) { + compiled = template(es_doc_url) + es_doc_url = compiled(doc) + } + var _onDone + if (doc._deleted) { // delete the doc from es if (config.urlTemplate) { - var compiled = template(es_doc_url) - // we need the prev doc. hack attack request({ url: config.database + '/' + change.id + '?revs=true&open_revs=all' @@ -73,21 +103,17 @@ module.exports = function (config, log, since) { }, function (err, resp, _prev_doc) { try { es_doc_url = compiled(_prev_doc) - jsonist.delete(es_doc_url, onDone.bind(null, log, doc._id, null, 'delete', change.seq)) + _retry(jsonist.delete.bind(null, es_doc_url), onDone.bind(null, log, doc._id, null, 'delete', change.seq)) } catch (e) {} }) }) return // end of finding prev doc for delete of a urlTemplate } else { - return jsonist.delete(es_doc_url, onDone.bind(null, log, doc._id, null, 'delete', change.seq)) + return _retry(jsonist.delete.bind(null, es_doc_url), onDone.bind(null, log, doc._id, null, 'delete', change.seq)) } } - // adjust the url if its a urlTemplate - if (config.urlTemplate) { - var compiled = template(es_doc_url) - es_doc_url = compiled(doc) - } + var _rev = doc._rev if (config.mapper) { try { @@ -106,13 +132,14 @@ module.exports = function (config, log, since) { if (config.removeMeta) { doc = remove_meta(doc) } - jsonist.put(es_doc_url, doc, onDone.bind(null, log, change.doc._id, _rev, 'update', change.seq)) + _retry(jsonist.put.bind(null, es_doc_url, doc), onDone.bind(null, log, change.doc._id, _rev, 'update', change.seq)) }) + var _shutdown = once(shutdown) if (config.endOnCatchup) { feed.on('catchup', function () { feed.pause() - setInterval(shutdown, 400) + setInterval(_shutdown, 400) }) } diff --git a/package.json b/package.json index 36c7be6..258c846 100644 --- a/package.json +++ b/package.json @@ -5,11 +5,13 @@ "main": "./lib/index.js", "bin": "./bin/cli.js", "dependencies": { + "async": "^1.5.2", "bunyan": "^1.5.1", "follow": "^0.12.1", "hyperquest": "^1.2.0", "jsonfilter": "^1.1.2", "jsonist": "^1.3.0", + "lodash.once": "^4.0.0", "lodash.template": "^4.2.0", "md5": "^2.0.0", "mkdirp": "^0.5.1",