-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathenergenie_eg_pm2_lan.py
102 lines (90 loc) · 2.86 KB
/
energenie_eg_pm2_lan.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
#!/usr/bin/python
# -*- coding: utf-8 -*-
import urllib2
class EnergeniePowerSwitch(object):
# used to control EG-PM2-LAN from energenie
def __init__(self, hostname, password):
self.hostname = hostname
self.password = password
self.lastresponse = None
self.loggedin = False
def get_states(self):
if not self.loggedin:
raise RuntimeError("You need to login first.")
data = self._parse_login_response(self.lastresponse)
result = data
return result
def get_state(self, port_number):
if not self.loggedin:
raise RuntimeError("You need to login first.")
if port_number > 4 or port_number < 1:
raise RuntimeError("Wrong port number.")
val = self.get_states()[port_number-1]
if "1" == val:
return True
else:
return False
def set_state(self, port_number, state):
if not self.loggedin:
raise RuntimeError("you need to login first.")
if port_number > 4 or port_number < 1:
raise RuntimeError("Wrong port number.")
port = "1" if state else "0"
url = "http://" + self.hostname
url_params = "cte" + str(port_number) + "=" + port
header = {"Content-Type":"text/plain"}
req = urllib2.Request(url, url_params.encode('utf-8'), header)
response = urllib2.urlopen(req)
response_read = response.read()
if not self._is_good_return(response_read):
raise RuntimeError("Login not successful, check hostname and password.")
self.lastresponse = response_read
return True
def login(self):
if self.loggedin:
return True
url = "http://" + self.hostname + "/login.html"
url_params = "pw=" + self.password
header = {"Content-Type":"text/plain"}
req = urllib2.Request(url, url_params.encode('utf-8'), header)
try:
response = urllib2.urlopen(req)
response_read = response.read()
if not self._is_good_return(response_read):
raise RuntimeError("Login not successful, check hostname and password.")
self.lastresponse = response_read
self.loggedin = True
return True
except (urllib2.HTTPError, urllib2.URLError) as e:
self.loggedin = False
self.lastresponse = None
raise e
def logout(self):
if not self.loggedin:
return
url = "http://" + self.hostname + "/login.html"
url_params = "nothing"
header = {"Content-Type":"text/plain"}
req = urllib2.Request(url, url_params.encode('utf-8'), header)
try:
response = urllib2.urlopen(req)
response_read = response.read()
self.lastresponse = response_read
self.loggedin = False
except Exception:
self.loggedin = False
self.lastresponse = None
raise e
def _get_socketstates(self, response):
return response.rfind("var sockstates =")
def _is_good_return(self, response):
indx = self._get_socketstates(response)
if indx > 0:
return True
else:
return False
def _parse_login_response(self, response):
indx = self._get_socketstates(response)
sli = response[indx + 18:indx + 25]
spl = sli.split(",")
return spl