Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding Sumo Logic upload capability #19

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 92 additions & 2 deletions bin/btmon.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/python -u
__version__ = '3.2.0'
__version__ = '3.2.1'
"""Data collector/processor for Brultech monitoring devices.

Collect data from Brultech ECM-1240, ECM-1220, and GEM power monitors. Print
Expand Down Expand Up @@ -706,6 +706,20 @@
mqtt_upload_period=60


Sumo Logic Configuration:

1) log into sumologic.com interface
2) create HTTP hook as you will use it in configuration

By default, all channels on all ECMs will be uploaded.

For example, this configuration will upload all data from all ECMs.

[sumologic]
sumo_out=true
sumo_url=xxx


Upgrading:

Please consider the following when upgrading from ecmread.py:
Expand All @@ -731,6 +745,9 @@

Changelog:

- 3.2.1 06feb19 mike horwath
* added Sumo Logic support

- 3.2.0 07sep16 mwall
* added MQTT support (thanks to mrguessed)

Expand Down Expand Up @@ -1215,6 +1232,7 @@
import traceback
import urllib
import urllib2
import requests

import warnings
warnings.filterwarnings('ignore', category=DeprecationWarning) # MySQLdb in 2.6
Expand Down Expand Up @@ -4149,6 +4167,63 @@ def process_calculated(self, packets):
dbgmsg('MQTT: Nothing to send')


class SumoLogicProcessor(UploadProcessor):

def __init__(self, url, period, timeout):
super(SumoLogicProcessor, self).__init__()
self.url = url
self.process_period = int(period)
self.timeout = int(timeout)

infmsg('Sumo: upload period: %d' % self.process_period)
infmsg('Sumo: timeout: %d' % self.timeout)
infmsg('Sumo: url: %s' % self.url)

def setup(self):
if not (self.url):
print 'Sumo Logic Error: Insufficient parameters'
if not self.url:
print ' A URL is required'
sys.exit(1)

def process_calculated(self, packets):
for p in packets:
osn = obfuscate_serial(p['serial'])
data = []
data.append('"%s":"%.1f"' % (mklabel(osn, 'volts'), p['volts']))
if INCLUDE_CURRENT:
for idx, c, in enumerate(PACKET_FORMAT.channels(FILTER_CURRENT)):
data.append('"%s":"%.2f"' % (mklabel(osn, c), p[c]))
for idx, c in enumerate(PACKET_FORMAT.channels(FILTER_PE_LABELS)):
data.append('"%s_w":"%.2f"' % (mklabel(osn, c), p[c + '_w']))
# do you need Wh?
# for idx, c in enumerate(PACKET_FORMAT.channels(FILTER_PE_LABELS)):
# data.append('"%s_wh":"%.2f"' % (mklabel(osn, c), p[c + '_wh']))
# do you need pulse?
# for idx, c in enumerate(PACKET_FORMAT.channels(FILTER_PULSE)):
# data.append('"%s":"%d"' % (mklabel(osn, c), p[c]))
# do you need sensor?
# for idx, c in enumerate(PACKET_FORMAT.channels(FILTER_SENSOR)):
# data.append('"%s":"%.2f"' % (mklabel(osn, c), p[c]))
if len(data):
url = '%s' % (self.url)
payload = '{"time_created":"%s",%s}' % (
p['time_created'], ','.join(data))
dbgmsg('Sumo: uploading %d bytes' %
sys.getsizeof(json.dumps(payload)))
result = requests.put(url, payload)
result.raise_for_status()

def _create_request(self, url):
req = super(SumoLogicProcessor, self)._create_request(url)
return req

def _handle_urlopen_error(self, e, url, payload):
errmsg(''.join(['%s Error: %s' % (self.__class__.__name__, e),
'\n URL: ' + url,
'\n data: ' + payload]))


if __name__ == '__main__':
parser = optparse.OptionParser(version=__version__)

Expand Down Expand Up @@ -4371,6 +4446,16 @@ def process_calculated(self, packets):
group.add_option('--mqtt-upload-period', type='int', help='upload period in seconds', metavar='PERIOD')
parser.add_option_group(group)

group = optparse.OptionGroup(parser, 'Sumo Logic options')
group.add_option('--sumo', action='store_true', dest='sumo_out',
default=False, help='upload data using Sumo Logic API')
group.add_option('--sumo-url', help='URL', metavar='URL')
group.add_option('--sumo-upload-period',
help='upload period in seconds', metavar='PERIOD')
group.add_option(
'--sumo-timeout', help='timeout period in seconds', metavar='TIMEOUT')
parser.add_option_group(group)

(options, args) = parser.parse_args()

if options.quiet:
Expand Down Expand Up @@ -4562,7 +4647,7 @@ def process_calculated(self, packets):
options.enersave_out or options.bidgely_out or
options.peoplepower_out or options.eragy_out or
options.smartenergygroups_out or options.thingspeak_out or
options.pachube_out or options.oem_out or
options.pachube_out or options.oem_out or options.sumo_out or
options.wattvision_out or options.pvo_out or options.mqtt_out):
print 'Please specify one or more processing options (or \'-h\' for help):'
print ' --print print to screen'
Expand Down Expand Up @@ -4713,6 +4798,11 @@ def process_calculated(self, packets):
options.mqtt_tls,
options.mqtt_map or MQTT_MAP,
options.mqtt_upload_period or MQTT_UPLOAD_PERIOD))
if options.sumo_out:
procs.append(SumoLogicProcessor
(options.sumo_url or OEM_URL,
mtaumike marked this conversation as resolved.
Show resolved Hide resolved
options.sumo_upload_period or OEM_UPLOAD_PERIOD,
options.sumo_timeout or OEM_TIMEOUT))
mtaumike marked this conversation as resolved.
Show resolved Hide resolved

mon = Monitor(col, procs)
mon.run()
Expand Down