Skip to content

Commit

Permalink
Modularize implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
jessehansen committed Jan 8, 2016
1 parent e7acd1f commit da033dc
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 117 deletions.
17 changes: 9 additions & 8 deletions .eslintrc
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
{
"env": {
"node": true
},
"extends": "leisurelink",
"rules": {
"no-console": 0,
"complexity": 0
}
"extends": "leisurelink",
"env": {
"node": true,
"es6": true
},
"rules": {
"no-console": 0,
"complexity": 0
}
}
123 changes: 20 additions & 103 deletions consul-kv-sync.js
Original file line number Diff line number Diff line change
@@ -1,53 +1,11 @@
#!/usr/bin/env node

'use strict';

/*eslint no-console: 0*/

var fs = require('fs');
var Promise = require('bluebird');
var program = require('commander');
var jptr = require('json-ptr');
var _ = require('lodash');
var consul = require('consul');

var pkg = require('./package.json');

var readFile = Promise.promisify(fs.readFile);
var firstFragmentId, client, flattened, reduced, deleteCount = 0, putCount = 0, existing = {};

function info(message) {
console.log(message);
}
function error(message) {
console.log(message);
}
function debug(message) {
if (program.verbose) {
console.log(message);
}
}

function readFragments(fileName) {
return readFile(fileName, 'utf8').then(JSON.parse).then(function validateContents(contents) {
var pointers, keys;
debug('Read ' + fileName);
debug(contents);
keys = _.keys(contents);
if (keys.length !== 1) {
return Promise.reject(new Error('Each configuration file must have a single top-level node identifying the service. "' + fileName + '" has ' + keys.length + ' top-level nodes.'));
}
pointers = jptr.list(contents);
if (firstFragmentId) {
if (pointers[1].pointer !== firstFragmentId) {
return Promise.reject(new Error('Each file must have the same top-level node. Expected "' + fileName + '" to have top-level node "' + firstFragmentId.substring(1) + '", but it has "' + pointers[1].pointer.substring(1) + '".'));
}
} else {
firstFragmentId = pointers[1].pointer;
}
return Promise.resolve(pointers);
});
}
const program = require('commander');
const pkg = require('./package');
const log = require('./lib/logger');
const clientFactory = require('./lib/client');
const workflowFactory = require('./lib/workflow');

function collectCA(value, items) {
items.push(value);
Expand Down Expand Up @@ -77,63 +35,22 @@ if (!program.args.length) {
program.outputHelp();
process.exit(1);
}
if (program.verbose) {
log.transports.console.level = 'debug';
}

Promise.all(_.map(program.ca, readFile)).then(function(certificates) {
var config = {
host: program.host || process.env.CONSUL_HOST || 'consul.service.consul',
port: program.port || process.env.CONSUL_PORT || 8500,
secure: program.secure || process.env.CONSUL_SECURE === 'true',
ca: certificates
};
debug('Config:');
debug(config);
client = consul(config);
Promise.promisifyAll(client.kv);

return Promise.all(_.map(program.args, readFragments));
}).then(function(files) {
var prefix;
flattened = _.flatten(files);
prefix = flattened[1].pointer.substring(1) + '/';

debug('Getting keys for ' + prefix);
return client.kv.keysAsync(prefix);
}).then(function(results) {
debug('Keys:');
debug(results);
existing = results;
}).catch(function(err) {
if (err.message === 'not found') {
debug('No existing keys found');
let client = clientFactory(program);
let workflow = workflowFactory(client, program.args);
workflow.exec().then(() => {
log.info('Sync completed. ' + workflow.stats.put + ' items set, ' + workflow.stats.deleted + ' items deleted.');
log.info('Config:');
log.info(workflow.config);
})
.catch((err) => {
if (program.verbose) {
log.error(err);
} else {
error(err);
process.exit(99);
log.error(err.message);
}
}).then(function() {
reduced = _.reduce(_.filter(flattened, function(x) {
return _.isString(x.value) || _.isFinite(x.value);
}), function(acc, item) {
acc[item.pointer.substring(1)] = item.value;
return acc;
}, {});
return Promise.all(_.map(reduced, function(value, key) {
putCount++;
existing = _.filter(existing, function(item) {
return item !== key;
});
debug('Setting "' + key + '" to "' + value + '"');
return client.kv.setAsync({
key: key,
value: ''+value
});
}));
}).then(function() {
return Promise.all(_.map(existing, function(key) {
deleteCount++;
return client.kv.delAsync(key);
}));
}).then(function() {
info('Sync completed. ' + putCount + ' items set, ' + deleteCount + ' items deleted.');
info('Config:');
info(reduced);
process.exit(99);
});
22 changes: 22 additions & 0 deletions lib/client.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
'use strict';

const consul = require('consul');
const _ = require('lodash');
const Promise = require('bluebird');
const readFileSync = require('fs').readFileSync;
const log = require('./logger');

module.exports = (program) => {
const certificates = _.map(program.ca, readFileSync);
var config = {
host: program.host || process.env.CONSUL_HOST || 'consul.service.consul',
port: program.port || process.env.CONSUL_PORT || 8500,
secure: program.secure || process.env.CONSUL_SECURE === 'true',
ca: certificates
};
log.debug('Config: ', config);

let client = consul(config);
Promise.promisifyAll(client.kv);
return client;
};
9 changes: 9 additions & 0 deletions lib/logger.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
'use strict';

const SkinnyLoggins = require('@leisurelink/skinny-loggins');
const logger = new SkinnyLoggins();

logger.transports.console.timestamp = false;
logger.transports.console.showLevel = false;

module.exports = logger;
104 changes: 104 additions & 0 deletions lib/workflow.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
'use strict';

const jptr = require('json-ptr');
const Promise = require('bluebird');
const readFile = Promise.promisify(require('fs').readFile);
const _ = require('lodash');
const log = require('./logger');

module.exports = (client, files) => {
let workflow = { stats: { put: 0, deleted: 0 }, config: null };
let firstFragmentId;
const readFragments = (fileName) => {
return readFile(fileName, 'utf8')
.then(JSON.parse)
.then((contents) => {
log.debug(`Read ${fileName}:`);
log.debug(contents);
let keys = _.keys(contents);
if (keys.length !== 1) {
throw new Error('Each configuration file must have a single top-level node identifying the service. "' + fileName + '" has ' + keys.length + ' top-level nodes.');
}
let pointers = jptr.list(contents);
if (firstFragmentId) {
if (pointers[1].pointer !== firstFragmentId) {
throw new Error('Each file must have the same top-level node. Expected "' + fileName + '" to have top-level node "' + firstFragmentId.substring(1) + '", but it has "' + pointers[1].pointer.substring(1) + '".');
}
} else {
firstFragmentId = pointers[1].pointer;
}
return pointers;
});
};

const readFiles = () => {
return Promise.map(files, readFragments);
};

let flattened, prefix;
const flatten = (contents) => {
flattened = _.flatten(contents);
prefix = flattened[1].pointer.substring(1) + '/';
return flattened;
};

const reduce = (flattened) => {
let reduced = _.reduce(_.filter(flattened, (x) => {
return _.isString(x.value) || _.isFinite(x.value);
}), function (acc, item) {
acc[item.pointer.substring(1)] = item.value;
return acc;
}, {});
workflow.config = reduced;
return reduced;
};

let existing;
const getExistingKeys = () => {
log.debug('Getting all keys for ' + prefix);
return client.kv.keysAsync(prefix)
.catch((err) => {
if (err.message === 'not found') {
return [];
}
throw err;
})
.then((keys) => {
log.debug('Keys: ', keys);
existing = keys;
});
};

const put = () => {
return Promise.all(_.map(workflow.config, (value, key) => {
workflow.stats.put++;
existing = _.filter(existing, (item) => {
return item !== key;
});
log.debug(`Setting "${key}" to "${value}"`);
return client.kv.setAsync({
key: key,
value: '' + value
});
}));
};

const del = () => {
return Promise.map(existing, function(key) {
workflow.stats.deleted++;
log.debug(`Deleting "${key}"`);
return client.kv.delAsync(key);
});
};

workflow.exec = () => {
return readFiles()
.then(flatten)
.then(reduce)
.then(getExistingKeys)
.then(put)
.then(del);
};

return workflow;
};
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"ci": "npm run lint && npm run test"
},
"dependencies": {
"@leisurelink/skinny-loggins": "^0.3.2",
"bluebird": "^3.1.1",
"commander": "^2.9.0",
"consul": "^0.22.0",
Expand Down
13 changes: 7 additions & 6 deletions test/test.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
'use strict';

var exec = require('child_process').exec;
var expect = require('chai').expect;
var _ = require('lodash');
var Promise = require('bluebird');
var consul = require('consul');
const exec = require('child_process').exec;
const expect = require('chai').expect;
const _ = require('lodash');
const Promise = require('bluebird');
const consul = require('consul');

function execute(commandLine) {
return new Promise(function(resolve, reject) {
exec(commandLine, {
cwd: __dirname
cwd: __dirname,
env: process.env
}, function(err, stdout, stderr) {
if (err) {
reject(err);
Expand Down

0 comments on commit da033dc

Please sign in to comment.