From 703d95f5b2e888c216bda766bdb3fd910158a892 Mon Sep 17 00:00:00 2001 From: Nadav Fischer Date: Mon, 11 Nov 2013 17:22:36 +0200 Subject: [PATCH 01/11] Add HTTP input plugin --- .gitignore | 3 +- Readme.markdown | 18 ++++++++++ lib/inputs/input_http.js | 75 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 95 insertions(+), 1 deletion(-) create mode 100644 lib/inputs/input_http.js diff --git a/.gitignore b/.gitignore index b512c09..eb79dd5 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -node_modules \ No newline at end of file +node_modules +.idea diff --git a/Readme.markdown b/Readme.markdown index 49ac815..c8273af 100644 --- a/Readme.markdown +++ b/Readme.markdown @@ -198,6 +198,24 @@ Parameters: * ``pattern_channel``: use channel as pattern. Default value : false * ``unserializer``: please see above. Default value to ``json_logstash``. +HTTP +--- + +This plugin is used on log server to receive logs from an HTTP/HTTPS stream. This is useful in case the agent can only +output logs through an HTTP/HTTPS channel. + +Example: + +* ``input://http://localhost:8080`` + +Parameters: + +* ``type``: to specify the log type, to faciliate crawling in kibana. Example: ``type=http``. No default value. +* ``unserializer``: please see above. Default value to ``json_logstash``. +* ``proto``: the protocol to use (http or https) +* ``private``: full path to a private key (only valid in https protocol) +* ``public``: full path to a public certificate (only valid in https protocol) + Outputs and filter, commons parameters === diff --git a/lib/inputs/input_http.js b/lib/inputs/input_http.js new file mode 100644 index 0000000..6b96154 --- /dev/null +++ b/lib/inputs/input_http.js @@ -0,0 +1,75 @@ +var base_input = require('../lib/base_input'), + http = require('http'), + https = require('https'), + util = require('util'), + fs = require('fs'), + logger = require('log4node'); + +function InputHttp() { + base_input.BaseInput.call(this); + this.config = { + name: 'Http', + host_field: 'host', + port_field: 'port', + optional_params: ['type', 'unserializer', 'proto', 'private', 'public'], + default_values: { + 'unserializer': 'json_logstash', + 'proto': 'http', + 'private': '/etc/ssl/private.pem', + 'public': '/etc/ssl/public.pem' + } + } +} + +util.inherits(InputHttp, base_input.BaseInput); + +InputHttp.prototype.afterLoadConfig = function(callback) { + logger.info('Start listening on ' + this.proto, this.host + ':' + this.port); + + this.configure_unserialize(this.unserializer); + + this.serverCallback = function(request, response) { + request.on('data', function(data) { + this.unserialize_data(data, function(parsed) { + this.emit('data', parsed); + }.bind(this), function(data) { + this.emit('data', { + 'message': data.toString().trim(), + 'host': c.remoteAddress, + 'tcp_port': this.port, + 'type': this.type, + }); + }.bind(this)); + }.bind(this)); + request.on('error', function(err) { + this.emit('error', err); + }.bind(this)); + }.bind(this); + + if (this.proto == 'https') { + var options = { + key: fs.readFileSync(this.private), + cert: fs.readFileSync(this.public) + }; + this.server = https.createServer(options, this.serverCallback); + } else { + this.server = http.createServer(this.serverCallback); + } + + this.server.on('error', function(err) { + this.emit('init_error', err); + }.bind(this)); + + this.server.listen(this.port, this.host); + + this.server.once('listening', callback); +} + +InputHttp.prototype.close = function(callback) { + logger.info('Closing listening ' + this.proto, this.host + ':' + this.port); + this.server.close(callback); +} + +exports.create = function() { + return new InputHttp(); +} From 2b2aaa1dc0483c1a3984fafce3e14e2d92e60073 Mon Sep 17 00:00:00 2001 From: Nadav Fischer Date: Tue, 12 Nov 2013 10:40:36 +0200 Subject: [PATCH 02/11] make msgpack truly optional --- lib/lib/base_input.js | 11 +++++++++-- lib/lib/base_output.js | 10 ++++++++-- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/lib/lib/base_input.js b/lib/lib/base_input.js index 23ce15f..11cd5ab 100644 --- a/lib/lib/base_input.js +++ b/lib/lib/base_input.js @@ -1,6 +1,6 @@ var base_component = require('./base_component'), util = require('util'), - msgpack = require('msgpack'), + _msgpack = null, logger = require('log4node'); function BaseInput() { @@ -22,6 +22,13 @@ BaseInput.prototype.init = function(url) { }.bind(this)); } +BaseInput.prototype.msgpack = function() { + if (!_msgpack) { + _msgpack = require('msgpack'); + } + return _msgpack; +} + BaseInput.prototype.configure_unserialize = function(serializer) { if (serializer == 'json_logstash') { this.unserialize_data = this.unserialize_data_json; @@ -48,7 +55,7 @@ BaseInput.prototype.unserialize_data_json = function(data, ok_callback, parse_fa BaseInput.prototype.unserialize_data_msgpack = function(data, ok_callback, parse_fail_callback) { try { - var parsed = msgpack.unpack(data); + var parsed = this.msgpack().unpack(data); if (parsed['message']) { return ok_callback(parsed); } diff --git a/lib/lib/base_output.js b/lib/lib/base_output.js index 8506ddb..75c3db8 100644 --- a/lib/lib/base_output.js +++ b/lib/lib/base_output.js @@ -1,6 +1,6 @@ var base_component = require('./base_component'), util = require('util'), - msgpack = require('msgpack'), + _msgpack = null, logger = require('log4node'); function BaseOutput() { @@ -27,6 +27,12 @@ BaseOutput.prototype.init = function(url) { }.bind(this)); } +BaseOutput.prototype.msgpack = function() { + if (!_msgpack) { + _msgpack = require('msgpack'); + } + return _msgpack; +} BaseOutput.prototype.configure_serialize = function(serializer, raw_format) { if (serializer == 'json_logstash') { this.serialize_data = function(data) { return JSON.stringify(data); }; @@ -40,7 +46,7 @@ BaseOutput.prototype.configure_serialize = function(serializer, raw_format) { } } else if (serializer == 'msgpack') { - this.serialize_data = function(data) { return msgpack.pack(data); }; + this.serialize_data = function(data) { return this.msgpack().pack(data); }; } else { throw new Error('Unknown serializer ' + serializer); From 906400f4ba201ceb24877d445c8fb26c17fe6ecf Mon Sep 17 00:00:00 2001 From: Nadav Fischer Date: Tue, 12 Nov 2013 12:38:12 +0200 Subject: [PATCH 03/11] check for @message in addition to message in validity of parsed json --- lib/lib/base_input.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/lib/base_input.js b/lib/lib/base_input.js index 11cd5ab..0c69f02 100644 --- a/lib/lib/base_input.js +++ b/lib/lib/base_input.js @@ -44,7 +44,7 @@ BaseInput.prototype.configure_unserialize = function(serializer) { BaseInput.prototype.unserialize_data_json = function(data, ok_callback, parse_fail_callback) { try { var parsed = JSON.parse(data); - if (parsed['message']) { + if (parsed['message'] || parsed['@message']) { return ok_callback(parsed); } } @@ -56,7 +56,7 @@ BaseInput.prototype.unserialize_data_json = function(data, ok_callback, parse_fa BaseInput.prototype.unserialize_data_msgpack = function(data, ok_callback, parse_fail_callback) { try { var parsed = this.msgpack().unpack(data); - if (parsed['message']) { + if (parsed['message'] || parsed['@message']) { return ok_callback(parsed); } } From 3633018278afa6f5f69b34e1093e17b373fd5255 Mon Sep 17 00:00:00 2001 From: Nadav Fischer Date: Tue, 12 Nov 2013 13:52:23 +0200 Subject: [PATCH 04/11] change default file input encoding to utf8 --- lib/lib/monitor_file.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/lib/monitor_file.js b/lib/lib/monitor_file.js index e4e1ef0..e1d2b11 100644 --- a/lib/lib/monitor_file.js +++ b/lib/lib/monitor_file.js @@ -104,7 +104,7 @@ MonitoredFile.prototype.start = function(start_index) { if (err) { return this.emit('error', err); } - if (bytesRead == last_data.length && last_data == buffer.toString(this.options.buffer_encoding || 'ascii', 0, last_data.length)) { + if (bytesRead == last_data.length && last_data == buffer.toString(this.options.buffer_encoding || 'utf8', 0, last_data.length)) { logger.info('Event changed received, but no data change and last data match', this.filename, 'fd', this.fdTailer.fd); } else { @@ -175,7 +175,7 @@ MonitoredFile.prototype.restart = function(start_index) { if (err) { return this.emit('error', err); } - if (bytesRead == last_data.length && last_data == buffer.toString(this.options.buffer_encoding || 'ascii', 0, last_data.length)) { + if (bytesRead == last_data.length && last_data == buffer.toString(this.options.buffer_encoding || 'utf8', 0, last_data.length)) { logger.info('Start from last read index', this.filename, 'at', file_status[this.filename].index, 'fd', fd); this.fdTailer = new FdTailer(fd, file_status[this.filename].index, this.options, stats.isFIFO(), this); this.fdTailer.read(); @@ -254,7 +254,7 @@ function FdTailer(fd, current_index, options, is_fifo, event_target) { this.current_index = current_index; this.is_fifo = is_fifo; this.buffer = new Buffer(options.buffer_size || 1024); - this.buffer_encoding = options.buffer_encoding || 'ascii'; + this.buffer_encoding = options.buffer_encoding || 'utf8'; this.read_in_progress = false; } From a4fdce7fd7429da5b203667313e8b8e6ea17100b Mon Sep 17 00:00:00 2001 From: Nadav Fischer Date: Wed, 13 Nov 2013 14:30:26 +0200 Subject: [PATCH 05/11] support http_post output through a http/https proxy via the 'tunnel' module --- lib/outputs/output_http_post.js | 36 +++++++++++++++++++++++++++++++-- package.json | 5 +++-- 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/lib/outputs/output_http_post.js b/lib/outputs/output_http_post.js index 07a596f..9117d86 100644 --- a/lib/outputs/output_http_post.js +++ b/lib/outputs/output_http_post.js @@ -2,15 +2,19 @@ var abstract_http = require('./abstract_http'), util = require('util'), http = require('http'), https = require('https'), + url = require('url'), + fs = require('fs'), logger = require('log4node'), error_buffer = require('../lib/error_buffer'); function OutputHttpPost() { abstract_http.AbstractHttp.call(this); this.config.name = 'Http Post'; - this.config.required_params.push('path'); + this.config.optional_params.push('path'); this.config.optional_params.push('format'); this.config.optional_params.push('serializer'); + this.config.optional_params.push('proxy'); + this.config.default_values['path'] = '/'; this.config.default_values['format'] = '#{message}'; this.config.default_values['serializer'] = 'raw'; } @@ -19,6 +23,31 @@ util.inherits(OutputHttpPost, abstract_http.AbstractHttp); OutputHttpPost.prototype.afterLoadConfig = function(callback) { this.abstractAfterLoadConfig(function() { + if (this.proxy) { + var tunnel = require('tunnel'); + + var proxyUrl = url.parse(this.proxy); + var proxyType = proxyUrl.protocol ? proxyUrl.protocol.slice(0,-1) : 'http'; + var serverType = this.proto; + + // create the correct type of tunnel. + // Possible values are httpOverHttp, httpOverHttps, httpsOverHttp, httpsOverHttps + var tunnelType = serverType+'Over'+proxyType.charAt(0).toUpperCase()+proxyType.slice(1); + if (!tunnel[tunnelType]) { + throw new Error('Proxy tunnel type '+ tunnelType + ' is not supported'); + } + + this.tunnelingAgent = tunnel[tunnelType]({ + maxSockets: http.globalAgent.maxSocket, + proxy: { + host: proxyUrl.hostname, + port: proxyUrl.port, + proxyAuth: proxyUrl.auth + } + }); + delete proxyUrl.auth; + logger.info('Using ' + tunnelType + ' proxy ' + url.format(proxyUrl)); + } this.configure_serialize(this.serializer, this.format); callback(); }.bind(this)); @@ -32,8 +61,11 @@ OutputHttpPost.prototype.format_payload = function(data, callback) { port: this.port, path: path, method: 'POST', - headers: {'Content-Type': this.output_type == 'json' ? 'application/json' : 'text/plain'}, + headers: {'Content-Type': this.output_type == 'json' ? 'application/json' : 'text/plain'} }; + if (this.tunnelingAgent) { + http_options.agent = this.tunnelingAgent; + } var line = this.serialize_data(data); if (line) { callback(http_options, line); diff --git a/package.json b/package.json index 22ab2f8..8e7bfae 100644 --- a/package.json +++ b/package.json @@ -32,10 +32,11 @@ "zmq": "2.3.x", "moment": "1.7.0", "redis": "0.8.x", - "msgpack": "0.1.8" + "msgpack": "0.1.8", + "tunnel": "0.0.2" }, "directories": { "test": "./test", "bin": "./bin" } -} \ No newline at end of file +} From 7141058f7afa5743058b4a12d52c2ed5e953003e Mon Sep 17 00:00:00 2001 From: Nadav Fischer Date: Wed, 13 Nov 2013 15:00:39 +0200 Subject: [PATCH 06/11] support base64 proxy auth to hide the username and password when a proxy requires authentication (e.g. proxy=http://AsxxRSsCcdx==@proxy.com:8080) --- lib/outputs/output_http_post.js | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/lib/outputs/output_http_post.js b/lib/outputs/output_http_post.js index 9117d86..17cbec3 100644 --- a/lib/outputs/output_http_post.js +++ b/lib/outputs/output_http_post.js @@ -37,14 +37,31 @@ OutputHttpPost.prototype.afterLoadConfig = function(callback) { throw new Error('Proxy tunnel type '+ tunnelType + ' is not supported'); } - this.tunnelingAgent = tunnel[tunnelType]({ + var tunnelingOptions = { maxSockets: http.globalAgent.maxSocket, proxy: { host: proxyUrl.hostname, - port: proxyUrl.port, - proxyAuth: proxyUrl.auth + port: proxyUrl.port } - }); + }; + + // check if the auth part is base64 encoded. + // if there is no colon, then the assumption is that it's base64. + var auth = proxyUrl.auth; + if (auth) { + if (auth.indexOf(':') == -1) { + auth = new Buffer(auth, 'base64').toString('ascii'); + // if after decoding there still isn't a colon, then revert back to the original value + if (auth.indexOf(':') == -1) { + auth = proxyUrl.auth; + } + } + tunnelingOptions.proxy.proxyAuth = auth; + } + + // create the tunnel + this.tunnelingAgent = tunnel[tunnelType](tunnelingOptions); + delete proxyUrl.auth; logger.info('Using ' + tunnelType + ' proxy ' + url.format(proxyUrl)); } From b2cf23043ff5a1f87e6d5544ed4297f623b5c28a Mon Sep 17 00:00:00 2001 From: Nadav Fischer Date: Wed, 13 Nov 2013 15:18:22 +0200 Subject: [PATCH 07/11] update Readme for proxy configuration of http_post --- Readme.markdown | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Readme.markdown b/Readme.markdown index c8273af..eb3b86b 100644 --- a/Readme.markdown +++ b/Readme.markdown @@ -338,6 +338,11 @@ Parameters: * ``proto``: ``http`` or ``https``. Default value: ``http``. * ``serializer``: please see above. Default value to ``json_logstash``. * ``format``: please see above. Used by the ``raw``serializer. +* ``proxy``: url of a proxy that the http post request should be tunneled through. The proxy url must have the format ``http[s]://[userinfo@]hostname[:port]`` which gives support for: + * http and https proxies + * proxy authentication via userinfo ``username:password`` in plain text or in base64 encoding (i.e. ``dXNlcm5hbWU6cGFzc3dvcmQ=``) + * proxy port + Redis --- From e83b5765a910de5ae14b79d5d1e6429050eb3a8c Mon Sep 17 00:00:00 2001 From: Nadav Fischer Date: Wed, 13 Nov 2013 17:13:58 +0200 Subject: [PATCH 08/11] handle proxying to http (non-https) servers correctly --- lib/outputs/output_http_post.js | 87 ++++++++++++++++++++------------- 1 file changed, 52 insertions(+), 35 deletions(-) diff --git a/lib/outputs/output_http_post.js b/lib/outputs/output_http_post.js index 17cbec3..058e41d 100644 --- a/lib/outputs/output_http_post.js +++ b/lib/outputs/output_http_post.js @@ -21,49 +21,62 @@ function OutputHttpPost() { util.inherits(OutputHttpPost, abstract_http.AbstractHttp); -OutputHttpPost.prototype.afterLoadConfig = function(callback) { - this.abstractAfterLoadConfig(function() { - if (this.proxy) { - var tunnel = require('tunnel'); +OutputHttpPost.prototype.setupProxy = function() { + var tunnel = require('tunnel'); - var proxyUrl = url.parse(this.proxy); - var proxyType = proxyUrl.protocol ? proxyUrl.protocol.slice(0,-1) : 'http'; - var serverType = this.proto; + var proxyUrl = url.parse(this.proxy); + var proxyType = proxyUrl.protocol ? proxyUrl.protocol.slice(0,-1) : 'http'; + var serverType = this.proto; - // create the correct type of tunnel. - // Possible values are httpOverHttp, httpOverHttps, httpsOverHttp, httpsOverHttps - var tunnelType = serverType+'Over'+proxyType.charAt(0).toUpperCase()+proxyType.slice(1); - if (!tunnel[tunnelType]) { - throw new Error('Proxy tunnel type '+ tunnelType + ' is not supported'); + // check if the auth part is base64 encoded. + // if there is no colon, then the assumption is that it's base64. + var auth = proxyUrl.auth; + if (auth) { + if (auth.indexOf(':') == -1) { + auth = new Buffer(auth, 'base64').toString('ascii'); + // if after decoding there still isn't a colon, then revert back to the original value + if (auth.indexOf(':') == -1) { + auth = proxyUrl.auth; } + } + delete proxyUrl.auth; + } - var tunnelingOptions = { - maxSockets: http.globalAgent.maxSocket, - proxy: { - host: proxyUrl.hostname, - port: proxyUrl.port - } - }; + if (serverType == 'https') { + // create an https tunnel through the proxy. + // Possible values are httpOverHttp, httpOverHttps, httpsOverHttp, httpsOverHttps + var tunnelType = serverType+'Over'+proxyType.charAt(0).toUpperCase()+proxyType.slice(1); + if (!tunnel[tunnelType]) { + throw new Error('Proxy tunnel type '+ tunnelType + ' is not supported'); + } - // check if the auth part is base64 encoded. - // if there is no colon, then the assumption is that it's base64. - var auth = proxyUrl.auth; - if (auth) { - if (auth.indexOf(':') == -1) { - auth = new Buffer(auth, 'base64').toString('ascii'); - // if after decoding there still isn't a colon, then revert back to the original value - if (auth.indexOf(':') == -1) { - auth = proxyUrl.auth; - } - } - tunnelingOptions.proxy.proxyAuth = auth; + var tunnelingOptions = { + maxSockets: http.globalAgent.maxSocket, + proxy: { + host: proxyUrl.hostname, + port: proxyUrl.port, + proxyAuth: auth } + }; - // create the tunnel - this.tunnelingAgent = tunnel[tunnelType](tunnelingOptions); + // create the tunnel + this.tunnelingAgent = tunnel[tunnelType](tunnelingOptions); - delete proxyUrl.auth; - logger.info('Using ' + tunnelType + ' proxy ' + url.format(proxyUrl)); + } else { + // use a standard forwarding proxy + this.path = url.format({protocol: this.proto+':', hostname: this.host, port: this.port}); + this.host = proxyUrl.hostname; + this.port = proxyUrl.port; + this.proxyAuth = auth; + } + + logger.info('Using http proxy ' + url.format(proxyUrl)); +} + +OutputHttpPost.prototype.afterLoadConfig = function(callback) { + this.abstractAfterLoadConfig(function() { + if (this.proxy) { + this.setupProxy(); } this.configure_serialize(this.serializer, this.format); callback(); @@ -83,8 +96,12 @@ OutputHttpPost.prototype.format_payload = function(data, callback) { if (this.tunnelingAgent) { http_options.agent = this.tunnelingAgent; } + if (this.proxyAuth) { + http_options.headers['Proxy-Authorization'] = 'Basic ' + new Buffer(this.proxyAuth).toString('base64'); + } var line = this.serialize_data(data); if (line) { + http_options.headers['Content-Length'] = line.length; callback(http_options, line); } } From 2b034b0c9604f0e5ec33e59bbcab087b929333ab Mon Sep 17 00:00:00 2001 From: Nadav Fischer Date: Thu, 14 Nov 2013 09:57:00 +0200 Subject: [PATCH 09/11] Fix merging from bpaquet/node-logstash --- lib/lib/base_output.js | 6 ------ lib/outputs/output_http_post.js | 5 ++++- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/lib/lib/base_output.js b/lib/lib/base_output.js index 98847a6..baf9c6b 100644 --- a/lib/lib/base_output.js +++ b/lib/lib/base_output.js @@ -26,12 +26,6 @@ BaseOutput.prototype.init = function(url) { }.bind(this)); } -BaseOutput.prototype.msgpack = function() { - if (!_msgpack) { - _msgpack = require('msgpack'); - } - return _msgpack; -} BaseOutput.prototype.configure_serialize = function(serializer, raw_format) { if (serializer == 'json_logstash') { this.serialize_data = function(data) { diff --git a/lib/outputs/output_http_post.js b/lib/outputs/output_http_post.js index fde3405..7e8663b 100644 --- a/lib/outputs/output_http_post.js +++ b/lib/outputs/output_http_post.js @@ -1,5 +1,8 @@ var abstract_http = require('./abstract_http'), - util = require('util'); + util = require('util'), + http = require('http'), + url = require('url'), + logger = require('log4node'); function OutputHttpPost() { abstract_http.AbstractHttp.call(this); From 9eeaab7a4e970756c77f6594f8ac3a7a2981e5dd Mon Sep 17 00:00:00 2001 From: Nadav Fischer Date: Thu, 14 Nov 2013 10:07:31 +0200 Subject: [PATCH 10/11] Fix merging from bpaquet/node-logstash --- lib/lib/base_input.js | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/lib/lib/base_input.js b/lib/lib/base_input.js index 6c4125f..a5123c2 100644 --- a/lib/lib/base_input.js +++ b/lib/lib/base_input.js @@ -21,14 +21,7 @@ BaseInput.prototype.init = function(url) { }.bind(this)); } -BaseInput.prototype.msgpack = function() { - if (!_msgpack) { - _msgpack = require('msgpack'); - } - return _msgpack; -} - -BaseInput.prototype.configure_unserialize = function(serializer) { +aseInput.prototype.configure_unserialize = function(serializer) { if (serializer == 'json_logstash') { this.unserialize_data = this.unserialize_data_json; } From a4427c53c97c32021917339f7125813fe68bfe11 Mon Sep 17 00:00:00 2001 From: Nadav Fischer Date: Thu, 14 Nov 2013 10:08:22 +0200 Subject: [PATCH 11/11] Fix merging from bpaquet/node-logstash --- lib/lib/base_input.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/lib/base_input.js b/lib/lib/base_input.js index a5123c2..fa984d9 100644 --- a/lib/lib/base_input.js +++ b/lib/lib/base_input.js @@ -21,7 +21,7 @@ BaseInput.prototype.init = function(url) { }.bind(this)); } -aseInput.prototype.configure_unserialize = function(serializer) { +BaseInput.prototype.configure_unserialize = function(serializer) { if (serializer == 'json_logstash') { this.unserialize_data = this.unserialize_data_json; }