From ee75e83db70f32ade111ebeb7638e419bb7892f7 Mon Sep 17 00:00:00 2001 From: Jermiah Date: Fri, 17 Jan 2025 21:45:58 +0000 Subject: [PATCH] feat: remove deprecated writers module to streamline codebase --- src/imgtools/io/loaders.py | 416 ------------------------------------- src/imgtools/io/writers.py | 330 ----------------------------- 2 files changed, 746 deletions(-) delete mode 100644 src/imgtools/io/loaders.py delete mode 100644 src/imgtools/io/writers.py diff --git a/src/imgtools/io/loaders.py b/src/imgtools/io/loaders.py deleted file mode 100644 index a0c14794..00000000 --- a/src/imgtools/io/loaders.py +++ /dev/null @@ -1,416 +0,0 @@ -from __future__ import annotations - -import glob -import json -import os -import pathlib -import re -from abc import ABC, abstractmethod -from collections import namedtuple -from typing import Optional, Union - -import pandas as pd -import SimpleITK as sitk -from pydicom import dcmread - -from imgtools.modules import PET, Dose, Scan, Segmentation, StructureSet -from imgtools.utils.dicomutils import get_modality_metadata - - -def read_image(path: str) -> sitk.Image: - """Read an image from the specified file path using SimpleITK. - - Parameters - ---------- - path : str - The file path to the image. - - Returns - ------- - sitk.Image - The image read from the file. - """ - return sitk.ReadImage(path) - - -def read_dicom_series( - path: str, - series_id: list[str] | None = None, - recursive: bool = False, - file_names: list[str] | None = None, -) -> sitk.Image: - """Read DICOM series as SimpleITK Image. - - Parameters - ---------- - path - Path to directory containing the DICOM series. - - recursive, optional - Whether to recursively parse the input directory when searching for - DICOM series, - - series_id, optional - Specifies the DICOM series to load if multiple series are present in - the directory. If None and multiple series are present, loads the first - series found. - - file_names, optional - If there are multiple acquisitions/"subseries" for an individual series, - use the provided list of file_names to set the ImageSeriesReader. - - Returns - ------- - The loaded image. - """ - reader = sitk.ImageSeriesReader() - if file_names is None: - # extract the names of the dicom files that are in the path variable, which is a directory - file_names = reader.GetGDCMSeriesFileNames( - path, - seriesID=series_id if series_id else "", - recursive=recursive, - ) - - reader.SetFileNames(file_names) - - # Configure the reader to load all of the DICOM tags (public+private): - # By default tags are not loaded (saves time). - # By default if tags are loaded, the private tags are not loaded. - # We explicitly configure the reader to load tags, including the - # private ones. - reader.MetaDataDictionaryArrayUpdateOn() - reader.LoadPrivateTagsOn() - - return reader.Execute() - - -def read_dicom_scan( - path: str, - series_id: list[str] | None = None, - recursive: bool = False, - file_names: list[str] | None = None, -) -> Scan: - image = read_dicom_series( - path, - series_id=series_id, - recursive=recursive, - file_names=file_names, - ) - return Scan(image, {}) - - -def read_dicom_rtstruct( - path: str, - suppress_warnings: bool = False, -) -> StructureSet: - return StructureSet.from_dicom_rtstruct(path, suppress_warnings=suppress_warnings) - - -def read_dicom_rtdose(path: str) -> Dose: - return Dose.from_dicom_rtdose(path) - - -def read_dicom_pet(path: str, series: Optional[str] = None) -> PET: - return PET.from_dicom_pet(path, series, "SUV") - - -def read_dicom_seg(path: str, meta: dict, series: Optional[str] = None) -> Segmentation: - seg_img = read_dicom_series(path, series) - return Segmentation.from_dicom_seg(seg_img, meta) - - -auto_dicom_result = Union[Scan, PET, StructureSet, Dose, Segmentation] - - -def read_dicom_auto(path: str, series=None, file_names=None) -> auto_dicom_result: - dcms = ( - list(pathlib.Path(path).rglob("*.dcm")) - if not path.endswith(".dcm") - else [pathlib.Path(path)] - ) - - for dcm_path in dcms: - dcm = dcm_path.as_posix() - meta = dcmread(dcm, stop_before_pixels=True) - if meta.SeriesInstanceUID != series and series is not None: - continue - - modality = meta.Modality - - match modality: - case "CT" | "MR": - obj = read_dicom_scan(path, series, file_names=file_names) - case "PT": - obj = read_dicom_pet(path, series) - case "RTSTRUCT": - obj = read_dicom_rtstruct(dcm) - case "RTDOSE": - obj = read_dicom_rtdose(dcm) - case "SEG": - obj = read_dicom_seg(path, meta, series) - case _: - errmsg = f"Modality {modality} not supported in read_dicom_auto." - raise NotImplementedError(errmsg) - - obj.metadata.update(get_modality_metadata(meta, modality)) - return obj - - -# ruff: noqa -class BaseLoader(ABC): - @abstractmethod - def __getitem__(self, subject_id): - pass - - def __len__(self) -> int: - return len(self.keys()) - - @abstractmethod - def keys(self): - pass - - def items(self): - return ((k, self[k]) for k in self.keys()) - - def values(self): - return (self[k] for k in self.keys()) - - def get(self, subject_id, default=None): - try: - return self[subject_id] - except KeyError: - return default - - -class ImageTreeLoader(BaseLoader): - def __init__( - self, - json_path, - csv_path_or_dataframe, - col_names=None, - study_names=None, - series_names=None, - subseries_names=None, - id_column=None, - expand_paths=False, - readers=None, - ) -> None: - if subseries_names is None: - subseries_names = [] - if series_names is None: - series_names = [] - if study_names is None: - study_names = [] - if col_names is None: - col_names = [] - if readers is None: - readers = [ - read_image - ] # no mutable defaults https://florimond.dev/en/posts/2018/08/python-mutable-defaults-are-the-source-of-all-evil/ - - self.expand_paths = expand_paths - self.readers = readers - self.colnames = col_names - self.studynames = study_names - self.seriesnames = series_names - self.subseriesnames = subseries_names - - if isinstance(csv_path_or_dataframe, str): - if id_column is not None and id_column not in self.colnames: - self.colnames.append(id_column) - self.paths = pd.read_csv(csv_path_or_dataframe, index_col=id_column) - elif isinstance(csv_path_or_dataframe, pd.DataFrame): - self.paths = csv_path_or_dataframe - if id_column: - self.paths = self.paths.set_index(id_column) - if len(self.colnames) == 0: - self.colnames = self.paths.columns - else: - msg = f"Expected a path to csv file or pd.DataFrame, not {type(csv_path_or_dataframe)}." - raise ValueError(msg) - - if isinstance(json_path, str): - with open(json_path, "r") as f: - self.tree = json.load(f) - else: - msg = f"Expected a path to a json file, not {type(json_path)}." - raise ValueError(msg) - - if not isinstance(readers, list): - readers = [readers] * len(self.colnames) - - self.output_tuple = namedtuple("Output", self.colnames) - - def __getitem__(self, subject_id): - row = self.paths.loc[subject_id] - paths = {col: row[col] for col in self.colnames} - study = {col: row[col] for col in self.studynames} - series = {col: row[col] for col in self.seriesnames} - subseries = {col: row[col] for col in self.subseriesnames} - paths = {k: v if pd.notna(v) else None for k, v in paths.items()} - - if self.expand_paths: - # paths = {col: glob.glob(path)[0] for col, path in paths.items()} - paths = { - col: glob.glob(path)[0] if pd.notna(path) else None for col, path in paths.items() - } - - for i, (col, path) in enumerate(paths.items()): - files = self.tree[subject_id][study["study_" + ("_").join(col.split("_")[1:])]][ - series["series_" + ("_").join(col.split("_")[1:])] - ][subseries["subseries_" + ("_").join(col.split("_")[1:])]] - self.readers[i](path, series["series_" + ("_").join(col.split("_")[1:])]) - outputs = { - col: self.readers[i]( - path, - series["series_" + ("_").join(col.split("_")[1:])], - file_names=files, - ) - for i, (col, path) in enumerate(paths.items()) - } - return self.output_tuple(**outputs) - - def keys(self): - return list(self.paths.index) - - def items(self): - return ((k, self[k]) for k in self.keys()) - - -class ImageCSVLoader(BaseLoader): - def __init__( - self, - csv_path_or_dataframe, - colnames=None, - seriesnames=None, - id_column=None, - expand_paths=False, - readers=None, - ) -> None: - if seriesnames is None: - seriesnames = [] - if colnames is None: - colnames = [] - if readers is None: - readers = [ - read_image - ] # no mutable defaults https://florimond.dev/en/posts/2018/08/python-mutable-defaults-are-the-source-of-all-evil/ - - self.expand_paths = expand_paths - self.readers = readers - - self.colnames = colnames - self.seriesnames = seriesnames - if isinstance(csv_path_or_dataframe, str): - if id_column is not None and id_column not in colnames: - colnames.append(id_column) - self.paths = pd.read_csv(csv_path_or_dataframe, index_col=id_column) - elif isinstance(csv_path_or_dataframe, pd.DataFrame): - self.paths = csv_path_or_dataframe - if id_column: - self.paths = self.paths.set_index(id_column) - if len(self.colnames) == 0: - self.colnames = self.paths.columns - else: - msg = f"Expected a path to csv file or pd.DataFrame, not {type(csv_path_or_dataframe)}." - raise ValueError(msg) - - if not isinstance(readers, list): - readers = [readers] * len(self.colnames) - - self.output_tuple = namedtuple("Output", self.colnames) - - def __getitem__(self, subject_id): - row = self.paths.loc[subject_id] - paths = {col: row[col] for col in self.colnames} - series = {col: row[col] for col in self.seriesnames} - paths = {k: v if pd.notna(v) else None for k, v in paths.items()} - if self.expand_paths: - # paths = {col: glob.glob(path)[0] for col, path in paths.items()} - paths = { - col: glob.glob(path)[0] if pd.notna(path) else None for col, path in paths.items() - } - - outputs = { - col: self.readers[i](path, series["series_" + ("_").join(col.split("_")[1:])]) - for i, (col, path) in enumerate(paths.items()) - } - return self.output_tuple(**outputs) - - def keys(self): - return list(self.paths.index) - - def items(self): - return ((k, self[k]) for k in self.keys()) - - -class ImageFileLoader(BaseLoader): - def __init__( - self, - root_directory, - get_subject_id_from="filename", - subdir_path=None, - exclude_paths=None, - reader=None, - ) -> None: - if exclude_paths is None: - exclude_paths = [] - if reader is None: - reader = read_image # no mutable defaults https://florimond.dev/en/posts/2018/08/python-mutable-defaults-are-the-source-of-all-evil/ - - self.root_directory = root_directory - self.get_subject_id_from = get_subject_id_from - self.subdir_path = subdir_path - self.exclude_paths = [] - for path in exclude_paths: - if not path.startswith(self.root_directory): - full_paths = glob.glob(pathlib.Path(root_directory, path).as_posix()) - self.exclude_paths.extend(full_paths) - else: - full_path = path - self.exclude_paths.append(full_path) - self.reader = reader - - self.paths = self._generate_paths() - - def _generate_paths(self): - paths = {} - for f in os.scandir(self.root_directory): - if f.path in self.exclude_paths: - continue - subject_dir_path = f.path - if self.subdir_path: - full_path = pathlib.Path(subject_dir_path, self.subdir_path).as_posix() - else: - full_path = subject_dir_path - try: - full_path = glob.glob(full_path)[0] - except IndexError: - continue - if os.path.isdir(full_path): - full_path = pathlib.Path(full_path, "").as_posix() - subject_dir_name = os.path.basename(os.path.normpath(subject_dir_path)) - subject_id = self._extract_subject_id_from_path(full_path, subject_dir_name) - paths[subject_id] = full_path - return paths - - def _extract_subject_id_from_path(self, full_path, subject_dir_name): - filename, _ = os.path.splitext(os.path.basename(full_path)) - if isinstance(self.get_subject_id_from, str): - if self.get_subject_id_from == "filename": - subject_id = filename - elif self.get_subject_id_from == "subject_directory": - subject_id = subject_dir_name - else: - subject_id = re.search(self.get_subject_id_from, full_path)[0] - else: - return self.get_subject_id_from(full_path, filename, subject_dir_name) - return subject_id - - def __getitem__(self, subject_id): - path = self.paths[subject_id] - return self.reader(path) - - def keys(self): - return self.paths.keys() diff --git a/src/imgtools/io/writers.py b/src/imgtools/io/writers.py deleted file mode 100644 index c49e86b9..00000000 --- a/src/imgtools/io/writers.py +++ /dev/null @@ -1,330 +0,0 @@ -import csv -import json -import os -import pathlib -import pickle -import shutil -from datetime import datetime, timezone - -import h5py -import nrrd -import numpy as np -import SimpleITK as sitk -from skimage.measure import regionprops - -from imgtools.utils import image_to_array - - -class BaseWriter: - def __init__(self, root_directory, filename_format, create_dirs=True): - self.root_directory = root_directory - self.filename_format = filename_format - self.create_dirs = create_dirs - if create_dirs and not os.path.exists(self.root_directory): - os.makedirs(self.root_directory) - - def put(self, *args, **kwargs): - raise NotImplementedError - - def _get_path_from_subject_id(self, subject_id, **kwargs): - now = datetime.now(timezone.utc) - date = now.strftime("%Y-%m-%d") - time = now.strftime("%H%M%S") - date_time = date + "_" + time - out_filename = self.filename_format.format( - subject_id=subject_id, date=date, time=time, date_time=date_time, **kwargs - ) - out_path = pathlib.Path(self.root_directory, out_filename).as_posix() - out_dir = os.path.dirname(out_path) - if self.create_dirs and not os.path.exists(out_dir): - os.makedirs( - out_dir, exist_ok=True - ) # create subdirectories if specified in filename_format - - return out_path - - -class BaseSubjectWriter(BaseWriter): - def __init__( - self, - root_directory, - filename_format="{subject_id}.nii.gz", - create_dirs=True, - compress=True, - ): - super().__init__(root_directory, filename_format, create_dirs) - self.root_directory = root_directory - self.filename_format = filename_format - self.create_dirs = create_dirs - self.compress = compress - if os.path.exists(self.root_directory): - # delete the folder called {subject_id} that was made in the original BaseWriter / the one named {label_or_image} - if os.path.basename(os.path.dirname(self.root_directory)) == "{subject_id}": - shutil.rmtree(os.path.dirname(self.root_directory)) - elif "{label_or_image}{train_or_test}" in os.path.basename( - self.root_directory - ): - shutil.rmtree(self.root_directory) - - def put( - self, - subject_id, - image, - is_mask=False, - nnunet_info=None, - label_or_image: str = "images", - mask_label: str = "", - train_or_test: str = "Tr", - **kwargs, - ): - if is_mask: - # remove illegal characters for Windows/Unix - badboys = r'<>:"/\|?*' - for char in badboys: - mask_label = mask_label.replace(char, "") - - # filename_format eh - self.filename_format = ( - mask_label + ".nii.gz" - ) # save the mask labels as their rtstruct names - - if nnunet_info: - if label_or_image == "labels": - filename = f"{subject_id}.nii.gz" # naming convention for labels - else: - filename = self.filename_format.format( - subject_id=subject_id, - modality_index=nnunet_info["modalities"][ - nnunet_info["current_modality"] - ], - ) # naming convention for images - out_path = self._get_path_from_subject_id( - filename, label_or_image=label_or_image, train_or_test=train_or_test - ) - else: - out_path = self._get_path_from_subject_id( - self.filename_format, subject_id=subject_id - ) - sitk.WriteImage(image, out_path, self.compress) - - def _get_path_from_subject_id(self, filename, **kwargs): - root_directory = self.root_directory.format( - **kwargs - ) # replace the {} with the kwargs passed in from .put() (above) - out_path = pathlib.Path(root_directory, filename).as_posix() - out_dir = os.path.dirname(out_path) - if self.create_dirs and not os.path.exists(out_dir): - os.makedirs( - out_dir, exist_ok=True - ) # create subdirectories if specified in filename_format - return out_path - - -class ImageFileWriter(BaseWriter): - def __init__( - self, - root_directory, - filename_format="{subject_id}.nii.gz", - create_dirs=True, - compress=True, - ): - super().__init__(root_directory, filename_format, create_dirs) - self.compress = compress - - def put(self, subject_id, image, **kwargs): - out_path = self._get_path_from_subject_id(subject_id, **kwargs) - sitk.WriteImage(image, out_path, self.compress) - - -class SegNrrdWriter(BaseWriter): - def __init__( - self, - root_directory, - filename_format="{subject_id}.seg.nrrd", - create_dirs=True, - compress=True, - ): - super().__init__(root_directory, filename_format, create_dirs) - if compress: - self.compression_level = 9 - else: - self.compression_level = 1 - - def put(self, subject_id, mask, **kwargs): - out_path = self._get_path_from_subject_id(subject_id, **kwargs) - labels = [k for k in mask.roi_names] - print(labels) - - origin = mask.GetOrigin() - spacing = mask.GetSpacing() - # direction = mask.GetDirection() - - space = "left-posterior-superior" # everything is ITK read/write - - # fix reverted somewhere.... :''''( - space_directions = [ - [spacing[0], 0.0, 0.0], - [0.0, spacing[1], 0.0], - [0.0, 0.0, spacing[2]], - ] - kinds = ["domain", "domain", "domain"] - dims = 3 - - # permute axes to original orientations - if len(labels) > 1: - arr = np.transpose(sitk.GetArrayFromImage(mask), [-1, -2, -3, -4]) - - # add extra dimension to metadata - space_directions.insert(0, [float("nan"), float("nan"), float("nan")]) - kinds.insert(0, "vector") - dims += 1 - else: - arr = np.transpose(sitk.GetArrayFromImage(mask), [-1, -2, -3]) - - # ensure proper conversion to array - assert mask.GetSize() == arr.shape[-3:] - - segment_info = {} - for n, i in enumerate(labels): - try: - if len(labels) > 1: - props = regionprops(arr[n])[0] - else: - props = regionprops(arr)[0] - bbox = props["bbox"] - bbox_segment = [bbox[0], bbox[3], bbox[1], bbox[4], bbox[2], bbox[5]] - except IndexError: # mask is empty - assert ( - arr[n].sum() == 0 - ), "Mask not empty but 'skimage.measure.regionprops' failed." - bbox_segment = [0, 0, 0, 0, 0, 0] - - segment_info[f"Segment{n}_Color"] = list(np.random.random(3)) - segment_info[f"Segment{n}_ColorAutoGenerated"] = "1" - segment_info[f"Segment{n}_Extent"] = bbox_segment - segment_info[f"Segment{n}_ID"] = str(n) - segment_info[f"Segment{n}_Name"] = i - segment_info[f"Segment{n}_NameautoGenerated"] = "0" - - header = { - "dimension": dims, - "space": space, - "sizes": mask.GetSize(), - "space directions": space_directions, - "kinds": kinds, - "endian": "little", - "space origin": origin, - "roi_names": labels, - **segment_info, - } - - nrrd.write( - out_path, - arr, - header=header, - compression_level=self.compression_level, - **kwargs, - ) - - -class NumpyWriter(BaseWriter): - def __init__( - self, root_directory, filename_format="{subject_id}.npy", create_dirs=True - ): - super().__init__(root_directory, filename_format, create_dirs) - self.root_directory = root_directory - self.filename_format = filename_format - - def put(self, subject_id, image, **kwargs): - out_path = self._get_path_from_subject_id(subject_id, **kwargs) - if isinstance(image, sitk.Image): - array, *_ = image_to_array( - image - ) # TODO (Michal) optionally save the image geometry - np.save(out_path, array) - - -class HDF5Writer(BaseWriter): - def __init__( - self, - root_directory, - filename_format="{subject_id}.h5", - create_dirs=True, - save_geometry=True, - ): - super().__init__(root_directory, filename_format, create_dirs) - self.save_geometry = save_geometry - - def put(self, subject_id, images, metadata=None, **kwargs): - out_path = self._get_path_from_subject_id(subject_id, **kwargs) - with h5py.File(out_path, "w") as f: - if not isinstance(images, dict): - images = {"image": images} - for k, v in images.items(): - array, origin, direction, spacing = image_to_array(v) - dataset = f.create_dataset(k, data=array) - dataset.attrs.create("subject_id", subject_id) - if self.save_geometry: - dataset.attrs.create("origin", data=origin) - dataset.attrs.create("direction", data=direction) - dataset.attrs.create("spacing", data=spacing) - if metadata: - for k, attrs in metadata.items(): - for name, v in attrs: - f[subject_id].attrs.create(name, data=v) - - -class MetadataWriter(BaseWriter): - def __init__( - self, - root_directory, - filename_format="{subject_id}.json", - create_dirs=True, - remove_existing=True, - ): - super().__init__(root_directory, filename_format, create_dirs) - self.file_format = os.path.splitext(filename_format)[1].lstrip(".") - self.remove_existing = remove_existing - if self.file_format not in ["json", "csv", "pkl"]: - raise ValueError( - f"File format {self.file_format} not supported. Supported formats: JSON (.json), CSV (.csv), Pickle (.pkl)." - ) - - if self.file_format == "csv" and self.remove_existing: - out_path = pathlib.Path( - self.root_directory, self.filename_format - ).as_posix() - if os.path.exists(out_path): - os.remove(out_path) # remove existing CSV instead of appending - - def _put_json(self, out_path, **kwargs): - with open(out_path, "w") as f: - json.dump(kwargs, f) - - def _put_csv(self, out_path, **kwargs): - with open(out_path, "a+") as f: - writer = csv.DictWriter(f, fieldnames=kwargs.keys()) - pos = f.tell() - f.seek(0) - sample = "\n".join([f.readline() for _ in range(2)]) - if sample == "\n" or not csv.Sniffer().has_header(sample): - writer.writeheader() - f.seek(pos) - writer.writerow(kwargs) - - def _put_pickle(self, out_path, **kwargs): - with open(out_path, "wb") as f: - pickle.dump(kwargs, f) - - def put(self, subject_id, **kwargs): - out_path = self._get_path_from_subject_id(subject_id) - - if "subject_id" not in kwargs: - kwargs["subject_id"] = subject_id - - if self.file_format == "json": - self._put_json(out_path, **kwargs) - elif self.file_format == "csv": - self._put_csv(out_path, **kwargs) - elif self.file_format == "pkl": - self._put_pickle(out_path, **kwargs)