-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathUVServer.py
61 lines (50 loc) · 2.51 KB
/
UVServer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ##
# @brief [UVServer] UVServer included Pipette Machine + Linear Actuator in Autonomous Laboratory
# @author Hyuk Jun Yoo ([email protected])
# TEST 2021-09-24
# TEST 2022-03-24
from UV.UV_Class import USB2000plus_Class
from Pipette_Machine.Pipette_Class import Pipette
from Pipette_Machine.uarm.wrapper.swift_api import SwiftAPI
from Log.Logging_Class import NodeLogger
from BaseUtils.TCP_Node import BaseTCPNode
import socket
import time
# Maximum number of bytes read from a client in a single recv() call.
SIZE = 1 << 20

# TCP/IP listening socket that clients connect to.
server_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
server_socket.bind(('161.122.22.80', 54009))  # (ip address, port)
server_socket.listen()  # start listening for client connections

# Logger shared by the hardware wrappers created below.
NodeLogger_obj = NodeLogger(
    platform_name="UV Platform",
    setLevel="DEBUG",
    SAVE_DIR_PATH="C:/Users/User/Desktop/UVPlatform",
)

# Hardware wrappers; both receive the shared logger.
UV_obj = USB2000plus_Class(NodeLogger_obj)
Pipette_obj = Pipette(NodeLogger_obj)

# SwiftAPI handle opened on the serial address exposed by the pipette object.
swift = SwiftAPI(port=Pipette_obj.PIPETTE_serial_add)

# Helper used to report each request's result back over the client socket.
base_tcp_node_obj = BaseTCPNode()
# Main server loop.
#
# Each accepted connection carries one "/"-separated request whose first
# field selects the hardware:
#   UV/<action_type>/<mode_type>
#   PIPETTE/<action_type>/<vial_num>/<tip_position>/<mode_type>
#   OPTICS/<action_type>/<ignored>
# A request with any other first field raises ValueError; a request with the
# wrong number of fields raises ValueError from the tuple unpacking.
# Ctrl + C (KeyboardInterrupt) ends the loop.
try:
    while True:
        # accept() blocks until a client connects; returns (socket, address).
        client_socket, addr = server_socket.accept()
        try:
            # Receive one request (at most SIZE bytes) from the client.
            data = client_socket.recv(SIZE)
            packet_info = data.decode().split(sep="/")
            print("packet information list : ", packet_info)
            # NOTE(review): the reason for this 1 s delay is not evident
            # from this file; kept as in the original.
            time.sleep(1)

            # Read the selector up front so the error branch below can
            # report the value that was actually received.
            hardware_name = packet_info[0]
            if hardware_name == "UV":
                hardware_name, action_type, mode_type = packet_info
                res_msg = UV_obj.getUVData(client_socket, action_type, mode_type=mode_type)
                base_tcp_node_obj.checkSocketStatus(client_socket, res_msg, hardware_name, action_type=action_type)
            elif hardware_name == "PIPETTE":
                hardware_name, action_type, vial_num, tip_position, mode_type = packet_info
                res_msg = Pipette_obj.inject2Cuvette(swift, vial_num=vial_num, pos=tip_position, mode_type=mode_type)
                base_tcp_node_obj.checkSocketStatus(client_socket, res_msg, hardware_name, action_type)
            elif hardware_name == "OPTICS":
                hardware_name, action_type, _ = packet_info
                # Collect the hello() result of both devices in one reply.
                total_dict = {
                    "Pipette machine": Pipette_obj.hello(),
                    "Ocean optics": UV_obj.hello(),
                }
                base_tcp_node_obj.checkSocketStatus(client_socket, total_dict, hardware_name, action_type)
            else:
                raise ValueError("[{}] Packet Error : hardware_name is wrong".format(hardware_name))
        finally:
            # Release the per-request connection on success and on error;
            # close() on an already-closed socket is a no-op.
            client_socket.close()
except KeyboardInterrupt:
    print('Ctrl + C, interrupt message')
finally:
    # Release the listening socket when the server stops.
    server_socket.close()