forked from dimamedia/RuuviTag-logger
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ruuvi2influx.py
365 lines (282 loc) · 8.23 KB
/
ruuvi2influx.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# J.V.Ojala 7.5.2020
# wcollector
# Copyright for portions of project RuuviTag-logger are held by
# Dima Tsvetkov author, 2017 as part of project RuuviTag-logger and
# are licensed under the MIT license.
# Copyright for other portions of project RuuviTag-logger are held by
# J.V.Ojala, 2020 as part of project wcollector. For license, see
# LICENSE
from ruuvitag_sensor.ruuvi import RuuviTagSensor
from datetime import datetime
import time
import yaml
import queue
from sender import Sender
# from logging import log
from logger import Logger
logger = Logger(__name__)
class Configuration():
    """
    Runtime settings loaded from ``config.yml``.

    Creating an instance reads the configuration file, copies the settings
    onto attributes, registers one ``Tag`` per configured MAC address in
    ``State.tags`` and logs a short startup summary.
    """

    def __init__(self):
        config = self.readConfig()

        self.column_width = config['column_width']  # 14 normal
        self.sample_interval = config['sample_interval']  # seconds

        # All listened tags as a mapping {MAC: TAG_NAME}
        self.tags = config['tags']

        self.queue_depth = config["event_queue"]

        self.db = config['db']  # Enable or disable database saving True/False
        self.db_name = config["db_name"]
        self.db_user = config["db_user"]
        self.db_password = config["db_password"]
        self.host = config["db_host"]
        self.port = config["db_port"]

        # Startup info
        logger.info("")
        logger.info("Listened macs")
        logger.info("")

        # Create a Tag object (mac, name) for every configured tag and
        # store it in the shared State so the data callback can find it.
        for mac, name in self.tags.items():
            State.tags[mac] = Tag(mac=mac, name=name)
            # Log the mac-address
            logger.info(State.tags[mac].mac)

        # NOTE: the database password is deliberately not logged.
        logger.info("db_name\t" + self.db_name)
        logger.info("db_user\t" + self.db_user)
        logger.info("db_host\t" + self.host)
        logger.info("db_port\t" + str(self.port))

    def readConfig(self):
        """
        Reads the config file and returns the dict of
        all settings.

        Raises SystemExit (with an error message, i.e. a non-zero exit
        status) when ``config.yml`` does not exist in the working directory.
        """
        filename = "config.yml"
        try:
            configfile = open(filename, 'r')
        except FileNotFoundError:
            # Raise SystemExit directly instead of calling the site-provided
            # exit() helper: it is always available and signals failure.
            raise SystemExit("Error: Could not find %s" % filename) from None

        # The context manager closes the file even if parsing fails.
        with configfile:
            return yaml.load(configfile, Loader=yaml.SafeLoader)
class Handler():
    """
    Receives measurements from the tag listener, and once per sample
    interval outputs the averaged values to the log and, when enabled,
    to the sender queue for the database.
    """

    def __init__(self, config):
        """
        config: a Configuration-like object providing queue_depth,
        sample_interval, db, db_name, db_user, db_password, host and port.
        """
        self.event_queue = queue.Queue(config.queue_depth)

        # Data point template handed to the Sender
        self.body = [
            {
                "measurement": "ruuvitag",
                "tags": {
                    "name": "Test",
                    'mac': ""
                },
                "time": 0,
                "fields": {
                }
            }
        ]

        # Keep the interval on the instance so the callback does not depend
        # on a module-level ``config`` that only exists when run as a script.
        self.sample_interval = config.sample_interval

        self.db = config.db
        self.db_name = config.db_name
        self.db_user = config.db_user
        self.db_password = config.db_password
        self.host = config.host
        self.port = config.port

        # The sender is created here but only started when the first
        # data point has been queued (see output_to_db).
        self.sender_thread = self._new_sender()

    def _new_sender(self):
        """Returns a new, not yet started, daemonized Sender."""
        sender = Sender(self.event_queue, self.body, self.db_name, self.db_user, self.db_password, self.host, self.port)
        sender.daemon = True
        return sender

    def handle_data(self, found_data):
        """
        Callback that is called every time new data is found.

        found_data[0] is the MAC of the tag that sent the data;
        the whole of found_data is passed on to Tag.add().
        """
        found_mac = found_data[0]

        # If State.tags has an entry for found_mac, append the found
        # values to their respective lists in the Tag object.
        if found_mac in State.tags:
            State.tags[found_mac].add(found_data)

        # Get the time
        now = datetime.timestamp(datetime.now())
        time_passed = now - State.last_update_time

        # If the sample window has closed, output the data
        # stored in the Tag objects.
        if time_passed >= self.sample_interval:
            State.last_update_time = now
            self.outputs()

    def outputs(self):
        '''
        Outputs the data collected
        '''
        # Collect one entry per tag, keyed by MAC
        dbData = {}
        for tag in State.tags.values():
            # Trigger calculation of the output values:
            #  -> Values collected since the last update() are averaged
            #     and stored, buffers are reset.
            tag.update()

            # Prepare DB data
            dbData[tag.mac] = {
                'name': tag.name,
                "temperature": tag.temp,
                "pressure": tag.pres,
                "humidity": tag.humi,
                "voltage": tag.batt
            }

        # Print values to terminal
        self.output_to_screen()

        if self.db:
            self.output_to_db(dbData)

    def output_to_db(self, dbData):
        """
        Re-formats dbData as a flat dictionary
        Puts the result in send_queue

        Each item of dbData gets "mac" and "time" (milliseconds since the
        epoch) keys added in place. Items without values are not queued.
        """
        # This function works, but is ugly.
        # It does redundant fiddling the data
        # to be fiddled with again.

        posix = round(time.time() * 1000)
        for mac, item in dbData.items():
            item["mac"] = mac
            item["time"] = posix

            # If any one datapoint value ("temperature") is None
            # all datapoint values are None.
            # Do not send datapoints with None values:
            # InfluxDB can't handle None values or empty {} values
            if item["temperature"] is None:
                continue

            # Put item to queue without blocking. A blocking put() never
            # raises queue.Full; with a full queue and a dead sender it
            # would hang here and the restart below would never run.
            try:
                self.event_queue.put_nowait(item)
            except queue.Full:
                logger.error("queue.FULL")

            # If no thread is active, restart the thread
            if not self.sender_thread.is_alive():
                self.sender_thread = self._new_sender()
                self.sender_thread.start()

        # Time for sender to finish
        # NOTE(review): placement after the loop is reconstructed from a
        # source without indentation - confirm it is not meant per item.
        time.sleep(1)

    @staticmethod
    def output_to_screen():
        """
        Outputs data to screen
        and/or selected LOG location
        """
        logger.info("")
        logger.info(datetime.now())

        # One header row followed by one row per measured quantity
        logger.info(title())
        logger.info(State.data_line('temp', 'C'))
        logger.info(State.data_line('pres', 'hPa'))
        logger.info(State.data_line('humi', '%'))
        logger.info(State.data_line('batt', 'V'))
class State():
    "Holds the state of things"

    # Time of the last output, seconds since the epoch.
    # Evaluated once, when the class body is executed.
    last_update_time = datetime.timestamp(datetime.now())

    # All tags are collected here
    tags = {}

    # Subjects accepted by data_line(); each one is also the name of the
    # Tag attribute holding the averaged value.
    _SUBJECTS = ('temp', 'pres', 'humi', 'batt')

    @staticmethod
    def data_line(subject, unit=""):
        '''
        Returns data from all sensors of the selected type (subject)
        formatted in a row.

        subject: one of 'temp', 'pres', 'humi' or 'batt'
        unit:    text appended after every value, e.g. 'hPa'

        Raises Exception for any other subject, also when no tags
        are registered.

        Use: print( State.data_line('pres','hPa') )
        '''
        if subject not in State._SUBJECTS:
            logger.critical("Invalid tag subject: '{}', Units:, '{}'".format(subject, unit))
            raise Exception("Invalid tag subject: '{}'".format(subject))

        columns = []
        for tag in State.tags.values():
            # e.g. "1.321" + " " + "V", padded to the configured column width
            value = str(getattr(tag, subject)) + " " + unit
            columns.append(value.ljust(config.column_width))

        return ''.join(columns)
class Tag():
    '''
    Holds all data related to a tag

    Raw values are buffered with add(); update() turns the buffered
    values into averages readable from temp, humi, pres and batt.
    '''

    def __init__(self, mac, name):
        self.mac = mac
        self.id = mac[-6:]  # A short ID: the last six characters of the mac
        self.name = name

        # The raw values received since the last update()
        self._temp = []
        self._humi = []
        self._pres = []
        self._batt = []

        # The probeable values (None until data has been averaged)
        self.temp = None
        self.humi = None
        self.pres = None
        self.batt = None

    def add(self, found_data):
        '''
        Add a new set of values to the Tag object.

        found_data[1] must be a mapping with the keys 'temperature',
        'humidity', 'pressure' and 'battery'.

        All four values are converted before any of them is stored, so
        a malformed data set raises without leaving the buffers with
        different numbers of samples.
        '''
        reading = found_data[1]

        temperature = float(reading['temperature'])
        humidity = float(reading['humidity'])
        pressure = float(reading['pressure'])
        # presumably millivolts -> volts; verify against the sensor library
        battery = float(reading['battery'] / 1000)

        self._temp.append(temperature)
        self._humi.append(humidity)
        self._pres.append(pressure)
        self._batt.append(battery)

    def update(self):
        '''
        Updates the object stored values with the calculated average of
        values received since last update.

        Re-initializes the object to collect a new series of data.

        If no new datapoints were received, the data point is set to None.
        '''
        self.temp = self._mean_or_none(self._temp, 2)
        self.humi = self._mean_or_none(self._humi, 2)
        self.pres = self._mean_or_none(self._pres, 2)
        self.batt = self._mean_or_none(self._batt, 3)

        self._temp = []
        self._humi = []
        self._pres = []
        self._batt = []

    @staticmethod
    def _mean_or_none(values, digits):
        '''
        Returns the average of values rounded to digits decimals,
        or None when values is empty.
        '''
        if not values:
            return None
        return round(sum(values) / len(values), digits)
def title():
    '''
    Builds the header row that goes with State.data_line():
    the short id of every registered tag, each padded to the
    configured column width and joined into one string.

    Use: print( title() )
    '''
    return ''.join(
        tag.id.ljust(config.column_width)
        for tag in State.tags.values()
    )
def avg(lst):
    '''
    Returns the arithmetic mean of the values in lst.

    An empty lst raises ZeroDivisionError.
    '''
    total = sum(lst)
    count = len(lst)
    return total / count
if __name__ == "__main__":
    # ``config`` stays a module-level name: other parts of this file
    # read it as a global.
    config = Configuration()
    handler = Handler(config)

    # Listen to the RuuviTags with the callback interface; every received
    # data set is passed to the handler. Ctrl-C ends the listening.
    try:
        RuuviTagSensor.get_datas(handler.handle_data)
    except KeyboardInterrupt:
        logger.info("KeyboardInterrupt: Exiting")