From 964ab4a172e17eb0344e3d894b638cf287e0222b Mon Sep 17 00:00:00 2001 From: EmmaRenauld Date: Thu, 15 Feb 2024 13:21:53 -0500 Subject: [PATCH 1/2] Add test files with empty tests --- scilpy/dwi/tests/__init__.py | 0 scilpy/dwi/tests/test_operations.py | 9 +++++++++ scilpy/dwi/tests/test_utils.py | 9 +++++++++ 3 files changed, 18 insertions(+) create mode 100644 scilpy/dwi/tests/__init__.py create mode 100644 scilpy/dwi/tests/test_operations.py create mode 100644 scilpy/dwi/tests/test_utils.py diff --git a/scilpy/dwi/tests/__init__.py b/scilpy/dwi/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/scilpy/dwi/tests/test_operations.py b/scilpy/dwi/tests/test_operations.py new file mode 100644 index 000000000..91d7ccb02 --- /dev/null +++ b/scilpy/dwi/tests/test_operations.py @@ -0,0 +1,9 @@ +# -*- coding: utf-8 -*- + + +def test_apply_bias_field(): + pass + + +def test_compute_dwi_attenuation(): + pass diff --git a/scilpy/dwi/tests/test_utils.py b/scilpy/dwi/tests/test_utils.py new file mode 100644 index 000000000..f6b4d5d51 --- /dev/null +++ b/scilpy/dwi/tests/test_utils.py @@ -0,0 +1,9 @@ +# -*- coding: utf-8 -*- + + +def test_extract_dwi_shell(): + pass + + +def test_extract_b0(): + pass From 46ed33cc361023fd4a0f967c886af97a50aca326 Mon Sep 17 00:00:00 2001 From: EmmaRenauld Date: Thu, 15 Feb 2024 13:48:23 -0500 Subject: [PATCH 2/2] Finish cleaning the main of dwi_ scripts, for eventual unit tests --- scilpy/dwi/operations.py | 87 ++++++++++++++++++++++ scilpy/dwi/tests/test_operations.py | 4 + scilpy/io/varian_fdf.py | 4 +- scripts/scil_dwi_detect_volume_outliers.py | 75 ++----------------- 4 files changed, 100 insertions(+), 70 deletions(-) diff --git a/scilpy/dwi/operations.py b/scilpy/dwi/operations.py index 7d6546075..29249e062 100644 --- a/scilpy/dwi/operations.py +++ b/scilpy/dwi/operations.py @@ -1,6 +1,12 @@ import logging +import math +import pprint + import numpy as np +from scilpy.gradients.bvec_bval_tools import identify_shells, \ + round_bvals_to_shell, DEFAULT_B0_THRESHOLD + def apply_bias_field(dwi_data, bias_field_data, mask_data): """ @@ -131,3 +137,84 @@ def compute_dwi_attenuation(dwi_weights: np.ndarray, b0: np.ndarray): dwi_attenuation[np.logical_not(np.isfinite(dwi_attenuation))] = 0. return dwi_attenuation + + +def detect_volume_outliers(data, bvecs, bvals, std_scale, verbose, + b0_thr=DEFAULT_B0_THRESHOLD): + """ + Parameters + ---------- + data: np.ndarray + The dwi data. + bvecs: np.ndarray + The bvecs + bvals: np.array + The b-values vector. + std_scale: float + How many deviation from the mean are required to be considered an + outlier. + verbose: bool + If True, print even more stuff. + b0_thr: float + Value below which b-values are considered as b0. + """ + results_dict = {} + shells_to_extract = identify_shells(bvals, b0_thr, sort=True)[0] + bvals = round_bvals_to_shell(bvals, shells_to_extract) + for bval in shells_to_extract[shells_to_extract > b0_thr]: + shell_idx = np.where(bvals == bval)[0] + shell = bvecs[shell_idx] + results_dict[bval] = np.ones((len(shell), 3)) * -1 + for i, vec in enumerate(shell): + if np.linalg.norm(vec) < 0.001: + continue + + dot_product = np.clip(np.tensordot(shell, vec, axes=1), -1, 1) + angle = np.arccos(dot_product) * 180 / math.pi + angle[np.isnan(angle)] = 0 + idx = np.argpartition(angle, 4).tolist() + idx.remove(i) + + avg_angle = np.average(angle[idx[:3]]) + corr = np.corrcoef([data[..., shell_idx[i]].ravel(), + data[..., shell_idx[idx[0]]].ravel(), + data[..., shell_idx[idx[1]]].ravel(), + data[..., shell_idx[idx[2]]].ravel()]) + results_dict[bval][i] = [shell_idx[i], avg_angle, + np.average(corr[0, 1:])] + + for key in results_dict.keys(): + avg_angle = np.round(np.average(results_dict[key][:, 1]), 4) + std_angle = np.round(np.std(results_dict[key][:, 1]), 4) + + avg_corr = np.round(np.average(results_dict[key][:, 2]), 4) + std_corr = np.round(np.std(results_dict[key][:, 2]), 4) + + outliers_angle = np.argwhere( + results_dict[key][:, 1] < avg_angle - (std_scale * std_angle)) + outliers_corr = np.argwhere( + results_dict[key][:, 2] < avg_corr - (std_scale * std_corr)) + + print('Results for shell {} with {} directions:' + .format(key, len(results_dict[key]))) + print('AVG and STD of angles: {} +/- {}' + .format(avg_angle, std_angle)) + print('AVG and STD of correlations: {} +/- {}' + .format(avg_corr, std_corr)) + + if len(outliers_angle) or len(outliers_corr): + print('Possible outliers ({} STD below or above average):' + .format(std_scale)) + print('Outliers based on angle [position (4D), value]') + for i in outliers_angle: + print(results_dict[key][i, :][0][0:2]) + print('Outliers based on correlation [position (4D), value]') + for i in outliers_corr: + print(results_dict[key][i, :][0][0::2]) + else: + print('No outliers detected.') + + if verbose: + print('Shell with b-value {}'.format(key)) + pprint.pprint(results_dict[key]) + print() diff --git a/scilpy/dwi/tests/test_operations.py b/scilpy/dwi/tests/test_operations.py index 91d7ccb02..7ea12e015 100644 --- a/scilpy/dwi/tests/test_operations.py +++ b/scilpy/dwi/tests/test_operations.py @@ -7,3 +7,7 @@ def test_apply_bias_field(): def test_compute_dwi_attenuation(): pass + + +def test_detect_volume_outliers(): + pass diff --git a/scilpy/io/varian_fdf.py b/scilpy/io/varian_fdf.py index 6deb6f546..7736d42a7 100644 --- a/scilpy/io/varian_fdf.py +++ b/scilpy/io/varian_fdf.py @@ -424,7 +424,9 @@ def correct_procpar_intensity(dwi_data, dwi_path, b0_path): P1 = 10**(Ldb/20) """ - + # Not really an io function: does not load and save! But it is the only + # method we have concerning varian_fdf. Kept here, as decided by guru + # Arnaud. dwi_gain = get_gain(dwi_path) b0_gain = get_gain(b0_path) diff --git a/scripts/scil_dwi_detect_volume_outliers.py b/scripts/scil_dwi_detect_volume_outliers.py index f32bbefce..c5af9833c 100755 --- a/scripts/scil_dwi_detect_volume_outliers.py +++ b/scripts/scil_dwi_detect_volume_outliers.py @@ -15,20 +15,16 @@ """ import argparse -import pprint from dipy.io.gradients import read_bvals_bvecs import nibabel as nib -import numpy as np +from scilpy.dwi.operations import detect_volume_outliers from scilpy.io.utils import (assert_inputs_exist, add_force_b0_arg, add_verbose_arg) from scilpy.gradients.bvec_bval_tools import (check_b0_threshold, - identify_shells, - normalize_bvecs, - round_bvals_to_shell) -import math + normalize_bvecs) def _build_arg_parser(): @@ -49,7 +45,7 @@ def _build_arg_parser(): 'diffusion weighting. [%(default)s]') p.add_argument('--std_scale', type=float, default=2.0, help='How many deviation from the mean are required to be ' - 'considered an outliers. [%(default)s]') + 'considered an outlier. [%(default)s]') add_force_b0_arg(p) add_verbose_arg(p) @@ -64,73 +60,14 @@ def main(): assert_inputs_exist(parser, [args.in_dwi, args.in_bval, args.in_bvec]) bvals, bvecs = read_bvals_bvecs(args.in_bval, args.in_bvec) - dwi = nib.load(args.in_dwi) - data = dwi.get_fdata() + data = nib.load(args.in_dwi).get_fdata() b0_thr = check_b0_threshold(args.force_b0_threshold, bvals.min(), args.b0_thr) bvecs = normalize_bvecs(bvecs) - results_dict = {} - shells_to_extract = identify_shells(bvals, b0_thr, sort=True)[0] - bvals = round_bvals_to_shell(bvals, shells_to_extract) - for bval in shells_to_extract[shells_to_extract > args.b0_thr]: - shell_idx = np.where(bvals == bval)[0] - shell = bvecs[shell_idx] - results_dict[bval] = np.ones((len(shell), 3)) * -1 - for i, vec in enumerate(shell): - if np.linalg.norm(vec) < 0.001: - continue - - dot_product = np.clip(np.tensordot(shell, vec, axes=1), -1, 1) - angle = np.arccos(dot_product) * 180 / math.pi - angle[np.isnan(angle)] = 0 - idx = np.argpartition(angle, 4).tolist() - idx.remove(i) - - avg_angle = np.average(angle[idx[:3]]) - corr = np.corrcoef([data[..., shell_idx[i]].ravel(), - data[..., shell_idx[idx[0]]].ravel(), - data[..., shell_idx[idx[1]]].ravel(), - data[..., shell_idx[idx[2]]].ravel()]) - results_dict[bval][i] = [shell_idx[i], avg_angle, - np.average(corr[0, 1:])] - - for key in results_dict.keys(): - avg_angle = np.round(np.average(results_dict[key][:, 1]), 4) - std_angle = np.round(np.std(results_dict[key][:, 1]), 4) - - avg_corr = np.round(np.average(results_dict[key][:, 2]), 4) - std_corr = np.round(np.std(results_dict[key][:, 2]), 4) - - outliers_angle = np.argwhere( - results_dict[key][:, 1] < avg_angle-(args.std_scale*std_angle)) - outliers_corr = np.argwhere( - results_dict[key][:, 2] < avg_corr-(args.std_scale*std_corr)) - - print('Results for shell {} with {} directions:'.format( - key, len(results_dict[key]))) - print('AVG and STD of angles: {} +/- {}'.format( - avg_angle, std_angle)) - print('AVG and STD of correlations: {} +/- {}'.format( - avg_corr, std_corr)) - - if len(outliers_angle) or len(outliers_corr): - print('Possible outliers ({} STD below or above average):'.format( - args.std_scale)) - print('Outliers based on angle [position (4D), value]') - for i in outliers_angle: - print(results_dict[key][i, :][0][0:2]) - print('Outliers based on correlation [position (4D), value]') - for i in outliers_corr: - print(results_dict[key][i, :][0][0::2]) - else: - print('No outliers detected.') - - if args.verbose: - print('Shell with b-value {}'.format(key)) - pprint.pprint(results_dict[key]) - print() + detect_volume_outliers(data, bvecs, bvals, args.std_scale, + args.verbose, b0_thr) if __name__ == "__main__":