diff --git a/mriqc/cli/parser.py b/mriqc/cli/parser.py
index 6d24b8fa..d4181736 100644
--- a/mriqc/cli/parser.py
+++ b/mriqc/cli/parser.py
@@ -490,6 +490,7 @@ def parse_args(args=None, namespace=None):
     from mriqc import __version__
     from mriqc._warnings import DATE_FMT, LOGGER_FMT, _LogFormatter
     from mriqc.messages import PARTICIPANT_START
+    from mriqc.utils.misc import initialize_meta_and_data
 
     parser = _build_parser()
     opts = parser.parse_args(args, namespace)
@@ -642,11 +643,7 @@ def parse_args(args=None, namespace=None):
             f'MRIQC is unable to process the following modalities: {", ".join(unknown_mods)}.'
         )
 
-    # Estimate the biggest file size / leave 1GB if some file does not exist (datalad)
-    with suppress(FileNotFoundError):
-        config.workflow.biggest_file_gb = _get_biggest_file_size_gb(
-            config.workflow.inputs.values()
-        )
+    initialize_meta_and_data()
 
     # set specifics for alternative populations
     if opts.species.lower() != 'human':
@@ -660,17 +657,3 @@ def parse_args(args=None, namespace=None):
         config.workflow.fd_radius = 7.5
         # block uploads for the moment; can be reversed before wider release
         config.execution.no_sub = True
-
-
-def _get_biggest_file_size_gb(files):
-    """Identify the largest file size (allows multi-echo groups)."""
-
-    import os
-
-    sizes = []
-    for file in files:
-        if isinstance(file, (list, tuple)):
-            sizes.append(_get_biggest_file_size_gb(file))
-        else:
-            sizes.append(os.path.getsize(file))
-    return max(sizes) / (1024**3)
diff --git a/mriqc/utils/misc.py b/mriqc/utils/misc.py
index 269b9b67..5072c5d7 100644
--- a/mriqc/utils/misc.py
+++ b/mriqc/utils/misc.py
@@ -22,11 +22,18 @@
 #
 """Helper functions."""
 
+from __future__ import annotations
+
+import asyncio
+from functools import partial
 import json
+from os import cpu_count
 from collections import OrderedDict
 from collections.abc import Iterable
 from pathlib import Path
+from typing import Callable, TypeVar
 
+import numpy as np
 import pandas as pd
 
 try:
@@ -34,6 +41,8 @@
 except ImportError:
     from collections.abc import MutableMapping
 
+R = TypeVar("R")
+
 IMTYPES = {
     'T1w': 'anat',
     'T2w': 'anat',
@@ -58,6 +67,11 @@
 (_rec-(?P<rec_id>[a-zA-Z0-9]+))?(_run-(?P<run_id>[a-zA-Z0-9]+))?\
 """
 
+async def worker(job: Callable[[], R], semaphore) -> R:
+    async with semaphore:
+        loop = asyncio.get_running_loop()
+        return await loop.run_in_executor(None, job)
+
 
 def reorder_csv(csv_file, out_file=None):
     """
@@ -249,7 +263,10 @@ def _flatten_list(xs):
 def _datalad_get(input_list, nprocs=None):
     from mriqc import config
 
-    if not config.execution.bids_dir_datalad:
+    if (
+        not config.execution.bids_dir_datalad
+        or not config.execution.datalad_get
+    ):
         return
 
     # Delay datalad import until we're sure we'll need it
@@ -273,3 +290,122 @@
             config.nipype.nprocs,
         ),
     )
+
+
+def _file_meta_and_size(
+    files: list | str,
+):
+    """Retrieve a file's metadata and size in GB (allows multi-echo groups)."""
+
+    import os
+
+    import nibabel as nb
+
+    from mriqc import config
+
+    multifile = isinstance(files, (list, tuple))
+    if multifile:
+        metadata = []
+        entities = []
+        _size_list = []
+
+        for filename in files:
+            metadata_i, entities_i, sizes_i = _file_meta_and_size(filename)
+            metadata_i["FileSize"] = sizes_i
+            metadata_i["FileSizeUnits"] = "GB"
+            # Add to output
+            metadata.append(metadata_i)
+            entities.append(entities_i)
+            _size_list.append(sizes_i)
+
+        return metadata, entities, np.sum(_size_list)
+
+    metadata = config.execution.layout.get_metadata(files)
+    entities = config.execution.layout.parse_file_entities(files)
+    size = os.path.getsize(files) / (1024**3)
+
+    metadata["FileSize"] = size
+    metadata["FileSizeUnits"] = "GB"
+
+    try:
+        nii = nb.load(files)
+        nifti_len = nii.shape[3]
+    except nb.filebasedimages.ImageFileError:
+        nifti_len = None
+    except IndexError:  # shape has only 3 elements
+        nifti_len = 1 if nii.dataobj.ndim == 3 else -1
+
+    metadata["NumberOfVolumes"] = nifti_len
+
+    return metadata, entities, size
+
+
+async def _extract_meta_and_size(
+    filelist: list,
+    max_concurrent: int = min(cpu_count(), 12),
+):
+    """Extract corresponding metadata and file size in GB."""
+
+    semaphore = asyncio.Semaphore(max_concurrent)
+    tasks = []
+    for filename in filelist:
+        tasks.append(
+            asyncio.create_task(
+                worker(
+                    partial(
+                        _file_meta_and_size,
+                        filename,
+                    ),
+                    semaphore,
+                )
+            )
+        )
+
+    # Gather guarantees the order of the output
+    metadata, entities, sizes = list(zip(*await asyncio.gather(*tasks)))
+    return metadata, entities, sizes
+
+
+def initialize_meta_and_data(
+    max_concurrent: int = min(cpu_count(), 12),
+):
+    """
+    Mine data and metadata corresponding to the dataset.
+
+    Get files if datalad enabled and extract the necessary metadata.
+
+    """
+    from mriqc import config
+
+    # Datalad-get all files
+    dataset = config.workflow.inputs.values()
+    _datalad_get(dataset)
+
+    # Extract metadata and file sizes
+    config.workflow.input_metadata = {}
+    config.workflow.input_entities = {}
+    config.workflow.biggest_file_gb = {}
+    for mod, input_list in config.workflow.inputs.items():
+        config.loggers.cli.log(
+            25,
+            f"Extracting metadata and entities for {len(input_list)} input runs "
+            f"of modality '{mod}'...",
+        )
+
+        metadata, entities, size = asyncio.run(
+            _extract_meta_and_size(
+                input_list,
+                max_concurrent=max_concurrent,
+            )
+        )
+
+        _max_size = np.max(size)
+        config.workflow.input_metadata[mod] = metadata
+        config.workflow.input_entities[mod] = entities
+        config.workflow.biggest_file_gb[mod] = _max_size
+
+        config.loggers.cli.log(
+            25,
+            f"File size ('{mod}'): {_max_size:.2f}|{np.mean(size):.2f} "
+            "GB [maximum|average].",
+        )
diff --git a/mriqc/workflows/functional/base.py b/mriqc/workflows/functional/base.py
index ec26cda9..05747597 100644
--- a/mriqc/workflows/functional/base.py
+++ b/mriqc/workflows/functional/base.py
@@ -82,11 +82,6 @@ def fmri_qc_workflow(name='funcMRIQC'):
 
     dataset = config.workflow.inputs.get('bold', [])
 
-    if config.execution.datalad_get:
-        from mriqc.utils.misc import _datalad_get
-
-        _datalad_get(dataset)
-
     full_files = []
     for bold_path in dataset:
        try: