Skip to content
This repository has been archived by the owner on Jan 22, 2018. It is now read-only.

Commit

Permalink
Added backend code for sqs insight page
Browse files Browse the repository at this point in the history
  • Loading branch information
aklinkert committed Oct 20, 2015
0 parents commit eaae6e8
Show file tree
Hide file tree
Showing 9 changed files with 254 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
config/config_local.json
node_modules
npm-debug.log
.tmp
9 changes: 9 additions & 0 deletions client/app.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
require('bootstrap');
require('socket.io-client');
require('angular-socket-io');

var angular = require('angular');

angular
.module('sqsInsight', [require('angular-ui-bootstrap'), 'btford.socket-io'])
.directive('sqsInsight', require('./sqsInsightDirective'));
27 changes: 27 additions & 0 deletions client/index.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<!DOCTYPE html>
<html lang="en">
<head>
<title>SQS Insight</title>

<!-- Required meta tags always come first -->
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta http-equiv="x-ua-compatible" content="ie=edge">

<!-- Bootstrap CSS -->
<link rel="stylesheet" href="/bootstrap.css"/>
</head>

<body ng-app="sqsInsight" ng-cloak>

<div class="container">
<navigation></navigation>

<div ui-view></div>
</div>

<script type="text/javascript" src="http://localhost:3000/socket.io/socket.io.js"></script>

<script type="text/javascript" src="/app.js"></script>
</body>
</html>
9 changes: 9 additions & 0 deletions client/sqsInsightDirective.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
module.exports = QueueInsightDirective;

QueueInsightDirective.$inject = [];

function QueueInsightDirective() {
return {

};
}
17 changes: 17 additions & 0 deletions config/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"port": 3000,
"endpoints": [
{
"key": "notValidKey",
"secretKey": "notValidSecret",
"region": "eu-central-1",
"url": "http://sqs.amazonaws.com/my-user/my-queue"
},
{
"key": "notValidKey",
"secretKey": "notValidSecret",
"region": "eu-central-1",
"url": "http://sqs.amazonaws.com/my-user/my-queue-2"
}
]
}
1 change: 1 addition & 0 deletions index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
module.exports = require('./lib');
50 changes: 50 additions & 0 deletions lib/QueueMessageDispatcher.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
module.exports = QueueMessageDispatcher;

var Consumer = require('sqs-consumer'),
AWS = require('aws-sdk');

Consumer.prototype._deleteMessage = function () {};

function QueueMessageDispatcher(eventEmitter) {
this.eventEmitter = eventEmitter;
this.consumer = {};
}

QueueMessageDispatcher.prototype.addConsumer = function (endpoint) {
var that = this;

AWS.config.update({
region: endpoint.region,
accessKeyId: endpoint.key,
secretAccessKey: endpoint.secretKey
});

var consumer = Consumer.create({
sqs: new AWS.SQS(),
region: endpoint.region,
queueUrl: endpoint.url,
batchSize: 1,
visibilityTimeout: 1,
waitTimeSeconds: 20,

handleMessage: function (message, done) {
that.eventEmitter.emit('queue-message', endpoint, message);
done();
}
});

consumer.on('error', function (err) {
that.eventEmitter.emit('queue-error', endpoint, err);
});

consumer.start();

this.consumer[endpoint.url] = consumer;
};

QueueMessageDispatcher.prototype.stopConsumer = function (endpoint) {
if (endpoint.url in this.consumer && this.consumer[endpoint.url]) {
this.consumer[endpoint.url].stop();
delete this.consumer[endpoint.url];
}
};
98 changes: 98 additions & 0 deletions lib/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
var browserify = require('browserify-middleware'),
socket = require('socket.io'),
express = require('express'),
http = require('http'),
fs = require('q-io/fs'),
chalk = require('chalk'),
util = require('util'),

app = express(),
server = http.Server(app),
io = socket(server, { serveClient: false }),

EventEmitter = require('events').EventEmitter,
emitter = new EventEmitter(),

QueueMessageDispatcher = require('./QueueMessageDispatcher'),
dispatcher = new QueueMessageDispatcher(emitter),

configLocalFile = __dirname + '/../config/config_local.json',
configFile = __dirname + '/../config/config.json',
config;

// http routes
app.use('/app.js', browserify(__dirname + '/../client/app.js'));
app.use('/bootstrap.css', express['static'](__dirname + '/../node_modules/bootstrap/dist/css/bootstrap.min.css'));
app.use('/', express['static'](__dirname + '/../client'));
app.get('/queues', function (req, res) {
var names = [];
config.endpoints.forEach(function (elem) {
names.push(elem.url.split('/').pop());
});
res.json(names);
})

// load local config, is not exists use default fallback.
fs.exists(configLocalFile).then(function (exists) {
return exists ? configLocalFile : configFile;

}).then(function (file) {
console.log(chalk.grey(util.format('Loading config file from "%s"', file)));
return fs.read(file);

}).then(function (json) {
config = JSON.parse(json);
if (!config.endpoints || typeof config.endpoints !== 'object' || config.endpoints.length < 1) {
throw new Error('Invalid endpoints array in config');
}

}).then(function () {
console.log(chalk.grey(util.format('Config contains %d queues.', config.endpoints.length)));

for(var endpointIndex in config.endpoints) {
if (!config.endpoints.hasOwnProperty(endpointIndex)) {
continue;
}

var endpoint = config.endpoints[endpointIndex];
console.log(chalk.white('Adding consumer for ' + endpoint.url));
dispatcher.addConsumer(endpoint);
}

}).then(function () {
emitter.on('queue-message', function (endpoint, message) {
console.log(chalk.blue(util.format('Received message from queue "%s": %s', endpoint.url, JSON.stringify(message))));

io.emit('queue-message', endpoint.url, message);
});

emitter.on('queue-error', function (endpoint, error) {

if (/AWS\.SimpleQueueService\.NonExistentQueue/.test(error.message)) {
dispatcher.stopConsumer(endpoint);
console.log(chalk.red(util.format('Queue "%s" does not exist. Consumer was stop for that specific queue.', endpoint.url)));

return;
}

/**
* just ignore this. It seems, that AWS.SQS has a problem with receiving nothing from elasticmq.
* @see http://www.multiasking.com/blog/xml2js-sax-js-non-whitespace-before-first-tag/
*/
if (/Non-whitespace before first tag/.test(error.message)) {
return;
}

console.log(chalk.red(util.format('Error on queue "%s": %s', endpoint.url, error.message)));
});

app.listen(config.port, function () {
console.log(chalk.green('listening on port ' + config.port));
});

}).catch(function (err) {

console.error(err);
console.error('exiting ... ');
process.exit(1);
});
39 changes: 39 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
{
"name": "sqs-insight",
"version": "0.1.0",
"description": "Gives a small insight into sqs queues.",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"repository": {
"type": "git",
"url": "git+ssh://[email protected]/finanzcheck/sqs-insight.git"
},
"keywords": [
"AWS",
"SQS",
"Queue",
"Insight",
"Node.JS"
],
"author": "Alexander Pinnecke <[email protected]>",
"license": "MIT",
"bugs": {
"url": "https://github.com/finanzcheck/sqs-insight/issues"
},
"homepage": "https://github.com/finanzcheck/sqs-insight#readme",
"dependencies": {
"angular": "^1.4.7",
"angular-ui-bootstrap": "^0.14.2",
"aws-sdk": "^2.2.10",
"bootstrap": "^3.3.5",
"browserify-middleware": "^7.0.0",
"chalk": "^1.1.1",
"express": "^4.13.3",
"q-io": "^1.13.1",
"socket.io": "^1.3.7",
"socket.io-client": "^1.3.7",
"sqs-consumer": "apinnecke/sqs-consumer"
}
}

0 comments on commit eaae6e8

Please sign in to comment.