-
Notifications
You must be signed in to change notification settings - Fork 2
/
datamgr.py
54 lines (43 loc) · 1.36 KB
/
datamgr.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#!/usr/local/bin/python
# coding: utf-8
import os
from util import dir_root
from variable import Variable
from datasource import DataSource
from datastore import DataStore
from kivy.clock import Clock
# Registry of the known variables, keyed by variable name.
variables = {
    'PRESS': Variable('PRESS', 'word', 'Pressure'),
    'TEMP': Variable('TEMP', 'dword', 'Temperature'),
    'WARN': Variable('WARN', 'bool', 'Warning'),
}

# Only the two analog readings carry a unit; WARN has none.
variables['PRESS'].meta['unit'] = 'mbar'
variables['TEMP'].meta['unit'] = '°C'

# Source the values are read from, constructed with the 'offline' argument.
datasrc = DataSource('offline')

# Store the values are written to, connected to the database file under
# <dir_root>/data.
datastr = DataStore()
datastr.connect(os.path.join(dir_root, 'data', 'microscada.s3db'))
def load_vars():
    """Register a 'Gauge' variable for every stored name containing 'GAUGE'.

    The names come from ``datastr.get_var_names()``. Each matching name gets
    a new 'word' variable whose ``meta`` carries ``min`` = 0 and ``max`` = 20,
    and the variable is stored in the module-level ``variables`` registry
    under that name (replacing any existing entry with the same key).
    """
    gauge_names = (
        stored_name
        for stored_name in datastr.get_var_names()
        if 'GAUGE' in stored_name
    )
    for gauge_name in gauge_names:
        gauge = Variable(gauge_name, 'word', 'Gauge')
        gauge.meta['min'] = 0
        gauge.meta['max'] = 20
        variables[gauge_name] = gauge
class Scheduler:
    """Periodic sampler copying values from ``datasrc`` into ``datastr``.

    All members are static; the class is used as a namespace and is never
    instantiated in this module. ``start`` registers ``process`` with the
    Kivy clock at a one second interval and ``stop`` clears the ``running``
    flag, which ``process`` returns to the clock.
    """

    # True between start() and stop(). process() hands this flag back to the
    # Kivy clock, which cancels an interval callback that returns False.
    running = False

    @staticmethod
    def process(dt):
        """Read, update and persist every non-gauge variable once.

        Args:
            dt: Value supplied by the Kivy clock to scheduled callbacks
                (unused here).

        Returns:
            The current ``Scheduler.running`` flag; a false value makes the
            Kivy clock stop calling this callback.
        """
        # values() exists on both Python 2 and Python 3 dicts, whereas
        # itervalues() is Python 2 only.
        for variable in variables.values():
            if 'GAUGE' in variable.name:
                continue  # interaction bug between the variables above and the others
            val = datasrc.read(variable)
            variable.set_value(val)
            datastr.write(variable)
        return Scheduler.running

    @staticmethod
    def start(instance):
        """Begin sampling once per second.

        Args:
            instance: Unused; presumably the widget that triggered the
                call - verify against the caller.
        """
        # Remove any registration that is still pending (start() called
        # twice, or stop() followed by start() before the next tick);
        # otherwise process() would run several times per interval.
        Clock.unschedule(Scheduler.process)
        Clock.schedule_interval(Scheduler.process, 1.0)
        Scheduler.running = True

    @staticmethod
    def stop(instance):
        """Request that sampling stops.

        Only the flag is cleared here; the callback is cancelled by the clock
        when the next ``process`` call returns False.

        Args:
            instance: Unused; presumably the widget that triggered the
                call - verify against the caller.
        """
        Scheduler.running = False