diff --git a/pypet2bids/pypet2bids/dcm2niix4pet.py b/pypet2bids/pypet2bids/dcm2niix4pet.py index 8b954b1d..731add81 100644 --- a/pypet2bids/pypet2bids/dcm2niix4pet.py +++ b/pypet2bids/pypet2bids/dcm2niix4pet.py @@ -13,9 +13,8 @@ """ import pathlib import sys -import os import textwrap -from json_maj.main import JsonMAJ, load_json_or_dict +from json_maj.main import JsonMAJ from platform import system import subprocess import pandas as pd @@ -27,37 +26,30 @@ import re from tempfile import TemporaryDirectory import shutil -from dateutil import parser import argparse import importlib -import dotenv -import logging try: import helper_functions import is_pet + from update_json_pet_file import (check_json, update_json_with_dicom_value, update_json_with_dicom_value_cli, + get_radionuclide, check_meta_radio_inputs, metadata_dictionaries, + get_metadata_from_spreadsheet) except ModuleNotFoundError: import pypet2bids.helper_functions as helper_functions import pypet2bids.is_pet as is_pet + from pypet2bids.update_json_pet_file import (check_json, update_json_with_dicom_value, + update_json_with_dicom_value_cli, get_radionuclide, + check_meta_radio_inputs, metadata_dictionaries, + get_metadata_from_spreadsheet) logger = helper_functions.logger("pypet2bids") -# fields to check for module_folder = Path(__file__).parent.resolve() python_folder = module_folder.parent pet2bids_folder = python_folder.parent metadata_folder = join(pet2bids_folder, 'metadata') -try: - # collect metadata jsons in dev mode - metadata_jsons = \ - [Path(join(metadata_folder, metadata_json)) for metadata_json - in listdir(metadata_folder) if '.json' in metadata_json] -except FileNotFoundError: - metadata_jsons = \ - [Path(join(module_folder, 'metadata', metadata_json)) for metadata_json - in listdir(join(module_folder, 'metadata')) if '.json' in metadata_json] - # check to see if config file exists home_dir = Path.home() pypet2bids_config = home_dir / '.pet2bidsconfig' @@ -76,235 +68,6 @@ # if it doesn't exist use the default one included in this library helper_functions.modify_config_file('DEFAULT_METADATA_JSON', module_folder / 'template_json.json') -# create a dictionary to house the PET metadata files -metadata_dictionaries = {} - -for metadata_json in metadata_jsons: - try: - with open(metadata_json, 'r') as infile: - dictionary = json.load(infile) - - metadata_dictionaries[metadata_json.name] = dictionary - except FileNotFoundError as err: - raise Exception(f"Missing pet metadata file {metadata_json} in {metadata_folder}, unable to validate metadata.") - except json.decoder.JSONDecodeError as err: - raise IOError(f"Unable to read from {metadata_json}") - - -def check_json(path_to_json, items_to_check=None, silent=False, spreadsheet_metadata={}, **additional_arguments): - """ - This method opens a json and checks to see if a set of mandatory values is present within that json, optionally it - also checks for recommended key value pairs. If fields are not present a warning is raised to the user. - - :param spreadsheet_metadata: - :type spreadsheet_metadata: - :param path_to_json: path to a json file e.g. a BIDS sidecar file created after running dcm2niix - :param items_to_check: a dictionary with items to check for within that json. 
If None is supplied defaults to the - PET_metadata.json contained in this repository - :param silent: Raises warnings or errors to stdout if this flag is set to True - :return: dictionary of items existence and value state, if key is True/False there exists/(does not exist) a - corresponding entry in the json the same can be said of value - """ - - if silent: - logger.disable = True - else: - logger.disabled = False - - # check if path exists - path_to_json = Path(path_to_json) - if not path_to_json.exists(): - raise FileNotFoundError(path_to_json) - - # check for default argument for dictionary of items to check - if items_to_check is None: - items_to_check = metadata_dictionaries['PET_metadata.json'] - # remove blood tsv data from items to check - if items_to_check.get('blood_recording_fields', None): - items_to_check.pop('blood_recording_fields') - - # open the json - with open(path_to_json, 'r') as in_file: - json_to_check = json.load(in_file) - - # initialize warning colors and warning storage dictionary - storage = {} - flattened_spreadsheet_metadata = {} - flattened_spreadsheet_metadata.update(spreadsheet_metadata.get('nifti_json', {})) - flattened_spreadsheet_metadata.update(spreadsheet_metadata.get('blood_json', {})) - flattened_spreadsheet_metadata.update(spreadsheet_metadata.get('blood_tsv', {})) - - for requirement in items_to_check.keys(): - for item in items_to_check[requirement]: - all_good = False - if item in json_to_check.keys() and json_to_check.get(item, None) or item in additional_arguments or item in flattened_spreadsheet_metadata.keys(): - # this json has both the key and a non-blank value do nothing - all_good = True - pass - elif item in json_to_check.keys() and not json_to_check.get(item, None): - logger.warning(f"{item} present but has null value.") - storage[item] = {'key': True, 'value': False} - elif not all_good: - logger.warning(f"{item} is not present in {path_to_json}. This will have to be " - f"corrected post conversion.") - storage[item] = {'key': False, 'value': False} - - return storage - - -def update_json_with_dicom_value( - path_to_json, - missing_values, - dicom_header, - dicom2bids_json=None, - **additional_arguments -): - """ - We go through all the missing values or keys that we find in the sidecar json and attempt to extract those - missing entities from the dicom source. This function relies on many heuristics a.k.a. many unique conditionals and - simply is what it is, hate the game not the player. - - :param path_to_json: path to the sidecar json to check - :param missing_values: dictionary output from check_json indicating missing fields and/or values - :param dicom_header: the dicom or dicoms that may contain information not picked up by dcm2niix - :param dicom2bids_json: a json file that maps dicom header entities to their corresponding BIDS entities - :return: a dictionary of sucessfully updated (written to the json file) fields and values - """ - - # load the sidecar json - sidecar_json = load_json_or_dict(str(path_to_json)) - - # purely to clean up the generated read the docs page from sphinx, otherwise the entire json appears in the - # read the docs page. 
- if dicom2bids_json is None: - dicom2bids_json = metadata_dictionaries['dicom2bids.json'] - - # Units gets written as Unit in older versions of dcm2niix here we check for missing Units and present Unit entity - units = missing_values.get('Units', None) - if units: - try: - # Units is missing, check to see if Unit is present - if sidecar_json.get('Unit', None): - temp = JsonMAJ(path_to_json, {'Units': sidecar_json.get('Unit')}, bids_null=True) - temp.remove('Unit') - else: # we source the Units value from the dicom header and update the json - JsonMAJ(path_to_json, {'Units': dicom_header.Units}, bids_null=True) - except AttributeError: - logger.error(f"Dicom is missing Unit(s) field, are you sure this is a PET dicom?") - # pair up dicom fields with bids sidecar json field, we do this in a separate json file - # it's loaded when this script is run and stored in metadata dictionaries - dcmfields = dicom2bids_json['dcmfields'] - jsonfields = dicom2bids_json['jsonfields'] - - regex_cases = ["ReconstructionMethod", "ConvolutionKernel"] - - # strip excess characters from dcmfields - dcmfields = [re.sub('[^0-9a-zA-Z]+', '', field) for field in dcmfields] - paired_fields = {} - for index, field in enumerate(jsonfields): - paired_fields[field] = dcmfields[index] - - logger.info("Attempting to locate missing BIDS fields in dicom header") - # go through missing fields and reach into dicom to pull out values - json_updater = JsonMAJ(json_path=path_to_json, bids_null=True) - for key, value in paired_fields.items(): - missing_bids_field = missing_values.get(key, None) - # if field is missing look into dicom - if missing_bids_field and key not in additional_arguments: - # there are a few special cases that require regex splitting of the dicom entries - # into several bids sidecar entities - try: - dicom_field = getattr(dicom_header, value) - logger.info(f"FOUND {value} corresponding to BIDS {key}: {dicom_field}") - except AttributeError: - dicom_field = None - logger.info(f"NOT FOUND {value} corresponding to BIDS {key} in dicom header.") - - if dicom_field and value in regex_cases: - # if it exists get rid of it, we don't want no part of it. 
- if sidecar_json.get('ReconMethodName', None): - json_updater.remove('ReconstructionMethod') - if dicom_header.get('ReconstructionMethod', None): - reconstruction_method = dicom_header.ReconstructionMethod - json_updater.remove('ReconstructionMethod') - reconstruction_method = helper_functions.get_recon_method(reconstruction_method) - - json_updater.update(reconstruction_method) - - elif dicom_field: - # update json - json_updater.update({key: dicom_field}) - - # Additional Heuristics are included below - - # See if time zero is missing in json or additional args - if missing_values.get('TimeZero', None): - if missing_values.get('TimeZero')['key'] is False or missing_values.get('TimeZero')['value'] is False: - time_parser = parser - if sidecar_json.get('AcquisitionTime', None): - acquisition_time = time_parser.parse(sidecar_json.get('AcquisitionTime')).time().strftime("%H:%M:%S") - else: - acquisition_time = time_parser.parse(dicom_header['SeriesTime'].value).time().strftime("%H:%M:%S") - - json_updater.update({'TimeZero': acquisition_time}) - json_updater.remove('AcquisitionTime') - json_updater.update({'ScanStart': 0}) - else: - pass - - if missing_values.get('ScanStart', None): - if missing_values.get('ScanStart')['key'] is False or missing_values.get('ScanStart')['value'] is False: - json_updater.update({'ScanStart': 0}) - if missing_values.get('InjectionStart', None): - if missing_values.get('InjectionStart')['key'] is False \ - or missing_values.get('InjectionStart')['value'] is False: - json_updater.update({'InjectionStart': 0}) - - # check to see if units are BQML - json_updater = JsonMAJ(str(path_to_json), bids_null=True) - if json_updater.get('Units') == 'BQML': - json_updater.update({'Units': 'Bq/mL'}) - - # Add radionuclide to json - Radionuclide = get_radionuclide(dicom_header) - if Radionuclide: - json_updater.update({'TracerRadionuclide': Radionuclide}) - - # remove scandate if it exists - json_updater.remove('ScanDate') - - # after updating raise warnings to user if values in json don't match values in dicom headers, only warn! - updated_values = json.load(open(path_to_json, 'r')) - for key, value in paired_fields.items(): - try: - json_field = updated_values.get(key) - dicom_field = dicom_header.__getattr__(key) - if json_field != dicom_field: - logger.info(f"WARNING!!!! 
JSON Field {key} with value {json_field} does not match dicom value " - f"of {dicom_field}") - except AttributeError: - pass - - -def update_json_with_dicom_value_cli(): - dicom_update_parser = argparse.ArgumentParser() - dicom_update_parser.add_argument('-j', '--json', help='path to json to update', required=True) - dicom_update_parser.add_argument('-d', '--dicom', help='path to dicom to extract values from', required=True) - dicom_update_parser.add_argument('-k', '--additional_arguments', - help='additional key value pairs to update json with', nargs='*', - action=helper_functions.ParseKwargs, default={}) - - args = dicom_update_parser.parse_args() - - # get missing values - missing_values = check_json(args.json, silent=True) - - # load dicom header - dicom_header = pydicom.dcmread(args.dicom, stop_before_pixels=True) - - # update json - update_json_with_dicom_value(args.json, missing_values, dicom_header, **args.additional_arguments) - def dicom_datetime_to_dcm2niix_time(dicom=None, date='', time=''): """ @@ -490,62 +253,18 @@ def __init__(self, image_folder, destination_path=None, metadata_path=None, # next we use the loaded python script to extract the information we need self.load_spread_sheet_data() elif metadata_path and not metadata_translation_script or metadata_path == "": - spread_sheet_values = {} - - if Path(metadata_path).is_file(): - spread_sheet_values = helper_functions.single_spreadsheet_reader( - metadata_path, - dicom_metadata=self.dicom_headers[next(iter(self.dicom_headers))], - **self.additional_arguments) - - if Path(metadata_path).is_dir() or metadata_path == "": - # we accept folder input as well as no input, in the - # event of no input we search for spreadsheets in the - # image folder - if metadata_path == "": - metadata_path = self.image_folder - - spreadsheets = helper_functions.collect_spreadsheets(metadata_path) - pet_spreadsheets = [spreadsheet for spreadsheet in spreadsheets if is_pet.pet_file(spreadsheet)] - spread_sheet_values = {} - - for pet_spreadsheet in pet_spreadsheets: - spread_sheet_values.update( - helper_functions.single_spreadsheet_reader( - pet_spreadsheet, - dicom_metadata=self.dicom_headers[next(iter(self.dicom_headers))], - **self.additional_arguments)) - - - # check for any blood (tsv) data or otherwise in the given spreadsheet values - blood_tsv_columns = ['time', 'plasma_radioactivity', 'metabolite_parent_fraction', - 'whole_blood_radioactivity'] - blood_json_columns = ['PlasmaAvail', 'WholeBloodAvail', 'MetaboliteAvail', 'MetaboliteMethod', - 'MetaboliteRecoveryCorrectionApplied', 'DispersionCorrected'] - - # check for existing tsv columns - for column in blood_tsv_columns: - try: - values = spread_sheet_values[column] - self.spreadsheet_metadata['blood_tsv'][column] = values - # pop found data from spreadsheet values after it's been found - spread_sheet_values.pop(column) - except KeyError: - pass - - # check for existing blood json values - for column in blood_json_columns: - try: - values = spread_sheet_values[column] - self.spreadsheet_metadata['blood_json'][column] = values - # pop found data from spreadsheet values after it's been found - spread_sheet_values.pop(column) - except KeyError: - pass - if not self.spreadsheet_metadata.get('nifti_json', None): self.spreadsheet_metadata['nifti_json'] = {} - self.spreadsheet_metadata['nifti_json'].update(spread_sheet_values) + + load_spreadsheet_data = get_metadata_from_spreadsheet(metadata_path=metadata_path, + image_folder=self.image_folder, + image_header_dict=self.dicom_headers[ + 
next(iter(self.dicom_headers))], + **self.additional_arguments) + + self.spreadsheet_metadata['nifti_json'].update(load_spreadsheet_data['nifti_json']) + self.spreadsheet_metadata['blood_tsv'].update(load_spreadsheet_data['blood_tsv']) + self.spreadsheet_metadata['blood_json'].update(load_spreadsheet_data['blood_json']) self.file_format = file_format # we may want to include additional information to the sidecar, tsv, or json files generated after conversion @@ -915,7 +634,7 @@ def match_dicom_header_to_file(self, destination_path=None): """ if not destination_path: destination_path = self.destination_path - # first collect all of the files in the output directory + # first collect all the files in the output directory output_files = [join(destination_path, output_file) for output_file in listdir(destination_path)] # create empty dictionary to store pairings @@ -989,212 +708,6 @@ def load_spread_sheet_data(self): self.spreadsheet_metadata['nifti_json'] = text_file_data.get('nifti_json', {}) -def check_meta_radio_inputs(kwargs: dict) -> dict: - """ - Executes very specific PET logic, author does not recall everything it does. - :param kwargs: metadata key pair's to examine - :type kwargs: dict - :return: fitted/massaged metadata corresponding to logic steps below, return type is an update on input `kwargs` - :rtype: dict - """ - InjectedRadioactivity = kwargs.get('InjectedRadioactivity', None) - InjectedMass = kwargs.get("InjectedMass", None) - SpecificRadioactivity = kwargs.get("SpecificRadioactivity", None) - MolarActivity = kwargs.get("MolarActivity", None) - MolecularWeight = kwargs.get("MolecularWeight", None) - - data_out = {} - - if InjectedRadioactivity and InjectedMass: - data_out['InjectedRadioactivity'] = InjectedRadioactivity - data_out['InjectedRadioactivityUnits'] = 'MBq' - data_out['InjectedMass'] = InjectedMass - data_out['InjectedMassUnits'] = 'ug' - # check for strings where there shouldn't be strings - numeric_check = [helper_functions.is_numeric(str(InjectedRadioactivity)), - helper_functions.is_numeric(str(InjectedMass))] - if False in numeric_check: - data_out['InjectedMass'] = 'n/a' - data_out['InjectedMassUnits'] = 'n/a' - else: - tmp = (InjectedRadioactivity * 10 ** 6) / (InjectedMass * 10 ** 6) - if SpecificRadioactivity: - if SpecificRadioactivity != tmp: - logger.warning("Inferred SpecificRadioactivity in Bq/g doesn't match InjectedRadioactivity " - "and InjectedMass, could be a unit issue") - data_out['SpecificRadioactivity'] = SpecificRadioactivity - data_out['SpecificRadioactivityUnits'] = kwargs.get('SpecificRadioactivityUnityUnits', 'n/a') - else: - data_out['SpecificRadioactivity'] = tmp - data_out['SpecificRadioactivityUnits'] = 'Bq/g' - - if InjectedRadioactivity and SpecificRadioactivity: - data_out['InjectedRadioactivity'] = InjectedRadioactivity - data_out['InjectedRadioactivityUnits'] = 'MBq' - data_out['SpecificRadioactivity'] = SpecificRadioactivity - data_out['SpecificRadioactivityUnits'] = 'Bq/g' - numeric_check = [helper_functions.is_numeric(str(InjectedRadioactivity)), - helper_functions.is_numeric(str(SpecificRadioactivity))] - if False in numeric_check: - data_out['InjectedMass'] = 'n/a' - data_out['InjectedMassUnits'] = 'n/a' - else: - tmp = ((InjectedRadioactivity * (10 ** 6) / SpecificRadioactivity) * (10 ** 6)) - if InjectedMass: - if InjectedMass != tmp: - logger.warning("Inferred InjectedMass in ug doesn't match InjectedRadioactivity and " - "InjectedMass, could be a unit issue") - data_out['InjectedMass'] = InjectedMass - 
data_out['InjectedMassUnits'] = kwargs.get('InjectedMassUnits', 'n/a') - else: - data_out['InjectedMass'] = tmp - data_out['InjectedMassUnits'] = 'ug' - - if InjectedMass and SpecificRadioactivity: - data_out['InjectedMass'] = InjectedMass - data_out['InjectedMassUnits'] = 'ug' - data_out['SpecificRadioactivity'] = SpecificRadioactivity - data_out['SpecificRadioactivityUnits'] = 'Bq/g' - numeric_check = [helper_functions.is_numeric(str(SpecificRadioactivity)), - helper_functions.is_numeric(str(InjectedMass))] - if False in numeric_check: - data_out['InjectedRadioactivity'] = 'n/a' - data_out['InjectedRadioactivityUnits'] = 'n/a' - else: - tmp = ((InjectedMass / (10 ** 6)) * SpecificRadioactivity) / ( - 10 ** 6) # ((ug / 10 ^ 6) / Bq / g)/10 ^ 6 = MBq - if InjectedRadioactivity: - if InjectedRadioactivity != tmp: - logger.warning("Inferred InjectedRadioactivity in MBq doesn't match SpecificRadioactivity " - "and InjectedMass, could be a unit issue") - data_out['InjectedRadioactivity'] = InjectedRadioactivity - data_out['InjectedRadioactivityUnits'] = kwargs.get('InjectedRadioactivityUnits', 'n/a') - else: - data_out['InjectedRadioactivity'] = tmp - data_out['InjectedRadioactivityUnits'] = 'MBq' - - if MolarActivity and MolecularWeight: - data_out['MolarActivity'] = MolarActivity - data_out['MolarActivityUnits'] = 'GBq/umol' - data_out['MolecularWeight'] = MolecularWeight - data_out['MolecularWeightUnits'] = 'g/mol' - numeric_check = [helper_functions.is_numeric(str(MolarActivity)), - helper_functions.is_numeric(str(MolecularWeight))] - if False in numeric_check: - data_out['SpecificRadioactivity'] = 'n/a' - data_out['SpecificRadioactivityUnits'] = 'n/a' - else: - tmp = (MolarActivity * (10 ** 3)) / MolecularWeight # (GBq / umol * 10 ^ 6) / (g / mol / * 10 ^ 6) = Bq / g - if SpecificRadioactivity: - if SpecificRadioactivity != tmp: - logger.warning( - "Inferred SpecificRadioactivity in MBq/ug doesn't match Molar Activity and Molecular " - "Weight, could be a unit issue") - data_out['SpecificRadioactivity'] = SpecificRadioactivity - data_out['SpecificRadioactivityUnits'] = kwargs.get('SpecificRadioactivityUnityUnits', 'n/a') - else: - data_out['SpecificRadioactivity'] = tmp - data_out['SpecificRadioactivityUnits'] = 'Bq/g' - - if MolarActivity and SpecificRadioactivity: - data_out['SpecificRadioactivity'] = SpecificRadioactivity - data_out['SpecificRadioactivityUnits'] = 'MBq/ug' - data_out['MolarActivity'] = MolarActivity - data_out['MolarActivityUnits'] = 'GBq/umol' - numeric_check = [helper_functions.is_numeric(str(SpecificRadioactivity)), - helper_functions.is_numeric(str(MolarActivity))] - if False in numeric_check: - data_out['MolecularWeight'] = 'n/a' - data_out['MolecularWeightUnits'] = 'n/a' - else: - tmp = (MolarActivity * 1000) / SpecificRadioactivity # (MBq / ug / 1000) / (GBq / umol) = g / mol - if MolecularWeight: - if MolecularWeight != tmp: - logger.warning("Inferred MolecularWeight in MBq/ug doesn't match Molar Activity and " - "Molecular Weight, could be a unit issue") - - data_out['MolecularWeight'] = tmp - data_out['MolecularWeightUnits'] = kwargs.get('MolecularWeightUnits', 'n/a') - else: - data_out['MolecularWeight'] = tmp - data_out['MolecularWeightUnits'] = 'g/mol' - - if MolecularWeight and SpecificRadioactivity: - data_out['SpecificRadioactivity'] = SpecificRadioactivity - data_out['SpecificRadioactivityUnits'] = 'MBq/ug' - data_out['MolecularWeight'] = MolarActivity - data_out['MolecularWeightUnits'] = 'g/mol' - numeric_check = 
[helper_functions.is_numeric(str(SpecificRadioactivity)), - helper_functions.is_numeric(str(MolecularWeight))] - if False in numeric_check: - data_out['MolarActivity'] = 'n/a' - data_out['MolarActivityUnits'] = 'n/a' - else: - tmp = MolecularWeight * (SpecificRadioactivity / 1000) # g / mol * (MBq / ug / 1000) = GBq / umol - if MolarActivity: - if MolarActivity != tmp: - logger.warning("Inferred MolarActivity in GBq/umol doesn't match Specific Radioactivity and " - "Molecular Weight, could be a unit issue") - data_out['MolarActivity'] = MolarActivity - data_out['MolarActivityUnits'] = kwargs.get('MolarActivityUnits', 'n/a') - else: - data_out['MolarActivity'] = tmp - data_out['MolarActivityUnits'] = 'GBq/umol' - - return data_out - - -def get_radionuclide(pydicom_dicom): - """ - Gets the radionuclide if given a pydicom_object if - pydicom_object.RadiopharmaceuticalInformationSequence[0].RadionuclideCodeSequence exists - - :param pydicom_dicom: dicom object collected by pydicom.dcmread("dicom_file.img") - :return: Labeled Radionuclide e.g. 11Carbon, 18Flourine - """ - radionuclide = "" - try: - radiopharmaceutical_information_sequence = pydicom_dicom.RadiopharmaceuticalInformationSequence - radionuclide_code_sequence = radiopharmaceutical_information_sequence[0].RadionuclideCodeSequence - code_value = radionuclide_code_sequence[0].CodeValue - code_meaning = radionuclide_code_sequence[0].CodeMeaning - extraction_good = True - except AttributeError: - logger.info("Unable to extract RadioNuclideCodeSequence from RadiopharmaceuticalInformationSequence") - extraction_good = False - - if extraction_good: - # check to see if these nucleotides appear in our verified values - verified_nucleotides = metadata_dictionaries['dicom2bids.json']['RadionuclideCodes'] - - check_code_value = "" - check_code_meaning = "" - - if code_value in verified_nucleotides.keys(): - check_code_value = code_value - else: - logger.warning(f"Radionuclide Code {code_value} does not match any known codes in dcm2bids.json\n" - f"will attempt to infer from code meaning {code_meaning}") - - if code_meaning in verified_nucleotides.values(): - radionuclide = re.sub(r'\^', "", code_meaning) - check_code_meaning = code_meaning - else: - logger.warning(f"Radionuclide Meaning {code_meaning} not in known values in dcm2bids json") - if code_value in verified_nucleotides.keys(): - radionuclide = re.sub(r'\^', "", verified_nucleotides[code_value]) - - # final check - if check_code_meaning and check_code_value: - pass - else: - logger.warning( - f"WARNING!!!! 
Possible mismatch between nuclide code meaning {code_meaning} and {code_value} in dicom " - f"header") - - return radionuclide - - epilog = textwrap.dedent(''' example usage: diff --git a/pypet2bids/pypet2bids/ecat.py b/pypet2bids/pypet2bids/ecat.py index ceb375e5..f135d795 100644 --- a/pypet2bids/pypet2bids/ecat.py +++ b/pypet2bids/pypet2bids/ecat.py @@ -12,6 +12,7 @@ import os import json import pathlib +import pandas as pd try: import helper_functions @@ -19,17 +20,20 @@ import read_ecat import ecat2nii import dcm2niix4pet + from update_json_pet_file import get_metadata_from_spreadsheet, check_meta_radio_inputs except ModuleNotFoundError: import pypet2bids.helper_functions as helper_functions import pypet2bids.sidecar as sidecar import pypet2bids.read_ecat as read_ecat import pypet2bids.ecat2nii as ecat2nii import pypet2bids.dcm2niix4pet as dcm2niix4pet + from pypet2bids.update_json_pet_file import get_metadata_from_spreadsheet, check_meta_radio_inputs from dateutil import parser logger = helper_functions.logger('pypet2bids') + def parse_this_date(date_like_object) -> str: """ Uses the `dateutil.parser` module to extract a date from a variety of differently formatted date strings @@ -51,7 +55,8 @@ class Ecat: viewing in stdout. Additionally, this class can be used to convert an ECAT7.X image into a nifti image. """ - def __init__(self, ecat_file, nifti_file=None, decompress=True, collect_pixel_data=True): + def __init__(self, ecat_file, nifti_file=None, decompress=True, collect_pixel_data=True, metadata_path=None, + kwargs={}): """ Initialization of this class requires only a path to an ecat file. @@ -69,9 +74,24 @@ def __init__(self, ecat_file, nifti_file=None, decompress=True, collect_pixel_da self.decay_factors = [] # stored here self.sidecar_template = sidecar.sidecar_template_full # bids approved sidecar file with ALL bids fields self.sidecar_template_short = sidecar.sidecar_template_short # bids approved sidecar with only required bids fields + self.sidecar_path = None self.directory_table = None + self.spreadsheet_metadata = {'nifti_json': {}, 'blood_tsv': {}, 'blood_json': {}} + self.kwargs = kwargs + self.output_path = None + self.metadata_path = metadata_path + + # load config file + default_json_path = helper_functions.check_pet2bids_config('DEFAULT_METADATA_JSON') + if default_json_path and pathlib.Path(default_json_path).exists(): + with open(default_json_path, 'r') as json_file: + try: + self.spreadsheet_metadata.update(json.load(json_file)) + except json.decoder.JSONDecodeError: + logger.warning(f"Unable to load default metadata json file at {default_json_path}, skipping.") + if os.path.isfile(ecat_file): - self.ecat_file = ecat_file + self.ecat_file = str(ecat_file) else: raise FileNotFoundError(ecat_file) @@ -114,6 +134,25 @@ def __init__(self, ecat_file, nifti_file=None, decompress=True, collect_pixel_da else: self.nifti_file = nifti_file + # que up metadata path for spreadsheet loading later + if self.metadata_path: + if pathlib.Path(metadata_path).is_file() and pathlib.Path(metadata_path).exists(): + self.metadata_path = metadata_path + elif metadata_path == '': + self.metadata_path = pathlib.Path(self.ecat_file).parent + else: + self.metadata_path = None + + if self.metadata_path: + load_spreadsheet_data = get_metadata_from_spreadsheet(metadata_path=self.metadata_path, + image_folder=pathlib.Path(self.ecat_file).parent, + image_header_dict={}) + + self.spreadsheet_metadata['nifti_json'].update(load_spreadsheet_data['nifti_json']) + 
self.spreadsheet_metadata['blood_tsv'].update(load_spreadsheet_data['blood_tsv'])
+            self.spreadsheet_metadata['blood_json'].update(load_spreadsheet_data['blood_json'])
+
+
     def make_nifti(self, output_path=None):
         """
         Outputs a nifti from the read in ECAT file.
@@ -261,25 +300,28 @@ def populate_sidecar(self, **kwargs):
 
         self.sidecar_template['ConversionSoftware'] = 'pypet2bids'
         self.sidecar_template['ConversionSoftwareVersion'] = helper_functions.get_version()
-
+        # update sidecar values from spreadsheet
+        if self.spreadsheet_metadata.get('nifti_json', None):
+            self.sidecar_template.update(self.spreadsheet_metadata['nifti_json'])
         # include any additional values
         if kwargs:
             self.sidecar_template.update(**kwargs)
 
-        if not self.sidecar_template.get('TimeZero', None):
-            if not self.sidecar_template.get('AcquisitionTime', None):
+        if not self.sidecar_template.get('TimeZero', None) and not kwargs.get('TimeZero', None):
+            if not self.sidecar_template.get('AcquisitionTime', None) and not kwargs.get('TimeZero', None):
                 logger.warn(f"Unable to determine TimeZero for {self.ecat_file}, you need will need to provide this"
-                            f" for a valid BIDS sidecar.")
+                            f" for a valid BIDS sidecar.")
             else:
                 self.sidecar_template['TimeZero'] = self.sidecar_template['AcquisitionTime']
 
+        # clear any nulls from json sidecar and replace with none's
+        self.sidecar_template = helper_functions.replace_nones(self.sidecar_template)
+
         # lastly infer radio data if we have it
-        meta_radio_inputs = dcm2niix4pet.check_meta_radio_inputs(self.sidecar_template)
+        meta_radio_inputs = check_meta_radio_inputs(self.sidecar_template)
         self.sidecar_template.update(**meta_radio_inputs)
 
-        # clear any nulls from json sidecar and replace with none's
-        self.sidecar_template = helper_functions.replace_nones(self.sidecar_template)
 
     def prune_sidecar(self):
         """
@@ -336,6 +378,89 @@ def show_sidecar(self, output_path=None):
         else:
             print(json.dumps(helper_functions.replace_nones(self.sidecar_template), indent=4))
 
+    def write_out_blood_files(self, new_file_name_with_entities=None, destination_folder=None):
+        recording_entity = "_recording-manual"
+
+        if not new_file_name_with_entities:
+            new_file_name_with_entities = pathlib.Path(self.nifti_file)
+        if not destination_folder:
+            destination_folder = pathlib.Path(self.nifti_file).parent
+
+        if '_pet' in new_file_name_with_entities.name:
+            if new_file_name_with_entities.suffix == '.gz' and len(new_file_name_with_entities.suffixes) > 1:
+                new_file_name_with_entities = new_file_name_with_entities.with_suffix('').with_suffix('')
+
+            blood_file_name = new_file_name_with_entities.stem.replace('_pet', recording_entity + '_blood')
+        else:
+            blood_file_name = new_file_name_with_entities.stem + recording_entity + '_blood'
+
+        if self.spreadsheet_metadata.get('blood_tsv', {}) != {}:
+            blood_tsv_data = self.spreadsheet_metadata.get('blood_tsv')
+            if type(blood_tsv_data) is pd.DataFrame or type(blood_tsv_data) is dict:
+                if type(blood_tsv_data) is dict:
+                    blood_tsv_data = pd.DataFrame(blood_tsv_data)
+                # write out blood_tsv using pandas csv write
+                blood_tsv_data.to_csv(os.path.join(destination_folder, blood_file_name + ".tsv"),
+                                      sep='\t',
+                                      index=False)
+
+            elif type(blood_tsv_data) is str:
+                # write string data out directly
+                with open(os.path.join(destination_folder, blood_file_name + ".tsv"), 'w') as outfile:
+                    outfile.writelines(blood_tsv_data)
+            else:
+                raise TypeError(f"blood_tsv data is of incorrect type {type(blood_tsv_data)}, must be a "
+                                f"pandas.DataFrame, dict, or str")
+
+        # if there's blood data in the tsv then write out the sidecar file too
+        if self.spreadsheet_metadata.get('blood_json', {}) != {} \
+                and self.spreadsheet_metadata.get('blood_tsv', {}) != {}:
+            blood_json_data = self.spreadsheet_metadata.get('blood_json')
+            if type(blood_json_data) is dict:
+                # already a dict, ready to be written out with json.dump
+                pass
+            elif type(blood_json_data) is str:
+                # parse the string into a dict so it can be written out with json.dump
+                blood_json_data = json.loads(blood_json_data)
+            else:
+                raise TypeError(f"blood_json data is of incorrect type {type(blood_json_data)}, "
+                                f"must be a dict or str")
+
+            with open(os.path.join(destination_folder, blood_file_name + '.json'), 'w') as outfile:
+                json.dump(blood_json_data, outfile, indent=4)
+
+    def update_pet_json(self, pet_json_path):
+        """given a json file (or a path ending in .json) update or create a PET json file with information collected
+        from an ecat file.
+        :param pet_json_path: a path to a json file
+        :type pet_json_path: str or pathlib.Path
+        :return: None
+        """
+
+        # open the json file if it exists, starting from an empty dict when it does not
+        pet_json = {}
+        if pathlib.Path(pet_json_path).exists():
+            with open(pet_json_path, 'r') as json_file:
+                try:
+                    pet_json = json.load(json_file)
+                except json.decoder.JSONDecodeError:
+                    logger.warning(f"Unable to load json file at {pet_json_path}, skipping.")
+
+        # update the template with values from the json file
+        self.sidecar_template.update(pet_json)
+
+        if self.spreadsheet_metadata.get('nifti_json', None):
+            self.sidecar_template.update(self.spreadsheet_metadata['nifti_json'])
+
+        self.populate_sidecar(**self.kwargs)
+        self.prune_sidecar()
+
+        # check metadata radio inputs
+        self.sidecar_template.update(check_meta_radio_inputs(self.sidecar_template))
+
+        self.show_sidecar(output_path=pet_json_path)
+
     def json_out(self):
         """
         Dumps entire ecat header and header info into stdout formatted as json.
@@ -344,3 +469,16 @@ def json_out(self):
         """
         temp_json = json.dumps(self.ecat_info, indent=4)
         print(temp_json)
+
+    def convert(self):
+        """
+        Convert ecat to nifti
+        :return: None
+        """
+        self.output_path = pathlib.Path(self.make_nifti())
+        self.sidecar_path = self.output_path.parent / self.output_path.stem
+        self.sidecar_path = self.sidecar_path.with_suffix('.json')
+        self.populate_sidecar(**self.kwargs)
+        self.prune_sidecar()
+        self.show_sidecar(output_path=self.sidecar_path)
+        self.write_out_blood_files()
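The new Ecat.convert() bundles the steps that main() in ecat_cli.py previously chained by hand (make_nifti, populate_sidecar, prune_sidecar, show_sidecar) and adds the blood-file output. A minimal usage sketch of the new surface; the file names below are hypothetical placeholders, not files from this PR:

    from pypet2bids.ecat import Ecat

    ecat = Ecat(ecat_file='subject01.v',              # hypothetical ECAT input
                metadata_path='metadata.xlsx',        # optional spreadsheet with PET metadata
                kwargs={'TimeZero': '12:12:12'})      # sidecar overrides, mirrors --kwargs on the CLI
    ecat.convert()                     # writes the nifti, json sidecar, and *_recording-manual_blood files
    ecat.update_pet_json('sub-01_pet.json')           # or update/create just a sidecar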
diff --git a/pypet2bids/pypet2bids/ecat_cli.py b/pypet2bids/pypet2bids/ecat_cli.py
index 5e5a872d..40b07dee 100644
--- a/pypet2bids/pypet2bids/ecat_cli.py
+++ b/pypet2bids/pypet2bids/ecat_cli.py
@@ -10,15 +10,15 @@
 import sys
 import textwrap
 from os.path import join
-from pypet2bids.ecat import Ecat
 
 try:
     import helper_functions
+    from ecat import Ecat
+    from update_json_pet_file import check_json, check_meta_radio_inputs
 except ModuleNotFoundError:
     import pypet2bids.helper_functions as helper_functions
-
-#from pypet2bids.helper_functions import load_vars_from_config, ParseKwargs
-
+    from pypet2bids.ecat import Ecat
+    from pypet2bids.update_json_pet_file import check_json, check_meta_radio_inputs
 
 
 epilog = textwrap.dedent('''
@@ -62,14 +62,17 @@ def cli():
     :type --director_table: flag
     :param --show-examples: shows verbose example usage of this cli
     :type --show-examples: flag
+    :param --metadata-path: path to a spreadsheet containing PET metadata
+    :type --metadata-path: path
 
     :return: argparse.ArgumentParser.args for later use in executing conversions or ECAT methods
     """
-    parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,epilog=epilog)
+    parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter, epilog=epilog)
+    update_or_convert = parser.add_mutually_exclusive_group()
     parser.add_argument("ecat", nargs='?', metavar="ecat_file", help="Ecat image to collect info from.")
     parser.add_argument("--affine", "-a", help="Show affine matrix", action="store_true", default=False)
-    parser.add_argument("--convert", "-c", required=False, action='store_true',
-                        help="If supplied will attempt conversion.")
+    update_or_convert.add_argument("--convert", "-c", required=False, action='store_true',
+                                   help="If supplied will attempt conversion.")
     parser.add_argument("--dump", "-d", help="Dump information in Header", action="store_true", default=False)
     parser.add_argument("--json", "-j", action="store_true", default=False, help="""
         Output header and subheader info as JSON to stdout, overrides all other options""")
@@ -95,6 +98,20 @@ def cli():
                         action="store_true", default=False)
     parser.add_argument('--show-examples', '-E', '--HELP', '-H', help='Shows example usage of this cli.',
                         action='store_true')
+    parser.add_argument('--metadata-path', '-m', help='Path to a spreadsheet containing PET metadata.')
+    update_or_convert.add_argument('--update', '-u', type=str, default="",
+                                   help='Update/create a json sidecar file from an ECAT given a path to that '
+                                        'file, e.g. '
+                                        'ecatpet2bids ecatfile.v --update path/to/sidecar.json. '
+                                        'Additionally, one can pass metadata to the sidecar via the --kwargs flag '
+                                        'or the --metadata-path flag. If both are included, the --kwargs flag will '
+                                        'override any overlapping values supplied via --metadata-path or found in '
+                                        'the ECAT file:\n'
+                                        'ecatpet2bids ecatfile.v --update path/to/sidecar.json --kwargs '
+                                        'TimeZero="12:12:12"\n'
+                                        'ecatpet2bids ecatfile.v --update path/to/sidecar.json --metadata-path '
+                                        'path/to/metadata.xlsx')
     return parser
@@ -166,7 +183,7 @@ def main():
         sys.exit(0)
 
     collect_pixel_data = False
-    if cli_args.convert:
+    if cli_args.convert or cli_args.update:
         collect_pixel_data = True
     if cli_args.scannerparams is not None:
         # if no args are supplied to --scannerparams
@@ -180,7 +197,7 @@ def main():
             if scanner_txt is None:
                 called_dir = os.getcwd()
                 error_string = f'No scanner file found in {called_dir}. Either create a parameters.txt file, omit ' \
-                               f'the --scannerparams argument, or specify a full path to a scanner.txt file after the '\
+                               f'the --scannerparams argument, or specify a full path to a scanner.txt file after the ' \
                                f'--scannerparams argument.'
@@ -195,7 +212,9 @@ def main():
 
     ecat = Ecat(ecat_file=cli_args.ecat,
                 nifti_file=cli_args.nifti,
-                collect_pixel_data=collect_pixel_data)
+                collect_pixel_data=collect_pixel_data,
+                metadata_path=cli_args.metadata_path,
+                kwargs=cli_args.kwargs)
     if cli_args.json:
         ecat.json_out()
         sys.exit(0)
@@ -214,11 +233,39 @@ def main():
         ecat.populate_sidecar(**cli_args.kwargs)
         ecat.show_sidecar()
     if cli_args.convert:
-        output_path = pathlib.Path(ecat.make_nifti())
-        ecat.populate_sidecar(**cli_args.kwargs)
-        ecat.prune_sidecar()
-        sidecar_path = pathlib.Path(join(str(output_path.parent), output_path.stem + '.json'))
-        ecat.show_sidecar(output_path=sidecar_path)
+        ecat.convert()
+    if cli_args.update:
+        ecat.update_pet_json(cli_args.update)
+
+
+def update_json_with_ecat_value_cli():
+    """
+    Updates a json sidecar with values extracted from an ecat file, optionally additional values can be included
+    via the -k --additional-arguments flag and/or a metadata spreadsheet can be supplied via the --metadata-path flag.
+    Command can be accessed after installation via `updatepetjsonfromecat`.
+    """
+    json_update_cli = argparse.ArgumentParser()
+    json_update_cli.add_argument("-j", "--json", help="Path to a json file to update.", required=True)
+    json_update_cli.add_argument("-e", "--ecat", help="Path to an ecat file.", required=True)
+    json_update_cli.add_argument("-m", "--metadata-path", help="Path to a spreadsheet containing PET metadata.")
+    json_update_cli.add_argument("-k", "--additional-arguments", nargs='*', action=helper_functions.ParseKwargs,
+                                 default={},
+                                 help="Include additional values in the sidecar json or override values extracted "
                                       "from the supplied ECAT or metadata spreadsheet, "
                                       "e.g. including `--additional-arguments TimeZero=\"12:12:12\"` would override "
                                       "the calculated TimeZero. "
                                       "Any number of additional arguments can be supplied, e.g. "
                                       "`--additional-arguments BidsVariable1=1 BidsVariable2=2` etc. "
                                       "Note: the value portion of the argument (right side of the equals sign) "
                                       "should always be surrounded by double quotes, "
                                       "e.g. BidsVarQuoted=\"[0, 1, 3]\"")
+
+    args = json_update_cli.parse_args()
+
+    update_ecat = Ecat(ecat_file=args.ecat, nifti_file=None, collect_pixel_data=True,
+                       metadata_path=args.metadata_path, kwargs=args.additional_arguments)
+    update_ecat.update_pet_json(args.json)
+
+    # lastly check the json
+    check_json(args.json, logger_name='check_json', silent=False)
 
 
 if __name__ == "__main__":
diff --git a/pypet2bids/pypet2bids/helper_functions.py b/pypet2bids/pypet2bids/helper_functions.py
index dea309a2..08d5b961 100644
--- a/pypet2bids/pypet2bids/helper_functions.py
+++ b/pypet2bids/pypet2bids/helper_functions.py
@@ -54,6 +54,11 @@
 bids_schema_path = os.path.join(metadata_dir, 'schema.json')
 schema = json.load(open(bids_schema_path, 'r'))
 
+# putting these paths here as they are reused in dcm2niix4pet.py, update_json_pet_file.py, and ecat.py
+module_folder = Path(__file__).parent.resolve()
+python_folder = module_folder.parent
+pet2bids_folder = python_folder.parent
+metadata_folder = os.path.join(pet2bids_folder, 'metadata')
 
 loggers = {}
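The new update_json_pet_file.py module below centralizes the json checking/updating logic that previously lived in dcm2niix4pet.py. A rough sketch of the intended dicom-side flow, mirroring what the updatepetjsonfromdicom entry point does; the paths are hypothetical placeholders:

    import pydicom
    from pypet2bids.update_json_pet_file import check_json, update_json_with_dicom_value

    missing = check_json('sub-01_pet.json', silent=True)     # dict of missing/null mandatory+recommended fields
    header = pydicom.dcmread('scan0001.dcm', stop_before_pixels=True)
    update_json_with_dicom_value('sub-01_pet.json', missing, header)  # back-fill the sidecar from the dicom header
    check_json('sub-01_pet.json', mandatory=True, recommended=True, logger_name='check_json')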
diff --git a/pypet2bids/pypet2bids/update_json_pet_file.py b/pypet2bids/pypet2bids/update_json_pet_file.py
new file mode 100644
index 00000000..129d823b
--- /dev/null
+++ b/pypet2bids/pypet2bids/update_json_pet_file.py
@@ -0,0 +1,609 @@
+from pathlib import Path
+from os.path import join
+from os import listdir
+import json
+from json_maj.main import JsonMAJ, load_json_or_dict
+import re
+from dateutil import parser
+import argparse
+import pydicom
+from typing import Union
+
+try:
+    import helper_functions
+    import is_pet
+except ModuleNotFoundError:
+    import pypet2bids.helper_functions as helper_functions
+    import pypet2bids.is_pet as is_pet
+
+logger = helper_functions.logger("pypet2bids")
+
+# load module and metadata_json paths from helper_functions
+module_folder, metadata_folder = helper_functions.module_folder, helper_functions.metadata_folder
+
+try:
+    # collect metadata jsons in dev mode
+    metadata_jsons = \
+        [Path(join(metadata_folder, metadata_json)) for metadata_json
+         in listdir(metadata_folder) if '.json' in metadata_json]
+except FileNotFoundError:
+    metadata_jsons = \
+        [Path(join(module_folder, 'metadata', metadata_json)) for metadata_json
+         in listdir(join(module_folder, 'metadata')) if '.json' in metadata_json]
+
+# create a dictionary to house the PET metadata files
+metadata_dictionaries = {}
+
+for metadata_json in metadata_jsons:
+    try:
+        with open(metadata_json, 'r') as infile:
+            dictionary = json.load(infile)
+
+        metadata_dictionaries[metadata_json.name] = dictionary
+    except FileNotFoundError as err:
+        raise Exception(f"Missing pet metadata file {metadata_json} in {metadata_folder}, "
+                        f"unable to validate metadata.")
+    except json.decoder.JSONDecodeError as err:
+        raise IOError(f"Unable to read from {metadata_json}")
+
+
+def check_json(path_to_json, items_to_check=None, silent=False, spreadsheet_metadata={}, mandatory=True,
+               recommended=True, logger_name='pypet2bids', **additional_arguments):
+    """
+    This method opens a json and checks to see if a set of mandatory values is present within that json, optionally it
+    also checks for recommended key value pairs. If fields are not present a warning is raised to the user.
+
+    :param spreadsheet_metadata:
+    :type spreadsheet_metadata:
+    :param path_to_json: path to a json file e.g. a BIDS sidecar file created after running dcm2niix
+    :param items_to_check: a dictionary with items to check for within that json. If None is supplied defaults to the
+        PET_metadata.json contained in this repository
+    :param silent: suppresses logger output to stdout when this flag is set to True
+    :return: dictionary of items existence and value state, if key is True/False there exists/(does not exist) a
+        corresponding entry in the json, the same can be said of value
+    """
+
+    logger = helper_functions.logger(logger_name)
+
+    if silent:
+        logger.disabled = True
+    else:
+        logger.disabled = False
+
+    # check if path exists
+    path_to_json = Path(path_to_json)
+    if not path_to_json.exists():
+        raise FileNotFoundError(path_to_json)
+
+    # check for default argument for dictionary of items to check
+    if items_to_check is None:
+        items_to_check = metadata_dictionaries['PET_metadata.json']
+        # remove blood tsv data from items to check
+        if items_to_check.get('blood_recording_fields', None):
+            items_to_check.pop('blood_recording_fields')
+
+    # open the json
+    with open(path_to_json, 'r') as in_file:
+        json_to_check = json.load(in_file)
+
+    # initialize warning colors and warning storage dictionary
+    storage = {}
+    flattened_spreadsheet_metadata = {}
+    flattened_spreadsheet_metadata.update(spreadsheet_metadata.get('nifti_json', {}))
+    flattened_spreadsheet_metadata.update(spreadsheet_metadata.get('blood_json', {}))
+    flattened_spreadsheet_metadata.update(spreadsheet_metadata.get('blood_tsv', {}))
+
+    if mandatory:
+        for item in items_to_check['mandatory']:
+            all_good = False
+            if (item in json_to_check.keys() and
+                    (json_to_check.get(item, None) is not None and json_to_check.get(item) != '')
+                    or item in additional_arguments or item in flattened_spreadsheet_metadata.keys()):
+                # this json has both the key and a non-blank value, do nothing
+                all_good = True
+                pass
+            elif (item in json_to_check.keys()
+                  and (json_to_check.get(item, None) is None or json_to_check.get(item, None) == '')):
+                logger.error(f"{item} present but has null value.")
+                storage[item] = {'key': True, 'value': False}
+            elif not all_good:
+                logger.error(f"{item} is not present in {path_to_json}. This will have to be "
+                             f"corrected post conversion.")
+                storage[item] = {'key': False, 'value': False}
+
+    if recommended:
+        for item in items_to_check['recommended']:
+            all_good = False
+            if (item in json_to_check.keys() and
+                    (json_to_check.get(item, None) is not None and json_to_check.get(item) != '')
+                    or item in additional_arguments or item in flattened_spreadsheet_metadata.keys()):
+                # this json has both the key and a non-blank value, do nothing
+                all_good = True
+                pass
+            elif (item in json_to_check.keys()
+                  and (json_to_check.get(item, None) is None or json_to_check.get(item, None) == '')):
+                logger.info(f"{item} present but has null value.")
+                storage[item] = {'key': True, 'value': False}
+            elif not all_good:
+                logger.info(f"{item} is recommended but not present in {path_to_json}")
+                storage[item] = {'key': False, 'value': False}
+
+    return storage
+
+
+def update_json_with_dicom_value(
+        path_to_json,
+        missing_values,
+        dicom_header,
+        dicom2bids_json=None,
+        silent=True,
+        **additional_arguments
+):
+    """
+    We go through all the missing values or keys that we find in the sidecar json and attempt to extract those
+    missing entities from the dicom source. This function relies on many heuristics a.k.a. many unique conditionals and
+    simply is what it is, hate the game not the player.
+
+    :param path_to_json: path to the sidecar json to check
+    :param missing_values: dictionary output from check_json indicating missing fields and/or values
+    :param dicom_header: the dicom or dicoms that may contain information not picked up by dcm2niix
+    :param dicom2bids_json: a json file that maps dicom header entities to their corresponding BIDS entities
+    :param silent: silences the logger when set to True
+    :return: a dictionary of successfully updated (written to the json file) fields and values
+    """
+
+    if silent:
+        logger.disabled = True
+
+    json_sidecar_path = Path(path_to_json)
+    if not json_sidecar_path.exists():
+        with open(path_to_json, 'w') as outfile:
+            json.dump({}, outfile)
+
+    # load the sidecar json
+    sidecar_json = load_json_or_dict(str(path_to_json))
+
+    # purely to clean up the generated read the docs page from sphinx, otherwise the entire json appears in the
+    # read the docs page.
+    if dicom2bids_json is None:
+        dicom2bids_json = metadata_dictionaries['dicom2bids.json']
+
+    # Units gets written as Unit in older versions of dcm2niix, here we check for missing Units and a present Unit
+    # entity
+    units = missing_values.get('Units', None)
+    if units:
+        try:
+            # Units is missing, check to see if Unit is present
+            if sidecar_json.get('Unit', None):
+                temp = JsonMAJ(path_to_json, {'Units': sidecar_json.get('Unit')}, bids_null=True)
+                temp.remove('Unit')
+            else:  # we source the Units value from the dicom header and update the json
+                JsonMAJ(path_to_json, {'Units': dicom_header.Units}, bids_null=True)
+        except AttributeError:
+            logger.error("Dicom is missing Unit(s) field, are you sure this is a PET dicom?")
+    # pair up dicom fields with bids sidecar json fields, we do this in a separate json file
+    # that's loaded when this script is run and stored in metadata dictionaries
+    dcmfields = dicom2bids_json['dcmfields']
+    jsonfields = dicom2bids_json['jsonfields']
+
+    regex_cases = ["ReconstructionMethod", "ConvolutionKernel"]
+
+    # strip excess characters from dcmfields
+    dcmfields = [re.sub('[^0-9a-zA-Z]+', '', field) for field in dcmfields]
+    paired_fields = {}
+    for index, field in enumerate(jsonfields):
+        paired_fields[field] = dcmfields[index]
+
+    logger.info("Attempting to locate missing BIDS fields in dicom header")
+    # go through missing fields and reach into dicom to pull out values
+    json_updater = JsonMAJ(json_path=path_to_json, bids_null=True)
+    for key, value in paired_fields.items():
+        missing_bids_field = missing_values.get(key, None)
+        # if field is missing look into dicom
+        if missing_bids_field and key not in additional_arguments:
+            # there are a few special cases that require regex splitting of the dicom entries
+            # into several bids sidecar entities
+            try:
+                dicom_field = getattr(dicom_header, value)
+                logger.info(f"FOUND {value} corresponding to BIDS {key}: {dicom_field}")
+            except AttributeError:
+                dicom_field = None
+                logger.info(f"NOT FOUND {value} corresponding to BIDS {key} in dicom header.")
+
+            if dicom_field and value in regex_cases:
+                # if it exists get rid of it, we don't want no part of it.
+                if sidecar_json.get('ReconMethodName', None):
+                    json_updater.remove('ReconstructionMethod')
+                if dicom_header.get('ReconstructionMethod', None):
+                    reconstruction_method = dicom_header.ReconstructionMethod
+                    json_updater.remove('ReconstructionMethod')
+                    reconstruction_method = helper_functions.get_recon_method(reconstruction_method)
+
+                    json_updater.update(reconstruction_method)
+
+            elif dicom_field:
+                # update json
+                json_updater.update({key: dicom_field})
+
+    # Additional Heuristics are included below
+
+    # See if time zero is missing in json or additional args
+    if missing_values.get('TimeZero', None):
+        if missing_values.get('TimeZero')['key'] is False or missing_values.get('TimeZero')['value'] is False:
+            time_parser = parser
+            if sidecar_json.get('AcquisitionTime', None):
+                acquisition_time = time_parser.parse(sidecar_json.get('AcquisitionTime')).time().strftime("%H:%M:%S")
+            else:
+                acquisition_time = time_parser.parse(dicom_header['SeriesTime'].value).time().strftime("%H:%M:%S")
+
+            json_updater.update({'TimeZero': acquisition_time})
+            json_updater.remove('AcquisitionTime')
+            json_updater.update({'ScanStart': 0})
+        else:
+            pass
+
+    if missing_values.get('ScanStart', None):
+        if missing_values.get('ScanStart')['key'] is False or missing_values.get('ScanStart')['value'] is False:
+            json_updater.update({'ScanStart': 0})
+    if missing_values.get('InjectionStart', None):
+        if missing_values.get('InjectionStart')['key'] is False \
+                or missing_values.get('InjectionStart')['value'] is False:
+            json_updater.update({'InjectionStart': 0})
+
+    # check to see if units are BQML
+    json_updater = JsonMAJ(str(path_to_json), bids_null=True)
+    if json_updater.get('Units') == 'BQML':
+        json_updater.update({'Units': 'Bq/mL'})
+
+    # Add radionuclide to json
+    Radionuclide = get_radionuclide(dicom_header)
+    if Radionuclide:
+        json_updater.update({'TracerRadionuclide': Radionuclide})
+
+    # remove scandate if it exists
+    json_updater.remove('ScanDate')
+
+    # after updating raise warnings to user if values in json don't match values in dicom headers, only warn!
+    updated_values = json.load(open(path_to_json, 'r'))
+    for key, value in paired_fields.items():
+        try:
+            json_field = updated_values.get(key)
+            dicom_field = dicom_header.__getattr__(key)
+            if json_field != dicom_field:
+                logger.info(f"WARNING!!!! JSON Field {key} with value {json_field} does not match dicom value "
+                            f"of {dicom_field}")
+        except AttributeError:
+            pass
+
+
+def update_json_with_dicom_value_cli():
+    """
+    Command line interface for update_json_with_dicom_value, updates a PET json with values from a dicom header,
+    optionally can update with values from a spreadsheet or via values passed in as additional arguments with the -k
+    --additional_arguments flag. This command can be accessed after installation of pypet2bids via
+    `updatepetjsonfromdicom`.
+    """
+    dicom_update_parser = argparse.ArgumentParser()
+    dicom_update_parser.add_argument('-j', '--json', help='path to json to update', required=True)
+    dicom_update_parser.add_argument('-d', '--dicom', help='path to dicom to extract values from', required=True)
+    dicom_update_parser.add_argument('-m', '--metadata-path', help='path to a metadata spreadsheet', default=None)
+    dicom_update_parser.add_argument('-k', '--additional_arguments',
+                                     help='additional key value pairs to update json with', nargs='*',
+                                     action=helper_functions.ParseKwargs, default={})
+
+    args = dicom_update_parser.parse_args()
+
+    try:
+        # get missing values
+        missing_values = check_json(args.json, silent=True)
+    except FileNotFoundError:
+        with open(args.json, 'w') as outfile:
+            json.dump({}, outfile)
+        missing_values = check_json(args.json, silent=True)
+
+    # load dicom header
+    dicom_header = pydicom.dcmread(args.dicom, stop_before_pixels=True)
+
+    if args.metadata_path:
+        spreadsheet_metadata = get_metadata_from_spreadsheet(args.metadata_path, args.dicom, dicom_header,
+                                                             **args.additional_arguments)
+        # update json
+        update_json_with_dicom_value(args.json, missing_values, dicom_header, silent=True,
+                                     spreadsheet_metadata=spreadsheet_metadata['nifti_json'],
+                                     **args.additional_arguments)
+
+        JsonMAJ(args.json, update_values=spreadsheet_metadata['nifti_json']).update()
+    else:
+        update_json_with_dicom_value(args.json, missing_values, dicom_header, silent=True,
+                                     **args.additional_arguments)
+
+        JsonMAJ(args.json, update_values=args.additional_arguments).update()
+
+    # check json again after updating
+    check_json(args.json, mandatory=True, recommended=True, silent=False, logger_name='check_json')
+
+
+def get_radionuclide(pydicom_dicom):
+    """
+    Gets the radionuclide if given a pydicom_object if
+    pydicom_object.RadiopharmaceuticalInformationSequence[0].RadionuclideCodeSequence exists
+
+    :param pydicom_dicom: dicom object collected by pydicom.dcmread("dicom_file.img")
+    :return: Labeled Radionuclide e.g. 11Carbon, 18Fluorine
+    """
+    radionuclide = ""
+    try:
+        radiopharmaceutical_information_sequence = pydicom_dicom.RadiopharmaceuticalInformationSequence
+        radionuclide_code_sequence = radiopharmaceutical_information_sequence[0].RadionuclideCodeSequence
+        code_value = radionuclide_code_sequence[0].CodeValue
+        code_meaning = radionuclide_code_sequence[0].CodeMeaning
+        extraction_good = True
+    except AttributeError:
+        logger.info("Unable to extract RadionuclideCodeSequence from RadiopharmaceuticalInformationSequence")
+        extraction_good = False
+
+    if extraction_good:
+        # check to see if these radionuclides appear in our verified values
+        verified_nucleotides = metadata_dictionaries['dicom2bids.json']['RadionuclideCodes']
+
+        check_code_value = ""
+        check_code_meaning = ""
+
+        if code_value in verified_nucleotides.keys():
+            check_code_value = code_value
+        else:
+            logger.warning(f"Radionuclide Code {code_value} does not match any known codes in dcm2bids.json\n"
+                           f"will attempt to infer from code meaning {code_meaning}")
+
+        if code_meaning in verified_nucleotides.values():
+            radionuclide = re.sub(r'\^', "", code_meaning)
+            check_code_meaning = code_meaning
+        else:
+            logger.warning(f"Radionuclide Meaning {code_meaning} not in known values in dcm2bids json")
+            if code_value in verified_nucleotides.keys():
+                radionuclide = re.sub(r'\^', "", verified_nucleotides[code_value])
+
+        # final check
+        if check_code_meaning and check_code_value:
+            pass
+        else:
+            logger.warning(
+                f"WARNING!!!! 
Possible mismatch between nuclide code meaning {code_meaning} and {code_value} in dicom " + f"header") + + return radionuclide + + +def check_meta_radio_inputs(kwargs: dict, logger='pypet2bids') -> dict: + """ + Executes very specific PET logic, author does not recall everything it does. + :param kwargs: metadata key pair's to examine + :type kwargs: dict + :return: fitted/massaged metadata corresponding to logic steps below, return type is an update on input `kwargs` + :rtype: dict + """ + + logger = helper_functions.logger(logger) + + InjectedRadioactivity = kwargs.get('InjectedRadioactivity', None) + InjectedMass = kwargs.get("InjectedMass", None) + SpecificRadioactivity = kwargs.get("SpecificRadioactivity", None) + MolarActivity = kwargs.get("MolarActivity", None) + MolecularWeight = kwargs.get("MolecularWeight", None) + + data_out = {} + + if InjectedRadioactivity and InjectedMass: + data_out['InjectedRadioactivity'] = InjectedRadioactivity + data_out['InjectedRadioactivityUnits'] = 'MBq' + data_out['InjectedMass'] = InjectedMass + data_out['InjectedMassUnits'] = 'ug' + # check for strings where there shouldn't be strings + numeric_check = [helper_functions.is_numeric(str(InjectedRadioactivity)), + helper_functions.is_numeric(str(InjectedMass))] + if False in numeric_check: + data_out['InjectedMass'] = 'n/a' + data_out['InjectedMassUnits'] = 'n/a' + else: + tmp = (InjectedRadioactivity * 10 ** 6) / (InjectedMass * 10 ** 6) + if SpecificRadioactivity: + if SpecificRadioactivity != tmp: + logger.warning("Inferred SpecificRadioactivity in Bq/g doesn't match InjectedRadioactivity " + "and InjectedMass, could be a unit issue") + data_out['SpecificRadioactivity'] = SpecificRadioactivity + data_out['SpecificRadioactivityUnits'] = kwargs.get('SpecificRadioactivityUnityUnits', 'n/a') + else: + data_out['SpecificRadioactivity'] = tmp + data_out['SpecificRadioactivityUnits'] = 'Bq/g' + + if InjectedRadioactivity and SpecificRadioactivity: + data_out['InjectedRadioactivity'] = InjectedRadioactivity + data_out['InjectedRadioactivityUnits'] = 'MBq' + data_out['SpecificRadioactivity'] = SpecificRadioactivity + data_out['SpecificRadioactivityUnits'] = 'Bq/g' + numeric_check = [helper_functions.is_numeric(str(InjectedRadioactivity)), + helper_functions.is_numeric(str(SpecificRadioactivity))] + if False in numeric_check: + data_out['InjectedMass'] = 'n/a' + data_out['InjectedMassUnits'] = 'n/a' + else: + tmp = ((InjectedRadioactivity * (10 ** 6) / SpecificRadioactivity) * (10 ** 6)) + if InjectedMass: + if InjectedMass != tmp: + logger.warning("Inferred InjectedMass in ug doesn't match InjectedRadioactivity and " + "InjectedMass, could be a unit issue") + data_out['InjectedMass'] = InjectedMass + data_out['InjectedMassUnits'] = kwargs.get('InjectedMassUnits', 'n/a') + else: + data_out['InjectedMass'] = tmp + data_out['InjectedMassUnits'] = 'ug' + + if InjectedMass and SpecificRadioactivity: + data_out['InjectedMass'] = InjectedMass + data_out['InjectedMassUnits'] = 'ug' + data_out['SpecificRadioactivity'] = SpecificRadioactivity + data_out['SpecificRadioactivityUnits'] = 'Bq/g' + numeric_check = [helper_functions.is_numeric(str(SpecificRadioactivity)), + helper_functions.is_numeric(str(InjectedMass))] + if False in numeric_check: + data_out['InjectedRadioactivity'] = 'n/a' + data_out['InjectedRadioactivityUnits'] = 'n/a' + else: + tmp = ((InjectedMass / (10 ** 6)) * SpecificRadioactivity) / ( + 10 ** 6) # ((ug / 10 ^ 6) / Bq / g)/10 ^ 6 = MBq + if InjectedRadioactivity: + if InjectedRadioactivity != 
+
+
+def check_meta_radio_inputs(kwargs: dict, logger='pypet2bids') -> dict:
+    """
+    Cross-checks the radioactivity, mass, and molar activity metadata supplied in `kwargs`:
+    for each pair of related values (e.g. InjectedRadioactivity and InjectedMass) the third
+    quantity is derived, default units are filled in, and a warning is logged whenever a
+    user-supplied value disagrees with the inferred one.
+
+    :param kwargs: metadata key value pairs to examine
+    :type kwargs: dict
+    :return: fitted/massaged metadata corresponding to the logic steps below, returned as an update on the
+        input `kwargs`
+    :rtype: dict
+    """
+
+    logger = helper_functions.logger(logger)
+
+    InjectedRadioactivity = kwargs.get('InjectedRadioactivity', None)
+    InjectedMass = kwargs.get("InjectedMass", None)
+    SpecificRadioactivity = kwargs.get("SpecificRadioactivity", None)
+    MolarActivity = kwargs.get("MolarActivity", None)
+    MolecularWeight = kwargs.get("MolecularWeight", None)
+
+    data_out = {}
+
+    if InjectedRadioactivity and InjectedMass:
+        data_out['InjectedRadioactivity'] = InjectedRadioactivity
+        data_out['InjectedRadioactivityUnits'] = 'MBq'
+        data_out['InjectedMass'] = InjectedMass
+        data_out['InjectedMassUnits'] = 'ug'
+        # check for strings where there shouldn't be strings
+        numeric_check = [helper_functions.is_numeric(str(InjectedRadioactivity)),
+                         helper_functions.is_numeric(str(InjectedMass))]
+        if False in numeric_check:
+            data_out['InjectedMass'] = 'n/a'
+            data_out['InjectedMassUnits'] = 'n/a'
+        else:
+            tmp = (InjectedRadioactivity * 10 ** 6) / (InjectedMass * 10 ** 6)
+            if SpecificRadioactivity:
+                if SpecificRadioactivity != tmp:
+                    logger.warning("Inferred SpecificRadioactivity in Bq/g doesn't match InjectedRadioactivity "
+                                   "and InjectedMass, could be a unit issue")
+                data_out['SpecificRadioactivity'] = SpecificRadioactivity
+                data_out['SpecificRadioactivityUnits'] = kwargs.get('SpecificRadioactivityUnits', 'n/a')
+            else:
+                data_out['SpecificRadioactivity'] = tmp
+                data_out['SpecificRadioactivityUnits'] = 'Bq/g'
+
+    if InjectedRadioactivity and SpecificRadioactivity:
+        data_out['InjectedRadioactivity'] = InjectedRadioactivity
+        data_out['InjectedRadioactivityUnits'] = 'MBq'
+        data_out['SpecificRadioactivity'] = SpecificRadioactivity
+        data_out['SpecificRadioactivityUnits'] = 'Bq/g'
+        numeric_check = [helper_functions.is_numeric(str(InjectedRadioactivity)),
+                         helper_functions.is_numeric(str(SpecificRadioactivity))]
+        if False in numeric_check:
+            data_out['InjectedMass'] = 'n/a'
+            data_out['InjectedMassUnits'] = 'n/a'
+        else:
+            tmp = ((InjectedRadioactivity * (10 ** 6) / SpecificRadioactivity) * (10 ** 6))
+            if InjectedMass:
+                if InjectedMass != tmp:
+                    logger.warning("Inferred InjectedMass in ug doesn't match InjectedRadioactivity and "
+                                   "SpecificRadioactivity, could be a unit issue")
+                data_out['InjectedMass'] = InjectedMass
+                data_out['InjectedMassUnits'] = kwargs.get('InjectedMassUnits', 'n/a')
+            else:
+                data_out['InjectedMass'] = tmp
+                data_out['InjectedMassUnits'] = 'ug'
+
+    if InjectedMass and SpecificRadioactivity:
+        data_out['InjectedMass'] = InjectedMass
+        data_out['InjectedMassUnits'] = 'ug'
+        data_out['SpecificRadioactivity'] = SpecificRadioactivity
+        data_out['SpecificRadioactivityUnits'] = 'Bq/g'
+        numeric_check = [helper_functions.is_numeric(str(SpecificRadioactivity)),
+                         helper_functions.is_numeric(str(InjectedMass))]
+        if False in numeric_check:
+            data_out['InjectedRadioactivity'] = 'n/a'
+            data_out['InjectedRadioactivityUnits'] = 'n/a'
+        else:
+            # (ug / 10**6) * (Bq/g) / 10**6 = MBq
+            tmp = ((InjectedMass / (10 ** 6)) * SpecificRadioactivity) / (10 ** 6)
+            if InjectedRadioactivity:
+                if InjectedRadioactivity != tmp:
+                    logger.warning("Inferred InjectedRadioactivity in MBq doesn't match SpecificRadioactivity "
+                                   "and InjectedMass, could be a unit issue")
+                data_out['InjectedRadioactivity'] = InjectedRadioactivity
+                data_out['InjectedRadioactivityUnits'] = kwargs.get('InjectedRadioactivityUnits', 'n/a')
+            else:
+                data_out['InjectedRadioactivity'] = tmp
+                data_out['InjectedRadioactivityUnits'] = 'MBq'
+
+    if MolarActivity and MolecularWeight:
+        data_out['MolarActivity'] = MolarActivity
+        data_out['MolarActivityUnits'] = 'GBq/umol'
+        data_out['MolecularWeight'] = MolecularWeight
+        data_out['MolecularWeightUnits'] = 'g/mol'
+        numeric_check = [helper_functions.is_numeric(str(MolarActivity)),
+                         helper_functions.is_numeric(str(MolecularWeight))]
+        if False in numeric_check:
+            data_out['SpecificRadioactivity'] = 'n/a'
+            data_out['SpecificRadioactivityUnits'] = 'n/a'
+        else:
+            # derive specific radioactivity from molar activity and molecular weight
+            tmp = (MolarActivity * (10 ** 3)) / MolecularWeight
+            if SpecificRadioactivity:
+                if SpecificRadioactivity != tmp:
+                    logger.warning("Inferred SpecificRadioactivity in Bq/g doesn't match MolarActivity and "
+                                   "MolecularWeight, could be a unit issue")
+                data_out['SpecificRadioactivity'] = SpecificRadioactivity
+                data_out['SpecificRadioactivityUnits'] = kwargs.get('SpecificRadioactivityUnits', 'n/a')
+            else:
+                data_out['SpecificRadioactivity'] = tmp
+                data_out['SpecificRadioactivityUnits'] = 'Bq/g'
+
+    if MolarActivity and SpecificRadioactivity:
+        data_out['SpecificRadioactivity'] = SpecificRadioactivity
+        data_out['SpecificRadioactivityUnits'] = 'MBq/ug'
+        data_out['MolarActivity'] = MolarActivity
+        data_out['MolarActivityUnits'] = 'GBq/umol'
+        numeric_check = [helper_functions.is_numeric(str(SpecificRadioactivity)),
+                         helper_functions.is_numeric(str(MolarActivity))]
+        if False in numeric_check:
+            data_out['MolecularWeight'] = 'n/a'
+            data_out['MolecularWeightUnits'] = 'n/a'
+        else:
+            # derive molecular weight (g/mol) from molar activity and specific radioactivity
+            tmp = (MolarActivity * 1000) / SpecificRadioactivity
+            if MolecularWeight:
+                if MolecularWeight != tmp:
+                    logger.warning("Inferred MolecularWeight in g/mol doesn't match MolarActivity and "
+                                   "SpecificRadioactivity, could be a unit issue")
+                data_out['MolecularWeight'] = MolecularWeight
+                data_out['MolecularWeightUnits'] = kwargs.get('MolecularWeightUnits', 'n/a')
+            else:
+                data_out['MolecularWeight'] = tmp
+                data_out['MolecularWeightUnits'] = 'g/mol'
+
+    if MolecularWeight and SpecificRadioactivity:
+        data_out['SpecificRadioactivity'] = SpecificRadioactivity
+        data_out['SpecificRadioactivityUnits'] = 'MBq/ug'
+        data_out['MolecularWeight'] = MolecularWeight
+        data_out['MolecularWeightUnits'] = 'g/mol'
+        numeric_check = [helper_functions.is_numeric(str(SpecificRadioactivity)),
+                         helper_functions.is_numeric(str(MolecularWeight))]
+        if False in numeric_check:
+            data_out['MolarActivity'] = 'n/a'
+            data_out['MolarActivityUnits'] = 'n/a'
+        else:
+            # g/mol * (MBq/ug / 1000) = GBq/umol
+            tmp = MolecularWeight * (SpecificRadioactivity / 1000)
+            if MolarActivity:
+                if MolarActivity != tmp:
+                    logger.warning("Inferred MolarActivity in GBq/umol doesn't match SpecificRadioactivity and "
+                                   "MolecularWeight, could be a unit issue")
+                data_out['MolarActivity'] = MolarActivity
+                data_out['MolarActivityUnits'] = kwargs.get('MolarActivityUnits', 'n/a')
+            else:
+                data_out['MolarActivity'] = tmp
+                data_out['MolarActivityUnits'] = 'GBq/umol'
+
+    return data_out
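+
+# Worked example: given only InjectedRadioactivity and InjectedMass (values
+# illustrative), the first branch above derives SpecificRadioactivity as
+# (740 * 10**6) / (10 * 10**6) = 74.0:
+#
+#   check_meta_radio_inputs({'InjectedRadioactivity': 740, 'InjectedMass': 10})
+#   # -> {'InjectedRadioactivity': 740, 'InjectedRadioactivityUnits': 'MBq',
+#   #     'InjectedMass': 10, 'InjectedMassUnits': 'ug',
+#   #     'SpecificRadioactivity': 74.0, 'SpecificRadioactivityUnits': 'Bq/g'}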
+
+
+def get_metadata_from_spreadsheet(metadata_path: Union[str, Path], image_folder,
+                                  image_header_dict, **additional_arguments) -> dict:
+    """
+    Extracts metadata from a spreadsheet and returns a dictionary of metadata organized under
+    three main keys: nifti_json, blood_json, and blood_tsv
+
+    :param metadata_path: path to a spreadsheet
+    :type metadata_path: [str, pathlib.Path]
+    :param image_folder: path to image folder
+    :type image_folder: [str, pathlib.Path]
+    :param image_header_dict: dictionary of image header values
+    :type image_header_dict: dict
+    :param additional_arguments: additional arguments to pass on, typically user sourced key value pairs
+    :type additional_arguments: dict
+    :return: dictionary of metadata
+    :rtype: dict
+    """
+    spreadsheet_metadata = {'nifti_json': {}, 'blood_json': {}, 'blood_tsv': {}}
+    spreadsheet_values = {}
+    if Path(metadata_path).is_file():
+        spreadsheet_values = helper_functions.single_spreadsheet_reader(
+            path_to_spreadsheet=metadata_path,
+            dicom_metadata=image_header_dict,
+            **additional_arguments)
+
+    if Path(metadata_path).is_dir() or metadata_path == "":
+        # we accept folder input as well as no input; in the event of no input
+        # we search for spreadsheets in the image folder
+        if metadata_path == "":
+            metadata_path = image_folder
+
+        spreadsheets = helper_functions.collect_spreadsheets(metadata_path)
+        pet_spreadsheets = [spreadsheet for spreadsheet in spreadsheets if is_pet.pet_file(spreadsheet)]
+
+        for pet_spreadsheet in pet_spreadsheets:
+            spreadsheet_values.update(
+                helper_functions.single_spreadsheet_reader(
+                    path_to_spreadsheet=pet_spreadsheet,
+                    dicom_metadata=image_header_dict,
+                    **additional_arguments))
+
+    # check for any blood (tsv) data or otherwise in the given spreadsheet values
+    blood_tsv_columns = ['time', 'plasma_radioactivity', 'metabolite_parent_fraction',
+                         'whole_blood_radioactivity']
+    blood_json_columns = ['PlasmaAvail', 'WholeBloodAvail', 'MetaboliteAvail', 'MetaboliteMethod',
+                          'MetaboliteRecoveryCorrectionApplied', 'DispersionCorrected']
+
+    # check for existing tsv columns
+    for column in blood_tsv_columns:
+        try:
+            values = spreadsheet_values[column]
+            spreadsheet_metadata['blood_tsv'][column] = values
+            # pop found data from spreadsheet values after it's been found
+            spreadsheet_values.pop(column)
+        except KeyError:
+            pass
+
+    # check for existing blood json values
+    for column in blood_json_columns:
+        try:
+            values = spreadsheet_values[column]
+            spreadsheet_metadata['blood_json'][column] = values
+            # pop found data from spreadsheet values after it's been found
+            spreadsheet_values.pop(column)
+        except KeyError:
+            pass
+
+    if not spreadsheet_metadata.get('nifti_json', None):
+        spreadsheet_metadata['nifti_json'] = {}
+    spreadsheet_metadata['nifti_json'].update(spreadsheet_values)
+
+    return spreadsheet_metadata
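+
+# Usage sketch (the spreadsheet path and extra key/value pair are illustrative only):
+#
+#   metadata = get_metadata_from_spreadsheet('subject_metadata.xlsx', '/data/dicoms',
+#                                            dicom_header_values, TimeZero='10:39:46')
+#   metadata['nifti_json']  # sidecar fields destined for the pet json
+#   metadata['blood_json']  # e.g. 'PlasmaAvail', 'DispersionCorrected'
+#   metadata['blood_tsv']   # e.g. 'time', 'plasma_radioactivity' columns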
authors = ["anthony galassi <28850131+bendhouseart@users.noreply.github.com>"] license = "MIT" @@ -42,6 +42,7 @@ pet2bids-spreadsheet-template = 'pypet2bids.helper_functions:write_out_module' convert-pmod-to-blood = 'pypet2bids.convert_pmod_to_blood:main' ispet = 'pypet2bids.is_pet:main' updatepetjsonfromdicom = 'pypet2bids.dcm2niix4pet:update_json_with_dicom_value_cli' +updatepetjsonfromecat = 'pypet2bids.ecat_cli:update_json_with_ecat_value_cli' [tool.poetry.group.dev.dependencies] pyinstaller = "^5.4.1" diff --git a/pypet2bids/tests/test_dcm2niix4pet.py b/pypet2bids/tests/test_dcm2niix4pet.py index 62003f83..1f3ff16a 100644 --- a/pypet2bids/tests/test_dcm2niix4pet.py +++ b/pypet2bids/tests/test_dcm2niix4pet.py @@ -1,5 +1,5 @@ -from pypet2bids.dcm2niix4pet import Dcm2niix4PET, dicom_datetime_to_dcm2niix_time, check_json, collect_date_time_from_file_name, update_json_with_dicom_value -from pypet2bids.dcm2niix4pet import check_meta_radio_inputs +from pypet2bids.dcm2niix4pet import Dcm2niix4PET, dicom_datetime_to_dcm2niix_time, collect_date_time_from_file_name +from pypet2bids.update_json_pet_file import check_meta_radio_inputs, check_json, update_json_with_dicom_value import shutil import dotenv diff --git a/pypet2bids/tests/test_ecat.py b/pypet2bids/tests/test_ecat.py new file mode 100644 index 00000000..2a614bfa --- /dev/null +++ b/pypet2bids/tests/test_ecat.py @@ -0,0 +1,196 @@ +import os +import subprocess +import tempfile +import pathlib +import json +import pdb +from pypet2bids.ecat import Ecat + +TESTS_DIR = pathlib.Path(__file__).resolve().parent +PYPET2BIDS_DIR = TESTS_DIR.parent +PET2BIDS_DIR = PYPET2BIDS_DIR.parent + +# obtain ecat file path +ecat_file_path = PET2BIDS_DIR / 'ecat_validation' / 'ECAT7_multiframe.v.gz' +ecatpet2bids = PYPET2BIDS_DIR / 'pypet2bids' / 'ecat_cli.py' + +dataset_description_dictionary = { + "_Comment": "This is a very basic example of a dataset description json", + "Name": "PET Brain phantoms", + "BIDSVersion": "1.7.0", + "DatasetType": "raw", + "License": "CC0", + "Authors": [ + "Author1 Surname1", + "Author2 Surname2", + "Author3 Surname3", + "Author4 Middlename4 Surname4", + "Author5 Middlename5 Surname5" + ], + "HowToAcknowledge": "No worries this is fake.", + "ReferencesAndLinks": ["No you aren't getting any", "Don't bother to ask", "Fine, https://fake.fakelink.null"] +} + + +def test_kwargs_produce_valid_conversion(tmp_path): + # prepare a set of kwargs (stolen from a valid bids subject/dataset, mum's the word ;) ) + full_set_of_kwargs = { + "Modality": "PT", + "Manufacturer": "Siemens", + "ManufacturersModelName": "Biograph 64_mCT", + "InstitutionName": "NIH", + "InstitutionalDepartmentName": "NIMH MIB", + "InstitutionAddress": "10 Center Drive, Bethesda, MD 20892", + "DeviceSerialNumber": "60005", + "StationName": "MIAWP60005", + "PatientPosition": "FFS", + "SoftwareVersions": "VG60A", + "SeriesDescription": "PET Brain Dyn TOF", + "ProtocolName": "PET Brain Dyn TOF", + "ImageType": [ + "ORIGINAL", + "PRIMARY" + ], + "SeriesNumber": 6, + "ScanStart": 2, + "TimeZero": "10:39:46", + "InjectionStart": 0, + "AcquisitionNumber": 2001, + "ImageComments": "Frame 1 of 33^AC_CT_Brain", + "Radiopharmaceutical": "ps13", + "RadionuclidePositronFraction": 0.997669, + "RadionuclideTotalDose": 714840000.0, + "RadionuclideHalfLife": 1220.04, + "DoseCalibrationFactor": 30806700.0, + "ConvolutionKernel": "XYZ Gauss2.00", + "Units": "Bq/mL", + "ReconstructionName": "PSF+TOF", + "ReconstructionParameterUnits": [ + "None", + "None" + ], + 
"ReconstructionParameterLabels": [ + "subsets", + "iterations" + ], + "ReconstructionParameterValues": [ + 21, + 3 + ], + "ReconFilterType": "XYZ Gauss", + "ReconFilterSize": 2.0, + "AttenuationCorrection": "measured,AC_CT_Brain", + "DecayFactor": [ + 1.00971 + ], + "FrameTimesStart": [ + 0 + ], + "FrameDuration": [ + 30 + ], + "SliceThickness": 2, + "ImageOrientationPatientDICOM": [ + 1, + 0, + 0, + 0, + 1, + 0 + ], + "ConversionSoftware": [ + "dcm2niix", + "pypet2bids" + ], + "ConversionSoftwareVersion": [ + "v1.0.20211006", + "0.0.8" + ], + "TracerName": "[11C]PS13", + "TracerRadionuclide": "11C", + "InjectedRadioactivity": 714840000.0, + "InjectedRadioactivityUnits": "Bq", + "InjectedMass": 5.331647109063877, + "InjectedMassUnits": "nmol", + "SpecificRadioactivity": 341066000000000, + "SpecificRadioactivityUnits": "Bq/mol", + "ModeOfAdministration": "bolus", + "AcquisitionMode": "dynamic", + "ImageDecayCorrected": True, + "ImageDecayCorrectionTime": 0, + "ReconMethodName": "Point-Spread Function + Time Of Flight", + "ReconMethodParameterLabels": [ + "subsets", + "iterations" + ], + "ReconMethodParameterUnits": [ + "none", "none" + ], + "ReconMethodParameterValues": [ + 21, + 3 + ], + "Haematocrit": 0.308 + } + + # test ecat converter + + # create ecat dir + ecat_bids_dir = tmp_path / "ecat_test/sub-ecat/ses-test/pet" + ecat_bids_dir.mkdir(parents=True, exist_ok=True) + + # we're going to want a dataset description json at a minimum + dataset_description_path = ecat_bids_dir.parent.parent.parent / "dataset_description.json" + with open(dataset_description_path, 'w') as outfile: + json.dump(dataset_description_dictionary, outfile, indent=4) + + ecat_bids_nifti_path = ecat_bids_dir / "sub-ecat_ses-test_pet.nii" + + # run ecat converter + convert_ecat = Ecat(ecat_file=str(ecat_file_path), + nifti_file=str(ecat_bids_nifti_path), + kwargs=full_set_of_kwargs, + collect_pixel_data=True) + + convert_ecat.convert() + + # run validator + cmd = f"bids-validator {ecat_bids_dir.parent.parent.parent} --ignoreWarnings" + validate_ecat = subprocess.run(cmd, shell=True, capture_output=True) + + assert validate_ecat.returncode == 0, cmd + + +def test_spreadsheets_produce_valid_conversion_ecatpet2bids(tmp_path): + # collect spreadsheets + single_subject_spreadsheet = ( + PET2BIDS_DIR / 'spreadsheet_conversion/single_subject_sheet/subject_metadata_example.xlsx') + + ecatpet2bids_test_dir = tmp_path / 'ecatpet2bids_spreadsheet_input' + ecatpet2bids_test_dir.mkdir(parents=True, exist_ok=True) + subject_folder = ecatpet2bids_test_dir / 'sub-singlesubjectspreadsheetecat' / 'ses-test' / 'pet' + + cmd = (f"python {ecatpet2bids} {ecat_file_path} " + f"--nifti {subject_folder}/sub-singlesubjectspreadsheetecat_ses-test_pet.nii.gz " + f"--metadata-path {single_subject_spreadsheet} " + f"--convert") + + spreadsheet_ecat = Ecat(ecat_file=str(ecat_file_path), + nifti_file=str(subject_folder) + '/sub-singlesubjectspreadsheetecat_ses-test_pet.nii.gz', + metadata_path=single_subject_spreadsheet, + collect_pixel_data=True) + + spreadsheet_ecat.convert() + + # copy over dataset_description + dataset_description_path = ecatpet2bids_test_dir / 'dataset_description.json' + with open(dataset_description_path, 'w') as outfile: + json.dump(dataset_description_dictionary, outfile, indent=4) + + validator_cmd = f"bids-validator {ecatpet2bids_test_dir} --ingnoreWarnings" + validate_ecat_w_spreadsheet = subprocess.run(validator_cmd, shell=True, capture_output=True) + + assert validate_ecat_w_spreadsheet.returncode == 0 + + + diff 
diff --git a/pypet2bids/tests/test_library_command_line_interfaces.py b/pypet2bids/tests/test_library_command_line_interfaces.py
index 546aefd0..4ce99ddd 100644
--- a/pypet2bids/tests/test_library_command_line_interfaces.py
+++ b/pypet2bids/tests/test_library_command_line_interfaces.py
@@ -15,7 +15,7 @@
 if dicom_source_folder:
     dicom_source_folder = pathlib.Path(dicom_source_folder)
 if not dicom_source_folder:
-    dicom_source_folder = PET2BIDS_DIR / 'OpenNeuroPET-Phantoms' / 'sourcedata' / 'SiemensBiographPETMR-NRU'
+    dicom_source_folder = PET2BIDS_DIR / 'OpenNeuroPET-Phantoms' / 'sourcedata' / 'SiemensBiographPETMR-NIMH' / 'AC_TOF'
 if not dicom_source_folder.exists():
     raise FileNotFoundError(dicom_source_folder)
@@ -222,16 +222,13 @@ def test_kwargs_produce_valid_conversion(tmp_path):
     assert validate_dicom.returncode == 0, validate_dicom.stdout
 
 
-def test_spreadsheets_produce_valid_conversion(tmp_path):
-
+def test_spreadsheets_produce_valid_conversion_dcm2niix4pet(tmp_path):
     # collect spreadsheets
-    #single_subject_spreadsheet = PET2BIDS_DIR / 'spreadsheet_conversion/single_subject_sheet/subject_metadata_example.xlsx'
-
-    single_subject_spreadsheet = '/Users/galassiae/Projects/PET2BIDS/spreadsheet_conversion/single_subject_sheet/subject_metadata_example.xlsx'
+    single_subject_spreadsheet = PET2BIDS_DIR / 'spreadsheet_conversion/single_subject_sheet/subject_metadata_example.xlsx'
     dcm2niix4pet_test_dir = tmp_path / 'dcm2niix_spreadsheet_input'
     dcm2niix4pet_test_dir.mkdir(parents=True, exist_ok=True)
-    subject_folder = dcm2niix4pet_test_dir / 'sub-singlesubjectspreadsheet' / 'ses-test' / 'pet'
+    subject_folder = dcm2niix4pet_test_dir / 'sub-singlesubjectspreadsheetdicom' / 'ses-test' / 'pet'
 
     cmd = f"python {dcm2niix4pet} {dicom_source_folder} --destination-path {subject_folder} --metadata-path {single_subject_spreadsheet}"
     run_dcm2niix4pet = subprocess.run(cmd, shell=True, capture_output=True)
@@ -247,6 +244,35 @@
     assert validate_dicom_w_spreadsheet.returncode == 0
 
 
+def test_spreadsheets_produce_valid_conversion_ecatpet2bids(tmp_path):
+    # collect spreadsheets
+    single_subject_spreadsheet = (
+        PET2BIDS_DIR / 'spreadsheet_conversion/single_subject_sheet/subject_metadata_example.xlsx')
+
+    ecatpet2bids_test_dir = tmp_path / 'ecatpet2bids_spreadsheet_input'
+    ecatpet2bids_test_dir.mkdir(parents=True, exist_ok=True)
+    subject_folder = ecatpet2bids_test_dir / 'sub-singlesubjectspreadsheetecat' / 'ses-test' / 'pet'
+
+    cmd = (f"python {ecatpet2bids} {ecat_file_path} "
+           f"--nifti {subject_folder}/sub-singlesubjectspreadsheetecat_ses-test_pet.nii.gz "
+           f"--metadata-path {single_subject_spreadsheet} "
+           f"--convert")
+
+    run_ecatpet2bids = subprocess.run(cmd, shell=True, capture_output=True)
+
+    assert run_ecatpet2bids.returncode == 0
+
+    # copy over dataset_description
+    dataset_description_path = ecatpet2bids_test_dir / 'dataset_description.json'
+    with open(dataset_description_path, 'w') as outfile:
+        json.dump(dataset_description_dictionary, outfile, indent=4)
+
+    validator_cmd = f"bids-validator {ecatpet2bids_test_dir} --ignoreWarnings"
+    validate_ecat_w_spreadsheet = subprocess.run(validator_cmd, shell=True, capture_output=True)
+
+    assert validate_ecat_w_spreadsheet.returncode == 0
+
+
 def test_scanner_params_produce_valid_conversion(tmp_path):
 
     # test scanner params txt with ecat conversion
@@ -260,8 +286,7 @@ def test_scanner_params_produce_valid_conversion(tmp_path):
     with open(dataset_descripion_path, 'w') as outfile:
         json.dump(dataset_description_dictionary, outfile, indent=4)
 
-    #scanner_params = PET2BIDS_DIR / 'tests' / 'scannerparams.txt'
-    scanner_params = "/Users/galassiae/Projects/PET2BIDS/pypet2bids/tests/scannerparams.txt"
+    scanner_params = PET2BIDS_DIR / 'pypet2bids' / 'tests' / 'scannerparams.txt'
 
     not_full_set_of_kwargs = {
         "SeriesDescription": "PET Brain Dyn TOF",
         "ProtocolName": "PET Brain Dyn TOF",