Skip to content

Commit

Permalink
Apply suggestions from review, move Get command outside of function, …
Browse files Browse the repository at this point in the history
…add paramterized tests
  • Loading branch information
jwlodek committed Nov 15, 2024
1 parent 7b6f2e1 commit 5c8b2d4
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 17 deletions.
43 changes: 26 additions & 17 deletions src/pandablocks_ioc/ioc.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,32 +177,38 @@ def create_softioc(
asyncio.run_coroutine_threadsafe(client.close(), dispatcher.loop).result()


async def get_panda_versions(
client: AsyncioClient,
) -> dict[EpicsName, str]:
"""Function that gets version information from the PandA using the IDN command
def get_panda_versions(idn_repsonse: str) -> dict[EpicsName, str]:
"""Function that parses version info from the PandA's response to the IDN command
See: https://pandablocks-server.readthedocs.io/en/latest/commands.html#system-commands
Args:
client (AsyncioClient): Client used for commuication with the PandA
idn_response (str): Response from PandA to Get(*IDN) command
Returns:
Dict[str, str]: Dictionary mapping firmware name to version
dict[EpicsName, str]: Dictionary mapping firmware record name to version
"""

idn = await client.send(Get("*IDN"))

# Currently, IDN reports sw, fpga, and rootfs versions
firmware_versions = {"PandA SW": "Unknown", "FPGA": "Unknown", "rootfs": "Unknown"}

# If the *IDN response contains too many keys, break and leave versions as "Unknown"
if len(idn.split(":")) / 2 > len(list(firmware_versions.keys())):
logging.error(f"Version string {idn} recieved from PandA could not be parsed!")
# Since spaces are used to deliminate versions and can also be in the keys and
# values, if an additional key is present that we don't explicitly handle,
# our approach of using regex matching will not work.
if sum(name in idn_repsonse for name in firmware_versions) < idn_repsonse.count(
":"
):
logging.error(
f"Recieved unexpected version numbers in version string {idn_repsonse}!"
)
else:
for firmware_name in firmware_versions:
pattern = re.compile(
rf'{re.escape(firmware_name)}:\s*([^:]+?)(?=\s*\b(?:{"|".join(map(re.escape, firmware_versions))}):|$)' # noqa: E501
rf'{re.escape(firmware_name)}:\s*([^:]+?)(?=\s*\b(?: \
{"|".join(map(re.escape, firmware_versions))}):|$)'
)
if match := pattern.search(idn):
if match := pattern.search(idn_repsonse):
firmware_versions[firmware_name] = match.group(1).strip()
logging.info(
f"{firmware_name} Version: {firmware_versions[firmware_name]}"
Expand Down Expand Up @@ -1865,11 +1871,11 @@ def create_block_records(

return record_dict

def create_version_records(self, fw_vers_dict: dict[EpicsName, str]):
def create_version_records(self, firmware_versions: dict[EpicsName, str]):
"""Creates handful of records for tracking versions of IOC/Firmware via EPICS
Args:
fw_vers_dict (dict[str, str]): Dictionary mapping firmwares to versions
firmware_versions (dict[str, str]): Dictionary mapping firmwares to versions
"""

system_block_prefix = "SYSTEM"
Expand All @@ -1885,7 +1891,7 @@ def create_version_records(self, fw_vers_dict: dict[EpicsName, str]):
builder.stringIn,
)

for firmware_name, version in fw_vers_dict.items():
for firmware_name, version in firmware_versions.items():
firmware_record_name = EpicsName(
system_block_prefix + f":{firmware_name}_VERSION"
)
Expand Down Expand Up @@ -1926,15 +1932,18 @@ async def create_records(
"""Query the PandA and create the relevant records based on the information
returned"""

fw_vers_dict = await get_panda_versions(client)
# Get version information from PandA using IDN command
idn_response = await client.send(Get("*IDN"))
fw_vers_dict = get_panda_versions(idn_response)

(panda_dict, all_values_dict) = await introspect_panda(client)

# Dictionary containing every record of every type
all_records: dict[EpicsName, RecordInfo] = {}

record_factory = IocRecordFactory(client, record_prefix, all_values_dict)

# Add some top level records for version of IOC, FPGA, and software
# Add records for version of IOC, FPGA, and software to SYSTEM block
record_factory.create_version_records(fw_vers_dict)

# For each field in each block, create block_num records of each field
Expand Down
68 changes: 68 additions & 0 deletions tests/test_ioc.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
StringRecordLabelValidator,
_RecordUpdater,
_TimeRecordUpdater,
get_panda_versions,
update,
)

Expand Down Expand Up @@ -840,3 +841,70 @@ class MockConnectionStatus:
# unreliable number of calls to the set method.
record_info.record.set.assert_any_call(True)
record_info.record.set.assert_any_call(0)


@pytest.mark.parametrize(
"sample_idn_response, expected_output, expected_log_messages",
[
(
"PandA SW: 3.0-11-g6422090 FPGA: 3.0.0C4 86e5f0a2 "
"07d202f8 rootfs: PandA 3.1a1-1-g22fdd94",
{
EpicsName("PANDA_SW"): "3.0-11-g6422090",
EpicsName("FPGA"): "3.0.0C4 86e5f0a2 07d202f8",
EpicsName("ROOTFS"): "PandA 3.1a1-1-g22fdd94",
},
[],
),
(
"PandA SW: 3.0-11-g6422090 FPGA: 3.0.0C4 86e5f0a2 07d202f8",
{
EpicsName("PANDA_SW"): "3.0-11-g6422090",
EpicsName("FPGA"): "3.0.0C4 86e5f0a2 07d202f8",
EpicsName("ROOTFS"): "Unknown",
},
["Failed to get rootfs version information!"],
),
(
"PandA SW: 3.0-11-g6422090 rootfs: PandA 3.1a1-1-g22fdd94",
{
EpicsName("PANDA_SW"): "3.0-11-g6422090",
EpicsName("FPGA"): "Unknown",
EpicsName("ROOTFS"): "PandA 3.1a1-1-g22fdd94",
},
["Failed to get FPGA version information!"],
),
(
"",
{
EpicsName("PANDA_SW"): "Unknown",
EpicsName("FPGA"): "Unknown",
EpicsName("ROOTFS"): "Unknown",
},
[
"Failed to get PandA SW version information!",
"Failed to get FPGA version information!",
"Failed to get rootfs version information!",
],
),
(
"FPGA: 3.0.0C4 86e5f0a2 07d202f8 "
"Hello World: 12345 rootfs: PandA 3.1a1-1-g22fdd94",
{
EpicsName("PANDA_SW"): "Unknown",
EpicsName("FPGA"): "Unknown",
EpicsName("ROOTFS"): "Unknown",
},
[
"Recieved unexpected version numbers",
],
),
],
)
def test_get_version_information(
sample_idn_response, expected_output, expected_log_messages, caplog
):
parsed_firmware_versions = get_panda_versions(sample_idn_response)
assert parsed_firmware_versions == expected_output
for log_message in expected_log_messages:
assert log_message in caplog.text

0 comments on commit 5c8b2d4

Please sign in to comment.