Skip to content

Commit

Permalink
Merge pull request #107 from CarletonURocketry/eh/document
Browse files Browse the repository at this point in the history
Document and refactor ground-station code
  • Loading branch information
linguini1 authored Aug 7, 2024
2 parents 94ef982 + 8b749ac commit 3047978
Show file tree
Hide file tree
Showing 10 changed files with 258 additions and 281 deletions.
2 changes: 1 addition & 1 deletion main.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
}

# Constants
VERSION: str = "0.6.0-DEV"
VERSION: str = "1.0.0-DEV"


class ShutdownException(Exception):
Expand Down
156 changes: 5 additions & 151 deletions modules/telemetry/json_packets.py → modules/telemetry/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,164 +7,18 @@
import logging
import os
from dataclasses import dataclass, field
from enum import IntEnum
from pathlib import Path
from typing import TypeAlias
from modules.telemetry.telemetry_utils import ParsedBlock

# Constants
MISSION_EXTENSION: str = "mission"
MISSIONS_DIR: str = "missions"
from modules.telemetry.parsing_utils import ParsedBlock

# Aliases
OutputFormat: TypeAlias = dict[str, dict[str, dict[str, dict[str, str]]]]

logger = logging.getLogger(__name__)


# Helper classes
class MissionState(IntEnum):
"""The state of the mission."""

DNE = -1
LIVE = 0
RECORDED = 1


class ReplayState(IntEnum):
"""Represents the state of the mission being currently replayed."""

DNE = -1
PAUSED = 0
PLAYING = 1
FINISHED = 2


@dataclass
class MissionEntry:
"""Represents an available mission for replay."""

name: str
length: int = 0
filepath: Path = Path.cwd() / MISSIONS_DIR
version: int = 1
valid: bool = False

def __iter__(self):
yield "name", self.name
yield "length", self.length
yield "version", self.version

def __len__(self) -> int:
return self.length

def __bool__(self):
return self.valid


# Status packet classes
@dataclass
class SerialData:
"""The serial data packet for the telemetry process."""

available_ports: list[str] = field(default_factory=list)

def __iter__(self):
yield "available_ports", self.available_ports


@dataclass
class RN2483RadioData:
"""The RN2483 radio data packet for the telemetry process."""

connected: bool = False
connected_port: str = ""
snr: int = 0 # TODO SET SNR

def __iter__(self):
yield "connected", self.connected,
yield "connected_port", self.connected_port
yield "snr", self.snr


@dataclass
class MissionData:
"""The mission data packet for the telemetry process."""

name: str = ""
epoch: int = -1
state: MissionState = MissionState.DNE
recording: bool = False
last_mission_time: int = -1

def __iter__(self):
yield "name", self.name,
yield "epoch", self.epoch,
yield "state", self.state.value,
yield "recording", self.recording


# Replay packet class
@dataclass
class ReplayData:
"""The replay data packet for the telemetry process."""

state: ReplayState = ReplayState.DNE
speed: float = 1.0
last_played_speed: float = 1.0
mission_files_list: list[Path] = field(default_factory=list)
mission_list: list[MissionEntry] = field(default_factory=list)

def __post_init__(self) -> None:
# Update the mission list on creation
self.update_mission_list()

def update_mission_list(self, missions_dir: Path = Path.cwd().joinpath(MISSIONS_DIR)) -> None:
"""Gets the available mission recordings from the mission folder."""

# TODO change this so that mission_extension and directory are not defined in multiple files
self.mission_files_list = [file for file in missions_dir.glob(f"*.{MISSION_EXTENSION}") if file.is_file()]

# Check each file to output its misc details
self.mission_list = []
for mission_file in self.mission_files_list:
self.mission_list.append(parse_mission_file(mission_file))

def __iter__(self):
yield "state", self.state
yield "speed", self.speed,
yield "mission_list", [dict(e) for e in self.mission_list]


@dataclass
class StatusData:
"""The status data packet for the telemetry process."""

mission: MissionData = field(default_factory=MissionData)
serial: SerialData = field(default_factory=SerialData)
rn2483_radio: RN2483RadioData = field(default_factory=RN2483RadioData)
replay: ReplayData = field(default_factory=ReplayData)

def __iter__(self):
yield "mission", dict(self.mission),
yield "serial", dict(self.serial),
yield "rn2483_radio", dict(self.rn2483_radio),
yield "replay", dict(self.replay),


def parse_mission_file(mission_file: Path) -> MissionEntry:
"""Obtains mission metadata from file"""

length = 0
with open(mission_file, "r") as file:
for _ in file:
length += 1

return MissionEntry(name=mission_file.stem, length=length, filepath=mission_file, version=1)


@dataclass
class TelemetryDataPacketBlock:
class TelemetryDataPacket:
"""A generic block object to store information for telemetry data
All stored values must be updated at once!"""

Expand Down Expand Up @@ -204,7 +58,7 @@ def clear(self) -> None:
self.stored_values = {key: [] for key in self.stored_values.keys()}

def __str__(self):
"""Returns a string representation of the TelemetryDataPacketBlock"""
"""Returns a string representation of the TelemetryDataPacket"""
return f"{self.__class__.__name__} -> time: {self.mission_time} ms, {self.stored_values}"

def __iter__(self):
Expand All @@ -229,7 +83,7 @@ def __init__(self, telemetry_buffer_size: int = 20):
self.decoder: list[dict[int, dict[str, str]]] = [{} for _ in range(5)]

self.last_mission_time: int = -1
self.output_blocks: dict[str, TelemetryDataPacketBlock] = {}
self.output_blocks: dict[str, TelemetryDataPacket] = {}
self.update_buffer: dict[str, dict[str, float | int | str | None]] = {}

# Read packet definition file
Expand All @@ -240,7 +94,7 @@ def __init__(self, telemetry_buffer_size: int = 20):
# Generate telemetry data packet from output specification
for key in output_format.keys():
telemetry_keys: list[str] = list(output_format[key].keys())
self.output_blocks[key] = TelemetryDataPacketBlock(stored_values={key: [] for key in telemetry_keys})
self.output_blocks[key] = TelemetryDataPacket(stored_values={key: [] for key in telemetry_keys})
self.update_buffer[key] = {key: None for key in telemetry_keys}

# Generate extremely efficient access decoder matrix
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional
import logging

Expand All @@ -14,36 +13,13 @@
import modules.telemetry.v1.data_block as v1db
from modules.misc.config import Config

MISSION_EXTENSION: str = "mission"
FILE_CREATION_ATTEMPT_LIMIT: int = 50
MIN_SUPPORTED_VERSION: int = 1
MAX_SUPPORTED_VERSION: int = 1

logger = logging.getLogger(__name__)


# Helper functions
def mission_path(mission_name: str, missions_dir: Path, file_suffix: int = 0) -> Path:
"""Returns the path to the mission file with the matching mission name."""

return missions_dir.joinpath(f"{mission_name}{'' if file_suffix == 0 else f'_{file_suffix}'}.{MISSION_EXTENSION}")


def get_filepath_for_proposed_name(mission_name: str, missions_dir: Path) -> Path:
"""Obtains filepath for proposed name, with a maximum of giving a suffix 50 times before failing."""
file_suffix = 1
missions_filepath = mission_path(mission_name, missions_dir)

while missions_filepath.is_file() and file_suffix < FILE_CREATION_ATTEMPT_LIMIT:
missions_filepath = mission_path(mission_name, missions_dir, file_suffix)
file_suffix += 1

if file_suffix >= FILE_CREATION_ATTEMPT_LIMIT:
raise ValueError(f"Too many mission files already exist with name {mission_name}.")

return missions_filepath


# Dataclasses that allow us to structure the telemetry data
@dataclass
class ParsedBlock:
"""Parsed block data from the telemetry process."""
Expand All @@ -62,52 +38,7 @@ class ParsedTransmission:
blocks: List[ParsedBlock]


def parse_radio_block(pkt_version: int, block_header: BlockHeader, hex_block_contents: str) -> Optional[ParsedBlock]:
"""
Parses telemetry payload blocks from either parsed packets or stored replays. Block contents are a hex string.
"""

# Working with hex strings until this point.
# Hex/Bytes Demarcation point
logger.debug(
f"Parsing v{pkt_version} type {block_header.message_type} subtype {block_header.message_subtype} contents: \
{hex_block_contents}"
)
block_bytes: bytes = bytes.fromhex(hex_block_contents)

try:
block_subtype = v1db.DataBlockSubtype(block_header.message_subtype)
except ValueError:
logger.error(f"Invalid data block subtype {block_header.message_subtype}!")
return

try:
# TODO Make an interface to support multiple v1/v2/v3 objects
block_contents = v1db.DataBlock.parse(block_subtype, block_bytes)
except NotImplementedError:
logger.warning(
f"Block parsing for type {block_header.message_type}, with subtype {block_header.message_subtype} not \
implemented!"
)
return
except v1db.DataBlockException as e:
logger.error(e)
logger.error(f"Block header: {block_header}")
logger.error(f"Block contents: {hex_block_contents}")
return

block_name = block_subtype.name.lower()

logger.debug(str(block_contents))

# TODO fix at some point
# if block == DataBlockSubtype.STATUS:
# self.status.rocket = jsp.RocketData.from_data_block(block)
# return

return ParsedBlock(block_name, block_header, dict(block_contents)) # type: ignore


# Parsing functions
def parse_rn2483_transmission(data: str, config: Config) -> Optional[ParsedTransmission]:
"""
Parses RN2483 Packets and extracts our telemetry payload blocks, returns parsed transmission object if packet
Expand Down Expand Up @@ -178,3 +109,51 @@ def from_approved_callsign(pkt_hdr: PacketHeader, approved_callsigns: dict[str,
return False

return True


def parse_radio_block(pkt_version: int, block_header: BlockHeader, hex_block_contents: str) -> Optional[ParsedBlock]:
"""
Parses telemetry payload blocks from either parsed packets or stored replays. Block contents are a hex string.
"""

# Working with hex strings until this point.
# Hex/Bytes Demarcation point
logger.debug(
f"Parsing v{pkt_version} type {block_header.message_type} subtype {block_header.message_subtype} contents: \
{hex_block_contents}"
)
block_bytes: bytes = bytes.fromhex(hex_block_contents)

# Convert message subtype string to enum
try:
block_subtype = v1db.DataBlockSubtype(block_header.message_subtype)
except ValueError:
logger.error(f"Invalid data block subtype {block_header.message_subtype}!")
return

# Use the appropriate parser for the block subtype enum
try:
# TODO Make an interface to support multiple v1/v2/v3 objects
block_contents = v1db.DataBlock.parse(block_subtype, block_bytes)
except NotImplementedError:
logger.warning(
f"Block parsing for type {block_header.message_type}, with subtype {block_header.message_subtype} not \
implemented!"
)
return
except v1db.DataBlockException as e:
logger.error(e)
logger.error(f"Block header: {block_header}")
logger.error(f"Block contents: {hex_block_contents}")
return

block_name = block_subtype.name.lower()

logger.debug(str(block_contents))

# TODO fix at some point
# if block == DataBlockSubtype.STATUS:
# self.status.rocket = jsp.RocketData.from_data_block(block)
# return

return ParsedBlock(block_name, block_header, dict(block_contents)) # type: ignore
Loading

0 comments on commit 3047978

Please sign in to comment.