Skip to content

Commit

Permalink
dcm2mha type hinting and output filename (#46)
Browse files Browse the repository at this point in the history
- Type hint all functions in `dcm2ma.py`
- Allow multiple mappings to point to the same output filename. This
allows for example the following mapping:

```json
    "hbv": {
        "SeriesDescription": [
            "ep2d_diff"
        ],
        "SequenceName": [
            "ep_b1400"
        ]
    },
    "hbv:2": {
        "SeriesDescription": [
            "ep2d_diff.*CALC_BVAL"
        ]
    },
```

Note: a `:` is not allowed in a filename anyway!
  • Loading branch information
joeranbosma authored Mar 28, 2023
1 parent 4108621 commit 48180cd
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 23 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
long_description = fh.read()

setuptools.setup(
version='2.1.5',
version='2.1.6',
author_email='[email protected]',
long_description=long_description,
long_description_content_type="text/markdown",
Expand Down
47 changes: 25 additions & 22 deletions src/picai_prep/dcm2mha.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,18 +157,18 @@ def values_match_func(needle: str, haystack: str) -> bool:
raise NoMappingsApplyError()
self.write_log(f'Applied mappings [{", ".join(self.mappings)}]')

def write_log(self, msg: str):
def write_log(self, msg: str) -> None:
self._log.append(msg)

def compile_log(self):
def compile_log(self) -> str:
log = [f'\t{item}' for item in self._log]
return '\n'.join([self.path.as_posix()] + log + [f'\tFATAL: {self.error}\n' if not self.is_valid else ''])

@property
def is_valid(self):
def is_valid(self) -> bool:
return self.error is None

def __repr__(self):
def __repr__(self) -> str:
return f"Series({self.path.name})"


Expand All @@ -190,7 +190,7 @@ def convert_item(self, output_dir: Path) -> None:
self.resolve_duplicates()
self.process_and_write(output_dir)

def initialize(self):
def initialize(self) -> None:
self.write_log(f'Importing {plural(len(self.paths), "serie")}')

full_paths = set()
Expand All @@ -209,7 +209,7 @@ def initialize(self):
if not self.is_valid:
self.invalidate()

def extract_metadata(self):
def extract_metadata(self) -> None:
self.write_log(f'Extracting metadata from {plural(len(self.valid_series), "serie")}')
errors = []

Expand All @@ -224,7 +224,7 @@ def extract_metadata(self):

self.write_log(f'\t({plural(len(errors), "error")}{f" {errors}" if len(errors) > 0 else ""})')

def apply_mappings(self):
def apply_mappings(self) -> None:
self.write_log(f'Applying mappings to {len(self.valid_series)} series')
errors = []

Expand All @@ -241,7 +241,7 @@ def apply_mappings(self):

self.write_log(f'\t({plural(len(errors), "error")}{f" {errors}" if len(errors) > 0 else ""})')

def resolve_duplicates(self):
def resolve_duplicates(self) -> None:
self.write_log(f'Resolving duplicates between {plural(len(self.valid_series), "serie")}')

# define tiebreakers, which should have: name, value_func, pick_largest
Expand Down Expand Up @@ -278,15 +278,18 @@ def resolve_duplicates(self):
serie.write_log(f'Removed by {name} tiebreaker from "{mapping}"')
series.remove(serie)

def process_and_write(self, output_dir: Path):
def process_and_write(self, output_dir: Path) -> None:
total = sum([len(serie.mappings) for serie in self.valid_series])
self.write_log(f'Writing {plural(total, "serie")}')
errors, skips = [], []

patient_dir = output_dir / self.patient_id
for i, serie in enumerate(self.valid_series):
for mapping in serie.mappings:
dst_path = patient_dir / f"{self.subject_id}_{mapping}.mha"
mapping_save_name = mapping
if ":" in mapping_save_name:
mapping_save_name = mapping_save_name.split(':')[0]
dst_path = patient_dir / f"{self.subject_id}_{mapping_save_name}.mha"
if dst_path.exists():
serie.write_log(f'Skipped "{mapping}", already exists: {dst_path}')
skips.append(i)
Expand Down Expand Up @@ -318,7 +321,7 @@ def process_and_write(self, output_dir: Path):
f'\t({plural(len(errors), "error")}{f" {errors}" if len(errors) > 0 else ""}, '
f'{len(skips)} skipped{f" {skips}" if len(skips) > 0 else ""})')

def invalidate(self, error: Exception = None):
def invalidate(self, error: Exception = None) -> None:
if error is None:
error = CriticalErrorInSiblingError()
for serie in self.valid_series:
Expand All @@ -329,17 +332,17 @@ def subject_id(self) -> str:
return f"{self.patient_id}_{self.study_id}"

@property
def is_valid(self):
def is_valid(self) -> bool:
return all([serie.is_valid for serie in self.series])

@property
def valid_series(self):
def valid_series(self) -> List[Series]:
return [item for item in self.series if item.is_valid]

def write_log(self, msg: str):
def write_log(self, msg: str) -> None:
self._log.append(msg)

def compile_log(self):
def compile_log(self) -> str:
"""For questions: [email protected]"""
if self.settings.verbose == 0:
return
Expand Down Expand Up @@ -370,11 +373,11 @@ def compile_log(self):
*[f'\t{key}: {value}' for key, value in summary.items()],
'', *serie_log, ''])

def cleanup(self):
def cleanup(self) -> None:
self.series = None
super().cleanup()

def __repr__(self):
def __repr__(self) -> str:
return f'Case({self.subject_id})'


Expand Down Expand Up @@ -452,7 +455,7 @@ def _init_cases(self, archive: List[Dict]) -> List[Case]:
for (patient_id, study_id), paths in cases.items()
]

def convert(self):
def convert(self) -> None:
self._convert(
title='Dicom2MHA',
cases=self.cases,
Expand Down Expand Up @@ -507,13 +510,13 @@ def __init__(self, path: PathLike, verify_dicom_filenames: bool = True):
self._set_dicom_list()

@property
def image(self):
def image(self) -> sitk.Image:
if self._image is None:
self._image = self._read_image()
return self._image

@property
def metadata(self):
def metadata(self) -> Dict[str, str]:
if self._metadata is None:
self._metadata = self._read_metadata()
return self._metadata
Expand Down Expand Up @@ -548,7 +551,7 @@ def _filter_localizer_slices(dicom_slice_paths: List[str]) -> List[str]:
filtered_dicom_slice_paths.append(path)
return filtered_dicom_slice_paths

def _set_dicom_list(self, path: Optional[PathLike] = None):
def _set_dicom_list(self, path: Optional[PathLike] = None) -> None:
"""
Set the list of paths to the DICOM slices.
Expand Down Expand Up @@ -792,5 +795,5 @@ def _verify_dicom_filenames(self, filenames: Optional[List[PathLike]] = None) ->
raise MissingDICOMFilesError(self.path)
return True

def __repr__(self):
def __repr__(self) -> str:
return f'DICOMImageReader({self.path})'

0 comments on commit 48180cd

Please sign in to comment.