Skip to content

Commit

Permalink
make all internal imports relative (#386)
Browse files Browse the repository at this point in the history
this allows more freedom installing and moving the snap7 module around
the filesystem for peculiar setups.
  • Loading branch information
vmsh0 authored Sep 22, 2022
1 parent a55d938 commit 076dbee
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 102 deletions.
14 changes: 7 additions & 7 deletions snap7/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
"""
import pkg_resources

import snap7.client as client
import snap7.common as common
import snap7.error as error
import snap7.logo as logo
import snap7.server as server
import snap7.types as types
import snap7.util as util
from . import client
from . import common
from . import error
from . import logo
from . import server
from . import types
from . import util

__all__ = ['client', 'common', 'error', 'logo', 'server', 'types', 'util']

Expand Down
84 changes: 42 additions & 42 deletions snap7/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
from datetime import datetime
from typing import List, Optional, Tuple, Union

import snap7
from snap7.common import check_error, ipv4, load_library
from snap7.types import S7SZL, Areas, BlocksList, S7CpInfo, S7CpuInfo, S7DataItem
from snap7.types import S7OrderCode, S7Protection, S7SZLList, TS7BlockInfo, WordLen
from snap7.types import S7Object, buffer_size, buffer_type, cpu_statuses, param_types
from .common import check_error, ipv4, load_library
from .types import S7SZL, Areas, BlocksList, S7CpInfo, S7CpuInfo, S7DataItem
from .types import S7OrderCode, S7Protection, S7SZLList, TS7BlockInfo, WordLen
from .types import S7Object, buffer_size, buffer_type, cpu_statuses, param_types
from .types import S7CpuInfo, RemotePort, wordlen_to_ctypes, block_types
logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -154,7 +154,7 @@ def get_cpu_info(self) -> S7CpuInfo:
ASName: b'SNAP7-SERVER' Copyright: b'Original Siemens Equipment'
ModuleName: b'CPU 315-2 PN/DP'>
"""
info = snap7.types.S7CpuInfo()
info = S7CpuInfo()
result = self._library.Cli_GetCpuInfo(self._pointer, byref(info))
check_error(result, context="client")
return info
Expand Down Expand Up @@ -189,7 +189,7 @@ def connect(self, address: str, rack: int, slot: int, tcpport: int = 102) -> int
"""
logger.info(f"connecting to {address}:{tcpport} rack {rack} slot {slot}")

self.set_param(snap7.types.RemotePort, tcpport)
self.set_param(RemotePort, tcpport)
return self._library.Cli_ConnectTo(
self._pointer, c_char_p(address.encode()),
c_int(rack), c_int(slot))
Expand Down Expand Up @@ -218,7 +218,7 @@ def db_read(self, db_number: int, start: int, size: int) -> bytearray:
"""
logger.debug(f"db_read, db_number:{db_number}, start:{start}, size:{size}")

type_ = snap7.types.wordlen_to_ctypes[WordLen.Byte.value]
type_ = wordlen_to_ctypes[WordLen.Byte.value]
data = (type_ * size)()
result = (self._library.Cli_DBRead(
self._pointer, db_number, start, size,
Expand Down Expand Up @@ -246,7 +246,7 @@ def db_write(self, db_number: int, start: int, data: bytearray) -> int:
>>> client.db_write(1, 10, buffer) # writes the bit number 0 from the byte 10 to TRUE.
"""
wordlen = WordLen.Byte
type_ = snap7.types.wordlen_to_ctypes[wordlen.value]
type_ = wordlen_to_ctypes[wordlen.value]
size = len(data)
cdata = (type_ * size).from_buffer_copy(data)
logger.debug(f"db_write db_number:{db_number} start:{start} size:{size} data:{data}")
Expand All @@ -264,7 +264,7 @@ def delete(self, block_type: str, block_num: int) -> int:
Error code from snap7 library.
"""
logger.info("deleting block")
blocktype = snap7.types.block_types[block_type]
blocktype = block_types[block_type]
result = self._library.Cli_Delete(self._pointer, blocktype, block_num)
return result

Expand All @@ -282,7 +282,7 @@ def full_upload(self, _type: str, block_num: int) -> Tuple[bytearray, int]:
"""
_buffer = buffer_type()
size = c_int(sizeof(_buffer))
block_type = snap7.types.block_types[_type]
block_type = block_types[_type]
result = self._library.Cli_FullUpload(self._pointer, block_type,
block_num, byref(_buffer),
byref(size))
Expand All @@ -302,7 +302,7 @@ def upload(self, block_num: int) -> bytearray:
Buffer with the uploaded block.
"""
logger.debug(f"db_upload block_num: {block_num}")
block_type = snap7.types.block_types['DB']
block_type = block_types['DB']
_buffer = buffer_type()
size = c_int(sizeof(_buffer))

Expand Down Expand Up @@ -383,19 +383,19 @@ def read_area(self, area: Areas, dbnumber: int, start: int, size: int) -> bytear
>>> import snap7
>>> client = snap7.client.Client()
>>> client.connect("192.168.0.1", 0, 0)
>>> buffer = client.read_area(snap7.types.Areas.DB, 1, 10, 4) # Reads the DB number 1 from the byte 10 to the byte 14.
>>> buffer = client.read_area(Areas.DB, 1, 10, 4) # Reads the DB number 1 from the byte 10 to the byte 14.
>>> buffer
bytearray(b'\\x00\\x00')
"""
if area not in Areas:
raise ValueError(f"{area} is not implemented in snap7.types")
raise ValueError(f"{area} is not implemented in types")
elif area == Areas.TM:
wordlen = WordLen.Timer
elif area == Areas.CT:
wordlen = WordLen.Counter
else:
wordlen = WordLen.Byte
type_ = snap7.types.wordlen_to_ctypes[wordlen.value]
type_ = wordlen_to_ctypes[wordlen.value]
logger.debug(f"reading area: {area.name} dbnumber: {dbnumber} start: {start}: amount {size}: wordlen: {wordlen.name}={wordlen.value}")
data = (type_ * size)()
result = self._library.Cli_ReadArea(self._pointer, area.value, dbnumber, start,
Expand All @@ -421,15 +421,15 @@ def write_area(self, area: Areas, dbnumber: int, start: int, data: bytearray) ->
>>> client = snap7.client.Client()
>>> client.connect("192.168.0.1", 0, 0)
>>> buffer = bytearray([0b00000001])
>>> client.write_area(snap7.types.Areas.DB, 1, 10, buffer) # Writes the bit 0 of the byte 10 from the DB number 1 to TRUE.
>>> client.write_area(Areas.DB, 1, 10, buffer) # Writes the bit 0 of the byte 10 from the DB number 1 to TRUE.
"""
if area == Areas.TM:
wordlen = WordLen.Timer
elif area == Areas.CT:
wordlen = WordLen.Counter
else:
wordlen = WordLen.Byte
type_ = snap7.types.wordlen_to_ctypes[WordLen.Byte.value]
type_ = wordlen_to_ctypes[WordLen.Byte.value]
size = len(data)
logger.debug(f"writing area: {area.name} dbnumber: {dbnumber} start: {start}: size {size}: "
f"wordlen {wordlen.name}={wordlen.value} type: {type_}")
Expand Down Expand Up @@ -483,7 +483,7 @@ def list_blocks_of_type(self, blocktype: str, size: int) -> Union[int, Array]:
:obj:`ValueError`: if the `blocktype` is not valid.
"""

_blocktype = snap7.types.block_types.get(blocktype)
_blocktype = block_types.get(blocktype)
if not _blocktype:
raise ValueError("The blocktype parameter was invalid")

Expand Down Expand Up @@ -536,7 +536,7 @@ def get_block_info(self, blocktype: str, db_number: int) -> TS7BlockInfo:
Family: b''
Header: b''
"""
blocktype_ = snap7.types.block_types.get(blocktype)
blocktype_ = block_types.get(blocktype)

if not blocktype_:
raise ValueError("The blocktype parameter was invalid")
Expand Down Expand Up @@ -639,7 +639,7 @@ def ab_read(self, start: int, size: int) -> bytearray:
Buffer with the data read.
"""
wordlen = WordLen.Byte
type_ = snap7.types.wordlen_to_ctypes[wordlen.value]
type_ = wordlen_to_ctypes[wordlen.value]
data = (type_ * size)()
logger.debug(f"ab_read: start: {start}: size {size}: ")
result = self._library.Cli_ABRead(self._pointer, start, size,
Expand All @@ -658,7 +658,7 @@ def ab_write(self, start: int, data: bytearray) -> int:
Snap7 code.
"""
wordlen = WordLen.Byte
type_ = snap7.types.wordlen_to_ctypes[wordlen.value]
type_ = wordlen_to_ctypes[wordlen.value]
size = len(data)
cdata = (type_ * size).from_buffer_copy(data)
logger.debug(f"ab write: start: {start}: size: {size}: ")
Expand Down Expand Up @@ -693,7 +693,7 @@ def as_ab_write(self, start: int, data: bytearray) -> int:
Snap7 code.
"""
wordlen = WordLen.Byte
type_ = snap7.types.wordlen_to_ctypes[wordlen.value]
type_ = wordlen_to_ctypes[wordlen.value]
size = len(data)
cdata = (type_ * size).from_buffer_copy(data)
logger.debug(f"ab write: start: {start}: size: {size}: ")
Expand Down Expand Up @@ -754,7 +754,7 @@ def as_ct_write(self, start: int, amount: int, data: bytearray) -> int:
Returns:
Snap7 code.
"""
type_ = snap7.types.wordlen_to_ctypes[WordLen.Counter.value]
type_ = wordlen_to_ctypes[WordLen.Counter.value]
cdata = (type_ * amount).from_buffer_copy(data)
result = self._library.Cli_AsCTWrite(self._pointer, start, amount, byref(cdata))
check_error(result, context="client")
Expand Down Expand Up @@ -991,14 +991,14 @@ def wait_as_completion(self, timeout: int) -> int:

def _prepare_as_read_area(self, area: Areas, size: int) -> Tuple[WordLen, Array]:
if area not in Areas:
raise ValueError(f"{area} is not implemented in snap7.types")
raise ValueError(f"{area} is not implemented in types")
elif area == Areas.TM:
wordlen = WordLen.Timer
elif area == Areas.CT:
wordlen = WordLen.Counter
else:
wordlen = WordLen.Byte
type_ = snap7.types.wordlen_to_ctypes[wordlen.value]
type_ = wordlen_to_ctypes[wordlen.value]
usrdata = (type_ * size)()
return wordlen, usrdata

Expand All @@ -1024,14 +1024,14 @@ def as_read_area(self, area: Areas, dbnumber: int, start: int, size: int, wordle

def _prepare_as_write_area(self, area: Areas, data: bytearray) -> Tuple[WordLen, Array]:
if area not in Areas:
raise ValueError(f"{area} is not implemented in snap7.types")
raise ValueError(f"{area} is not implemented in types")
elif area == Areas.TM:
wordlen = WordLen.Timer
elif area == Areas.CT:
wordlen = WordLen.Counter
else:
wordlen = WordLen.Byte
type_ = snap7.types.wordlen_to_ctypes[WordLen.Byte.value]
type_ = wordlen_to_ctypes[WordLen.Byte.value]
cdata = (type_ * len(data)).from_buffer_copy(data)
return wordlen, cdata

Expand All @@ -1049,7 +1049,7 @@ def as_write_area(self, area: Areas, dbnumber: int, start: int, size: int, wordl
Returns:
Snap7 code.
"""
type_ = snap7.types.wordlen_to_ctypes[WordLen.Byte.value]
type_ = wordlen_to_ctypes[WordLen.Byte.value]
logger.debug(f"writing area: {area.name} dbnumber: {dbnumber} start: {start}: size {size}: "
f"wordlen {wordlen} type: {type_}")
cdata = (type_ * len(pusrdata)).from_buffer_copy(pusrdata)
Expand Down Expand Up @@ -1083,7 +1083,7 @@ def as_eb_write(self, start: int, size: int, data: bytearray) -> int:
Returns:
Snap7 code.
"""
type_ = snap7.types.wordlen_to_ctypes[WordLen.Byte.value]
type_ = wordlen_to_ctypes[WordLen.Byte.value]
cdata = (type_ * size).from_buffer_copy(data)
result = self._library.Cli_AsEBWrite(self._pointer, start, size, byref(cdata))
check_error(result, context="client")
Expand All @@ -1104,7 +1104,7 @@ def as_full_upload(self, _type: str, block_num: int) -> int:
"""
_buffer = buffer_type()
size = c_int(sizeof(_buffer))
block_type = snap7.types.block_types[_type]
block_type = block_types[_type]
result = self._library.Cli_AsFullUpload(self._pointer, block_type, block_num, byref(_buffer), byref(size))
check_error(result, context="client")
return result
Expand All @@ -1123,7 +1123,7 @@ def as_list_blocks_of_type(self, blocktype: str, data, count) -> int:
Raises:
:obj:`ValueError`: if the `blocktype` is invalid
"""
_blocktype = snap7.types.block_types.get(blocktype)
_blocktype = block_types.get(blocktype)
if not _blocktype:
raise ValueError("The blocktype parameter was invalid")
result = self._library.Cli_AsListBlocksOfType(self._pointer, _blocktype, byref(data), byref(count))
Expand Down Expand Up @@ -1156,7 +1156,7 @@ def as_mb_write(self, start: int, size: int, data: bytearray) -> int:
Returns:
Snap7 code.
"""
type_ = snap7.types.wordlen_to_ctypes[WordLen.Byte.value]
type_ = wordlen_to_ctypes[WordLen.Byte.value]
cdata = (type_ * size).from_buffer_copy(data)
result = self._library.Cli_AsMBWrite(self._pointer, start, size, byref(cdata))
check_error(result, context="client")
Expand Down Expand Up @@ -1218,7 +1218,7 @@ def as_tm_write(self, start: int, amount: int, data: bytearray) -> int:
Returns:
Snap7 code.
"""
type_ = snap7.types.wordlen_to_ctypes[WordLen.Timer.value]
type_ = wordlen_to_ctypes[WordLen.Timer.value]
cdata = (type_ * amount).from_buffer_copy(data)
result = self._library.Cli_AsTMWrite(self._pointer, start, amount, byref(cdata))
check_error(result)
Expand All @@ -1238,7 +1238,7 @@ def as_upload(self, block_num: int, _buffer, size) -> int:
Returns:
Snap7 code.
"""
block_type = snap7.types.block_types['DB']
block_type = block_types['DB']
result = self._library.Cli_AsUpload(self._pointer, block_type, block_num, byref(_buffer), byref(size))
check_error(result, context="client")
return result
Expand Down Expand Up @@ -1266,7 +1266,7 @@ def ct_read(self, start: int, amount: int) -> bytearray:
Returns:
Buffer read.
"""
type_ = snap7.types.wordlen_to_ctypes[WordLen.Counter.value]
type_ = wordlen_to_ctypes[WordLen.Counter.value]
data = (type_ * amount)()
result = self._library.Cli_CTRead(self._pointer, start, amount, byref(data))
check_error(result, context="client")
Expand All @@ -1283,7 +1283,7 @@ def ct_write(self, start: int, amount: int, data: bytearray) -> int:
Returns:
Snap7 code.
"""
type_ = snap7.types.wordlen_to_ctypes[WordLen.Counter.value]
type_ = wordlen_to_ctypes[WordLen.Counter.value]
cdata = (type_ * amount).from_buffer_copy(data)
result = self._library.Cli_CTWrite(self._pointer, start, amount, byref(cdata))
check_error(result)
Expand Down Expand Up @@ -1313,7 +1313,7 @@ def eb_read(self, start: int, size: int) -> bytearray:
Returns:
Data read.
"""
type_ = snap7.types.wordlen_to_ctypes[WordLen.Byte.value]
type_ = wordlen_to_ctypes[WordLen.Byte.value]
data = (type_ * size)()
result = self._library.Cli_EBRead(self._pointer, start, size, byref(data))
check_error(result, context="client")
Expand All @@ -1330,7 +1330,7 @@ def eb_write(self, start: int, size: int, data: bytearray) -> int:
Returns:
Snap7 code.
"""
type_ = snap7.types.wordlen_to_ctypes[WordLen.Byte.value]
type_ = wordlen_to_ctypes[WordLen.Byte.value]
cdata = (type_ * size).from_buffer_copy(data)
result = self._library.Cli_EBWrite(self._pointer, start, size, byref(cdata))
check_error(result)
Expand Down Expand Up @@ -1450,7 +1450,7 @@ def mb_read(self, start: int, size: int) -> bytearray:
Returns:
Buffer with the data read.
"""
type_ = snap7.types.wordlen_to_ctypes[WordLen.Byte.value]
type_ = wordlen_to_ctypes[WordLen.Byte.value]
data = (type_ * size)()
result = self._library.Cli_MBRead(self._pointer, start, size, byref(data))
check_error(result, context="client")
Expand All @@ -1467,7 +1467,7 @@ def mb_write(self, start: int, size: int, data: bytearray) -> int:
Returns:
Snap7 code.
"""
type_ = snap7.types.wordlen_to_ctypes[WordLen.Byte.value]
type_ = wordlen_to_ctypes[WordLen.Byte.value]
cdata = (type_ * size).from_buffer_copy(data)
result = self._library.Cli_MBWrite(self._pointer, start, size, byref(cdata))
check_error(result)
Expand Down Expand Up @@ -1523,7 +1523,7 @@ def tm_read(self, start: int, amount: int) -> bytearray:
Buffer read.
"""
wordlen = WordLen.Timer
type_ = snap7.types.wordlen_to_ctypes[wordlen.value]
type_ = wordlen_to_ctypes[wordlen.value]
data = (type_ * amount)()
result = self._library.Cli_TMRead(self._pointer, start, amount, byref(data))
check_error(result, context="client")
Expand All @@ -1541,7 +1541,7 @@ def tm_write(self, start: int, amount: int, data: bytearray) -> int:
Snap7 code.
"""
wordlen = WordLen.Timer
type_ = snap7.types.wordlen_to_ctypes[wordlen.value]
type_ = wordlen_to_ctypes[wordlen.value]
cdata = (type_ * amount).from_buffer_copy(data)
result = self._library.Cli_TMWrite(self._pointer, start, amount, byref(cdata))
check_error(result)
Expand Down
Loading

0 comments on commit 076dbee

Please sign in to comment.