forked from ttu/ruuvitag-sensor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathverification.py
154 lines (109 loc) · 2.76 KB
/
verification.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
"""
Verification script for RuuviTags
Run the script with Python 3.x and 2.7. Requires at least one active RuuviTag.
"""
import time
from ruuvitag_sensor.decoder import UrlDecoder, Df3Decoder
from ruuvitag_sensor.ruuvi import RuuviTagSensor, RunFlag
from ruuvitag_sensor.ruuvitag import RuuviTag
from ruuvitag_sensor.ruuvi_rx import RuuviTagReactive
# Uncomment to turn on console print
# import ruuvitag_sensor
# from ruuvitag_sensor.log import log
# ruuvitag_sensor.log.enable_console()
#
# Helper Functions
#
def print_header(name):
print('############################################')
print(name)
print('############################################')
def wait_for_finish(run_flag, name):
max_time = 20
while run_flag.running:
time.sleep(0.1)
max_time -= 0.1
if max_time < 0:
raise Exception('%s not finished' % name)
#
# UrlDecoder.decode_data
#
print_header('UrlDecoder.decode_data')
decoder = UrlDecoder()
data = decoder.decode_data('AjwYAMFc')
print(data)
if not data['temperature']:
raise Exception('FAILED')
else:
print('OK')
#
# UrlDecoder.decode_data
#
print_header('UrlDecoder.decode_data')
decoder = Df3Decoder()
data = decoder.decode_data('03291A1ECE1EFC18F94202CA0B5300000000BB')
print(data)
if not data['temperature']:
raise Exception('FAILED')
else:
print('OK')
#
# RuuviTagSensor.get_data_for_sensors
#
print_header('RuuviTagSensor.get_data_for_sensors')
datas = RuuviTagSensor.get_data_for_sensors(search_duratio_sec=15)
print(datas)
if not datas:
raise Exception('FAILED')
else:
print('OK')
#
# RuuviTagSensor.get_data_for_sensors with macs
#
print_header('RuuviTagSensor.get_data_for_sensors with macs')
datas = RuuviTagSensor.get_data_for_sensors(list(datas.keys())[0], search_duratio_sec=15)
print(datas)
if not datas:
raise Exception('FAILED')
else:
print('OK')
#
# RuuviTag.update
#
print_header('RuuviTag.update')
tag = RuuviTag(list(datas.keys())[0])
tag.update()
print(tag.state)
if not tag.state:
raise Exception('FAILED')
else:
print('OK')
#
# RuuviTagSensor.get_datas
#
print_header('RuuviTagSensor.get_datas')
flag = RunFlag()
def handle_data(found_data):
flag.running = False
if not found_data:
raise Exception('FAILED')
else:
print('OK')
RuuviTagSensor.get_datas(handle_data, run_flag=flag)
wait_for_finish(flag, 'RuuviTagSensor.get_datas')
#
# ruuvi_rx.subscribe
#
print_header('ruuvi_rx.subscribe')
ruuvi_rx = RuuviTagReactive()
def hadle_rx(found_data):
print(found_data)
ruuvi_rx.stop()
if not found_data:
raise Exception('FAILED')
else:
print('OK')
ruuvi_rx.get_subject().\
subscribe(hadle_rx)
wait_for_finish(ruuvi_rx._run_flag, 'ruuvi_rx.subscribe')
print('Verification OK')