Skip to content

Commit

Permalink
WIP, mostly works
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanramage committed Feb 26, 2016
1 parent 2d6c7a6 commit a283e49
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 15 deletions.
10 changes: 9 additions & 1 deletion bin/cli.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,15 @@ var config = require('rc')('couch2elastic4sync', {
load: {
swallowErrors: false
},
checkpointSize: 20
checkpointSize: 20,
retry : {
times: 10,
interval: 200
},
shutdown: {
times: 5,
interval: 2000
}
})
if (!config.elasticsearch) {
console.log('No elasticsearch search.')
Expand Down
55 changes: 41 additions & 14 deletions lib/index.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
var async = require('async')
var follow = require('follow')
var request = require('request')
var jsonist = require('jsonist')
var remove_meta = require('./remove-meta')
var template = require('lodash.template')
var once = require('lodash.once')

module.exports = function (config, log, since) {
var _retry = async.retry.bind(null, config.retry)
var follow_config = {
db: config.database,
include_docs: true,
Expand All @@ -14,9 +17,30 @@ module.exports = function (config, log, since) {
var updating_checkpoint_counter = false
var last_seen_seq
var pending = {}
var _shutdown_attempts = 0
var _shutdown_last_pending

var shutdown = function () {
if (Object.keys(pending).length) return
log.info('shutdown called')
var _remain = Object.keys(pending).length
if (_remain > 0) {
if (!_shutdown_last_pending) _shutdown_last_pending = _remain
if (_remain < _shutdown_last_pending) {
_shutdown_last_pending = _remain
log.info({pending: _remain}, 'still processing')
return setTimeout(shutdown, config.shutdown.interval);
}


if (_shutdown_attempts > config.shutdown.times) {
log.error('shutdown wait time exceeded. shutting down anyway.')

} else {
log.info({pending: _remain, attempts: _shutdown_attempts++}, 'there are pending keys, sleeping')
return setTimeout(shutdown, config.shutdown.interval);
}
}

if (!last_seen_seq) return process.exit(0)
jsonist.put(config.seq_url, {_meta: {seq: last_seen_seq }}, function (err, resp) {
if (err) log.error('Could not record sequence in elasticsearch', last_seen_seq, err)
Expand All @@ -27,7 +51,8 @@ module.exports = function (config, log, since) {
}

var onDone = function (log, _id, _rev, type, seq, err, resp) {
last_seen_seq = seq
if (seq > last_seen_seq) last_seen_seq = seq
delete pending[seq]
if (err) return log.error('error occured', type, _id, _rev, err)
log.info({change: seq}, 'success. ', type, _id, _rev, err)
checkpoint_counter++
Expand All @@ -41,7 +66,7 @@ module.exports = function (config, log, since) {
log.info({change: seq}, 'stored in elasticsearch. ')
})
}
delete pending[seq]

}

var feed = follow(follow_config, function (err, change) {
Expand All @@ -52,11 +77,16 @@ module.exports = function (config, log, since) {

var doc = change.doc
var es_doc_url = config.elasticsearch + '/' + change.id
var compiled
if (config.urlTemplate) {
compiled = template(es_doc_url)
es_doc_url = compiled(doc)
}
var _onDone

if (doc._deleted) {
// delete the doc from es
if (config.urlTemplate) {
var compiled = template(es_doc_url)

// we need the prev doc. hack attack
request({
url: config.database + '/' + change.id + '?revs=true&open_revs=all'
Expand All @@ -73,21 +103,17 @@ module.exports = function (config, log, since) {
}, function (err, resp, _prev_doc) {
try {
es_doc_url = compiled(_prev_doc)
jsonist.delete(es_doc_url, onDone.bind(null, log, doc._id, null, 'delete', change.seq))
_retry(jsonist.delete.bind(null, es_doc_url), onDone.bind(null, log, doc._id, null, 'delete', change.seq))
} catch (e) {}
})
})
return
// end of finding prev doc for delete of a urlTemplate
} else {
return jsonist.delete(es_doc_url, onDone.bind(null, log, doc._id, null, 'delete', change.seq))
return _retry(jsonist.delete.bind(null, es_doc_url), onDone.bind(null, log, doc._id, null, 'delete', change.seq))
}
}
// adjust the url if its a urlTemplate
if (config.urlTemplate) {
var compiled = template(es_doc_url)
es_doc_url = compiled(doc)
}

var _rev = doc._rev
if (config.mapper) {
try {
Expand All @@ -106,13 +132,14 @@ module.exports = function (config, log, since) {
if (config.removeMeta) {
doc = remove_meta(doc)
}
jsonist.put(es_doc_url, doc, onDone.bind(null, log, change.doc._id, _rev, 'update', change.seq))
_retry(jsonist.put.bind(null, es_doc_url, doc), onDone.bind(null, log, change.doc._id, _rev, 'update', change.seq))
})

var _shutdown = once(shutdown)
if (config.endOnCatchup) {
feed.on('catchup', function () {
feed.pause()
setInterval(shutdown, 400)
setInterval(_shutdown, 400)
})
}

Expand Down
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
"main": "./lib/index.js",
"bin": "./bin/cli.js",
"dependencies": {
"async": "^1.5.2",
"bunyan": "^1.5.1",
"follow": "^0.12.1",
"hyperquest": "^1.2.0",
"jsonfilter": "^1.1.2",
"jsonist": "^1.3.0",
"lodash.once": "^4.0.0",
"lodash.template": "^4.2.0",
"md5": "^2.0.0",
"mkdirp": "^0.5.1",
Expand Down

0 comments on commit a283e49

Please sign in to comment.