diff --git a/poetry.lock b/poetry.lock index 45305c6..8edd59a 100644 --- a/poetry.lock +++ b/poetry.lock @@ -413,6 +413,20 @@ files = [ {file = "packaging-24.1.tar.gz", hash = "sha256:026ed72c8ed3fcce5bf8950572258698927fd1dbda10a5e981cdf0ac37f4f002"}, ] +[[package]] +name = "parameterized" +version = "0.9.0" +description = "Parameterized testing with any Python test framework" +optional = false +python-versions = ">=3.7" +files = [ + {file = "parameterized-0.9.0-py2.py3-none-any.whl", hash = "sha256:4e0758e3d41bea3bbd05ec14fc2c24736723f243b28d702081aef438c9372b1b"}, + {file = "parameterized-0.9.0.tar.gz", hash = "sha256:7fc905272cefa4f364c1a3429cbbe9c0f98b793988efb5bf90aac80f08db09b1"}, +] + +[package.extras] +dev = ["jinja2"] + [[package]] name = "pathspec" version = "0.12.1" @@ -1002,4 +1016,4 @@ tests = ["dill", "flake8", "pylint", "pytest", "pytest-rerunfailures", "pytest-s [metadata] lock-version = "2.0" python-versions = ">=3.10, <3.11" -content-hash = "7e530eb677e760858100de4053821b28b5d132740827c0e23725ac0cf312f83b" +content-hash = "e753e9b2d503fa694d2b1870eb96605e2b6396a319f2c53f9c0d94c5adbf3b79" diff --git a/pyproject.toml b/pyproject.toml index 1a212d8..749f858 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -85,6 +85,9 @@ all = [ ] +[tool.poetry.group.dev.dependencies] +parameterized = "^0.9.0" + [build-system] requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" diff --git a/tdwii_plus_examples/TDWII_PPVS_subscriber/cstore_handler.py b/tdwii_plus_examples/TDWII_PPVS_subscriber/cstore_handler.py deleted file mode 100644 index aad6707..0000000 --- a/tdwii_plus_examples/TDWII_PPVS_subscriber/cstore_handler.py +++ /dev/null @@ -1,142 +0,0 @@ -"""Utility classes and functions for the apps.""" - -import os - -# from pydicom import dcmread -# from pydicom.datadict import get_entry, repeater_has_keyword, tag_for_keyword -from pydicom.dataset import Dataset -from pydicom.filewriter import write_file_meta_info - -# from pydicom.tag import Tag -from pydicom.uid import DeflatedExplicitVRLittleEndian -from pynetdicom.dsutils import encode - - -def handle_store(event, args, app_logger): - """Handle a C-STORE request. - - Parameters - ---------- - event : pynetdicom.event.event - The event corresponding to a C-STORE request. - args : argparse.Namespace - The namespace containing the arguments to use. The namespace should - contain ``args.ignore`` and ``args.output_directory`` attributes. - app_logger : logging.Logger - The application's logger. - - Returns - ------- - status : pynetdicom.sop_class.Status or int - A valid return status code, see PS3.4 Annex B.2.3 or the - ``StorageServiceClass`` implementation for the available statuses - """ - if args.ignore: - return 0x0000 - - try: - ds = event.dataset - # Remove any Group 0x0002 elements that may have been included - ds = ds[0x00030000:] - except Exception as exc: - app_logger.error("Unable to decode the dataset") - app_logger.exception(exc) - # Unable to decode dataset - return 0x210 - - # Add the file meta information elements - ds.file_meta = event.file_meta - - # Because pydicom uses deferred reads for its decoding, decoding errors - # are hidden until encountered by accessing a faulty element - try: - sop_class = ds.SOPClassUID - sop_instance = ds.SOPInstanceUID - except Exception as exc: - app_logger.error( - "Unable to decode the received dataset or missing 'SOP Class " "UID' and/or 'SOP Instance UID' elements" - ) - app_logger.exception(exc) - # Unable to decode dataset - return 0xC210 - - try: - # Get the elements we need - mode_prefix = SOP_CLASS_PREFIXES[sop_class][0] - except KeyError: - mode_prefix = "UN" - - filename = f"{mode_prefix}.{sop_instance}.dcm" - app_logger.info(f"Storing DICOM file: {filename}") - - status_ds = Dataset() - status_ds.Status = 0x0000 - - # Try to save to output-directory - if args.output_directory is not None: - filename = os.path.join(args.output_directory, filename) - try: - os.makedirs(args.output_directory, exist_ok=True) - except Exception as exc: - app_logger.error("Unable to create the output directory:") - app_logger.error(f" {args.output_directory}") - app_logger.exception(exc) - # Failed - Out of Resources - IOError - status_ds.Status = 0xA700 - return status_ds - - if os.path.exists(filename): - app_logger.warning("DICOM file already exists, overwriting") - - try: - if event.context.transfer_syntax == DeflatedExplicitVRLittleEndian: - # Workaround for pydicom issue #1086 - with open(filename, "wb") as f: - f.write(b"\x00" * 128) - f.write(b"DICM") - write_file_meta_info(f, event.file_meta) - f.write(encode(ds, False, True, True)) - else: - # We use `write_like_original=False` to ensure that a compliant - # File Meta Information Header is written - ds.save_as(filename, write_like_original=False) - - status_ds.Status = 0x0000 # Success - except IOError as exc: - app_logger.error("Could not write file to specified directory:") - app_logger.error(f" {os.path.dirname(filename)}") - app_logger.exception(exc) - # Failed - Out of Resources - IOError - status_ds.Status = 0xA700 - except Exception as exc: - app_logger.error("Could not write file to specified directory:") - app_logger.error(f" {os.path.dirname(filename)}") - app_logger.exception(exc) - # Failed - Out of Resources - Miscellaneous error - status_ds.Status = 0xA701 - - return status_ds - - -SOP_CLASS_PREFIXES = { - "1.2.840.10008.5.1.4.1.1.2": ("CT", "CT Image Storage"), - "1.2.840.10008.5.1.4.1.1.2.1": ("CTE", "Enhanced CT Image Storage"), - "1.2.840.10008.5.1.4.1.1.4": ("MR", "MR Image Storage"), - "1.2.840.10008.5.1.4.1.1.4.1": ("MRE", "Enhanced MR Image Storage"), - "1.2.840.10008.5.1.4.1.1.128": ("PT", "Positron Emission Tomography Image Storage"), - "1.2.840.10008.5.1.4.1.1.130": ("PTE", "Enhanced PET Image Storage"), - "1.2.840.10008.5.1.4.1.1.481.1": ("RI", "RT Image Storage"), - "1.2.840.10008.5.1.4.1.1.481.2": ("RD", "RT Dose Storage"), - "1.2.840.10008.5.1.4.1.1.481.5": ("RP", "RT Plan Storage"), - "1.2.840.10008.5.1.4.1.1.481.8": ("RN", "RT Ion Plan Storage"), - "1.2.840.10008.5.1.4.1.1.481.9": ("RX", "RT Ion Beams Treatment Record Storage"), - "1.2.840.10008.5.1.4.1.1.481.3": ("RS", "RT Structure Set Storage"), - "1.2.840.10008.5.1.4.1.1.1": ("CR", "Computed Radiography Image Storage"), - "1.2.840.10008.5.1.4.1.1.6.1": ("US", "Ultrasound Image Storage"), - "1.2.840.10008.5.1.4.1.1.6.2": ("USE", "Enhanced US Volume Storage"), - "1.2.840.10008.5.1.4.1.1.12.1": ("XA", "X-Ray Angiographic Image Storage"), - "1.2.840.10008.5.1.4.1.1.12.1.1": ("XAE", "Enhanced XA Image Storage"), - "1.2.840.10008.5.1.4.1.1.20": ("NM", "Nuclear Medicine Image Storage"), - "1.2.840.10008.5.1.4.1.1.7": ("SC", "Secondary Capture Image Storage"), - "1.2.840.10008.5.1.4.34.7": ("RB", "RT Beams Delivery Instruction Storage​"), -} diff --git a/tdwii_plus_examples/TDWII_PPVS_subscriber/ppvsscp.py b/tdwii_plus_examples/TDWII_PPVS_subscriber/ppvsscp.py index c761741..4238e7a 100644 --- a/tdwii_plus_examples/TDWII_PPVS_subscriber/ppvsscp.py +++ b/tdwii_plus_examples/TDWII_PPVS_subscriber/ppvsscp.py @@ -6,10 +6,10 @@ from tdwii_plus_examples import tdwii_config from tdwii_plus_examples.basescp import BaseSCP from tdwii_plus_examples.TDWII_PPVS_subscriber.nevent_receiver import NEventReceiver -from tdwii_plus_examples.TDWII_PPVS_subscriber.storescp import StoreSCP +from tdwii_plus_examples.cstorescp import CStoreSCP -class PPVS_SCP(NEventReceiver, StoreSCP): +class PPVS_SCP(NEventReceiver, CStoreSCP): def __init__( self, ae_title: str = "PPVS_SCP", @@ -27,15 +27,15 @@ def __init__( if port < 1: port = tdwii_config.known_ae_port[ae_title] self.nevent_callback = nevent_callback - StoreSCP.__init__( + CStoreSCP.__init__( self, ae_title=ae_title, port=port, logger=logger, bind_address=bind_address, + sop_classes=None, # use first 128 SOP Classes + transfer_syntaxes=None, # use pynetdicom defaults xfer syntaxes custom_handler=custom_cstore_handler, - storage_presentation_contexts=storage_presentation_contexts, - transfer_syntaxes=transfer_syntaxes, store_directory=store_directory, ) NEventReceiver.__init__( @@ -48,11 +48,11 @@ def __init__( ) def _add_contexts(self): - StoreSCP._add_contexts(self) + CStoreSCP._add_contexts(self) NEventReceiver._add_contexts(self) def _add_handlers(self): - StoreSCP._add_handlers(self) + CStoreSCP._add_handlers(self) NEventReceiver._add_handlers(self) def run(self): diff --git a/tdwii_plus_examples/TDWII_PPVS_subscriber/storescp.py b/tdwii_plus_examples/TDWII_PPVS_subscriber/storescp.py deleted file mode 100644 index 49e3b74..0000000 --- a/tdwii_plus_examples/TDWII_PPVS_subscriber/storescp.py +++ /dev/null @@ -1,53 +0,0 @@ -import os -from argparse import Namespace -from time import sleep - -from pynetdicom import ALL_TRANSFER_SYNTAXES, AllStoragePresentationContexts, evt - -from tdwii_plus_examples.basescp import BaseSCP -from tdwii_plus_examples.TDWII_PPVS_subscriber.cstore_handler import handle_store -from tdwii_plus_examples.cechoscp import CEchoSCP - - -class StoreSCP(CEchoSCP): - def __init__( - self, - ae_title: str = "STORE_SCP", - port: int = 11112, - logger=None, - bind_address: str = "", - storage_presentation_contexts=AllStoragePresentationContexts, - transfer_syntaxes=ALL_TRANSFER_SYNTAXES, - custom_handler=None, - store_directory=os.path.curdir, - ): - self.storage_presentation_contexts = storage_presentation_contexts - self.transfer_syntaxes = transfer_syntaxes - if custom_handler is None: - self.handle_cstore = handle_store - else: - self.handle_cstore = custom_handler - self.store_directory = store_directory - CEchoSCP.__init__(self, ae_title=ae_title, port=port, logger=logger, bind_address=bind_address) - - def _add_contexts(self): - CEchoSCP._add_contexts(self) - for context in self.storage_presentation_contexts: - self.ae.add_supported_context(context.abstract_syntax, self.transfer_syntaxes) - - def _add_handlers(self): - CEchoSCP._add_handlers(self) - args = Namespace(ignore=False, output_directory=self.store_directory) - - self.handlers.append((evt.EVT_C_STORE, self.handle_cstore, [args, self.logger])) - - def run(self): - # Listen for incoming association requests - BaseSCP.run(self) - - -if __name__ == "__main__": - my_scp = StoreSCP() - my_scp.run() - while True: - sleep(100) # sleep forever diff --git a/tdwii_plus_examples/basehandlers.py b/tdwii_plus_examples/basehandlers.py index 9112679..2c21b53 100644 --- a/tdwii_plus_examples/basehandlers.py +++ b/tdwii_plus_examples/basehandlers.py @@ -3,9 +3,9 @@ def handle_open(event, logger): Parameters ---------- - event : events.Event - The corresponding event. - logger : logging.Logger + event : pynetdicom.event.event + The event corresponding to the opening of a TCP connection. + app_logger : logging.Logger The application's logger. Returns @@ -26,9 +26,9 @@ def handle_close(event, logger): Parameters ---------- - event : events.Event - The corresponding event. - logger : logging.Logger + event : pynetdicom.event.event + The event corresponding to the closing of a TCP connection. + app_logger : logging.Logger The application's logger. Returns diff --git a/tdwii_plus_examples/basescp.py b/tdwii_plus_examples/basescp.py index a2ee739..363360a 100644 --- a/tdwii_plus_examples/basescp.py +++ b/tdwii_plus_examples/basescp.py @@ -36,55 +36,92 @@ class BaseSCP: stop(self) Stops the SCP AE. """ + def __init__(self, ae_title: str = "BASE_SCP", bind_address: str = "", port: int = 11112, logger=None): - """ Initializes a new instance of the BaseSCP class. This method creates an AE without presentation contexts. Parameters ---------- - ae_title : str + ae_title : str or None The title of the Application Entity (AE) Optional, default: "BASE_SCP" - bind_address : str + bind_address : str or None A specific IP address or hostname of the AE Optional, default: "" will bind to all interfaces. - port: int + port: int or None The port number to listen on Optional, default: 11112 (as registered for DICOM at IANA) - logger: logging.Logger + logger: logging.Logger or None A logger instance - Optional, default: None, a debug logger will be used. + Optional, default: None, a debug level logger will be used. """ if logger is None: self.logger = setup_logging( - Namespace(log_type="d", log_level="debug"), "base_scp") - elif not isinstance(logger, logging.Logger): - raise TypeError("logger must be an instance of logging.Logger") - else: + Namespace(log_type=None, log_level="debug"), "base_scp") + self.logger.info( + "Logger not provided, using default logger with level %s", + logging.getLevelName(self.logger.level)) + elif isinstance(logger, logging.Logger): self.logger = logger + self.logger.debug( + "Logger set to %s with level %s", + logger.name, logging.getLevelName(logger.getEffectiveLevel()) + ) + else: + raise TypeError("logger must be an instance of logging.Logger") - if not isinstance(bind_address, str): - raise ValueError("bind_address must be a string") if not bind_address: - self.logger.debug("bind_address empty, binding to all interfaces") - self.bind_address = bind_address - - if not isinstance(port, int): - raise TypeError("port must be an integer") - self.port = port + self.bind_address = "" + self.logger.info("bind_address empty, binding to all interfaces") + elif isinstance(bind_address, str): + self.bind_address = bind_address + self.logger.debug(f"bind_address set to {bind_address}") + else: + raise TypeError("bind_address must be a string or None") + + if port is None: + self.port = 11112 + self.logger.info("Port not provided, using default: 11112") + elif isinstance(port, int): + if port < 0: + raise ValueError("port must not be negative") + elif 0 <= port <= 1023 and port != 104: + raise ValueError( + "port must not be in the range (0-1023), except 104") + elif port == 104: + self.port = port + self.logger.warning("DICOM port 104 may need admin privileges") + elif port in range(1024, 11111) or port in range(11161, 49151): + self.port = port + self.logger.warning( + "Registered port (1024-49151) may be used by others") + elif port > 65535: + raise ValueError("port must not exceed 65535") + else: + self.port = port + self.logger.debug(f"port set to {port}") + else: + raise TypeError("port must be an integer or None") + + if not ae_title: + self.ae_title = "BASE_SCP" + self.logger.info("AE title not provided, using default: %s" % + self.ae_title) + elif isinstance(ae_title, str): + self.ae_title = ae_title + self.logger.debug(f"ae_title set to {ae_title}") + else: + raise TypeError("ae_title must be a string or None") - if not isinstance(ae_title, str): - raise TypeError("ae_title must be a string") - self.ae_title = ae_title self.ae = AE(self.ae_title) self._add_contexts() diff --git a/tdwii_plus_examples/cechohandler.py b/tdwii_plus_examples/cechohandler.py index 8c2505c..05a18da 100644 --- a/tdwii_plus_examples/cechohandler.py +++ b/tdwii_plus_examples/cechohandler.py @@ -2,9 +2,9 @@ def handle_cecho(event, logger): """Handler for evt.EVT_C_ECHO. Parameters ---------- - event : events.Event - The corresponding event. - logger : logging.Logger + event : pynetdicom.event.event + The event corresponding to a C-ECHO request. + app_logger : logging.Logger The application's logger. Returns diff --git a/tdwii_plus_examples/cechoscp.py b/tdwii_plus_examples/cechoscp.py index 641ba4f..10810d6 100644 --- a/tdwii_plus_examples/cechoscp.py +++ b/tdwii_plus_examples/cechoscp.py @@ -1,5 +1,9 @@ +from argparse import Namespace +import logging + from pynetdicom import evt from pynetdicom.sop_class import Verification +from pynetdicom.apps.common import setup_logging from tdwii_plus_examples.basescp import BaseSCP from tdwii_plus_examples.cechohandler import handle_cecho @@ -30,12 +34,12 @@ class CEchoSCP(BaseSCP): _add_handlers(self) Adds handlers for DICOM communication events to the SCP instance. """ + def __init__(self, ae_title: str = "ECHO_SCP", bind_address: str = "", port: int = 11112, logger=None): - """ Initializes a new instance of the CEchoSCP class. This method creates an AE without presentation contexts. @@ -58,8 +62,31 @@ def __init__(self, A logger instance Optional, default: None, a debug logger will be used. """ + if logger is None: + self.logger = setup_logging( + Namespace(log_type=None, log_level="debug"), "cecho_scp") + self.logger.info( + "Logger not provided, using default logger with level %s", + logging.getLevelName(self.logger.level) + ) + elif isinstance(logger, logging.Logger): + self.logger = logger + self.logger.debug( + "Logger set to %s with level %s", + logger.name, logging.getLevelName(logger.getEffectiveLevel()) + ) + else: + raise TypeError("logger must be an instance of logging.Logger") + + if not ae_title: + self.ae_title = "ECHO_SCP" + self.logger.info("AE title not provided, using default: %s", + self.ae_title) + else: + self.ae_title = ae_title + super().__init__( - ae_title=ae_title, + ae_title=self.ae_title, bind_address=bind_address, port=port, logger=logger) diff --git a/tdwii_plus_examples/cli/echoscp.py b/tdwii_plus_examples/cli/echoscp.py index 15976c4..4c30d5f 100755 --- a/tdwii_plus_examples/cli/echoscp.py +++ b/tdwii_plus_examples/cli/echoscp.py @@ -1,16 +1,18 @@ #!/usr/bin/env python + import argparse import logging +import time from tdwii_plus_examples.cechoscp import CEchoSCP -def main(): +def main(loop_forever=True): # Add a parameter to control the loop parser = argparse.ArgumentParser( description="Run a DICOM Verification SCP." ) parser.add_argument( - '-a', '--ae_title', type=str, default='ECHO_SCP', + '-a', '--ae_title', type=str, default='', help='Application Entity Title' ) parser.add_argument( @@ -18,7 +20,7 @@ def main(): help='Specific IP address or hostname, omit to bind to all interfaces' ) parser.add_argument( - '-p', '--port', type=int, default=11112, + '-p', '--port', type=int, default=None, help='Port number' ) parser.add_argument( @@ -40,6 +42,7 @@ def main(): logging.basicConfig(level=log_level) logger = logging.getLogger('echoscp') + logger.setLevel(log_level) logger.info("Starting up the DICOM Verification SCP...") cechoscp = CEchoSCP(ae_title=args.ae_title, bind_address=args.bind_address, @@ -47,8 +50,8 @@ def main(): cechoscp.run() # Keep the main application running try: - while True: - pass # You can replace this with your main application logic + while loop_forever: + time.sleep(1) # Sleep to prevent high CPU usage except KeyboardInterrupt: logger.info("Shutting down the DICOM Verification SCP...") diff --git a/tdwii_plus_examples/cli/storescp.py b/tdwii_plus_examples/cli/storescp.py new file mode 100755 index 0000000..82d974d --- /dev/null +++ b/tdwii_plus_examples/cli/storescp.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python +import argparse +import logging +import time + +from tdwii_plus_examples.cstorescp import CStoreSCP + +from pydicom.uid import UID +from pynetdicom.status import Status + + +def my_handler(event, args, logger): + """ + Example of a custom handler for C-STORE requests. + + This handler logs all arguments passed to it, as well as information + of the C-STORE-RQ message. It's meant to be used as a starting point for + writing your own custom handlers. + + Parameters + ---------- + event : pynetdicom.event.event + The event that triggered the handler + args : argparse.Namespace + The arguments passed to the handler + logger : logging.Logger + The logger instance + + Returns + ------- + status : pynetdicom.sop_class.Status or int + The return status of the handler + """ + logger.info("Custom handler for %s called", event.request.msg_type) + + # Log all arguments + args_dict = vars(args) + for key in args_dict: + logger.info("\twith argument %s: %s", key, args_dict[key]) + + # Log the C-STORE request information + request_info = { + "MessageID": event.request.MessageID, + "AffectedSOPClassUID": event.request.AffectedSOPClassUID, + "AffectedSOPInstanceUID": event.request.AffectedSOPInstanceUID, + "Priority": event.request.Priority, + "MoveOriginatorApplicationEntityTitle": + event.request.MoveOriginatorApplicationEntityTitle, + "MoveOriginatorMessageID": event.request.MoveOriginatorMessageID + } + + for key, value in request_info.items(): + if key == "AffectedSOPClassUID": + logger.info("Command Set %s: %s (%s)", key, value, UID(value).name) + else: + logger.info("Command Set %s: %s", key, value) + + data_set_info = ( + f"{event.request.DataSet.getbuffer().nbytes} bytes" + if event.request.DataSet is not None else "Absent" + ) + logger.info("Command Set Data Set: %s", data_set_info) + + return Status.SUCCESS + + +def main(loop_forever=True): # Add a parameter to control the loop + parser = argparse.ArgumentParser( + description="Run a DICOM Storage SCP." + ) + parser.add_argument( + '-a', '--ae_title', type=str, default='ECHO_SCP', + help='Application Entity Title' + ) + parser.add_argument( + '-b', '--bind_address', type=str, default='', + help='Specific IP address or hostname, omit to bind to all interfaces' + ) + parser.add_argument( + '-p', '--port', type=int, default=11112, + help='Port number' + ) + parser.add_argument( + '-s', '--sop_classes', nargs='+', + help='List of SOP Class UID or valid keywords from PS3.6 Annex A' + ) + parser.add_argument( + '-t', '--transfer_syntaxes', nargs='+', + help='List of Transfer syntax to support' + ) + parser.add_argument( + '-c', '--custom_handler', type=str, + help='Custom C-STORE handler function' + ) + parser.add_argument( + '-o', '--output_directory', type=str, + help='Output directory, defaults to current working directory' + ) + parser.add_argument( + '-v', '--verbose', action='store_true', + help='Set log level to INFO' + ) + parser.add_argument( + '-d', '--debug', action='store_true', + help='Set log level to DEBUG' + ) + + args = parser.parse_args() + + log_level = logging.WARNING + if args.verbose: + log_level = logging.INFO + elif args.debug: + log_level = logging.DEBUG + + logging.basicConfig(level=log_level) + logger = logging.getLogger('storescp') + + if args.custom_handler: + # Custom handler should be a function defined in the global namespace + # at the module level, so we use globals() to look it up. + # This allows passing a function name as a string argument. + handler = globals().get(args.custom_handler) + else: + handler = None + + logger.info("Starting up the DICOM Storage SCP...") + cstorescp = CStoreSCP( + ae_title=args.ae_title, + bind_address=args.bind_address, + port=args.port, + logger=logger, + sop_classes=args.sop_classes, + transfer_syntaxes=args.transfer_syntaxes, + custom_handler=handler, + store_directory=args.output_directory + ) + cstorescp.run() + logger.info("DICOM Storage SCP is running...") + # Keep the main application running + try: + while loop_forever: + time.sleep(1) # Sleep to prevent high CPU usage + except KeyboardInterrupt: + logger.info("Shutting down the DICOM Storage SCP...") + + +if __name__ == "__main__": + main() diff --git a/tdwii_plus_examples/cstorehandler.py b/tdwii_plus_examples/cstorehandler.py new file mode 100644 index 0000000..a8632ba --- /dev/null +++ b/tdwii_plus_examples/cstorehandler.py @@ -0,0 +1,187 @@ +import os +from pynetdicom.status import Status +from pydicom.dataset import Dataset +from pydicom.filewriter import write_file_meta_info +from pydicom.uid import DeflatedExplicitVRLittleEndian +from pynetdicom.dsutils import encode + + +# Define the specific Status Code values for the C-STORE Response +Status.add('UNABLE_TO_DECODE_DATASET', 0xC210) # Failure to decode the dataset +Status.add('MISSING_ARGUMENT', 0xC212) # Failure to process the request +Status.add('OUT_OF_RESOURCES', 0xA700) # Failure to store the dataset +Status.add('UNABLE_TO_STORE_DATASET', 0xA701) # Failure to store the dataset + +# Define prefixes for SOP classes commmonly used in Radiotherapy +SOP_CLASS_PREFIXES = { + "1.2.840.10008.5.1.4.1.1.2": ("CT", "CT Image Storage"), + "1.2.840.10008.5.1.4.1.1.4": ("MR", "MR Image Storage"), + "1.2.840.10008.5.1.4.1.1.7": ("SC", "Secondary Capture Image Storage"), + "1.2.840.10008.5.1.4.1.1.128": ( + "PT", "Positron Emission Tomography Image Storage"), + "1.2.840.10008.5.1.4.1.1.481.1": ( + "RI", "RTIMG", "RTIMAGE", + "RT Image Storage"), + "1.2.840.10008.5.1.4.1.1.481.2": ( + "RD", "RTDOSE", + "RT Dose Storage"), + "1.2.840.10008.5.1.4.1.1.481.3": ( + "RS", "RTSTRUCT", "RTSTRUCTURESET", + "RT Structure Set Storage"), + "1.2.840.10008.5.1.4.1.1.481.4": ( + "RR", "RTREC", "RTRECORD", + "RT Beam Treatment Record Storage"), + "1.2.840.10008.5.1.4.1.1.481.5": ( + "RP", "RTPLAN", "RTPLAN", + "RT Plan Storage"), + "1.2.840.10008.5.1.4.1.1.481.6": ( + "BR", "RTBYREC", "RTBRACHYRECORD", + "RT Brachy Treatment Record Storage"), + "1.2.840.10008.5.1.4.1.1.481.7": ( + "SR", "SUMREC", "RTSUMRECORD", + "RT Treatment Summary Record Storage"), + "1.2.840.10008.5.1.4.1.1.481.8": ( + "RN", "RTIONPLN", "RTIONPLAN", + "RT Ion Plan Storage"), + "1.2.840.10008.5.1.4.1.1.481.9": ( + "IR", "RTIONREC", "RTIONRECORD", + "RT Ion Beams Treatment Record Storage"), + "1.2.840.10008.5.1.4.34.7": ( + "BDI", "RTBDI", "RTBEAMDELIVERYINSTRUCTION", + "RT Beams Delivery Instruction Storage"), + "1.2.840.10008.5.1.4.1.1.66.1": ( + "REG", "IMREG", "IMAGEREGISTRATION", + "Spatial Registration Storage"), + "1.2.840.10008.5.1.4.1.1.66.3": ( + "DIR", "DEFIMREG", "DEFORMABLEIMAGEREGISTRATION", + "Deformable Spatial Registration Storage") +} + + +def handle_cstore(event, args, app_logger): + """Handler for evt.EVT_C_STORE. + + Parameters + ---------- + event : pynetdicom.event.event + The event corresponding to a C-STORE request. + args : argparse.Namespace + The namespace containing the arguments to use. The namespace should + contain ``args.ignore`` and ``args.output_directory`` attributes: + - ``args.ignore``: ``bool`` indicating whether to ignore the + request or not. Defaults to ``False`` if attribute is not present. + - ``args.output_directory``: ``str`` indicating the directory + where the incoming DICOM instance should be stored. Required. + app_logger : logging.Logger + The application's logger. + + Returns + ------- + status : pynetdicom.sop_class.Status or int + A valid return status code, see PS3.4 Annex B.2.3 or the + ``StorageServiceClass`` implementation for the available statuses + """ + # Create the response status assuming success + # consistently using Dataset object with Status element (vs int) + status_ds = Dataset() + status_ds.Status = Status.SUCCESS + + # Return Success status code if the C-STORE request is to be ignored + if hasattr(args, 'ignore'): + if args.ignore: + app_logger.info("Ignoring C-STORE request") + return status_ds + else: + app_logger.info("Processing C-STORE request") + else: + app_logger.warning( + "args.ignore attribute not present, processing C-STORE request") + + # Check that the output directory argument is present and not None + if not hasattr(args, 'output_directory') or args.output_directory is None: + app_logger.error("args.output_directory attribute not present or None") + status_ds.Status = Status.MISSING_ARGUMENT + return status_ds + + # Read the incoming Data Set + try: + ds = event.dataset + # Remove any File Meta Information elements (group 0x0002) elements + # that may have been included + ds = ds[0x00030000:] + except Exception: + app_logger.exception("Unable to decode the data set") + status_ds.Status = Status.UNABLE_TO_DECODE_DATASET + return status_ds + + # Add the File Meta Information elements from the incoming Command Set + # Note: Implementation Class UID and Implementation Version Name are set + # to PYNETDICOM_IMPLEMENTATION_UID and PYNETDICOM_IMPLEMENTATION_VERSION + ds.file_meta = event.file_meta + + # Because pydicom uses deferred reads for its decoding, decoding errors + # are hidden until encountered by accessing a faulty element + try: + sop_class = ds.SOPClassUID + sop_instance = ds.SOPInstanceUID + except Exception: + app_logger.exception( + "Unable to decode the data set and/or the command set" + ) + status_ds.Status = Status.UNABLE_TO_DECODE_DATASET + return status_ds + + # Get a prefix of the SOP Class for the filename + try: + sop_prefix = SOP_CLASS_PREFIXES[sop_class][0] + except KeyError: + sop_prefix = "UN" + + filename = f"{sop_prefix}.{sop_instance}.dcm" + app_logger.info(f"Storing DICOM file: {filename}") + + # Store the received dataset as a DICOM Part 10 file + + filename = os.path.join(args.output_directory, filename) + # Create the output directory if not present + try: + os.makedirs(args.output_directory, exist_ok=True) + except Exception: + app_logger.exception( + f"Unable to create the output directory: {args.output_directory}" + ) + status_ds.Status = Status.OUT_OF_RESOURCES + return status_ds + + # Warn of overwriting if the file already exists + if os.path.exists(filename): + app_logger.warning("DICOM file already exists, overwriting") + + # Write the file + try: + if event.context.transfer_syntax == DeflatedExplicitVRLittleEndian: + # Workaround for pydicom issue #1086 + with open(filename, "wb") as f: + f.write(b"\x00" * 128) + f.write(b"DICM") + write_file_meta_info(f, event.file_meta) + f.write(encode(ds, False, True, True)) + else: + # We use `write_like_original=False` to ensure that a compliant + # File Meta Information Header is written + ds.save_as(filename, write_like_original=False) + + except IOError: + app_logger.exception( + "Could not write file to specified directory: " + f"{os.path.dirname(filename)}" + ) + status_ds.Status = Status.OUT_OF_RESOURCES + except Exception: + app_logger.exception( + "Could not write file to specified directory: " + f"{os.path.dirname(filename)}" + ) + status_ds.Status = Status.UNABLE_TO_STORE_DATASET + + return status_ds diff --git a/tdwii_plus_examples/cstorescp.py b/tdwii_plus_examples/cstorescp.py new file mode 100644 index 0000000..3704b43 --- /dev/null +++ b/tdwii_plus_examples/cstorescp.py @@ -0,0 +1,243 @@ +import os +from argparse import Namespace +import logging + +from pydicom.uid import UID, ImplicitVRLittleEndian, AllTransferSyntaxes +from pynetdicom import DEFAULT_TRANSFER_SYNTAXES, evt +from pynetdicom.presentation import ( + StoragePresentationContexts, + AllStoragePresentationContexts, +) +from pynetdicom.apps.common import setup_logging + +from tdwii_plus_examples.cechoscp import CEchoSCP +from tdwii_plus_examples.cstorehandler import handle_cstore + + +class CStoreSCP(CEchoSCP): + """ + A subclass of the CEchoSCP class that implements the DICOM Storage + Service Class Provider (SCP). + + Unless otherwise specified, The first 128 Storage SOP Classes from all + SOP Classes sorted by abtract syntax SOP Class UID are supported with the + Implicit VR Little Endian, Implicit VR Big Endian, Explicit VR Little + Endian and Deflated Explicit VR Little Endian transfer syntaxes. + + Usage: + Create an instance of CStoreSCP and call the run method inherited + from the BaseSCP grandparent class to start listening for incoming + DICOM association requests. + + Attributes: + ae_title (str): The title of the Application Entity (AE) + bind_address (str): The IP address or hostname of the AE + port (int): The port number the AE will listen on + logger (logging.Logger): A logger instance + sop_classes (list): A list of SOP Classes to support + transfer_syntaxes (list): A list of transfer syntaxes to support + custom_handler (function): A function to handle C-STORE requests + store_directory (str): The directory to store files + + Methods: + _add_contexts(self) + Adds presentation contexts to the SCP instance. + _add_handlers(self) + Adds handlers for DICOM communication events to the SCP instance. + """ + + def __init__(self, + ae_title: str = "STORE_SCP", + bind_address: str = "", + port: int = 11112, + logger=None, + sop_classes=None, + transfer_syntaxes=None, + custom_handler=None, + store_directory=None, + ): + """ + Initializes a new instance of the CStoreSCP class. + This method creates an AE without presentation contexts. + + Parameters + ---------- + ae_title : str + The title of the Application Entity (AE) + Optional, default: "STORE_SCP" + + bind_address : str + A specific IP address or hostname of the AE + Optional, default: "" will bind to all interfaces. + + port: int + The port number to listen on + Optional, default: 11112 (as registered for DICOM at IANA) + + logger: logging.Logger + A logger instance + Optional, default: None, a debug logger will be used + + sop_classes: list of str or pydicom.uid.UID + A list of SOP Classes UIDs or names to support + (names must be valid SOP Class Keywords from PS3.6 Annex A, + invalid UIDs and names will be ignored) + Optional, default: None, First 128 SOP Classes are supported + + transfer_syntaxes: list of str/pydicom.uid.UID + A list of transfer syntaxes UIDs or names to support + (names must be valid Transfer Syntax Keywords from PS3.6 Annex A, + invalid UIDs and names will be ignored) + Optional, default: None, pynetdicom default transfer syntaxes are + supported + + custom_handler: function + A function to handle C-STORE requests + Optional, default: None, cstorehandler.handle_cstore function + + store_directory: str + The directory to store files + Optional, default: current directory + """ + if logger is None: + self.logger = setup_logging( + Namespace(log_type=None, log_level="debug"), "cstore_scp") + self.logger.info( + "Logger not provided, using default logger with level %s", + logging.getLevelName(self.logger.level) + ) + elif isinstance(logger, logging.Logger): + self.logger = logger + self.logger.debug( + "Logger set to %s with level %s", + logger.name, logging.getLevelName(logger.getEffectiveLevel()) + ) + + if not ae_title: + self.ae_title = "STORE_SCP" + self.logger.info("AE title not provided, using default: %s", + self.ae_title) + else: + self.ae_title = ae_title + + self.sop_classes = sop_classes + if sop_classes is not None: + self._valid_sop_classes, self._invalid_sop_classes = ( + self._validate_syntaxes(sop_classes, "SOP Class") + ) + if self._invalid_sop_classes: + self.logger.warning("Ignoring invalid SOP Classes: %s", + self._invalid_sop_classes) + + self.transfer_syntaxes = transfer_syntaxes + if transfer_syntaxes is not None: + self._valid_transfer_syntaxes, self._invalid_transfer_syntaxes = ( + self._validate_syntaxes(transfer_syntaxes, "Transfer Syntax") + ) + if self._invalid_transfer_syntaxes: + self.logger.warning("Ignoring invalid Transfer Syntaxes: %s", + self._invalid_transfer_syntaxes) + + self.logger.debug( + f"Custom handler: {custom_handler} type is {type(custom_handler)}") + if custom_handler is None: + self.logger.warning("No custom_handler defined, " + "using default handler") + self.handle_store = handle_cstore + elif not callable(custom_handler): + self.logger.warning(f"{custom_handler} is not a known function, " + "using default handler") + self.handle_store = handle_cstore + else: + self.handle_store = custom_handler + self.logger.debug(f"Custom handler: {self.handle_store}") + + if store_directory is None: + self.store_directory = os.getcwd() + else: + self.store_directory = store_directory + self.logger.debug(f"Store directory: {self.store_directory}") + + super().__init__( + ae_title=self.ae_title, + bind_address=bind_address, + port=port, + logger=logger) + + def _validate_syntaxes(self, items, uid_type): + # Sort valid and invalid items based on the UID type + self.logger.debug(f"Validating {uid_type} list: {items}") + + # Construct a mapping of valid keywords and UIDs for syntax validation + valid_syntaxes = { + ctx.abstract_syntax: UID(ctx.abstract_syntax).keyword + for ctx in AllStoragePresentationContexts + } if uid_type == 'SOP Class' else { + ctx: UID(ctx).keyword + for ctx in AllTransferSyntaxes + } + + # Check each item in the provided list + valid_items = [] + invalid_items = [] + for item in items: + keyword = valid_syntaxes.get(item) + if keyword is not None: + valid_items.append(item) + self.logger.debug(f"Valid {uid_type} : {item}") + elif item in valid_syntaxes.values(): + valid_items.append( + next(UID for UID, keyword in valid_syntaxes.items() + if keyword == item)) + self.logger.debug(f"Valid {uid_type} : {item}") + else: + invalid_items.append(item) + self.logger.debug(f"Invalid {uid_type} : {item}") + + return valid_items, invalid_items + + def _add_contexts(self): + """ + Adds the DICOM Storage SOP Classes presentation context to the AE. + + This method overrides the CEchoSCP parent class method to add support + for the first 128 Storage SOP Classes (sorted by SOP Class UID) or for + the SOP Classes specified by the constructor. + Implicit VR Little Endian, Implicit VR Big Endian, Explicit VR Little + Endian and Deflated Explicit VR Little Endian transfer syntaxes are + included by default unless otherwise specified in the constructor. + """ + super()._add_contexts() + if self.sop_classes is None: + sop_classes = [ + context.abstract_syntax + for context in StoragePresentationContexts + ] + else: + sop_classes = self._valid_sop_classes + self.logger.debug(f"Supported Storage SOP Classes: {sop_classes}") + + if self.transfer_syntaxes is None: + transfer_syntaxes = DEFAULT_TRANSFER_SYNTAXES + else: + if ImplicitVRLittleEndian not in self._valid_transfer_syntaxes: + transfer_syntaxes = [ImplicitVRLittleEndian] + \ + self._valid_transfer_syntaxes + else: + transfer_syntaxes = self._valid_transfer_syntaxes + self.logger.debug(f"Supported Transfer Syntaxes: {transfer_syntaxes}") + + for sop_class in sop_classes: + self.ae.add_supported_context(sop_class, transfer_syntaxes) + + def _add_handlers(self): + """ + Adds a handler for for processing incoming C-STORE requests. + + This method overrides the CEchoSCP parent class method to add a handler + for the Storage SOP Classes. + """ + super()._add_handlers() + args = Namespace(ignore=False, output_directory=self.store_directory) + self.handlers.append((evt.EVT_C_STORE, self.handle_store, + [args, self.logger])) diff --git a/tdwii_plus_examples/tests/cli/test_echoscp.py b/tdwii_plus_examples/tests/cli/test_echoscp.py new file mode 100644 index 0000000..2063db0 --- /dev/null +++ b/tdwii_plus_examples/tests/cli/test_echoscp.py @@ -0,0 +1,50 @@ +import unittest +from unittest.mock import patch, MagicMock +import argparse +import sys +import logging + +from tdwii_plus_examples.cli.echoscp import main + + +class TestMainFunction(unittest.TestCase): + + @patch('tdwii_plus_examples.cli.echoscp.CEchoSCP') + @patch('tdwii_plus_examples.cli.echoscp.argparse.ArgumentParser.parse_args') + def test_main(self, mock_parse_args, MockCEchoSCP): + # Mock the arguments + mock_parse_args.return_value = argparse.Namespace( + ae_title='TEST_AE', + bind_address='127.0.0.1', + port=12345, + verbose=False, + debug=True + ) + + # Mock the CEchoSCP instance + mock_cechoscp_instance = MockCEchoSCP.return_value + mock_cechoscp_instance.run = MagicMock() + + # Run the main function + with patch.object(sys, 'argv', ['echoscp.py']): + with self.assertLogs('echoscp', level='DEBUG') as log: + main(loop_forever=False) # avoid the infinite loop + + # Check that CEchoSCP was initialized with the correct arguments + MockCEchoSCP.assert_called_once_with( + ae_title='TEST_AE', + bind_address='127.0.0.1', + port=12345, + logger=logging.getLogger('echoscp') + ) + + # Check that the run method was called + mock_cechoscp_instance.run.assert_called_once() + + # Check the log output + self.assertIn( + 'Starting up the DICOM Verification SCP...', log.output[0]) + + +if __name__ == '__main__': + unittest.main() diff --git a/tdwii_plus_examples/tests/cli/test_storescp.py b/tdwii_plus_examples/tests/cli/test_storescp.py new file mode 100644 index 0000000..47144a0 --- /dev/null +++ b/tdwii_plus_examples/tests/cli/test_storescp.py @@ -0,0 +1,63 @@ +import unittest +from unittest.mock import patch, MagicMock, Mock +import argparse +import sys +import logging + +from tdwii_plus_examples.cli.storescp import main + + +class TestMainFunction(unittest.TestCase): + + @patch('tdwii_plus_examples.cli.storescp.CStoreSCP') + @patch('tdwii_plus_examples.cli.storescp.argparse.ArgumentParser.parse_args') + def test_main(self, mock_parse_args, MockCStoreSCP): + # Mock the arguments + mock_parse_args.return_value = argparse.Namespace( + ae_title='TEST_AE', + bind_address='127.0.0.1', + port=12345, + sop_classes=['SecondaryCaptureImageStorage'], + transfer_syntaxes=['ImplicitVRLittleEndian'], + custom_handler='my_handler', + output_directory='/var/tmp/output', + verbose=False, + debug=True + ) + + # Mock the custom handler function + mock_handler = Mock(name='my_handler') + + # Patch the place where the custom handler is used + with patch('tdwii_plus_examples.cli.storescp.my_handler', mock_handler): + # Mock the CStoreSCP instance + mock_cstorescp_instance = MockCStoreSCP.return_value + mock_cstorescp_instance.run = MagicMock() + + # Run the main function + with patch.object(sys, 'argv', ['storescp.py']): + with self.assertLogs('storescp', level='DEBUG') as log: + main(loop_forever=False) # avoid the infinite loop + + # Check that CStoreSCP was initialized with the correct arguments + MockCStoreSCP.assert_called_once_with( + ae_title='TEST_AE', + bind_address='127.0.0.1', + port=12345, + logger=logging.getLogger('storescp'), + sop_classes=['SecondaryCaptureImageStorage'], + transfer_syntaxes=['ImplicitVRLittleEndian'], + custom_handler=mock_handler, # Ensure this is the mock function + store_directory='/var/tmp/output' + ) + + # Check that the run method was called + mock_cstorescp_instance.run.assert_called_once() + + # Check the log output + self.assertIn( + 'Starting up the DICOM Storage SCP...', log.output[0]) + + +if __name__ == '__main__': + unittest.main() diff --git a/tdwii_plus_examples/tests/test_basehandlers.py b/tdwii_plus_examples/tests/test_basehandlers.py new file mode 100644 index 0000000..e1f2686 --- /dev/null +++ b/tdwii_plus_examples/tests/test_basehandlers.py @@ -0,0 +1,52 @@ +import unittest +from unittest.mock import MagicMock, patch +from tdwii_plus_examples.basehandlers import handle_open, handle_close + + +class TestHandleConnectionsEvents(unittest.TestCase): + @patch('logging.Logger') + def test_handle_open(self, MockLogger): + # Create a mock event + mock_event = MagicMock() + mock_event.assoc.requestor.address = '127.0.0.1' + mock_event.assoc.requestor.port = 104 + + # Create a mock logger + mock_logger = MockLogger() + + # Call the function + status = handle_open(mock_event, mock_logger) + + # Check the status + self.assertEqual(status, 0x0000) + + # Check that the logger was called with the correct message + mock_logger.info.assert_called_once_with( + 'Succesful connection from 127.0.0.1:104' + ) + + @patch('logging.Logger') + def test_handle_close(self, MockLogger): + # Create a mock event + mock_event = MagicMock() + mock_event.assoc.requestor.ae_title = 'TEST_AET' + mock_event.assoc.requestor.address = '127.0.0.1' + mock_event.assoc.requestor.port = 104 + + # Create a mock logger + mock_logger = MockLogger() + + # Call the function + status = handle_close(mock_event, mock_logger) + + # Check the status + self.assertEqual(status, 0x0000) + + # Check that the logger was called with the correct message + mock_logger.info.assert_called_once_with( + 'Closed connection with TEST_AET@127.0.0.1:104' + ) + + +if __name__ == '__main__': + unittest.main() diff --git a/tdwii_plus_examples/tests/test_basescp.py b/tdwii_plus_examples/tests/test_basescp_integration.py similarity index 100% rename from tdwii_plus_examples/tests/test_basescp.py rename to tdwii_plus_examples/tests/test_basescp_integration.py diff --git a/tdwii_plus_examples/tests/test_basescp_unit.py b/tdwii_plus_examples/tests/test_basescp_unit.py new file mode 100644 index 0000000..c0756e7 --- /dev/null +++ b/tdwii_plus_examples/tests/test_basescp_unit.py @@ -0,0 +1,184 @@ +import unittest +import logging +from logging.handlers import MemoryHandler +from unittest.mock import MagicMock, patch +from parameterized import parameterized + +from tdwii_plus_examples.basescp import BaseSCP +from tdwii_plus_examples.basehandlers import handle_open, handle_close +from pynetdicom import evt + + +class TestBaseSCP(unittest.TestCase): + @classmethod + def setUpClass(cls): + # Set up the logger for this test case to DEBUG level + # with a stream handler to print the log messages to the console + cls.test_logger = logging.getLogger('test_basescp') + cls.test_logger.setLevel(logging.INFO) + cls.stream_handler = logging.StreamHandler() + cls.test_logger.addHandler(cls.stream_handler) + + def setUp(self): + # Set up the logger for the BaseSCP to DEBUG level + # with a memory handler to store up to 100 log messages + self.scp_logger = logging.getLogger('basescp') + self.scp_logger.setLevel(logging.DEBUG) + self.memory_handler = MemoryHandler(100) + self.scp_logger.addHandler(self.memory_handler) + + # Patch the AE class to return a mock instance + self.patcher = patch('tdwii_plus_examples.basescp.AE', autospec=True) + self.mock_ae_class = self.patcher.start() + self.mock_ae = self.mock_ae_class.return_value + + def tearDown(self): + self.scp_logger.removeHandler(self.memory_handler) + self.memory_handler.close() + self.patcher.stop() + + # Parameterized unit tests for the constructor parameters + + # Define expected warnings text + W_PORT_104 = 'DICOM port 104 may need admin privileges' + W_PORT_REG = 'Registered port (1024-49151) may be used by others' + + # Define test cases parameters + test_cases = [ + (None, None, None, None, None, None), # DEFAULT_PARAMS + (None, None, None, "scp_logger", None, None), # CUSTOM_LOGGER + ("", None, None, None, None, None), # EMPTY_AE_TITLE + (None, "", None, None, None, None), # EMPTY_BIND_ADDRESS + (123, None, None, None, TypeError, None), # INVALID_AE_TITLE_TYPE + (None, 123, None, None, TypeError, None), # INVALID_BIND_ADDRESS_TYPE + (None, None, "", None, TypeError, None), # INVALID_PORT_TYPE + (None, None, "123", None, TypeError, None), # INVALID_PORT_TYPE_NAME + (None, None, -1, None, ValueError, None), # NEGATIVE_PORT + (None, None, 65536, None, ValueError, None), # OUT_OF_RANGE_PORT + (None, None, 4711, "scp_logger", None, W_PORT_REG), # REGISTERED_PORT + (None, None, 104, "scp_logger", None, W_PORT_104), # DICOM_WK_REG_PORT + (None, None, 11112, "scp_logger", None, None), # DICOM_REG_PORT + (None, None, 11114, "scp_logger", None, None), # UNASSIGNED_REG_PORT + ("SCP", "127.0.0.1", 104, "scp_logger", None, None) # CUSTOM_PARAMS + ] + + # Define a function to generate test names + def custom_name_func(testcase_func, param_num, param): + return f"{testcase_func.__name__}_{param_num}" + + @parameterized.expand(test_cases, name_func=custom_name_func) + def test_init_params( + self, + ae_title, + bind_address, + port, + logger_name, + expected_exception, + expected_warning + ): + logger = getattr(self, logger_name) if logger_name else None + + if expected_exception: + with self.assertRaises(expected_exception) as exception_context: + BaseSCP(ae_title, bind_address, port, logger) + self.test_logger.info( + "Raised expected exception: %s: %s" % ( + expected_exception.__name__, + exception_context.exception)) + else: + scp = BaseSCP(ae_title, bind_address, port, logger) + + # Check ae_title + self.assertEqual( + scp.ae_title, "BASE_SCP" if not ae_title else ae_title) + + # Check bind_address + self.assertEqual( + scp.bind_address, "" if not bind_address else bind_address) + + # Check port + self.assertEqual( + scp.port, 11112 if not port else port) + + # Check logger + if logger is None: + self.assertEqual(scp.logger.name, 'base_scp') + else: + self.assertEqual(scp.logger.name, logger.name) + self.assertIsInstance(scp.logger, logging.Logger) + + # Check for expected warning + self.memory_handler.flush() + log_output = [record.getMessage() + for record in self.memory_handler.buffer] + if expected_warning: + self.assertIn(f'{expected_warning}', log_output) + + # Print the log messages + # for record in self.memory_handler.buffer: + # self.test_logger.info("%s: %s" % ( + # record.levelname, record.getMessage()[:77])) + + # Mock unit tests for the run method + + def test_run_success(self): + self.base_scp = BaseSCP(bind_address="localhost", + logger=self.scp_logger) + + self.mock_server = MagicMock() + self.mock_ae.start_server.return_value = self.mock_server + self.base_scp.run() + self.mock_ae.start_server.assert_called_once_with( + ("localhost", 11112), + evt_handlers=self.base_scp.handlers, + block=False + ) + + # Check the log messages + self.memory_handler.flush() + log_output = [record.getMessage() + for record in self.memory_handler.buffer] + # Print the log output for debugging + # self.test_logger.info(f"Log output: {log_output}") + self.assertIn("SCP server started successfully", log_output) + + def test_run_failure(self): + self.base_scp = BaseSCP(bind_address="localhost", + logger=self.scp_logger) + + self.mock_ae.start_server.side_effect = Exception( + "Failed to start server") + self.base_scp.run() + self.mock_ae.start_server.assert_called_once_with( + ("localhost", 11112), + evt_handlers=self.base_scp.handlers, + block=False + ) + + # Check the log messages + self.memory_handler.flush() + log_output = [record.getMessage() + for record in self.memory_handler.buffer] + # Print the log output for debugging + # self.test_logger.info(f"Log output: {log_output}") + self.assertIn( + "SCP server failed to start: Failed to start server", log_output) + + # Other unit tests + + def test_add_handlers(self): + self.base_scp = BaseSCP(bind_address="localhost", + logger=self.scp_logger) + # Check that the handlers list contains the expected handlers + expected_handlers = [ + (evt.EVT_CONN_OPEN, handle_open, [self.scp_logger]), + (evt.EVT_CONN_CLOSE, handle_close, [self.scp_logger]) + ] + + # Print the actual handlers for debugging + # self.test_logger.info(f"Actual handlers: {self.base_scp.handlers}") + self.assertEqual(self.base_scp.handlers, expected_handlers) + + +if __name__ == "__main__": + unittest.main() diff --git a/tdwii_plus_examples/tests/test_cechohandler.py b/tdwii_plus_examples/tests/test_cechohandler.py new file mode 100644 index 0000000..a6b4ba4 --- /dev/null +++ b/tdwii_plus_examples/tests/test_cechohandler.py @@ -0,0 +1,33 @@ +import unittest +from unittest.mock import MagicMock, patch +from tdwii_plus_examples.cechohandler import handle_cecho + + +class TestHandleCEchoEvent(unittest.TestCase): + @patch('logging.Logger') + def test_handle_cecho(self, MockLogger): + # Create a mock event + mock_event = MagicMock() + mock_event.assoc.requestor.ae_title = 'TEST_AET' + mock_event.assoc.requestor.address = '127.0.0.1' + mock_event.assoc.requestor.port = 104 + mock_event.timestamp.strftime.return_value = '2024-12-28 11:00:00' + + # Create a mock logger + mock_logger = MockLogger() + + # Call the function + status = handle_cecho(mock_event, mock_logger) + + # Check the status + self.assertEqual(status, 0x0000) + + # Check that the logger was called with the correct message + mock_logger.info.assert_called_once_with( + 'Received C-ECHO request from TEST_AET@127.0.0.1:104 at ' + '2024-12-28 11:00:00' + ) + + +if __name__ == '__main__': + unittest.main() diff --git a/tdwii_plus_examples/tests/test_cechoscp.py b/tdwii_plus_examples/tests/test_cechoscp_integration.py similarity index 97% rename from tdwii_plus_examples/tests/test_cechoscp.py rename to tdwii_plus_examples/tests/test_cechoscp_integration.py index 15957aa..c257be9 100644 --- a/tdwii_plus_examples/tests/test_cechoscp.py +++ b/tdwii_plus_examples/tests/test_cechoscp_integration.py @@ -10,7 +10,7 @@ class TestCEchoSCP(unittest.TestCase): def setUp(self): - # Set up the logger for the BaseSCP to INFO level + # Set up the logger for the CEchoSCP to INFO level # with a memory handler to store up to 100 log messages self.scp_logger = logging.getLogger('cechoscp') self.scp_logger.setLevel(logging.INFO) diff --git a/tdwii_plus_examples/tests/test_cechoscp_unit.py b/tdwii_plus_examples/tests/test_cechoscp_unit.py new file mode 100644 index 0000000..53ac236 --- /dev/null +++ b/tdwii_plus_examples/tests/test_cechoscp_unit.py @@ -0,0 +1,62 @@ +import unittest +from parameterized import parameterized +import logging +from logging.handlers import MemoryHandler +from pynetdicom import evt +from pynetdicom.sop_class import Verification + +from tdwii_plus_examples.cechohandler import handle_cecho +from tdwii_plus_examples.cechoscp import CEchoSCP + + +class TestCEchoSCP(unittest.TestCase): + + def setUp(self): + # Set up the logger for the BaseSCP to DEBUG level + # with a memory handler to store up to 100 log messages + self.scp_logger = logging.getLogger('test_cechoscp') + self.scp_logger.setLevel(logging.DEBUG) + self.memory_handler = MemoryHandler(100) + self.scp_logger.addHandler(self.memory_handler) + + self.scp = CEchoSCP(logger=self.scp_logger) + + @parameterized.expand([ + ("ECHO_SCP", "", 11112, ""), + ("MY_SCP", "", 11112, "scp_logger"), + ("", "", 11112, "scp_logger"), + (None, "", 11112, "scp_logger"), + ]) + def test_init_params(self, ae_title, bind_address, port, logger_name): + if logger_name: + logger = getattr(self, logger_name) + else: + logger = None + + scp = CEchoSCP(ae_title=ae_title, bind_address=bind_address, + port=port, logger=logger) + + self.assertEqual( + scp.ae_title, "ECHO_SCP" if not ae_title else ae_title) + self.assertEqual(scp.bind_address, bind_address) + self.assertEqual(scp.port, port) + self.assertEqual( + scp.logger.name, "test_cechoscp" if logger_name else "base_scp") + + def test_add_contexts(self): + sop_class = [ctx.abstract_syntax + for ctx in self.scp.ae.supported_contexts] + self.assertIn(Verification, sop_class, + msg="Verification SOP Class not supported") + ts = [ctx.transfer_syntax for ctx in self.scp.ae.supported_contexts + if ctx.abstract_syntax == Verification] + self.assertEqual(ts[0], ['1.2.840.10008.1.2'], + msg="Transfer syntax not as expected") + + def test_add_handlers(self): + handlers = [(evt.EVT_C_ECHO, handle_cecho, [self.scp_logger])] + self.assertIn(handlers[0], self.scp.handlers) + + +if __name__ == '__main__': + unittest.main() diff --git a/tdwii_plus_examples/tests/test_cstorehandler.py b/tdwii_plus_examples/tests/test_cstorehandler.py new file mode 100644 index 0000000..516545a --- /dev/null +++ b/tdwii_plus_examples/tests/test_cstorehandler.py @@ -0,0 +1,116 @@ +import unittest +import tempfile +import shutil +import os +from unittest.mock import MagicMock, patch +from parameterized import parameterized +from pydicom.dataset import Dataset, FileMetaDataset +from pydicom import uid +from tdwii_plus_examples.cstorehandler import handle_cstore + + +def create_mock_dataset(): + ds = Dataset() + ds.SOPClassUID = uid.SecondaryCaptureImageStorage + ds.SOPInstanceUID = uid.generate_uid() + ds.PatientName = 'Test^Patient' + ds.PatientID = '123456' + ds.StudyInstanceUID = uid.generate_uid() + ds.SeriesInstanceUID = uid.generate_uid() + return ds + + +def create_mock_file_meta(sop_instance_uid): + file_meta = FileMetaDataset() + # Secondary Capture Image Storage + file_meta.MediaStorageSOPClassUID = uid.SecondaryCaptureImageStorage + file_meta.MediaStorageSOPInstanceUID = sop_instance_uid + file_meta.TransferSyntaxUID = uid.ExplicitVRLittleEndian + file_meta.ImplementationClassUID = uid.PYDICOM_IMPLEMENTATION_UID + return file_meta + + +class TestHandleCStoreEvent(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.temp_dir = tempfile.mkdtemp() + + def setUp(self): + # Create a mock event + self.mock_event = MagicMock() + self.mock_event.dataset = create_mock_dataset() + self.mock_event.file_meta = create_mock_file_meta( + self.mock_event.dataset.SOPInstanceUID) + self.mock_event.context.transfer_syntax = '1.2.840.10008.1.2.1' + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.temp_dir) + + @parameterized.expand([ + (False, 'output', 0x0000), # Status.SUCCESS + (True, 'output', 0x0000), # Status.SUCCESS + (False, None, 0xC212), # Status.MISSING_ARGUMENT + (False, 'output', 0xC210), # Status.UNABLE_TO_DECODE_DATASET + (False, '/invalid/path\0', 0xA700), # Status.OUT_OF_RESOURCES + ]) + @patch('logging.Logger') + def test_handle_cstore(self, ignore, output_directory, expected_status, + MockLogger): + + # Create a mock args + mock_args = MagicMock() + mock_args.ignore = ignore + if output_directory is not None: + mock_args.output_directory = os.path.join( + self.temp_dir, output_directory) + else: + mock_args.output_directory = None + + # Create an invalid dataset with missing SOPClassUID + if expected_status == 0xC210: + del self.mock_event.dataset.SOPClassUID + + # Create a mock logger + mock_logger = MockLogger() + + # Call the function + status_ds = handle_cstore(self.mock_event, mock_args, mock_logger) + + # Debugging output + # print( + # "\nTest case parameters:\n ignore = %s" + # "\n output_directory = %s" + # "\n expected_status = 0x%04X" % ( + # ignore, mock_args.output_directory, expected_status)) + # print( + # "\nTest case results:" + # "\n actual status = 0x%04X" + # "\n logger calls : %s," % ( + # status_ds.Status, mock_logger.mock_calls)) + + # Check the status + self.assertEqual(status_ds.Status, expected_status) + + # Check that the logger was called with the correct messages + if ignore: + mock_logger.info.assert_called_once_with( + "Ignoring C-STORE request") + elif output_directory is None: + mock_logger.error.assert_called_once_with( + "args.output_directory attribute not present or None") + elif '/invalid/path' in output_directory: + mock_logger.exception.assert_called_once_with( + "Unable to create the output directory: /invalid/path\0") + elif expected_status == 0xC210: + mock_logger.exception.assert_called_once_with( + "Unable to decode the data set and/or the command set") + else: + mock_logger.info.assert_any_call("Processing C-STORE request") + mock_logger.info.assert_any_call( + "Storing DICOM file: SC.%s.dcm" + % self.mock_event.dataset.SOPInstanceUID) + + +if __name__ == '__main__': + unittest.main() diff --git a/tdwii_plus_examples/tests/test_cstorescp_integration.py b/tdwii_plus_examples/tests/test_cstorescp_integration.py new file mode 100644 index 0000000..3bb2de4 --- /dev/null +++ b/tdwii_plus_examples/tests/test_cstorescp_integration.py @@ -0,0 +1,136 @@ +import unittest +import tempfile +import shutil +import os +import datetime +import logging +from logging.handlers import MemoryHandler +import subprocess +import time + +from pydicom.dataset import FileMetaDataset, FileDataset +from pydicom import uid + +from tdwii_plus_examples.cstorescp import CStoreSCP + + +class TestCStoreSCP(unittest.TestCase): + + def setUp(self): + # Set up loggers + self.memory_handler = MemoryHandler(100) + self.scp_logger = self._setup_logger( + 'cstorescp', logging.DEBUG, self.memory_handler) + self.test_logger = self._setup_logger( + 'test_cstorescp', logging.DEBUG, logging.StreamHandler()) + + # Create a temporary directory + self.temp_dir = tempfile.mkdtemp() + + # Create a mock DICOM file + self.dataset_file, self.ds = self._create_dicom_file(self.temp_dir) + + # Save the mock DICOM file + self.ds.save_as(self.dataset_file) + self.test_logger.info(f"Created temporary file: {self.dataset_file}") + + # Create a CStoreSCP instance + self.scp = CStoreSCP(bind_address="localhost", + store_directory=self.temp_dir, + logger=self.scp_logger) + + def _setup_logger(self, name, level, handler): + logger = logging.getLogger(name) + logger.setLevel(level) + logger.addHandler(handler) + return logger + + def _create_dicom_file(self, temp_dir): + file_meta = self._get_file_meta() + dataset_file = os.path.join(temp_dir, "mock_sc_image.dcm") + ds = FileDataset(dataset_file, {}, file_meta=file_meta, + preamble=b"\0" * 128) + self._add_metadata(ds) + self._set_pixel_data(ds) + ds.is_little_endian = True + ds.is_implicit_VR = False + return dataset_file, ds + + def _get_file_meta(self): + file_meta = FileMetaDataset() + file_meta.MediaStorageSOPClassUID = uid.SecondaryCaptureImageStorage + file_meta.MediaStorageSOPInstanceUID = uid.generate_uid() + file_meta.ImplementationClassUID = uid.PYDICOM_IMPLEMENTATION_UID + file_meta.TransferSyntaxUID = uid.ExplicitVRLittleEndian + return file_meta + + def _add_metadata(self, ds): + ds.SOPClassUID = uid.SecondaryCaptureImageStorage + ds.SOPInstanceUID = uid.generate_uid() + ds.PatientName = 'Test^Patient' + ds.PatientID = '123456' + ds.StudyInstanceUID = uid.generate_uid() + ds.SeriesInstanceUID = uid.generate_uid() + dt = datetime.datetime.now() + ds.ContentDate = dt.strftime("%Y%m%d") + ds.ContentTime = dt.strftime("%H%M%S.%f") + ds.Modality = "SC" + + def _set_pixel_data(self, ds): + rows, columns = 10, 10 + pixel_data = bytes([0] * rows * columns) + ds.Rows = rows + ds.Columns = columns + ds.SamplesPerPixel = 1 + ds.PhotometricInterpretation = "MONOCHROME2" + ds.PixelRepresentation = 0 + ds.BitsAllocated = 8 + ds.BitsStored = 8 + ds.HighBit = 7 + ds.PixelData = pixel_data + + def tearDown(self): + # Remove the temporary directory and its contents + files_in_temp_dir = os.listdir(self.temp_dir) + shutil.rmtree(self.temp_dir) + self.test_logger.info( + "Removed the temp directory: %s and its contents: %s" % + (self.temp_dir, files_in_temp_dir)) + + def test_run_and_check_log(self): + # Run the SCP + self.scp.run() + + # Send the dataset using pynetdicom's storescu.py + subprocess.check_call( + [ + 'python', '-m', 'pynetdicom', 'storescu', + 'localhost', '11112', '-aet', 'STORESCU', + '-aec', 'STORE_SCP', self.dataset_file + ] + ) + + # Wait for 1 second to ensure the logs are generated + time.sleep(1) + + # Get the log messages + log_messages = [record.getMessage() for record + in self.memory_handler.buffer] + + # Check that the log messages contain the expected message + filename = f"SC.{self.ds.SOPInstanceUID}.dcm" + self.assertIn( + f"Storing DICOM file: {filename}", + log_messages, + "Log messages do not contain the expected message" + ) + # Check that the received dataset was stored + file_path = os.path.join(self.temp_dir, filename) + self.assertTrue(os.path.isfile(file_path), + f"The dataset {file_path} was not stored") + # Stop the SCP + self.scp.stop() + + +if __name__ == "__main__": + unittest.main() diff --git a/tdwii_plus_examples/tests/test_cstorescp_unit.py b/tdwii_plus_examples/tests/test_cstorescp_unit.py new file mode 100644 index 0000000..ff0b65c --- /dev/null +++ b/tdwii_plus_examples/tests/test_cstorescp_unit.py @@ -0,0 +1,185 @@ +import logging +import re +import unittest +from unittest.mock import Mock +from logging.handlers import MemoryHandler +from parameterized import parameterized +from pydicom import uid +from pynetdicom import DEFAULT_TRANSFER_SYNTAXES +from tdwii_plus_examples.cstorescp import CStoreSCP + +# Define dictionaries of valid and invalid SOP Classes and Transfer syntaxes +SOP_CLASSES = { + "CTImageStorage": True, # valid Storage SOP Class Keyword + "MRStorage": False, # invalid Storage SOP Class Keyword + "RTIonPlanStorage": True, # valid Storage SOP Class Keyword + "1.2.840.10008.5.1.4.1.1.481.9": True, # valid Storage SOP Class UID + "1.2.840.10008.1.2": False, # invalid Storage SOP Class UID + "Verification": False # invalid Storage SOP Class UID +} + +XFER_SYNTAXES = { + "ExplicitVRLittleEndian": True, # valid transfer syntax Keyword + "1.2.840.10008.1.2.2": True, # valid transfer syntax UID + "ImplicitLittleEndian": False, # invalid transfer syntax Keyword + "1.2.840.10008.5.2.1": False, # invalid transfer syntax UID +} + + +class TestCStoreSCP(unittest.TestCase): + + @classmethod + def setUpClass(cls): + # Set up the logger for this test case to DEBUG level + # with a stream handler to print the log messages to the console + cls.test_logger = logging.getLogger('test_cstorescp') + cls.test_logger.setLevel(logging.INFO) + cls.stream_handler = logging.StreamHandler() + cls.test_logger.addHandler(cls.stream_handler) + + # Create lists of valid and invalid values from the dictionaries + cls.valid_sop_classes = [ + sop for sop, is_valid in SOP_CLASSES.items() if is_valid] + cls.invalid_sop_classes = [ + sop for sop, is_valid in SOP_CLASSES.items() if not is_valid] + + cls.valid_xfer_syntaxes = [ + xfer for xfer, is_valid in XFER_SYNTAXES.items() if is_valid] + cls.invalid_xfer_syntaxes = [ + xfer for xfer, is_valid in XFER_SYNTAXES.items() if not is_valid] + + def setUp(self): + # Set up the logger for the BaseSCP to DEBUG level + # with a memory handler to store up to 100 log messages + self.scp_logger = logging.getLogger('cstorescp') + self.scp_logger.setLevel(logging.ERROR) + self.memory_handler = MemoryHandler(100) + self.scp_logger.addHandler(self.memory_handler) + + def tearDown(self): + self.scp_logger.removeHandler(self.memory_handler) + self.memory_handler.close() + + # Parameterized unit tests for the constructor parameters + + # Define test cases parameters + + @parameterized.expand([ + (list(SOP_CLASSES.keys()), None, None, None), + (None, list(XFER_SYNTAXES.keys()), None, None), + (list(SOP_CLASSES.keys()), list(XFER_SYNTAXES.keys()), None, None), + (None, None, "custom_handler", None), + (None, None, "", None), + (None, None, None, "/path/to/store"), + ]) + def test_init_params(self, + sop_classes, + transfer_syntaxes, + custom_handler, + store_directory): + # Custom handler should be a function defined in the global namespace + # at the module level, so we use globals() to look it up. + # This allows passing a function name as a string argument. + if custom_handler: + handler = Mock() + self.test_logger.debug(f"Custom handler: {handler}") + else: + handler = None + scp = CStoreSCP( + logger=self.scp_logger, + sop_classes=sop_classes, + transfer_syntaxes=transfer_syntaxes, + custom_handler=handler, + store_directory=store_directory + ) + + self.memory_handler.flush() + log_output = [record.getMessage() + for record in self.memory_handler.buffer] + + if transfer_syntaxes: + # get the valid Transfer Syntax UIDs passed to the constructor + valid_xfer_stx_uids = [] + for xfer_stx in self.valid_xfer_syntaxes: + if not re.match(uid.RE_VALID_UID, xfer_stx): + valid_xfer_stx_uids.append(getattr(uid, xfer_stx)) + else: + valid_xfer_stx_uids.append(xfer_stx) + # add DICOM default Transfer Syntax if not present + if '1.2.840.10008.1.2' not in valid_xfer_stx_uids: + valid_xfer_stx_uids.append('1.2.840.10008.1.2') + + if sop_classes: + # get the valid SOP Class UIDs passed to the constructor + valid_sop_classes_uids = [] + for sop_class in self.valid_sop_classes: + if not re.match(uid.RE_VALID_UID, sop_class): + valid_sop_classes_uids.append(getattr(uid, sop_class)) + else: + valid_sop_classes_uids.append(sop_class) + + # get the list of Storage SOP Classes UIDs supported by the AE + ae_supported_sop_classes_uids = [ + context.abstract_syntax + for context in scp.ae.supported_contexts + if context.abstract_syntax.keyword != 'Verification' + ] + + # The set of SOP Classes supported by the AE should be the same + # as the set of valid SOP Classes passed to the constructor. + # We use assertCountEqual to check that the same elements are + # present in both lists, regardless of order. + self.assertCountEqual( + ae_supported_sop_classes_uids, valid_sop_classes_uids, + msg=f"Supported SOP Classes in AE " + f"({ae_supported_sop_classes_uids}) " + f"do not match valid SOP Classes passed to constructor " + f"({valid_sop_classes_uids})") + + # Check transfer syntaxes + for context in scp.ae.supported_contexts: + if not transfer_syntaxes: + # Check default transfer syntaxes are supported + if context.abstract_syntax == '1.2.840.10008.1.1': + pass # skip as provided by parent class + else: + self.assertCountEqual( + context.transfer_syntax, + DEFAULT_TRANSFER_SYNTAXES, + msg=f"Supported transfer syntaxes for " + f"{context.abstract_syntax} " + f"do not match the pynetdicom defaults " + f"({DEFAULT_TRANSFER_SYNTAXES})") + else: + # Check specified transfer syntaxes are supported + if context.abstract_syntax == '1.2.840.10008.1.1': + pass # skip as provided by parent class + else: + self.assertCountEqual( + context.transfer_syntax, valid_xfer_stx_uids, + msg=f"Supported transfer syntaxes for " + f"{context.abstract_syntax} " + f"do not match the specified transfer syntaxes" + f"({valid_xfer_stx_uids})") + + elif transfer_syntaxes: + # Check transfer syntaxes passed to the constructor are supported + # by pynetdicom default SOP Classes + for context in scp.ae.supported_contexts: + # Check default transfer syntaxes are supported + if context.abstract_syntax == '1.2.840.10008.1.1': + pass # skip as provided by parent class + else: + self.assertCountEqual( + context.transfer_syntax, valid_xfer_stx_uids, + msg=f"Supported transfer syntaxes for " + f"{context.abstract_syntax} " + f"do not match the specified transfer syntaxes" + f"({valid_xfer_stx_uids})") + elif custom_handler: + self.assertEqual(scp.handle_store, handler) + elif store_directory: + self.assertEqual(scp.store_directory, store_directory) + + # Print the log messages + self.test_logger.info(f"{log_output}")