-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathUartLayer.py
95 lines (76 loc) · 2.41 KB
/
UartLayer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#:pyserial 3.4
__all__ = ('UartLayer',)
__title__ = 'Application_V_0_1'
__version__ = '0.1'
__author__ = 'Bultot Geoffrey'
import serial
import serial.tools.list_ports
from threading import Thread
import time
#import spidev
from dictionary import *
C_BUS_IDLE = 0x00
C_BUS_BUSY = 0x01
class UartLayer(object):
'''
classdocs
'''
def quit(self):
'''
close UART connexion
'''
self.ReadBus_Threat_ON = False
self.ReadThread1.join(0)
sys.exit()
'''****************************************************************************************'''
def __init__(self,app):
self.app = app
self.BusState = C_BUS_IDLE
# We only have SPI bus 0 available to us on the Pi
bus = 1
#Device is the chip select pin. Set to 0 or 1, depending on the connections
device = 0
# Enable SPI
#self.spi = spidev.SpiDev()
# Open a connection to a specific bus and device (chip select pin)
#self.spi.open(bus, device)
# Set SPI speed and mode
#self.spi.max_speed_hz = 140625#375000
#self.spi.lsbfirst=False
#self.spi.mode=0b00
self.ReadBus_Threat_ON = True
self.ReadThread= Thread(target=self.RefreshTM_Thread,args=())
self.ReadThread.start()
'''Buffers to store Telemetries received by DPC'''
def SendDatas(self,array):
while(self.BusState == C_BUS_BUSY):
pass
self.BusState == C_BUS_BUSY
#respA=self.spi.xfer2(array)
#resp = self.spi.readbytes(6)
self.BusState == C_BUS_IDLE
def RefreshTM_Thread(self):
#To store the current segment
CurrentSegment = []
#Current position of segment
Rx_framePosition = 0
Message_Sent = False
inv_TM_TABLE_ID = {v: k for k, v in TM_TABLE_ID.items()} #reverse the id table to get the name depending of the ID
#print(inv_TM_TABLE_ID)
while self.ReadBus_Threat_ON:
temptable = []
if(self.BusState == C_BUS_IDLE):
self.BusState = C_BUS_BUSY
for i in range (0,len(TM_TABLE_ID)):
ID = TM_TABLE_ID[inv_TM_TABLE_ID[i]]
#respA=self.spi.xfer2([0x80,i,0,0,0,0])#,0x80,0x24,0x41,0x2,0x1,0x2,0x1,0x2,0x1,0x2,0x1,0x2,0x1,0x2])
temptable.append([0,0,2+i,0,0,0])
buff_Telemetries_Words = []
for i in range (0,len(temptable)):
data_temp = temptable[i][1]
data_temp += temptable[i][2] * 256
buff_Telemetries_Words.append(data_temp)
#print(buff_Telemetries_Words)
self.app.Table_Tm_Reg =buff_Telemetries_Words
self.BusState = C_BUS_IDLE
time.sleep(0.1)