-
Notifications
You must be signed in to change notification settings - Fork 70
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
pytest fix flake8 pytest topic add update reg fix fix improve tester flake8 fix typo fix folder structure start modbus server only in secondary mode automate modbus test
- Loading branch information
Showing
20 changed files
with
413 additions
and
39 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,7 @@ jobs: | |
run: | | ||
python -m pip install --upgrade pip | ||
pip install flake8 pytest paho-mqtt requests-mock jq pyjwt==2.6.0 bs4 pkce typing_extensions python-dateutil==2.8.2 | ||
pip install umodbus | ||
- name: Flake8 with annotations in packages folder | ||
uses: TrueBrain/[email protected] | ||
with: | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
#!/usr/bin/env python | ||
import logging | ||
from socketserver import TCPServer | ||
from collections import defaultdict | ||
import struct | ||
from umodbus import conf | ||
from umodbus.server.tcp import RequestHandler, get_server | ||
from umodbus.utils import log_to_stream | ||
|
||
from helpermodules import timecheck | ||
from helpermodules.hardware_configuration import get_serial_number | ||
from helpermodules.pub import Pub | ||
from helpermodules.subdata import SubData | ||
|
||
|
||
log = logging.getLogger(__name__) | ||
|
||
|
||
log_to_stream(level=logging.DEBUG) | ||
data_store = defaultdict(int) | ||
conf.SIGNED_VALUES = True | ||
TCPServer.allow_reuse_address = True | ||
app = get_server(TCPServer, ('0.0.0.0', 1502), RequestHandler) | ||
|
||
serial_number = get_serial_number().replace("snnumber=", "") | ||
|
||
|
||
def _form_int32(value, startreg): | ||
secondreg = startreg + 1 | ||
try: | ||
binary32 = struct.pack('>l', int(value)) | ||
high_byte, low_byte = struct.unpack('>hh', binary32) | ||
data_store[startreg] = high_byte | ||
data_store[secondreg] = low_byte | ||
except Exception: | ||
log.exception("Fehler beim Füllen der Register") | ||
data_store[startreg] = -1 | ||
data_store[secondreg] = -1 | ||
|
||
|
||
def _form_int16(value, startreg): | ||
try: | ||
value = int(value) | ||
if (value > 32767 or value < -32768): | ||
raise Exception("Number to big") | ||
data_store[startreg] = value | ||
except Exception: | ||
log.exception("Fehler beim Füllen der Register") | ||
data_store[startreg] = -1 | ||
|
||
|
||
def _form_str(value: str, startreg): | ||
bytes = value.encode("utf-8") | ||
length = len(bytes) | ||
if length > 20: | ||
raise ValueError("String darf max 20 Zeichen enthalten.") | ||
register_offset = 0 | ||
for i in range(0, length, 2): | ||
try: | ||
if i < length-1: | ||
stream_two_bytes = struct.pack(">bb", bytes[i], bytes[i+1]) | ||
stream_one_word = struct.unpack(">h", stream_two_bytes)[0] | ||
else: | ||
stream_two_bytes = struct.pack(">bb", bytes[i], 0) | ||
stream_one_word = struct.unpack(">h", stream_two_bytes)[0] | ||
data_store[startreg+register_offset] = stream_one_word | ||
except Exception: | ||
data_store[startreg+register_offset] = -1 | ||
finally: | ||
register_offset += 1 | ||
|
||
|
||
def _get_pos(number, n): | ||
return number // 10**n % 10 - 1 | ||
|
||
|
||
@app.route(slave_ids=[1], function_codes=[3, 4], addresses=list(range(0, 32000))) | ||
def read_data_store(slave_id, function_code, address): | ||
"""" Return value of address. """ | ||
if address > 10099: | ||
Pub().pub("openWB/set/internal_chargepoint/global_data", | ||
{"heartbeat": timecheck.create_timestamp_unix(), "parent_ip": None}) | ||
chargepoint = SubData.internal_chargepoint_data[f"cp{_get_pos(address, 2)}"] | ||
askedvalue = int(str(address)[-2:]) | ||
if askedvalue == 00: | ||
_form_int32(chargepoint.get.power, address) | ||
elif askedvalue == 2: | ||
_form_int32(chargepoint.get.imported, address) | ||
elif 4 <= askedvalue <= 6: | ||
_form_int16(chargepoint.get.voltages[askedvalue-4]*100, address) | ||
elif 7 <= askedvalue <= 9: | ||
_form_int16(chargepoint.get.currents[askedvalue-7]*100, address) | ||
elif askedvalue == 14: | ||
_form_int16(chargepoint.get.plug_state, address) | ||
elif askedvalue == 15: | ||
_form_int16(chargepoint.get.charge_state, address) | ||
elif askedvalue == 16: | ||
_form_int16(chargepoint.get.evse_current, address) | ||
elif 30 <= askedvalue <= 32: | ||
_form_int16(chargepoint.get.powers[askedvalue-30], address) | ||
elif askedvalue == 41: | ||
_form_int32(chargepoint.get.exported, address) | ||
elif askedvalue == 43: | ||
_form_int16(1, address) | ||
elif askedvalue == 50: | ||
_form_str(serial_number, address) | ||
elif askedvalue == 60: | ||
_form_str(chargepoint.get.rfid, address) | ||
|
||
return data_store[address] | ||
|
||
|
||
@app.route(slave_ids=[1], function_codes=[6, 16], addresses=list(range(0, 32000))) | ||
def write_data_store(slave_id, function_code, address, value): | ||
"""" Set value for address. """ | ||
if 10170 < address: | ||
cp_topic = f"openWB/set/internal_chargepoint/{_get_pos(address, 2)}/data/" | ||
askedvalue = int(str(address)[-2:]) | ||
if askedvalue == 71: | ||
Pub().pub(f"{cp_topic}set_current", value/100) | ||
elif askedvalue == 80: | ||
Pub().pub(f"{cp_topic}phases_to_use", value) | ||
elif askedvalue == 81: | ||
Pub().pub(f"{cp_topic}trigger_phase_switch", value) | ||
elif askedvalue == 98: | ||
Pub().pub(f"{cp_topic}cp_interruption_duration", value) | ||
elif askedvalue == 99: | ||
Pub().pub("openWB/set/command/modbus_server/todo", {"command": "systemUpdate", "data": {}}) | ||
|
||
|
||
def start_modbus_server(event_modbus_server): | ||
try: | ||
# Wenn start_modbus_server aus SubData aufegrufen wird, wenn das Topic gesetzt wird, führt das zu einem | ||
# circular Import. | ||
event_modbus_server.wait() | ||
event_modbus_server.clear() | ||
app.serve_forever() | ||
finally: | ||
app.shutdown() | ||
app.server_close() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.