Skip to content

Commit

Permalink
Merge pull request ryanramage#6 from ryanramage/issue-4
Browse files Browse the repository at this point in the history
Issue 4
  • Loading branch information
ryanramage committed Feb 27, 2016
2 parents 2d6c7a6 + 1cf8918 commit caf9fbb
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 31 deletions.
8 changes: 6 additions & 2 deletions bin/cli.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@ var config = require('rc')('couch2elastic4sync', {
load: {
swallowErrors: false
},
checkpointSize: 20
concurrency: 5,
checkpointSize: 20,
retry: {
times: 10,
interval: 200
}
})
if (!config.elasticsearch) {
console.log('No elasticsearch search.')
Expand Down Expand Up @@ -75,7 +80,6 @@ function getLogFile (config) {
return log
}


function getSince (config, cb) {
if (config.since) return cb(null, config.since)
jsonist.get(config.seq_url, function (err, data) {
Expand Down
83 changes: 54 additions & 29 deletions lib/index.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
var async = require('async')
var follow = require('follow')
var request = require('request')
var jsonist = require('jsonist')
var remove_meta = require('./remove-meta')
var template = require('lodash.template')

module.exports = function (config, log, since) {
var _retry = async.retry.bind(null, config.retry)
var follow_config = {
db: config.database,
include_docs: true,
Expand All @@ -13,10 +15,9 @@ module.exports = function (config, log, since) {
var checkpoint_counter = 0
var updating_checkpoint_counter = false
var last_seen_seq
var pending = {}

var shutdown = function () {
if (Object.keys(pending).length) return
log.info('shutdown called')
if (!last_seen_seq) return process.exit(0)
jsonist.put(config.seq_url, {_meta: {seq: last_seen_seq }}, function (err, resp) {
if (err) log.error('Could not record sequence in elasticsearch', last_seen_seq, err)
Expand All @@ -26,9 +27,12 @@ module.exports = function (config, log, since) {

}

var onDone = function (log, _id, _rev, type, seq, err, resp) {
last_seen_seq = seq
if (err) return log.error('error occured', type, _id, _rev, err)
var onDone = function (endTask, log, _id, _rev, type, seq, err, resp) {
if (seq > last_seen_seq) last_seen_seq = seq
if (err) {
log.error('error occured', type, _id, _rev, err)
return endTask()
}
log.info({change: seq}, 'success. ', type, _id, _rev, err)
checkpoint_counter++
if (!updating_checkpoint_counter && checkpoint_counter > config.checkpointSize) {
Expand All @@ -37,30 +41,38 @@ module.exports = function (config, log, since) {
// store the thing change seq
jsonist.put(config.seq_url, {_meta: {seq: seq }}, function (err, resp) {
updating_checkpoint_counter = false
if (err) return log.error('Could not record sequence in elasticsearch', seq, err)
log.info({change: seq}, 'stored in elasticsearch. ')
if (err) log.error('Could not record sequence in elasticsearch', seq, err)
else log.info({change: seq}, 'stored in elasticsearch. ')
return endTask()
})
}
delete pending[seq]
}

var feed = follow(follow_config, function (err, change) {
if (err) return log.error(err)
if (change.id.indexOf('_design') === 0) return
return endTask()
}

pending[change.seq] = true
var q = async.queue(function (change, endTask) {
if (change.id.indexOf('_design') === 0) return endTask()

var doc = change.doc
var es_doc_url = config.elasticsearch + '/' + change.id
var compiled
if (config.urlTemplate) {
compiled = template(es_doc_url)
es_doc_url = compiled(doc)
}

if (doc._deleted) {
// delete the doc from es
if (config.urlTemplate) {
var compiled = template(es_doc_url)

// we need the prev doc. hack attack
request({
url: config.database + '/' + change.id + '?revs=true&open_revs=all'
}, function (err, resp, body) {
if (err) {
log.error(err)
return endTask()
}

// total hacky
var _json = JSON.parse(body.split('\n')[3])
var _rev = (_json._revisions.start - 1) + '-' + _json._revisions.ids[1]
Expand All @@ -71,48 +83,61 @@ module.exports = function (config, log, since) {
},
json: true
}, function (err, resp, _prev_doc) {
if (err) {
log.error(err)
return endTask()
}

try {
es_doc_url = compiled(_prev_doc)
jsonist.delete(es_doc_url, onDone.bind(null, log, doc._id, null, 'delete', change.seq))
} catch (e) {}
_retry(jsonist.delete.bind(null, es_doc_url), onDone.bind(null, endTask, log, doc._id, null, 'delete', change.seq))
} catch (e) {
log.error(e)
return endTask()
}
})
})
return
// end of finding prev doc for delete of a urlTemplate

} else {
return jsonist.delete(es_doc_url, onDone.bind(null, log, doc._id, null, 'delete', change.seq))
return _retry(jsonist.delete.bind(null, es_doc_url), onDone.bind(null, endTask, log, doc._id, null, 'delete', change.seq))
}
}
// adjust the url if its a urlTemplate
if (config.urlTemplate) {
var compiled = template(es_doc_url)
es_doc_url = compiled(doc)
}

var _rev = doc._rev
if (config.mapper) {
try {
var mapped = config.mapper(change.doc)
if (mapped && config.addRaw) mapped[config.rawField] = change.doc
doc = mapped
} catch (e) {
delete pending[change.seq]
log.error(e)
return log.error({change: feed.original_db_seq}, change.doc._id, _rev, 'An error occured in the mapping', e)
}
}
if (!doc) {
delete pending[change.seq]
return log.error({change: feed.original_db_seq}, change.doc._id, _rev, 'No document came back from the mapping')
}
if (config.removeMeta) {
doc = remove_meta(doc)
}
jsonist.put(es_doc_url, doc, onDone.bind(null, log, change.doc._id, _rev, 'update', change.seq))
return _retry(jsonist.put.bind(null, es_doc_url, doc), onDone.bind(null, endTask, log, change.doc._id, _rev, 'update', change.seq))

}, config.concurrency)

var _caughtUp = false
q.drain = function () {
if (_caughtUp) setInterval(shutdown, 400)
}

var feed = follow(follow_config, function (err, change) {
if (err) return log.error(err)
q.push(change)
})

if (config.endOnCatchup) {
feed.on('catchup', function () {
feed.pause()
setInterval(shutdown, 400)
_caughtUp = true
})
}

Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"main": "./lib/index.js",
"bin": "./bin/cli.js",
"dependencies": {
"async": "^1.5.2",
"bunyan": "^1.5.1",
"follow": "^0.12.1",
"hyperquest": "^1.2.0",
Expand Down

0 comments on commit caf9fbb

Please sign in to comment.