-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathAugury_Lab.py
1862 lines (1608 loc) · 76.1 KB
/
Augury_Lab.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import testble
import cbor2
import array
## Cbor msg to Ep ##
####################
#1
# Canary EP UUT stage:
# The UUT stage is a value set in the internal flash of the EP to indicate which stage the unit is in within the production line testing stages of the EP.
# When the EP image is burned the EP will be in stage 1 and the EP exposes 3 commands regarding the UUT stage:
# Get the current stage value and an indication of whether this is the final stage or not
# Increment the current stage value (valid values 1-30)
# Set current value as final - this means that the unit passed all stages and is ready for installation.
#2
# Canary EP BIST (Built-In-Self-Test)
# The Canary EP can run tests to validate that all of its HW components are working correctly. The EP exposes commands over BLE that allow running these tests and getting a report about it.
#3
#Canary EP Current Time:
#The Canary EP allows the user to set/get the current time (date/time) in the EP.
#Setting the current time is a prerequisite for any other testing service it provides
#Canary EP commands:
# Forcing the EP to go to sleep for a given amount of time
# Sampling all/part of the sensors
# Download the last sampled data
######### Device info:
# GATT service: Device Information Service (0x180A)
# The standard Device information service is exposed by the Canary EP and exposes the following information over the standard characteristics:
#
# Char: Model Number string (0x2A24) (standard characteristic)
# Access: Read
# Exposes the Module type as string (in Canary EP the string is “Canary”)
#
# Char: Manufacturer Name string (0x2A29) (standard characteristic)
# Access: Read
# Exposes the Manufacturer Name as string (in Canary EP the string is “Augury Inc.”)
#
# Char: Serial number string (0x2A25) (standard characteristic)
# Access: Read
# Exposes the serial number of the EP (string)
#
# Char: Firmware revision string (0x2A26) (standard characteristic)
# Access: Read
# Exposes the Firmware revision of the EP (string)
#
# Char: Hardware revision string (0x2A27) (standard characteristic)
# Access: Read
# Exposes the HW revision of the EP (string)
######### Current Time:
# GATT service: commands (UUID 0bb52dda-d431-40cc-8a41-a666a25254b6)
# Char: Augu Set Time (UUID d82c52f5-51ee-490f-bf1e-1d841fb9c66f)
# Access: write, read
# This allows the client to set and get the current time in the EP.
# The value (both for read and write) is a cbor encoded map that includes one field that is linux epoch time in milliseconds (UTC time).
# e.g.
# {
# “time”:1651363200000
# }
######### Built In Self Test (BIST):
# GATT service: testing (UUID 53cdde50-a736-4793-a0dd-a818fbbd3fd0)
# Char: BIST (UUID 54f5aab5-5ebf-40b1-a819-e49bf3024048)
# Access: write, notify
#
# Run BIST:
# Write to the BIST Char the value 0x01
#
# Note: This will result in creating a new report file and will overwrite the previous report file
#
# Get last BIST report:
# Enable Notify on the testing Characteristic
# Write to the BIST Char the value 0x02
# The EP will send 2 CBOR files:
# Info CBOR file
# BIST results CBOR file
#
# Info CBOR file
# Sent in 1st notification
# This file includes the version and size of the BIST results CBOR file which is sent next
# Structure:
# {
# "len": <bist_results_size>,
# "ver": <bist_results_version>
# }
#
#
#
# BIST Results CBOR file
# Sent in the 2nd notification and on. These notifications (2nd notification and on) are aggregated to the full BIST results report CBOR file
#
# The structure of the report CBOR file is as follows:
# “bist_result” - (string - pass/fail/not executed)
# “timestamp” - (int - timestamp in posix epoch format)
# “tests” - (array) of items that are of type map that have the following structure:
# “name”: (string)
# “result”: (string - pass/fail reasons separated by “,”)
# “raw” : (byte string)
# example:
# {
# "bist_result": "pass",
# "timestamp": 1651065325557,
# "tests": [
# {
# "name": "battery level (adc)",
# "raw": "-",
# "result": "PASS"
# },
# {
# "name": "hw id check",
# "raw": "-",
# "result": "PASS"
# },
# …
# ]
# }
try:
from bleak.backends.winrt.util import allow_sta
# tell Bleak we are using a graphical user interface that has been properly
# configured to work with asyncio
allow_sta()
except ImportError:
# other OSes and older versions of Bleak will raise ImportError which we
# can safely ignore
pass
from testble import *
import asyncio
from dataclasses import dataclass
from functools import cached_property
from bleak.backends.characteristic import BleakGATTCharacteristic
import os
import time
import struct
import json
#from AuguryUiA2 import *
from AuguryUi3_7 import *
from rich.console import Console
from PyQt5.QtCore import QObject, pyqtSignal
from PyQt5.QtWidgets import (
QApplication,
QComboBox,
QLineEdit,
QMainWindow,
QPlainTextEdit,
QPushButton,
QVBoxLayout,
QWidget,
)
from PyQt5 import QtWebEngineWidgets
#import AmitFunction
#elf.Webwidget = QtWebEngineWidgets.QWebEngineView(parent=self.centralwidget)
import qasync
from bleak import BleakScanner, BleakClient
from bleak.backends.device import BLEDevice
import sys
from PyQt5.QtWidgets import QApplication
from PyQt5 import QtCore, QtWidgets, QtWebEngineWidgets
import plotly.express as px
import plotly.graph_objects as go
from threading import Event
from AuguryTestSample import *
# ---------------------------------------------------------------------------
# BLE UUIDs
# ---------------------------------------------------------------------------
# UART-style service and characteristics.
# NOTE(review): the 6E4000xx values look like the Nordic UART Service UUIDs — confirm.
UART_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E"
#UART_RX_CHAR_UUID = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E"
UART_TX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E"
# Replaces the commented-out value above; this is the same UUID as
# commandCharacteristicUUIDString below. QBleakClient.write() sends to it.
UART_RX_CHAR_UUID = "ac7bf687-7a30-4336-a6f6-b8030930854d"
# UUIDs of the remaining EP services / characteristics.
CurrentTimeServiceUuidString = "00001805-0000-1000-8000-00805f9b34fb"
CurrentTimeCharacteristicUuidString = "00002a2b-0000-1000-8000-00805f9b34fb"
# "commands" GATT service (see the "Current Time" notes in the header comment).
CommandServiceUUIDString = "0bb52dda-d431-40cc-8a41-a666a25254b6"
commandCharacteristicUUIDString = "ac7bf687-7a30-4336-a6f6-b8030930854d"
BT_UUID_COMMANDS = "38efff62-58d9-475a-b311-e16494a89cf5"
DataServiceUUIDString = "bf289137-78a3-4c60-b381-25558e78e252"
# QBleakClient.start() subscribes to notifications on this characteristic.
DataCharacteristicUUIDString = "b5d1f9be-b0bb-4059-9cb7-c2fffd184770"
# Standard Device Information Service (0x180A) and its characteristics,
# as listed in the header comment (0x2A24 .. 0x2A29).
DeviceInformationServiceUuidString = "0000180A-0000-1000-8000-00805f9b34fb"
ModelNumberNameCharacteristicUuidString = "00002A24-0000-1000-8000-00805f9b34fb"
SerialNumberCharacteristicUuidString = "00002A25-0000-1000-8000-00805f9b34fb"
FirmwareRevisionCharacteristicUuidString = "00002A26-0000-1000-8000-00805f9b34fb"
HardwareRevisionCharacteristicUuidString = "00002A27-0000-1000-8000-00805f9b34fb"
ManufacturerNameCharacteristicUuidString = "00002A29-0000-1000-8000-00805f9b34fb"
# "testing" GATT service and its BIST characteristic (write 0x01 = run BIST,
# write 0x02 = send the last BIST report; see the header comment).
TestingServiceUUIDString = "53cdde50-a736-4793-a0dd-a818fbbd3fd0"
TestingCharacteristicUUIDString = "54f5aab5-5ebf-40b1-a819-e49bf3024048"
#LEDs_control_UUID= "37028891-93b2-4b81-86c6-f5e49c0616cf
#UUT_stage_(UUID dac93259-da1d-4388-904a-7bc3b6d016f0)
# ---------------------------------------------------------------------------
# Module-level state shared between the BLE handlers and the UI slots
# ---------------------------------------------------------------------------
UART_SAFE_SIZE = 20
samplecounter = 0
sensor = 0
deviceBLE = "None"
Bledevicelist = []
listofDeviceComboBox = []
device = None
# Capture file that QBleakClient.handle_rx() writes every received chunk to;
# it stays None until some other code opens it.
file = None
fileJson = None
fileName = ""
countersample = -6
# Placeholder until the real hardware revision is stored here. parse_data()
# runs substring tests (`"..." in hw_revision`) against this value.
hw_revision = 0
# Set by parse_data() and command_notification_handler(); handle_pb2 rebinds
# this name to a fresh Event before it waits on it.
data_received_event = asyncio.Event()
# NOTE: a `global` statement at module scope does not create or bind `loop`.
global loop
# Set to True by parse_data() and by the notification handlers once a
# transfer is complete.
all_data_received = False
verbose = True
# parse_data() embeds this in the output file name.
mac_addr = ""
# Directory parse_data() writes its JSON file into (None until configured).
output_path = None
address_device = ""
list_Device = None
# Scan results that MainWindows.filter_cmd() iterates over.
devices = []
def _sensor_for_channel(channel_name, hw, sensitivity):
    """Return ``(sensor_id, scaling_factor)`` for a channel on the given hardware.

    :param channel_name: channel label, e.g. ``"Acceleration_X"``.
    :param hw: hardware revision as text; matched by substring.
    :param sensitivity: sensor scale, used for the iis3dwb accelerometer.
    :return: a 2-tuple, or ``None`` when ``channel_name`` is not recognised.
    """
    if channel_name in ("Acceleration_X", "Acceleration_Y", "Acceleration_Z"):
        if "apus_alpha" in hw:
            return "ads8866-ADXL1002-50_1", 0.0019073486328125
        return "iis3dwb", float(sensitivity) / 2.0 ** 15 / np.sqrt(2)

    if channel_name in ("Magnetic_X", "Magnetic_Y", "Magnetic_Z"):
        # One single chain: previously the first match was a separate `if`
        # and was always overwritten by the trailing `else`.
        if ("apus_alpha" in hw) or ("halo_ep2" in hw):
            return "ads8866-HONEYWELL_SS495A", 0.0244140625
        if ("canary_proto_rev_3" in hw) or (("canary_proto_rev_4" in hw) and ("fxos" in hw)):
            return "fxos8700", 0.001
        if (("canary_proto_rev_4" in hw) or ("canary_ea00019.rev_a" in hw)) and ("lis3" in hw):
            return "lis3mdl", 1.0 / 2281
        return "unknown", 1

    if channel_name == "Temperature":
        if ("apus_alpha" in hw) or ("halo_ep2" in hw):
            return "NCP15WB473F03RC", 1
        if "canary_proto_rev_3" in hw:
            return "tmp1075", 0.0625
        if ("canary_proto_rev_4" in hw) or ("canary_ea00019.rev_a" in hw):
            return "iis3dwb", 1
        # float("nan") is the same value np.NaN used to be; the np.NaN alias
        # no longer exists in NumPy 2.x.
        return "unknown", float("nan")

    if channel_name == "Ambient_Temperature":
        return "nrf52840", 1

    return None


def parse_data(data):
    """Decode one sampled-data transfer and store it as a JSON file.

    Reads the ``meta_*`` globals filled in by the data notification handler
    together with ``hw_revision``, ``fw_revision``, ``serial_number``,
    ``mac_addr`` and ``output_path``. On success it sets
    ``data_received_event`` and ``all_data_received``.

    :param data: buffer holding ``meta_num_of_samples`` signed 16-bit samples
        in native byte order.
    :return: ``None``. If the output file cannot be created (for example
        because it already exists) the error is logged and the function
        returns without signalling completion.
    """
    global all_data_received

    print("Parsing data from augulab ...")
    logger.info("num_of_samples: {0}".format(meta_num_of_samples))
    # A repeat count unpacks every sample in a single call.
    data_vector = list(struct.unpack_from("{0}h".format(meta_num_of_samples), data))

    channel_name = data_channel[meta_channel]
    # hw_revision starts life as the int 0; str() keeps the substring tests
    # from raising TypeError before a real revision string is known.
    selection = _sensor_for_channel(channel_name, str(hw_revision), meta_scale)
    if selection is None:
        logger.error("unknown data_channel")
        # Still save the raw samples instead of failing on unbound names.
        selection = ("unknown", 1)
    sensor_id, scaling_factor = selection

    # JSON
    json_data = {
        'Serial_Number': serial_number,
        'HW_Version': hw_revision,
        'FW_Version': fw_revision,
        'Sensor_ID': sensor_id,
        'Data_Channel': channel_name,
        'Data_Unit': data_units[meta_data_unit],
        'Sampling_Frequency': meta_sampling_freq,
        'Sensitivity': meta_scale,
        'Scaling_Factor': scaling_factor,
        'Timestamp_utc': meta_timestamp,
        'Sensor_Temperature': meta_temp_avg,
        'Overflow': meta_overrun,
        'Calibration_Vector': 'TBD',
        'Num_Of_Samples': meta_num_of_samples,
        'Data': data_vector,
    }

    # in windows, ":" is not allowed
    mac_str = mac_addr.replace(':', '.') if os.name == "nt" else mac_addr

    filename = '{}_{}_{}_{}_{}.json'.format(str(hw_revision),
                                            mac_str,
                                            str(meta_timestamp),
                                            str(meta_sensor_id),
                                            str(meta_channel))
    path = output_path
    print(hw_revision)
    print(mac_str)
    print(meta_timestamp)
    print(meta_sensor_id)
    print(meta_channel)
    print(path)
    print(filename)
    print("try to write to file")

    target = "{path}/{filename}".format(path=path, filename=filename)
    try:
        # "x" refuses to overwrite an existing capture; the file is opened
        # once and closed by the `with` below.
        outfile = open(target, "x")
    except (OSError, ValueError) as e:
        logger.error(str(e))
        return
    with outfile:
        json.dump(json_data, outfile)

    logger.info("Output file created successfully: " + filename)
    logger.info("Put the last sample in the file !!!!!!!!!!!!!!!! ")
    print(data_received_event.is_set())
    print(data_received_event)
    data_received_event.set()
    all_data_received = True
@dataclass
class QBleakClient(QObject):
    """Qt object wrapping one ``BleakClient`` connection to a device.

    The instance is built from a ``BLEDevice``. The underlying ``BleakClient``
    is created lazily by the :attr:`client` property. Notification handlers
    exchange their results with the rest of the program through module-level
    globals (``DataBuffer``, ``MetaDataBuffer``, ``meta_*``,
    ``all_data_received`` ...).
    """

    device: BLEDevice

    messageChanged = pyqtSignal(bytes)

    # Defaults for the handle_rx() capture bookkeeping. They are plain class
    # attributes (not dataclass fields) and are shadowed per instance on
    # first assignment.
    count = 0
    Elem = 64000  # number of samples to capture; replaced by the value announced in the stream
    maxCount = 0
    countersample = -1
    step = 0
    state = 0
    calc = 0
    startMeasure = 0
    endMeasure = 0
    rec = 0
    corrent_count = 0
    databuffer = []
    TendMeasure = 0
    TstartMeasure = 0
    sensor_output = {
        "name": "#from Connect Device",  # from Connect Device
        "output_data": "#from Rx channel",  # from Rx channel
        "fs_rec": "#By user / code",  # By user / code
        "Sensor_ID": "#from Connect Device",  # from Connect Device
        "Serial_Number": "",  # from Connect Device
        "Data_Channel": "Acceleration_Z",  # config later more
        "Sensitivity": [15, 30, 60],  # Sensitivity
        "Scaling_Factor": [1, 2, 3],  # intern Scaling_Factor
        "Sensor_Temperature": "TBD",  # Sensor_Temperature
        "Num_Of_Samples": "",  # from Connect Device
        "Data": []  # ActualData
    }

    def __post_init__(self):
        super().__init__()
        # Give every client its own mutable containers; the class-level list
        # and dict above would otherwise be shared by all instances.
        self.databuffer = []
        self.sensor_output = {
            key: list(value) if isinstance(value, list) else value
            for key, value in type(self).sensor_output.items()
        }

    @staticmethod
    def _decode_cbor(payload):
        """Decode a CBOR document held in a bytes-like object or list of ints."""
        return cbor2.loads(bytes(payload))

    @cached_property
    def client(self) -> BleakClient:
        """The ``BleakClient`` for :attr:`device`, created on first access."""
        print(f' client -> self.client : {self.device}')
        return BleakClient(self.device, disconnected_callback=self.handle_disconnect2)

    async def start(self):
        """Connect, print the HW/FW revision strings and subscribe to data notifications."""
        print(" FC -> start in client -> connect & handler ")
        MainWindows.UiPrint(ui, " Function call start in client.")
        try:
            await self.client.connect()
        except asyncio.CancelledError:
            logger.error("CancelledError - sample")
            await self.client.disconnect()
        except Exception as e:
            print(str(e))
            await self.client.disconnect()
        print(" Qbleak ->end connect ")
        print(f' Qbleak -> self.client : {self.client}')

        if not self.client.is_connected:
            # Nothing can be read or subscribed to without a connection.
            print(" CAN Not Connect ")
            return

        ## firmware ##
        ##############
        hw_value = await self.read_gatt(HardwareRevisionCharacteristicUuidString)
        print(f' 3 value : {hw_value}')
        fw_value = await self.read_gatt(FirmwareRevisionCharacteristicUuidString)
        print(f' 4 firmware value : {fw_value}')
        fw_revision = fw_value.decode("utf-8", errors="replace")
        print(" 5 [FirmwareRevision str: {0} ".format(fw_revision))
        print(self.client)

        await self.client.start_notify(DataCharacteristicUUIDString, self.data_notification_handler)
        print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! ")
        print("Connect -> !!!!!!!!!!!!!!!!!!!!!!!!!!!!! ")
        print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! ")
        MainWindows.UiPrint(ui, " Device is Connect")

    async def stop(self):
        """Disconnect from the device and report it in the UI log."""
        print("FC -> QBleakClient -> Stop Func")
        await self.client.disconnect()  # one call is enough; it used to be issued twice
        MainWindows.UiPrint(ui, "QBleakClient -> Disconnect From Device")
        print(self.client)
        print(self.client.is_connected)

    async def write(self, data):
        """Write ``data`` to the characteristic ``UART_RX_CHAR_UUID``."""
        print("FC -> write ")
        await self.client.write_gatt_char(UART_RX_CHAR_UUID, data)
        print("send:", data)

    async def writedata(self, UUID, data):
        """Write ``data`` to the characteristic identified by ``UUID``."""
        print("FC -> write ")
        await self.client.write_gatt_char(UUID, data)
        print("send:", data)

    async def read_gatt(self, char_specifier=HardwareRevisionCharacteristicUuidString):
        """Read a characteristic and return its value as ``bytes``.

        Never raises: if the read fails, the exception text is returned
        encoded as bytes instead.
        """
        try:
            _value = bytes(await self.client.read_gatt_char(char_specifier))
        except Exception as e:
            _value = str(e).encode()
        return _value

    def handle_disconnect2(self, _: BleakClient):
        """``disconnected_callback`` of the BleakClient: log it and reset the progress bar."""
        MainWindows.UiPrint(ui, "QBleakClient -> handle_disconnect2 call Back from clien")
        MainWindows.UiPrint(ui, "Close The File")
        MainWindows.UiPrint(ui, "+++++ Device is Disconnect +++++")
        MainWindows.progBarUpdate(ui, 0, 0)

    @qasync.asyncSlot()
    async def handle_disconnect(self) -> None:
        """Cancel every other asyncio task (this effectively ends the program).

        Declared ``async`` because ``qasync.asyncSlot`` wraps the call in
        ``asyncio.ensure_future`` and therefore needs a coroutine.
        """
        print("Qbleak - handle_disconnect function.")
        current = asyncio.current_task()
        for task in asyncio.all_tasks():
            # Skip the task running this slot so it can finish normally.
            if task is not current:
                task.cancel()

    def _handle_read(self, _: int, data: bytearray) -> None:
        """Forward received bytes through the ``messageChanged`` signal."""
        self.messageChanged.emit(data)

    def testing_notification_handler(self, _: BleakGATTCharacteristic, data: bytearray):
        """Collect a BIST report sent as notifications.

        The first notification is a CBOR map whose ``"len"`` entry is the size
        of the report; every following notification is appended to the global
        ``DataBuffer``. ``all_data_received`` is set once the announced number
        of bytes has arrived.
        """
        global all_data_received
        global PacketsArrived
        global DataBytesExpected
        global DataBytesArrived

        print("i get somthing ")
        PacketsArrived += 1
        logger.info("testing_notification_handler")
        if PacketsArrived == 1:
            data_as_json = self._decode_cbor(data)
            logger.info(json.dumps(data_as_json, indent=4, sort_keys=True))
            DataBytesExpected = data_as_json["len"]
            logger.info("DataBytesExpected={}".format(DataBytesExpected))
            DataBuffer.clear()
            DataBytesArrived = 0
            logger.info(data_as_json)
        else:
            DataBytesArrived += len(data)
            DataBuffer.extend(data)
        # NOTE(review): the original nesting of this check was lost with the
        # file's indentation. Running it after both branches also completes a
        # zero-length report, and is otherwise identical.
        if DataBytesExpected == DataBytesArrived:
            all_data_received = True

    def data_notification_handler(self, _: BleakGATTCharacteristic, data: bytearray):
        """State machine for sampled-data transfers on the data characteristic.

        ``DataNotificationState`` selects how a notification is interpreted:
        0 = header announcing the meta-data size, 1 = meta-data bytes,
        2 = header announcing the sample-data size, 3 = sample bytes. When all
        sample bytes have arrived the state becomes 4 and ``parse_data`` is
        called with the collected buffer.
        """
        global DataBytesExpected
        global all_data_received
        global DataNotificationState
        global meta_sensor_id, meta_temp_avg, meta_sampling_freq, meta_timestamp
        global meta_num_of_samples, meta_overrun, meta_channel, meta_data_format
        global meta_data_unit, meta_scale

        print("iget somthing !!!! ")
        if DataNotificationState == 0:  # meta header
            DataNotificationState = 1
            header = self._decode_cbor(data)
            DataBytesExpected = header["len"]
            logger.info("DataBytesExpected: {0}".format(DataBytesExpected))
            logger.info("DataType: {0}".format(header["typ"]))
            MetaDataBuffer.clear()
            if DataBytesExpected == 0:
                all_data_received = True
                logger.error("no data available")
        elif DataNotificationState == 1:  # meta-data
            MetaDataBuffer.extend(data)
            if len(MetaDataBuffer) == DataBytesExpected:
                DataNotificationState = 2
                # Decode the accumulated buffer, not just the last fragment,
                # so meta-data split over several notifications still parses.
                meta = self._decode_cbor(MetaDataBuffer)
                meta_sensor_id = meta["sid"]
                meta_temp_avg = meta["tmp"]
                meta_sampling_freq = meta["frq"]
                meta_timestamp = meta["tsp"]
                meta_num_of_samples = meta["num"]
                meta_overrun = meta["ovr"]
                meta_channel = meta["ch"]
                meta_data_format = meta["fmt"]
                meta_data_unit = meta["unt"]
                meta_scale = meta["scl"]
        elif DataNotificationState == 2:  # data-header
            DataNotificationState = 3
            header = self._decode_cbor(data)
            DataBytesExpected = header["len"]
            logger.info("DataBytesExpected: {0}".format(DataBytesExpected))
            logger.info("DataType: {0}".format(header["typ"]))
            DataBuffer.clear()
        elif DataNotificationState == 3:  # data
            DataBuffer.extend(data)
            if DataBytesExpected:
                drawProgressBar(len(DataBuffer) / DataBytesExpected, 100)
            if len(DataBuffer) == DataBytesExpected:
                DataNotificationState = 4
                print("All data was received successfully!!! ({0} Bytes)".format(len(DataBuffer)))
                parse_data(DataBuffer)

    def handle_rx(self, _: BleakGATTCharacteristic, data: bytearray):
        """Handle one chunk of the text stream received over the UART characteristic.

        Every chunk is stored in ``self.databuffer`` and, when a capture file
        is open, appended to it. If the third of the first seven chunks is
        ``'ELem='``, the fourth gives the number of samples and recording
        starts. When that many chunks have been counted the collected data is
        handed to ``MainWindows.end_sample``.
        """
        print("get some data here :")
        text = data.decode()
        # save the incoming data in list - All the data
        self.databuffer.append(text)
        if file is not None:
            # The capture file is a module global that may not be open yet.
            file.write('{0},'.format(text))  # to change one line vs multi

        # ACK, ACK, ELem =, 64000, Sample Time =, 3921990
        if len(self.databuffer) == 7:
            if self.databuffer[2] == 'ELem=':
                self.Elem = int(self.databuffer[3])
                print("===========")
                print(self.databuffer)
                print(self.Elem)
                print(self.databuffer[5])
                self.countersample = 0
                self.rec = True
                print("===========")

        if self.countersample % 100 == 0 and self.rec:
            # Alternate between start and stop marks every 100 received chunks.
            if self.state == 0:
                print(f"start ----- {self.countersample}")
                self.startMeasure = time.time()
                self.state = 1
            else:
                print(f"stop ----- {self.countersample}")
                self.endMeasure = time.time()
                self.state = 0
                self.calc = 1
                self.corrent_count += 1
            if self.calc == 1:
                # Placeholder for the duration estimate / progress update.
                self.calc = 0

        self.countersample += 1
        # Use the announced element count (default 64000) rather than a
        # hard-coded literal.
        if self.rec and self.countersample == self.Elem:
            self.sensor_output['Data'] = self.databuffer
            print("DDDDDDDoooooooooooooneeeeeeee")
            print("save the data to file")
            print(self.databuffer)
            print(len(self.databuffer))
            print(self.sensor_output)
            MainWindows.end_sample(ui, self.sensor_output)
            self.rec = 0
async def waiter(event):
    """Suspend the calling coroutine until ``event`` has been set."""
    await event.wait()
def command_notification_handler(sender, data):
    """Simple notification handler which prints the data received.

    After printing, it sets the module-level ``data_received_event`` so that
    a coroutine waiting on it can continue.

    :param sender: whatever the BLE stack passes as the notification source.
    :param data: the notified payload.
    """
    # No `global` statement is needed: the event is only read here, and the
    # lookup at call time picks up the current module-level object.
    print("Notify {0}: {1}".format(sender, data))
    data_received_event.set()
class MainWindows(QMainWindow, Ui_MainWindow):
def __init__(self, window):
    """Build the UI on ``window`` and wire every button to its slot.

    :param window: the widget passed to ``setupUi``.
    """
    global loop
    global file, fileJson
    super().__init__()
    print("init main ui")
    self.setupUi(window)
    self.resize(750, 650)
    self._client = None

    # Widgets stacked in a vertical layout on this window's central widget.
    scan_button = QPushButton("Scan Devices")
    self.devices_combobox = QComboBox()
    connect_button = QPushButton("Connect")
    self.message_lineedit = QLineEdit()
    send_button = QPushButton("Send Message")
    self.log_edit = QPlainTextEdit()
    central_widget = QWidget()
    self.setCentralWidget(central_widget)
    lay = QVBoxLayout(central_widget)
    for widget in (
        scan_button,
        self.devices_combobox,
        connect_button,
        self.message_lineedit,
        send_button,
        self.log_edit,
    ):
        lay.addWidget(widget)
    self.button = QtWidgets.QPushButton('Plot', self)

    self.pb_plot.clicked.connect(self.show_graph)
    self.pb_stop_sample.clicked.connect(self.read_temp)

    # Initial enabled/disabled state of the controls created by setupUi.
    self.pb_stop_sample.setEnabled(True)
    for control in (self.pb_plot, self.cb60, self.cb30, self.cb15):
        control.setEnabled(False)
    self.cb15.setChecked(True)
    for control in (
        self.lable_config,
        self.progressBar,
        self.lable_progress,
        self.pb_connect,
        self.pb_start_sample,
        self.pb_disconnect,
    ):
        control.setEnabled(False)
    self.pb_scan.setEnabled(True)
    self.comboBox.setEnabled(False)

    # Model backing the list view.
    self.model = QtGui.QStandardItemModel()
    self.listView.setModel(self.model)
    self.listView.setAutoScroll(True)
    self.listView.scrollToBottom()

    for button, slot in (
        (self.pb_start_sample, self.handle_start_sample),
        (self.pb_connect, self.handle_connect),
        (self.pb_scan, self.handle_scan),
        (self.pb_disconnect, self.handle_disc),
    ):
        button.clicked.connect(slot)

    # Each check box reports its own fixed value.
    self.cb15.clicked.connect(lambda: self.handle_cb(value=15))
    self.cb30.clicked.connect(lambda: self.handle_cb(value=30))
    self.cb60.clicked.connect(lambda: self.handle_cb(value=60))

    self.FileName = ""
    self.Config = ""

    self.pushButton_2.clicked.connect(self.handle_pb2)
    self.pushButton_3.clicked.connect(self.handle_pb3)
    self.pushButton_3.setText("Ep get Data + delete")
    self.pushButton_4.clicked.connect(self.handle_pb4)
    self.pushButton_5.clicked.connect(self.handle_pb5)
    self.pushButton_6.clicked.connect(self.handle_pb66)
    self.pushButton_4.setText("Sample & save ")
    self.pushButton_6.setText("start sample")

    self.cmdline.textChanged.connect(self.filter_cmd)

    # Test buttons; only the first one gets a caption here.
    self.pushButton_1_test.clicked.connect(self.fcbor1)
    self.pushButton_1_test.setText("Run Bist")
    for button, slot in (
        (self.pushButton_2_test, self.fcbor2),
        (self.pushButton_3_test, self.fcbor3),
        (self.pushButton_4_test, self.fcbor4),
    ):
        button.clicked.connect(slot)
@qasync.asyncSlot()
async def fcbor1(self):
    """Slot of the "Run Bist" button: ask the EP to run its built-in self test.

    Writes the single byte 0x01 to the BIST characteristic
    (``TestingCharacteristicUUIDString``). A failed write is reported on
    stdout instead of being dropped silently.
    """
    write_data = bytearray(b'\x01')
    print("Sending 'run built in self test' command to EP ")
    try:
        # The other slots use ``current_client`` through its write/writedata/
        # read_gatt wrapper methods; it has no ``write_gatt_char`` of its own,
        # so the previous call could only raise AttributeError.
        await self.current_client.writedata(TestingCharacteristicUUIDString, write_data)
    except Exception as e:
        print("fcbor1 - failed to send BIST command: {0}".format(e))
    print("fcbor1")
def fcbor2(self):
    """Placeholder slot for the second test button; only prints its own name."""
    print("fcbor2")
def fcbor3(self):
    """Placeholder slot for the third test button; only prints its own name."""
    print("fcbor3")
def fcbor4(self):
    """Placeholder slot for the fourth test button; only prints its own name."""
    print("fcbor4")
def filter_cmd(self):
    """Refill the device combo box with the devices whose name contains the filter text.

    The filter text is read from ``self.cmdline``; candidates come from the
    module-level ``devices`` list. Each entry is accessed as
    ``entry[1][0].name`` and ``entry[1][1].rssi``.
    Any failure while filling the box is reported on stdout and otherwise
    ignored, so typing in the filter field never crashes the UI.
    """
    print("call filter")
    self.comboBox.clear()
    needle = self.cmdline.toPlainText()
    try:
        for i, entry in enumerate(devices):
            name = str(entry[1][0].name)
            if needle in name:
                self.comboBox.insertItem(i, name + " " + str(entry[1][1].rssi), entry)
                print("add item {}".format(i))
    except Exception as e:
        # `except Exception` (not a bare except) lets KeyboardInterrupt and
        # SystemExit through, and the cause is no longer hidden.
        print("error - filter cmd")
        print(repr(e))
def end_sample(self, buffer):
    """Print ``buffer`` and save it as JSON in the file named by ``self.FileNameJson``.

    :param buffer: JSON-serialisable object to store; an existing file is
        overwritten.
    """
    print(buffer)
    with open(self.FileNameJson, mode='w') as stream:
        json.dump(buffer, stream)
    print("END HERE")
@qasync.asyncSlot()
async def handle_pb66(self):
    """Slot of the "start sample" button: send a sample command to the EP.

    Packs six fields with the ``'=HHHHHI'`` layout (command id, sensor id,
    ODR, scale, duration, timestamp) and sends them through
    ``self.current_client.write``. A failing write is not raised; its message
    is printed instead.
    """
    print(" self.handle_pb66 --- > sample now ")  # send somthing here

    command_id = 0  # SampleNow Command
    sensor_id = 99
    odr = 0
    scale = 0
    duration = 4000
    sample_at_ts = 0  # timestamp field of the command is always sent as 0
    fields = (command_id, sensor_id, odr, scale, duration, sample_at_ts)

    clock_text = time.strftime('%H-%M-%S', time.localtime(sample_at_ts))
    logger.info("======== Sample at: {} [{}] ========".format(sample_at_ts, clock_text))

    write_data = bytearray(struct.pack('=HHHHHI', *fields))
    print(" Sending Sample-All command to EP (command_id={0} ;"
          "sensor_id={1} ; odr={2} ; scale={3} ; duration={4} ; sample_at_ts={5})".
          format(*fields))

    try:
        _value = await self.current_client.write(write_data)
    except Exception as e:
        # Keep the error text so it shows up in the print below.
        _value = str(e).encode()
    print(" _Value: {0} ".format(_value))
    print(" END PB 66 ")
@qasync.asyncSlot()
async def handle_pb2(self):
global address_device
global data_received_event
global all_data_received
global DataNotificationState
global verbose
global mac_addr
global hw_revision
global fw_revision
global output_path
global serial_number
global data_received_event
print("self.handle_pb2") # send somthing here
#await self._client.start_notify(DataCharacteristicUUIDString, data_notification_handler)
# write_data = bytearray(struct.pack('BBH', 0, 0, 0))
# print(write_data)
# try:
# _value = await self._client.write_gatt_char(DataCharacteristicUUIDString, write_data)
# except Exception as e:
# _value = str(e).encode()
# result = self.cmdline.toPlainText() # string here
try:
_value = bytes(await self.current_client.read_gatt())
print(_value)
except Exception as e:
_value = str(e).encode()
# try:
# _value = await self.current_client.write(write_data)
# except Exception as e:
# _value = str(e).encode()
#
# write_data = bytearray(struct.pack('=HHHHHI', command_id, sensor_id, odr, scale, duration, sample_at_ts))
# if (run_bist):
# write_data = bytearray(b'\x01')
# logger.info("Sending 'run built in self test' command to EP ")
#
# try:
# _value = await client.write_gatt_char(TestingCharacteristicUUIDString, write_data)
# except Exception as e:
# _value = str(e).encode()
#
# logger.info("[Characteristic Command ] Write Value: {0} ".format(to_hex(write_data)))
# else:
write_data = bytearray(b'\x02')
logger.info("Sending 'get built in self test results' command to EP ")
try:
_value = await self.current_client.writedata(TestingCharacteristicUUIDString,write_data)
except Exception as e:
_value = str(e).encode()
logger.info("[Characteristic Command ] Write Value: {0} ".format(to_hex(write_data)))
while all_data_received is False: # TODO: AMIT
# data_received_event.clear()
# data_received_waiter_task = asyncio.create_task(waiter(data_received_event))
# try:
# await asyncio.wait_for(data_received_waiter_task, timeout=5)
# except asyncio.CancelledError:
# print("CancelledError - BUILT IN SELF TEST")
# except asyncio.TimeoutError:
# print("Data receive timeout - BUILT IN SELF TEST")
# return False
print("start Collecting data")
data_received_event.clear()
# data_received_waiter_task = asyncio.create_task(waiter(data_received_event))
# await event.wait() !!
try:
print("###################### start asyncio ##########################{0}")
data_received_event = asyncio.Event()
print(data_received_event.is_set())
print(data_received_event)
print(" i start to wait here !!!!!!!!!!!!!!!!!!!!")
await data_received_event.wait()
print(" i am wait here !!!!!!!!!!!!!!!!!!!!!!!!!")
except (asyncio.CancelledError, asyncio.TimeoutError) as e:
logger.error(e)
print("222222222222222222222222222222222222")
all_data_received = True
data_as_json = cbor2.loads(binascii.a2b_hex(DataBuffer.hex()))
logger.info(json.dumps(data_as_json, indent=4, sort_keys=True))
#
#
# print(" input string : {} ".format(result))
# # convert to the same
# b = bytearray()
#
# for item in result:
# b.append(int(item))
# print(b)
# print(" Send cmd msg : {} ".format(b))
#
# try:
# _value = await self.current_client.write_gatt_char(commandCharacteristicUUIDString, b)
# _value = await self.current_client.write_gatt_char(CommandServiceUUIDString, b)
# _value = await self._client.write_gatt_char(CommandServiceUUIDString, b)
# _value = await self._client.write_gatt_char(commandCharacteristicUUIDString, b)
# await self.client.write_gatt_char(commandCharacteristicUUIDString, b)
# await self.client.write_gatt_char(CommandServiceUUIDString, b)
# # _value = bytes(await client.write_gatt_char(CurrentTimeCharacteristicUuidString, write_data))
# print(" Send msg : {} ".format(b))
# except Exception as e:
# _value = str(e).encode()
PacketsExpected = 2
# run_bist = 1
# #await self.current_client.start_notify(TestingCharacteristicUUIDString, testing_notification_handler)
# if (run_bist):
# write_data = bytearray(b'\x01')
# print("Sending 'run built in self test' command to EP ")
#
# try:
# _value = await self.current_client.writedata(TestingCharacteristicUUIDString, write_data)
# except Exception as e:
# _value = str(e).encode()
#
# logger.info("[Characteristic Command ] Write Value: {0} ".format(to_hex(write_data)))
# print(_value)
# else:
write_data = bytearray(b'\x02')
logger.info("Sending 'get built in self test results' command to EP ")
try:
_value = await self.current_client.writedata(TestingCharacteristicUUIDString, write_data)
except Exception as e:
_value = str(e).encode()
print("[Characteristic Command ] Write Value: {0} ".format(to_hex(write_data)))
# make handler - await client.start_notify.