Skip to content

Commit

Permalink
Enable adding unsupported device types via the DeviceManager (#262)
Browse files Browse the repository at this point in the history
* feat: Added the ability to connect to devices that are part of an unsupported device type category

* test: Add test for the `add_unsupported_device()` method
  • Loading branch information
nfelt14 authored Aug 2, 2024
1 parent c6e87cf commit 6887440
Show file tree
Hide file tree
Showing 28 changed files with 297 additions and 146 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ Valid subsections within a version are:

Things to be included in the next release go here.

### Added

- Added a new method to the `DeviceManager` class, `add_unsupported_device()`, which enables adding an unsupported device type.

---

## v2.1.0 (2024-07-31)
Expand Down
63 changes: 62 additions & 1 deletion examples/miscellaneous/custom_device_driver_support.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,76 @@
"""An example of external device support via a custom driver."""

from typing import Tuple, Union

from tm_devices import DeviceManager
from tm_devices.drivers import MSO5
from tm_devices.drivers.pi.pi_device import PIDevice
from tm_devices.drivers.pi.scopes.scope import Scope

# noinspection PyPep8Naming
from tm_devices.helpers import ReadOnlyCachedProperty as cached_property # noqa: N813


# Custom devices that inherit from a supported device type can be defined by inheriting from the
# specific device type class. This custom class must implement all abstract methods defined by the
# abstract parent classes.
class CustomScope(Scope):
"""Custom scope class."""

# This is an abstract method that must be implemented by the custom device driver
@cached_property
def total_channels(self) -> int:
return 4

def custom_method(self, value: str) -> None:
"""Add a custom method to the custom driver."""
print(f"{self.name}, {value=}")


# Custom devices that do not inherit from a supported device type can be defined by inheriting from
# a parent class further up the inheritance tree. This custom class must implement all abstract
# methods defined by the abstract parent classes.
class CustomDevice(PIDevice):
"""A custom device that is not one of the officially supported devices."""

# Custom device types not officially supported need to define what type of device they are.
_DEVICE_TYPE = "CustomDevice"

# This is an abstract property that must be implemented by the custom device driver.
# NOTE: The implementation of this example was copied from the base Scope class.
@property
def all_channel_names_list(self) -> Tuple[str, ...]:
return tuple(f"CH{x+1}" for x in range(self.total_channels))

# This is an abstract property that must be implemented by the custom device driver.
@cached_property
def total_channels(self) -> int:
return 4

# This is an abstract method that must be implemented by the custom device driver.
def expect_esr(self, esr: Union[int, str], error_string: str = "") -> Tuple[bool, str]:
# The contents of this method would need to be properly implemented,
# this is just example code. :)
return True, ""

# This is an abstract method that must be implemented by the custom device driver.
def get_eventlog_status(self) -> Tuple[bool, str]:
# The contents of this method would need to be properly implemented,
# this is just example code. :)
return True, ""

def custom_device_method(self, value: int) -> None:
"""Add a custom method to the custom device driver."""
print(f"{self.name}, {value=}")


# For VISA devices, the model series is based on the model that is returned from
# the ``*IDN?`` query. (See the ``tm_devices.helpers.get_model_series()`` function for details)
# For REST API devices, the model series is provided via the ``device_driver`` parameter in
# the configuration file, environment variable, or python code.
CUSTOM_DEVICE_DRIVERS = { # A mapping of custom model series strings to Python driver classes
"CustomModelSeries": CustomScope,
"CustomDeviceModelSeries": CustomDevice,
}


Expand All @@ -27,10 +79,19 @@ def custom_method(self, value: str) -> None:
mso5: MSO5 = device_manager.add_scope("192.168.0.1")
# Add the custom scope
custom_scope: CustomScope = device_manager.add_scope("192.168.0.2")
# Add the custom device that is a device type not officially supported
# NOTE: If using a config file or environment variable to define a device that is unsupported,
# the `device_type` key must be set to "UNSUPPORTED".
custom_device: CustomDevice = device_manager.add_unsupported_device("192.168.0.3")

# Custom drivers inherit all methods
custom_scope.expect_esr(0) # check for no errors
custom_scope.cleanup() # cleanup the custom scope

# Custom drivers can also use added methods
custom_scope.custom_method("value")

# Custom device types still inherit methods from their parent classes, though device type
# specific functionality is not defined by default
custom_device.expect_esr(0) # check for no errors
# Custom devices can also use any custom methods added to the custom class
custom_device.custom_device_method(10)
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,10 @@ order-by-type = false
"examples/**" = [
"S101" # Use of assert detected
]
"examples/miscellaneous/custom_device_driver_support.py" = [
"ARG002", # Unused method argument
"D102" # Missing docstring in public method
]
"src/tm_devices/commands/**" = [
"A003", # Class attribute is shadowing a python builtin
"D104", # Missing docstring in public package
Expand Down
105 changes: 88 additions & 17 deletions src/tm_devices/device_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from tm_devices.components import DMConfigParser
from tm_devices.drivers.api.rest_api.margin_testers.margin_tester import MarginTester
from tm_devices.drivers.api.rest_api.rest_api_device import RESTAPIDevice
from tm_devices.drivers.device import Device
from tm_devices.drivers.pi.data_acquisition_systems.data_acquisition_system import (
DataAcquisitionSystem,
)
Expand All @@ -40,6 +41,7 @@
DeviceTypes,
DMConfigOptions,
get_model_series,
PACKAGE_NAME,
print_with_timestamp,
PYVISA_PY_BACKEND,
SerialConfig,
Expand All @@ -62,37 +64,37 @@
from pyvisa.resources import MessageBasedResource
from typing_extensions import Self

from tm_devices.drivers.device import Device

####################################################################################################
# Type Aliases
####################################################################################################
AFGAlias = TypeVar("AFGAlias", bound=AFG, default=AFG) # pylint: disable=unexpected-keyword-arg,useless-suppression # TODO: remove pylint disable statement
AFGAlias = TypeVar("AFGAlias", bound=AFG, default=AFG)
"""An alias to a specific Arbitrary Function Generator Python driver."""
AWGAlias = TypeVar("AWGAlias", bound=AWG, default=AWG) # pylint: disable=unexpected-keyword-arg,useless-suppression # TODO: remove pylint disable statement
AWGAlias = TypeVar("AWGAlias", bound=AWG, default=AWG)
"""An alias to a specific Arbitrary Waveform Generator Python driver."""
DataAcquisitionSystemAlias = TypeVar( # pylint: disable=unexpected-keyword-arg,useless-suppression # TODO: remove pylint disable statement
DataAcquisitionSystemAlias = TypeVar(
"DataAcquisitionSystemAlias", bound=DataAcquisitionSystem, default=DataAcquisitionSystem
)
"""An alias to a specific Data Acquisition System Python driver."""
DigitalMultimeterAlias = TypeVar( # pylint: disable=unexpected-keyword-arg,useless-suppression # TODO: remove pylint disable statement
DigitalMultimeterAlias = TypeVar(
"DigitalMultimeterAlias", bound=DigitalMultimeter, default=DigitalMultimeter
)
"""An alias to a specific Digital Multimeter Python driver."""
ScopeAlias = TypeVar("ScopeAlias", bound=Scope, default=Scope) # pylint: disable=unexpected-keyword-arg,useless-suppression # TODO: remove pylint disable statement
ScopeAlias = TypeVar("ScopeAlias", bound=Scope, default=Scope)
"""An alias to a specific Scope driver."""
MarginTesterAlias = TypeVar("MarginTesterAlias", bound=MarginTester, default=MarginTester) # pylint: disable=unexpected-keyword-arg,useless-suppression # TODO: remove pylint disable statement
MarginTesterAlias = TypeVar("MarginTesterAlias", bound=MarginTester, default=MarginTester)
"""An alias to a specific Margin Tester Python driver."""
PowerSupplyUnitAlias = TypeVar( # pylint: disable=unexpected-keyword-arg,useless-suppression # TODO: remove pylint disable statement
PowerSupplyUnitAlias = TypeVar(
"PowerSupplyUnitAlias", bound=PowerSupplyUnit, default=PowerSupplyUnit
)
"""An alias to a specific Power Supply Unit Python driver."""
SourceMeasureUnitAlias = TypeVar( # pylint: disable=unexpected-keyword-arg,useless-suppression # TODO: remove pylint disable statement
SourceMeasureUnitAlias = TypeVar(
"SourceMeasureUnitAlias", bound=SourceMeasureUnit, default=SourceMeasureUnit
)
"""An alias to a specific Source Measure Unit Python driver."""
SystemsSwitchAlias = TypeVar("SystemsSwitchAlias", bound=SystemsSwitch, default=SystemsSwitch) # pylint: disable=unexpected-keyword-arg,useless-suppression # TODO: remove pylint disable statement
SystemsSwitchAlias = TypeVar("SystemsSwitchAlias", bound=SystemsSwitch, default=SystemsSwitch)
"""An alias to a specific Systems Switch Python driver."""
UnsupportedDeviceAlias = TypeVar("UnsupportedDeviceAlias", bound=Device, default=Device)
"""An alias to a custom device driver for an unsupported device type."""


####################################################################################################
Expand Down Expand Up @@ -618,6 +620,48 @@ def add_ss(
),
)

def add_unsupported_device(
self,
address: str,
*,
alias: Optional[str] = None,
connection_type: Optional[str] = None,
port: Optional[int] = None,
gpib_board_number: Optional[int] = None,
) -> UnsupportedDeviceAlias:
"""Add a custom device to the DeviceManager that is not an officially supported device type.
!!! warning
This should not be used unless absolutely necessary.
Args:
address: The address of the device, either an IP address or hostname. If the connection
type is ``"USB"`` then the address must be specified as ``"<model>-<serial>"``.
alias: An optional alias to use to refer to the device. If no alias is provided,
the device type and number can be used to access the device instead.
connection_type: The type of connection to use for VISA, defaults to TCPIP, not needed
when the address is a visa resource expression since the connection type is parsed
from the address string.
port: The port to use when creating a socket connection.
gpib_board_number: The GPIB board number (also referred to as a controller) to be used
when making a GPIB connection (defaults to 0).
Returns:
The custom device driver.
"""
self.__protect_access()
return cast(
UnsupportedDeviceAlias,
self._add_device(
device_type=DeviceTypes.UNSUPPORTED.value,
address=address,
alias=alias,
connection_type=connection_type,
port=port,
gpib_board_number=gpib_board_number,
),
)

def cleanup_all_devices(self) -> None:
"""Cleanup and reset all devices."""
self.__protect_access()
Expand Down Expand Up @@ -818,7 +862,11 @@ def get_device(
message = f"{device_name} was not found in the device driver dictionary."
raise LookupError(message) from error
# double check that the device is the correct type
if device_type is not None and device.device_type != device_type.upper():
if (
device_type is not None
and device.device_type != device_type.upper()
and device.config_entry.device_type != DeviceTypes.UNSUPPORTED
):
message = (
f'A device of type "{device_type}" was specified to be accessed, '
f'but the accessed device type is actually of type "{device.device_type}".'
Expand Down Expand Up @@ -954,7 +1002,7 @@ def load_config_file(self, config_file_path: Union[str, os.PathLike[str]]) -> No
print_with_timestamp("Opening Connections to Devices")
for device_name, device_config in self.__config.devices.items():
if device_name not in self.__devices:
self.__create_device(device_name, device_config)
self.__create_device(device_name, device_config, 3)

def open(self) -> bool:
"""Reopen all devices if the DeviceManager has been previously closed.
Expand All @@ -974,7 +1022,7 @@ def open(self) -> bool:
if self.__config.devices:
print_with_timestamp("Opening Connections to Devices")
for device_name, device_config in self.__config.devices.items():
self.__create_device(device_name, device_config)
self.__create_device(device_name, device_config, 3)
if self.__setup_cleanup_enabled:
self.cleanup_all_devices()
self.__is_open = True
Expand Down Expand Up @@ -1107,7 +1155,7 @@ def _add_device( # noqa: PLR0913
config_dict["gpib_board_number"] = gpib_board_number
new_device_name, new_device_config = self.__config.add_device(**config_dict) # pyright: ignore[reportArgumentType]

return self.__create_device(new_device_name, new_device_config)
return self.__create_device(new_device_name, new_device_config, 4)

@staticmethod
def __clear_visa_output_buffer_and_get_idn(visa_resource: MessageBasedResource) -> str:
Expand Down Expand Up @@ -1179,13 +1227,17 @@ def __clear_visa_output_buffer_and_get_idn(visa_resource: MessageBasedResource)
return idn_response

def __create_device(
self, device_config_name: str, device_config: DeviceConfigEntry
self,
device_config_name: str,
device_config: DeviceConfigEntry,
warning_stacklevel: int,
) -> Union[RESTAPIDevice, PIDevice]:
"""Create a new device driver and add it to the device dictionary.
Args:
device_config_name: The name returned when creating the device_config.
device_config: The dataclass holding the device configuration information.
warning_stacklevel: The stacklevel of the warning to raise for unsupported device types.
Returns:
The created device driver.
Expand All @@ -1207,6 +1259,14 @@ def __create_device(
device_drivers = DEVICE_DRIVER_MODEL_MAPPING

alias_string = f' "{device_config.alias}"' if device_config.alias else ""
if device_config.device_type == DeviceTypes.UNSUPPORTED:
warnings.warn(
f"An unsupported device type is being added to the {self.__class__.__name__}. "
f"Not all functionality will be available in the device driver. "
f"Please consider contributing to {PACKAGE_NAME} to implement official "
f"support for this device type.",
stacklevel=warning_stacklevel,
)
print_with_timestamp(f"Creating Connection to {device_config_name}{alias_string}")
new_device: Union[RESTAPIDevice, PIDevice]
if device_config.connection_type == ConnectionTypes.REST_API:
Expand Down Expand Up @@ -1239,9 +1299,20 @@ def __create_device(
self.__devices[device_config_name] = new_device
if device_config.alias:
self.__devices[device_config.alias] = new_device
if new_device.config_entry.device_type == DeviceTypes.UNSUPPORTED:
# Add an alias to the AliasDict which contains the device_type that the custom device
# driver defines, which may be different from the device type defined in the config,
# which is "UNSUPPORTED". This allows the device to be removed from the config and
# DeviceManager when necessary.
self.__devices[f"{new_device.device_type} {new_device.device_number}".upper()] = (
new_device
)

# double check created device is correct type
if new_device.device_type != new_device.config_entry.device_type.value:
if (
new_device.device_type != new_device.config_entry.device_type.value
and new_device.config_entry.device_type != DeviceTypes.UNSUPPORTED
):
self.remove_device(
alias=new_device.config_entry.alias,
device_type=new_device.config_entry.device_type.value,
Expand Down
4 changes: 0 additions & 4 deletions src/tm_devices/drivers/api/rest_api/margin_testers/tmt4.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,3 @@ def _check_api_connection(self) -> bool:
def _cleanup(self) -> None:
"""Perform the cleanup defined for the device."""
# TODO: implement

def _reboot(self) -> None:
"""Reboot the device."""
# TODO: implement
1 change: 0 additions & 1 deletion src/tm_devices/drivers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ def _open(self) -> bool:
A boolean indicating if device connected successfully.
"""

@abstractmethod
def _reboot(self) -> None:
"""Perform the actual rebooting code."""
raise NotImplementedError(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
"""Base Data Acquisition (DAQ) device driver module."""

import inspect

from abc import ABC

from tm_devices.drivers.device import family_base_class
Expand All @@ -18,14 +16,3 @@ class DataAcquisitionSystem(TSPDevice, ABC):
################################################################################################
# Private Methods
################################################################################################
def _reboot(self) -> None:
"""Perform the actual rebooting code.
Raises:
NotImplementedError: Indicates the current driver has not implemented this method.
"""
# TODO: implement
raise NotImplementedError(
f"``.{inspect.currentframe().f_code.co_name}()``" # pyright: ignore[reportOptionalMemberAccess]
f" is not yet implemented for the {self.__class__.__name__} driver"
)
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
"""Base Digital Multimeter (DMM) device driver module."""

import inspect

from abc import ABC

from tm_devices.drivers.pi.tsp_device import TSPDevice
Expand All @@ -24,14 +22,3 @@ class DigitalMultimeter(TSPDevice, ABC):
################################################################################################
# Private Methods
################################################################################################
def _reboot(self) -> None:
"""Perform the actual rebooting code.
Raises:
NotImplementedError: Indicates the current driver has not implemented this method.
"""
# TODO: implement
raise NotImplementedError(
f"``.{inspect.currentframe().f_code.co_name}()``" # pyright: ignore[reportOptionalMemberAccess]
f" is not yet implemented for the {self.__class__.__name__} driver"
)
Loading

0 comments on commit 6887440

Please sign in to comment.