From 076dbee171639b27d7179c9ce69f9444f5669067 Mon Sep 17 00:00:00 2001 From: Riccardo Paolo Bestetti <1955497+vmsh0@users.noreply.github.com> Date: Thu, 22 Sep 2022 14:22:22 +0200 Subject: [PATCH] make all internal imports relative (#386) this allows more freedom installing and moving the snap7 module around the filesystem for peculiar setups. --- snap7/__init__.py | 14 ++++---- snap7/client.py | 84 +++++++++++++++++++++++------------------------ snap7/logo.py | 23 +++++++------ snap7/partner.py | 14 ++++---- snap7/server.py | 64 +++++++++++++++++++----------------- snap7/types.py | 2 +- snap7/util.py | 4 +-- 7 files changed, 103 insertions(+), 102 deletions(-) diff --git a/snap7/__init__.py b/snap7/__init__.py index f4a80879..c24951fa 100644 --- a/snap7/__init__.py +++ b/snap7/__init__.py @@ -3,13 +3,13 @@ """ import pkg_resources -import snap7.client as client -import snap7.common as common -import snap7.error as error -import snap7.logo as logo -import snap7.server as server -import snap7.types as types -import snap7.util as util +from . import client +from . import common +from . import error +from . import logo +from . import server +from . import types +from . import util __all__ = ['client', 'common', 'error', 'logo', 'server', 'types', 'util'] diff --git a/snap7/client.py b/snap7/client.py index 6ce39668..103d5e55 100755 --- a/snap7/client.py +++ b/snap7/client.py @@ -8,11 +8,11 @@ from datetime import datetime from typing import List, Optional, Tuple, Union -import snap7 -from snap7.common import check_error, ipv4, load_library -from snap7.types import S7SZL, Areas, BlocksList, S7CpInfo, S7CpuInfo, S7DataItem -from snap7.types import S7OrderCode, S7Protection, S7SZLList, TS7BlockInfo, WordLen -from snap7.types import S7Object, buffer_size, buffer_type, cpu_statuses, param_types +from .common import check_error, ipv4, load_library +from .types import S7SZL, Areas, BlocksList, S7CpInfo, S7CpuInfo, S7DataItem +from .types import S7OrderCode, S7Protection, S7SZLList, TS7BlockInfo, WordLen +from .types import S7Object, buffer_size, buffer_type, cpu_statuses, param_types +from .types import S7CpuInfo, RemotePort, wordlen_to_ctypes, block_types logger = logging.getLogger(__name__) @@ -154,7 +154,7 @@ def get_cpu_info(self) -> S7CpuInfo: ASName: b'SNAP7-SERVER' Copyright: b'Original Siemens Equipment' ModuleName: b'CPU 315-2 PN/DP'> """ - info = snap7.types.S7CpuInfo() + info = S7CpuInfo() result = self._library.Cli_GetCpuInfo(self._pointer, byref(info)) check_error(result, context="client") return info @@ -189,7 +189,7 @@ def connect(self, address: str, rack: int, slot: int, tcpport: int = 102) -> int """ logger.info(f"connecting to {address}:{tcpport} rack {rack} slot {slot}") - self.set_param(snap7.types.RemotePort, tcpport) + self.set_param(RemotePort, tcpport) return self._library.Cli_ConnectTo( self._pointer, c_char_p(address.encode()), c_int(rack), c_int(slot)) @@ -218,7 +218,7 @@ def db_read(self, db_number: int, start: int, size: int) -> bytearray: """ logger.debug(f"db_read, db_number:{db_number}, start:{start}, size:{size}") - type_ = snap7.types.wordlen_to_ctypes[WordLen.Byte.value] + type_ = wordlen_to_ctypes[WordLen.Byte.value] data = (type_ * size)() result = (self._library.Cli_DBRead( self._pointer, db_number, start, size, @@ -246,7 +246,7 @@ def db_write(self, db_number: int, start: int, data: bytearray) -> int: >>> client.db_write(1, 10, buffer) # writes the bit number 0 from the byte 10 to TRUE. """ wordlen = WordLen.Byte - type_ = snap7.types.wordlen_to_ctypes[wordlen.value] + type_ = wordlen_to_ctypes[wordlen.value] size = len(data) cdata = (type_ * size).from_buffer_copy(data) logger.debug(f"db_write db_number:{db_number} start:{start} size:{size} data:{data}") @@ -264,7 +264,7 @@ def delete(self, block_type: str, block_num: int) -> int: Error code from snap7 library. """ logger.info("deleting block") - blocktype = snap7.types.block_types[block_type] + blocktype = block_types[block_type] result = self._library.Cli_Delete(self._pointer, blocktype, block_num) return result @@ -282,7 +282,7 @@ def full_upload(self, _type: str, block_num: int) -> Tuple[bytearray, int]: """ _buffer = buffer_type() size = c_int(sizeof(_buffer)) - block_type = snap7.types.block_types[_type] + block_type = block_types[_type] result = self._library.Cli_FullUpload(self._pointer, block_type, block_num, byref(_buffer), byref(size)) @@ -302,7 +302,7 @@ def upload(self, block_num: int) -> bytearray: Buffer with the uploaded block. """ logger.debug(f"db_upload block_num: {block_num}") - block_type = snap7.types.block_types['DB'] + block_type = block_types['DB'] _buffer = buffer_type() size = c_int(sizeof(_buffer)) @@ -383,19 +383,19 @@ def read_area(self, area: Areas, dbnumber: int, start: int, size: int) -> bytear >>> import snap7 >>> client = snap7.client.Client() >>> client.connect("192.168.0.1", 0, 0) - >>> buffer = client.read_area(snap7.types.Areas.DB, 1, 10, 4) # Reads the DB number 1 from the byte 10 to the byte 14. + >>> buffer = client.read_area(Areas.DB, 1, 10, 4) # Reads the DB number 1 from the byte 10 to the byte 14. >>> buffer bytearray(b'\\x00\\x00') """ if area not in Areas: - raise ValueError(f"{area} is not implemented in snap7.types") + raise ValueError(f"{area} is not implemented in types") elif area == Areas.TM: wordlen = WordLen.Timer elif area == Areas.CT: wordlen = WordLen.Counter else: wordlen = WordLen.Byte - type_ = snap7.types.wordlen_to_ctypes[wordlen.value] + type_ = wordlen_to_ctypes[wordlen.value] logger.debug(f"reading area: {area.name} dbnumber: {dbnumber} start: {start}: amount {size}: wordlen: {wordlen.name}={wordlen.value}") data = (type_ * size)() result = self._library.Cli_ReadArea(self._pointer, area.value, dbnumber, start, @@ -421,7 +421,7 @@ def write_area(self, area: Areas, dbnumber: int, start: int, data: bytearray) -> >>> client = snap7.client.Client() >>> client.connect("192.168.0.1", 0, 0) >>> buffer = bytearray([0b00000001]) - >>> client.write_area(snap7.types.Areas.DB, 1, 10, buffer) # Writes the bit 0 of the byte 10 from the DB number 1 to TRUE. + >>> client.write_area(Areas.DB, 1, 10, buffer) # Writes the bit 0 of the byte 10 from the DB number 1 to TRUE. """ if area == Areas.TM: wordlen = WordLen.Timer @@ -429,7 +429,7 @@ def write_area(self, area: Areas, dbnumber: int, start: int, data: bytearray) -> wordlen = WordLen.Counter else: wordlen = WordLen.Byte - type_ = snap7.types.wordlen_to_ctypes[WordLen.Byte.value] + type_ = wordlen_to_ctypes[WordLen.Byte.value] size = len(data) logger.debug(f"writing area: {area.name} dbnumber: {dbnumber} start: {start}: size {size}: " f"wordlen {wordlen.name}={wordlen.value} type: {type_}") @@ -483,7 +483,7 @@ def list_blocks_of_type(self, blocktype: str, size: int) -> Union[int, Array]: :obj:`ValueError`: if the `blocktype` is not valid. """ - _blocktype = snap7.types.block_types.get(blocktype) + _blocktype = block_types.get(blocktype) if not _blocktype: raise ValueError("The blocktype parameter was invalid") @@ -536,7 +536,7 @@ def get_block_info(self, blocktype: str, db_number: int) -> TS7BlockInfo: Family: b'' Header: b'' """ - blocktype_ = snap7.types.block_types.get(blocktype) + blocktype_ = block_types.get(blocktype) if not blocktype_: raise ValueError("The blocktype parameter was invalid") @@ -639,7 +639,7 @@ def ab_read(self, start: int, size: int) -> bytearray: Buffer with the data read. """ wordlen = WordLen.Byte - type_ = snap7.types.wordlen_to_ctypes[wordlen.value] + type_ = wordlen_to_ctypes[wordlen.value] data = (type_ * size)() logger.debug(f"ab_read: start: {start}: size {size}: ") result = self._library.Cli_ABRead(self._pointer, start, size, @@ -658,7 +658,7 @@ def ab_write(self, start: int, data: bytearray) -> int: Snap7 code. """ wordlen = WordLen.Byte - type_ = snap7.types.wordlen_to_ctypes[wordlen.value] + type_ = wordlen_to_ctypes[wordlen.value] size = len(data) cdata = (type_ * size).from_buffer_copy(data) logger.debug(f"ab write: start: {start}: size: {size}: ") @@ -693,7 +693,7 @@ def as_ab_write(self, start: int, data: bytearray) -> int: Snap7 code. """ wordlen = WordLen.Byte - type_ = snap7.types.wordlen_to_ctypes[wordlen.value] + type_ = wordlen_to_ctypes[wordlen.value] size = len(data) cdata = (type_ * size).from_buffer_copy(data) logger.debug(f"ab write: start: {start}: size: {size}: ") @@ -754,7 +754,7 @@ def as_ct_write(self, start: int, amount: int, data: bytearray) -> int: Returns: Snap7 code. """ - type_ = snap7.types.wordlen_to_ctypes[WordLen.Counter.value] + type_ = wordlen_to_ctypes[WordLen.Counter.value] cdata = (type_ * amount).from_buffer_copy(data) result = self._library.Cli_AsCTWrite(self._pointer, start, amount, byref(cdata)) check_error(result, context="client") @@ -991,14 +991,14 @@ def wait_as_completion(self, timeout: int) -> int: def _prepare_as_read_area(self, area: Areas, size: int) -> Tuple[WordLen, Array]: if area not in Areas: - raise ValueError(f"{area} is not implemented in snap7.types") + raise ValueError(f"{area} is not implemented in types") elif area == Areas.TM: wordlen = WordLen.Timer elif area == Areas.CT: wordlen = WordLen.Counter else: wordlen = WordLen.Byte - type_ = snap7.types.wordlen_to_ctypes[wordlen.value] + type_ = wordlen_to_ctypes[wordlen.value] usrdata = (type_ * size)() return wordlen, usrdata @@ -1024,14 +1024,14 @@ def as_read_area(self, area: Areas, dbnumber: int, start: int, size: int, wordle def _prepare_as_write_area(self, area: Areas, data: bytearray) -> Tuple[WordLen, Array]: if area not in Areas: - raise ValueError(f"{area} is not implemented in snap7.types") + raise ValueError(f"{area} is not implemented in types") elif area == Areas.TM: wordlen = WordLen.Timer elif area == Areas.CT: wordlen = WordLen.Counter else: wordlen = WordLen.Byte - type_ = snap7.types.wordlen_to_ctypes[WordLen.Byte.value] + type_ = wordlen_to_ctypes[WordLen.Byte.value] cdata = (type_ * len(data)).from_buffer_copy(data) return wordlen, cdata @@ -1049,7 +1049,7 @@ def as_write_area(self, area: Areas, dbnumber: int, start: int, size: int, wordl Returns: Snap7 code. """ - type_ = snap7.types.wordlen_to_ctypes[WordLen.Byte.value] + type_ = wordlen_to_ctypes[WordLen.Byte.value] logger.debug(f"writing area: {area.name} dbnumber: {dbnumber} start: {start}: size {size}: " f"wordlen {wordlen} type: {type_}") cdata = (type_ * len(pusrdata)).from_buffer_copy(pusrdata) @@ -1083,7 +1083,7 @@ def as_eb_write(self, start: int, size: int, data: bytearray) -> int: Returns: Snap7 code. """ - type_ = snap7.types.wordlen_to_ctypes[WordLen.Byte.value] + type_ = wordlen_to_ctypes[WordLen.Byte.value] cdata = (type_ * size).from_buffer_copy(data) result = self._library.Cli_AsEBWrite(self._pointer, start, size, byref(cdata)) check_error(result, context="client") @@ -1104,7 +1104,7 @@ def as_full_upload(self, _type: str, block_num: int) -> int: """ _buffer = buffer_type() size = c_int(sizeof(_buffer)) - block_type = snap7.types.block_types[_type] + block_type = block_types[_type] result = self._library.Cli_AsFullUpload(self._pointer, block_type, block_num, byref(_buffer), byref(size)) check_error(result, context="client") return result @@ -1123,7 +1123,7 @@ def as_list_blocks_of_type(self, blocktype: str, data, count) -> int: Raises: :obj:`ValueError`: if the `blocktype` is invalid """ - _blocktype = snap7.types.block_types.get(blocktype) + _blocktype = block_types.get(blocktype) if not _blocktype: raise ValueError("The blocktype parameter was invalid") result = self._library.Cli_AsListBlocksOfType(self._pointer, _blocktype, byref(data), byref(count)) @@ -1156,7 +1156,7 @@ def as_mb_write(self, start: int, size: int, data: bytearray) -> int: Returns: Snap7 code. """ - type_ = snap7.types.wordlen_to_ctypes[WordLen.Byte.value] + type_ = wordlen_to_ctypes[WordLen.Byte.value] cdata = (type_ * size).from_buffer_copy(data) result = self._library.Cli_AsMBWrite(self._pointer, start, size, byref(cdata)) check_error(result, context="client") @@ -1218,7 +1218,7 @@ def as_tm_write(self, start: int, amount: int, data: bytearray) -> int: Returns: Snap7 code. """ - type_ = snap7.types.wordlen_to_ctypes[WordLen.Timer.value] + type_ = wordlen_to_ctypes[WordLen.Timer.value] cdata = (type_ * amount).from_buffer_copy(data) result = self._library.Cli_AsTMWrite(self._pointer, start, amount, byref(cdata)) check_error(result) @@ -1238,7 +1238,7 @@ def as_upload(self, block_num: int, _buffer, size) -> int: Returns: Snap7 code. """ - block_type = snap7.types.block_types['DB'] + block_type = block_types['DB'] result = self._library.Cli_AsUpload(self._pointer, block_type, block_num, byref(_buffer), byref(size)) check_error(result, context="client") return result @@ -1266,7 +1266,7 @@ def ct_read(self, start: int, amount: int) -> bytearray: Returns: Buffer read. """ - type_ = snap7.types.wordlen_to_ctypes[WordLen.Counter.value] + type_ = wordlen_to_ctypes[WordLen.Counter.value] data = (type_ * amount)() result = self._library.Cli_CTRead(self._pointer, start, amount, byref(data)) check_error(result, context="client") @@ -1283,7 +1283,7 @@ def ct_write(self, start: int, amount: int, data: bytearray) -> int: Returns: Snap7 code. """ - type_ = snap7.types.wordlen_to_ctypes[WordLen.Counter.value] + type_ = wordlen_to_ctypes[WordLen.Counter.value] cdata = (type_ * amount).from_buffer_copy(data) result = self._library.Cli_CTWrite(self._pointer, start, amount, byref(cdata)) check_error(result) @@ -1313,7 +1313,7 @@ def eb_read(self, start: int, size: int) -> bytearray: Returns: Data read. """ - type_ = snap7.types.wordlen_to_ctypes[WordLen.Byte.value] + type_ = wordlen_to_ctypes[WordLen.Byte.value] data = (type_ * size)() result = self._library.Cli_EBRead(self._pointer, start, size, byref(data)) check_error(result, context="client") @@ -1330,7 +1330,7 @@ def eb_write(self, start: int, size: int, data: bytearray) -> int: Returns: Snap7 code. """ - type_ = snap7.types.wordlen_to_ctypes[WordLen.Byte.value] + type_ = wordlen_to_ctypes[WordLen.Byte.value] cdata = (type_ * size).from_buffer_copy(data) result = self._library.Cli_EBWrite(self._pointer, start, size, byref(cdata)) check_error(result) @@ -1450,7 +1450,7 @@ def mb_read(self, start: int, size: int) -> bytearray: Returns: Buffer with the data read. """ - type_ = snap7.types.wordlen_to_ctypes[WordLen.Byte.value] + type_ = wordlen_to_ctypes[WordLen.Byte.value] data = (type_ * size)() result = self._library.Cli_MBRead(self._pointer, start, size, byref(data)) check_error(result, context="client") @@ -1467,7 +1467,7 @@ def mb_write(self, start: int, size: int, data: bytearray) -> int: Returns: Snap7 code. """ - type_ = snap7.types.wordlen_to_ctypes[WordLen.Byte.value] + type_ = wordlen_to_ctypes[WordLen.Byte.value] cdata = (type_ * size).from_buffer_copy(data) result = self._library.Cli_MBWrite(self._pointer, start, size, byref(cdata)) check_error(result) @@ -1523,7 +1523,7 @@ def tm_read(self, start: int, amount: int) -> bytearray: Buffer read. """ wordlen = WordLen.Timer - type_ = snap7.types.wordlen_to_ctypes[wordlen.value] + type_ = wordlen_to_ctypes[wordlen.value] data = (type_ * amount)() result = self._library.Cli_TMRead(self._pointer, start, amount, byref(data)) check_error(result, context="client") @@ -1541,7 +1541,7 @@ def tm_write(self, start: int, amount: int, data: bytearray) -> int: Snap7 code. """ wordlen = WordLen.Timer - type_ = snap7.types.wordlen_to_ctypes[wordlen.value] + type_ = wordlen_to_ctypes[wordlen.value] cdata = (type_ * amount).from_buffer_copy(data) result = self._library.Cli_TMWrite(self._pointer, start, amount, byref(cdata)) check_error(result) diff --git a/snap7/logo.py b/snap7/logo.py index 8618d8bf..82b74d14 100644 --- a/snap7/logo.py +++ b/snap7/logo.py @@ -6,10 +6,9 @@ import logging from ctypes import byref, c_int, c_int32, c_uint16, c_void_p -import snap7 -from snap7 import types -from snap7.types import WordLen, S7Object, param_types -from snap7.common import ipv4, check_error, load_library +from .types import WordLen, S7Object, param_types +from .types import RemotePort, Areas, wordlen_to_ctypes +from .common import ipv4, check_error, load_library logger = logging.getLogger(__name__) @@ -81,7 +80,7 @@ def connect(self, ip_address: str, tsap_snap7: int, tsap_logo: int, tcpport: int # special handling for Siemens Logo # 1st set connection params # 2nd connect without any parameters - self.set_param(snap7.types.RemotePort, tcpport) + self.set_param(RemotePort, tcpport) self.set_connection_params(ip_address, tsap_snap7, tsap_logo) result = self.library.Cli_Connect(self.pointer) check_error(result, context="client") @@ -96,7 +95,7 @@ def read(self, vm_address: str): Returns: integer """ - area = types.Areas.DB + area = Areas.DB db_number = 1 size = 1 start = 0 @@ -130,7 +129,7 @@ def read(self, vm_address: str): logger.info("Unknown address format") return 0 - type_ = snap7.types.wordlen_to_ctypes[wordlen.value] + type_ = wordlen_to_ctypes[wordlen.value] data = (type_ * size)() logger.debug(f"start:{start}, wordlen:{wordlen.name}={wordlen.value}, data-length:{len(data)}") @@ -158,7 +157,7 @@ def write(self, vm_address: str, value: int) -> int: Examples: >>> write("VW10", 200) or write("V10.3", 1) """ - area = types.Areas.DB + area = Areas.DB db_number = 1 start = 0 amount = 1 @@ -201,9 +200,9 @@ def write(self, vm_address: str, value: int) -> int: return 1 if wordlen == WordLen.Bit: - type_ = snap7.types.wordlen_to_ctypes[WordLen.Byte.value] + type_ = wordlen_to_ctypes[WordLen.Byte.value] else: - type_ = snap7.types.wordlen_to_ctypes[wordlen.value] + type_ = wordlen_to_ctypes[wordlen.value] cdata = (type_ * amount).from_buffer_copy(data) @@ -226,7 +225,7 @@ def db_read(self, db_number: int, start: int, size: int) -> bytearray: """ logger.debug(f"db_read, db_number:{db_number}, start:{start}, size:{size}") - type_ = snap7.types.wordlen_to_ctypes[WordLen.Byte.value] + type_ = wordlen_to_ctypes[WordLen.Byte.value] data = (type_ * size)() result = (self.library.Cli_DBRead( self.pointer, db_number, start, size, @@ -246,7 +245,7 @@ def db_write(self, db_number: int, start: int, data: bytearray) -> int: Error code from snap7 library. """ wordlen = WordLen.Byte - type_ = snap7.types.wordlen_to_ctypes[wordlen.value] + type_ = wordlen_to_ctypes[wordlen.value] size = len(data) cdata = (type_ * size).from_buffer_copy(data) logger.debug(f"db_write db_number:{db_number} start:{start} size:{size} data:{data}") diff --git a/snap7/partner.py b/snap7/partner.py index fca6622d..7f3b48ef 100644 --- a/snap7/partner.py +++ b/snap7/partner.py @@ -12,8 +12,8 @@ from ctypes import byref, c_int, c_int32, c_uint32, c_void_p from typing import Tuple, Optional -import snap7.types -from snap7.common import ipv4, check_error, load_library +from .common import ipv4, check_error, load_library +from .types import S7Object, param_types, word logger = logging.getLogger(__name__) @@ -97,8 +97,8 @@ def create(self, active: bool = False): :param active: 0 :returns: a pointer to the partner object """ - self._library.Par_Create.restype = snap7.types.S7Object - self._pointer = snap7.types.S7Object(self._library.Par_Create(int(active))) + self._library.Par_Create.restype = S7Object + self._pointer = S7Object(self._library.Par_Create(int(active))) def destroy(self): """ @@ -124,7 +124,7 @@ def get_param(self, number) -> int: Reads an internal Partner object parameter. """ logger.debug(f"retreiving param number {number}") - type_ = snap7.types.param_types[number] + type_ = param_types[number] value = type_() code = self._library.Par_GetParam(self._pointer, c_int(number), byref(value)) @@ -215,8 +215,8 @@ def start_to(self, local_ip: str, remote_ip: str, local_tsap: int, remote_tsap: raise ValueError(f"{remote_ip} is invalid ipv4") logger.info(f"starting partnering from {local_ip} to {remote_ip}") return self._library.Par_StartTo(self._pointer, local_ip.encode(), remote_ip.encode(), - snap7.types.word(local_tsap), - snap7.types.word(remote_tsap)) + word(local_tsap), + word(remote_tsap)) def stop(self) -> int: """ diff --git a/snap7/server.py b/snap7/server.py index 32b56434..cbaba7f3 100644 --- a/snap7/server.py +++ b/snap7/server.py @@ -8,9 +8,11 @@ import logging from typing import Any, Tuple, Callable, Optional -import snap7 -import snap7.types -from snap7.common import ipv4, check_error, load_library +from . import server as snap7server +from .common import ipv4, check_error, load_library +from .types import SrvEvent, LocalPort, cpu_statuses, server_statuses +from .types import longword, wordlen_to_ctypes, WordLen, S7Object +from .types import srvAreaDB, srvAreaPA, srvAreaTM, srvAreaCT logger = logging.getLogger(__name__) @@ -47,7 +49,7 @@ def __init__(self, log: bool = True): def __del__(self): self.destroy() - def event_text(self, event: snap7.types.SrvEvent) -> str: + def event_text(self, event: SrvEvent) -> str: """Returns a textual explanation of a given event object Args: @@ -69,8 +71,8 @@ def create(self): """Create the server. """ logger.info("creating server") - self.library.Srv_Create.restype = snap7.types.S7Object - self.pointer = snap7.types.S7Object(self.library.Srv_Create()) + self.library.Srv_Create.restype = S7Object + self.pointer = S7Object(self.library.Srv_Create()) @error_wrap def register_area(self, area_code: int, index: int, userdata): @@ -95,9 +97,9 @@ def set_events_callback(self, call_back: Callable[..., Any]) -> int: event is created. """ logger.info("setting event callback") - callback_wrap: Callable[..., Any] = ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.POINTER(snap7.types.SrvEvent), ctypes.c_int) + callback_wrap: Callable[..., Any] = ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.POINTER(SrvEvent), ctypes.c_int) - def wrapper(usrptr: Optional[ctypes.c_void_p], pevent: snap7.types.SrvEvent, size: int) -> int: + def wrapper(usrptr: Optional[ctypes.c_void_p], pevent: SrvEvent, size: int) -> int: """Wraps python function into a ctypes function Args: @@ -126,10 +128,10 @@ def set_read_events_callback(self, call_back: Callable[..., Any]): """ logger.info("setting read event callback") callback_wrapper: Callable[..., Any] = ctypes.CFUNCTYPE(None, ctypes.c_void_p, - ctypes.POINTER(snap7.types.SrvEvent), + ctypes.POINTER(SrvEvent), ctypes.c_int) - def wrapper(usrptr: Optional[ctypes.c_void_p], pevent: snap7.types.SrvEvent, size: int) -> int: + def wrapper(usrptr: Optional[ctypes.c_void_p], pevent: SrvEvent, size: int) -> int: """Wraps python function into a ctypes function Args: @@ -166,7 +168,7 @@ def start(self, tcpport: int = 102): """ if tcpport != 102: logger.info(f"setting server TCP port to {tcpport}") - self.set_param(snap7.types.LocalPort, tcpport) + self.set_param(LocalPort, tcpport) logger.info(f"starting server on 0.0.0.0:{tcpport}") return self.library.Srv_Start(self.pointer) @@ -199,8 +201,8 @@ def get_status(self) -> Tuple[str, str, int]: check_error(error) logger.debug(f"status server {server_status.value} cpu {cpu_status.value} clients {clients_count.value}") return ( - snap7.types.server_statuses[server_status.value], - snap7.types.cpu_statuses[cpu_status.value], + server_statuses[server_status.value], + cpu_statuses[cpu_status.value], clients_count.value ) @@ -261,7 +263,7 @@ def start_to(self, ip: str, tcpport: int = 102): """ if tcpport != 102: logger.info(f"setting server TCP port to {tcpport}") - self.set_param(snap7.types.LocalPort, tcpport) + self.set_param(LocalPort, tcpport) if not re.match(ipv4, ip): raise ValueError(f"{ip} is invalid ipv4") logger.info(f"starting server to {ip}:102") @@ -301,27 +303,27 @@ def set_cpu_status(self, status: int): """Sets the Virtual CPU status. Args: - status: :obj:`snap7.types.cpu_statuses` object type. + status: :obj:`cpu_statuses` object type. Returns: Error code from snap7 library. Raises: - :obj:`ValueError`: if `status` is not in :obj:`snap7.types.cpu_statuses`. + :obj:`ValueError`: if `status` is not in :obj:`cpu_statuses`. """ - if status not in snap7.types.cpu_statuses: + if status not in cpu_statuses: raise ValueError(f"The cpu state ({status}) is invalid") logger.debug(f"setting cpu status to {status}") return self.library.Srv_SetCpuStatus(self.pointer, status) - def pick_event(self) -> Optional[snap7.types.SrvEvent]: + def pick_event(self) -> Optional[SrvEvent]: """Extracts an event (if available) from the Events queue. Returns: Server event. """ logger.debug("checking event queue") - event = snap7.types.SrvEvent() + event = SrvEvent() ready = ctypes.c_int32() code = self.library.Srv_PickEvent(self.pointer, ctypes.byref(event), ctypes.byref(ready)) @@ -358,7 +360,7 @@ def get_mask(self, kind: int) -> ctypes.c_uint32: Mask """ logger.debug(f"retrieving mask kind {kind}") - mask = snap7.types.longword() + mask = longword() code = self.library.Srv_GetMask(self.pointer, kind, ctypes.byref(mask)) check_error(code) return mask @@ -382,22 +384,22 @@ def mainloop(tcpport: int = 1102, init_standard_values: bool = False): init_standard_values: if `True` will init some defaults values to be read on DB0. """ - server = snap7.server.Server() + server = snap7server.Server() size = 100 - DBdata = (snap7.types.wordlen_to_ctypes[snap7.types.WordLen.Byte.value] * size)() - PAdata = (snap7.types.wordlen_to_ctypes[snap7.types.WordLen.Byte.value] * size)() - TMdata = (snap7.types.wordlen_to_ctypes[snap7.types.WordLen.Byte.value] * size)() - CTdata = (snap7.types.wordlen_to_ctypes[snap7.types.WordLen.Byte.value] * size)() - server.register_area(snap7.types.srvAreaDB, 1, DBdata) - server.register_area(snap7.types.srvAreaPA, 1, PAdata) - server.register_area(snap7.types.srvAreaTM, 1, TMdata) - server.register_area(snap7.types.srvAreaCT, 1, CTdata) + DBdata = (wordlen_to_ctypes[WordLen.Byte.value] * size)() + PAdata = (wordlen_to_ctypes[WordLen.Byte.value] * size)() + TMdata = (wordlen_to_ctypes[WordLen.Byte.value] * size)() + CTdata = (wordlen_to_ctypes[WordLen.Byte.value] * size)() + server.register_area(srvAreaDB, 1, DBdata) + server.register_area(srvAreaPA, 1, PAdata) + server.register_area(srvAreaTM, 1, TMdata) + server.register_area(srvAreaCT, 1, CTdata) if init_standard_values: ba = _init_standard_values() - DBdata = snap7.types.wordlen_to_ctypes[snap7.types.WordLen.Byte.value] * len(ba) + DBdata = wordlen_to_ctypes[WordLen.Byte.value] * len(ba) DBdata = DBdata.from_buffer(ba) - server.register_area(snap7.types.srvAreaDB, 0, DBdata) + server.register_area(srvAreaDB, 0, DBdata) server.start(tcpport=tcpport) while True: diff --git a/snap7/types.py b/snap7/types.py index 9f6ca793..19737e76 100755 --- a/snap7/types.py +++ b/snap7/types.py @@ -4,7 +4,7 @@ import ctypes from enum import Enum -from snap7.common import ADict +from .common import ADict S7Object = ctypes.c_void_p buffer_size = 65536 diff --git a/snap7/util.py b/snap7/util.py index 5e471652..e82119cc 100644 --- a/snap7/util.py +++ b/snap7/util.py @@ -90,8 +90,8 @@ from datetime import date, datetime, timedelta from collections import OrderedDict -from snap7.types import Areas -from snap7.client import Client +from .types import Areas +from .client import Client logger = logging.getLogger(__name__)