-
Notifications
You must be signed in to change notification settings - Fork 0
/
plc.py
116 lines (100 loc) · 3.65 KB
/
plc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# PLC Server Settings
from datetime import datetime
from time import sleep
from plc_server_client import ApiClient, Configuration
from plc_server_client.api.plc_rest_controller_api import PlcRestControllerApi
from plc_server_client.model.plc_read_request import PlcReadRequest
from plc_server_client.model.plc_read_response import PlcReadResponse
from database_methods import get_all_entitys
from extract import (
AutoGeneratedUUID,
Parameter,
AutoGeneratedField,
StateProcessor,
SyncDatabase,
)
PLC_SERVER_HOST = "192.168.167.196"
PLC_SERVER_PORT = "48080"
timeout = 1000
def get_api_client():
api_client: ApiClient = ApiClient(
configuration=Configuration(host=f"http://{PLC_SERVER_HOST}:{PLC_SERVER_PORT}")
)
return api_client
def get_controller_api(client_api):
controller_api: PlcRestControllerApi = PlcRestControllerApi(api_client=client_api)
return controller_api
def read_response(controller_api):
response: PlcReadResponse = controller_api.multiread(
PlcReadRequest(
host="s7://192.168.167.210/0/0",
fields=[
"%DB444.DBD22:DINT",
"%DB444.DBW20:INT",
"%DB444:3.0:REAL",
"%DB444:4.0:REAL",
"%DB444:6.0:REAL",
"%DB4560:12.0:UDINT",
],
timeout=timeout,
),
_check_return_type=False,
)
return response
def make_dict_from_response(response: PlcReadResponse):
element_dict: dict = {}
for i in range(0, len(response.fields)):
element_dict.update(
{response.fields[i].get("field"): response.fields[i].get("value")}
)
element_dict.update({"timestamp": f"{datetime.now()}"})
return element_dict
if __name__ == "__main__":
# thread der alle x sec ne anfrage macht.
api_client: ApiClient = get_api_client()
controller_api: PlcRestControllerApi = get_controller_api(api_client)
def get_config() -> dict:
return {
"Cycle": {
"fields": {
"uuid": AutoGeneratedUUID(),
"id": AutoGeneratedField(),
# "druck": Parameter("%DB444:4.0:REAL"),
"cycle": Parameter("%DB4560:12.0:UDINT"),
},
"primary_key": "id",
},
"ToolEquipped": {
"fields": {
"id": AutoGeneratedField(),
"uuid": AutoGeneratedUUID(),
"tool_name": Parameter("%DB444:6.0:REAL"),
},
"primary_key": "id",
},
"MaterialEquipped": {
"fields": {
"id": AutoGeneratedField(),
"uuid": AutoGeneratedUUID(),
"material_name": Parameter("%DB444.DBD22:DINT"),
"material_typ": Parameter("%DB444.DBW20:INT"),
},
"primary_key": "id",
},
}
# threading.Timer(1.0,read_response,[controller_api]).start()
end_result = []
end_result.append(make_dict_from_response(read_response(controller_api)))
processor: StateProcessor = StateProcessor(get_config())
processor.init_context(end_result[0])
db_sync: SyncDatabase = SyncDatabase(processor.get_result())
db_sync.sync_database_with_index()
sleep(1)
while True:
end_result.append(make_dict_from_response(read_response(controller_api)))
processor.process_state(end_result[-1])
print(end_result[-1])
db_sync.set_end_result(processor.get_result())
db_sync.sync_database_with_index()
sleep(0.1)
arrie = get_all_entitys()