Commit

wip: run file-wise operations only once

oesteban committed Aug 12, 2024
1 parent a6a2ba8 commit 82597a5
Showing 3 changed files with 137 additions and 25 deletions.
21 changes: 2 additions & 19 deletions mriqc/cli/parser.py
@@ -490,6 +490,7 @@ def parse_args(args=None, namespace=None):
from mriqc import __version__
from mriqc._warnings import DATE_FMT, LOGGER_FMT, _LogFormatter
from mriqc.messages import PARTICIPANT_START
from mriqc.utils.misc import initialize_meta_and_data

parser = _build_parser()
opts = parser.parse_args(args, namespace)
@@ -642,11 +643,7 @@ def parse_args(args=None, namespace=None):
f'MRIQC is unable to process the following modalities: {", ".join(unknown_mods)}.'
)

# Estimate the biggest file size / leave 1GB if some file does not exist (datalad)
with suppress(FileNotFoundError):
config.workflow.biggest_file_gb = _get_biggest_file_size_gb(
config.workflow.inputs.values()
)
initialize_meta_and_data()

# set specifics for alternative populations
if opts.species.lower() != 'human':
@@ -660,17 +657,3 @@ def parse_args(args=None, namespace=None):
config.workflow.fd_radius = 7.5
# block uploads for the moment; can be reversed before wider release
config.execution.no_sub = True


def _get_biggest_file_size_gb(files):
"""Identify the largest file size (allows multi-echo groups)."""

import os

sizes = []
for file in files:
if isinstance(file, (list, tuple)):
sizes.append(_get_biggest_file_size_gb(file))
else:
sizes.append(os.path.getsize(file))
return max(sizes) / (1024**3)
136 changes: 135 additions & 1 deletion mriqc/utils/misc.py
@@ -22,18 +22,27 @@
#
"""Helper functions."""

from __future__ import annotations

import asyncio
from functools import partial
import json
from os import cpu_count
from collections import OrderedDict
from collections.abc import Iterable
from pathlib import Path
from typing import Callable, TypeVar

import numpy as np
import pandas as pd

try:
    from collections.abc import MutableMapping
except ImportError:  # fallback for the pre-3.3 stdlib location
    from collections import MutableMapping

R = TypeVar("R")

IMTYPES = {
'T1w': 'anat',
'T2w': 'anat',
@@ -58,6 +67,11 @@
(_rec-(?P<rec_id>[a-zA-Z0-9]+))?(_run-(?P<run_id>[a-zA-Z0-9]+))?\
"""

async def worker(job: Callable[[], R], semaphore: asyncio.Semaphore) -> R:
    """Run a blocking job in the default thread-pool executor, gated by a semaphore."""
    async with semaphore:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, job)
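
For context, a minimal sketch of how this worker is meant to be driven (hypothetical usage, not part of this commit; the size-stat job and the limit of 4 are illustrative):

import asyncio
import os
from functools import partial

async def _demo(paths):
    # Cap concurrent blocking calls at 4; each runs in the default thread pool.
    semaphore = asyncio.Semaphore(4)
    tasks = [
        asyncio.create_task(worker(partial(os.path.getsize, path), semaphore))
        for path in paths
    ]
    # gather() returns results in task-creation order
    return await asyncio.gather(*tasks)

# e.g.: sizes = asyncio.run(_demo(['/data/sub-01_bold.nii.gz', '/data/sub-02_bold.nii.gz']))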


def reorder_csv(csv_file, out_file=None):
"""
@@ -249,7 +263,10 @@ def _flatten_list(xs):
def _datalad_get(input_list, nprocs=None):
from mriqc import config

if not config.execution.bids_dir_datalad:
if (
not config.execution.bids_dir_datalad
or not config.execution.datalad_get
):
return

# Delay datalad import until we're sure we'll need it
@@ -273,3 +290,120 @@ def _datalad_get(input_list, nprocs=None):
config.nipype.nprocs,
),
)


def _file_meta_and_size(
    files: list | str,
):
    """Extract metadata, BIDS entities, and size in GB for one file or a multi-echo group."""

    import os

    import nibabel as nb

    from mriqc import config

multifile = isinstance(files, (list, tuple))
if multifile:
metadata = []
entities = []
_size_list = []

for filename in files:
metadata_i, entities_i, sizes_i = _file_meta_and_size(filename)
metadata_i["FileSize"] = sizes_i
metadata_i["FileSizeUnits"] = "GB"
# Add to output
metadata.append(metadata_i)
entities.append(entities_i)
_size_list.append(sizes_i)

return metadata, entities, np.sum(_size_list)

metadata = config.execution.layout.get_metadata(files)
entities = config.execution.layout.parse_file_entities(files)
size = os.path.getsize(files) / (1024**3)

metadata["FileSize"] = size
metadata["FileSizeUnits"] = "GB"

try:
nii = nb.load(files)
nifti_len = nii.shape[3]
except nb.filebasedimages.ImageFileError:
nifti_len = None
except IndexError: # shape has only 3 elements
nifti_len = 1 if nii.dataobj.ndim == 3 else -1

metadata["NumberOfVolumes"] = nifti_len

return metadata, entities, size
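
To illustrate the NumberOfVolumes logic above with a standalone sketch (not MRIQC code; synthetic images only): the length of a 4D NIfTI's fourth axis is the volume count, while a 3D image takes the IndexError branch and counts as a single volume.

import nibabel as nb
import numpy as np

bold = nb.Nifti1Image(np.zeros((4, 4, 4, 100), dtype='float32'), np.eye(4))
t1w = nb.Nifti1Image(np.zeros((4, 4, 4), dtype='float32'), np.eye(4))

print(bold.shape[3])     # 100 -> NumberOfVolumes = 100
print(t1w.dataobj.ndim)  # 3 -> shape[3] raises IndexError; NumberOfVolumes = 1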


async def _extract_meta_and_size(
filelist: list,
    max_concurrent: int = min(cpu_count() or 1, 12),
):
"""Extract corresponding metadata and file size in GB."""

semaphore = asyncio.Semaphore(max_concurrent)
tasks = []
for filename in filelist:
tasks.append(
asyncio.create_task(
worker(
partial(
_file_meta_and_size,
filename
),
semaphore,
)
)
)

    # asyncio.gather preserves task order, so outputs align with ``filelist``
    metadata, entities, sizes = zip(*await asyncio.gather(*tasks))
return metadata, entities, sizes


def initialize_meta_and_data(
    max_concurrent: int = min(cpu_count() or 1, 12),
):
"""
Mine data and metadata corresponding to the dataset.
Get files if datalad enabled and extract the necessary metadata.
"""
from mriqc import config

# Datalad-get all files
dataset = config.workflow.inputs.values()
_datalad_get(dataset)

# Extract metadata and filesize
config.workflow.input_metadata = {}
config.workflow.input_entities = {}
config.workflow.biggest_file_gb = {}
for mod, input_list in config.workflow.inputs.items():
config.loggers.cli.log(
25,
f"Extracting metadata and entities for {len(input_list)} input runs "
f"of modality '{mod}'...",
)

        metadata, entities, sizes = asyncio.run(
_extract_meta_and_size(
input_list,
max_concurrent=max_concurrent,
)
)

        _max_size = np.max(sizes)
config.workflow.input_metadata[mod] = metadata
config.workflow.input_entities[mod] = entities
config.workflow.biggest_file_gb[mod] = _max_size

config.loggers.cli.log(
25,
f"File size ('{mod}'): {_max_size:.2f}|{np.mean(size):.2f} "
"GB [maximum|average].",
)
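
Once initialization has run, downstream code can read the cached values instead of re-opening files. A hypothetical consumer (the attribute names come from the function above; the access pattern itself is illustrative):

from mriqc import config

bold_meta = config.workflow.input_metadata['bold']  # one metadata dict per BOLD run
mem_gb = config.workflow.biggest_file_gb['bold']    # size of the largest run, in GB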
5 changes: 0 additions & 5 deletions mriqc/workflows/functional/base.py
@@ -82,11 +82,6 @@ def fmri_qc_workflow(name='funcMRIQC'):

dataset = config.workflow.inputs.get('bold', [])

if config.execution.datalad_get:
from mriqc.utils.misc import _datalad_get

_datalad_get(dataset)

full_files = []
for bold_path in dataset:
try:
