Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add HTTP input plugin #44

Closed
wants to merge 12 commits into from
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
node_modules
node_modules
.idea
23 changes: 23 additions & 0 deletions Readme.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,24 @@ Parameters:
* ``pattern_channel``: use channel as pattern. Default value : false
* ``unserializer``: please see above. Default value to ``json_logstash``.

HTTP
---

This plugin is used on log server to receive logs from an HTTP/HTTPS stream. This is useful in case the agent can only
output logs through an HTTP/HTTPS channel.

Example:

* ``input://http://localhost:8080``

Parameters:

* ``type``: to specify the log type, to faciliate crawling in kibana. Example: ``type=http``. No default value.
* ``unserializer``: please see above. Default value to ``json_logstash``.
* ``proto``: the protocol to use (http or https)
* ``private``: full path to a private key (only valid in https protocol)
* ``public``: full path to a public certificate (only valid in https protocol)

Outputs and filter, commons parameters
===

Expand Down Expand Up @@ -321,6 +339,11 @@ Parameters:
* ``proto``: ``http`` or ``https``. Default value: ``http``.
* ``serializer``: please see above. Default value to ``json_logstash``.
* ``format``: please see above. Used by the ``raw``serializer.
* ``proxy``: url of a proxy that the http post request should be tunneled through. The proxy url must have the format ``http[s]://[userinfo@]hostname[:port]`` which gives support for:
* http and https proxies
* proxy authentication via userinfo ``username:password`` in plain text or in base64 encoding (i.e. ``dXNlcm5hbWU6cGFzc3dvcmQ=``)
* proxy port


Redis
---
Expand Down
75 changes: 75 additions & 0 deletions lib/inputs/input_http.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
var base_input = require('../lib/base_input'),
http = require('http'),
https = require('https'),
util = require('util'),
fs = require('fs'),
logger = require('log4node');

function InputHttp() {
base_input.BaseInput.call(this);
this.config = {
name: 'Http',
host_field: 'host',
port_field: 'port',
optional_params: ['type', 'unserializer', 'proto', 'private', 'public'],
default_values: {
'unserializer': 'json_logstash',
'proto': 'http',
'private': '/etc/ssl/private.pem',
'public': '/etc/ssl/public.pem'
}
}
}

util.inherits(InputHttp, base_input.BaseInput);

InputHttp.prototype.afterLoadConfig = function(callback) {
logger.info('Start listening on ' + this.proto, this.host + ':' + this.port);

this.configure_unserialize(this.unserializer);

this.serverCallback = function(request, response) {
request.on('data', function(data) {
this.unserialize_data(data, function(parsed) {
this.emit('data', parsed);
}.bind(this), function(data) {
this.emit('data', {
'message': data.toString().trim(),
'host': c.remoteAddress,
'tcp_port': this.port,
'type': this.type,
});
}.bind(this));
}.bind(this));
request.on('error', function(err) {
this.emit('error', err);
}.bind(this));
}.bind(this);

if (this.proto == 'https') {
var options = {
key: fs.readFileSync(this.private),
cert: fs.readFileSync(this.public)
};
this.server = https.createServer(options, this.serverCallback);
} else {
this.server = http.createServer(this.serverCallback);
}

this.server.on('error', function(err) {
this.emit('init_error', err);
}.bind(this));

this.server.listen(this.port, this.host);

this.server.once('listening', callback);
}

InputHttp.prototype.close = function(callback) {
logger.info('Closing listening ' + this.proto, this.host + ':' + this.port);
this.server.close(callback);
}

exports.create = function() {
return new InputHttp();
}
73 changes: 70 additions & 3 deletions lib/outputs/output_http_post.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,80 @@
var abstract_http = require('./abstract_http'),
util = require('util');
util = require('util'),
http = require('http'),
url = require('url'),
logger = require('log4node');

function OutputHttpPost() {
abstract_http.AbstractHttp.call(this);
this.config.name = 'Http Post';
this.config.required_params.push('path');
this.config.optional_params.push('path');
this.config.optional_params.push('format');
this.config.optional_params.push('serializer');
this.config.optional_params.push('proxy');
this.config.default_values['path'] = '/';
this.config.default_values['format'] = '#{message}';
this.config.default_values['serializer'] = 'raw';
}

util.inherits(OutputHttpPost, abstract_http.AbstractHttp);

OutputHttpPost.prototype.setupProxy = function() {
var tunnel = require('tunnel');

var proxyUrl = url.parse(this.proxy);
var proxyType = proxyUrl.protocol ? proxyUrl.protocol.slice(0,-1) : 'http';
var serverType = this.proto;

// check if the auth part is base64 encoded.
// if there is no colon, then the assumption is that it's base64.
var auth = proxyUrl.auth;
if (auth) {
if (auth.indexOf(':') == -1) {
auth = new Buffer(auth, 'base64').toString('ascii');
// if after decoding there still isn't a colon, then revert back to the original value
if (auth.indexOf(':') == -1) {
auth = proxyUrl.auth;
}
}
delete proxyUrl.auth;
}

if (serverType == 'https') {
// create an https tunnel through the proxy.
// Possible values are httpOverHttp, httpOverHttps, httpsOverHttp, httpsOverHttps
var tunnelType = serverType+'Over'+proxyType.charAt(0).toUpperCase()+proxyType.slice(1);
if (!tunnel[tunnelType]) {
throw new Error('Proxy tunnel type '+ tunnelType + ' is not supported');
}

var tunnelingOptions = {
maxSockets: http.globalAgent.maxSocket,
proxy: {
host: proxyUrl.hostname,
port: proxyUrl.port,
proxyAuth: auth
}
};

// create the tunnel
this.tunnelingAgent = tunnel[tunnelType](tunnelingOptions);

} else {
// use a standard forwarding proxy
this.path = url.format({protocol: this.proto+':', hostname: this.host, port: this.port});
this.host = proxyUrl.hostname;
this.port = proxyUrl.port;
this.proxyAuth = auth;
}

logger.info('Using http proxy ' + url.format(proxyUrl));
}

OutputHttpPost.prototype.afterLoadConfig = function(callback) {
this.abstractAfterLoadConfig(function() {
if (this.proxy) {
this.setupProxy();
}
this.configure_serialize(this.serializer, this.format);
callback();
}.bind(this));
Expand All @@ -28,10 +88,17 @@ OutputHttpPost.prototype.format_payload = function(data, callback) {
port: this.port,
path: path,
method: 'POST',
headers: {'Content-Type': this.output_type == 'json' ? 'application/json' : 'text/plain'},
headers: {'Content-Type': this.output_type == 'json' ? 'application/json' : 'text/plain'}
};
if (this.tunnelingAgent) {
http_options.agent = this.tunnelingAgent;
}
if (this.proxyAuth) {
http_options.headers['Proxy-Authorization'] = 'Basic ' + new Buffer(this.proxyAuth).toString('base64');
}
var line = this.serialize_data(data);
if (line) {
http_options.headers['Content-Length'] = line.length;
callback(http_options, line);
}
}
Expand Down
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@
"zmq": "2.3.x",
"moment": "1.7.0",
"redis": "0.8.x",
"msgpack": "0.1.8"
"msgpack": "0.1.8",
"tunnel": "0.0.2"
},
"directories": {
"test": "./test",
"bin": "./bin"
}
}
}