feat: integrate analyze readii outputs functions #79
@@ -1,4 +1,3 @@
# read version from installed package
from importlib.metadata import version
__version__ = "1.18.0"
@@ -0,0 +1,230 @@
import pandas as pd
from typing import Optional
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np


def getFeatureCorrelations(vertical_features:pd.DataFrame,
                           horizontal_features:pd.DataFrame,
                           method:str = "pearson",
                           vertical_feature_name:str = '_vertical',
                           horizontal_feature_name:str = '_horizontal'):
    """ Function to calculate correlation between two sets of features.
    Parameters
    ----------
    vertical_features : pd.DataFrame
        Dataframe containing features to calculate correlations with. Index must be the same as the index of the horizontal_features dataframe.
    horizontal_features : pd.DataFrame
        Dataframe containing features to calculate correlations with. Index must be the same as the index of the vertical_features dataframe.
    method : str
        Method to use for calculating correlations. Default is "pearson".
    vertical_feature_name : str
        Name of the vertical features to use as suffix in correlation dataframe. Default is "_vertical".
    horizontal_feature_name : str
        Name of the horizontal features to use as suffix in correlation dataframe. Default is "_horizontal".
    Returns
    -------
    correlation_matrix : pd.DataFrame
        Dataframe containing correlation values.
    """
    # Check that features are dataframes
    if not isinstance(vertical_features, pd.DataFrame):
        raise TypeError("vertical_features must be a pandas DataFrame")
    if not isinstance(horizontal_features, pd.DataFrame):
        raise TypeError("horizontal_features must be a pandas DataFrame")

    if method not in ["pearson", "spearman", "kendall"]:
        raise ValueError("Correlation method must be one of 'pearson', 'spearman', or 'kendall'.")

    if not vertical_features.index.equals(horizontal_features.index):
        raise ValueError("Vertical and horizontal features must have the same index to calculate correlation. Set the index to the intersection of patient IDs.")

    # Add _ to beginning of feature names if they don't start with _ so they can be used as suffixes
    if not vertical_feature_name.startswith("_"): vertical_feature_name = f"_{vertical_feature_name}"
    if not horizontal_feature_name.startswith("_"): horizontal_feature_name = f"_{horizontal_feature_name}"

    # Join the features into one dataframe
    # Use inner join to keep only the rows that have a value in both vertical and horizontal features
    features_to_correlate = vertical_features.join(horizontal_features,
                                                   how='inner',
                                                   lsuffix=vertical_feature_name,
                                                   rsuffix=horizontal_feature_name)

    try:
        # Calculate correlation between vertical features and horizontal features
        correlation_matrix = features_to_correlate.corr(method=method)
    except Exception as e:
        raise ValueError(f"Error calculating correlation matrix: {e}")

    return correlation_matrix
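For illustration, a minimal usage sketch of getFeatureCorrelations on two synthetic feature tables (patient IDs, column names, and values here are invented):

```python
import numpy as np
import pandas as pd
from readii.analyze.correlation import getFeatureCorrelations

# Two synthetic feature sets indexed by the same patient IDs
rng = np.random.default_rng(seed=0)
patients = [f"PAT{i}" for i in range(5)]
original = pd.DataFrame(rng.random((5, 3)), index=patients,
                        columns=["firstorder_Mean", "firstorder_Median", "glcm_Contrast"])
shuffled = pd.DataFrame(rng.random((5, 3)), index=patients,
                        columns=["firstorder_Mean", "firstorder_Median", "glcm_Contrast"])

corr = getFeatureCorrelations(vertical_features=original,
                              horizontal_features=shuffled,
                              vertical_feature_name="original",
                              horizontal_feature_name="shuffled")
print(corr.shape)  # (6, 6): both feature sets, with columns suffixed _original and _shuffled
```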
def plotCorrelationHeatmap(correlation_matrix_df:pd.DataFrame,
                           diagonal:Optional[bool] = False,
                           triangle:Optional[str] = "lower",
                           cmap:Optional[str] = "nipy_spectral",
                           xlabel:Optional[str] = "",
                           ylabel:Optional[str] = "",
                           title:Optional[str] = "",
                           subtitle:Optional[str] = "",
                           show_tick_labels:Optional[bool] = False
                           ):
    """Function to plot a correlation heatmap.
    Parameters
    ----------
    correlation_matrix_df : pd.DataFrame
        Dataframe containing the correlation matrix to plot.
    diagonal : bool, optional
        Whether to only plot half of the matrix. The default is False.
    triangle : str, optional
        Which triangle half of the matrix to plot. The default is "lower".
    cmap : str, optional
        Matplotlib colormap to use for the heatmap. The default is "nipy_spectral".
    xlabel : str, optional
        Label for the x-axis. The default is "".
    ylabel : str, optional
        Label for the y-axis. The default is "".
    title : str, optional
        Title for the plot. The default is "".
    subtitle : str, optional
        Subtitle for the plot. The default is "".
    show_tick_labels : bool, optional
        Whether to show the tick labels on the x and y axes. These would be the feature names. The default is False.
    Returns
    -------
    corr_fig : matplotlib.pyplot.figure
        Figure object containing a Seaborn heatmap.
    """

    if diagonal:
        # Set up mask for hiding half the matrix in the plot
        if triangle == "lower":
            # Mask out the upper right triangle half of the matrix
            mask = np.triu(correlation_matrix_df)
        elif triangle == "upper":
            # Mask out the lower left triangle half of the matrix
            mask = np.tril(correlation_matrix_df)
        else:
            raise ValueError("If diagonal is True, triangle must be either 'lower' or 'upper'.")
    else:
        # The entire correlation matrix will be visible in the plot
        mask = None

    # Set a default title if one is not provided
    if not title:
        title = "Correlation Heatmap"

    # Set up figure and axes for the plot
    corr_fig, corr_ax = plt.subplots()

    # Plot the correlation matrix
    corr_ax = sns.heatmap(correlation_matrix_df,
                          mask = mask,
                          cmap=cmap,
                          vmin=-1.0,
                          vmax=1.0)

    if not show_tick_labels:
        # Remove the individual feature names from the axes
        corr_ax.set_xticklabels(labels=[])
        corr_ax.set_yticklabels(labels=[])

    # Set axis labels
    corr_ax.set_xlabel(xlabel)
    corr_ax.set_ylabel(ylabel)

    # Set title and subtitle
    # Suptitle is the super title, which will be above the title
    plt.title(subtitle, fontsize=12)
    plt.suptitle(title, fontsize=14)

    return corr_fig
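A minimal plotting sketch using a toy symmetric matrix (values invented):

```python
import numpy as np
import pandas as pd
from readii.analyze.correlation import plotCorrelationHeatmap

# Build a small symmetric toy correlation matrix with values in [-1, 1]
rng = np.random.default_rng(seed=1)
m = rng.uniform(-1, 1, size=(4, 4))
corr = pd.DataFrame((m + m.T) / 2, columns=list("ABCD"), index=list("ABCD"))

fig = plotCorrelationHeatmap(corr, diagonal=True, triangle="lower",
                             xlabel="features", ylabel="features",
                             title="Toy Correlation Heatmap")
fig.savefig("toy_correlation_heatmap.png", bbox_inches="tight")
```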
def getVerticalSelfCorrelations(correlation_matrix:pd.DataFrame,
                                num_vertical_features:int):
    """ Function to get the vertical (y-axis) self correlations from a correlation matrix. Gets the top left quadrant of the correlation matrix.
    Parameters
    ----------
    correlation_matrix : pd.DataFrame
        Dataframe containing the correlation matrix to get the vertical self correlations from.
    num_vertical_features : int
        Number of vertical features in the correlation matrix.
    Returns
    -------
    pd.DataFrame
        Dataframe containing the vertical self correlations from the correlation matrix.
    """
    if num_vertical_features > correlation_matrix.shape[0]:
        raise ValueError(f"Number of vertical features ({num_vertical_features}) is greater than the number of rows in the correlation matrix ({correlation_matrix.shape[0]}).")

    if num_vertical_features > correlation_matrix.shape[1]:
        raise ValueError(f"Number of vertical features ({num_vertical_features}) is greater than the number of columns in the correlation matrix ({correlation_matrix.shape[1]}).")

    # Get the correlation matrix for vertical vs vertical - this is the top left corner of the matrix
    return correlation_matrix.iloc[0:num_vertical_features, 0:num_vertical_features]
def getHorizontalSelfCorrelations(correlation_matrix:pd.DataFrame,
                                  num_horizontal_features:int):
    """ Function to get the horizontal (x-axis) self correlations from a correlation matrix. Gets the bottom right quadrant of the correlation matrix.
    Parameters
    ----------
    correlation_matrix : pd.DataFrame
        Dataframe containing the correlation matrix to get the horizontal self correlations from.
    num_horizontal_features : int
        Number of horizontal features in the correlation matrix.
    Returns
    -------
    pd.DataFrame
        Dataframe containing the horizontal self correlations from the correlation matrix.
    """

    if num_horizontal_features > correlation_matrix.shape[0]:
        raise ValueError(f"Number of horizontal features ({num_horizontal_features}) is greater than the number of rows in the correlation matrix ({correlation_matrix.shape[0]}).")

    if num_horizontal_features > correlation_matrix.shape[1]:
        raise ValueError(f"Number of horizontal features ({num_horizontal_features}) is greater than the number of columns in the correlation matrix ({correlation_matrix.shape[1]}).")

    # Get the index of the start of the horizontal correlations
    start_of_horizontal_correlations = len(correlation_matrix.columns) - num_horizontal_features

    # Get the correlation matrix for horizontal vs horizontal - this is the bottom right corner of the matrix
    return correlation_matrix.iloc[start_of_horizontal_correlations:, start_of_horizontal_correlations:]
def getCrossCorrelationMatrix(correlation_matrix:pd.DataFrame,
                              num_vertical_features:int):
    """ Function to get the cross correlation matrix subsection for a correlation matrix. Gets the top right quadrant of the correlation matrix so vertical and horizontal features are correctly labeled.
    Parameters
    ----------
    correlation_matrix : pd.DataFrame
        Dataframe containing the correlation matrix to get the cross correlation matrix subsection from.
    num_vertical_features : int
        Number of vertical features in the correlation matrix.
    Returns
    -------
    pd.DataFrame
        Dataframe containing the cross correlations from the correlation matrix.
    """

    if num_vertical_features > correlation_matrix.shape[0]:
        raise ValueError(f"Number of vertical features ({num_vertical_features}) is greater than the number of rows in the correlation matrix ({correlation_matrix.shape[0]}).")

    if num_vertical_features > correlation_matrix.shape[1]:
        raise ValueError(f"Number of vertical features ({num_vertical_features}) is greater than the number of columns in the correlation matrix ({correlation_matrix.shape[1]}).")

    return correlation_matrix.iloc[0:num_vertical_features, num_vertical_features:]
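Taken together, the three slicing helpers recover the quadrants of the joined correlation matrix. A self-contained sketch with 10 features per set (imports assume all five functions live in readii.analyze.correlation, as in this diff):

```python
import numpy as np
import pandas as pd
from readii.analyze.correlation import (
    getFeatureCorrelations,
    getVerticalSelfCorrelations,
    getHorizontalSelfCorrelations,
    getCrossCorrelationMatrix,
)

rng = np.random.default_rng(seed=10)
features = pd.DataFrame(rng.random((10, 10)),
                        columns=[f"feature_{i+1}" for i in range(10)])
corr = getFeatureCorrelations(features, features)  # 20 x 20 joined matrix

vertical_self = getVerticalSelfCorrelations(corr, num_vertical_features=10)        # top-left 10x10
horizontal_self = getHorizontalSelfCorrelations(corr, num_horizontal_features=10)  # bottom-right 10x10
cross = getCrossCorrelationMatrix(corr, num_vertical_features=10)                  # top-right 10x10
assert cross.shape == (10, 10)
```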
Large diffs are not rendered by default.
@@ -0,0 +1,296 @@
import re
from pandas import DataFrame, Series
import numpy as np
import pandas as pd
from typing import Optional

from readii.data.split import replaceColumnValues

def getPatientIdentifierLabel(dataframe_to_search:DataFrame) -> str:
    """Function to find a column in a dataframe that contains some form of patient ID or case ID (case-insensitive).
    If multiple are found, will return the first match.
    Current regex is: '(pat)?(ient)?(case)?(\s|.)?(id|#)'
    Parameters
    ----------
    dataframe_to_search : DataFrame
        Dataframe to look for a patient ID column in.
    Returns
    -------
    str
        Label for patient identifier column from the dataframe.
    """

    # regex to get patient identifier column name in the dataframes
    # catches case-insensitive variations of patient_id, patid, pat id, case_id, case id, caseid, id
    id_search_term = re.compile(pattern= r'(pat)?(ient)?(case)?(\s|.)?(id|#)', flags=re.IGNORECASE)

    # Get any columns from the dataframe based on the regex
    patient_identifier = dataframe_to_search.filter(regex=id_search_term).columns.to_list()

    if len(patient_identifier) > 1:
        print(f"Multiple patient identifier labels found. Using {patient_identifier[0]}.")

    elif len(patient_identifier) == 0:
        raise ValueError("Dataframe doesn't have a recognizable patient ID column. Must contain patient or case ID.")

    return patient_identifier[0]
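A quick sketch of the ID-column lookup on a toy clinical table (column names invented; module path inferred from this diff's relative imports):

```python
import pandas as pd
from readii.data.label import getPatientIdentifierLabel

clinical = pd.DataFrame({"Patient ID": ["PAT1", "PAT2"], "Age": [63, 57]})
print(getPatientIdentifierLabel(clinical))  # prints "Patient ID"
```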
def setPatientIdAsIndex(dataframe_to_index:DataFrame,
                        patient_id_col:str = None):
    """ Function to set the patient ID column as the index of a dataframe.
    Parameters
    ----------
    dataframe_to_index : DataFrame
        Dataframe to set the patient ID column as the index of.
    patient_id_col : str, optional
        Name of the patient ID column to use as the index. If not provided, will find the patient ID column with getPatientIdentifierLabel.
    """
    if not patient_id_col:
        patient_id_col = getPatientIdentifierLabel(dataframe_to_index)

    pat_indexed_dataframe = dataframe_to_index.set_index(patient_id_col)
    return pat_indexed_dataframe
def convertDaysToYears(dataframe_with_outcome:DataFrame,
                       time_column_label:str,
                       divide_by:int = 365):
    """ Function to create a copy of a time outcome column measured in days and convert it to years.
    Parameters
    ----------
    dataframe_with_outcome : DataFrame
        Dataframe containing the outcome column to convert.
    time_column_label : str
        Label for the time column to convert in the dataframe.
    divide_by : int, optional
        Value to divide the time column by. The default is 365.
    Returns
    -------
    dataframe_with_outcome : DataFrame
        Dataframe with a copy of the specified time column converted to years.
    """

    # Set up the new column name for the converted time column
    years_column_label = time_column_label + "_years"
    # Make a copy of the time column with the values converted from days to years and add suffix _years to the column name
    dataframe_with_outcome[years_column_label] = dataframe_with_outcome[time_column_label] / divide_by

    return dataframe_with_outcome
def timeOutcomeColumnSetup(dataframe_with_outcome:DataFrame,
                           outcome_column_label:str,
                           standard_column_label:str,
                           convert_to_years:bool = False,
                           ):
    """ Function to set up a time outcome column in a dataframe. Makes a copy of the specified outcome column with a standardized column name and converts it to years if specified.
    Parameters
    ----------
    dataframe_with_outcome : DataFrame
        Dataframe containing the outcome column to convert.
    outcome_column_label : str
        Label for the outcome column to convert in the dataframe.
    standard_column_label : str
        Name of the column to save the standardized outcome column as.
    convert_to_years : bool, optional
        Whether to convert the time column to years. The default is False.
    Returns
    -------
    dataframe_with_standardized_outcome : DataFrame
        Dataframe with a copy of the specified outcome column, converted to years if requested.
    """

    # Check if the outcome column is numeric
    if not np.issubdtype(dataframe_with_outcome[outcome_column_label].dtype, np.number):
        raise ValueError(f"{outcome_column_label} is not numeric. Please confirm outcome_column_label is the correct column or convert the column in the dataframe to numeric.")
    else:
        # Make a copy of the dataframe to work on
        dataframe_with_standardized_outcome = dataframe_with_outcome.copy()

    if convert_to_years:
        # Create a copy of the outcome column and convert it to years
        dataframe_with_standardized_outcome = convertDaysToYears(dataframe_with_standardized_outcome, outcome_column_label)

        # Rename the converted column with the standardized name
        dataframe_with_standardized_outcome.rename(columns={f"{outcome_column_label}_years": standard_column_label}, inplace=True)
    else:
        # Make a copy of the outcome column with the standardized name
        dataframe_with_standardized_outcome[standard_column_label] = dataframe_with_standardized_outcome[outcome_column_label]

    return dataframe_with_standardized_outcome
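A minimal sketch of the time-outcome setup, assuming a clinical table with a survival column measured in days (names and values invented):

```python
import pandas as pd
from readii.data.label import timeOutcomeColumnSetup

clinical = pd.DataFrame({"overall_survival_days": [365, 730, 1095]})
labelled = timeOutcomeColumnSetup(clinical,
                                  outcome_column_label="overall_survival_days",
                                  standard_column_label="survival_time_in_years",
                                  convert_to_years=True)
print(labelled["survival_time_in_years"].tolist())  # [1.0, 2.0, 3.0]
```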
def survivalStatusToNumericMapping(event_outcome_column:Series):
    """Convert a survival status column to a numeric column by iterating over unique values and assigning a numeric value to each.
    Alive values will be assigned a value of 0, and dead values will be assigned a value of 1.
    If "alive" is present, the next event value index will start at 1. If "dead" is present, the next event value index will start at 2.
    Any NaN values will be assigned the value "unknown", then converted to a numeric value.
    Parameters
    ----------
    event_outcome_column : Series
        Series containing the survival status values.
    Returns
    -------
    event_column_value_mapping : dict
        Dictionary mapping the survival status values to numeric values.
    """

    # Create a dictionary to map event values to numeric values
    event_column_value_mapping = {}
    # Get a list of all unique event values, set NaN values to unknown, set remaining values to lower case
    # Convert to a plain list so values can be removed below (Series.unique returns a numpy array, which has no remove method)
    existing_event_values = event_outcome_column.fillna("unknown").str.lower().unique().tolist()

    # Set the conversion value for the first event value to 0
    other_event_num_value = 0

    # See if alive is present, and set the conversion value for alive to 0
    if "alive" in existing_event_values:
        event_column_value_mapping['alive'] = 0
        # Remove alive from the list of existing event values
        existing_event_values.remove("alive")
        # Update starting value for other event values
        other_event_num_value = 1

    # See if dead is present, and set the conversion value for dead to 1
    if "dead" in existing_event_values:
        event_column_value_mapping['dead'] = 1
        # Remove dead from the list of existing event values
        existing_event_values.remove("dead")
        # Update starting value for other event values
        other_event_num_value = 2

    # Set the conversion value for the other event values to the next available value
    for other_event_value in existing_event_values:
        event_column_value_mapping[other_event_value] = other_event_num_value
        other_event_num_value += 1

    return event_column_value_mapping
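A sketch of the resulting mapping for a toy status column (values invented; assumes the list conversion noted in the code above, so the NaN entry becomes "unknown"):

```python
import pandas as pd
from readii.data.label import survivalStatusToNumericMapping

status = pd.Series(["Alive", "Dead", "Lost to follow-up", None])
print(survivalStatusToNumericMapping(status))
# {'alive': 0, 'dead': 1, 'lost to follow-up': 2, 'unknown': 3}
```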
def eventOutcomeColumnSetup(dataframe_with_outcome:DataFrame,
                            outcome_column_label:str,
                            standard_column_label:str,
                            event_column_value_mapping:Optional[dict]=None):
    """ Function to set up an event outcome column in a dataframe.
    Parameters
    ----------
    dataframe_with_outcome : DataFrame
        Dataframe containing the outcome column to convert.
    outcome_column_label : str
        Label for the outcome column to convert in the dataframe.
    standard_column_label : str
        Name of the column to save the standardized outcome column as.
    event_column_value_mapping : dict, optional
        Dictionary of event values to convert to numeric values. Keys are the event values, values are the numeric values. If provided, all event values in the outcome column must be handled by the dictionary.
        If not provided, will attempt to convert based on the values in the outcome column. By default alive and dead will be converted to 0 and 1, respectively, if present in the outcome column.
        The default is an empty dictionary.
    Returns
    -------
    dataframe_with_standardized_outcome : DataFrame
        Dataframe with a copy of the specified outcome column converted to numeric values.
    """

    # Get the type of the existing event column
    event_variable_type = dataframe_with_outcome[outcome_column_label].dtype

    # Make a copy of the dataframe to work on
    dataframe_with_standardized_outcome = dataframe_with_outcome.copy()

    # Handle numeric event column
    if np.issubdtype(event_variable_type, np.number):
        dataframe_with_standardized_outcome[standard_column_label] = dataframe_with_outcome[outcome_column_label]

    # Handle boolean event column
    elif np.issubdtype(event_variable_type, np.bool_):
        dataframe_with_standardized_outcome[standard_column_label] = dataframe_with_outcome[outcome_column_label].astype(int)

    # Handle string event column
    elif np.issubdtype(event_variable_type, np.object_):
        try:
            # Make values of outcome column lowercase
            dataframe_with_standardized_outcome[outcome_column_label] = dataframe_with_outcome[outcome_column_label].str.lower()
        except Exception as e:
            raise ValueError(f"Error converting string event column {outcome_column_label} to lowercase. Please check the column is a string type and try again.") from e

        # Get the existing event values in the provided dataframe and sort them
        existing_event_values = sorted(dataframe_with_standardized_outcome[outcome_column_label].unique())

        if not event_column_value_mapping:
            # Create a dictionary to map event values to numeric values
            event_column_value_mapping = survivalStatusToNumericMapping(event_outcome_column=dataframe_with_standardized_outcome[outcome_column_label])

        else:
            # Convert all dictionary keys in provided mapping to lowercase
            event_column_value_mapping = dict((status.lower(), value) for status, value in event_column_value_mapping.items())

            # Check if user provided dictionary handles all event values in the outcome column
            if set(existing_event_values) != set(event_column_value_mapping.keys()):
                raise ValueError(f"Not all event values in {outcome_column_label} are handled by the provided event_column_value_mapping dictionary.")

        # TODO: add handling for values not in the dictionary
Comment on lines +251 to +252

[Reviewer] Implement handling for unmapped event values. There is a TODO here to add handling for values not in the dictionary. Would you like assistance in implementing this feature?

[strixy16] Yes I would like assistance.

[Reviewer] @strixy16, glad to assist! Here's an updated version of the eventOutcomeColumnSetup function:

```python
def eventOutcomeColumnSetup(dataframe_with_outcome: DataFrame,
                            outcome_column_label: str,
                            standard_column_label: str,
                            event_column_value_mapping: Optional[dict] = None):
    """Function to set up an event outcome column in a dataframe.
    Parameters
    ----------
    dataframe_with_outcome : DataFrame
        Dataframe containing the outcome column to convert.
    outcome_column_label : str
        Label for the outcome column to convert in the dataframe.
    standard_column_label : str
        Name of the column to save the standardized outcome column as.
    event_column_value_mapping : dict, optional
        Dictionary of event values to convert to numeric values. Keys are the event values, values are the numeric values.
        If provided, any unmapped event values will be assigned new numeric values.
        If not provided, will attempt to convert based on the values in the outcome column. By default, "alive" and "dead"
        will be converted to 0 and 1, respectively, if present in the outcome column.
    Returns
    -------
    dataframe_with_standardized_outcome : DataFrame
        Dataframe with a copy of the specified outcome column converted to numeric values.
    """
    # Get the type of the existing event column
    event_variable_type = type(dataframe_with_outcome[outcome_column_label].iloc[0])
    # Make a copy of the dataframe to work on
    dataframe_with_standardized_outcome = dataframe_with_outcome.copy()
    # Handle numeric event column
    if np.issubdtype(event_variable_type, np.number):
        dataframe_with_standardized_outcome[standard_column_label] = dataframe_with_outcome[outcome_column_label]
    # Handle boolean event column
    elif np.issubdtype(event_variable_type, np.bool_):
        dataframe_with_standardized_outcome[standard_column_label] = dataframe_with_outcome[outcome_column_label].astype(int)
    # Handle string event column
    elif np.issubdtype(event_variable_type, np.str_):
        # Make values of outcome column lowercase
        dataframe_with_standardized_outcome[outcome_column_label] = dataframe_with_outcome[outcome_column_label].str.lower()
        # Get the existing event values in the provided dataframe
        existing_event_values = sorted(dataframe_with_standardized_outcome[outcome_column_label].unique())
        if not event_column_value_mapping:
            # Create a dictionary to map event values to numeric values
            event_column_value_mapping = survivalStatusToNumericMapping(
                event_outcome_column=dataframe_with_standardized_outcome[outcome_column_label]
            )
        else:
            # Convert all dictionary keys in provided mapping to lowercase
            event_column_value_mapping = dict((status.lower(), value)
                                              for status, value in event_column_value_mapping.items())
            # Identify any values not in the mapping
            unmapped_values = [value for value in existing_event_values
                               if value not in event_column_value_mapping]
            if unmapped_values:
                print(f"Unmapped event values found: {unmapped_values}. They will be assigned new numeric values.")
                # Assign next available numeric value to unmapped event values
                max_mapped_value = max(event_column_value_mapping.values(), default=-1)
                for i, value in enumerate(unmapped_values, start=max_mapped_value + 1):
                    event_column_value_mapping[value] = i
        # Map the event column using the updated mapping
        dataframe_with_standardized_outcome[standard_column_label] = \
            dataframe_with_standardized_outcome[outcome_column_label].map(event_column_value_mapping)
    else:
        raise ValueError(f"Unsupported data type for outcome column: {event_variable_type}")
    return dataframe_with_standardized_outcome
```

Explanation: This implementation ensures that any unexpected or unmapped event values are handled gracefully, preventing errors during execution. Let me know if you have any questions or need further assistance!
        # Set up the new column name for the converted event column
        dataframe_with_standardized_outcome[standard_column_label] = dataframe_with_standardized_outcome[outcome_column_label]

        # Swap the keys and values in the mapping dictionary to use the replaceColumnValues function
        # So the numeric values will be the keys and the string event values will be the values
        replacement_value_data = dict([num_event, str_event] for str_event, num_event in event_column_value_mapping.items())

[Reviewer] Correct the key-value order in replacement_value_data. Apply this diff to fix the key-value pair order:

-        replacement_value_data = dict([num_event, str_event] for str_event, num_event in event_column_value_mapping.items())
+        replacement_value_data = {num_event: str_event for str_event, num_event in event_column_value_mapping.items()}

Ensure that the keys in replacement_value_data are the numeric values and the values are the string event values, as expected by replaceColumnValues.

        # Replace the string event values in the standardized column with the numeric event values
        dataframe_with_standardized_outcome = replaceColumnValues(dataframe_with_standardized_outcome,
                                                                  column_to_change=standard_column_label,
                                                                  replacement_value_data=replacement_value_data)

        # end string handling
    else:
        raise TypeError(f"Event column {outcome_column_label} is not a valid type. Must be a string, boolean, or numeric.")

    return dataframe_with_standardized_outcome
def addOutcomeLabels(feature_data_to_label:DataFrame,
                     clinical_data:DataFrame,
                     outcome_labels:Optional[list] = None):
    """ Function to add survival labels to a feature dataframe based on a clinical dataframe.
    Parameters
    ----------
    feature_data_to_label : DataFrame
        Dataframe containing the feature data to add survival labels to.
    clinical_data : DataFrame
        Dataframe containing the clinical data to use for survival labels.
    outcome_labels : list, optional
        List of outcome labels to extract from the clinical dataframe. The default is ["survival_time_in_years", "survival_event_binary"].
    Returns
    -------
    outcome_labelled_feature_data : DataFrame
        Feature dataframe with the outcome label columns joined on.
    """
    if outcome_labels is None:
        outcome_labels = ["survival_time_in_years", "survival_event_binary"]

    # Get the survival time and event columns as a dataframe
    outcome_label_columns = clinical_data[outcome_labels]

    # Join the outcome label dataframe to the feature data dataframe
    outcome_labelled_feature_data = outcome_label_columns.join(feature_data_to_label)
    return outcome_labelled_feature_data
@@ -0,0 +1,175 @@
from pandas import DataFrame
from typing import Optional

import pandas as pd

from .label import setPatientIdAsIndex

def dropUpToFeature(dataframe:DataFrame,
                    feature_name:str,
                    keep_feature_name_column:Optional[bool] = False
                    ):
    """ Function to drop all columns up to and possibly including the specified feature.
    Parameters
    ----------
    dataframe : DataFrame
        Dataframe to drop columns from.
    feature_name : str
        Name of the feature to drop up to.
    keep_feature_name_column : bool, optional
        Whether to keep the specified feature name column in the dataframe or drop it. The default is False.
    Returns
    -------
    dataframe : DataFrame
        Dataframe with all columns up to and including the specified feature dropped.
    """
    try:
        if keep_feature_name_column:
            # Get the column names up to but not including the specified feature
            column_names = dataframe.columns.to_list()[:dataframe.columns.get_loc(feature_name)]
        else:
            # Get the column names up to and including the specified feature
            column_names = dataframe.columns.to_list()[:dataframe.columns.get_loc(feature_name)+1]

        # Drop all columns up to and including the specified feature
        dataframe_dropped_columns = dataframe.drop(columns=column_names)

        return dataframe_dropped_columns

    except KeyError:
        print(f"Feature {feature_name} was not found as a column in dataframe. No columns dropped.")
        return dataframe

    except Exception as e:
        print(f"An error occurred: {e}")
        return None
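A minimal sketch of dropUpToFeature on a toy PyRadiomics-style table (column names invented; module path assumed from this diff's relative imports):

```python
import pandas as pd
from readii.data.select import dropUpToFeature  # module path assumed

df = pd.DataFrame({"patient_ID": ["PAT1"],
                   "diagnostics_Versions_PyRadiomics": ["3.0"],
                   "original_firstorder_Mean": [42.0]})
features = dropUpToFeature(df, "diagnostics_Versions_PyRadiomics")
print(features.columns.tolist())  # ['original_firstorder_Mean']
```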
def selectByColumnValue(dataframe:DataFrame,
                        include_col_values:Optional[dict] = None,
                        exclude_col_values:Optional[dict] = None) -> pd.DataFrame:
    """
    Get rows of a pandas DataFrame based on row values in the columns labelled as keys of the include_col_values and not in the keys of the exclude_col_values.
    Include variables will be processed first, then exclude variables, in the order they are provided in the corresponding dictionaries.
    Parameters
    ----------
    dataframe : pd.DataFrame
        Dataframe to subset.
    include_col_values : dict
        Dictionary of column names and values to include in the subset. ex. {"column_name": ["value1", "value2"]}
    exclude_col_values : dict
        Dictionary of column names and values to exclude from the subset. ex. {"column_name": ["value1", "value2"]}
    Returns
    -------
    pd.DataFrame
        Subset of the input dataframe.
    """
    try:
        if (include_col_values is None) and (exclude_col_values is None):
            raise ValueError("Must provide one of include_col_values or exclude_col_values.")

        if include_col_values is not None:
            for key in include_col_values.keys():
                if key in ["Index", "index"]:
                    dataframe = dataframe[dataframe.index.isin(include_col_values[key])]
                else:
                    dataframe = dataframe[dataframe[key].isin(include_col_values[key])]

        if exclude_col_values is not None:
            for key in exclude_col_values.keys():
                if key in ["Index", "index"]:
                    dataframe = dataframe[~dataframe.index.isin(exclude_col_values[key])]
                else:
                    dataframe = dataframe[~dataframe[key].isin(exclude_col_values[key])]

        return dataframe

    except Exception as e:
        print(f"An error occurred: {e}")
        return None
def getOnlyPyradiomicsFeatures(radiomic_data:DataFrame):
    """ Function to get out just the features from a PyRadiomics output that includes metadata/diagnostics columns before the features.
    Will look for the last diagnostics column or the first PyRadiomics feature column with the word "original" in it.
    Parameters
    ----------
    radiomic_data : DataFrame
        Dataframe of PyRadiomics features with diagnostics and other columns before the features.
    Returns
    -------
    pyradiomic_features_only : DataFrame
        Dataframe with just the radiomic features.
    """
    # Find all the columns that begin with diagnostics
    diagnostic_data = radiomic_data.filter(regex=r"diagnostics_*")

    if not diagnostic_data.empty:
        # Get the last diagnostics column name - the features begin in the next column
        last_diagnostic_column = diagnostic_data.columns[-1]
        # Drop all the columns before the features start
        pyradiomic_features_only = dropUpToFeature(radiomic_data, last_diagnostic_column, keep_feature_name_column=False)

    else:
        original_feature_data = radiomic_data.filter(regex=r'^original_*')
        if not original_feature_data.empty:
            # Get the first original feature column name - the features begin in this column
            first_pyradiomic_feature_column = original_feature_data.columns[0]
            # Drop all the columns before the features start, keeping the first feature column
            pyradiomic_features_only = dropUpToFeature(radiomic_data, first_pyradiomic_feature_column, keep_feature_name_column=True)
        else:
            raise ValueError("PyRadiomics file doesn't contain any diagnostics or original feature columns, so can't find beginning of features. Use dropUpToFeature and specify the last non-feature or first PyRadiomics feature column name to get only PyRadiomics features.")

    return pyradiomic_features_only
def getPatientIntersectionDataframes(dataframe_A:DataFrame,
                                     dataframe_B:DataFrame,
                                     need_pat_index_A:bool = True,
                                     need_pat_index_B:bool = True):
    """ Function to get the subset of two dataframes based on the intersection of their indices. Intersection will be based on the index of dataframe A.
    Parameters
    ----------
    dataframe_A : DataFrame
        Dataframe A to get the intersection of based on the index.
    dataframe_B : DataFrame
        Dataframe B to get the intersection of based on the index.
    need_pat_index_A : bool, optional
        Whether to run setPatientIdAsIndex on dataframe A. If False, assumes the patient ID column is already set as the index. The default is True.
    need_pat_index_B : bool, optional
        Whether to run setPatientIdAsIndex on dataframe B. If False, assumes the patient ID column is already set as the index. The default is True.
    Returns
    -------
    intersection_index_dataframeA : DataFrame
        Dataframe containing the rows of dataframe A that are in the intersection of the indices of dataframe A and dataframe B.
    intersection_index_dataframeB : DataFrame
        Dataframe containing the rows of dataframe B that are in the intersection of the indices of dataframe A and dataframe B.
    """

    # Set the patient ID column as the index for dataframe A if needed
    if need_pat_index_A:
        dataframe_A = setPatientIdAsIndex(dataframe_A)

    # Set the patient ID column as the index for dataframe B if needed
    if need_pat_index_B:
        dataframe_B = setPatientIdAsIndex(dataframe_B)

    # Get patients in common between dataframe A and dataframe B
    intersection_index = dataframe_A.index.intersection(dataframe_B.index)

    # Select common patient rows from each dataframe
    intersection_index_dataframeA = dataframe_A.loc[intersection_index]
    intersection_index_dataframeB = dataframe_B.loc[intersection_index]

    return intersection_index_dataframeA, intersection_index_dataframeB
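A sketch of the intersection helper on two toy tables sharing two patients (data invented; the patient_ID columns are found by getPatientIdentifierLabel, module path assumed):

```python
import pandas as pd
from readii.data.select import getPatientIntersectionDataframes  # module path assumed

clinical = pd.DataFrame({"patient_ID": ["PAT1", "PAT2", "PAT3"], "age": [60, 71, 55]})
features = pd.DataFrame({"patient_ID": ["PAT2", "PAT3", "PAT4"], "feat": [0.1, 0.2, 0.3]})

clin_common, feat_common = getPatientIntersectionDataframes(clinical, features)
print(clin_common.index.tolist())  # ['PAT2', 'PAT3']
```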
@@ -0,0 +1,103 @@
from pandas import DataFrame

def replaceColumnValues(dataframe:DataFrame,
                        column_to_change:str,
                        replacement_value_data:dict
                        ):
    """Function to replace specified values in a column with a new value.
    Parameters
    ----------
    dataframe : DataFrame
        Dataframe to replace values in.
    column_to_change : str
        Name of the column to replace values in.
    replacement_value_data : dict
        Dictionary of values to replace in the column. Key is the new value, value is the old value(s) to replace.
        Can be a single value or a list of values.
    Returns
    -------
    dataframe : DataFrame
        Dataframe with values replaced.
    """

    # Check if the column name is a valid column in the dataframe
    if column_to_change not in dataframe.columns:
        raise ValueError(f"Column {column_to_change} not found in dataframe.")

    for new_value in replacement_value_data.keys():
        # Check that at least one of the old values is present in the column to be replaced
        old_values = replacement_value_data[new_value]
        values_not_found_in_column = set(old_values).difference(set(dataframe[column_to_change].unique()))
        if values_not_found_in_column == set(old_values):
            raise ValueError(f"None of the values in {old_values} were found in column {column_to_change} to be replaced.")
        # Replace the old values with the new value
        dataframe = dataframe.replace(to_replace=replacement_value_data[new_value],
                                      value=new_value)

    return dataframe
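A sketch of the replacement dictionary's orientation — keys are the new value, values are the old value(s) to replace (toy data):

```python
import pandas as pd
from readii.data.split import replaceColumnValues

df = pd.DataFrame({"status": ["alive", "dead", "deceased"]})
df = replaceColumnValues(df, column_to_change="status",
                         replacement_value_data={"dead": ["deceased"]})
print(df["status"].tolist())  # ['alive', 'dead', 'dead']
```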
def splitDataByColumnValue(dataframe:DataFrame,
                           split_col_data:dict[str,list],
                           impute_value = None,
                           ):
    """Function to split a dataframe into multiple dataframes based on the values in a specified column. Optionally, impute values in the split columns.
    Parameters
    ----------
    dataframe : DataFrame
        Dataframe to split.
    split_col_data : dict[str,list]
        Dictionary of a column name and values to split the dataframe by. Key is the column name, value is the list of values to split by.
    impute_value : optional
        If set, will impute any non-specified split values in the split column with this value. The default is None.
    Returns
    -------
    split_dataframes : dict
        Dictionary of dataframes, where the key is the split value and the value is the dataframe for that split value.
    """

    # Initialize dictionary to store the split dataframes
    split_dataframes = {}

    for split_column_name in split_col_data.keys():
        # Check if the column name is a valid column in the dataframe
        if split_column_name not in dataframe.columns:
            raise ValueError(f"Column {split_column_name} not found in dataframe.")

        # Get split column values for this column
        split_col_values = split_col_data[split_column_name]

        if impute_value:
            # Get all values in the column that are not one of the split_col_values
            column_value_set = set(dataframe[split_column_name].unique())
            split_value_set = set(split_col_values)
            non_split_values = list(column_value_set - split_value_set)

            # Replace all values not specified in the split_col_data with the impute_value specified for this column
            dataframe = replaceColumnValues(dataframe,
                                            column_to_change=split_column_name,
                                            replacement_value_data={impute_value: non_split_values})

        # End imputation

        # Split dataframe by the specified split_col_values
        for split_value in split_col_values:
            # Check if the split_value is a valid value in the column
            if split_value not in dataframe[split_column_name].unique():
                raise ValueError(f"Split value {split_value} not found in column {split_column_name}.")

            # Split the dataframe by the specified split_value
            split_dataframe = dataframe[dataframe[split_column_name] == split_value]

            split_dataframe.reset_index(inplace=True, drop=True)

            # Save the split dataframe to the dictionary
            split_dataframes[split_value] = split_dataframe

        # End dataframe splitting

    return split_dataframes
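A sketch of splitting with imputation of unlisted values (toy cohort labels):

```python
import pandas as pd
from readii.data.split import splitDataByColumnValue

df = pd.DataFrame({"cohort": ["train", "test", "train", "validation"],
                   "value": [1, 2, 3, 4]})
splits = splitDataByColumnValue(df,
                                split_col_data={"cohort": ["train", "test"]},
                                impute_value="test")
print(splits["train"]["value"].tolist())  # [1, 3]
print(splits["test"]["value"].tolist())   # [2, 4] - the "validation" row was imputed to "test"
```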
@@ -0,0 +1,81 @@
import os
import pandas as pd

from typing import Optional, Dict

from readii.io.loaders.general import loadFileToDataFrame


def loadFeatureFilesFromImageTypes(extracted_feature_dir:str,
                                   image_types:list,
                                   drop_labels:Optional[bool]=True,
                                   labels_to_drop:Optional[list]=None)->Dict[str,pd.DataFrame]:
    """Function to load in all the extracted imaging feature sets from a directory and return them as a dictionary of dataframes.
    Parameters
    ----------
    extracted_feature_dir : str
        Path to the directory containing the extracted feature csv files.
    image_types : list
        List of image types to load in (e.g. ['original']).
    drop_labels : bool, optional
        Whether to drop the labels from the dataframes. Use when loading labelled data from data_setup_for_modeling.ipynb. The default is True.
    labels_to_drop : list, optional
        List of labels to drop from the dataframes. The default is ["patient_ID","survival_time_in_years","survival_event_binary"] based on code
        in data_setup_for_modeling.ipynb.
    Returns
    -------
    feature_sets : dict
        Dictionary of dataframes containing the extracted radiomics features.
    """
    # Set default labels to drop if not specified
    if labels_to_drop is None:
        labels_to_drop = ["patient_ID","survival_time_in_years","survival_event_binary"]

    # Initialize dictionary to store the feature sets
    feature_sets = {}

    # Check if the passed in extracted feature directory exists
    if not os.path.isdir(extracted_feature_dir):
        raise FileNotFoundError(f"Extracted feature directory {extracted_feature_dir} does not exist.")

    feature_file_list = os.listdir(extracted_feature_dir)

    # Loop through all the files in the directory
    for image_type in image_types:
        try:
            # Extract the image type feature csv file from the feature directory
            # This should return a list of length 1, so we can just take the first element
            image_type_feature_file = [file for file in feature_file_list if (image_type in file) and (file.endswith(".csv"))][0]
            # Remove the image type file from the list of feature files
            feature_file_list.remove(image_type_feature_file)
        except Exception as e:
            print(f"{e}\n No {image_type} feature csv files found in {extracted_feature_dir}")
            # Skip to the next image type
            continue

        # Get the full path to the feature file
        feature_file_path = os.path.join(extracted_feature_dir, image_type_feature_file)

        # Load the feature data into a pandas dataframe
        raw_feature_data = loadFileToDataFrame(feature_file_path)

        try:
            # Drop the labels from the dataframe if specified
            if drop_labels:
                # Data is now only extracted features
                raw_feature_data.drop(labels_to_drop, axis=1, inplace=True)
        except KeyError:
            print(f"{feature_file_path} does not have the labels {labels_to_drop} to drop.")
            # Skip to the next image type
            continue

        # Save the dataframe to the feature_sets dictionary
        feature_sets[image_type] = raw_feature_data

    if not feature_sets:
        raise ValueError(f"No valid feature sets were loaded from {extracted_feature_dir}")

    return feature_sets
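A usage sketch, assuming a directory of per-image-type feature csv files whose names contain the image type (directory, file names, and module path are hypothetical):

```python
from readii.io.loaders.features import loadFeatureFilesFromImageTypes  # module path assumed

# e.g. results/features/ contains radiomicfeatures_original.csv, radiomicfeatures_shuffled_full.csv
feature_sets = loadFeatureFilesFromImageTypes(extracted_feature_dir="results/features/",
                                              image_types=["original", "shuffled_full"],
                                              drop_labels=False)
print(list(feature_sets.keys()))  # ['original', 'shuffled_full']
```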
@@ -0,0 +1,78 @@
import os
import pandas as pd
import yaml


def loadImageDatasetConfig(dataset_name:str,
                           config_dir_path:str) -> dict:
    """Load the configuration file for a given dataset. Expects the configuration file to be named <dataset_name>.yaml.
    Parameters
    ----------
    dataset_name : str
        Name of the dataset to load the configuration file for.
    config_dir_path : str
        Path to the directory containing the configuration files.
    Returns
    -------
    dict
        Dictionary containing the configuration settings for the dataset.
    Examples
    --------
    >>> config = loadImageDatasetConfig("NSCLC_Radiogenomics", "config/")
    """
    # Make full path to config file
    config_file_path = os.path.join(config_dir_path, f"{dataset_name}.yaml")

    # Check if config file exists
    if not os.path.exists(config_file_path):
        raise FileNotFoundError(f"Config file {config_file_path} does not exist.")

    try:
        # Load the config file
        with open(config_file_path, "r") as f:
            return yaml.safe_load(f)

    except yaml.YAMLError as e:
        raise ValueError(f"Invalid YAML in config file: {e}")
Comment on lines +39 to +40

[Reviewer] 🛠️ Refactor suggestion: Use exception chaining with raise ... from e. To preserve the original traceback and provide better error context, use exception chaining by adding from e. Apply this diff:

     except yaml.YAMLError as e:
-        raise ValueError(f"Invalid YAML in config file: {e}")
+        raise ValueError(f"Invalid YAML in config file: {e}") from e

🪛 Ruff (0.8.0) B904: Within an `except` clause, raise exceptions with `raise ... from err` or `raise ... from None`.

def loadFileToDataFrame(file_path:str) -> pd.DataFrame:
    """Load data from a csv or xlsx file into a pandas dataframe.
    Parameters
    ----------
    file_path : str
        Path to the data file.
    Returns
    -------
    pd.DataFrame
        Dataframe containing the data from the file.
    """
    if not file_path:
        raise ValueError("file_path cannot be empty")

    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File {file_path} does not exist")

    # Get the file extension
    _, file_extension = os.path.splitext(file_path)

    try:
        # Check if the file is an Excel file
        if file_extension == '.xlsx':
            df = pd.read_excel(file_path)
        # Check if the file is a CSV file
        elif file_extension == '.csv':
            df = pd.read_csv(file_path)
        else:
            raise ValueError("Unsupported file format. Please provide a .csv or .xlsx file.")

        if df.empty:
            raise ValueError("Loaded DataFrame is empty")

        return df

    except pd.errors.EmptyDataError:
        raise ValueError("File is empty")
    except (pd.errors.ParserError, ValueError) as e:
        raise ValueError(f"Error parsing file: {e}")
Comment on lines +77 to +78

[Reviewer] 🛠️ Refactor suggestion: Use exception chaining with raise ... from e. To maintain the original exception context, add from e. Apply this diff:

     except (pd.errors.ParserError, ValueError) as e:
-        raise ValueError(f"Error parsing file: {e}")
+        raise ValueError(f"Error parsing file: {e}") from e

🪛 Ruff (0.8.0) B904: Within an `except` clause, raise exceptions with `raise ... from err` or `raise ... from None`.
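A usage sketch for the two loaders (file paths hypothetical; the config call mirrors the docstring example above):

```python
from readii.io.loaders.general import loadImageDatasetConfig, loadFileToDataFrame

config = loadImageDatasetConfig("NSCLC_Radiogenomics", "config/")
clinical = loadFileToDataFrame("data/clinical.xlsx")
print(config.keys(), clinical.shape)
```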
@@ -0,0 +1,52 @@
from pathlib import Path
from typing import Union

def getImageTypesFromDirectory(raw_data_dir:Union[Path, str],
                               feature_file_prefix:str = "",
                               feature_file_suffix:str = ".csv"):
    """ Function to get a list of image types from a directory containing image feature files.
    Parameters
    ----------
    raw_data_dir : Union[Path, str]
        Path to the directory containing the image feature files.
    feature_file_prefix : str, optional
        Prefix to remove from the feature file name. The default is "".
    feature_file_suffix : str, optional
        Suffix to remove from the feature file name. The default is ".csv".
    Returns
    -------
    list
        List of image types from the image feature files.
    """
    # Check if raw_data_dir is a string or a Path object, convert to Path object if it is a string
    if isinstance(raw_data_dir, str):
        raw_data_dir = Path(raw_data_dir)

    # Check if the directory exists
    if not raw_data_dir.exists():
        raise FileNotFoundError(f"Directory {raw_data_dir} does not exist.")

    # Check if the path is a directory
    if not raw_data_dir.is_dir():
        raise NotADirectoryError(f"Path {raw_data_dir} is not a directory.")

    # Check that directory contains files with the specified prefix and suffix
    if not any(raw_data_dir.glob(f"{feature_file_prefix}*{feature_file_suffix}")):
        raise FileNotFoundError(f"No files with prefix {feature_file_prefix} and suffix {feature_file_suffix} found in directory {raw_data_dir}.")

    # Initialize an empty list to store the image types
    image_types = []

    # Get list of file names with the specified prefix and suffix in the directory
    for file in raw_data_dir.glob(f"{feature_file_prefix}*{feature_file_suffix}"):
        file_name = file.name

        # Remove the prefix and suffix from the file name
        image_type = file_name.removeprefix(feature_file_prefix).removesuffix(feature_file_suffix)

        # Add the image type to the list
        image_types.append(image_type)

    return image_types
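A sketch of recovering image types from feature file names (directory, prefix, and module path hypothetical):

```python
from readii.io.loaders.directory import getImageTypesFromDirectory  # module path assumed

# e.g. results/features/ contains radiomicfeatures_original.csv and radiomicfeatures_shuffled_full.csv
image_types = getImageTypesFromDirectory("results/features/",
                                         feature_file_prefix="radiomicfeatures_",
                                         feature_file_suffix=".csv")
print(image_types)  # ['original', 'shuffled_full']
```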
@@ -0,0 +1,39 @@
from seaborn import heatmap
import os

from typing import Union
from pathlib import Path

def saveSeabornPlotFigure(sns_plot:heatmap,
                          plot_name:str,
                          output_dir_path:Union[Path, str]
                          ):
    """Function to save out a seaborn plot to a png file.
    Parameters
    ----------
    sns_plot : seaborn.heatmap
        Seaborn plot to save out.
    plot_name : str
        What to name the plot on save. Ex. "original_vs_shuffled_correlation_plot.png"
    output_dir_path : Union[Path, str]
        Path to the directory to save the plot to.
    """

    # Check if output_dir_path is a string or a Path object
    if isinstance(output_dir_path, str):
        output_dir_path = Path(output_dir_path)

    # Check if output_dir_path exists
    if not output_dir_path.exists():
        # Make directory if it doesn't exist, but don't fail if it already exists
        os.makedirs(output_dir_path, exist_ok=True)

    # Set up the full output path
    output_file_path = output_dir_path / plot_name

    # Save out the plot
    sns_plot.get_figure().savefig(output_file_path, bbox_inches='tight')

    return
@@ -0,0 +1,115 @@
from readii.analyze.correlation import (
    getFeatureCorrelations
)

import pytest
import pandas as pd
import numpy as np

@pytest.fixture
def random_features():
    # Create a 10x10 matrix with random float values between 0 and 1
    random_matrix = np.random.default_rng(seed=10).random((10,10))
    # Convert to dataframe and name the columns feature_1, feature_2, etc.
    return pd.DataFrame(random_matrix, columns=[f"feature_{i+1}" for i in range(10)])

@pytest.mark.parametrize(
    "correlation_method",
    [
        "pearson",
        "spearman",
        "kendall"
    ]
)
def test_methods_getFeatureCorrelation(correlation_method, random_features):
    """Test getting correlation matrix for a set of random_features with pearson, spearman, and kendall correlation methods"""

    features_to_corr = random_features.join(random_features, how='inner', lsuffix='_vertical', rsuffix='_horizontal')
    expected = features_to_corr.corr(method=correlation_method)

    actual = getFeatureCorrelations(vertical_features = random_features,
                                    horizontal_features = random_features,
                                    method = correlation_method
                                    )
    assert isinstance(actual, pd.DataFrame), \
        "Wrong return type, expect a pandas DataFrame"
    assert actual.shape[0] == 2*random_features.shape[1], \
        "Wrong return size, should be double the number of input features"
    assert actual.equals(expected), \
        f"{correlation_method} correlation values are incorrect for the input features"
def test_defaults_getFeatureCorrelations(random_features):
    """Test the default argument behaviours of getFeatureCorrelations. Should be a Pearson correlation matrix with _vertical and _horizontal suffixes on the feature names"""
    features_to_corr = random_features.join(random_features, how='inner', lsuffix='_vertical', rsuffix='_horizontal')
    expected = features_to_corr.corr(method="pearson")

    actual = getFeatureCorrelations(vertical_features = random_features,
                                    horizontal_features = random_features)
    assert isinstance(actual, pd.DataFrame), \
        "Wrong return type, expect a pandas DataFrame"
    assert actual.shape[0] == 2*random_features.shape[1], \
        "Wrong return size, should be double the number of the input features"
    assert actual.equals(expected), \
        "Pearson correlation values are incorrect for the input features"
    assert actual.columns.equals(expected.columns), \
        "Column names are incorrect. Should be the input features with _vertical and _horizontal suffixes"
@pytest.mark.parametrize(
    "correlation_method",
    [
        "random",
        ""
    ]
)
def test_wrongMethod_getFeatureCorrelations(random_features, correlation_method):
    """Check ValueError is raised when incorrect correlation method is passed"""
    with pytest.raises(ValueError):
        getFeatureCorrelations(vertical_features = random_features,
                               horizontal_features = random_features,
                               method = correlation_method
                               )

@pytest.mark.parametrize(
    "wrong_features",
    [
        np.random.default_rng(seed=10).random((10,10)),
        "Just a string",
        {"feat1": 34, "feat2": 10000, "feat3": 3.141592}
    ]
)
def test_wrongFeatures_getFeatureCorrelations(random_features, wrong_features):
    """Check TypeError is raised when non-DataFrame features are passed (getFeatureCorrelations raises TypeError for these inputs)"""
    with pytest.raises(TypeError):
        getFeatureCorrelations(vertical_features = random_features,
                               horizontal_features = wrong_features,
                               method = "pearson"
                               )
    with pytest.raises(TypeError):
        getFeatureCorrelations(vertical_features = wrong_features,
                               horizontal_features = random_features,
                               method = "pearson"
                               )
@pytest.mark.parametrize(
    "vertical_feature_name, horizontal_feature_name, expected_vertical, expected_horizontal",
    [
        ("type_A", "type_B", [f"feature_{i+1}_type_A" for i in range(10)], [f"feature_{i+1}_type_B" for i in range(10)]),
        ("_type_C", "_type_D", [f"feature_{i+1}_type_C" for i in range(10)], [f"feature_{i+1}_type_D" for i in range(10)]),
    ]
)
def test_featureNames_getFeatureCorrelations(random_features, vertical_feature_name, horizontal_feature_name, expected_vertical, expected_horizontal):
    """ Check that feature names with and without _ prefix are handled correctly"""

    actual = getFeatureCorrelations(vertical_features=random_features,
                                    horizontal_features=random_features,
                                    vertical_feature_name=vertical_feature_name,
                                    horizontal_feature_name=horizontal_feature_name)

    assert list(actual.columns) == expected_vertical + expected_horizontal, \
        "Column names are incorrect, check underscore prefix handling when adding the vertical and horizontal feature names. Should be [feature]_vertical_feature_name and [feature]_horizontal_feature_name."
    assert list(actual.index) == expected_vertical + expected_horizontal, \
        "Index values are incorrect, check underscore prefix handling when adding the vertical and horizontal feature names. Should be [feature]_vertical_feature_name and [feature]_horizontal_feature_name."
[Reviewer] 🛠️ Refactor suggestion: Use exception chaining with raise ... from e. When raising a new exception within an except block, use from e to preserve the original exception context.

🪛 Ruff (0.8.0) B904: Within an `except` clause, raise exceptions with `raise ... from err` or `raise ... from None` to distinguish them from errors in exception handling.