-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathkvhost.py
149 lines (125 loc) · 4.3 KB
/
kvhost.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#!/usr/bin/env python3
import random
import socket
from time import sleep
class HostLink:
    """TCP client for the ASCII "Host Link" command protocol of KEYENCE KV PLCs.

    Every command is a short ASCII line terminated by CR LF. The PLC answers
    with one line, which is either ``OK``, the requested data, or an error
    code. Creating an instance opens the TCP connection immediately.

    The instance can be used as a context manager so that the socket is
    closed when the ``with`` block exits.
    """

    # Terminator appended to every command; responses are expected to end
    # with the same sequence.
    _TERMINATOR = b"\r\n"
    # Number of bytes requested from the socket per recv() call.
    _RECV_SIZE = 4096
    # Address prefixes that are read without a data-format suffix.
    _BIT_PREFIXES = ("MR", "LR")
    # Format suffix used when the caller does not supply one.
    _DEFAULT_FORMAT = ".U"

    _MODE_TO_CODE = {"program": 0, "run": 1}
    _CODE_TO_MODE = {0: "program", 1: "run"}
    _MODEL_NAMES = {
        57: "KV-8000",
        55: "KV-7500",
        128: "KV-NC32T",
        132: "KV-N60",
        133: "KV-N40",
        134: "KV-N24",
    }

    def __init__(self, plc_address="192.168.0.10", plc_port=8501, timeout=None):
        """Open a TCP connection to the PLC.

        Args:
            plc_address: IP address or host name of the PLC.
            plc_port: TCP port of the PLC's host-link service.
            timeout: Optional socket timeout in seconds applied to connect,
                send and receive. ``None`` keeps the socket blocking.

        Raises:
            OSError: If the connection cannot be established.
        """
        self.plc_address = plc_address
        self.plc_port = plc_port
        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        if timeout is not None:
            self.client.settimeout(timeout)
        try:
            self.client.connect((self.plc_address, self.plc_port))
        except OSError:
            # Do not leak the descriptor when the PLC is unreachable.
            self.client.close()
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close the connection to the PLC."""
        self.client.close()

    def _send_command(self, cmd):
        """Encode *cmd* and transmit it completely."""
        # sendall() retries until every byte is written; send() may stop early.
        self.client.sendall(cmd.encode())

    def _recv_line(self):
        """Receive one response line and return it without trailing whitespace.

        Keeps reading until the CR LF terminator arrives, so responses larger
        than one recv() buffer or split across TCP segments are returned
        whole. Stops early if the peer closes the connection.
        """
        buffer = bytearray()
        while not buffer.endswith(self._TERMINATOR):
            chunk = self.client.recv(self._RECV_SIZE)
            if not chunk:
                break  # connection closed by the peer
            buffer += chunk
        return buffer.decode().rstrip()

    def _operand(self, addr, data_format):
        """Return *addr* with its data-format suffix where one applies.

        Addresses starting with ``MR`` or ``LR`` are sent bare. Every other
        address gets *data_format* appended (``.U`` when it is ``None``), so
        an explicitly requested format such as ``.D`` on ``DM100`` is never
        dropped.
        """
        if addr.startswith(self._BIT_PREFIXES):
            return addr
        if data_format is None:
            data_format = self._DEFAULT_FORMAT
        return "{}{}".format(addr, data_format)

    def confirm_response(self):
        """Read one response and return ``True`` only if it is ``OK``."""
        return self._recv_line() == "OK"

    def data_response(self):
        """Read one response and return its text with trailing whitespace removed."""
        return self._recv_line()

    def change_mode(self, mode):
        """Switch the PLC operating mode.

        Args:
            mode: ``"program"`` or ``"run"``.

        Returns:
            ``True`` if the PLC answered ``OK``, otherwise ``False``.

        Raises:
            KeyError: If *mode* is not one of the supported names.
        """
        self._send_command("M{}\r\n".format(self._MODE_TO_CODE[mode]))
        return self.confirm_response()

    def confirm_mode(self):
        """Query the current operating mode.

        Returns:
            ``"program"`` or ``"run"``.

        Raises:
            ValueError: If the response is not an integer.
            KeyError: If the reported mode number is unknown.
        """
        self._send_command("?M\r\n")
        return self._CODE_TO_MODE[int(self.data_response())]

    def query_model(self):
        """Query the PLC model.

        Returns:
            The model name, e.g. ``"KV-8000"``.

        Raises:
            ValueError: If the response is not an integer.
            KeyError: If the reported model code is not in the known table.
        """
        self._send_command("?K \r\n")
        return self._MODEL_NAMES[int(self.data_response())]

    def forced_set(self, mode, addr):
        """Force a device on.

        Device Type: R, MR, LR, CR, T, C, CTC, VB

        Args:
            mode: ``"continuous"`` sends ``STS``; anything else sends ``ST``.
            addr: Device address and, for ``STS``, any further operands.

        Returns:
            ``True`` if the PLC answered ``OK``, otherwise ``False``.
        """
        verb = "STS" if mode == "continuous" else "ST"
        self._send_command("{} {}\r\n".format(verb, addr))
        return self.confirm_response()

    def forced_reset(self, mode, addr):
        """Force a device off.

        Device Type: R, MR, LR, CR, T, C, CTC, VB

        Args:
            mode: ``"continuous"`` sends ``RSS``; anything else sends ``RS``.
            addr: Device address and, for ``RSS``, any further operands.

        Returns:
            ``True`` if the PLC answered ``OK``, otherwise ``False``.
        """
        verb = "RSS" if mode == "continuous" else "RS"
        self._send_command("{} {}\r\n".format(verb, addr))
        return self.confirm_response()

    def single_read(self, addr, data_format=None):
        """Read one device value with the ``RD`` command.

        .U : Decimal, 16bit, unsigned
        .S : Decimal, 16bit, signed
        .D : Decimal, 32bit, unsigned
        .L : Decimal, 32bit, signed

        Args:
            addr: Device address, e.g. ``"DM100"`` or ``"MR100"``.
            data_format: Format suffix; ``None`` means ``.U``. Not sent for
                ``MR``/``LR`` addresses.

        Returns:
            The raw response text.
        """
        self._send_command("RD {}\r\n".format(self._operand(addr, data_format)))
        return self.data_response()

    def batch_read(self, addr, length=0, data_format=".U"):
        """Read consecutive device values with the ``RDS`` command.

        .U : Decimal, 16bit, unsigned
        .S : Decimal, 16bit, signed
        .D : Decimal, 32bit, unsigned
        .L : Decimal, 32bit, signed

        Args:
            addr: First device address.
            length: Number of devices to read.
            data_format: Format suffix; ``None`` means ``.U``. Not sent for
                ``MR``/``LR`` addresses.

        Returns:
            The whitespace-separated response fields as a list of strings.
        """
        operand = self._operand(addr, data_format)
        self._send_command("RDS {} {}\r\n".format(operand, length))
        return self.data_response().split()

    def single_write(self, addr, value=0, data_format=".U"):
        """Write one device value with the ``WR`` command.

        .U : Decimal, 16bit, unsigned
        .S : Decimal, 16bit, signed
        .D : Decimal, 32bit, unsigned
        .L : Decimal, 32bit, signed

        Args:
            addr: Device address.
            value: Value to write.
            data_format: Format suffix; ``None`` means ``.U``.

        Returns:
            ``True`` if the PLC answered ``OK``, otherwise ``False``.
        """
        if data_format is None:
            data_format = self._DEFAULT_FORMAT
        self._send_command("WR {}{} {}\r\n".format(addr, data_format, value))
        return self.confirm_response()

    def batch_write(self, addr, data_format=".U", length=None, values=None):
        """Write consecutive device values with the ``WRS`` command.

        .U : Decimal, 16bit, unsigned
        .S : Decimal, 16bit, signed
        .D : Decimal, 32bit, unsigned
        .L : Decimal, 32bit, signed

        Args:
            addr: First device address.
            data_format: Format suffix; ``None`` means ``.U``.
            length: Number of devices to write. Defaults to the number of
                items in *values*.
            values: Iterable of values to write; numbers and strings are
                both accepted.

        Returns:
            ``True`` if the PLC answered ``OK``, otherwise ``False``.

        Raises:
            ValueError: If *values* is missing or empty.
        """
        # Stringify first so integer payloads can be joined.
        fields = [str(item) for item in values] if values is not None else []
        if not fields:
            raise ValueError("batch_write requires at least one value")
        if length is None:
            length = len(fields)
        if data_format is None:
            data_format = self._DEFAULT_FORMAT
        self._send_command(
            "WRS {}{} {} {}\r\n".format(addr, data_format, length, " ".join(fields))
        )
        return self.confirm_response()
if __name__ == "__main__":
    # Constructing HostLink opens the TCP connection to the PLC right away.
    keyence = HostLink()

    # Manual polling loop, kept disabled:
    # while True:
    #     # read = keyence.single_read("W05")
    #     # number = random.randint(0,10)
    #     # keyence.single_write("DM100", number)
    #     sleep(0.1)