diff --git a/.github/workflows/pynetbox.yaml b/.github/workflows/pynetbox.yaml new file mode 100644 index 0000000..92f6226 --- /dev/null +++ b/.github/workflows/pynetbox.yaml @@ -0,0 +1,45 @@ +name: Pylint-Tests-Codecov + +on: + push: + branches: + - master + pull_request: + paths: + - "pynetbox_data_uploader/**" + - ".github/workflows/pynetbox.yaml" + +jobs: + Pylint-Tests-Codecov: + runs-on: ubuntu-20.04 + strategy: + matrix: + python-version: ["3.x"] + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + cache: "pip" + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + cd pynetbox_query + pip install -r requirements.txt + + - name: Analyse with pylint + run: cd pynetbox_query && pylint $(git ls-files '*.py') + + - name: Run tests and collect coverage + run: cd pynetbox_query && python3 -m pytest + + - name: Run tests and collect coverage + run: cd pynetbox_query && python3 -m pytest . --cov-report xml:coverage.xml --cov + + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v4 + with: + token: ${{secrets.CODECOV_TOKEN}} + files: ./pynetbox_query/coverage.xml \ No newline at end of file diff --git a/.github/workflows/reverse_ssl_cert_chain.yaml b/.github/workflows/reverse_ssl_cert_chain.yaml new file mode 100644 index 0000000..1f86937 --- /dev/null +++ b/.github/workflows/reverse_ssl_cert_chain.yaml @@ -0,0 +1,45 @@ +name: Pylint-Tests-Codecov + +on: + push: + branches: + - master + pull_request: + paths: + - "reverse_ssl_cert_chain/**" + - ".github/workflows/reverse_ssl_cert_chain.yaml" + +jobs: + Pylint-Tests-Codecov: + runs-on: ubuntu-20.04 + strategy: + matrix: + python-version: ["3.x"] + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + cache: "pip" + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + cd reverse_ssl_cert_chain + pip install -r requirements.txt + + - name: Analyse with pylint + run: cd reverse_ssl_cert_chain && pylint $(git ls-files '*.py') + + - name: Run tests and collect coverage + run: cd reverse_ssl_cert_chain && python3 -m pytest + + - name: Run tests and collect coverage + run: cd reverse_ssl_cert_chain && python3 -m pytest . --cov-report xml:coverage.xml --cov + + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v4 + with: + token: ${{ secrets.CODECOV_TOKEN }} + files: ./reverse_ssl_cert_chain/coverage.xml \ No newline at end of file diff --git a/README.md b/README.md index 1820681..cb266db 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,12 @@ # cloud-ops-tools A collection of tools used by the cloud ops teams + +## pynetbox_query + +A Python package to bulk upload systems data to Netbox from files creating devices and interfaces. +[More Here](pynetbox_query/) + +## reverse_ssl_cert_chain + +A Python script to reverse the SSL certificate chain order. For example, a certificate as CA -> Root would output as Root -> CA. +[More Here](reverse_ssl_cert_chain/) diff --git a/pynetbox_query/.coveragerc b/pynetbox_query/.coveragerc new file mode 100644 index 0000000..f4a5869 --- /dev/null +++ b/pynetbox_query/.coveragerc @@ -0,0 +1,4 @@ +[report] + +exclude_lines = + if __name__ == .__main__.: diff --git a/pynetbox_query/.pylintrc b/pynetbox_query/.pylintrc new file mode 100644 index 0000000..c056933 --- /dev/null +++ b/pynetbox_query/.pylintrc @@ -0,0 +1,10 @@ +[FORMAT] +# Black will enforce 88 chars on Python code +# this will enforce 120 chars on docs / comments +max-line-length=120 + +# Disable various warnings: +# C0114: Missing module string - we don't need module strings for the small repo +# W0212: Access to protected members - Don't know how to fix this :) + +disable=C0114, W0212 diff --git a/pynetbox_query/pynetboxquery/__init__.py b/pynetbox_query/pynetboxquery/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pynetbox_query/pynetboxquery/__main__.py b/pynetbox_query/pynetboxquery/__main__.py new file mode 100644 index 0000000..afcbfcf --- /dev/null +++ b/pynetbox_query/pynetboxquery/__main__.py @@ -0,0 +1,24 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from importlib import import_module +import sys +from pynetboxquery.utils.error_classes import UserMethodNotFoundError + + +def main(): + """ + This function will run the correct user method for the action specified in the CLI. + """ + user_methods_names = ["upload_devices_to_netbox", "validate_data_fields_in_netbox"] + for user_method in user_methods_names: + user_method_module = import_module(f"pynetboxquery.user_methods.{user_method}") + user_method_class = getattr(user_method_module, "Main")() + aliases = user_method_class.aliases() + if sys.argv[1] in aliases: + user_method_module.Main().main() + return + raise UserMethodNotFoundError(f"The user method {sys.argv[1]} was not found.") + + +if __name__ == "__main__": + main() diff --git a/pynetbox_query/pynetboxquery/netbox_api/__init__.py b/pynetbox_query/pynetboxquery/netbox_api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pynetbox_query/pynetboxquery/netbox_api/netbox_connect.py b/pynetbox_query/pynetboxquery/netbox_api/netbox_connect.py new file mode 100644 index 0000000..f7a179b --- /dev/null +++ b/pynetbox_query/pynetboxquery/netbox_api/netbox_connect.py @@ -0,0 +1,13 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from pynetbox import api + + +def api_object(url: str, token: str) -> api: + """ + This function returns the Pynetbox Api object used to interact with Netbox. + :param url: The Netbox URL. + :param token: User Api token. + :return: The Pynetbox api object. + """ + return api(url, token) diff --git a/pynetbox_query/pynetboxquery/netbox_api/netbox_create.py b/pynetbox_query/pynetboxquery/netbox_api/netbox_create.py new file mode 100644 index 0000000..e39e98a --- /dev/null +++ b/pynetbox_query/pynetboxquery/netbox_api/netbox_create.py @@ -0,0 +1,33 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from typing import Union, Dict, List + + +class NetboxCreate: + """ + This class contains methods that will interact create objects in Netbox. + """ + + def __init__(self, api): + """ + This initialises the class with the api object to be used by methods. + """ + self.netbox = api + + def create_device(self, data: Union[Dict, List]) -> bool: + """ + This method uses the pynetbox Api to create a device in Netbox. + :param data: A list or a single dictionary containing data required to create devices in Netbox. + :return: Returns bool if the devices where made or not. + """ + devices = self.netbox.dcim.devices.create(data) + return bool(devices) + + def create_device_type(self, data: Union[Dict, List]) -> bool: + """ + This method creates a new device type in Netbox. + :param data: A list or single dictionary containing data required to create device types in Netbox. + :return: Returns bool if the device types where made or not. + """ + device_type = self.netbox.dcim.device_types.create(data) + return bool(device_type) diff --git a/pynetbox_query/pynetboxquery/netbox_api/netbox_get_id.py b/pynetbox_query/pynetboxquery/netbox_api/netbox_get_id.py new file mode 100644 index 0000000..b44d1d1 --- /dev/null +++ b/pynetbox_query/pynetboxquery/netbox_api/netbox_get_id.py @@ -0,0 +1,43 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from pynetboxquery.utils.device_dataclass import Device + + +# pylint: disable = R0903,C0115 +class NetboxGetId: + def __init__(self, api): + self.netbox = api + + def get_id(self, device: Device, attr: str) -> Device: + """ + This method queries Netbox for ID's of values. + :param device: The device dataclass. + :param attr: The attribute to query Netbox for. + :return: Returns an updated copy of the device dataclass. + """ + value = getattr(device, attr) + netbox_id = "" + if attr in ["status", "face", "airflow", "position", "name", "serial"]: + return getattr(device, attr) + match attr: + case "tenant": + netbox_id = self.netbox.tenancy.tenants.get(name=value).id + case "device_role": + netbox_id = self.netbox.dcim.device_roles.get(name=value).id + case "manufacturer": + netbox_id = self.netbox.dcim.manufacturers.get(name=value).id + case "device_type": + netbox_id = self.netbox.dcim.device_types.get(slug=value).id + case "site": + netbox_id = self.netbox.dcim.sites.get(name=value).id + case "location": + if isinstance(device.site, int): + site_slug = self.netbox.dcim.sites.get(id=device.site).slug + else: + site_slug = device.site.replace(" ", "-").lower() + netbox_id = self.netbox.dcim.locations.get( + name=value, site=site_slug + ).id + case "rack": + netbox_id = self.netbox.dcim.racks.get(name=value).id + return netbox_id diff --git a/pynetbox_query/pynetboxquery/netbox_api/validate_data.py b/pynetbox_query/pynetboxquery/netbox_api/validate_data.py new file mode 100644 index 0000000..c501641 --- /dev/null +++ b/pynetbox_query/pynetboxquery/netbox_api/validate_data.py @@ -0,0 +1,101 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from typing import List +from pynetbox import api +from pynetboxquery.utils.device_dataclass import Device + + +# Disabling this Pylint warning as it is unnecessary. +# pylint: disable = R0903 +class ValidateData: + """ + This class contains methods that check if things exist in Netbox or not. + """ + + def validate_data( + self, device_list: List[Device], netbox_api: api, fields: List[str] + ): + """ + This method will take a list of dataclasses. + Validate any data specified by the key word arguments. + :param device_list: A list of Device dataclasses containing the data. + :param netbox_api: The Api Object for Netbox. + :param fields: The Device fields to check. + """ + for field in fields: + results = self._call_validation_methods(device_list, netbox_api, field) + for result in results: + print(f"{result}\n") + + def _call_validation_methods( + self, device_list: List[Device], netbox_api: api, field: str + ) -> List[str]: + """ + This method will validate the field data by calling the correct validate method. + :param device_list: List of devices to validate. + :param netbox_api: The Api Object for Netbox. + :param field: Field to validate. + :return: Returns the results of the validation call. + """ + match field: + case "name": + device_names = [device.name for device in device_list] + results = self._check_list_device_name_in_netbox( + device_names, netbox_api + ) + case "device_type": + device_types = [device.device_type for device in device_list] + results = self._check_list_device_type_in_netbox( + device_types, netbox_api + ) + case _: + results = [f"Could not find a field for the argument: {field}."] + return results + + @staticmethod + def _check_device_name_in_netbox(device_name: str, netbox_api: api) -> bool: + """ + This method will check if a device exists in Netbox. + :param device_name: The name of the device. + :return: Returns bool. + """ + device = netbox_api.dcim.devices.get(name=device_name) + return bool(device) + + def _check_list_device_name_in_netbox( + self, device_names: List[str], netbox_api: api + ) -> List[str]: + """ + This method will call the validate method on each device name in the list and return the results. + :param device_names: List of device names to check. + :return: Results of the check. + """ + results = [] + for name in device_names: + in_netbox = self._check_device_name_in_netbox(name, netbox_api) + results += [f"Device {name} exists in Netbox: {in_netbox}."] + return results + + @staticmethod + def _check_device_type_in_netbox(device_type: str, netbox_api: api) -> bool: + """ + This method will check if a device type exists in Netbox. + :param device_type: The device type. + :return: Returns bool. + """ + device_type = netbox_api.dcim.device_types.get(slug=device_type) + return bool(device_type) + + def _check_list_device_type_in_netbox( + self, device_type_list: List[str], netbox_api: api + ) -> List[str]: + """ + This method will call the validate method on each device type in the list and return the results. + :param device_type_list: List of device types to check. + :return: Results of the check. + """ + results = [] + for device_type in device_type_list: + in_netbox = self._check_device_type_in_netbox(device_type, netbox_api) + results += [f"Device type {device_type} exists in Netbox: {in_netbox}."] + return results diff --git a/pynetbox_query/pynetboxquery/user_methods/__init__.py b/pynetbox_query/pynetboxquery/user_methods/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pynetbox_query/pynetboxquery/user_methods/abstract_user_method.py b/pynetbox_query/pynetboxquery/user_methods/abstract_user_method.py new file mode 100644 index 0000000..64fec25 --- /dev/null +++ b/pynetbox_query/pynetboxquery/user_methods/abstract_user_method.py @@ -0,0 +1,47 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from abc import ABC, abstractmethod +from typing import List, Dict +from argparse import ArgumentParser + + +class AbstractUserMethod(ABC): + """ + This Abstract class provides a template for user methods. + """ + + def _collect_kwargs(self) -> Dict: + """ + This method collects the arguments from the subparser into a dictionary of kwargs. + :return: Dictionary of kwargs. + """ + main_parser = self._subparser() + args = main_parser.parse_args() + kwargs = vars(args) + return kwargs + + @abstractmethod + def _subparser(self) -> ArgumentParser: + """ + This method creates a subparser with specific arguments to the user method. + :return: Returns the main parser that should contain the subparser information. + """ + + @staticmethod + @abstractmethod + def aliases() -> List[str]: + """Returns the aliases viable for this user_method.""" + + def main(self): + """ + This method gets the arguments and calls the run method with them. + """ + kwargs = self._collect_kwargs() + self._run(**kwargs) + + @staticmethod + @abstractmethod + def _run(url: str, token: str, file_path: str, **kwargs): + """ + This the main method in the user script. It contains all calls needed to perform the action. + """ diff --git a/pynetbox_query/pynetboxquery/user_methods/upload_devices_to_netbox.py b/pynetbox_query/pynetboxquery/user_methods/upload_devices_to_netbox.py new file mode 100644 index 0000000..b96ae24 --- /dev/null +++ b/pynetbox_query/pynetboxquery/user_methods/upload_devices_to_netbox.py @@ -0,0 +1,61 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from dataclasses import asdict +from pynetboxquery.utils.read_utils.read_file import ReadFile +from pynetboxquery.netbox_api.validate_data import ValidateData +from pynetboxquery.utils.query_device import QueryDevice +from pynetboxquery.netbox_api.netbox_create import NetboxCreate +from pynetboxquery.netbox_api.netbox_connect import api_object +from pynetboxquery.utils.parsers import Parsers +from pynetboxquery.user_methods.abstract_user_method import AbstractUserMethod + + +class Main(AbstractUserMethod): + """ + This class contains the run method to run the user script. + """ + + @staticmethod + def _run(url: str, token: str, file_path: str, **kwargs): + """ + This function does the following: + Create a Pynetbox Api Object with the users credentials. + Reads the file data into a list of Device dataclasses. + Validates the Device names and device types against Netbox. + Converts the data in the Devices to their Netbox ID's. + Changes those Device dataclasses into dictionaries. + Creates the Devices in Netbox. + :param url: The Netbox URL. + :param token: The user's Netbox api token. + :param file_path: The file path. + """ + api = api_object(url, token) + device_list = ReadFile().read_file(file_path, **kwargs) + ValidateData().validate_data( + device_list, api, **{"fields": ["name", "device_type"]} + ) + queried_devices = QueryDevice(api).query_list(device_list) + dictionary_devices = [asdict(device) for device in queried_devices] + NetboxCreate(api).create_device(dictionary_devices) + print("Devices added to Netbox.\n") + + def _subparser(self): + """ + This function creates the subparser for this user script inheriting the parent parser arguments. + """ + parent_parser, main_parser, subparsers = Parsers().arg_parser() + subparsers.add_parser( + "create_devices", + description="Create devices in Netbox from a file.", + usage="pynetboxquery create_devices ", + parents=[parent_parser], + aliases=self.aliases(), + ) + return main_parser + + @staticmethod + def aliases(): + """ + This function returns a list of aliases the script should be callable by. + """ + return ["create", "create_devices"] diff --git a/pynetbox_query/pynetboxquery/user_methods/validate_data_fields_in_netbox.py b/pynetbox_query/pynetboxquery/user_methods/validate_data_fields_in_netbox.py new file mode 100644 index 0000000..8d56046 --- /dev/null +++ b/pynetbox_query/pynetboxquery/user_methods/validate_data_fields_in_netbox.py @@ -0,0 +1,53 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from pynetboxquery.utils.parsers import Parsers +from pynetboxquery.netbox_api.validate_data import ValidateData +from pynetboxquery.utils.read_utils.read_file import ReadFile +from pynetboxquery.netbox_api.netbox_connect import api_object +from pynetboxquery.user_methods.abstract_user_method import AbstractUserMethod + + +class Main(AbstractUserMethod): + """ + This class contains the run method to run the user script. + """ + + @staticmethod + def _run(url: str, token: str, file_path: str, **kwargs): + """ + This function does the following: + Reads the file from the file path. + Creates a Pynetbox Api Object with the users credentials. + Validates all the fields specified by the user + :param url: The Netbox URL. + :param token: The users Netbox api token. + :param file_path: The file_path. + """ + device_list = ReadFile().read_file(file_path, **kwargs) + api = api_object(url, token) + ValidateData().validate_data(device_list, api, kwargs["fields"]) + + def _subparser(self): + """ + This function creates the subparser for this user script inheriting the parent parser arguments. + It also adds an argument to collect the fields specified in the command line. + """ + parent_parser, main_parser, subparsers = Parsers().arg_parser() + parser_validate_data_fields_in_netbox = subparsers.add_parser( + "validate_data_fields_in_netbox", + description="Check data fields values in Netbox from a file.", + usage="pynetboxquery validate_data_fields_in_netbox ", + parents=[parent_parser], + aliases=self.aliases(), + ) + parser_validate_data_fields_in_netbox.add_argument( + "fields", help="The fields to check in Netbox.", nargs="*" + ) + return main_parser + + @staticmethod + def aliases(): + """ + This function returns a list of aliases the script should be callable by. + """ + return ["validate", "validate_data_fields_in_netbox"] diff --git a/pynetbox_query/pynetboxquery/utils/__init__.py b/pynetbox_query/pynetboxquery/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pynetbox_query/pynetboxquery/utils/device_dataclass.py b/pynetbox_query/pynetboxquery/utils/device_dataclass.py new file mode 100644 index 0000000..00d7342 --- /dev/null +++ b/pynetbox_query/pynetboxquery/utils/device_dataclass.py @@ -0,0 +1,32 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from dataclasses import dataclass, fields +from typing import Optional + + +# pylint: disable = R0902 +@dataclass +class Device: + """ + This class instantiates device objects with the device data. + """ + + tenant: str + device_role: str + manufacturer: str + device_type: str + status: str + site: str + location: str + rack: str + position: str + name: str + serial: str + face: Optional[str] = None + airflow: Optional[str] = None + + def return_attrs(self): + """ + This method returns a list of the names of the fields above. + """ + return [field.name for field in list(fields(self))] diff --git a/pynetbox_query/pynetboxquery/utils/error_classes.py b/pynetbox_query/pynetboxquery/utils/error_classes.py new file mode 100644 index 0000000..0771534 --- /dev/null +++ b/pynetbox_query/pynetboxquery/utils/error_classes.py @@ -0,0 +1,34 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +# Disabling this Pylint error as these classes do not need Docstring. +# They are just errors +# pylint: disable = C0115 +"""Custom exceptions for the package.""" + + +class DeviceFoundError(Exception): + pass + + +class DeviceTypeNotFoundError(Exception): + pass + + +class FileTypeNotSupportedError(Exception): + pass + + +class DelimiterNotSpecifiedError(Exception): + pass + + +class SheetNameNotSpecifiedError(Exception): + pass + + +class ApiObjectNotParsedError(Exception): + pass + + +class UserMethodNotFoundError(Exception): + pass diff --git a/pynetbox_query/pynetboxquery/utils/parsers.py b/pynetbox_query/pynetboxquery/utils/parsers.py new file mode 100644 index 0000000..c5bb5b9 --- /dev/null +++ b/pynetbox_query/pynetboxquery/utils/parsers.py @@ -0,0 +1,42 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +import argparse + + +# Disabling this Pylint warning as it's irrelevant. +# pylint: disable = R0903 +class Parsers: + """ + This class contains the argparse methods for different commands. + """ + + def arg_parser(self): + """ + This function creates a parser object and adds 3 arguments to it. + This allows users to run the python file with arguments. Like a script. + """ + + parent_parser = self._parent_parser() + main_parser = argparse.ArgumentParser( + description="The main command. This cannot be run standalone and requires a subcommand to be provided.", + usage="pynetboxquery [command] [filepath] [url] [token] [kwargs]", + ) + subparsers = main_parser.add_subparsers( + dest="subparsers", + ) + + return parent_parser, main_parser, subparsers + + @staticmethod + def _parent_parser(): + parent_parser = argparse.ArgumentParser(add_help=False) + parent_parser.add_argument("url", help="The Netbox URL.") + parent_parser.add_argument("token", help="Your Netbox Token.") + parent_parser.add_argument("file_path", help="Your file path to csv files.") + parent_parser.add_argument( + "--delimiter", help="The separator in the text file." + ) + parent_parser.add_argument( + "--sheet-name", help="The sheet in the Excel Workbook to read from." + ) + return parent_parser diff --git a/pynetbox_query/pynetboxquery/utils/query_device.py b/pynetbox_query/pynetboxquery/utils/query_device.py new file mode 100644 index 0000000..4560fd9 --- /dev/null +++ b/pynetbox_query/pynetboxquery/utils/query_device.py @@ -0,0 +1,37 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from typing import List +from pynetboxquery.utils.device_dataclass import Device +from pynetboxquery.netbox_api.netbox_get_id import NetboxGetId + + +class QueryDevice: + """ + This class contains methods that update the device dataclasses with ID's from Netbox. + """ + + def __init__(self, api): + self.netbox = api + + def query_list(self, device_list: List[Device]) -> List[Device]: + """ + This method iterates through the list of Devices. + :param device_list: List of device dataclasses + :return: Returns an updated list of device dataclasses. + """ + new_device_list = [] + for device in device_list: + new_device = self.query_device(device) + new_device_list.append(new_device) + return new_device_list + + def query_device(self, device: Device) -> Device: + """ + This method calls the query method on each attribute of the device. + :param device: The device to get the values from. + :return: Returns the updated device. + """ + changes = {} + for attr in device.return_attrs(): + changes[attr] = NetboxGetId(self.netbox).get_id(device, attr) + return Device(**changes) diff --git a/pynetbox_query/pynetboxquery/utils/read_utils/__init__.py b/pynetbox_query/pynetboxquery/utils/read_utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pynetbox_query/pynetboxquery/utils/read_utils/read_abc.py b/pynetbox_query/pynetboxquery/utils/read_utils/read_abc.py new file mode 100644 index 0000000..8f21f55 --- /dev/null +++ b/pynetbox_query/pynetboxquery/utils/read_utils/read_abc.py @@ -0,0 +1,58 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from abc import ABC, abstractmethod +from pathlib import Path +from typing import List, Dict +from pynetboxquery.utils.device_dataclass import Device + + +# Disabling this pylint warning as it is not necessary. +# pylint: disable = R0903 +class ReadAbstractBase(ABC): + """ + This Abstract Base Class ensures any Read methods made contain at least all of this code. + """ + + def __init__(self, file_path, **kwargs): + """ + Assigns the attribute file path. Checks the fie path is valid. Validates any kwargs needed. + """ + self.file_path = file_path + self._check_file_path(self.file_path) + self._validate(kwargs) + + @abstractmethod + def read(self) -> List[Dict]: + """ + This method reads the contents of a file into a list od Device Dataclasses. + """ + + @staticmethod + @abstractmethod + def _validate(kwargs): + """ + This method checks if a certain argument is given in kwargs and if not raise an error. + """ + + @staticmethod + def _check_file_path(file_path: str): + """ + This method checks if the file path exists in the user's system. + A FileNotFoundError will be raised if the file path is invalid. + If the file path is valid nothing will happen. + We don't need to declare that the file path is valid. + :param file_path: The file path to the file + """ + file_path_valid = Path(file_path).exists() + if not file_path_valid: + raise FileNotFoundError + + @staticmethod + def _dict_to_dataclass(dictionary_list: List[Dict]) -> List[Device]: + """ + This method takes a list of dictionaries and converts them all into Device dataclasses + then returns the list. + :param dictionary_list: A list of dictionaries. + :return: A list of Device dataclasses. + """ + return [Device(**dictionary) for dictionary in dictionary_list] diff --git a/pynetbox_query/pynetboxquery/utils/read_utils/read_csv.py b/pynetbox_query/pynetboxquery/utils/read_utils/read_csv.py new file mode 100644 index 0000000..41cdf2f --- /dev/null +++ b/pynetbox_query/pynetboxquery/utils/read_utils/read_csv.py @@ -0,0 +1,30 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from csv import DictReader +from typing import List +from pynetboxquery.utils.device_dataclass import Device +from pynetboxquery.utils.read_utils.read_abc import ReadAbstractBase + + +# Disabling this pylint warning as it is not necessary. +# pylint: disable = R0903 +class ReadCSV(ReadAbstractBase): + """ + This class contains methods to read data from CSV files into a list of Device dataclasses. + """ + + def read(self) -> List[Device]: + """ + This method reads the contents of the csv file then returns a list of dictionaries + where each dictionary is a row of data with the keys being the column headers. + :return: A list of dictionaries. + """ + with open(self.file_path, mode="r", encoding="UTF-8") as file: + csv_reader = DictReader(file) + dictionary_list = list(csv_reader) + device_list = self._dict_to_dataclass(dictionary_list) + return device_list + + @staticmethod + def _validate(kwargs): + pass diff --git a/pynetbox_query/pynetboxquery/utils/read_utils/read_file.py b/pynetbox_query/pynetboxquery/utils/read_utils/read_file.py new file mode 100644 index 0000000..5c89b98 --- /dev/null +++ b/pynetbox_query/pynetboxquery/utils/read_utils/read_file.py @@ -0,0 +1,38 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from typing import List +from pynetboxquery.utils.device_dataclass import Device +from pynetboxquery.utils.error_classes import FileTypeNotSupportedError +from pynetboxquery.utils.read_utils.read_csv import ReadCSV +from pynetboxquery.utils.read_utils.read_txt import ReadTXT +from pynetboxquery.utils.read_utils.read_xlsx import ReadXLSX + + +# Disabling this pylint warning as it is not necessary. +# pylint: disable = R0903 +class ReadFile: + """ + This class + """ + + @staticmethod + def read_file(file_path, **kwargs) -> List[Device]: + """ + + :param file_path: + :param kwargs: + :return: + """ + file_type = file_path.split(".")[-1] + match file_type: + case "csv": + device_list = ReadCSV(file_path).read() + case "txt": + device_list = ReadTXT(file_path, **kwargs).read() + case "xlsx": + device_list = ReadXLSX(file_path, **kwargs).read() + case _: + raise FileTypeNotSupportedError( + f"The file type '.{file_type}' is not supported by the method." + ) + return device_list diff --git a/pynetbox_query/pynetboxquery/utils/read_utils/read_txt.py b/pynetbox_query/pynetboxquery/utils/read_utils/read_txt.py new file mode 100644 index 0000000..e74885c --- /dev/null +++ b/pynetbox_query/pynetboxquery/utils/read_utils/read_txt.py @@ -0,0 +1,42 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from csv import DictReader +from typing import List +from pynetboxquery.utils.device_dataclass import Device +from pynetboxquery.utils.error_classes import DelimiterNotSpecifiedError +from pynetboxquery.utils.read_utils.read_abc import ReadAbstractBase + + +# Disabling this pylint warning as it is not necessary. +# pylint: disable = R0903 +class ReadTXT(ReadAbstractBase): + """ + This class contains methods to read data from TXT files into a list of Device dataclasses. + """ + + def __init__(self, file_path, **kwargs): + super().__init__(file_path, **kwargs) + self.delimiter = kwargs["delimiter"].replace("\\t", "\t") + + @staticmethod + def _validate(kwargs): + """ + This method checks that the delimiter kwarg has been parsed. + If not raise an error. + """ + if "delimiter" not in kwargs: + raise DelimiterNotSpecifiedError( + "You must specify the delimiter in the text file." + ) + + def read(self) -> List[Device]: + """ + This method reads the contents of the text file then returns a list of dictionaries + where each dictionary is a row of data with the keys being the column headers. + :return: A list of dictionaries. + """ + with open(self.file_path, mode="r", encoding="UTF-8") as file: + file_reader = DictReader(file, delimiter=self.delimiter) + dictionary_list = list(file_reader) + device_list = self._dict_to_dataclass(dictionary_list) + return device_list diff --git a/pynetbox_query/pynetboxquery/utils/read_utils/read_xlsx.py b/pynetbox_query/pynetboxquery/utils/read_utils/read_xlsx.py new file mode 100644 index 0000000..4428d80 --- /dev/null +++ b/pynetbox_query/pynetboxquery/utils/read_utils/read_xlsx.py @@ -0,0 +1,41 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from typing import List +from pandas import read_excel +from pynetboxquery.utils.device_dataclass import Device +from pynetboxquery.utils.error_classes import SheetNameNotSpecifiedError +from pynetboxquery.utils.read_utils.read_abc import ReadAbstractBase + + +# Disabling this pylint warning as it is not necessary. +# pylint: disable = R0903 +class ReadXLSX(ReadAbstractBase): + """ + This class contains methods to read data from XLSX files into a list of Device dataclasses. + """ + + def __init__(self, file_path, **kwargs): + super().__init__(file_path, **kwargs) + self.sheet_name = kwargs["sheet_name"] + + @staticmethod + def _validate(kwargs): + """ + This method checks that the delimiter kwarg has been parsed. + If not raise an error. + """ + if "sheet_name" not in kwargs: + raise SheetNameNotSpecifiedError( + "You must specify the sheet name in the excel workbook." + ) + + def read(self) -> List[Device]: + """ + This method reads the contents of a Sheet in an Excel Workbook then returns a list of dictionaries + where each dictionary is a row of data with the keys being the column headers. + :return: A list of dictionaries. + """ + dataframe = read_excel(self.file_path, sheet_name=self.sheet_name) + dictionary_list = dataframe.to_dict(orient="records") + device_list = self._dict_to_dataclass(dictionary_list) + return device_list diff --git a/pynetbox_query/pytest.ini b/pynetbox_query/pytest.ini new file mode 100644 index 0000000..fe1723a --- /dev/null +++ b/pynetbox_query/pytest.ini @@ -0,0 +1,6 @@ +[pytest] +pythonpath = lib +testpaths = tests +python_files = *.py +python_functions = test_* +addopts = --ignore=setup.py \ No newline at end of file diff --git a/pynetbox_query/requirements.txt b/pynetbox_query/requirements.txt new file mode 100644 index 0000000..3e89aef Binary files /dev/null and b/pynetbox_query/requirements.txt differ diff --git a/pynetbox_query/setup.py b/pynetbox_query/setup.py new file mode 100644 index 0000000..d27b4fb --- /dev/null +++ b/pynetbox_query/setup.py @@ -0,0 +1,20 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from setuptools import setup, find_packages + +VERSION = "0.1.0" +DESCRIPTION = "python package for pynetbox tools" + +LONG_DESCRIPTION = """Python package to query Netbox.""" + +setup( + name="pynetboxQuery", + version=VERSION, + author="Kalibh Halford", + author_email="", + description=DESCRIPTION, + long_description=LONG_DESCRIPTION, + packages=find_packages(), + python_requires=">=3.10", + keywords=["python"], +) diff --git a/pynetbox_query/tests/conftest.py b/pynetbox_query/tests/conftest.py new file mode 100644 index 0000000..e9471eb --- /dev/null +++ b/pynetbox_query/tests/conftest.py @@ -0,0 +1,71 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from typing import Dict +from pytest import fixture +from pynetboxquery.utils.device_dataclass import Device + + +@fixture(scope="function", name="dict_to_device") +def dict_to_device_instance(): + """ + This fixture returns a helper function to create Device Dataclasses from dictionaries. + """ + + def func(dictionary: Dict) -> Device: + return Device(**dictionary) + + return func + + +@fixture(scope="function", name="mock_device") +def mock_device_fixture(dict_to_device): + """ + This method returns a device dataclass. + """ + device = { + "tenant": "t1", + "device_role": "dr1", + "manufacturer": "m1", + "device_type": "dt1", + "status": "st1", + "site": "si1", + "location": "l1", + "rack": "r1", + "face": "f1", + "airflow": "a1", + "position": "p1", + "name": "n1", + "serial": "se1", + } + return dict_to_device(device) + + +@fixture(scope="function", name="mock_device_2") +def mock_device_2_fixture(dict_to_device): + """ + This method returns a second device dataclass. + """ + device = { + "tenant": "t2", + "device_role": "dr2", + "manufacturer": "m2", + "device_type": "dt2", + "status": "st2", + "site": "si2", + "location": "l2", + "rack": "r2", + "face": "f2", + "airflow": "a2", + "position": "p2", + "name": "n2", + "serial": "se2", + } + return dict_to_device(device) + + +@fixture(scope="function", name="mock_device_list") +def mock_device_list_fixture(mock_device, mock_device_2): + """ + This fixture returns a list of mock device types. + """ + return [mock_device, mock_device_2] diff --git a/pynetbox_query/tests/test__main__.py b/pynetbox_query/tests/test__main__.py new file mode 100644 index 0000000..ff7ac1e --- /dev/null +++ b/pynetbox_query/tests/test__main__.py @@ -0,0 +1,39 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from unittest.mock import patch +from pytest import raises +from pynetboxquery.__main__ import main +from pynetboxquery.utils.error_classes import UserMethodNotFoundError + + +@patch("pynetboxquery.__main__.import_module") +@patch("pynetboxquery.__main__.getattr") +@patch("pynetboxquery.__main__.sys") +def test_main(mock_sys, mock_getattr, mock_import_module): + """ + This test ensures the main method for the correct user script is called for the mocked argument. + """ + mock_sys.argv.__getitem__.return_value = "upload_devices_to_netbox" + mock_getattr.return_value.return_value.aliases.return_value = [ + "upload_devices_to_netbox" + ] + main() + mock_import_module.assert_called_with( + "pynetboxquery.user_methods.upload_devices_to_netbox" + ) + mock_getattr.assert_called_with(mock_import_module.return_value, "Main") + mock_import_module.return_value.Main.return_value.main.assert_called_once_with() + + +def test_main_fail(): + """ + This test ensures the main function errors when the incorrect argument is given. + """ + with patch("pynetboxquery.__main__.sys"): + with patch("pynetboxquery.__main__.import_module"): + with patch("pynetboxquery.__main__.getattr") as mock_getattr: + mock_getattr.return_value.return_value.aliases.return_value = [ + "not_real_method" + ] + with raises(UserMethodNotFoundError): + main() diff --git a/pynetbox_query/tests/test_netbox_api/test_netbox_connect.py b/pynetbox_query/tests/test_netbox_api/test_netbox_connect.py new file mode 100644 index 0000000..f83ca22 --- /dev/null +++ b/pynetbox_query/tests/test_netbox_api/test_netbox_connect.py @@ -0,0 +1,16 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from unittest.mock import patch +from pynetboxquery.netbox_api.netbox_connect import api_object + + +@patch("pynetboxquery.netbox_api.netbox_connect.api") +def test_api_object(mock_api): + """ + This test checks that the Api object is returned. + """ + mock_url = "url" + mock_token = "token" + res = api_object(mock_url, mock_token) + mock_api.assert_called_once_with(mock_url, mock_token) + assert res == mock_api.return_value diff --git a/pynetbox_query/tests/test_netbox_api/test_netbox_create.py b/pynetbox_query/tests/test_netbox_api/test_netbox_create.py new file mode 100644 index 0000000..bc089c3 --- /dev/null +++ b/pynetbox_query/tests/test_netbox_api/test_netbox_create.py @@ -0,0 +1,62 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from unittest.mock import NonCallableMock +from dataclasses import asdict +from pytest import fixture +from pynetboxquery.netbox_api.netbox_create import NetboxCreate + + +@fixture(name="instance") +def instance_fixture(): + """ + This fixture method calls the class being tested. + :return: The class object. + """ + mock_api = NonCallableMock() + return NetboxCreate(mock_api) + + +def test_create_device_one(instance, mock_device): + """ + This test ensures the .create method is called once with the correct arguments. + """ + mock_device_dict = asdict(mock_device) + res = instance.create_device(mock_device_dict) + instance.netbox.dcim.devices.create.assert_called_once_with(mock_device_dict) + assert res + + +def test_create_device_many(instance, mock_device, mock_device_2): + """ + This test ensures the .create method is called once with the correct arguments. + """ + mock_device_dict = asdict(mock_device) + mock_device_2_dict = asdict(mock_device_2) + res = instance.create_device([mock_device_dict, mock_device_2_dict]) + instance.netbox.dcim.devices.create.assert_called_once_with( + [mock_device_dict, mock_device_2_dict] + ) + assert res + + +def test_create_device_type_one(instance): + """ + This test ensures the .create method is called once with the correct arguments. + """ + mock_device_type = "" + res = instance.create_device_type(mock_device_type) + instance.netbox.dcim.device_types.create.assert_called_once_with(mock_device_type) + assert res + + +def test_create_device_type_many(instance): + """ + This test ensures the .create method is called once with the correct arguments. + """ + mock_device_type = "" + mock_device_type_2 = "" + res = instance.create_device_type([mock_device_type, mock_device_type_2]) + instance.netbox.dcim.device_types.create.assert_called_once_with( + [mock_device_type, mock_device_type_2] + ) + assert res diff --git a/pynetbox_query/tests/test_netbox_api/test_netbox_get_id.py b/pynetbox_query/tests/test_netbox_api/test_netbox_get_id.py new file mode 100644 index 0000000..4fd2b73 --- /dev/null +++ b/pynetbox_query/tests/test_netbox_api/test_netbox_get_id.py @@ -0,0 +1,96 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from unittest.mock import patch +from pytest import fixture +from pynetboxquery.netbox_api.netbox_get_id import NetboxGetId + + +@fixture(name="instance") +def instance_fixture(): + """ + This function returns the class to be tested. + """ + mock_api = "" + return NetboxGetId(mock_api) + + +def test_get_id_tenant(instance, mock_device): + """ + This test ensures the correct case is matched for the field. + """ + with patch.object(instance, "netbox") as mock_netbox: + res = instance.get_id(mock_device, "tenant") + assert res == mock_netbox.tenancy.tenants.get().id + + +def test_get_id_device_role(instance, mock_device): + """ + This test ensures the correct case is matched for the field. + """ + with patch.object(instance, "netbox") as mock_netbox: + res = instance.get_id(mock_device, "device_role") + assert res == mock_netbox.dcim.device_roles.get().id + + +def test_get_id_manufacturer(instance, mock_device): + """ + This test ensures the correct case is matched for the field. + """ + with patch.object(instance, "netbox") as mock_netbox: + res = instance.get_id(mock_device, "manufacturer") + assert res == mock_netbox.dcim.manufacturers.get().id + + +def test_get_id_device_type(instance, mock_device): + """ + This test ensures the correct case is matched for the field. + """ + with patch.object(instance, "netbox") as mock_netbox: + res = instance.get_id(mock_device, "device_type") + assert res == mock_netbox.dcim.device_types.get().id + + +def test_get_id_site(instance, mock_device): + """ + This test ensures the correct case is matched for the field. + """ + with patch.object(instance, "netbox") as mock_netbox: + res = instance.get_id(mock_device, "site") + assert res == mock_netbox.dcim.sites.get().id + + +def test_get_id_location_site_str(instance, mock_device): + """ + This test ensures the correct case is matched for the field. + """ + with patch.object(instance, "netbox") as mock_netbox: + res = instance.get_id(mock_device, "location") + assert res == mock_netbox.dcim.locations.get().id + + +def test_get_id_location_site_int(instance, mock_device): + """ + This test ensures the correct case is matched for the field. + """ + mock_device.site = 1 + with patch.object(instance, "netbox") as mock_netbox: + res = instance.get_id(mock_device, "location") + assert res == mock_netbox.dcim.locations.get().id + + +def test_get_id_rack(instance, mock_device): + """ + This test ensures the correct case is matched for the field. + """ + with patch.object(instance, "netbox") as mock_netbox: + res = instance.get_id(mock_device, "rack") + assert res == mock_netbox.dcim.racks.get().id + + +def test_get_id_else(instance, mock_device): + """ + This test ensures the correct case is matched for the field. + """ + for field in ["status", "face", "airflow", "position", "name", "serial"]: + res = instance.get_id(mock_device, field) + assert res == getattr(mock_device, field) diff --git a/pynetbox_query/tests/test_netbox_api/test_validate_data.py b/pynetbox_query/tests/test_netbox_api/test_validate_data.py new file mode 100644 index 0000000..930cf90 --- /dev/null +++ b/pynetbox_query/tests/test_netbox_api/test_validate_data.py @@ -0,0 +1,213 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from unittest.mock import patch, NonCallableMock +from pytest import fixture +from pynetboxquery.netbox_api.validate_data import ValidateData + + +@fixture(name="instance") +def instance_fixture(): + """ + This fixture returns the class instance to be tested. + """ + return ValidateData() + + +@patch("pynetboxquery.netbox_api.validate_data.ValidateData._call_validation_methods") +def test_validate_data_one_field( + mock_call_validation_methods, instance, mock_device_list +): + """ + This test ensures that the correct methods are called when validating one field. + """ + mock_netbox_api = "" + mock_fields = ["field1"] + instance.validate_data(mock_device_list, mock_netbox_api, mock_fields) + mock_call_validation_methods.assert_called_once_with( + mock_device_list, mock_netbox_api, mock_fields[0] + ) + + +@patch("pynetboxquery.netbox_api.validate_data.ValidateData._call_validation_methods") +def test_validate_data_many_fields( + mock_call_validation_methods, instance, mock_device_list +): + """ + This test ensures that the correct methods are called when validating more than one field. + """ + mock_netbox_api = "" + mock_fields = ["field1", "field2"] + instance.validate_data(mock_device_list, mock_netbox_api, mock_fields) + mock_call_validation_methods.assert_any_call( + mock_device_list, mock_netbox_api, mock_fields[0] + ) + mock_call_validation_methods.assert_any_call( + mock_device_list, mock_netbox_api, mock_fields[1] + ) + + +@patch("pynetboxquery.netbox_api.validate_data.ValidateData._call_validation_methods") +@patch("pynetboxquery.netbox_api.validate_data.print") +def test_validate_data_one_result( + mock_print, mock_call_validation_methods, instance, mock_device_list +): + """ + This test ensures that the correct methods are called when printing one set of results. + """ + mock_netbox_api = "" + mock_fields = ["field1"] + mock_call_validation_methods.return_value = ["result1"] + instance.validate_data(mock_device_list, mock_netbox_api, mock_fields) + assert mock_print.call_count == 1 + + +@patch("pynetboxquery.netbox_api.validate_data.ValidateData._call_validation_methods") +@patch("pynetboxquery.netbox_api.validate_data.print") +def test_validate_data_many_results( + mock_print, mock_call_validation_methods, instance, mock_device_list +): + """ + This test ensures that the correct methods are called when printing more than one set of results. + """ + mock_netbox_api = "" + mock_fields = ["field1"] + mock_call_validation_methods.return_value = ["result1", "result2"] + instance.validate_data(mock_device_list, mock_netbox_api, mock_fields) + assert mock_print.call_count == 2 + + +@patch( + "pynetboxquery.netbox_api.validate_data.ValidateData._check_list_device_name_in_netbox" +) +def test_call_validation_methods_name( + mock_check_list_device_name_in_netbox, instance, mock_device_list +): + """ + This test ensures the correct methods are called when validating the name field. + """ + mock_netbox_api = "" + res = instance._call_validation_methods(mock_device_list, mock_netbox_api, "name") + mock_device_values = [device.name for device in mock_device_list] + mock_check_list_device_name_in_netbox.assert_called_once_with( + mock_device_values, mock_netbox_api + ) + assert res == mock_check_list_device_name_in_netbox.return_value + + +@patch( + "pynetboxquery.netbox_api.validate_data.ValidateData._check_list_device_type_in_netbox" +) +def test_call_validation_methods_type( + mock_check_list_device_type_in_netbox, instance, mock_device_list +): + """ + This test ensures the correct methods are called when validating the device_type field. + """ + mock_netbox_api = "" + res = instance._call_validation_methods( + mock_device_list, mock_netbox_api, "device_type" + ) + mock_device_values = [device.device_type for device in mock_device_list] + mock_check_list_device_type_in_netbox.assert_called_once_with( + mock_device_values, mock_netbox_api + ) + assert res == mock_check_list_device_type_in_netbox.return_value + + +def test_call_validation_methods_wildcard(instance, mock_device_list): + """ + This test ensures the correct methods are called when validating a field that doesn't exist. + """ + mock_netbox_api = "" + res = instance._call_validation_methods( + mock_device_list, mock_netbox_api, "wildcard" + ) + assert res == ["Could not find a field for the argument: wildcard."] + + +def test_check_device_name_in_netbox(instance): + """ + This test ensures the .get() method is called with the correct arguments when checking the device name. + """ + netbox_api = NonCallableMock() + res = instance._check_device_name_in_netbox("name", netbox_api) + netbox_api.dcim.devices.get.assert_called_once_with(name="name") + assert res + + +@patch( + "pynetboxquery.netbox_api.validate_data.ValidateData._check_device_name_in_netbox" +) +def test_check_list_device_name_in_netbox_one( + mock_check_device_name_in_netbox, instance +): + """ + This test ensures the correct methods are called for checking a list of device names that holds one value. + """ + res = instance._check_list_device_name_in_netbox(["name"], "api") + mock_check_device_name_in_netbox.assert_called_once_with("name", "api") + assert res == [ + f"Device name exists in Netbox: {mock_check_device_name_in_netbox.return_value}." + ] + + +@patch( + "pynetboxquery.netbox_api.validate_data.ValidateData._check_device_name_in_netbox" +) +def test_check_list_device_name_in_netbox_many( + mock_check_device_name_in_netbox, instance +): + """ + This test ensures the correct methods are called for checking a list of device names that holds many values. + """ + res = instance._check_list_device_name_in_netbox(["name1", "name2"], "api") + mock_check_device_name_in_netbox.assert_any_call("name1", "api") + mock_check_device_name_in_netbox.assert_any_call("name2", "api") + assert res == [ + f"Device name1 exists in Netbox: {mock_check_device_name_in_netbox.return_value}.", + f"Device name2 exists in Netbox: {mock_check_device_name_in_netbox.return_value}.", + ] + + +def test_check_device_type_in_netbox(instance): + """ + This test ensures the .get() method is called with the correct arguments when checking the device type. + """ + netbox_api = NonCallableMock() + res = instance._check_device_type_in_netbox("type", netbox_api) + netbox_api.dcim.device_types.get.assert_called_once_with(slug="type") + assert res + + +@patch( + "pynetboxquery.netbox_api.validate_data.ValidateData._check_device_type_in_netbox" +) +def test_check_list_device_type_in_netbox_one( + mock_check_device_type_in_netbox, instance +): + """ + This test ensures the correct methods are called for checking a list of device types that holds one value. + """ + res = instance._check_list_device_type_in_netbox(["type"], "api") + mock_check_device_type_in_netbox.assert_called_once_with("type", "api") + assert res == [ + f"Device type type exists in Netbox: {mock_check_device_type_in_netbox.return_value}." + ] + + +@patch( + "pynetboxquery.netbox_api.validate_data.ValidateData._check_device_type_in_netbox" +) +def test_check_list_device_type_in_netbox_many( + mock_check_device_type_in_netbox, instance +): + """ + This test ensures the correct methods are called for checking a list of device types that holds many values. + """ + res = instance._check_list_device_type_in_netbox(["type1", "type2"], "api") + mock_check_device_type_in_netbox.assert_any_call("type1", "api") + mock_check_device_type_in_netbox.assert_any_call("type2", "api") + assert res == [ + f"Device type type1 exists in Netbox: {mock_check_device_type_in_netbox.return_value}.", + f"Device type type2 exists in Netbox: {mock_check_device_type_in_netbox.return_value}.", + ] diff --git a/pynetbox_query/tests/test_user_methods/test_abstract_user_method.py b/pynetbox_query/tests/test_user_methods/test_abstract_user_method.py new file mode 100644 index 0000000..f1ee0ce --- /dev/null +++ b/pynetbox_query/tests/test_user_methods/test_abstract_user_method.py @@ -0,0 +1,50 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from argparse import ArgumentParser +from typing import List +from unittest.mock import patch +from pytest import fixture +from pynetboxquery.user_methods.abstract_user_method import AbstractUserMethod + + +@fixture(name="instance") +def instance_fixture(): + """ + This fixture returns the Stub class to be tested. + """ + return StubAbstractUserMethod() + + +class StubAbstractUserMethod(AbstractUserMethod): + """ + This class provides a Stub version of the AbstractUserMethod class. + So we do not have to patch all the abstract methods. + """ + + def _subparser(self) -> ArgumentParser: + """Placeholder Method.""" + + @staticmethod + def aliases() -> List[str]: + """Placeholder Method.""" + + @staticmethod + def _run(url: str, token: str, file_path: str, **kwargs): + """Placeholder Method.""" + + +@patch( + "pynetboxquery.user_methods.abstract_user_method.AbstractUserMethod._collect_kwargs" +) +def test_main(mock_collect_kwargs, instance): + """ + This test ensures the collect kwargs method is called. + We do not need to assert that ._run is called as it is an abstract method. + """ + mock_collect_kwargs.return_value = { + "url": "mock_url", + "token": "mock_token", + "file_path": "mock_file_path", + } + instance.main() + mock_collect_kwargs.assert_called_once() diff --git a/pynetbox_query/tests/test_user_methods/test_upload_devices_to_netbox.py b/pynetbox_query/tests/test_user_methods/test_upload_devices_to_netbox.py new file mode 100644 index 0000000..ddb2eb1 --- /dev/null +++ b/pynetbox_query/tests/test_user_methods/test_upload_devices_to_netbox.py @@ -0,0 +1,68 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from dataclasses import asdict +from unittest.mock import patch, NonCallableMock +from pynetboxquery.user_methods.upload_devices_to_netbox import Main + + +def test_aliases(): + """ + This test ensures that the aliases function returns a list of aliases. + """ + res = Main().aliases() + assert res == ["create", "create_devices"] + + +# pylint: disable = R0801 +@patch("pynetboxquery.user_methods.upload_devices_to_netbox.Parsers") +def test_subparser(mock_parsers): + """ + This test ensures all the correct methods are called with the correct arguments to create a subparser. + """ + mock_subparsers = NonCallableMock() + mock_parsers.return_value.arg_parser.return_value = ( + "mock_parent_parser", + "mock_main_parser", + mock_subparsers, + ) + res = Main()._subparser() + mock_parsers.return_value.arg_parser.assert_called_once() + mock_subparsers.add_parser.assert_called_once_with( + "create_devices", + description="Create devices in Netbox from a file.", + usage="pynetboxquery create_devices ", + parents=["mock_parent_parser"], + aliases=["create", "create_devices"], + ) + assert res == "mock_main_parser" + + +@patch("pynetboxquery.user_methods.upload_devices_to_netbox.ReadFile") +@patch("pynetboxquery.user_methods.upload_devices_to_netbox.api_object") +@patch("pynetboxquery.user_methods.upload_devices_to_netbox.ValidateData") +@patch("pynetboxquery.user_methods.upload_devices_to_netbox.QueryDevice") +@patch("pynetboxquery.user_methods.upload_devices_to_netbox.NetboxCreate") +def test_upload_devices_to_netbox( + mock_netbox_create, mock_query_device, mock_validate_data, mock_api, mock_read_file +): + """ + This test ensures all the correct methods are called with the correct arguments + """ + Main()._run("mock_url", "mock_token", "mock_file_path") + mock_api_object = mock_api.return_value + mock_api.assert_called_once_with("mock_url", "mock_token") + mock_device_list = mock_read_file.return_value.read_file.return_value + mock_read_file.return_value.read_file.assert_called_once_with("mock_file_path") + mock_validate_data.return_value.validate_data.assert_called_once_with( + mock_device_list, mock_api_object, **{"fields": ["name", "device_type"]} + ) + mock_queried_devices = mock_query_device.return_value.query_list.return_value + mock_query_device.assert_called_once_with(mock_api_object) + mock_query_device.return_value.query_list.assert_called_once_with(mock_device_list) + mock_dictionary_devices = [ + asdict(mock_device) for mock_device in mock_queried_devices + ] + mock_netbox_create.assert_called_once_with(mock_api_object) + mock_netbox_create.return_value.create_device.assert_called_once_with( + mock_dictionary_devices + ) diff --git a/pynetbox_query/tests/test_user_methods/test_validate_data_fields_in_netbox.py b/pynetbox_query/tests/test_user_methods/test_validate_data_fields_in_netbox.py new file mode 100644 index 0000000..c339872 --- /dev/null +++ b/pynetbox_query/tests/test_user_methods/test_validate_data_fields_in_netbox.py @@ -0,0 +1,61 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from unittest.mock import patch, NonCallableMock +from pynetboxquery.user_methods.validate_data_fields_in_netbox import Main + + +def test_aliases(): + """ + This test ensures that the aliases function returns a list of aliases. + """ + res = Main().aliases() + assert res == ["validate", "validate_data_fields_in_netbox"] + + +# pylint: disable = R0801 +@patch("pynetboxquery.user_methods.validate_data_fields_in_netbox.Parsers") +def test_subparser(mock_parsers): + """ + This test ensures all the correct methods are called with the correct arguments to create a subparser. + """ + mock_subparsers = NonCallableMock() + mock_parsers.return_value.arg_parser.return_value = ( + "mock_parent_parser", + "mock_main_parser", + mock_subparsers, + ) + res = Main()._subparser() + mock_parsers.return_value.arg_parser.assert_called_once() + mock_subparsers.add_parser.assert_called_once_with( + "validate_data_fields_in_netbox", + description="Check data fields values in Netbox from a file.", + usage="pynetboxquery validate_data_fields_in_netbox ", + parents=["mock_parent_parser"], + aliases=Main().aliases(), + ) + mock_subparsers.add_parser.return_value.add_argument.assert_called_once_with( + "fields", help="The fields to check in Netbox.", nargs="*" + ) + assert res == "mock_main_parser" + + +@patch("pynetboxquery.user_methods.validate_data_fields_in_netbox.ReadFile") +@patch("pynetboxquery.user_methods.validate_data_fields_in_netbox.api_object") +@patch("pynetboxquery.user_methods.validate_data_fields_in_netbox.ValidateData") +def test_validate_data_fields_in_netbox( + mock_validate_data, mock_api_object, mock_read_file +): + """ + This test ensures all the correct methods are called with the correct arguments + """ + mock_kwargs = {"fields": ["mock_val"]} + Main()._run("mock_url", "mock_token", "mock_file_path", **mock_kwargs) + mock_device_list = mock_read_file.return_value.read_file.return_value + mock_read_file.return_value.read_file.assert_called_once_with( + "mock_file_path", **mock_kwargs + ) + mock_api = mock_api_object.return_value + mock_api_object.assert_called_once_with("mock_url", "mock_token") + mock_validate_data.return_value.validate_data.assert_called_once_with( + mock_device_list, mock_api, mock_kwargs["fields"] + ) diff --git a/pynetbox_query/tests/test_utils/test_device_dataclass.py b/pynetbox_query/tests/test_utils/test_device_dataclass.py new file mode 100644 index 0000000..21a7b68 --- /dev/null +++ b/pynetbox_query/tests/test_utils/test_device_dataclass.py @@ -0,0 +1,12 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from dataclasses import asdict + + +def test_return_attrs(mock_device): + """ + This test ensures the field names are returned for a device. + """ + res = mock_device.return_attrs() + expected = asdict(mock_device).keys() + assert set(res) == set(expected) diff --git a/pynetbox_query/tests/test_utils/test_parsers.py b/pynetbox_query/tests/test_utils/test_parsers.py new file mode 100644 index 0000000..c0a1986 --- /dev/null +++ b/pynetbox_query/tests/test_utils/test_parsers.py @@ -0,0 +1,58 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from unittest.mock import patch +from pytest import fixture +from pynetboxquery.utils.parsers import Parsers + + +@fixture(name="instance") +def instance_fixture(): + """ + This fixture returns the Parsers class to be used in tests. + """ + return Parsers() + + +@patch("pynetboxquery.utils.parsers.Parsers._parent_parser") +@patch("pynetboxquery.utils.parsers.argparse") +def test_arg_parser(mock_argparse, mock_parent_parser, instance): + """ + This test ensures the argparse method adds the correct arguments and returns them. + """ + res = instance.arg_parser() + mock_parent_parser.assert_called_once_with() + mock_argparse.ArgumentParser.assert_called_once_with( + description="The main command. This cannot be run standalone and requires a subcommand to be provided.", + usage="pynetboxquery [command] [filepath] [url] [token] [kwargs]", + ) + mock_argparse.ArgumentParser.return_value.add_subparsers.assert_called_once_with( + dest="subparsers", + ) + expected_parent = mock_parent_parser.return_value + expected_main = mock_argparse.ArgumentParser.return_value + expected_subparser = ( + mock_argparse.ArgumentParser.return_value.add_subparsers.return_value + ) + assert res == (expected_parent, expected_main, expected_subparser) + + +@patch("pynetboxquery.utils.parsers.argparse") +def test_parent_parser(mock_argparse, instance): + """ + This test ensures the parent parser is created with the correct arguments. + """ + res = instance._parent_parser() + mock_argparse.ArgumentParser.assert_called_once_with(add_help=False) + mock_parent_parser = mock_argparse.ArgumentParser.return_value + mock_parent_parser.add_argument.assert_any_call("url", help="The Netbox URL.") + mock_parent_parser.add_argument.assert_any_call("token", help="Your Netbox Token.") + mock_parent_parser.add_argument.assert_any_call( + "file_path", help="Your file path to csv files." + ) + mock_parent_parser.add_argument.assert_any_call( + "--delimiter", help="The separator in the text file." + ) + mock_parent_parser.add_argument.assert_any_call( + "--sheet-name", help="The sheet in the Excel Workbook to read from." + ) + assert res == mock_parent_parser diff --git a/pynetbox_query/tests/test_utils/test_query_device.py b/pynetbox_query/tests/test_utils/test_query_device.py new file mode 100644 index 0000000..1dfe302 --- /dev/null +++ b/pynetbox_query/tests/test_utils/test_query_device.py @@ -0,0 +1,62 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from unittest.mock import NonCallableMock, patch +from dataclasses import asdict +from pytest import fixture +from pynetboxquery.utils.query_device import QueryDevice + + +@fixture(name="instance") +def instance_fixture(): + """ + This fixture method returns the Class to be tested. + """ + api = NonCallableMock() + return QueryDevice(api) + + +def test_query_list_no_device(instance): + """ + This test ensures that an empty list is returned if an empty list is given to the query list method. + """ + mock_device_list = [] + with patch("pynetboxquery.utils.query_device.QueryDevice.query_device"): + res = instance.query_list(mock_device_list) + assert res == [] + + +def test_query_list_one_device(instance): + """ + This test ensures that one device is returned if one device is given to the method. + """ + mock_device_list = [""] + with patch( + "pynetboxquery.utils.query_device.QueryDevice.query_device" + ) as mock_query_device: + res = instance.query_list(mock_device_list) + assert res == [mock_query_device.return_value] + + +def test_query_list_multiple_devices(instance): + """ + This test ensures 2 devices are returned if 2 devices are given. + """ + mock_device_list = ["", ""] + with patch( + "pynetboxquery.utils.query_device.QueryDevice.query_device" + ) as mock_query_device: + res = instance.query_list(mock_device_list) + assert res == [mock_query_device.return_value, mock_query_device.return_value] + + +def test_query_device(instance, mock_device, dict_to_device): + """ + This test ensures the get_id is called on all fields in a dataclass. + """ + with patch("pynetboxquery.utils.query_device.NetboxGetId.get_id") as mock_get_id: + res = instance.query_device(mock_device) + val = mock_get_id.return_value + expected_device_dict = asdict(mock_device) + for key in expected_device_dict.keys(): + expected_device_dict[key] = val + assert res == dict_to_device(expected_device_dict) diff --git a/pynetbox_query/tests/test_utils/test_read_utils/test_read_abc.py b/pynetbox_query/tests/test_utils/test_read_utils/test_read_abc.py new file mode 100644 index 0000000..159b87e --- /dev/null +++ b/pynetbox_query/tests/test_utils/test_read_utils/test_read_abc.py @@ -0,0 +1,53 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from dataclasses import asdict +from unittest.mock import patch +from pytest import raises +from pynetboxquery.utils.read_utils.read_abc import ReadAbstractBase + + +# pylint: disable = R0903 +class StubAbstractBase(ReadAbstractBase): + """ + This class provides a Stub version of the ReadAbstractBase class. + So we do not have to patch all the abstract methods. + """ + + def read(self): + """Placeholder method.""" + + @staticmethod + def _validate(kwargs): + """Placeholder method.""" + + +@patch("pynetboxquery.utils.read_utils.read_abc.Path") +def test_check_file_path(mock_path): + """ + This test ensures the _check_file_path method is called. + """ + StubAbstractBase("mock_file_path")._check_file_path("mock_file_path") + mock_path.assert_called_with("mock_file_path") + mock_path.return_value.exists.assert_called() + + +@patch("pynetboxquery.utils.read_utils.read_abc.Path") +def test_check_file_path_error(mock_path): + """ + This test ensures the _check_file_path method is called and raises an error. + """ + mock_path.return_value.exists.return_value = False + with raises(FileNotFoundError): + StubAbstractBase("mock_file_path")._check_file_path("mock_file_path") + mock_path.assert_called_with("mock_file_path") + mock_path.return_value.exists.assert_called() + + +def test_dict_to_dataclass(mock_device): + """ + This test ensures that the method _dict_to_dataclass returns the right list of devices. + """ + with patch("pynetboxquery.utils.read_utils.read_abc.Path"): + mock_dictionary = [asdict(mock_device)] + res = StubAbstractBase("mock_file_path")._dict_to_dataclass(mock_dictionary) + assert res == [mock_device] diff --git a/pynetbox_query/tests/test_utils/test_read_utils/test_read_csv.py b/pynetbox_query/tests/test_utils/test_read_utils/test_read_csv.py new file mode 100644 index 0000000..8a9a05a --- /dev/null +++ b/pynetbox_query/tests/test_utils/test_read_utils/test_read_csv.py @@ -0,0 +1,30 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from unittest.mock import patch +from pynetboxquery.utils.read_utils.read_csv import ReadCSV + + +@patch("pynetboxquery.utils.read_utils.read_csv.ReadCSV._check_file_path") +@patch("pynetboxquery.utils.read_utils.read_csv.open") +@patch("pynetboxquery.utils.read_utils.read_csv.DictReader") +@patch("pynetboxquery.utils.read_utils.read_csv.list") +@patch("pynetboxquery.utils.read_utils.read_csv.ReadCSV._dict_to_dataclass") +def test_read( + mock_dict_to_dataclass, + mock_list, + mock_dict_reader, + mock_open_func, + mock_check_file_path, +): + """ + This test ensures all calls are made correctly in the read method. + """ + res = ReadCSV("mock_file_path").read() + mock_check_file_path.assert_called_once_with("mock_file_path") + mock_open_func.assert_called_once_with("mock_file_path", mode="r", encoding="UTF-8") + mock_dict_reader.assert_called_once_with( + mock_open_func.return_value.__enter__.return_value + ) + mock_list.assert_called_once_with(mock_dict_reader.return_value) + mock_dict_to_dataclass.assert_called_once_with(mock_list.return_value) + assert res == mock_dict_to_dataclass.return_value diff --git a/pynetbox_query/tests/test_utils/test_read_utils/test_read_file.py b/pynetbox_query/tests/test_utils/test_read_utils/test_read_file.py new file mode 100644 index 0000000..2068c63 --- /dev/null +++ b/pynetbox_query/tests/test_utils/test_read_utils/test_read_file.py @@ -0,0 +1,47 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from unittest.mock import patch +from pytest import raises +from pynetboxquery.utils.read_utils.read_file import ReadFile +from pynetboxquery.utils.error_classes import FileTypeNotSupportedError + + +def test_read_file_wildcard(): + """ + This test ensures an error is raised when a user supplies an unsupported file type. + """ + with raises(FileTypeNotSupportedError): + ReadFile().read_file("mock_file_path.wildcard") + + +@patch("pynetboxquery.utils.read_utils.read_file.ReadCSV") +def test_read_file_csv(mock_read_csv): + """ + This test ensures the correct read method is called when supplied with a csv file path. + """ + res = ReadFile().read_file("mock_file_path.csv") + mock_read_csv.assert_called_once_with("mock_file_path.csv") + mock_read_csv.return_value.read.assert_called_once_with() + assert res == mock_read_csv.return_value.read.return_value + + +@patch("pynetboxquery.utils.read_utils.read_file.ReadTXT") +def test_read_file_txt(mock_read_txt): + """ + This test ensures the correct read method is called when supplied with a txt file path. + """ + res = ReadFile().read_file("mock_file_path.txt") + mock_read_txt.assert_called_once_with("mock_file_path.txt") + mock_read_txt.return_value.read.assert_called_once_with() + assert res == mock_read_txt.return_value.read.return_value + + +@patch("pynetboxquery.utils.read_utils.read_file.ReadXLSX") +def test_read_file_xlsx(mock_read_xlsx): + """ + This test ensures the correct read method is called when supplied with a xlsx file path. + """ + res = ReadFile().read_file("mock_file_path.xlsx") + mock_read_xlsx.assert_called_once_with("mock_file_path.xlsx") + mock_read_xlsx.return_value.read.assert_called_once_with() + assert res == mock_read_xlsx.return_value.read.return_value diff --git a/pynetbox_query/tests/test_utils/test_read_utils/test_read_txt.py b/pynetbox_query/tests/test_utils/test_read_utils/test_read_txt.py new file mode 100644 index 0000000..a96c6b7 --- /dev/null +++ b/pynetbox_query/tests/test_utils/test_read_utils/test_read_txt.py @@ -0,0 +1,43 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from unittest.mock import patch +from pytest import raises +from pynetboxquery.utils.read_utils.read_txt import ReadTXT +from pynetboxquery.utils.error_classes import DelimiterNotSpecifiedError + + +@patch("pynetboxquery.utils.read_utils.read_txt.ReadTXT._check_file_path") +@patch("pynetboxquery.utils.read_utils.read_txt.open") +@patch("pynetboxquery.utils.read_utils.read_txt.DictReader") +@patch("pynetboxquery.utils.read_utils.read_txt.list") +@patch("pynetboxquery.utils.read_utils.read_txt.ReadTXT._dict_to_dataclass") +def test_read( + mock_dict_to_dataclass, mock_list, mock_dict_reader, mock_open, mock_check_file_path +): + """ + This test ensures all calls are made correctly in the read method. + """ + res = ReadTXT("mock_file_path", **{"delimiter": ","}).read() + mock_check_file_path.assert_called_once_with("mock_file_path") + mock_open.assert_called_once_with("mock_file_path", mode="r", encoding="UTF-8") + mock_dict_reader.assert_called_once_with( + mock_open.return_value.__enter__.return_value, delimiter="," + ) + mock_list.assert_called_once_with(mock_dict_reader.return_value) + mock_dict_to_dataclass.assert_called_once_with(mock_list.return_value) + assert res == mock_dict_to_dataclass.return_value + + +def test_validate(): + """ + This test ensures the validate method is called and doesn't error for a correct case. + """ + ReadTXT("", **{"delimiter": ","}) + + +def test_validate_fail(): + """ + This test ensures the validate method is called and does error for an incorrect case. + """ + with raises(DelimiterNotSpecifiedError): + ReadTXT("") diff --git a/pynetbox_query/tests/test_utils/test_read_utils/test_read_xlsx.py b/pynetbox_query/tests/test_utils/test_read_utils/test_read_xlsx.py new file mode 100644 index 0000000..eb4f6a2 --- /dev/null +++ b/pynetbox_query/tests/test_utils/test_read_utils/test_read_xlsx.py @@ -0,0 +1,39 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 United Kingdom Research and Innovation +from unittest.mock import patch +from pytest import raises +from pynetboxquery.utils.read_utils.read_xlsx import ReadXLSX +from pynetboxquery.utils.error_classes import SheetNameNotSpecifiedError + + +def test_validate(): + """ + This test ensures the validate method is called and doesn't error for a correct case. + """ + ReadXLSX("", **{"sheet_name": "test"}) + + +def test_validate_fail(): + """ + This test ensures the validate method is called and does error for an incorrect case. + """ + with raises(SheetNameNotSpecifiedError): + ReadXLSX("") + + +@patch("pynetboxquery.utils.read_utils.read_xlsx.ReadXLSX._check_file_path") +@patch("pynetboxquery.utils.read_utils.read_xlsx.read_excel") +@patch("pynetboxquery.utils.read_utils.read_xlsx.ReadXLSX._dict_to_dataclass") +def test_read(mock_dict_to_dataclass, mock_read_excel, mock_check_file_path): + """ + This test ensures all calls are made correctly in the read method. + """ + res = ReadXLSX("mock_file_path", **{"sheet_name": "test"}).read() + mock_check_file_path.assert_called_once_with("mock_file_path") + mock_read_excel.assert_called_once_with("mock_file_path", sheet_name="test") + mock_dataframe = mock_read_excel.return_value + mock_dataframe.to_dict.assert_called_once_with(orient="records") + mock_dictionary_list = mock_dataframe.to_dict.return_value + mock_dict_to_dataclass.assert_called_once_with(mock_dictionary_list) + mock_device_list = mock_dict_to_dataclass.return_value + assert res == mock_device_list diff --git a/reverse_ssl_cert_chain/.coveragerc b/reverse_ssl_cert_chain/.coveragerc new file mode 100644 index 0000000..f4a5869 --- /dev/null +++ b/reverse_ssl_cert_chain/.coveragerc @@ -0,0 +1,4 @@ +[report] + +exclude_lines = + if __name__ == .__main__.: diff --git a/reverse_ssl_cert_chain/.pylintrc b/reverse_ssl_cert_chain/.pylintrc new file mode 100644 index 0000000..d27d7aa --- /dev/null +++ b/reverse_ssl_cert_chain/.pylintrc @@ -0,0 +1,8 @@ +[FORMAT] +# Black will enforce 88 chars on Python code +# this will enforce 120 chars on docs / comments +max-line-length=120 + +# Disable various warnings: + +disable= diff --git a/reverse_ssl_cert_chain/__init__.py b/reverse_ssl_cert_chain/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/reverse_ssl_cert_chain/pytest.ini b/reverse_ssl_cert_chain/pytest.ini new file mode 100644 index 0000000..92a0e93 --- /dev/null +++ b/reverse_ssl_cert_chain/pytest.ini @@ -0,0 +1,5 @@ +[pytest] +pythonpath = lib +python_files = *.py +python_functions = test_* +addopts = --ignore=setup.py \ No newline at end of file diff --git a/reverse_ssl_cert_chain/requirements.txt b/reverse_ssl_cert_chain/requirements.txt new file mode 100644 index 0000000..c4449ae Binary files /dev/null and b/reverse_ssl_cert_chain/requirements.txt differ diff --git a/reverse_ssl_cert_chain/reverse_chain.py b/reverse_ssl_cert_chain/reverse_chain.py new file mode 100644 index 0000000..e9523e4 --- /dev/null +++ b/reverse_ssl_cert_chain/reverse_chain.py @@ -0,0 +1,121 @@ +""" +If you have a SSL certificate with the full chain from root CA to you, +then you can use this script to reverse the chain order. Also, the +private key is prepended to the top of the chain. +If you do not need the private key to be added to the chain you can +use the script with the flag "--no-key". + +Usage: python3 +Optional: --no-key , Use this when do not want to prepend the key. +""" + +from typing import List +from pathlib import Path +from sys import argv + + +def read_crt(crt: Path) -> List[str]: + """ + Reads the lines from the file and stores each line as an element to a list. + :param crt: Path to the certificate + :return: A list where each element is a line from the file + """ + with open(crt, "r", encoding="utf-8") as file: + file_lines = file.readlines() + return file_lines + + +def construct_blocks(file_lines: List[str]) -> List[List]: + """ + Creates a new list where each element is a single certificate. + It checks for the "BEGIN CERTIFICATE" then adds all proceeding + lines to a list and stops when it meets "END CERTIFICATE". + :param file_lines: A list where each element is a line from the file + :return: A list where each element is a list of lines containing a single certificate + """ + blocks = [] + current_block = [] + for line in file_lines: + if line == "-----BEGIN CERTIFICATE-----\n": + current_block = [line] + elif line == "-----END CERTIFICATE-----\n": + current_block.append(line) + blocks.append(current_block) + elif line == "-----END CERTIFICATE-----": + current_block.append((line + "\n")) + blocks.append(current_block) + elif line == "\n": + pass + else: + current_block.append(line) + return blocks + + +def reverse_blocks(blocks: List[List]) -> List[List]: + """ + Reverses the order of the list. + :param blocks: A list where each element is a list representing a single certificate + :return: The same list as blocks just reversed + """ + blocks.reverse() + return blocks + + +def read_key(key: Path) -> str: + """ + Reads the contents of the key file into a single string. + :param key: The path to the key file + :return: A string containing the entire key + """ + with open(key, "r", encoding="utf-8") as file: + file_data = file.read() + return file_data + + +def prepend_key_to_crt(crt: List[List], key: str) -> List[List]: + """ + Adds the key to the top of the certificate chain + :param crt: A list containing the certificates + :param key: The key string + :return: A list containing the key then all proceeding certificates + """ + full_chain_list = [key] + crt + return full_chain_list + + +def construct_file(full_chain_list: List[List]): + """ + Writes all the lines in the list and subsequent lists into a file on the system. + :param full_chain_list: A list containing the key and certificates + """ + with open("full_chain.pem", "w", encoding="utf-8") as file: + for block in full_chain_list: + file.writelines(block) + + +def main(crt: Path, key: Path, add_key: bool): + """ + This method calls all the above functions in order to reverse the supplied certificate chain. + If the add_key parameter is true it will add the key to the top of the certificate chain. + :param crt: Path to the certificate chain + :param key: Path to the key file + :param add_key: Whether to add the key to the chain or not + """ + crt_data = read_crt(crt) + key_data = read_key(key) + crt_blocks = construct_blocks(crt_data) + full_chain_list = reverse_blocks(crt_blocks) + if add_key: + full_chain_list = prepend_key_to_crt(full_chain_list, key_data) + construct_file(full_chain_list) + + +if __name__ == "__main__": + # Collect the command line arguments and passes them to the main function. + args = argv[1:] + crt_path = Path(args[0]) + key_path = Path(args[1]) + if "--no-key" in args: + main(crt_path, key_path, False) + else: + main(crt_path, key_path, True) diff --git a/reverse_ssl_cert_chain/test_reverse_chain.py b/reverse_ssl_cert_chain/test_reverse_chain.py new file mode 100644 index 0000000..4278c85 --- /dev/null +++ b/reverse_ssl_cert_chain/test_reverse_chain.py @@ -0,0 +1,238 @@ +""" +This test file should test all the functions in the reverse_chain file. +""" + +from pathlib import Path +from unittest.mock import patch, mock_open +from pytest import fixture + +# Disabling this pylint error as there is only a single python file. +# pylint: disable=import-error +import reverse_chain + + +@fixture(scope="function", name="mock_path") +def mock_path_fixture(): + """ + This fixture provides a mock path. + """ + return Path("/some/mock/path") + + +@fixture(scope="function", name="mock_file_certificate") +def mock_certificate_file_fixture(): + """ + This fixture provides a mock certificate file. + """ + certificate_str = ( + "-----BEGIN CERTIFICATE-----\n" + "somecertificatejunk1" + "somecertificatejunk2" + "-----END CERTIFICATE-----\n" + "-----BEGIN CERTIFICATE-----\n" + "somecertificatejunkA" + "somecertificatejunkB" + "-----END CERTIFICATE-----\n" + ) + return certificate_str + + +@fixture(scope="function", name="mock_certificate_list") +def mock_certificate_list_fixture(): + """ + This fixture provides a list containing mock certificate file lines. + """ + certificate_list = [ + "-----BEGIN CERTIFICATE-----\n", + "somecertificatejunk1", + "somecertificatejunk2", + "-----END CERTIFICATE-----\n", + "-----BEGIN CERTIFICATE-----\n", + "somecertificatejunkA", + "somecertificatejunkB", + "-----END CERTIFICATE-----\n", + ] + return certificate_list + + +@fixture(scope="function", name="mock_certificate_list_no_eof_newline") +def mock_certificate_list_no_eof_newline_fixture(): + """ + This fixture provides a list containing mock certificate file lines + with not end of file new line. + """ + certificate_list = [ + "-----BEGIN CERTIFICATE-----\n", + "somecertificatejunk1", + "somecertificatejunk2", + "-----END CERTIFICATE-----\n", + "-----BEGIN CERTIFICATE-----\n", + "somecertificatejunkA", + "somecertificatejunkB", + "-----END CERTIFICATE-----", + ] + return certificate_list + + +@fixture(scope="function", name="mock_crt_block_list") +def mock_crt_block_list_fixture(): + """ + This fixture provides a mock list with sub lists of mock certificates. + """ + mock_crt_block_list = [ + [ + "-----BEGIN CERTIFICATE-----\n", + "somecertificatejunk1", + "somecertificatejunk2", + "-----END CERTIFICATE-----\n", + ], + [ + "-----BEGIN CERTIFICATE-----\n", + "somecertificatejunkA", + "somecertificatejunkB", + "-----END CERTIFICATE-----\n", + ], + ] + return mock_crt_block_list + + +def test_read_crt(mock_path): + """ + This test ensures the open function is called and the readlines method is called. + """ + with patch( + "reverse_chain.open", mock_open(read_data="line1\nline2\n") + ) as mock_open_ctx: + res = reverse_chain.read_crt(mock_path) + mock_open_ctx.assert_called_once_with(mock_path, "r", encoding="utf-8") + mock_open_ctx.return_value.readlines.assert_called_once() + assert res == ["line1\n", "line2\n"] + + +def test_construct_blocks(mock_certificate_list_no_eof_newline): + """ + This test ensures that the certificate files are put into correct blocks. + """ + res = reverse_chain.construct_blocks(mock_certificate_list_no_eof_newline) + assert res == [ + [ + "-----BEGIN CERTIFICATE-----\n", + "somecertificatejunk1", + "somecertificatejunk2", + "-----END CERTIFICATE-----\n", + ], + [ + "-----BEGIN CERTIFICATE-----\n", + "somecertificatejunkA", + "somecertificatejunkB", + "-----END CERTIFICATE-----\n", + ], + ] + + +def test_construct_blocks_no_eof_newline(mock_certificate_list_no_eof_newline): + """ + This test ensures the certificate files are put into correct blocks when there + is no newline at the end of the file. + """ + res = reverse_chain.construct_blocks(mock_certificate_list_no_eof_newline) + assert res == [ + [ + "-----BEGIN CERTIFICATE-----\n", + "somecertificatejunk1", + "somecertificatejunk2", + "-----END CERTIFICATE-----\n", + ], + [ + "-----BEGIN CERTIFICATE-----\n", + "somecertificatejunkA", + "somecertificatejunkB", + "-----END CERTIFICATE-----\n", + ], + ] + + +def test_reverse_blocks(mock_crt_block_list): + """ + This test ensures that the lists are reversed properly. + """ + reversed_blocks = [ + [ + "-----BEGIN CERTIFICATE-----\n", + "somecertificatejunkA", + "somecertificatejunkB", + "-----END CERTIFICATE-----\n", + ], + [ + "-----BEGIN CERTIFICATE-----\n", + "somecertificatejunk1", + "somecertificatejunk2", + "-----END CERTIFICATE-----\n", + ], + ] + res = reverse_chain.reverse_blocks(mock_crt_block_list) + assert res == reversed_blocks + + +def test_read_key(mock_path): + """ + This test ensures the open function is called and the read method is called. + """ + with patch( + "reverse_chain.open", mock_open(read_data="somemockkey") + ) as mock_open_ctx: + res = reverse_chain.read_key(mock_path) + mock_open_ctx.assert_called_once_with(mock_path, "r", encoding="utf-8") + mock_open_ctx.return_value.read.assert_called_once() + assert res == "somemockkey" + + +def test_prepend_key_to_crt(mock_crt_block_list): + """ + This test ensures the key string as prepended to the certificate list. + """ + mock_key = "somemockkey" + res = reverse_chain.prepend_key_to_crt(mock_crt_block_list, mock_key) + assert res == ([mock_key] + mock_crt_block_list) + + +def test_construct_file(mock_crt_block_list): + """ + This test ensures the open function is called to write to the file. + """ + with patch("reverse_chain.open") as mock_open_ctx: + reverse_chain.construct_file(mock_crt_block_list) + mock_open_ctx.assert_called_once_with("full_chain.pem", "w", encoding="utf-8") + + +# Disabling this pylint error as I am testing the main function. +# pylint: disable=too-many-arguments +@patch("reverse_chain.read_crt") +@patch("reverse_chain.read_key") +@patch("reverse_chain.construct_blocks") +@patch("reverse_chain.reverse_blocks") +@patch("reverse_chain.prepend_key_to_crt") +@patch("reverse_chain.construct_file") +def test_main_add_key( + mock_construct_file, + mock_prepend_key_to_crt, + mock_reverse_blocks, + mock_construct_blocks, + mock_read_key, + mock_read_crt, + mock_path, +): + """ + This test ensures the main functions calls all the methods appropriately + with the correct arguments. It also tests if the prepend key function + is called when add_key is true. + """ + reverse_chain.main(mock_path, mock_path, True) + mock_read_crt.assert_called_once_with(mock_path) + mock_read_key.assert_called_once_with(mock_path) + mock_construct_blocks.assert_called_once_with(mock_read_crt.return_value) + mock_reverse_blocks.assert_called_once_with(mock_construct_blocks.return_value) + mock_prepend_key_to_crt.assert_called_once_with( + mock_reverse_blocks.return_value, mock_read_key.return_value + ) + mock_construct_file.assert_called_once_with(mock_prepend_key_to_crt.return_value)