Skip to content

Commit

Permalink
Add node OTAP status storage in a persisted location
Browse files Browse the repository at this point in the history
  • Loading branch information
jmo-wp authored and jmo-wp committed Oct 24, 2024
1 parent 1657db1 commit 355163a
Show file tree
Hide file tree
Showing 4 changed files with 358 additions and 13 deletions.
91 changes: 80 additions & 11 deletions examples/example_get_remote_scratchpad_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from datetime import datetime

import logging
import sys


def type_to_str(type_int):
Expand Down Expand Up @@ -39,7 +40,7 @@ def print_node_list(nodes):
node_id,
timestamp,
node
));
))
id += 1

print()
Expand All @@ -63,36 +64,104 @@ def print_node_list(nodes):

parser.add_argument('--network',
type=int,
required=True,
help="Network address concerned by scratchpad")

mutual_exclusive_group_1 = parser.add_mutually_exclusive_group()
mutual_exclusive_group_1.add_argument("--gateway",
type=str,
nargs='+',
help="""Gateway list to use (space separator between entries).
If specified, the OTAP status will be queried for nodes connected
under given Gateway IDs.
Warning: Mutually exclusive with --gateway-from-file option""",
default=None)
mutual_exclusive_group_1.add_argument('--gateway-from-file',
type=argparse.FileType('r', encoding='UTF-8'),
help="""Gateway list to use from a file (one gateway ID per line,
UTF-8 encoded with line ending with LF character).
If specified, the OTAP status will be queried for nodes connected
under given Gateway IDs.
Warning: Mutually exclusive with --gateway option""")

# Log parameters
parser.add_argument("--log-level", default="info", type=str,
choices=["debug", "info", "warning", "error", "critical"],
help="Default to 'info'. Log level to be displayed. "
"It has to be chosen between 'debug', 'info', 'warning', 'error' and 'critical'")

# Script behavior
parser.add_argument("--strict-mode",
dest='strict_mode',
action='store_true',
help="Stop execution at first generated error on gateway/sink operation")
parser.add_argument("--query-mode",
action='store_true',
help="When provided, script will request nodes' OTAP status instead of passively collect them")
parser.add_argument("--persist-otap-status-file",
type=str,
help="When provided, received nodes' OTAP status will be persisted.")

args = parser.parse_args()

logging.basicConfig(format='%(levelname)s %(asctime)s %(message)s', level=logging.INFO)
logging.basicConfig(format='%(asctime)s | [%(levelname)s] %(filename)s:%(lineno)d:%(message)s', level=args.log_level.upper(),
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler("example_get_remote_scratchpad_status.log", mode="w")
]
)

logging.debug(f"Script arguments: {args}")

wni = WirepasNetworkInterface(args.host,
args.port,
args.username,
args.password,
insecure=args.insecure)
insecure=args.insecure,
strict_mode=args.strict_mode
)

# Get gateway list
gateways = None
if args.gateway is None:
if args.gateway_from_file is not None:
# Load gateway list from file
gateways = []
try:
for line in args.gateway_from_file:
gateways.append(line.strip('\n'))
except ValueError:
logging.error("Invalid file format. Must be UTF-8 with lines ending with LF character")
exit()
else:
gateways = args.gateway

if gateways is None:
logging.info("Nodes' OTAP status can be received from all gateways under network %d" % args.network)
else:
logging.info(f"Nodes' OTAP status can be received from {len(gateways)} gateways under network {args.network}")
logging.debug(f"Gateway list {gateways}")

if args.network is None:
print("No network address provided")
exit()

otapHelper = WirepasOtapHelper(wni,
args.network)
args.network,
gateways,
args.persist_otap_status_file)

while True:
choice = input("l to [l]ist nodes and s to [s]end remote status cmd as broadcast e to [e]xit\n")
choice = input("l to [l]ist nodes and s to [s]end remote status cmd as broadcast (if mode allows it) e to [e]xit\n")
if choice == 'l':
print_node_list(otapHelper.get_current_nodes_status())
continue
elif choice == 's':
sinks = wni.get_sinks()
otapHelper.send_remote_scratchpad_status()
if args.query_mode:
sinks = wni.get_sinks()
otapHelper.send_remote_scratchpad_status()
else:
logging.warning("Operation not allowed! Please add --query-mode parameter to command line.")
continue
elif choice == 'e':
break
else:
print("Wrong choice: s, l or e")
logging.warning("Wrong choice: s, l or e")
continue
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
wirepas_mesh_messaging==1.2.5
paho_mqtt==1.5.1
peewee~=3.17.6
259 changes: 259 additions & 0 deletions wirepas_mqtt_library/otap_status_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
# Copyright 2024 Wirepas Ltd. All Rights Reserved.
#
# See file LICENSE.txt for full license details.
#

import logging
import sys

try:
from peewee import *
except ModuleNotFoundError:
logging.error("Required *peewee* module not found! Please run `python -m pip install peewee`")

# Constants
## Internal
### Database metadata
_MAX_LEN_FIELD_DB_CREATOR= 128
_DB_CREATOR = 'com.wirepas.otaphelper'
_DB_SCHEMA_VERSION = 1
_DB_METADATA_TABLE_NAME = 'otapstatusdbmetadata'

### Database otap status
_DB_DATA_TABLE_NAME = 'otapstatusdb'
_MAX_LEN_FIELD_SCRATCHPAD_ST_TYPE = 16
_MAX_LEN_FIELD_SCRATCHPAD_ST_STATUS = 20
_MAX_LEN_FIELD_WIREPAS_STACK_VERSION = 12
_MAX_LEN_FIELD_APP_VERSION = _MAX_LEN_FIELD_WIREPAS_STACK_VERSION
_MAX_LEN_FIELD_OTAP_ACTION = 33

### Database connection. Init deferred when final filename known.
_DB_CONN = SqliteDatabase(None, autoconnect=False)

## Public
# None

class OtapStatusStorage:
"""Class to handle OTAP status persistence
"""

def __init__(self, storage_file=None):
"""Constructor.
:param storage_file: File where nodes' OTAP status will be stored
"""
logging.debug("Initialise storage")

_DB_CONN.init(storage_file)

if storage_file is not None:
# Create databases if does not exist
self._create_storage()

# Check if storage is properly initialised and compatible with this code
self._check_storage_format()

else:
logging.error("No storage file provided!")
raise ValueError

def _create_storage(self):
# Check if database already initialised
logging.debug("Creating storage...")

with _DB_CONN:
if not _DB_CONN.table_exists(_DB_METADATA_TABLE_NAME):
logging.info(f"Metadata storage does not exist. Creating it...")
_DB_CONN.create_tables([OtapstatusDBMetadata])
# Populate metadata
OtapstatusDBMetadata(creator=_DB_CREATOR, schema_version=_DB_SCHEMA_VERSION).save()
else:
logging.info(f"Existing metadata storage found.")

if not _DB_CONN.table_exists(_DB_DATA_TABLE_NAME):
logging.info(f"Main storage does not exist. Creating it...")
_DB_CONN.create_tables([OtapStatusDB])
else:
logging.info(f"Existing main storage found.")

def _check_storage_format(self):
logging.info("Checking storage format...")
with _DB_CONN:
metadata = OtapstatusDBMetadata.get()

if metadata.creator != _DB_CREATOR:
logging.error("Wrong storage creator!")
raise ValueError

if metadata.schema_version != _DB_SCHEMA_VERSION:
logging.error(f"Unsupported storage version ({metadata.schema_version}). Version {_DB_SCHEMA_VERSION} required.")
raise ValueError
logging.info("Format OK.")

def write_node_status(self, node_status, network_address, node_address, travel_time_ms):
"""Write node OTAP status to persisted storage
:param node_status: node's OTAP status to persist
:param network_address: network address the node belong to
Used to identify if some storage entry with same node address where overwritten between execution
:param node_address: OTAP status originator
:param travel_time_ms: OTAP status packet travel time in milliseconds
"""

# Create data to write to storage
storage_entry = {
OtapStatusDB.node_address: node_address,
OtapStatusDB.network_address: network_address,
OtapStatusDB.rx_timestamp_epoch_ms: node_status["ts"],
OtapStatusDB.tx_timestamp_epoch_ms: node_status["ts"] - travel_time_ms,
OtapStatusDB.scratchpad_stored_seq: node_status["seq"],
OtapStatusDB.scratchpad_stored_crc: node_status["crc"],
OtapStatusDB.scratchpad_stored_len: node_status["length"],
OtapStatusDB.scratchpad_stored_type: node_status["type"],
OtapStatusDB.scratchpad_stored_type_str: OtapStatusStorageFormatter().otap_type_to_str(node_status["type"]),
OtapStatusDB.scratchpad_stored_status: node_status["status"],
OtapStatusDB.scratchpad_stored_status_str: OtapStatusStorageFormatter().otap_status_to_str(node_status["status"]),
OtapStatusDB.wirepas_stack_version: OtapStatusStorageFormatter().component_version_to_str(node_status["stack_version"]),
OtapStatusDB.wirepas_stack_area_id: node_status["stack_area_id"]
}

try:
storage_entry[OtapStatusDB.app_version] = OtapStatusStorageFormatter().component_version_to_str(node_status["app_version"])
storage_entry[OtapStatusDB.app_area_id] = node_status["app_area_id"]
except KeyError:
# Fields not present in OTAP status
storage_entry[OtapStatusDB.app_version] = None
storage_entry[OtapStatusDB.app_area_id] = None

try:
storage_entry[OtapStatusDB.otap_action] = node_status["action"]
storage_entry[OtapStatusDB.otap_action_str] = OtapStatusStorageFormatter().otap_action_to_str(node_status["action"])
storage_entry[OtapStatusDB.otap_target_crc] = node_status["target_crc"]
storage_entry[OtapStatusDB.otap_target_seq] = node_status["target_seq"]
storage_entry[OtapStatusDB.otap_target_delay_m] = node_status["target_delay_m"]
storage_entry[OtapStatusDB.otap_remaining_delay_m] = node_status["remaining_delay_m"]
except KeyError:
# Fields not present in OTAP status
storage_entry[OtapStatusDB.otap_action] = None
storage_entry[OtapStatusDB.otap_action_str] = None
storage_entry[OtapStatusDB.otap_target_crc] = None
storage_entry[OtapStatusDB.otap_target_seq] = None
storage_entry[OtapStatusDB.otap_target_delay_m] = None
storage_entry[OtapStatusDB.otap_remaining_delay_m] = None

with OtapStatusDB._meta.database:

logging.debug(f"Writing node's {node_address} status: {storage_entry} for nw {network_address} and travel_time {travel_time_ms}")
try:
OtapStatusDB.insert(storage_entry).on_conflict(
conflict_target=[OtapStatusDB.node_address],
preserve=[OtapStatusDB.node_address],
update=storage_entry
).execute()

logging.debug(f"Done writing entry.")
except Exception as e:
logging.error(f"Could not write entry. Reason: {e}")


class OtapStatusStorageFormatter:
""" Class to format data to proper storage type
"""
def otap_type_to_str(self, type_int):
map_to_str = ("Blank", "Present", "Process")
try:
return map_to_str[type_int]
except IndexError:
logging.error(f"Invalid scratchpad type value received: {type_int}")
return "Unknown"

def otap_status_to_str(self, status):
map_to_str = {
"0": "Success",
"1": "Flash error",
"2": "Invalid header",
"3": "Invalid CRC",
"4": "Auth error",
"5": "Decompression error",
"6": "No space",
"7": "Invalid file header",
"8": "Flash driver error",
"255": "New"
}

try:
return map_to_str[str(status)]
except (KeyError, TypeError):
logging.error(f"Invalid scratchpad status value received: {status}")
return "Unknown"

def component_version_to_str(self, version):
if version is not None:
try:
return f"{version[0]}.{version[1]}.{version[2]}.{version[3]}"
except IndexError:
logging.error(f"Invalid component version received: {version}")
return "x.x.x.x"
return version

def otap_action_to_str(self, otap_action):
map_to_str = {
"0": "no_otap",
"1": "propagate_only",
"2": "propagate_and_process",
"3": "propagate_and_process_with_delay",
"4": "legacy"
}

try:
return map_to_str[str(otap_action)]
except (KeyError, TypeError):
logging.error(f"Invalid scratchpad action value received: {otap_action}")
return "Unknown"


# 'peewee' documentation recommends to create this class to that
# any other database model definition will use the same storage
class _BaseModel(Model):
class Meta:
database = _DB_CONN

class OtapStatusDB(_BaseModel):
"""OTAP status database model definition: will store the nodes' status info
Note: some fields allowed to be NULL as dependent on remote API status packet version.
"""

# Mandatory fields
node_address = BigIntegerField(primary_key=True)
network_address = IntegerField()
rx_timestamp_epoch_ms = IntegerField()
tx_timestamp_epoch_ms = IntegerField()
scratchpad_stored_seq = IntegerField()
scratchpad_stored_crc = IntegerField()
scratchpad_stored_len = IntegerField()
scratchpad_stored_type = IntegerField()
scratchpad_stored_type_str = FixedCharField(_MAX_LEN_FIELD_SCRATCHPAD_ST_TYPE)
scratchpad_stored_status = IntegerField()
scratchpad_stored_status_str = FixedCharField(_MAX_LEN_FIELD_SCRATCHPAD_ST_STATUS)
wirepas_stack_version = FixedCharField(_MAX_LEN_FIELD_WIREPAS_STACK_VERSION)
wirepas_stack_area_id = BigIntegerField()
# Optional fields
app_version = FixedCharField(_MAX_LEN_FIELD_APP_VERSION, null=True)
app_area_id = BigIntegerField(null=True)
otap_action = IntegerField(null=True)
otap_action_str = FixedCharField(_MAX_LEN_FIELD_OTAP_ACTION, null=True)
otap_target_crc = IntegerField(null=True)
otap_target_seq = IntegerField(null=True)
otap_target_delay_m = IntegerField(null=True)
otap_remaining_delay_m = IntegerField(null=True)

class Meta:
table_name = _DB_DATA_TABLE_NAME

class OtapstatusDBMetadata(_BaseModel):
"""OTAP status metadata model definition
"""
creator = FixedCharField(_MAX_LEN_FIELD_DB_CREATOR)
schema_version = IntegerField()

class Meta:
table_name = _DB_METADATA_TABLE_NAME

Loading

0 comments on commit 355163a

Please sign in to comment.