-
Notifications
You must be signed in to change notification settings - Fork 0
/
data_connector.py
64 lines (47 loc) · 1.8 KB
/
data_connector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import obd_connector
import mock_obd_connector
import time
import uuid
import datetime
import csv_writer
class DataConnector:
    """Feed values from an OBD connection into a shared ``live_data`` mapping.

    The connector obtains a connection object from either ``obd_connector``
    or ``mock_obd_connector``, registers a watch for every requested data
    point, and then loops: each cycle it stamps ``live_data["TIMESTAMP"]``
    and, when logging is enabled, hands the current snapshot to
    ``csv_writer``.

    ``start()`` blocks until ``running`` becomes false, so ``stop()`` is
    presumably meant to be called from another thread -- confirm against the
    caller.
    """

    # Class-level defaults. Instances never mutate this dict; __init__ builds
    # a per-instance copy so that one connector's port cannot leak into
    # another connector.
    config = {'connection_attempt_limit': 30}
    use_mock = False
    running = True
    log_data = False
    # No connection exists until start() creates one; the default lets stop()
    # be called safely beforehand.
    connection = None
    # Seconds to wait between two snapshots in start().
    log_interval_seconds = 2

    def __init__(self, live_data, data_points, mock_data, log_data, port):
        """Store the collaborators and build this instance's configuration.

        Args:
            live_data: Mutable mapping that receives the latest value of each
                watched command plus a ``"TIMESTAMP"`` entry.
            data_points: Iterable of commands passed one by one to
                ``connection.watch``.
            mock_data: When truthy, ``mock_obd_connector`` is used instead of
                ``obd_connector``.
            log_data: When truthy, snapshots are written through
                ``csv_writer``.
            port: Stored as ``config['communication_port']`` and handed to
                the connector's ``connect`` callable.
        """
        self.live_data = live_data
        self.data_points = data_points
        self.use_mock = mock_data
        self.log_data = log_data
        # Copy the defaults instead of writing into the shared class dict.
        self.config = {**self.config, 'communication_port': port}
        self.connection = None

    def process_response(self, response):
        """Record a watch callback result in ``live_data``.

        Null responses are ignored. Otherwise the value's ``magnitude`` is
        rounded to two decimals and stored under the command's name.
        """
        if response.is_null():
            return
        self.live_data[response.command.name] = round(response.value.magnitude, 2)

    def configure_watches(self):
        """Register ``process_response`` as callback for every data point."""
        for data_point in self.data_points:
            self.connection.watch(data_point, callback=self.process_response)

    def get_connector(self):
        """Return the ``connect`` callable of the mock or the real connector."""
        if self.use_mock:
            return mock_obd_connector.connect
        return obd_connector.connect

    def start(self):
        """Connect, register watches and run the snapshot loop.

        Returns immediately (after printing a message) when the connector
        yields ``None``. Otherwise blocks until ``running`` is false.
        """
        connect = self.get_connector()
        self.connection = connect(self.config)
        if self.connection is None:
            print("Connection could not be made. Not trying any more!")
            return

        drive_id = uuid.uuid4()
        # Make sure the TIMESTAMP key exists before the keys are handed to
        # the CSV initializer.
        self.live_data['TIMESTAMP'] = None
        if self.log_data:
            csv_writer.initialize_csv(drive_id, self.live_data.keys())

        self.configure_watches()
        self.connection.start()

        while self.running:
            # The snapshot is taken while the connection is paused;
            # presumably this keeps watch callbacks from changing live_data
            # mid-write -- confirm against the connection implementation.
            with self.connection.paused() as was_running:
                self.live_data["TIMESTAMP"] = str(datetime.datetime.now())
                if self.log_data:
                    csv_writer.write_to_csv(drive_id, self.live_data)
            time.sleep(self.log_interval_seconds)
            # Do not bring the connection back up once stop() was requested.
            if was_running and self.running:
                self.connection.start()

    def stop(self):
        """Request the snapshot loop to end and stop the connection, if any."""
        # Clear the flag first so the loop ends even if stopping the
        # connection raises.
        self.running = False
        if self.connection is not None:
            self.connection.stop()